"""Scraper/notifier for WG-Company (wgcompany.de) shared-flat listings.

Periodically loads the WG-Company search form, applies filters taken from
environment variables, parses the result rows, and pushes a Telegram message
for every listing that was not seen on a previous poll.
"""

import asyncio
import csv
import hashlib
import json
import logging
import os
import re
from datetime import datetime
from pathlib import Path

from playwright.async_api import async_playwright

logger = logging.getLogger(__name__)

# Persistent state: known listings (JSON) and first-seen timing log (CSV).
WGCOMPANY_LISTINGS_FILE = Path("data/wgcompany_listings.json")
WGCOMPANY_TIMING_FILE = Path("data/wgcompany_times.csv")

# Environment variables for search filters
WGCOMPANY_MIN_SIZE = os.environ.get("WGCOMPANY_MIN_SIZE", "")
WGCOMPANY_MAX_PRICE = os.environ.get("WGCOMPANY_MAX_PRICE", "")
WGCOMPANY_AGE = os.environ.get("WGCOMPANY_AGE", "")
WGCOMPANY_SMOKER = os.environ.get("WGCOMPANY_SMOKER", "")
WGCOMPANY_BEZIRK = os.environ.get("WGCOMPANY_BEZIRK", "0")


class WGCompanyNotifier:
    """Polls wgcompany.de search results and notifies a Telegram bot about
    listings that were not present in the previously saved snapshot."""

    def __init__(self, telegram_bot=None, refresh_minutes=10):
        self.playwright = None
        self.browser = None
        self.context = None
        self.telegram_bot = telegram_bot
        self.refresh_minutes = refresh_minutes
        # BUG FIX: load_previous_listings()/save_listings() read
        # self.listings_file, but it was never assigned anywhere -> the first
        # poll cycle died with AttributeError. Bind it to the module constant.
        self.listings_file = WGCOMPANY_LISTINGS_FILE
        # BUG FIX: the data/ directory was written to (debug HTML, JSON, CSV)
        # but never created; ensure it exists up front.
        self.listings_file.parent.mkdir(parents=True, exist_ok=True)

    async def init_browser(self):
        """Lazily start Playwright and open a headless Chromium context."""
        if self.browser is None:
            self.playwright = await async_playwright().start()
            self.browser = await self.playwright.chromium.launch(headless=True)
            self.context = await self.browser.new_context()
            logger.debug("[WG] Browser ready")

    async def close(self):
        """Release browser resources (optional cleanup hook; safe to call twice)."""
        if self.browser is not None:
            await self.browser.close()
            self.browser = None
            self.context = None
        if self.playwright is not None:
            await self.playwright.stop()
            self.playwright = None

    async def _apply_filters(self, page):
        """Fill the search form fields that have a corresponding env filter set.

        Field names (c/a/l/o/e) are the site's own cryptic form parameters.
        """
        if WGCOMPANY_MIN_SIZE:
            min_size_field = await page.query_selector('input[name="c"]')
            if min_size_field:
                await min_size_field.fill(WGCOMPANY_MIN_SIZE)
        if WGCOMPANY_MAX_PRICE:
            max_price_field = await page.query_selector('input[name="a"]')
            if max_price_field:
                await max_price_field.fill(WGCOMPANY_MAX_PRICE)
        if WGCOMPANY_AGE:
            age_field = await page.query_selector('input[name="l"]')
            if age_field:
                await age_field.fill(WGCOMPANY_AGE)
        if WGCOMPANY_SMOKER:
            smoker_select = await page.query_selector('select[name="o"]')
            if smoker_select:
                await smoker_select.select_option(WGCOMPANY_SMOKER)
        if WGCOMPANY_BEZIRK and WGCOMPANY_BEZIRK != "0":
            bezirk_select = await page.query_selector('select[name="e"]')
            if bezirk_select:
                await bezirk_select.select_option(WGCOMPANY_BEZIRK)

    async def _parse_listing(self, link_elem):
        """Extract a listing dict from one result link element.

        Returns None when the element has no href. Price/size fall back to
        "?" when the row text does not match; the district defaults to
        "Berlin" when no known Bezirk name appears in the row.
        """
        href = await link_elem.get_attribute("href")
        if not href:
            return None

        # Result rows are table rows; grab the row's text for price/size/Bezirk.
        parent = await link_elem.evaluate_handle(
            "el => el.closest('tr') || el.parentElement"
        )
        row_text = await parent.evaluate("el => el.innerText") if parent else ""

        price_match = re.search(r'(\d+)\s*€', row_text)
        price = price_match.group(1) + " €" if price_match else "?"
        size_match = re.search(r'(\d+)\s*m²', row_text)
        size = size_match.group(1) + " m²" if size_match else "?"

        bezirk_patterns = [
            "Kreuzberg", "Neukölln", "Friedrichshain", "Prenzlauer Berg",
            "Mitte", "Wedding", "Charlottenburg", "Schöneberg", "Tempelhof",
            "Steglitz", "Wilmersdorf", "Pankow", "Lichtenberg", "Treptow",
            "Köpenick", "Reinickendorf", "Spandau", "Zehlendorf", "Moabit"
        ]
        location = "Berlin"
        lowered_row = row_text.lower()
        for bez in bezirk_patterns:
            if bez.lower() in lowered_row:
                location = bez
                break

        # Normalize relative links: absolute paths hang off the host root,
        # bare CGI names off /cgi-bin/.
        if not href.startswith("http"):
            href = (
                f"http://www.wgcompany.de{href}"
                if href.startswith("/")
                else f"http://www.wgcompany.de/cgi-bin/{href}"
            )

        # Content-derived id so the same offer hashes identically across runs.
        listing_id = hashlib.md5(f"{href}{price}{size}".encode()).hexdigest()[:12]
        return {
            "id": listing_id,
            "rooms": "1 Zimmer (WG)",
            "size": size,
            "price": price,
            "address": location,
            "link": href,
            "source": "wgcompany",
            "fetched_at": datetime.now().isoformat()
        }

    async def fetch_listings(self):
        """Run one search: load the form, apply filters, submit, scrape rows.

        Returns a list of listing dicts deduplicated by content hash, or []
        on any error (best-effort poller — errors are logged, not raised).
        """
        await self.init_browser()
        listings = []
        page = None
        try:
            page = await self.context.new_page()
            search_url = "http://www.wgcompany.de/cgi-bin/seite?st=1&mi=10&li=100"
            logger.info(f"[WGCOMPANY] Loading search page: {search_url}")
            await page.goto(search_url, wait_until="networkidle")
            await asyncio.sleep(2)

            await self._apply_filters(page)

            submit_btn = await page.query_selector(
                'input[type="submit"][value*="finde"], input[type="submit"]'
            )
            if submit_btn:
                await submit_btn.click()
                await page.wait_for_load_state("networkidle")
                await asyncio.sleep(2)

            # Keep a snapshot of the result page for offline selector debugging.
            content = await page.content()
            with open("data/wgcompany_debug.html", "w", encoding="utf-8") as f:
                f.write(content)

            listing_links = await page.query_selector_all(
                'a[href*="wg.pl"][href*="wgzeigen"]'
            )
            logger.info(f"[WGCOMPANY] Found {len(listing_links)} listing links")

            for link_elem in listing_links:
                try:
                    listing = await self._parse_listing(link_elem)
                    if listing is not None:
                        listings.append(listing)
                except Exception as e:
                    # One broken row must not abort the whole scrape.
                    logger.debug(f"[WGCOMPANY] Error parsing listing: {e}")
                    continue

            # Deduplicate (identical href/price/size hash to the same id).
            seen_ids = set()
            unique_listings = []
            for listing in listings:
                if listing["id"] not in seen_ids:
                    seen_ids.add(listing["id"])
                    unique_listings.append(listing)

            logger.info(f"[WGCOMPANY] Fetched {len(unique_listings)} unique listings")
            return unique_listings
        except Exception as e:
            logger.error(f"[WGCOMPANY] Error fetching listings: {e}")
            return []
        finally:
            # BUG FIX: the page was previously closed only on the success
            # path and leaked on every exception; close it unconditionally.
            if page is not None:
                try:
                    await page.close()
                except Exception:
                    pass

    def load_previous_listings(self):
        """Return the previously saved listings as {id: listing}, or {}."""
        if self.listings_file.exists():
            with open(self.listings_file, 'r') as f:
                data = json.load(f)
            logger.debug(f"[WG] Loaded {len(data)} previous listings")
            return data
        return {}

    def save_listings(self, listings: list[dict]) -> None:
        """Persist the current snapshot keyed by listing id."""
        listings_dict = {l['id']: l for l in listings}
        logger.debug(f"[WG] Saving {len(listings_dict)} listings")
        with open(self.listings_file, 'w') as f:
            json.dump(listings_dict, f, indent=2, ensure_ascii=False)

    def find_new_listings(self, current: list[dict], previous: dict) -> list[dict]:
        """Return listings from `current` whose id is absent from `previous`."""
        new = [listing for listing in current if listing['id'] not in previous]
        if new:
            logger.info(
                f"[WG] {len(new)} new listing{'s' if len(new) > 1 else ''} detected"
            )
        return new

    def log_listing_times(self, new_listings):
        """Append first-seen timestamps for new listings to the timing CSV.

        Writes the header row only when the file is created.
        """
        if not new_listings:
            return
        file_exists = WGCOMPANY_TIMING_FILE.exists()
        with open(WGCOMPANY_TIMING_FILE, "a", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            if not file_exists:
                writer.writerow([
                    "timestamp", "weekday", "hour", "minute",
                    "rooms", "size", "price", "address", "listing_id"
                ])
            now = datetime.now()
            for listing in new_listings:
                writer.writerow([
                    now.isoformat(), now.strftime("%A"), now.hour, now.minute,
                    listing["rooms"], listing["size"], listing["price"],
                    listing["address"], listing['id']
                ])
        logger.debug(f"[WG] Logged {len(new_listings)} to CSV")

    async def notify_new_listings(self, new_listings: list[dict]) -> None:
        """Send one Telegram message per new listing (no-op without a bot)."""
        if not new_listings or not self.telegram_bot:
            return
        for idx, listing in enumerate(new_listings, start=1):
            try:
                # BUG FIX: the message previously ended with a bare
                # "Zum Angebot" and never included the listing URL.
                message = (
                    f"[WG-Company] Neues WG-Zimmer!\n\n"
                    f"🚪 {listing['rooms']}\n"
                    f"📏 {listing['size']}\n"
                    f"💰 {listing['price']}\n"
                    f"📍 {listing['address']}\n\n"
                    f"👉 Zum Angebot: {listing['link']}"
                )
                # Bot may run its own loop in another thread; fall back to the
                # loop we are currently running on (get_event_loop is
                # deprecated inside coroutines).
                loop = self.telegram_bot.event_loop or asyncio.get_running_loop()
                asyncio.run_coroutine_threadsafe(
                    self.telegram_bot._send_message(message), loop
                )
                # Small gap to avoid hammering the Telegram API.
                await asyncio.sleep(0.5)
            except Exception as e:
                logger.error(f"[WG] Telegram failed for listing {idx}: {str(e)[:50]}")

    async def run(self):
        """Main poll loop: fetch, diff against saved state, notify, sleep."""
        await self.init_browser()
        while True:
            listings = await self.fetch_listings()
            previous = self.load_previous_listings()
            new_listings = self.find_new_listings(listings, previous)
            if new_listings:
                logger.info(f"[WGCOMPANY] Found {len(new_listings)} new listing(s)")
                self.log_listing_times(new_listings)
                await self.notify_new_listings(new_listings)
            else:
                logger.info("[WGCOMPANY] No new listings")
            self.save_listings(listings)
            await asyncio.sleep(self.refresh_minutes * 60)