wohnbot/telegram_bot.py
2025-12-31 16:06:42 +01:00

300 lines
14 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import html
import logging
import os
import threading
import time

import requests
# Module logger; created first so the configuration helper below can use it.
logger = logging.getLogger(__name__)


def _env_int(name, default):
    """Return environment variable ``name`` parsed as an int, or ``default``.

    A missing, blank, or non-numeric value yields ``default`` (a warning is
    logged for a non-numeric value) instead of raising ``ValueError`` while
    the module is being imported.
    """
    raw = os.environ.get(name)
    if raw is None or not raw.strip():
        return default
    try:
        return int(raw)
    except ValueError:
        logger.warning(
            "Ignoring invalid %s=%r; using default %s", name, raw, default
        )
        return default


# Configuration from environment
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "")
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "")
TELEGRAM_MAX_RETRIES = _env_int("TELEGRAM_MAX_RETRIES", 3)
class TelegramBot:
    """Telegram command listener that forwards chat commands to the monitor.

    A daemon thread long-polls the Bot API ``getUpdates`` endpoint. Each
    recognised command is scheduled as a coroutine on ``event_loop`` with
    ``asyncio.run_coroutine_threadsafe``; replies go out through
    ``sendMessage`` / ``sendPhoto`` using ``parse_mode=HTML``, so any
    dynamic text placed into a reply is HTML-escaped first.
    """

    # Maximum number of characters sent in a single text message.
    MAX_MESSAGE_LENGTH = 4096

    @staticmethod
    def _escape(value):
        """Return ``value`` as text that is safe inside an HTML-mode message."""
        # quote=False: only <, > and & need escaping in message text.
        return html.escape(str(value), quote=False)

    @staticmethod
    def _parse_command(text):
        """Return the bare command token of ``text``, or ``""`` if none.

        The first whitespace-separated token is used and an ``@botname``
        suffix (``/status@mybot``) is removed.
        """
        tokens = text.split(maxsplit=1) if text else []
        if not tokens or not tokens[0].startswith("/"):
            return ""
        return tokens[0].partition("@")[0]

    @staticmethod
    def _split_message(text, limit=MAX_MESSAGE_LENGTH):
        """Split ``text`` into non-empty chunks of at most ``limit`` characters.

        Chunks break on newlines where possible; a single line longer than
        ``limit`` is hard-wrapped. Whitespace-only chunks are dropped.
        NOTE: an HTML tag that spans a chunk boundary is not repaired.
        """
        if len(text) <= limit:
            return [text] if text.strip() else []
        chunks = []
        current = ""
        for line in text.split("\n"):
            # A line that cannot fit in any chunk is cut into limit-sized pieces.
            while len(line) > limit:
                if current:
                    chunks.append(current)
                    current = ""
                chunks.append(line[:limit])
                line = line[limit:]
            candidate = f"{current}\n{line}" if current else line
            if len(candidate) > limit:
                chunks.append(current)
                current = line
            else:
                current = candidate
        if current:
            chunks.append(current)
        return [chunk for chunk in chunks if chunk.strip()]

    def _redact(self, value):
        """Return ``str(value)`` with the bot token masked.

        Request URLs contain the token, and HTTP client errors may quote
        the URL, so anything derived from them is redacted before logging.
        """
        text = str(value)
        if self.bot_token:
            text = text.replace(self.bot_token, "<redacted>")
        return text

    async def _handle_help_command(self):
        """Send a help message with available commands."""
        help_text = (
            "<b>Available commands:</b>\n"
            "/autopilot on|off - Enable/disable autopilot\n"
            "/status - Show current status\n"
            "/plot - Show weekly listing pattern plot\n"
            "/errorrate - Show autopilot error rate plot\n"
            "/retryfailed - Retry failed applications\n"
            "/resetlistings - Reset listings file\n"
            "/help - Show this help message"
        )
        await self._send_message(help_text)

    async def _handle_unknown_command(self, text):
        """Handle unknown commands and notify the user."""
        tokens = text.split() if text else []
        cmd = tokens[0] if tokens else (text or "")
        # The command is user input: escape it so it cannot break HTML parsing.
        msg = (
            f"❓ Unknown command: <code>{self._escape(cmd)}</code>\n\n"
            "Use /help to see available commands."
        )
        await self._send_message(msg)

    async def _handle_reset_listings_command(self):
        """Move data/listings.json to data/old/ under a timestamped name.

        Only the listings file is moved; the outcome (moved, not found, or
        error) is reported to the chat.
        """
        import shutil
        from datetime import datetime

        try:
            listings_path = os.path.join("data", "listings.json")
            old_dir = os.path.join("data", "old")
            if os.path.exists(listings_path):
                os.makedirs(old_dir, exist_ok=True)
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                dest_path = os.path.join(old_dir, f"listings_{timestamp}.json")
                shutil.move(listings_path, dest_path)
                msg = (
                    f"🗑️ <b>Listings reset:</b>\n<code>listings.json</code> moved to "
                    f"<code>old/listings_{timestamp}.json</code>."
                )
            else:
                msg = "ℹ️ No listings file found to move."
            await self._send_message(msg)
        except Exception as e:
            logger.exception("Error resetting listings: %s", e)
            await self._send_message(
                f"❌ Error resetting listings: {self._escape(e)}"
            )

    def __init__(self, monitor, bot_token=None, chat_id=None, event_loop=None):
        """Create the bot.

        Args:
            monitor: Object providing autopilot control and application
                storage; also used as the application handler.
            bot_token: Bot API token; defaults to ``TELEGRAM_BOT_TOKEN``.
            chat_id: The only chat whose commands are accepted; defaults to
                ``TELEGRAM_CHAT_ID``. Ints are accepted and stringified.
            event_loop: Loop on which command coroutines are scheduled;
                defaults to the running loop if there is one.
        """
        self.monitor = monitor
        self.bot_token = bot_token or TELEGRAM_BOT_TOKEN
        # Incoming chat ids are compared as strings, so normalise here.
        self.chat_id = str(chat_id or TELEGRAM_CHAT_ID).strip()
        self.last_update_id = 0
        self.running = False
        self._thread = None
        # Add reference to application handler
        self.app_handler = monitor
        # Store the main event loop for thread-safe async calls
        if event_loop is None:
            try:
                event_loop = asyncio.get_running_loop()
            except RuntimeError:
                # No loop is running yet: keep the legacy lookup as fallback.
                event_loop = asyncio.get_event_loop()
        self.event_loop = event_loop

    def start(self):
        """Start the background polling thread (no-op without a bot token)."""
        if not self.bot_token:
            logger.warning("Telegram bot token not configured, commands disabled")
            return
        self.running = True
        if self._thread is not None and self._thread.is_alive():
            # A poller already exists; a second one would compete for updates.
            logger.debug("Telegram command listener already running")
            return
        self._thread = threading.Thread(target=self._poll_updates, daemon=True)
        self._thread.start()
        logger.info("Telegram command listener started")

    def stop(self):
        """Ask the polling thread to exit after its current request returns."""
        self.running = False

    def _poll_updates(self):
        """Long-poll ``getUpdates`` and dispatch each update until stopped."""
        while self.running:
            try:
                url = f"https://api.telegram.org/bot{self.bot_token}/getUpdates"
                params = {"offset": self.last_update_id + 1, "timeout": 30}
                response = requests.get(url, params=params, timeout=35)
                if not response.ok:
                    # Back off instead of immediately re-sending a failing request.
                    logger.error(
                        "Telegram getUpdates failed: status=%s body=%s",
                        response.status_code,
                        self._redact(response.text),
                    )
                    time.sleep(5)
                    continue
                data = response.json()
                if data.get("ok") and data.get("result"):
                    for update in data["result"]:
                        self.last_update_id = update["update_id"]
                        try:
                            self._handle_update(update)
                        except Exception:
                            # One bad update must not stall the rest of the batch.
                            logger.exception(
                                "Failed to handle Telegram update %s",
                                update.get("update_id"),
                            )
            except requests.exceptions.Timeout:
                continue
            except Exception as e:
                logger.error(f"Telegram polling error: {self._redact(e)}")
                time.sleep(5)

    def _dispatch(self, coro):
        """Schedule ``coro`` on the event loop and log any exception it raises."""
        future = asyncio.run_coroutine_threadsafe(coro, self.event_loop)
        future.add_done_callback(self._log_command_failure)
        return future

    @staticmethod
    def _log_command_failure(future):
        """Done-callback: log the exception of a finished command future."""
        if future.cancelled():
            return
        exc = future.exception()
        if exc is not None:
            logger.error("Telegram command handler failed: %s", exc, exc_info=exc)

    def _handle_update(self, update):
        """Validate one update and route its command to the matching handler.

        Updates from any chat other than the configured one are ignored.
        Runs on the polling thread; handlers run on ``self.event_loop``.
        """
        message = update.get("message") or {}
        text = message.get("text") or ""
        chat_id = str((message.get("chat") or {}).get("id", ""))
        if not self.chat_id or chat_id != self.chat_id:
            logger.debug(f"Ignoring message from unknown chat: {chat_id}")
            return
        if not text:
            return
        logger.info(f"Received Telegram command: {text}")
        loop = self.event_loop
        command = self._parse_command(text)
        simple_handlers = {
            "/status": self._handle_status_command,
            "/help": self._handle_help_command,
            "/plot": self._handle_plot_command,
            "/errorrate": self._handle_error_rate_command,
            "/resetlistings": self._handle_reset_listings_command,
        }
        if command == "/autopilot":
            self._dispatch(self._handle_autopilot_command(text))
        elif command == "/retryfailed":
            # Waits for completion, so further updates are processed only
            # after the retry run has finished.
            fut = asyncio.run_coroutine_threadsafe(
                self._handle_retry_failed_command(max_retries=TELEGRAM_MAX_RETRIES),
                loop,
            )
            try:
                fut.result()
            except Exception as e:
                logger.error(f"/retryfailed command failed: {e}")
        elif command in simple_handlers:
            self._dispatch(simple_handlers[command]())
        elif command:
            self._dispatch(self._handle_unknown_command(text))

    async def _handle_retry_failed_command(self, max_retries: int = 3):
        """Retry all failed applications up to max_retries.

        An application is retried when it has no truthy ``success`` and its
        ``retries`` count is below ``max_retries``. A retry that raises is
        reported as still failed instead of aborting the remaining retries.
        """
        handler = self.app_handler
        # Ensure browser context is initialized
        if getattr(handler, "context", None) is None:
            if hasattr(handler, "init_browser"):
                await handler.init_browser()
        # After (re-)init, propagate context to all sub-handlers (defensive)
        if hasattr(handler, "context") and hasattr(handler, "handlers"):
            for sub_handler in handler.handlers.values():
                sub_handler.context = handler.context
        applications = handler.load_applications()
        failed = [
            app
            for app in applications.values()
            if not app.get("success") and app.get("retries", 0) < max_retries
        ]
        if not failed:
            await self._send_message(
                "✅ No failed applications to retry (or all reached max retries)."
            )
            return
        await self._send_message(
            f"🔄 Retrying {len(failed)} failed applications (max retries: {max_retries})..."
        )
        results = {}
        details = []
        for app in failed:
            listing_id = app.get("listing_id")
            if listing_id is None:
                logger.warning("Skipping failed application without listing_id")
                continue
            listing = {
                "id": listing_id,
                "rooms": app.get("rooms", ""),
                "size": app.get("size", ""),
                "price": app.get("price", ""),
                "address": app.get("address", ""),
                "link": app.get("link", ""),
            }
            try:
                result = await handler.apply(listing)
                result["retries"] = app.get("retries", 0) + 1
                handler.save_application(result)
            except Exception as exc:
                logger.exception("Retry raised for listing %s", listing_id)
                # Report-only record: it is shown to the user but not saved.
                result = {
                    "success": False,
                    "address": listing["address"],
                    "company": app.get("company", ""),
                    "link": listing["link"],
                    "message": f"Retry raised an error: {exc}",
                }
            results[listing_id] = result
            status_emoji = "✅" if result.get("success") else "❌"
            details.append(
                f"{status_emoji} <b>{self._escape(result.get('address', ''))}</b> "
                f"({self._escape(result.get('company', ''))})\n"
                f"<code>{self._escape(result.get('link', ''))}</code>\n"
                f"<i>{self._escape(result.get('message', ''))}</i>\n"
            )
        n_success = sum(1 for r in results.values() if r.get("success"))
        n_fail = len(results) - n_success
        summary = (
            f"🔄 Retried {len(results)} failed applications.\n"
            f"✅ Success: {n_success}\n❌ Still failed: {n_fail}"
        )
        if details:
            summary += "\n\n<b>Details:</b>\n" + "\n".join(details)
        await self._send_message(summary)

    async def _handle_autopilot_command(self, text):
        """Switch autopilot on or off according to the argument in ``text``."""
        logger.info(f"Processing autopilot command: {text}")
        parts = text.split()
        if len(parts) < 2:
            await self._send_message("Usage: /autopilot on|off")
            return
        action = parts[1].lower()
        if action == "on":
            logger.info("Enabling autopilot mode")
            self.monitor.set_autopilot(True)
            await self._send_message(
                "🤖 <b>Autopilot ENABLED</b>\n\nI will automatically apply to new listings!"
            )
        elif action == "off":
            logger.info("Disabling autopilot mode")
            self.monitor.set_autopilot(False)
            await self._send_message(
                "🛑 <b>Autopilot DISABLED</b>\n\nI will only notify you of new listings."
            )
        else:
            await self._send_message("Usage: /autopilot on|off")

    async def _handle_status_command(self):
        """Send the autopilot state and application counts per company."""
        state = self.app_handler.load_state()
        autopilot = state.get("autopilot", False)
        applications = self.app_handler.load_applications()
        lines = [
            "🤖 <b>Autopilot:</b> " + ("ON ✅" if autopilot else "OFF ❌"),
            f"📝 <b>Applications sent:</b> {len(applications)}",
        ]
        by_company = {}
        for app in applications.values():
            # Coerce to str so mixed or missing values still sort below.
            company = str(app.get("company") or "unknown")
            by_company[company] = by_company.get(company, 0) + 1
        if by_company:
            lines.append("")
            lines.append("<b>By company:</b>")
            lines.extend(
                f"{self._escape(company)}: {count}"
                for company, count in sorted(by_company.items())
            )
        await self._send_message("\n".join(lines))

    async def _handle_plot_command(self):
        """Generate the weekly listing plot and send it as a photo."""
        logger.info("Generating listing times plot...")
        try:
            plot_path = self.app_handler._generate_weekly_plot()
            await self._send_photo(
                plot_path,
                "\U0001f4ca <b>Weekly Listing Patterns</b>\n\nThis shows when new listings typically appear throughout the week.",
            )
        except Exception as e:
            # logger.exception records the traceback along with the message.
            logger.exception("Error generating plot: %s", e)
            await self._send_message(
                f"\u274c Error generating plot: {self._escape(e)}"
            )

    async def _handle_error_rate_command(self):
        """Generate the autopilot error-rate plot and send it with its summary."""
        logger.info("Generating autopilot errorrate plot...")
        try:
            plot_path, summary = self.app_handler._generate_error_rate_plot()
            caption = "📉 <b>Autopilot Success vs Failure</b>\n\n" + summary
            await self._send_photo(plot_path, caption)
        except Exception as e:
            logger.exception("Error generating errorrate plot: %s", e)
            await self._send_message(
                f"❌ Error generating errorrate plot: {self._escape(e)}"
            )

    async def _send_message(self, text):
        """Send a text message to the configured Telegram chat (async).

        ``text`` is sent with ``parse_mode=HTML``; callers must escape
        dynamic content. Text longer than ``MAX_MESSAGE_LENGTH`` is sent as
        several messages. Failures are logged, never raised.
        """
        import httpx

        if not self.bot_token or not self.chat_id:
            logger.warning("Telegram bot token or chat ID not configured, cannot send message")
            return
        url = f"https://api.telegram.org/bot{self.bot_token}/sendMessage"
        if isinstance(text, str):
            messages = self._split_message(text, self.MAX_MESSAGE_LENGTH)
        else:
            messages = [text]
        if not messages:
            logger.debug("Skipping empty Telegram message")
            return
        try:
            async with httpx.AsyncClient(timeout=10) as client:
                for idx, msg in enumerate(messages):
                    payload = {"chat_id": self.chat_id, "text": msg, "parse_mode": "HTML"}
                    response = await client.post(url, json=payload)
                    logger.info(f"[TELEGRAM] Sent message part {idx+1}/{len(messages)}: status={response.status_code}, ok={response.is_success}")
                    if not response.is_success:
                        logger.error(f"Failed to send Telegram message: {response.text}")
        except Exception as e:
            logger.error(f"Error while sending Telegram message: {self._redact(e)}")

    async def _send_photo(self, photo_path, caption):
        """Send a photo to the configured Telegram chat (async).

        The file is read fully before the request is made. Only its base
        name is transmitted, with a MIME type guessed from the extension
        (``image/jpeg`` when unknown). Failures are logged, never raised.
        """
        import mimetypes

        import httpx

        if not self.bot_token or not self.chat_id:
            logger.warning("Telegram bot token or chat ID not configured, cannot send photo")
            return
        url = f"https://api.telegram.org/bot{self.bot_token}/sendPhoto"
        try:
            with open(photo_path, "rb") as photo:
                content = photo.read()
            filename = os.path.basename(photo_path)
            mime_type = mimetypes.guess_type(filename)[0] or "image/jpeg"
            files = {"photo": (filename, content, mime_type)}
            data = {"chat_id": self.chat_id, "caption": caption, "parse_mode": "HTML"}
            async with httpx.AsyncClient(timeout=10) as client:
                response = await client.post(url, data=data, files=files)
            if not response.is_success:
                logger.error(f"Failed to send Telegram photo: {response.text}")
        except Exception as e:
            logger.error(f"Error while sending Telegram photo: {self._redact(e)}")