"""Entry point for the wohn-bot apartment-listing monitor.

Polls listing sources on a randomized interval, auto-applies to new
listings via ApplicationHandler when autopilot is on, and pushes
Telegram notifications. Recovers from browser crashes and stale
sessions automatically.
"""

import asyncio
import contextlib
import logging
import os
import random
import time
from datetime import datetime, timedelta, timezone
from logging.handlers import RotatingFileHandler
from pathlib import Path

from dotenv import load_dotenv
from playwright.async_api import async_playwright

from application_handler import ApplicationHandler
from autoclean_debug import autoclean_debug_material
from handlers.wgcompany_notifier import WGCompanyNotifier
from state_manager import StateManager
from telegram_bot import TelegramBot

# --- Environment & Logging Setup ---

# Load environment variables from .env file
load_dotenv()


class BerlinFormatter(logging.Formatter):
    """Logging formatter that renders timestamps in Berlin local time.

    NOTE(review): uses a fixed UTC+1 offset (CET). Berlin is UTC+2
    during daylight-saving time, so summer log timestamps lag by one
    hour — acceptable per the original "for simplicity" comment.
    """

    def formatTime(self, record, datefmt=None):
        dt = datetime.fromtimestamp(record.created, tz=timezone.utc)
        # Berlin is UTC+1 (CET) or UTC+2 (CEST), using UTC+1 for simplicity
        berlin_dt = dt.astimezone(timezone(timedelta(hours=1)))
        if datefmt:
            return berlin_dt.strftime(datefmt)
        # Trim microseconds to milliseconds to mimic logging's default format.
        return berlin_dt.strftime("%Y-%m-%d %H:%M:%S,%f")[:-3]


# Ensure the log/state directory exists before attaching the file handler;
# RotatingFileHandler raises FileNotFoundError otherwise on a fresh checkout.
os.makedirs("data", exist_ok=True)

# Configure logging: file (rotating) + console for Docker visibility,
# enforced for all modules via force=True (Python 3.8+).
_LOG_FORMAT = "%(asctime)s [%(levelname)-5s] %(name)-20s | %(message)s"
file_handler = RotatingFileHandler(
    "data/monitor.log", maxBytes=1 * 1024 * 1024, backupCount=3
)
file_handler.setFormatter(BerlinFormatter(_LOG_FORMAT))
console_handler = logging.StreamHandler()
console_handler.setFormatter(BerlinFormatter(_LOG_FORMAT))
logging.basicConfig(
    level=logging.INFO,
    handlers=[file_handler, console_handler],
    force=True,  # Enforce for all modules, Python 3.8+
)

logger = logging.getLogger(__name__)  # Use named logger
logger.info("Bot starting | Logs: data/monitor.log + console")

# Interval (seconds) between checks for new listings
CHECK_INTERVAL = int(os.getenv("CHECK_INTERVAL", 150))  # Default: 150 seconds (2.5 min)
CHECK_INTERVAL_VARIANCE = int(os.getenv("CHECK_INTERVAL_VARIANCE", 30))  # Default: ±30 seconds randomization


def get_randomized_interval() -> int:
    """Get check interval with random variance to avoid bot detection patterns."""
    variance = random.randint(-CHECK_INTERVAL_VARIANCE, CHECK_INTERVAL_VARIANCE)
    interval = CHECK_INTERVAL + variance
    # Ensure minimum of 60 seconds to avoid excessive load
    return max(60, interval)


def validate_config() -> bool:
    """Validate required environment variables on startup with clear error messages.

    Returns:
        True when all critical variables are present; False when a
        Telegram credential is missing (bot cannot start). Missing
        login/form variables only produce warnings.
    """
    errors: list[str] = []
    warnings: list[str] = []

    # Required for Telegram notifications
    if not os.getenv("TELEGRAM_BOT_TOKEN"):
        errors.append("TELEGRAM_BOT_TOKEN is not set - notifications will not work")
    if not os.getenv("TELEGRAM_CHAT_ID"):
        errors.append("TELEGRAM_CHAT_ID is not set - notifications will not work")

    # Required for InBerlin login and auto-apply
    if not os.getenv("INBERLIN_EMAIL"):
        warnings.append("INBERLIN_EMAIL is not set - will use public listings only")
    if not os.getenv("INBERLIN_PASSWORD"):
        warnings.append("INBERLIN_PASSWORD is not set - will use public listings only")

    # Required for auto-apply form filling
    form_fields = [
        "FORM_ANREDE", "FORM_VORNAME", "FORM_NACHNAME", "FORM_EMAIL",
        "FORM_PHONE", "FORM_STRASSE", "FORM_HAUSNUMMER", "FORM_PLZ",
        "FORM_ORT", "FORM_PERSONS", "FORM_CHILDREN", "FORM_INCOME",
    ]
    missing_form_fields = [f for f in form_fields if not os.getenv(f)]
    if missing_form_fields:
        warnings.append(
            f"Form fields not set: {', '.join(missing_form_fields)} - autopilot may fail"
        )

    # Print warnings
    if warnings:
        logger.warning("Configuration warnings:")
        for warning in warnings:
            logger.warning(f"  - {warning}")

    # Print errors and exit if critical
    if errors:
        logger.error("Configuration errors - bot cannot start:")
        for error in errors:
            logger.error(f"  - {error}")
        logger.error("Please set required environment variables in .env file")
        return False

    logger.info("Configuration validated successfully")
    return True


def _flush_rotating_file_handlers() -> None:
    """Flush all RotatingFileHandlers attached to the root logger."""
    root_logger = logging.getLogger()
    for handler in root_logger.handlers:
        if isinstance(handler, RotatingFileHandler):
            handler.flush()


async def init_browser_context() -> tuple:
    """Initialize or reinitialize browser context with error handling.

    Returns:
        (playwright, browser, browser_context) — caller owns all three
        and is responsible for closing/stopping them.
    """
    playwright = await async_playwright().start()
    browser = await playwright.chromium.launch(headless=True)
    browser_context = await browser.new_context(
        user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
    )
    return playwright, browser, browser_context


async def main() -> None:
    """Run the monitor loop: validate config, start services, poll forever."""
    logger.info("Initializing wohn-bot...")

    # Validate configuration before starting
    if not validate_config():
        return

    # Initialize state manager
    state_manager = StateManager(Path("data/state.json"))

    # --- Playwright browser/context setup with recovery ---
    logger.info("Initializing browser...")
    playwright, browser, browser_context = await init_browser_context()

    # Application handler manages browser/context
    app_handler = ApplicationHandler(browser_context, state_manager)

    # Set up Telegram bot and inject into handler, passing the main event loop
    event_loop = asyncio.get_running_loop()
    telegram_bot = TelegramBot(app_handler, event_loop=event_loop)
    telegram_bot.start()  # Start Telegram command listener for reactivity
    app_handler.set_telegram_bot(telegram_bot)

    # Start WGCompanyNotifier as a background task
    wg_notifier = WGCompanyNotifier(telegram_bot=telegram_bot, refresh_minutes=10)
    wg_task = asyncio.create_task(wg_notifier.run())

    try:
        logger.info(
            f"Bot is now running. Refreshing every "
            f"{CHECK_INTERVAL}±{CHECK_INTERVAL_VARIANCE}s (randomized)..."
        )
        last_clean = 0.0
        CLEAN_INTERVAL = 48 * 3600  # 48 hours in seconds
        zero_listings_count = 0  # Track consecutive zero-listing fetches
        MAX_ZERO_LISTINGS = 3  # Re-login after 3 consecutive zero fetches

        while True:
            # monotonic() is immune to wall-clock jumps; asyncio.get_event_loop()
            # inside a coroutine is deprecated since 3.10.
            now = time.monotonic()

            # Autoclean debug material every 48 hours
            if now - last_clean > CLEAN_INTERVAL:
                try:
                    deleted = autoclean_debug_material()
                    if deleted:
                        logger.info(f"Cleaned {len(deleted)} debug files (48h)")
                except Exception as e:
                    logger.warning(f"Autoclean failed: {e}")
                last_clean = now

            try:
                # Check if monitoring is enabled before fetching listings
                if not state_manager.is_monitoring_enabled():
                    logger.debug("Monitoring is paused, skipping listing check")
                    sleep_interval = get_randomized_interval()
                    await asyncio.sleep(sleep_interval)
                    _flush_rotating_file_handlers()
                    continue

                current_listings = await app_handler.fetch_listings()

                # Session validation: detect when listings fetch returns empty repeatedly
                if not current_listings:
                    zero_listings_count += 1
                    logger.warning(
                        f"Zero listings fetched ({zero_listings_count}/{MAX_ZERO_LISTINGS})"
                    )
                    if zero_listings_count >= MAX_ZERO_LISTINGS:
                        logger.warning("Session likely expired - forcing re-login")
                        state_manager.set_logged_in(False)
                        zero_listings_count = 0
                        await asyncio.sleep(5)
                        continue
                else:
                    # Reset counter on successful fetch
                    if zero_listings_count > 0:
                        logger.info("Session recovered - listings fetched successfully")
                    zero_listings_count = 0
            except Exception as e:
                logger.error(f"💥 Browser crash: {e}")
                logger.info("Recovering...")
                try:
                    await browser.close()
                    await playwright.stop()
                except Exception:
                    # Best-effort teardown of the crashed browser; never
                    # swallow KeyboardInterrupt/SystemExit here.
                    pass
                # Reinitialize browser
                try:
                    playwright, browser, browser_context = await init_browser_context()
                    app_handler.context = browser_context
                    logger.info("Browser recovered")
                    await asyncio.sleep(5)
                    continue
                except Exception as recovery_error:
                    logger.error(f"Failed to recover: {recovery_error}")
                    await asyncio.sleep(60)
                    continue

            if not current_listings:
                logger.warning("No listings fetched")
                sleep_interval = get_randomized_interval()
                logger.debug(f"Sleeping for {sleep_interval}s (randomized)")
                await asyncio.sleep(sleep_interval)
                _flush_rotating_file_handlers()
                continue

            previous_listings = app_handler.load_previous_listings()
            if not previous_listings:
                logger.info(f"🎬 First run: saving {len(current_listings)} listings as baseline")
                # Mark all as failed applications so /retryfailed can be used
                for listing in current_listings:
                    result = {
                        "listing_id": listing.get("id"),
                        "company": app_handler._detect_company(listing.get("link", "")),
                        "link": listing.get("link"),
                        "timestamp": str(listing.get("timestamp", "")) or str(listing.get("date", "")) or "",
                        "success": False,
                        "message": "First run, not auto-applied. Use /retryfailed to attempt.",
                        "address": listing.get("address", ""),
                        "rooms": listing.get("rooms", ""),
                        "price": listing.get("price", ""),
                        "retries": 0,
                    }
                    app_handler.save_application(result)
                app_handler.save_listings(current_listings)
                sleep_interval = get_randomized_interval()
                logger.debug(f"Sleeping for {sleep_interval}s (randomized)")
                await asyncio.sleep(sleep_interval)
                _flush_rotating_file_handlers()
                continue

            new_listings = app_handler.find_new_listings(current_listings, previous_listings)
            application_results = {}
            if new_listings:
                logger.info(
                    f"{len(new_listings)} new listing{'s' if len(new_listings) > 1 else ''} detected"
                )
                app_handler.log_listing_times(new_listings)
                if app_handler.is_autopilot_enabled():
                    logger.info("Autopilot active - applying...")
                    application_results = await app_handler.apply_to_listings(new_listings)
                app_handler.notify_new_listings(new_listings, application_results)

            app_handler.save_listings(current_listings)
            sleep_interval = get_randomized_interval()
            logger.debug(f"Next check in {sleep_interval}s")
            await asyncio.sleep(sleep_interval)
            _flush_rotating_file_handlers()
    except (KeyboardInterrupt, SystemExit):
        logger.info("Shutting down...")
    except Exception as e:
        logger.error(f"[MAIN] Error in main loop: {e}")
    finally:
        # Cancel the background notifier and release ALL Playwright
        # resources (original leaked the driver process and the task).
        wg_task.cancel()
        with contextlib.suppress(asyncio.CancelledError, Exception):
            await wg_task
        with contextlib.suppress(Exception):
            await browser.close()
            await playwright.stop()
        logger.info("Browser closed successfully.")


if __name__ == "__main__":
    asyncio.run(main())