"""Telegram command listener used to control the listing monitor."""

import html
import logging
import os
import threading
import time
from collections import Counter

import requests

# Configuration from environment
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "")
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "")

logger = logging.getLogger(__name__)

# Long-poll duration requested from Telegram, and the pause after a failed poll.
_LONG_POLL_SECONDS = 30
_RETRY_DELAY_SECONDS = 5

# Emoji used in outgoing messages. They are written as escape sequences so the
# source stays pure ASCII and cannot be corrupted by an editor or tool that
# assumes the wrong file encoding.
_ROBOT = "\U0001F916"       # ROBOT FACE
_STOP_SIGN = "\U0001F6D1"   # OCTAGONAL SIGN
_CHECK = "\u2705"           # WHITE HEAVY CHECK MARK
_CROSS = "\u274C"           # CROSS MARK
# NOTE(review): the previous (mis-encoded) text had lost one byte of this
# glyph; MEMO is the most plausible reading -- confirm.
_MEMO = "\U0001F4DD"        # MEMO
_HOUSE = "\U0001F3E0"       # HOUSE BUILDING
_QUESTION = "\u2753"        # BLACK QUESTION MARK ORNAMENT
_CHART_DOWN = "\U0001F4C9"  # CHART WITH DOWNWARDS TREND
_BAR_CHART = "\U0001F4CA"   # BAR CHART
_BULLET = "\u2022"          # BULLET


class TelegramBot:
    """Handle Telegram commands for controlling the monitor.

    The bot long-polls the Telegram Bot API on a daemon thread and only reacts
    to messages coming from the configured chat.
    """

    def __init__(self, monitor, bot_token=None, chat_id=None):
        """Create the bot.

        Args:
            monitor: Object the commands act on. This class calls its
                ``set_autopilot``, ``load_state`` and ``load_applications``
                methods.
            bot_token: Bot API token; falls back to ``TELEGRAM_BOT_TOKEN``.
            chat_id: The only chat allowed to issue commands; falls back to
                ``TELEGRAM_CHAT_ID``.
        """
        self.monitor = monitor
        self.bot_token = bot_token or TELEGRAM_BOT_TOKEN
        self.chat_id = chat_id or TELEGRAM_CHAT_ID
        self.last_update_id = 0
        self.running = False

    def start(self) -> None:
        """Start the background polling thread (no-op without a bot token)."""
        if not self.bot_token:
            logger.warning("Telegram bot token not configured, commands disabled")
            return
        self.running = True
        thread = threading.Thread(target=self._poll_updates, daemon=True)
        thread.start()
        logger.info("Telegram command listener started")

    def stop(self) -> None:
        """Ask the polling loop to exit after its current request finishes."""
        self.running = False

    def _api_url(self, method: str) -> str:
        """Return the Bot API URL for ``method``."""
        return f"https://api.telegram.org/bot{self.bot_token}/{method}"

    def _redact(self, value) -> str:
        """Return ``str(value)`` with the bot token masked.

        Exception messages raised by ``requests`` include the request URL,
        which embeds the token, so they must not be logged verbatim.
        """
        text = str(value)
        if self.bot_token:
            text = text.replace(self.bot_token, "<redacted>")
        return text

    def _fetch_updates(self) -> list:
        """Long-poll ``getUpdates`` once and return the list of updates.

        Raises:
            requests.exceptions.RequestException: On network or HTTP errors.
            RuntimeError: If the API answers with ``ok: false``.
        """
        params = {"offset": self.last_update_id + 1, "timeout": _LONG_POLL_SECONDS}
        # The client-side timeout is slightly longer than the long-poll window
        # so a quiet poll ends on the server side rather than as a client error.
        response = requests.get(
            self._api_url("getUpdates"),
            params=params,
            timeout=_LONG_POLL_SECONDS + 5,
        )
        response.raise_for_status()
        data = response.json()
        if not data.get("ok"):
            raise RuntimeError(
                f"Telegram API returned an error: {data.get('description')}"
            )
        return data.get("result") or []

    def _poll_updates(self) -> None:
        """Poll for updates until ``stop()`` is called.

        Any failed poll (network error, non-2xx status, ``ok: false``) is
        logged and followed by a short pause so a persistent failure does not
        turn into a tight request loop.
        """
        while self.running:
            try:
                updates = self._fetch_updates()
            except requests.exceptions.Timeout:
                # A timed-out long poll is expected; just poll again.
                continue
            except Exception as exc:
                logger.error("Telegram polling error: %s", self._redact(exc))
                time.sleep(_RETRY_DELAY_SECONDS)
                continue

            for update in updates:
                try:
                    # Advance the offset first so a failing update is not
                    # fetched again on the next poll.
                    self.last_update_id = update["update_id"]
                    self._handle_update(update)
                except Exception as exc:
                    logger.error(
                        "Error handling Telegram update: %s", self._redact(exc)
                    )

    def _handle_update(self, update) -> None:
        """Dispatch one update to the matching command handler.

        Messages from any chat other than the configured one are ignored.
        """
        message = update.get("message") or {}
        text = (message.get("text") or "").strip()
        chat_id = str((message.get("chat") or {}).get("id", ""))

        # Compare as strings: the configured id may have been passed as an int.
        if chat_id != str(self.chat_id):
            logger.debug("Ignoring message from unknown chat: %s", chat_id)
            return

        logger.info("Received Telegram command: %s", text)
        if not text.startswith("/"):
            return

        # In group chats commands arrive as "/command@BotName".
        command = text.split()[0].split("@", 1)[0].lower()

        if command.startswith("/autopilot"):
            self._handle_autopilot_command(text)
            return

        handlers = {
            "/status": self._handle_status_command,
            "/help": self._handle_help_command,
            "/plot": self._handle_plot_command,
            "/errorrate": self._handle_error_rate_command,
        }
        handler = handlers.get(command)
        if handler is not None:
            handler()
        else:
            self._handle_unknown_command(text)

    def _handle_autopilot_command(self, text: str) -> None:
        """Handle ``/autopilot on|off`` by toggling autopilot on the monitor."""
        logger.info("Processing autopilot command: %s", text)
        parts = text.split()
        action = parts[1].lower() if len(parts) >= 2 else ""

        if action == "on":
            logger.info("Enabling autopilot mode")
            self.monitor.set_autopilot(True)
            self._send_message(
                f"{_ROBOT} Autopilot ENABLED\n\n"
                "I will automatically apply to new listings!"
            )
        elif action == "off":
            logger.info("Disabling autopilot mode")
            self.monitor.set_autopilot(False)
            self._send_message(
                f"{_STOP_SIGN} Autopilot DISABLED\n\n"
                "I will only notify you of new listings."
            )
        else:
            self._send_message("Usage: /autopilot on|off")

    def _handle_status_command(self) -> None:
        """Send the autopilot state and the application count per company."""
        state = self.monitor.load_state()
        autopilot = state.get("autopilot", False)
        applications = self.monitor.load_applications()

        flag = f"ON {_CHECK}" if autopilot else f"OFF {_CROSS}"
        lines = [
            f"{_ROBOT} Autopilot: {flag}",
            f"{_MEMO} Applications sent: {len(applications)}",
        ]

        by_company = Counter(
            app.get("company", "unknown") for app in applications.values()
        )
        if by_company:
            lines.append("")
            lines.append("By company:")
            for company, count in sorted(by_company.items()):
                # Messages are sent with parse_mode=HTML, so escape data that
                # did not originate in this file.
                lines.append(f" {_BULLET} {html.escape(str(company))}: {count}")

        self._send_message("\n".join(lines))

    def _handle_help_command(self) -> None:
        """Send the list of supported commands."""
        help_text = "\n".join(
            [
                f"{_HOUSE} InBerlin Monitor Commands",
                "",
                "/autopilot on - Enable automatic applications",
                "/autopilot off - Disable automatic applications",
                "/status - Show current status and stats",
                "/plot - Show weekly listing patterns",
                "/errorrate - Show autopilot success vs failure",
                "/help - Show this help message",
                "",
                "When autopilot is ON, I will automatically apply to new listings.",
            ]
        )
        self._send_message(help_text)

    def _handle_unknown_command(self, text: str) -> None:
        """Tell the user that the command word in ``text`` is not recognised."""
        parts = text.split()
        cmd = parts[0] if parts else text
        # The command is echoed user input and the message is parsed as HTML.
        self._send_message(f"{_QUESTION} Unknown command: {html.escape(cmd)}")

    def _handle_error_rate_command(self) -> None:
        """Generate and send a plot showing success vs failure ratio for autopilot applications."""
        logger.info("Generating autopilot errorrate plot...")
        try:
            plot_path, summary = self._generate_error_rate_plot()
            if plot_path:
                caption = f"{_CHART_DOWN} Autopilot Success vs Failure\n\n" + summary
                self._send_photo(plot_path, caption)
            else:
                self._send_message(
                    f"{_CHART_DOWN} Not enough application data to generate errorrate plot."
                )
        except Exception as exc:
            logger.error("Error generating error rate plot: %s", self._redact(exc))
            self._send_message(f"{_CHART_DOWN} Error generating error rate plot.")

    def _send_message(self, text: str) -> None:
        """Send a text message to the configured Telegram chat.

        Failures are logged, never raised. The text is sent with
        ``parse_mode=HTML``, so callers must escape untrusted content.
        """
        if not self.bot_token or not self.chat_id:
            logger.warning(
                "Telegram bot token or chat ID not configured, cannot send message"
            )
            return

        payload = {"chat_id": self.chat_id, "text": text, "parse_mode": "HTML"}
        try:
            response = requests.post(
                self._api_url("sendMessage"), json=payload, timeout=10
            )
            if not response.ok:
                logger.error(
                    "Failed to send Telegram message: %s",
                    self._redact(response.text),
                )
        except Exception as exc:
            logger.error("Error while sending Telegram message: %s", self._redact(exc))

    def _send_photo(self, photo_path, caption: str) -> None:
        """Send a photo to the configured Telegram chat.

        Network failures are logged, never raised.

        Raises:
            OSError: If ``photo_path`` cannot be opened.
        """
        if not self.bot_token or not self.chat_id:
            logger.warning(
                "Telegram bot token or chat ID not configured, cannot send photo"
            )
            return

        payload = {"chat_id": self.chat_id, "caption": caption, "parse_mode": "HTML"}
        with open(photo_path, "rb") as photo:
            try:
                response = requests.post(
                    self._api_url("sendPhoto"),
                    data=payload,
                    files={"photo": photo},
                    timeout=10,
                )
                if not response.ok:
                    logger.error(
                        "Failed to send Telegram photo: %s",
                        self._redact(response.text),
                    )
            except Exception as exc:
                logger.error(
                    "Error while sending Telegram photo: %s", self._redact(exc)
                )

    def _generate_error_rate_plot(self):
        """Placeholder for generating an error rate plot.

        Returns:
            A ``(plot_path, summary)`` tuple; ``plot_path`` is ``None`` because
            plot generation is not implemented.
        """
        logger.warning("_generate_error_rate_plot is not implemented.")
        return None, "Error rate plot generation not implemented."

    def _handle_plot_command(self) -> None:
        """Placeholder for handling the /plot command."""
        logger.warning("_handle_plot_command is not implemented.")
        self._send_message(f"{_BAR_CHART} Plot command is not implemented yet.")