wohnbot/handlers/wgcompany_notifier.py


import asyncio
import csv
import hashlib
import json
import logging
import os
import re
from datetime import datetime
from pathlib import Path

from playwright.async_api import async_playwright

logger = logging.getLogger(__name__)
WGCOMPANY_LISTINGS_FILE = Path("data/wgcompany_listings.json")
WGCOMPANY_TIMING_FILE = Path("data/wgcompany_times.csv")
# Make sure the data directory exists before any of the writers below touch it.
WGCOMPANY_LISTINGS_FILE.parent.mkdir(parents=True, exist_ok=True)

# Environment variables for search filters
WGCOMPANY_MIN_SIZE = os.environ.get("WGCOMPANY_MIN_SIZE", "")
WGCOMPANY_MAX_PRICE = os.environ.get("WGCOMPANY_MAX_PRICE", "")
WGCOMPANY_AGE = os.environ.get("WGCOMPANY_AGE", "")
WGCOMPANY_SMOKER = os.environ.get("WGCOMPANY_SMOKER", "")
WGCOMPANY_BEZIRK = os.environ.get("WGCOMPANY_BEZIRK", "0")
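
# Illustrative filter values (assumptions, not taken from the source): the
# strings are passed verbatim into the wgcompany.de search form fields.
#   WGCOMPANY_MIN_SIZE=12    # minimum room size in m²
#   WGCOMPANY_MAX_PRICE=600  # maximum price in €
#   WGCOMPANY_BEZIRK=0       # "0" disables the district filter (see run loop)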


class WGCompanyNotifier:
    """Watches wgcompany.de for new WG-room listings and notifies via Telegram."""

    def __init__(self, telegram_bot=None, refresh_minutes=10):
        self.browser = None
        self.context = None
        self.telegram_bot = telegram_bot
        self.refresh_minutes = refresh_minutes

    async def init_browser(self):
        """Lazily start Playwright and launch a single headless Chromium."""
        if self.browser is None:
            self.playwright = await async_playwright().start()
            self.browser = await self.playwright.chromium.launch(headless=True)
            self.context = await self.browser.new_context()
            logger.debug("[WG] Browser ready")

    async def fetch_listings(self):
        """Run one search on wgcompany.de and return the parsed listings."""
        await self.init_browser()
        listings = []
        try:
            page = await self.context.new_page()
            search_url = "http://www.wgcompany.de/cgi-bin/seite?st=1&mi=10&li=100"
            logger.info(f"[WGCOMPANY] Loading search page: {search_url}")
            await page.goto(search_url, wait_until="networkidle")
            await asyncio.sleep(2)

            # Fill in whichever search filters are configured; the single-letter
            # input names are the ones the site's search form uses.
            if WGCOMPANY_MIN_SIZE:
                min_size_field = await page.query_selector('input[name="c"]')
                if min_size_field:
                    await min_size_field.fill(WGCOMPANY_MIN_SIZE)
            if WGCOMPANY_MAX_PRICE:
                max_price_field = await page.query_selector('input[name="a"]')
                if max_price_field:
                    await max_price_field.fill(WGCOMPANY_MAX_PRICE)
            if WGCOMPANY_AGE:
                age_field = await page.query_selector('input[name="l"]')
                if age_field:
                    await age_field.fill(WGCOMPANY_AGE)
            if WGCOMPANY_SMOKER:
                smoker_select = await page.query_selector('select[name="o"]')
                if smoker_select:
                    await smoker_select.select_option(WGCOMPANY_SMOKER)
            if WGCOMPANY_BEZIRK and WGCOMPANY_BEZIRK != "0":
                bezirk_select = await page.query_selector('select[name="e"]')
                if bezirk_select:
                    await bezirk_select.select_option(WGCOMPANY_BEZIRK)

            # Submit the form and wait for the result page to settle.
            submit_btn = await page.query_selector(
                'input[type="submit"][value*="finde"], input[type="submit"]'
            )
            if submit_btn:
                await submit_btn.click()
                await page.wait_for_load_state("networkidle")
                await asyncio.sleep(2)

            # Keep a copy of the result page for offline debugging.
            content = await page.content()
            with open("data/wgcompany_debug.html", "w", encoding="utf-8") as f:
                f.write(content)

            # Each result row links to the detail view via wg.pl?...wgzeigen...
            listing_links = await page.query_selector_all('a[href*="wg.pl"][href*="wgzeigen"]')
            logger.info(f"[WGCOMPANY] Found {len(listing_links)} listing links")
            for link_elem in listing_links:
                try:
                    href = await link_elem.get_attribute("href")
                    if not href:
                        continue
                    # Grab the surrounding table row so price, size and district
                    # can be scraped from its text.
                    parent = await link_elem.evaluate_handle("el => el.closest('tr') || el.parentElement")
                    row_text = await parent.evaluate("el => el.innerText") if parent else ""
                    price_match = re.search(r'(\d+)\s*€', row_text)
                    price = price_match.group(1) + " €" if price_match else "?"
                    size_match = re.search(r'(\d+)\s*m²', row_text)
                    size = size_match.group(1) + " m²" if size_match else "?"
                    bezirk_patterns = [
                        "Kreuzberg", "Neukölln", "Friedrichshain", "Prenzlauer Berg",
                        "Mitte", "Wedding", "Charlottenburg", "Schöneberg", "Tempelhof",
                        "Steglitz", "Wilmersdorf", "Pankow", "Lichtenberg", "Treptow",
                        "Köpenick", "Reinickendorf", "Spandau", "Zehlendorf", "Moabit"
                    ]
                    location = "Berlin"
                    for bez in bezirk_patterns:
                        if bez.lower() in row_text.lower():
                            location = bez
                            break
                    if not href.startswith("http"):
                        href = f"http://www.wgcompany.de{href}" if href.startswith("/") else f"http://www.wgcompany.de/cgi-bin/{href}"
                    # Stable id: hash of link + price + size, so edits surface as new.
                    listing_id = hashlib.md5(f"{href}{price}{size}".encode()).hexdigest()[:12]
                    listings.append({
                        "id": listing_id,
                        "rooms": "1 Zimmer (WG)",
                        "size": size,
                        "price": price,
                        "address": location,
                        "link": href,
                        "source": "wgcompany",
                        "fetched_at": datetime.now().isoformat()
                    })
                except Exception as e:
                    logger.debug(f"[WGCOMPANY] Error parsing listing: {e}")
                    continue

            # Deduplicate by id, preserving the page order.
            seen_ids = set()
            unique_listings = []
            for listing in listings:
                if listing["id"] not in seen_ids:
                    seen_ids.add(listing["id"])
                    unique_listings.append(listing)
            await page.close()
            logger.info(f"[WGCOMPANY] Fetched {len(unique_listings)} unique listings")
            return unique_listings
        except Exception as e:
            logger.error(f"[WGCOMPANY] Error fetching listings: {e}")
            return []

    def load_previous_listings(self):
        """Load the previously seen listings (id -> listing) from disk."""
        if WGCOMPANY_LISTINGS_FILE.exists():
            with open(WGCOMPANY_LISTINGS_FILE, 'r') as f:
                data = json.load(f)
            logger.debug(f"[WG] Loaded {len(data)} previous listings")
            return data
        return {}

    def save_listings(self, listings: list[dict]) -> None:
        """Persist the current snapshot, keyed by listing id."""
        listings_dict = {l['id']: l for l in listings}
        logger.debug(f"[WG] Saving {len(listings_dict)} listings")
        with open(WGCOMPANY_LISTINGS_FILE, 'w') as f:
            json.dump(listings_dict, f, indent=2, ensure_ascii=False)
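
    # On-disk shape (illustrative example; the concrete values are assumptions):
    # {
    #   "a1b2c3d4e5f6": {
    #     "id": "a1b2c3d4e5f6", "rooms": "1 Zimmer (WG)", "size": "18 m²",
    #     "price": "450 €", "address": "Neukölln", "link": "http://...",
    #     "source": "wgcompany", "fetched_at": "2026-01-01T15:27:25"
    #   }
    # }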

    def find_new_listings(self, current: list[dict], previous: dict) -> list[dict]:
        """Return the listings from the current fetch whose ids are unseen."""
        new = [listing for listing in current if listing['id'] not in previous]
        if new:
            logger.info(f"[WG] {len(new)} new listing{'s' if len(new) > 1 else ''} detected")
        return new

    def log_listing_times(self, new_listings):
        """Append one CSV row per new listing for later timing analysis."""
        if not new_listings:
            return
        file_exists = WGCOMPANY_TIMING_FILE.exists()
        with open(WGCOMPANY_TIMING_FILE, "a", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            if not file_exists:
                writer.writerow(["timestamp", "weekday", "hour", "minute", "rooms", "size", "price", "address", "listing_id"])
            now = datetime.now()
            for listing in new_listings:
                writer.writerow([
                    now.isoformat(),
                    now.strftime("%A"),
                    now.hour,
                    now.minute,
                    listing["rooms"],
                    listing["size"],
                    listing["price"],
                    listing["address"],
                    listing['id']
                ])
        logger.debug(f"[WG] Logged {len(new_listings)} to CSV")

    async def notify_new_listings(self, new_listings: list[dict]) -> None:
        """Push one Telegram message per new listing."""
        if not new_listings or not self.telegram_bot:
            return
        for idx, listing in enumerate(new_listings, start=1):
            try:
                message = (
                    f"<b>[WG-Company] Neues WG-Zimmer!</b>\n\n"
                    f"🚪 <b>{listing['rooms']}</b>\n"
                    f"📏 {listing['size']}\n"
                    f"💰 {listing['price']}\n"
                    f"📍 {listing['address']}\n\n"
                    f"👉 <a href=\"{listing['link']}\">Zum Angebot</a>"
                )
                # The bot's event loop may live in another thread, so schedule
                # the send thread-safely instead of awaiting it directly.
                loop = self.telegram_bot.event_loop or asyncio.get_running_loop()
                asyncio.run_coroutine_threadsafe(self.telegram_bot._send_message(message), loop)
                await asyncio.sleep(0.5)
            except Exception as e:
                logger.error(f"[WG] Telegram failed for listing {idx}: {str(e)[:50]}")

    async def run(self):
        """Poll forever: fetch, diff against the last snapshot, notify, save."""
        await self.init_browser()
        while True:
            listings = await self.fetch_listings()
            previous = self.load_previous_listings()
            new_listings = self.find_new_listings(listings, previous)
            if new_listings:
                logger.info(f"[WGCOMPANY] Found {len(new_listings)} new listing(s)")
                self.log_listing_times(new_listings)
                await self.notify_new_listings(new_listings)
            else:
                logger.info("[WGCOMPANY] No new listings")
            self.save_listings(listings)
            await asyncio.sleep(self.refresh_minutes * 60)
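

# Minimal standalone usage sketch. Assumption: in wohnbot the notifier is
# normally constructed with a real telegram_bot elsewhere; this only shows
# how the polling loop could be exercised on its own, without Telegram.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    notifier = WGCompanyNotifier(telegram_bot=None, refresh_minutes=10)
    asyncio.run(notifier.run())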