import os import logging import threading import time import requests import asyncio import httpx # Configuration from environment TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "") TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "") TELEGRAM_MAX_RETRIES = int(os.environ.get("TELEGRAM_MAX_RETRIES", 3)) logger = logging.getLogger(__name__) class TelegramBot: async def _handle_help_command(self) -> None: """Send a help message with available commands.""" help_text = ( "Available commands:\n" "/autopilot on|off - Enable/disable autopilot\n" "/status - Show current status\n" "/plot - Show weekly listing pattern plot\n" "/errorrate - Show autopilot error rate plot\n" "/retryfailed - Retry failed applications\n" "/resetlistings - Reset listings file\n" "/help - Show this help message" ) await self._send_message(help_text) async def _handle_unknown_command(self, text: str) -> None: """Handle unknown commands and notify the user.""" cmd = text.split()[0] if text else text msg = ( f"ā“ Unknown command: {cmd}\n\nUse /help to see available commands." ) await self._send_message(msg) async def _handle_reset_listings_command(self) -> None: """Move listings.json to data/old/ with a timestamp, preserving statistics and application history.""" import shutil from datetime import datetime try: listings_path = os.path.join("data", "listings.json") old_dir = os.path.join("data", "old") if os.path.exists(listings_path): # Ensure old_dir exists os.makedirs(old_dir, exist_ok=True) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") dest_path = os.path.join( old_dir, f"listings_{timestamp}.json" ) shutil.move(listings_path, dest_path) msg = ( f"šŸ—‘ļø Listings reset:\nlistings.json moved to " f"old/listings_{timestamp}.json." ) else: msg = "ā„¹ļø No listings file found to move." await self._send_message(msg) except Exception as e: logger.error(f"Error resetting listings: {e}") await self._send_message(f"āŒ Error resetting listings: {str(e)}") def __init__(self, monitor, bot_token: str | None = None, chat_id: str | None = None, event_loop=None) -> None: self.monitor = monitor self.bot_token = bot_token or TELEGRAM_BOT_TOKEN self.chat_id = chat_id or TELEGRAM_CHAT_ID self.last_update_id: int = 0 self.running: bool = False # Add reference to application handler self.app_handler = monitor # Store the main event loop for thread-safe async calls self.event_loop = event_loop or asyncio.get_event_loop() # Initialize persistent httpx client with connection pooling self._http_client: httpx.AsyncClient | None = None async def _get_http_client(self) -> httpx.AsyncClient: """Get or create the persistent httpx client with connection pooling.""" if self._http_client is None: self._http_client = httpx.AsyncClient( timeout=30, limits=httpx.Limits(max_keepalive_connections=5, max_connections=10) ) return self._http_client async def close(self) -> None: """Close the httpx client gracefully.""" if self._http_client: await self._http_client.aclose() self._http_client = None def start(self) -> None: if not self.bot_token: logger.warning("Telegram bot token not configured, commands disabled") return self.running = True thread = threading.Thread(target=self._poll_updates, daemon=True) thread.start() logger.info("Telegram command listener started") def stop(self) -> None: self.running = False def _poll_updates(self) -> None: while self.running: try: url = f"https://api.telegram.org/bot{self.bot_token}/getUpdates" params = {"offset": self.last_update_id + 1, "timeout": 30} response = requests.get(url, params=params, timeout=35) if response.ok: data = response.json() if data.get("ok") and data.get("result"): for update in data["result"]: self.last_update_id = update["update_id"] self._handle_update(update) except requests.exceptions.Timeout: continue except Exception as e: logger.error(f"Telegram polling error: {e}") time.sleep(5) def _handle_update(self, update: dict) -> None: message = update.get("message", {}) text = message.get("text", "") chat_id = str(message.get("chat", {}).get("id", "")) if chat_id != self.chat_id: logger.debug(f"Ignoring message from unknown chat: {chat_id}") return logger.info(f"Received Telegram command: {text}") loop = self.event_loop if text.startswith("/autopilot"): asyncio.run_coroutine_threadsafe(self._handle_autopilot_command(text), loop) elif text == "/status": asyncio.run_coroutine_threadsafe(self._handle_status_command(), loop) elif text == "/help": asyncio.run_coroutine_threadsafe(self._handle_help_command(), loop) elif text == "/plot": asyncio.run_coroutine_threadsafe(self._handle_plot_command(), loop) elif text == "/errorrate": asyncio.run_coroutine_threadsafe(self._handle_error_rate_command(), loop) elif text == "/retryfailed": fut = asyncio.run_coroutine_threadsafe( self._handle_retry_failed_command(max_retries=TELEGRAM_MAX_RETRIES), loop ) try: fut.result() except Exception as e: logger.error(f"/retryfailed command failed: {e}") elif text == "/resetlistings": asyncio.run_coroutine_threadsafe(self._handle_reset_listings_command(), loop) elif text.startswith("/"): asyncio.run_coroutine_threadsafe(self._handle_unknown_command(text), loop) async def _handle_retry_failed_command(self, max_retries: int = 3) -> None: """Retry all failed applications up to max_retries.""" # Ensure browser context is initialized if not hasattr(self.app_handler, 'context') or self.app_handler.context is None: if hasattr(self.app_handler, 'init_browser'): await self.app_handler.init_browser() # After (re-)init, propagate context to all sub-handlers (defensive) if hasattr(self.app_handler, 'context') and hasattr(self.app_handler, 'handlers'): for handler in self.app_handler.handlers.values(): handler.context = self.app_handler.context applications = self.app_handler.load_applications() failed = [ app for app in applications.values() if not app.get("success") and app.get("retries", 0) < max_retries and not app.get("deactivated", False) ] await self._send_message(f"šŸ”„ Retrying {len(failed)} failed applications (max retries: {max_retries})...") if not failed: await self._send_message("āœ… No failed applications to retry (or all reached max retries).") return results = {} details = [] for app in failed: listing = { "id": app["listing_id"], "rooms": app.get("rooms", ""), "size": app.get("size", ""), "price": app.get("price", ""), "address": app.get("address", ""), "link": app.get("link", "") } retries = app.get("retries", 0) + 1 result = await self.app_handler.apply(listing) result["retries"] = retries # Preserve original timestamp only if still failing; update on success if not result["success"]: result["timestamp"] = app.get("timestamp", result["timestamp"]) self.app_handler.save_application(result) results[listing["id"]] = result status_emoji = "āœ…" if result["success"] else "āŒ" details.append( f"{status_emoji} {result.get('address', '')} ({result.get('company', '')})\n" f"{result.get('link', '')}\n" f"{result.get('message', '')}\n" ) n_success = sum(1 for r in results.values() if r["success"]) n_fail = sum(1 for r in results.values() if not r["success"]) summary = f"šŸ”„ Retried {len(results)} failed applications.\nāœ… Success: {n_success}\nāŒ Still failed: {n_fail}" if details: summary += "\n\nDetails:\n" + "\n".join(details) await self._send_message(summary) async def _handle_autopilot_command(self, text: str) -> None: logger.info(f"Processing autopilot command: {text}") parts = text.split() if len(parts) < 2: await self._send_message("Usage: /autopilot on|off") return action = parts[1].lower() if action == "on": logger.info("Enabling autopilot mode") self.monitor.set_autopilot(True) await self._send_message("šŸ¤– Autopilot ENABLED\n\nI will automatically apply to new listings!") elif action == "off": self.monitor.set_autopilot(False) await self._send_message("šŸ›‘ Autopilot DISABLED\n\nI will only notify you of new listings.") else: await self._send_message("Usage: /autopilot on|off") async def _handle_status_command(self) -> None: state = self.app_handler.load_state() autopilot = state.get("autopilot", False) applications = self.app_handler.load_applications() status = "šŸ¤– Autopilot: " + ("ON āœ…" if autopilot else "OFF āŒ") status += f"\nšŸ“ Applications sent: {len(applications)}" by_company: dict[str, int] = {} for app in applications.values(): company = app.get("company", "unknown") by_company[company] = by_company.get(company, 0) + 1 if by_company: status += "\n\nBy company:" for company, count in sorted(by_company.items()): status += f"\n • {company}: {count}" await self._send_message(status) async def _handle_plot_command(self) -> None: logger.info("Generating listing times plot...") try: plot_path = self.app_handler._generate_weekly_plot() await self._send_photo(plot_path, "\U0001f4ca Weekly Listing Patterns\n\nThis shows when new listings typically appear throughout the week.") except Exception as e: logger.error(f"Error generating plot: {e}") import traceback logger.error(traceback.format_exc()) await self._send_message(f"\u274c Error generating plot: {str(e)}") async def _handle_error_rate_command(self) -> None: logger.info("Generating autopilot errorrate plot...") try: plot_path, summary = self.app_handler._generate_error_rate_plot() caption = "šŸ“‰ Autopilot Success vs Failure\n\n" + summary await self._send_photo(plot_path, caption) except Exception as e: logger.error(f"Error generating errorrate plot: {e}") import traceback logger.error(traceback.format_exc()) await self._send_message(f"āŒ Error generating errorrate plot: {str(e)}") async def _send_message(self, text: str) -> None: """Send a text message to the configured Telegram chat, with retry logic and detailed error logging (async).""" MAX_LENGTH = 4096 # Telegram message character limit if not self.bot_token or not self.chat_id: logger.warning("Telegram bot token or chat ID not configured, cannot send message") return url = f"https://api.telegram.org/bot{self.bot_token}/sendMessage" # Split message into chunks if too long messages: list[str] = [] if isinstance(text, str) and len(text) > MAX_LENGTH: # Try to split on line breaks for readability lines = text.split('\n') chunk = "" for line in lines: if len(chunk) + len(line) + 1 > MAX_LENGTH: messages.append(chunk) chunk = line else: if chunk: chunk += "\n" chunk += line if chunk: messages.append(chunk) else: messages = [text] max_retries = 3 retry_delay = 1 # Initial delay in seconds try: client = await self._get_http_client() for idx, msg in enumerate(messages): payload = {"chat_id": self.chat_id, "text": msg, "parse_mode": "HTML"} # Retry logic for each message chunk for attempt in range(max_retries): try: response = await client.post(url, json=payload) if response.is_success: logger.info(f"[TELEGRAM] Sent message part {idx+1}/{len(messages)}: status={response.status_code}") break else: logger.warning(f"[TELEGRAM] Failed attempt {attempt+1}/{max_retries}: {response.status_code}") if attempt < max_retries - 1: wait_time = retry_delay * (2 ** attempt) await asyncio.sleep(wait_time) else: logger.error(f"Failed to send Telegram message after {max_retries} attempts: {response.text}") except httpx.TimeoutException as e: logger.warning(f"[TELEGRAM] Timeout on attempt {attempt+1}/{max_retries}") if attempt < max_retries - 1: wait_time = retry_delay * (2 ** attempt) await asyncio.sleep(wait_time) else: logger.error(f"Telegram message timed out after {max_retries} attempts") except httpx.RequestError as e: logger.warning(f"[TELEGRAM] Network error on attempt {attempt+1}/{max_retries}: {e}") if attempt < max_retries - 1: wait_time = retry_delay * (2 ** attempt) await asyncio.sleep(wait_time) else: logger.error(f"Telegram message failed after {max_retries} attempts: {e}") except Exception as e: logger.error(f"Unexpected error while sending Telegram message: {e}") async def _send_photo(self, photo_path: str, caption: str) -> None: """Send a photo to the configured Telegram chat with retry logic (async).""" if not self.bot_token or not self.chat_id: logger.warning("Telegram bot token or chat ID not configured, cannot send photo") return url = f"https://api.telegram.org/bot{self.bot_token}/sendPhoto" max_retries = 3 retry_delay = 1 # Initial delay in seconds for attempt in range(max_retries): try: with open(photo_path, "rb") as photo: files = {"photo": (photo_path, photo, "image/jpeg")} data = {"chat_id": self.chat_id, "caption": caption, "parse_mode": "HTML"} client = await self._get_http_client() response = await client.post(url, data=data, files=files) if response.is_success: logger.info(f"[TELEGRAM] Sent photo: {photo_path}") return else: logger.warning(f"[TELEGRAM] Photo send attempt {attempt+1}/{max_retries} failed: {response.status_code}") if attempt < max_retries - 1: wait_time = retry_delay * (2 ** attempt) await asyncio.sleep(wait_time) else: logger.error(f"Failed to send Telegram photo after {max_retries} attempts: {response.text}") except httpx.TimeoutException: logger.warning(f"[TELEGRAM] Photo timeout on attempt {attempt+1}/{max_retries}") if attempt < max_retries - 1: wait_time = retry_delay * (2 ** attempt) await asyncio.sleep(wait_time) else: logger.error(f"Telegram photo timed out after {max_retries} attempts") except httpx.RequestError as e: logger.warning(f"[TELEGRAM] Network error on attempt {attempt+1}/{max_retries}: {e}") if attempt < max_retries - 1: wait_time = retry_delay * (2 ** attempt) await asyncio.sleep(wait_time) else: logger.error(f"Telegram photo failed after {max_retries} attempts: {e}") except FileNotFoundError: logger.error(f"Photo file not found: {photo_path}") return # No point retrying if file doesn't exist except Exception as e: logger.error(f"Unexpected error while sending Telegram photo: {e}") if attempt < max_retries - 1: wait_time = retry_delay * (2 ** attempt) await asyncio.sleep(wait_time) else: return