import asyncio
import csv
import hashlib
import json
import logging
import os
import re
from datetime import datetime
from pathlib import Path
from typing import Optional

from playwright.async_api import async_playwright, Browser, BrowserContext, Playwright

logger = logging.getLogger(__name__)

WGCOMPANY_LISTINGS_FILE = Path("data/wgcompany_listings.json")
WGCOMPANY_TIMING_FILE = Path("data/wgcompany_times.csv")
CONTACTS_FILE = Path("data/contacts.csv")

# Environment variables for search filters
WGCOMPANY_MIN_SIZE = os.environ.get("WGCOMPANY_MIN_SIZE", "")
WGCOMPANY_MAX_PRICE = os.environ.get("WGCOMPANY_MAX_PRICE", "")
WGCOMPANY_AGE = os.environ.get("WGCOMPANY_AGE", "")
WGCOMPANY_SMOKER = os.environ.get("WGCOMPANY_SMOKER", "")
WGCOMPANY_BEZIRK = os.environ.get("WGCOMPANY_BEZIRK", "0")


class WGCompanyNotifier:
    def __init__(self, telegram_bot=None, refresh_minutes=10):
        self.browser: Optional[Browser] = None
        self.context: Optional[BrowserContext] = None
        self.playwright: Optional[Playwright] = None
        self.telegram_bot = telegram_bot
        self.refresh_minutes = refresh_minutes
        # Ensure the output directory exists before any file writes
        Path("data").mkdir(parents=True, exist_ok=True)

    async def init_browser(self):
        if self.browser is None:
            self.playwright = await async_playwright().start()
            self.browser = await self.playwright.chromium.launch(headless=True)
            self.context = await self.browser.new_context()
            logger.debug("[WG] Browser ready")

    async def fetch_listings(self):
        await self.init_browser()
        listings = []
        page = None
        try:
            assert self.context is not None, "Browser context not initialized"
            page = await self.context.new_page()
            search_url = "http://www.wgcompany.de/cgi-bin/seite?st=1&mi=10&li=100"
            logger.info(f"[WGCOMPANY] Loading search page: {search_url}")
            await page.goto(search_url, wait_until="networkidle")
            await asyncio.sleep(2)

            # Fill the search form fields configured via environment variables
            if WGCOMPANY_MIN_SIZE:
                min_size_field = await page.query_selector('input[name="c"]')
                if min_size_field:
                    await min_size_field.fill(WGCOMPANY_MIN_SIZE)
            if WGCOMPANY_MAX_PRICE:
                max_price_field = await page.query_selector('input[name="a"]')
                if max_price_field:
                    await max_price_field.fill(WGCOMPANY_MAX_PRICE)
            if WGCOMPANY_AGE:
                age_field = await page.query_selector('input[name="l"]')
                if age_field:
                    await age_field.fill(WGCOMPANY_AGE)
            if WGCOMPANY_SMOKER:
                smoker_select = await page.query_selector('select[name="o"]')
                if smoker_select:
                    await smoker_select.select_option(WGCOMPANY_SMOKER)
            if WGCOMPANY_BEZIRK and WGCOMPANY_BEZIRK != "0":
                bezirk_select = await page.query_selector('select[name="e"]')
                if bezirk_select:
                    await bezirk_select.select_option(WGCOMPANY_BEZIRK)

            submit_btn = await page.query_selector('input[type="submit"][value*="finde"], input[type="submit"]')
            if submit_btn:
                await submit_btn.click()
                await page.wait_for_load_state("networkidle")
                await asyncio.sleep(2)

            # Dump the result page for offline debugging of the selectors
            content = await page.content()
            with open("data/wgcompany_debug.html", "w", encoding="utf-8") as f:
                f.write(content)

            listing_links = await page.query_selector_all('a[href*="wg.pl"][href*="wgzeigen"]')
            logger.info(f"[WGCOMPANY] Found {len(listing_links)} listing links")

            for link_elem in listing_links:
                try:
                    href = await link_elem.get_attribute("href")
                    if not href:
                        continue
                    # Price and size live in the surrounding table row, not the link itself
                    parent = await link_elem.evaluate_handle("el => el.closest('tr') || el.parentElement")
                    row_text = await parent.evaluate("el => el.innerText") if parent else ""

                    price_match = re.search(r'(\d+)\s*€', row_text)
                    price = price_match.group(1) + " €" if price_match else "?"

                    size_match = re.search(r'(\d+)\s*m²', row_text)
                    size = size_match.group(1) + " m²" if size_match else "?"
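                    # Coarse location detection: scan the row text for a known
                    # Berlin district name and fall back to plain "Berlin"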
                    bezirk_patterns = [
                        "Kreuzberg", "Neukölln", "Friedrichshain", "Prenzlauer Berg",
                        "Mitte", "Wedding", "Charlottenburg", "Schöneberg", "Tempelhof",
                        "Steglitz", "Wilmersdorf", "Pankow", "Lichtenberg", "Treptow",
                        "Köpenick", "Reinickendorf", "Spandau", "Zehlendorf", "Moabit",
                    ]
                    location = "Berlin"
                    for bez in bezirk_patterns:
                        if bez.lower() in row_text.lower():
                            location = bez
                            break

                    # Normalize relative links to absolute URLs
                    if not href.startswith("http"):
                        href = f"http://www.wgcompany.de{href}" if href.startswith("/") else f"http://www.wgcompany.de/cgi-bin/{href}"

                    # Stable ID derived from link, price and size, so edited listings surface as new
                    listing_id = hashlib.md5(f"{href}{price}{size}".encode()).hexdigest()[:12]
                    listings.append({
                        "id": listing_id,
                        "rooms": "1 Zimmer (WG)",
                        "size": size,
                        "price": price,
                        "address": location,
                        "link": href,
                        "source": "wgcompany",
                        "fetched_at": datetime.now().isoformat(),
                    })
                except Exception as e:
                    logger.debug(f"[WGCOMPANY] Error parsing listing: {e}")
                    continue

            # Deduplicate by listing ID
            seen_ids = set()
            unique_listings = []
            for listing in listings:
                if listing["id"] not in seen_ids:
                    seen_ids.add(listing["id"])
                    unique_listings.append(listing)

            logger.info(f"[WGCOMPANY] Fetched {len(unique_listings)} unique listings")
            return unique_listings
        except Exception as e:
            logger.error(f"[WGCOMPANY] Error fetching listings: {e}")
            return []
        finally:
            # Close the page on success and error alike so pages don't leak across cycles
            if page:
                await page.close()

    def load_previous_listings(self):
        if WGCOMPANY_LISTINGS_FILE.exists():
            with open(WGCOMPANY_LISTINGS_FILE, "r", encoding="utf-8") as f:
                data = json.load(f)
            logger.debug(f"[WG] Loaded {len(data)} previous listings")
            return data
        return {}

    def save_listings(self, listings: list[dict]) -> None:
        listings_dict = {listing["id"]: listing for listing in listings}
        logger.debug(f"[WG] Saving {len(listings_dict)} listings")
        with open(WGCOMPANY_LISTINGS_FILE, "w", encoding="utf-8") as f:
            json.dump(listings_dict, f, indent=2, ensure_ascii=False)

    def find_new_listings(self, current: list[dict], previous: dict) -> list[dict]:
        new = [listing for listing in current if listing["id"] not in previous]
        if new:
            logger.info(f"[WG] {len(new)} new listing{'s' if len(new) > 1 else ''} detected")
        return new

    async def fetch_listing_details(self, listing_url: str) -> dict:
        """Fetch detailed information from a listing page, including the contact email."""
        details = {
            "email": "",
            "contact_person": "",
            "address": "",
            "description": "",
            "wg_name": "",
        }
        page = None
        try:
            assert self.context is not None, "Browser context not initialized"
            page = await self.context.new_page()
            await page.goto(listing_url, wait_until="networkidle")
            await asyncio.sleep(1)
            content = await page.content()

            # Extract email: prefer a labelled "E-Mail: ..." match, then any bare address
            email_patterns = [
                r'[Ee]-?[Mm]ail[:\s]+([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})',
                r'([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})',
            ]
            for pattern in email_patterns:
                email_match = re.search(pattern, content)
                if email_match:
                    details["email"] = email_match.group(1)
                    break

            # Extract the WG name from the URL query string
            wg_match = re.search(r'wg=([^&]+)', listing_url)
            if wg_match:
                details["wg_name"] = wg_match.group(1)

            # Try to extract address or location details from the visible text
            text = await page.inner_text("body")

            # Look for address patterns: street names first, then labelled fields
            addr_patterns = [
                r'((?:[A-ZÄÖÜ][a-zäöüß]+(?:straße|str\.|platz|weg|allee))\s*\d+)',
                r'Adresse[:\s]+([^\n]+)',
                r'Lage[:\s]+([^\n]+)',
            ]
            for pattern in addr_patterns:
                addr_match = re.search(pattern, text, re.IGNORECASE)
                if addr_match:
                    details["address"] = addr_match.group(1).strip()
                    break

            # Extract the contact person's name if available
            contact_patterns = [
                r'Kontakt[:\s]+([A-ZÄÖÜ][a-zäöüß]+(?:\s+[A-ZÄÖÜ][a-zäöüß]+)?)',
                r'Ansprechpartner[:\s]+([A-ZÄÖÜ][a-zäöüß]+(?:\s+[A-ZÄÖÜ][a-zäöüß]+)?)',
            ]
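            # The patterns above rely on German capitalization: one or two
            # capitalized words directly after a "Kontakt:"/"Ansprechpartner:" label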
            for pattern in contact_patterns:
                contact_match = re.search(pattern, text)
                if contact_match:
                    details["contact_person"] = contact_match.group(1).strip()
                    break

            logger.debug(f"[WG] Fetched details: email={details['email']}, wg={details['wg_name']}")
        except Exception as e:
            logger.error(f"[WG] Error fetching listing details: {e}")
        finally:
            if page:
                await page.close()
        return details

    def log_listing_times(self, new_listings):
        if not new_listings:
            return
        file_exists = WGCOMPANY_TIMING_FILE.exists()
        with open(WGCOMPANY_TIMING_FILE, "a", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            if not file_exists:
                writer.writerow(["timestamp", "weekday", "hour", "minute", "rooms", "size", "price", "address", "listing_id"])
            now = datetime.now()
            for listing in new_listings:
                writer.writerow([
                    now.isoformat(), now.strftime("%A"), now.hour, now.minute,
                    listing["rooms"], listing["size"], listing["price"],
                    listing["address"], listing["id"],
                ])
        logger.debug(f"[WG] Logged {len(new_listings)} to CSV")

    async def save_to_contacts(self, listing: dict, details: dict) -> None:
        """Save a new listing to contacts.csv with its details."""
        try:
            # Check whether the contacts file exists; the header is written on first use
            file_exists = CONTACTS_FILE.exists()

            # Read existing contacts to avoid duplicates
            existing_urls = set()
            if file_exists:
                with open(CONTACTS_FILE, "r", newline="", encoding="utf-8") as f:
                    reader = csv.DictReader(f)
                    for row in reader:
                        if row.get("ListingLink"):
                            existing_urls.add(row["ListingLink"])

            # Skip if already present
            if listing["link"] in existing_urls:
                logger.debug(f"[WG] Listing already in contacts: {listing['link']}")
                return

            # Prepare row data
            wg_name = details.get("wg_name", "")
            contact_person = details.get("contact_person", "")
            address_full = details.get("address", "") or listing.get("address", "")

            # Combine room info with listing details for Notes
            notes = f"{listing.get('size', '')} / {listing.get('rooms', '')}; {listing.get('price', '')}"

            row = {
                "Name": f"WG {wg_name}" if wg_name else "WG (unnamed)",
                "ContactPerson": contact_person,
                "Platform": "WGcompany",
                "DateContacted": "",  # Left empty; the user fills this in when contacting
                "Address": address_full,
                "ListingLink": listing["link"],
                "ContactMethod": f"email: {details.get('email', '')}" if details.get("email") else "wgcompany message",
                "Response": "",
                "FollowUpDate": "",
                "PreferredMoveIn": "",
                "Notes": notes,
                "Status": "open",
            }

            # Append to CSV
            with open(CONTACTS_FILE, "a", newline="", encoding="utf-8") as f:
                writer = csv.DictWriter(f, fieldnames=[
                    "Name", "ContactPerson", "Platform", "DateContacted", "Address",
                    "ListingLink", "ContactMethod", "Response", "FollowUpDate",
                    "PreferredMoveIn", "Notes", "Status",
                ])
                if not file_exists:
                    writer.writeheader()
                writer.writerow(row)

            logger.info(f"[WG] Saved to contacts.csv: {row['Name']} - {details.get('email', 'no email')}")
        except Exception as e:
            logger.error(f"[WG] Error saving to contacts: {e}")

    async def notify_new_listings(self, new_listings: list[dict]) -> None:
        if not new_listings or not self.telegram_bot:
            return
        for idx, listing in enumerate(new_listings, start=1):
            try:
                message = (
                    f"[WG-Company] Neues WG-Zimmer!\n\n"
                    f"🚪 {listing['rooms']}\n"
                    f"📏 {listing['size']}\n"
                    f"💰 {listing['price']}\n"
                    f"📍 {listing['address']}\n\n"
                    f"👉 Zum Angebot: {listing['link']}"
                )
                # Hand the coroutine to the bot's own event loop; fall back to the
                # currently running loop if the bot does not expose one
                loop = self.telegram_bot.event_loop or asyncio.get_running_loop()
                asyncio.run_coroutine_threadsafe(self.telegram_bot._send_message(message), loop)
                await asyncio.sleep(0.5)
            except Exception as e:
                logger.error(f"[WG] Telegram failed for listing {idx}: {str(e)[:50]}")

    async def run(self):
        await self.init_browser()
        while True:
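            # One polling cycle: fetch current listings, diff against the saved
            # snapshot, enrich and persist anything new, then sleep until the next round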
            listings = await self.fetch_listings()
            previous = self.load_previous_listings()
            new_listings = self.find_new_listings(listings, previous)
            if new_listings:
                logger.info(f"[WGCOMPANY] Found {len(new_listings)} new listing(s)")
                self.log_listing_times(new_listings)
                # Fetch details and save to contacts for each new listing
                for listing in new_listings:
                    details = await self.fetch_listing_details(listing["link"])
                    await self.save_to_contacts(listing, details)
                    await asyncio.sleep(1)  # Be polite with requests
                await self.notify_new_listings(new_listings)
            else:
                logger.info("[WGCOMPANY] No new listings")
            self.save_listings(listings)
            await asyncio.sleep(self.refresh_minutes * 60)
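

# Minimal standalone entry point, a sketch: the Telegram integration is assumed
# to be wired up elsewhere, so telegram_bot is left as None here and new
# listings are only logged and written to the CSV/JSON files. run() loops
# forever; stop it with Ctrl+C.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    notifier = WGCompanyNotifier(telegram_bot=None, refresh_minutes=10)
    asyncio.run(notifier.run())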