# wohnbot/telegram_bot.py
#
# 260 lines
# No EOL
# 12 KiB
# Python

import asyncio
import html
import logging
import os
import threading
import time

import requests
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Settings read from environment variables once, at import time.
# An empty token / chat id means "not configured".
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "")
TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID", "")
# Retry limit passed to the /retryfailed command; 3 when the variable is unset.
TELEGRAM_MAX_RETRIES = int(os.getenv("TELEGRAM_MAX_RETRIES", 3))
class TelegramBot:
    """Handle Telegram commands for controlling the monitor.

    Long-polls the Telegram Bot API ``getUpdates`` endpoint on a daemon
    thread and dispatches slash commands received from the configured chat
    to the matching ``_handle_*`` method. Replies are sent with
    ``parse_mode="HTML"``, so every dynamic value interpolated into a reply
    is HTML-escaped first.
    """

    # Base URL of the Telegram Bot API; "/bot<token>/<method>" is appended.
    _API_BASE = "https://api.telegram.org"

    def __init__(self, monitor, bot_token=None, chat_id=None, event_loop=None):
        """Create the bot.

        Args:
            monitor: Object that is both toggled (``set_autopilot``) and used
                as the application handler (``load_state``,
                ``load_applications``, ``apply``, ...).
            bot_token: Bot token; falls back to ``TELEGRAM_BOT_TOKEN``.
            chat_id: Only messages from this chat are processed; falls back
                to ``TELEGRAM_CHAT_ID``. Stored as ``str``.
            event_loop: Loop on which ``/retryfailed`` coroutines are
                scheduled; defaults to ``asyncio.get_event_loop()``. If no
                loop can be obtained it is left as ``None`` and
                ``/retryfailed`` reports that it is unavailable.
        """
        self.monitor = monitor
        self.bot_token = bot_token or TELEGRAM_BOT_TOKEN
        # Incoming chat ids are compared as strings; normalising here keeps
        # an int chat_id passed by a caller from silently never matching.
        self.chat_id = str(chat_id or TELEGRAM_CHAT_ID)
        self.last_update_id = 0
        self.running = False
        # The monitor doubles as the application handler.
        self.app_handler = monitor
        # Store the main event loop for thread-safe async calls.
        if event_loop is None:
            try:
                event_loop = asyncio.get_event_loop()
            except RuntimeError:
                # get_event_loop() can raise when the current thread has no
                # loop; don't let that prevent the bot from being created.
                logger.warning(
                    "No asyncio event loop available; /retryfailed is disabled"
                )
                event_loop = None
        self.event_loop = event_loop

    @staticmethod
    def _escape(value):
        """Return ``str(value)`` with ``&``, ``<`` and ``>`` HTML-escaped."""
        return html.escape(str(value), quote=False)

    def _redact(self, value):
        """Return ``str(value)`` with the bot token masked.

        The token is part of every request URL, and ``requests`` includes the
        URL in its exception messages, so error text must be scrubbed before
        it is logged.
        """
        text = str(value)
        if self.bot_token:
            text = text.replace(self.bot_token, "<redacted>")
        return text

    def _api_url(self, method):
        """Build the Bot API URL for ``method`` (e.g. ``"sendMessage"``)."""
        return f"{self._API_BASE}/bot{self.bot_token}/{method}"

    def start(self):
        """Start the polling thread, unless unconfigured or already running."""
        if not self.bot_token:
            logger.warning("Telegram bot token not configured, commands disabled")
            return
        if self.running:
            # A second start() would spawn a second poller and make every
            # command run twice.
            logger.debug("Telegram command listener already running")
            return
        self.running = True
        thread = threading.Thread(target=self._poll_updates, daemon=True)
        thread.start()
        logger.info("Telegram command listener started")

    def stop(self):
        """Ask the polling loop to exit after its current iteration."""
        self.running = False

    def _poll_updates(self):
        """Long-poll ``getUpdates`` and dispatch each update until stopped."""
        while self.running:
            try:
                params = {"offset": self.last_update_id + 1, "timeout": 30}
                # Client timeout is slightly above the server-side long-poll
                # timeout so a quiet poll returns normally instead of raising.
                response = requests.get(
                    self._api_url("getUpdates"), params=params, timeout=35
                )
                if not response.ok:
                    # Back off instead of immediately re-requesting; an error
                    # status would otherwise turn this loop into a busy loop.
                    logger.error(
                        "Telegram getUpdates failed: status=%s, response=%s",
                        response.status_code,
                        response.text,
                    )
                    time.sleep(5)
                    continue
                data = response.json()
                if data.get("ok") and data.get("result"):
                    for update in data["result"]:
                        # Advance the offset before handling so an update
                        # whose handler raises is not fetched again.
                        self.last_update_id = update["update_id"]
                        self._handle_update(update)
            except requests.exceptions.Timeout:
                continue
            except Exception as e:
                logger.error("Telegram polling error: %s", self._redact(e))
                time.sleep(5)

    def _handle_update(self, update):
        """Dispatch one update to its command handler.

        Updates without a chat id, or from a chat other than the configured
        one, are ignored.
        """
        message = update.get("message") or {}
        text = (message.get("text") or "").strip()
        chat_id = str((message.get("chat") or {}).get("id", ""))
        if not chat_id or chat_id != str(self.chat_id):
            logger.debug("Ignoring message from unknown chat: %s", chat_id)
            return
        logger.info("Received Telegram command: %s", text)
        if text.startswith("/autopilot"):
            self._handle_autopilot_command(text)
        elif text == "/status":
            self._handle_status_command()
        elif text == "/help":
            self._handle_help_command()
        elif text == "/plot":
            self._handle_plot_command()
        elif text == "/errorrate":
            self._handle_error_rate_command()
        elif text == "/retryfailed":
            self._run_retry_failed()
        elif text.startswith("/"):
            self._handle_unknown_command(text)

    def _run_retry_failed(self):
        """Run the async retry handler on the stored loop and wait for it.

        Called from the polling thread, so the coroutine is scheduled with
        ``asyncio.run_coroutine_threadsafe``. This blocks the polling thread
        until the retry run completes.
        """
        if self.event_loop is None:
            logger.error("/retryfailed command failed: no event loop available")
            self._send_message(
                "❌ Cannot retry failed applications: no event loop available."
            )
            return
        fut = asyncio.run_coroutine_threadsafe(
            self._handle_retry_failed_command(max_retries=TELEGRAM_MAX_RETRIES),
            self.event_loop,
        )
        try:
            fut.result()
        except Exception as e:
            logger.exception("/retryfailed command failed: %s", e)
            self._send_message(
                f"❌ Error retrying failed applications: {self._escape(e)}"
            )

    async def _handle_retry_failed_command(self, max_retries: int = 3):
        """Retry all failed applications up to max_retries.

        An application is retried when its ``success`` is falsy and its
        ``retries`` count (default 0) is below ``max_retries``. Each result
        is saved with an incremented ``retries`` and a summary is sent to the
        chat. Exceptions raised by the application handler propagate to the
        caller.
        """
        handler = self.app_handler
        # Ensure browser context is initialized.
        if getattr(handler, "context", None) is None:
            if hasattr(handler, "init_browser"):
                await handler.init_browser()
            # After (re-)init, propagate context to all sub-handlers (defensive).
            if hasattr(handler, "context") and hasattr(handler, "handlers"):
                for sub_handler in handler.handlers.values():
                    sub_handler.context = handler.context

        self._send_message(
            f"🔄 Retrying failed applications (max retries: {max_retries})..."
        )
        applications = handler.load_applications()
        failed = [
            app
            for app in applications.values()
            if not app.get("success") and app.get("retries", 0) < max_retries
        ]
        if not failed:
            self._send_message(
                "✅ No failed applications to retry (or all reached max retries)."
            )
            return

        results = {}
        details = []
        for app in failed:
            listing = {
                "id": app["listing_id"],
                "rooms": app.get("rooms", ""),
                "size": app.get("size", ""),
                "price": app.get("price", ""),
                "address": app.get("address", ""),
                "link": app.get("link", ""),
            }
            result = await handler.apply(listing)
            result["retries"] = app.get("retries", 0) + 1
            handler.save_application(result)
            results[listing["id"]] = result
            status_emoji = "✅" if result.get("success") else "❌"
            # Escape every interpolated value: an unescaped "<" or "&" makes
            # Telegram reject the whole HTML message.
            details.append(
                f"{status_emoji} <b>{self._escape(result.get('address', ''))}</b>"
                f" ({self._escape(result.get('company', ''))})\n"
                f"<code>{self._escape(result.get('link', ''))}</code>\n"
                f"<i>{self._escape(result.get('message', ''))}</i>\n"
            )

        n_success = sum(1 for r in results.values() if r.get("success"))
        n_fail = len(results) - n_success
        summary = (
            f"🔄 Retried {len(results)} failed applications.\n"
            f"✅ Success: {n_success}\n❌ Still failed: {n_fail}"
        )
        if details:
            summary += "\n\n<b>Details:</b>\n" + "\n".join(details)
        self._send_message(summary)

    def _handle_autopilot_command(self, text):
        """Handle ``/autopilot on|off``; anything else gets the usage hint."""
        logger.info("Processing autopilot command: %s", text)
        parts = text.split()
        action = parts[1].lower() if len(parts) >= 2 else ""
        if action == "on":
            logger.info("Enabling autopilot mode")
            self.monitor.set_autopilot(True)
            self._send_message(
                "🤖 <b>Autopilot ENABLED</b>\n\nI will automatically apply to new listings!"
            )
        elif action == "off":
            logger.info("Disabling autopilot mode")
            self.monitor.set_autopilot(False)
            self._send_message(
                "🛑 <b>Autopilot DISABLED</b>\n\nI will only notify you of new listings."
            )
        else:
            self._send_message("Usage: /autopilot on|off")

    def _handle_status_command(self):
        """Send the autopilot state and application counts per company."""
        state = self.app_handler.load_state()
        autopilot = state.get("autopilot", False)
        applications = self.app_handler.load_applications()

        by_company = {}
        for app in applications.values():
            company = app.get("company", "unknown")
            by_company[company] = by_company.get(company, 0) + 1

        lines = [
            "🤖 <b>Autopilot:</b> " + ("ON ✅" if autopilot else "OFF ❌"),
            f"📝 <b>Applications sent:</b> {len(applications)}",
        ]
        if by_company:
            lines.append("\n<b>By company:</b>")
            lines.extend(
                f"{self._escape(company)}: {count}"
                for company, count in sorted(by_company.items())
            )
        self._send_message("\n".join(lines))

    def _handle_help_command(self):
        """Send the list of supported commands."""
        help_text = "\n".join((
            "🏠 <b>InBerlin Monitor Commands</b>",
            "/autopilot on - Enable automatic applications",
            "/autopilot off - Disable automatic applications",
            "/status - Show current status and stats",
            "/plot - Show weekly listing patterns",
            "/errorrate - Show autopilot success vs failure plot",
            "/retryfailed - Retry failed applications",
            "/help - Show this help message",
            "When autopilot is ON, I will automatically apply to new listings.",
        ))
        self._send_message(help_text)

    def _handle_unknown_command(self, text):
        """Tell the user that the command in ``text`` is not recognised."""
        parts = text.split()
        cmd = parts[0] if parts else text
        # The command is user-supplied text; escape it before embedding in HTML.
        self._send_message(
            f"❓ Unknown command: <code>{self._escape(cmd)}</code>\n\n"
            "Use /help to see available commands."
        )

    def _handle_error_rate_command(self):
        """Handle ``/errorrate``: send the success-vs-failure plot."""
        logger.info("Generating autopilot errorrate plot...")
        try:
            plot_path, summary = self.app_handler._generate_error_rate_plot()
            if plot_path:
                caption = "📉 <b>Autopilot Success vs Failure</b>\n\n" + summary
                self._send_photo(plot_path, caption)
            else:
                self._send_message(
                    "📉 Not enough application data to generate errorrate plot."
                )
        except Exception as e:
            logger.exception("Error generating errorrate plot: %s", e)
            self._send_message(
                f"❌ Error generating errorrate plot: {self._escape(e)}"
            )

    def _handle_plot_command(self):
        """Handle ``/plot``: send the weekly listing-pattern plot."""
        logger.info("Generating listing times plot...")
        try:
            plot_path = self.app_handler._generate_weekly_plot()
            if plot_path:
                self._send_photo(
                    plot_path,
                    "📊 <b>Weekly Listing Patterns</b>\n\nThis shows when new listings typically appear throughout the week.",
                )
            else:
                self._send_message(
                    "📊 Not enough data to generate plot yet. Keep monitoring!"
                )
        except Exception as e:
            logger.exception("Error generating plot: %s", e)
            self._send_message(f"❌ Error generating plot: {self._escape(e)}")

    def _send_message(self, text):
        """Send a text message to the configured Telegram chat, with detailed error logging.

        ``text`` is sent with ``parse_mode="HTML"``; callers must escape any
        dynamic content. Failures are logged, never raised.
        """
        if not self.bot_token or not self.chat_id:
            logger.warning(
                "Telegram bot token or chat ID not configured, cannot send message"
            )
            return
        payload = {"chat_id": self.chat_id, "text": text, "parse_mode": "HTML"}
        try:
            response = requests.post(
                self._api_url("sendMessage"), json=payload, timeout=10
            )
            logger.info(
                "[TELEGRAM] Sent message: status=%s, ok=%s, response=%s",
                response.status_code,
                response.ok,
                response.text,
            )
            if not response.ok:
                logger.error("Failed to send Telegram message: %s", response.text)
        except Exception as e:
            import traceback

            # Redact both the message and the traceback: each can contain the
            # request URL and therefore the bot token.
            logger.error("Error while sending Telegram message: %s", self._redact(e))
            logger.error(self._redact(traceback.format_exc()))

    def _send_photo(self, photo_path, caption):
        """Send a photo to the configured Telegram chat.

        Request failures are logged, not raised. An error opening
        ``photo_path`` is raised to the caller.
        """
        if not self.bot_token or not self.chat_id:
            logger.warning(
                "Telegram bot token or chat ID not configured, cannot send photo"
            )
            return
        payload = {"chat_id": self.chat_id, "caption": caption, "parse_mode": "HTML"}
        with open(photo_path, "rb") as photo:
            try:
                response = requests.post(
                    self._api_url("sendPhoto"),
                    data=payload,
                    files={"photo": photo},
                    timeout=10,
                )
                if not response.ok:
                    logger.error("Failed to send Telegram photo: %s", response.text)
            except Exception as e:
                logger.error(
                    "Error while sending Telegram photo: %s", self._redact(e)
                )

    def _generate_error_rate_plot(self):
        """Generate and send a plot showing success vs failure ratio for autopilot applications.

        Asks the application handler for the plot and sends it with the
        handler's summary as the caption.
        """
        logger.info("Generating autopilot errorrate plot...")
        try:
            plot_path, summary = self.app_handler._generate_error_rate_plot()
            if plot_path:
                self._send_photo(plot_path, caption=summary)
            else:
                self._send_message("No data available to generate the error rate plot.")
        except Exception as e:
            logger.exception("Error generating errorrate plot: %s", e)
            self._send_message(
                f"❌ Error generating errorrate plot: {self._escape(e)}"
            )