wohnbot/main.py
2026-01-02 13:41:21 +01:00

219 lines
9.1 KiB
Python

import asyncio
from playwright.async_api import async_playwright
from application_handler import ApplicationHandler
from telegram_bot import TelegramBot
from handlers.wgcompany_notifier import WGCompanyNotifier
import logging
from logging.handlers import RotatingFileHandler
import os
from dotenv import load_dotenv
from state_manager import StateManager
from pathlib import Path
from autoclean_debug import autoclean_debug_material
# --- Environment & Logging Setup ---
# Load environment variables from .env file
load_dotenv()
# Configure logging: file (rotating) + console for Docker visibility, enforce for all modules
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)-5s] %(name)-20s | %(message)s",
handlers=[
RotatingFileHandler("data/monitor.log", maxBytes=1 * 1024 * 1024, backupCount=3),
logging.StreamHandler()
],
force=True # Enforce for all modules, Python 3.8+
)
logger = logging.getLogger(__name__) # Use named logger
logger.info("Bot starting | Logs: data/monitor.log + console")
# Interval (seconds) between checks for new listings
CHECK_INTERVAL = int(os.getenv("CHECK_INTERVAL", 300)) # Default: 300 seconds
def validate_config() -> bool:
"""Validate required environment variables on startup with clear error messages."""
errors: list[str] = []
warnings: list[str] = []
# Required for Telegram notifications
if not os.getenv("TELEGRAM_BOT_TOKEN"):
errors.append("TELEGRAM_BOT_TOKEN is not set - notifications will not work")
if not os.getenv("TELEGRAM_CHAT_ID"):
errors.append("TELEGRAM_CHAT_ID is not set - notifications will not work")
# Required for InBerlin login and auto-apply
if not os.getenv("INBERLIN_EMAIL"):
warnings.append("INBERLIN_EMAIL is not set - will use public listings only")
if not os.getenv("INBERLIN_PASSWORD"):
warnings.append("INBERLIN_PASSWORD is not set - will use public listings only")
# Required for auto-apply form filling
form_fields = [
"FORM_ANREDE", "FORM_VORNAME", "FORM_NACHNAME", "FORM_EMAIL",
"FORM_PHONE", "FORM_STRASSE", "FORM_HAUSNUMMER", "FORM_PLZ",
"FORM_ORT", "FORM_PERSONS", "FORM_CHILDREN", "FORM_INCOME"
]
missing_form_fields = [f for f in form_fields if not os.getenv(f)]
if missing_form_fields:
warnings.append(f"Form fields not set: {', '.join(missing_form_fields)} - autopilot may fail")
# Print warnings
if warnings:
logger.warning("Configuration warnings:")
for warning in warnings:
logger.warning(f" - {warning}")
# Print errors and exit if critical
if errors:
logger.error("Configuration errors - bot cannot start:")
for error in errors:
logger.error(f" - {error}")
logger.error("Please set required environment variables in .env file")
return False
logger.info("Configuration validated successfully")
return True
def _flush_rotating_file_handlers() -> None:
"""Flush all RotatingFileHandlers attached to the root logger."""
root_logger = logging.getLogger()
for handler in root_logger.handlers:
if isinstance(handler, RotatingFileHandler):
handler.flush()
async def init_browser_context() -> tuple:
"""Initialize or reinitialize browser context with error handling."""
playwright = await async_playwright().start()
browser = await playwright.chromium.launch(headless=True)
browser_context = await browser.new_context(
user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
)
return playwright, browser, browser_context
async def main() -> None:
logger.info("Initializing wohn-bot...")
# Validate configuration before starting
if not validate_config():
return
# Initialize state manager
state_manager = StateManager(Path("data/state.json"))
# --- Playwright browser/context setup with recovery ---
logger.info("Initializing browser...")
playwright, browser, browser_context = await init_browser_context()
# Application handler manages browser/context
app_handler = ApplicationHandler(browser_context, state_manager)
# Set up Telegram bot and inject into handler, passing the main event loop
event_loop = asyncio.get_running_loop()
telegram_bot = TelegramBot(app_handler, event_loop=event_loop)
telegram_bot.start() # Start Telegram command listener for reactivity
app_handler.set_telegram_bot(telegram_bot)
# Start WGCompanyNotifier as a background task
wg_notifier = WGCompanyNotifier(telegram_bot=telegram_bot, refresh_minutes=10)
wg_task = asyncio.create_task(wg_notifier.run())
try:
logger.info(f"Bot is now running. Refreshing every {CHECK_INTERVAL} seconds...")
last_clean = 0
CLEAN_INTERVAL = 48 * 3600 # 48 hours in seconds
while True:
now = asyncio.get_event_loop().time()
# Autoclean debug material every 48 hours
if now - last_clean > CLEAN_INTERVAL:
try:
deleted = autoclean_debug_material()
if deleted:
logger.info(f"Cleaned {len(deleted)} debug files (48h)")
except Exception as e:
logger.warning(f"Autoclean failed: {e}")
last_clean = now
try:
# Check if monitoring is enabled before fetching listings
if not state_manager.is_monitoring_enabled():
logger.debug("Monitoring is paused, skipping listing check")
await asyncio.sleep(CHECK_INTERVAL)
_flush_rotating_file_handlers()
continue
current_listings = await app_handler.fetch_listings()
except Exception as e:
logger.error(f"💥 Browser crash: {e}")
logger.info("Recovering...")
try:
await browser.close()
await playwright.stop()
except:
pass
# Reinitialize browser
try:
playwright, browser, browser_context = await init_browser_context()
app_handler.context = browser_context
logger.info("Browser recovered")
await asyncio.sleep(5)
continue
except Exception as recovery_error:
logger.error(f"Failed to recover: {recovery_error}")
await asyncio.sleep(60)
continue
if not current_listings:
logger.warning("No listings fetched")
await asyncio.sleep(CHECK_INTERVAL)
_flush_rotating_file_handlers()
continue
previous_listings = app_handler.load_previous_listings()
if not previous_listings:
logger.info(f"🎬 First run: saving {len(current_listings)} listings as baseline")
# Mark all as failed applications so /retryfailed can be used
for listing in current_listings:
result = {
"listing_id": listing.get("id"),
"company": app_handler._detect_company(listing.get("link", "")),
"link": listing.get("link"),
"timestamp": str(listing.get("timestamp", "")) or str(listing.get("date", "")) or "",
"success": False,
"message": "First run, not auto-applied. Use /retryfailed to attempt.",
"address": listing.get("address", ""),
"rooms": listing.get("rooms", ""),
"price": listing.get("price", ""),
"retries": 0
}
app_handler.save_application(result)
app_handler.save_listings(current_listings)
await asyncio.sleep(CHECK_INTERVAL)
_flush_rotating_file_handlers()
continue
new_listings = app_handler.find_new_listings(current_listings, previous_listings)
application_results = {}
if new_listings:
logger.info(f"{len(new_listings)} new listing{'s' if len(new_listings) > 1 else ''} detected")
app_handler.log_listing_times(new_listings)
if app_handler.is_autopilot_enabled():
logger.info("Autopilot active - applying...")
application_results = await app_handler.apply_to_listings(new_listings)
app_handler.notify_new_listings(new_listings, application_results)
app_handler.save_listings(current_listings)
await asyncio.sleep(CHECK_INTERVAL)
_flush_rotating_file_handlers()
except (KeyboardInterrupt, SystemExit):
logger.info("Shutting down...")
except Exception as e:
logger.error(f"[MAIN] Error in main loop: {e}")
finally:
await browser.close()
logger.info("Browser closed successfully.")
if __name__ == "__main__":
asyncio.run(main())