wohnbot/telegram_bot.py
2026-01-02 11:23:35 +01:00

394 lines
18 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import html
import logging
import os
import shutil
import threading
import time
from datetime import datetime

import httpx
import requests
# Settings are read once, at import time, from the process environment.
# An unset token or chat id falls back to the empty string; the retry
# budget falls back to 3 and is always coerced to ``int``.
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "")
TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID", "")
TELEGRAM_MAX_RETRIES = int(os.getenv("TELEGRAM_MAX_RETRIES", "3"))

# Module-scoped logger shared by everything in this file.
logger = logging.getLogger(__name__)
class TelegramBot:
async def _handle_help_command(self) -> None:
"""Send a help message with available commands."""
help_text = (
"<b>Available commands:</b>\n"
"/autopilot on|off - Enable/disable autopilot\n"
"/status - Show current status\n"
"/plot - Show weekly listing pattern plot\n"
"/errorrate - Show autopilot error rate plot\n"
"/retryfailed - Retry failed applications\n"
"/resetlistings - Reset listings file\n"
"/help - Show this help message"
)
await self._send_message(help_text)
async def _handle_unknown_command(self, text: str) -> None:
"""Handle unknown commands and notify the user."""
cmd = text.split()[0] if text else text
msg = (
f"❓ Unknown command: <code>{cmd}</code>\n\nUse /help to see available commands."
)
await self._send_message(msg)
async def _handle_reset_listings_command(self) -> None:
"""Move listings.json to data/old/ with a timestamp, preserving statistics and application history."""
import shutil
from datetime import datetime
try:
listings_path = os.path.join("data", "listings.json")
old_dir = os.path.join("data", "old")
if os.path.exists(listings_path):
# Ensure old_dir exists
os.makedirs(old_dir, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
dest_path = os.path.join(
old_dir, f"listings_{timestamp}.json"
)
shutil.move(listings_path, dest_path)
msg = (
f"🗑️ <b>Listings reset:</b>\n<code>listings.json</code> moved to "
f"<code>old/listings_{timestamp}.json</code>."
)
else:
msg = " No listings file found to move."
await self._send_message(msg)
except Exception as e:
logger.error(f"Error resetting listings: {e}")
await self._send_message(f"[ERROR] Error resetting listings: {str(e)}")
def __init__(self, monitor, bot_token: str | None = None, chat_id: str | None = None, event_loop=None) -> None:
self.monitor = monitor
self.bot_token = bot_token or TELEGRAM_BOT_TOKEN
self.chat_id = chat_id or TELEGRAM_CHAT_ID
self.last_update_id: int = 0
self.running: bool = False
# Add reference to application handler
self.app_handler = monitor
# Store the main event loop for thread-safe async calls
self.event_loop = event_loop or asyncio.get_event_loop()
# Initialize persistent httpx client with connection pooling
self._http_client: httpx.AsyncClient | None = None
async def _get_http_client(self) -> httpx.AsyncClient:
"""Get or create the persistent httpx client with connection pooling."""
if self._http_client is None:
self._http_client = httpx.AsyncClient(
timeout=30,
limits=httpx.Limits(max_keepalive_connections=5, max_connections=10)
)
return self._http_client
async def close(self) -> None:
"""Close the httpx client gracefully."""
if self._http_client:
await self._http_client.aclose()
self._http_client = None
def start(self) -> None:
if not self.bot_token:
logger.warning("Telegram bot token not configured, commands disabled")
return
self.running = True
thread = threading.Thread(target=self._poll_updates, daemon=True)
thread.start()
logger.info("Telegram command listener started")
def stop(self) -> None:
self.running = False
def _poll_updates(self) -> None:
while self.running:
try:
url = f"https://api.telegram.org/bot{self.bot_token}/getUpdates"
params = {"offset": self.last_update_id + 1, "timeout": 30}
response = requests.get(url, params=params, timeout=35)
if response.ok:
data = response.json()
if data.get("ok") and data.get("result"):
for update in data["result"]:
self.last_update_id = update["update_id"]
self._handle_update(update)
except requests.exceptions.Timeout:
continue
except Exception as e:
logger.error(f"Telegram polling error: {e}")
time.sleep(5)
def _handle_update(self, update: dict) -> None:
message = update.get("message", {})
text = message.get("text", "")
chat_id = str(message.get("chat", {}).get("id", ""))
if chat_id != self.chat_id:
logger.debug(f"Ignoring message from unknown chat: {chat_id}")
return
logger.info(f"Received Telegram command: {text}")
loop = self.event_loop
if text.startswith("/autopilot"):
asyncio.run_coroutine_threadsafe(self._handle_autopilot_command(text), loop)
elif text == "/status":
asyncio.run_coroutine_threadsafe(self._handle_status_command(), loop)
elif text == "/help":
asyncio.run_coroutine_threadsafe(self._handle_help_command(), loop)
elif text == "/plot":
asyncio.run_coroutine_threadsafe(self._handle_plot_command(), loop)
elif text == "/errorrate":
asyncio.run_coroutine_threadsafe(self._handle_error_rate_command(), loop)
elif text == "/retryfailed":
fut = asyncio.run_coroutine_threadsafe(
self._handle_retry_failed_command(max_retries=TELEGRAM_MAX_RETRIES),
loop
)
try:
fut.result()
except Exception as e:
logger.error(f"/retryfailed command failed: {e}")
elif text == "/resetlistings":
asyncio.run_coroutine_threadsafe(self._handle_reset_listings_command(), loop)
elif text.startswith("/"):
asyncio.run_coroutine_threadsafe(self._handle_unknown_command(text), loop)
async def _handle_retry_failed_command(self, max_retries: int = 3) -> None:
"""Retry all failed applications up to max_retries."""
# Ensure browser context is initialized
if not hasattr(self.app_handler, 'context') or self.app_handler.context is None:
if hasattr(self.app_handler, 'init_browser'):
await self.app_handler.init_browser()
# After (re-)init, propagate context to all sub-handlers (defensive)
if hasattr(self.app_handler, 'context') and hasattr(self.app_handler, 'handlers'):
for handler in self.app_handler.handlers.values():
handler.context = self.app_handler.context
applications = self.app_handler.load_applications()
failed = [
app for app in applications.values()
if not app.get("success")
and app.get("retries", 0) < max_retries
and not app.get("deactivated", False)
]
await self._send_message(f"[RETRY] Retrying {len(failed)} failed applications (max retries: {max_retries})...")
if not failed:
await self._send_message("[INFO] No failed applications to retry (or all reached max retries).")
return
results = {}
details = []
for app in failed:
listing = {
"id": app["listing_id"],
"rooms": app.get("rooms", ""),
"size": app.get("size", ""),
"price": app.get("price", ""),
"address": app.get("address", ""),
"link": app.get("link", "")
}
retries = app.get("retries", 0) + 1
result = await self.app_handler.apply(listing)
result["retries"] = retries
# Preserve original timestamp only if still failing; update on success
if not result["success"]:
result["timestamp"] = app.get("timestamp", result["timestamp"])
self.app_handler.save_application(result)
results[listing["id"]] = result
status_emoji = "[SUCCESS]" if result["success"] else "[FAILED]"
details.append(
f"{status_emoji} <b>{result.get('address', '')}</b> ({result.get('company', '')})\n"
f"<code>{result.get('link', '')}</code>\n"
f"<i>{result.get('message', '')}</i>\n"
)
n_success = sum(1 for r in results.values() if r["success"])
n_fail = sum(1 for r in results.values() if not r["success"])
summary = f"[RETRY] Retried {len(results)} failed applications.\n[SUCCESS]: {n_success}\n[FAILED]: {n_fail}"
if details:
summary += "\n\n<b>Details:</b>\n" + "\n".join(details)
await self._send_message(summary)
async def _handle_autopilot_command(self, text: str) -> None:
logger.info(f"Processing autopilot command: {text}")
parts = text.split()
if len(parts) < 2:
await self._send_message("Usage: /autopilot on|off")
return
action = parts[1].lower()
if action == "on":
logger.info("Enabling autopilot mode")
self.monitor.set_autopilot(True)
await self._send_message("<b>Autopilot ENABLED</b>\n\nI will automatically apply to new listings!")
elif action == "off":
self.monitor.set_autopilot(False)
await self._send_message("🛑 <b>Autopilot DISABLED</b>\n\nI will only notify you of new listings.")
else:
await self._send_message("Usage: /autopilot on|off")
async def _handle_status_command(self) -> None:
state = self.app_handler.load_state()
autopilot = state.get("autopilot", False)
applications = self.app_handler.load_applications()
status = "<b>Autopilot:</b> " + ("ON" if autopilot else "OFF")
status += f"\n📝 <b>Applications sent:</b> {len(applications)}"
by_company: dict[str, int] = {}
for app in applications.values():
company = app.get("company", "unknown")
by_company[company] = by_company.get(company, 0) + 1
if by_company:
status += "\n\n<b>By company:</b>"
for company, count in sorted(by_company.items()):
status += f"\n{company}: {count}"
await self._send_message(status)
async def _handle_plot_command(self) -> None:
logger.info("Generating listing times plot...")
try:
plot_path = self.app_handler._generate_weekly_plot()
await self._send_photo(plot_path, "\U0001f4ca <b>Weekly Listing Patterns</b>\n\nThis shows when new listings typically appear throughout the week.")
except Exception as e:
logger.error(f"Error generating plot: {e}")
import traceback
logger.error(traceback.format_exc())
await self._send_message(f"\u274c Error generating plot: {str(e)}")
async def _handle_error_rate_command(self) -> None:
logger.info("Generating autopilot errorrate plot...")
try:
plot_path, summary = self.app_handler._generate_error_rate_plot()
caption = "📉 <b>Autopilot Success vs Failure</b>\n\n" + summary
await self._send_photo(plot_path, caption)
except Exception as e:
logger.error(f"Error generating errorrate plot: {e}")
import traceback
logger.error(traceback.format_exc())
await self._send_message(f"[ERROR] Error generating errorrate plot: {str(e)}")
async def _send_message(self, text: str) -> None:
"""Send a text message to the configured Telegram chat, with retry logic and detailed error logging (async)."""
MAX_LENGTH = 4096 # Telegram message character limit
if not self.bot_token or not self.chat_id:
logger.warning("Telegram bot token or chat ID not configured, cannot send message")
return
# Clean text: remove invalid unicode surrogates
text = text.encode('utf-8', errors='ignore').decode('utf-8')
url = f"https://api.telegram.org/bot{self.bot_token}/sendMessage"
# Split message into chunks if too long
messages: list[str] = []
if isinstance(text, str) and len(text) > MAX_LENGTH:
# Try to split on line breaks for readability
lines = text.split('\n')
chunk = ""
for line in lines:
if len(chunk) + len(line) + 1 > MAX_LENGTH:
messages.append(chunk)
chunk = line
else:
if chunk:
chunk += "\n"
chunk += line
if chunk:
messages.append(chunk)
else:
messages = [text]
max_retries = 3
retry_delay = 1 # Initial delay in seconds
try:
client = await self._get_http_client()
for idx, msg in enumerate(messages):
payload = {"chat_id": self.chat_id, "text": msg, "parse_mode": "HTML"}
# Retry logic for each message chunk
for attempt in range(max_retries):
try:
response = await client.post(url, json=payload)
if response.is_success:
logger.info(f"[TELEGRAM] Sent message part {idx+1}/{len(messages)}: status={response.status_code}")
break
else:
logger.warning(f"[TELEGRAM] Failed attempt {attempt+1}/{max_retries}: {response.status_code}")
if attempt < max_retries - 1:
wait_time = retry_delay * (2 ** attempt)
await asyncio.sleep(wait_time)
else:
logger.error(f"Failed to send Telegram message after {max_retries} attempts: {response.text}")
except httpx.TimeoutException as e:
logger.warning(f"[TELEGRAM] Timeout on attempt {attempt+1}/{max_retries}")
if attempt < max_retries - 1:
wait_time = retry_delay * (2 ** attempt)
await asyncio.sleep(wait_time)
else:
logger.error(f"Telegram message timed out after {max_retries} attempts")
except httpx.RequestError as e:
logger.warning(f"[TELEGRAM] Network error on attempt {attempt+1}/{max_retries}: {e}")
if attempt < max_retries - 1:
wait_time = retry_delay * (2 ** attempt)
await asyncio.sleep(wait_time)
else:
logger.error(f"Telegram message failed after {max_retries} attempts: {e}")
except Exception as e:
logger.error(f"Unexpected error while sending Telegram message: {e}")
async def _send_photo(self, photo_path: str, caption: str) -> None:
"""Send a photo to the configured Telegram chat with retry logic (async)."""
if not self.bot_token or not self.chat_id:
logger.warning("Telegram bot token or chat ID not configured, cannot send photo")
return
# Clean caption: remove invalid unicode surrogates
caption = caption.encode('utf-8', errors='ignore').decode('utf-8')
url = f"https://api.telegram.org/bot{self.bot_token}/sendPhoto"
max_retries = 3
retry_delay = 1 # Initial delay in seconds
for attempt in range(max_retries):
try:
with open(photo_path, "rb") as photo:
files = {"photo": (photo_path, photo, "image/jpeg")}
data = {"chat_id": self.chat_id, "caption": caption, "parse_mode": "HTML"}
client = await self._get_http_client()
response = await client.post(url, data=data, files=files)
if response.is_success:
logger.info(f"[TELEGRAM] Sent photo: {photo_path}")
return
else:
logger.warning(f"[TELEGRAM] Photo send attempt {attempt+1}/{max_retries} failed: {response.status_code}")
if attempt < max_retries - 1:
wait_time = retry_delay * (2 ** attempt)
await asyncio.sleep(wait_time)
else:
logger.error(f"Failed to send Telegram photo after {max_retries} attempts: {response.text}")
except httpx.TimeoutException:
logger.warning(f"[TELEGRAM] Photo timeout on attempt {attempt+1}/{max_retries}")
if attempt < max_retries - 1:
wait_time = retry_delay * (2 ** attempt)
await asyncio.sleep(wait_time)
else:
logger.error(f"Telegram photo timed out after {max_retries} attempts")
except httpx.RequestError as e:
logger.warning(f"[TELEGRAM] Network error on attempt {attempt+1}/{max_retries}: {e}")
if attempt < max_retries - 1:
wait_time = retry_delay * (2 ** attempt)
await asyncio.sleep(wait_time)
else:
logger.error(f"Telegram photo failed after {max_retries} attempts: {e}")
except FileNotFoundError:
logger.error(f"Photo file not found: {photo_path}")
return # No point retrying if file doesn't exist
except Exception as e:
logger.error(f"Unexpected error while sending Telegram photo: {e}")
if attempt < max_retries - 1:
wait_time = retry_delay * (2 ** attempt)
await asyncio.sleep(wait_time)
else:
return