# wohnbot/handlers/wgcompany_notifier.py

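"""Poll wgcompany.de for new WG room listings.

Uses Playwright to load the search page, applies filters from environment
variables, detects listings that have not been seen before, logs their
posting times to a CSV, scrapes contact details into contacts.csv, and
pushes Telegram notifications for new rooms.
"""
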
import asyncio
import logging
import hashlib
import re
import csv
from datetime import datetime
from pathlib import Path
import json
import os
from typing import Optional
from playwright.async_api import async_playwright, Browser, BrowserContext, Playwright

logger = logging.getLogger(__name__)

WGCOMPANY_LISTINGS_FILE = Path("data/wgcompany_listings.json")
WGCOMPANY_TIMING_FILE = Path("data/wgcompany_times.csv")
CONTACTS_FILE = Path("data/contacts.csv")
# Environment variables for search filters
WGCOMPANY_MIN_SIZE = os.environ.get("WGCOMPANY_MIN_SIZE", "")
WGCOMPANY_MAX_PRICE = os.environ.get("WGCOMPANY_MAX_PRICE", "")
WGCOMPANY_AGE = os.environ.get("WGCOMPANY_AGE", "")
WGCOMPANY_SMOKER = os.environ.get("WGCOMPANY_SMOKER", "")
WGCOMPANY_BEZIRK = os.environ.get("WGCOMPANY_BEZIRK", "0")
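# wgcompany.de search-form field names (as used in fetch_listings below):
# c = minimum size, a = maximum price, l = age, o = smoker, e = Bezirk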


class WGCompanyNotifier:
    """Watch WGcompany search results and notify about new listings."""

    def __init__(self, telegram_bot=None, refresh_minutes=10):
        self.browser: Optional[Browser] = None
        self.context: Optional[BrowserContext] = None
        self.playwright: Optional[Playwright] = None
        self.telegram_bot = telegram_bot
        self.refresh_minutes = refresh_minutes
        # State files live under data/; make sure the directory exists.
        WGCOMPANY_LISTINGS_FILE.parent.mkdir(parents=True, exist_ok=True)

    async def init_browser(self):
        """Start Playwright and launch a headless Chromium once."""
        if self.browser is None:
            self.playwright = await async_playwright().start()
            self.browser = await self.playwright.chromium.launch(headless=True)
            self.context = await self.browser.new_context()
            logger.debug("[WG] Browser ready")

    async def fetch_listings(self):
        """Load the search page, apply the configured filters, and parse result rows."""
        await self.init_browser()
        listings = []
        try:
            assert self.context is not None, "Browser context not initialized"
            page = await self.context.new_page()
            search_url = "http://www.wgcompany.de/cgi-bin/seite?st=1&mi=10&li=100"
            logger.info(f"[WGCOMPANY] Loading search page: {search_url}")
            await page.goto(search_url, wait_until="networkidle")
            await asyncio.sleep(2)
            if WGCOMPANY_MIN_SIZE:
                min_size_field = await page.query_selector('input[name="c"]')
                if min_size_field:
                    await min_size_field.fill(WGCOMPANY_MIN_SIZE)
            if WGCOMPANY_MAX_PRICE:
                max_price_field = await page.query_selector('input[name="a"]')
                if max_price_field:
                    await max_price_field.fill(WGCOMPANY_MAX_PRICE)
            if WGCOMPANY_AGE:
                age_field = await page.query_selector('input[name="l"]')
                if age_field:
                    await age_field.fill(WGCOMPANY_AGE)
            if WGCOMPANY_SMOKER:
                smoker_select = await page.query_selector('select[name="o"]')
                if smoker_select:
                    await smoker_select.select_option(WGCOMPANY_SMOKER)
            if WGCOMPANY_BEZIRK and WGCOMPANY_BEZIRK != "0":
                bezirk_select = await page.query_selector('select[name="e"]')
                if bezirk_select:
                    await bezirk_select.select_option(WGCOMPANY_BEZIRK)
            submit_btn = await page.query_selector('input[type="submit"][value*="finde"], input[type="submit"]')
            if submit_btn:
                await submit_btn.click()
                await page.wait_for_load_state("networkidle")
                await asyncio.sleep(2)
            content = await page.content()
            with open("data/wgcompany_debug.html", "w", encoding="utf-8") as f:
                f.write(content)
            listing_links = await page.query_selector_all('a[href*="wg.pl"][href*="wgzeigen"]')
            logger.info(f"[WGCOMPANY] Found {len(listing_links)} listing links")
            for link_elem in listing_links:
                try:
                    href = await link_elem.get_attribute("href")
                    if not href:
                        continue
                    # Price and size live in the same table row as the link.
                    parent = await link_elem.evaluate_handle("el => el.closest('tr') || el.parentElement")
                    row_text = await parent.evaluate("el => el ? el.innerText : ''")
                    price_match = re.search(r'(\d+)\s*€', row_text)
                    price = price_match.group(1) + " €" if price_match else "?"
                    size_match = re.search(r'(\d+)\s*m²', row_text)
                    size = size_match.group(1) + " m²" if size_match else "?"
                    bezirk_patterns = [
                        "Kreuzberg", "Neukölln", "Friedrichshain", "Prenzlauer Berg",
                        "Mitte", "Wedding", "Charlottenburg", "Schöneberg", "Tempelhof",
                        "Steglitz", "Wilmersdorf", "Pankow", "Lichtenberg", "Treptow",
                        "Köpenick", "Reinickendorf", "Spandau", "Zehlendorf", "Moabit"
                    ]
                    location = "Berlin"
                    for bez in bezirk_patterns:
                        if bez.lower() in row_text.lower():
                            location = bez
                            break
                    if not href.startswith("http"):
                        href = f"http://www.wgcompany.de{href}" if href.startswith("/") else f"http://www.wgcompany.de/cgi-bin/{href}"
                    listing_id = hashlib.md5(f"{href}{price}{size}".encode()).hexdigest()[:12]
                    listings.append({
                        "id": listing_id,
                        "rooms": "1 Zimmer (WG)",
                        "size": size,
                        "price": price,
                        "address": location,
                        "link": href,
                        "source": "wgcompany",
                        "fetched_at": datetime.now().isoformat()
                    })
                except Exception as e:
                    logger.debug(f"[WGCOMPANY] Error parsing listing: {e}")
                    continue
            # Deduplicate
            seen_ids = set()
            unique_listings = []
            for listing in listings:
                if listing["id"] not in seen_ids:
                    seen_ids.add(listing["id"])
                    unique_listings.append(listing)
            await page.close()
            logger.info(f"[WGCOMPANY] Fetched {len(unique_listings)} unique listings")
            return unique_listings
        except Exception as e:
            logger.error(f"[WGCOMPANY] Error fetching listings: {e}")
            return []

    def load_previous_listings(self):
        """Return previously seen listings keyed by id, or {} on the first run."""
        if WGCOMPANY_LISTINGS_FILE.exists():
            with open(WGCOMPANY_LISTINGS_FILE, 'r') as f:
                data = json.load(f)
            logger.debug(f"[WG] Loaded {len(data)} previous listings")
            return data
        return {}

    def save_listings(self, listings: list[dict]) -> None:
        """Persist the current snapshot keyed by listing id."""
        listings_dict = {listing['id']: listing for listing in listings}
        logger.debug(f"[WG] Saving {len(listings_dict)} listings")
        with open(WGCOMPANY_LISTINGS_FILE, 'w') as f:
            json.dump(listings_dict, f, indent=2, ensure_ascii=False)

    def find_new_listings(self, current: list[dict], previous: dict) -> list[dict]:
        """Return listings whose ids were not in the previous snapshot."""
        new = []
        for listing in current:
            if listing['id'] not in previous:
                new.append(listing)
        if new:
            logger.info(f"[WG] {len(new)} new listing{'s' if len(new) > 1 else ''} detected")
        return new

    async def fetch_listing_details(self, listing_url: str) -> dict:
        """Fetch detailed information from a listing page including email."""
        details = {
            "email": "",
            "contact_person": "",
            "address": "",
            "description": "",
            "wg_name": ""
        }
        try:
            assert self.context is not None, "Browser context not initialized"
            page = await self.context.new_page()
            await page.goto(listing_url, wait_until="networkidle")
            await asyncio.sleep(1)
            content = await page.content()
            # Extract email (look for patterns like email: xxx@yyy.zz or Email: xxx)
            # Priority: email in table-cell context (WG-specific), excluding the footer email
            email_patterns = [
                r'email\s*:\s*</font></b></td>\s*<td[^>]*>.*?mailto:([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})',  # Table cell email
                r'<a href="mailto:([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})">',  # Any mailto link
                r'[Ee]-?[Mm]ail[:\s]+([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})',  # Plain email: pattern
            ]
            for pattern in email_patterns:
                email_matches = re.finditer(pattern, content, re.IGNORECASE | re.DOTALL)
                for match in email_matches:
                    email = match.group(1)
                    # Exclude the footer/contact email
                    if email != "wgcompany@wgcompany.de":
                        details["email"] = email
                        break
                # Stop once an address has actually been found.
                if details["email"]:
                    break
            # Extract WG name from URL
            wg_match = re.search(r'wg=([^&]+)', listing_url)
            if wg_match:
                details["wg_name"] = wg_match.group(1)
            # Try to extract address or location details
            text = await page.inner_text('body')
            # Look for address patterns
            addr_patterns = [
                r'((?:[A-ZÄÖÜ][a-zäöüß]+(?:straße|str\.|platz|weg|allee))\s*\d+)',
                r'Adresse[:\s]+([^\n]+)',
                r'Lage[:\s]+([^\n]+)'
            ]
            for pattern in addr_patterns:
                addr_match = re.search(pattern, text, re.IGNORECASE)
                if addr_match:
                    details["address"] = addr_match.group(1).strip()
                    break
            # Extract contact person name if available
            contact_patterns = [
                r'Kontakt[:\s]+([A-ZÄÖÜ][a-zäöüß]+(?:\s+[A-ZÄÖÜ][a-zäöüß]+)?)',
                r'Ansprechpartner[:\s]+([A-ZÄÖÜ][a-zäöüß]+(?:\s+[A-ZÄÖÜ][a-zäöüß]+)?)',
            ]
            for pattern in contact_patterns:
                contact_match = re.search(pattern, text)
                if contact_match:
                    details["contact_person"] = contact_match.group(1).strip()
                    break
            await page.close()
            logger.debug(f"[WG] Fetched details: email={details['email']}, wg={details['wg_name']}")
        except Exception as e:
            logger.error(f"[WG] Error fetching listing details: {e}")
        return details

    def log_listing_times(self, new_listings):
        """Append posting-time rows for new listings to the timing CSV."""
        if not new_listings:
            return
        file_exists = WGCOMPANY_TIMING_FILE.exists()
        with open(WGCOMPANY_TIMING_FILE, "a", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            if not file_exists:
                writer.writerow(["timestamp", "weekday", "hour", "minute", "rooms", "size", "price", "address", "listing_id"])
            now = datetime.now()
            for listing in new_listings:
                writer.writerow([
                    now.isoformat(),
                    now.strftime("%A"),
                    now.hour,
                    now.minute,
                    listing["rooms"],
                    listing["size"],
                    listing["price"],
                    listing["address"],
                    listing['id']
                ])
        logger.debug(f"[WG] Logged {len(new_listings)} to CSV")

    async def save_to_contacts(self, listing: dict, details: dict) -> None:
        """Save new listing to contacts.csv with details."""
        try:
            # Check if contacts file exists, create with header if not
            file_exists = CONTACTS_FILE.exists()
            # Read existing contacts to avoid duplicates
            existing_urls = set()
            if file_exists:
                with open(CONTACTS_FILE, 'r', newline='', encoding='utf-8') as f:
                    reader = csv.DictReader(f)
                    for row in reader:
                        if row.get('ListingLink'):
                            existing_urls.add(row['ListingLink'])
            # Skip if already exists
            if listing['link'] in existing_urls:
                logger.debug(f"[WG] Listing already in contacts: {listing['link']}")
                return
            # Prepare row data
            wg_name = details.get('wg_name', '')
            contact_person = details.get('contact_person', '')
            address_full = details.get('address', '') or listing.get('address', '')
            # Combine room info with listing details for Notes
            notes = f"{listing.get('size', '')} / {listing.get('rooms', '')}; {listing.get('price', '')}"
            row = {
                'Name': f"WG {wg_name}" if wg_name else "WG (unnamed)",
                'ContactPerson': contact_person,
                'Platform': 'WGcompany',
                'DateContacted': '',  # Empty - user will fill when contacting
                'Address': address_full,
                'ListingLink': listing['link'],
                'ContactMethod': f"email: {details.get('email', '')}" if details.get('email') else 'wgcompany message',
                'Response': '',
                'FollowUpDate': '',
                'PreferredMoveIn': '',
                'Notes': notes,
                'Status': 'open'
            }
            # Append to CSV
            with open(CONTACTS_FILE, 'a', newline='', encoding='utf-8') as f:
                writer = csv.DictWriter(f, fieldnames=[
                    'Name', 'ContactPerson', 'Platform', 'DateContacted', 'Address',
                    'ListingLink', 'ContactMethod', 'Response', 'FollowUpDate',
                    'PreferredMoveIn', 'Notes', 'Status'
                ])
                if not file_exists:
                    writer.writeheader()
                writer.writerow(row)
            logger.info(f"[WG] Saved to contacts.csv: {row['Name']} - {details.get('email', 'no email')}")
        except Exception as e:
            logger.error(f"[WG] Error saving to contacts: {e}")

    async def notify_new_listings(self, new_listings: list[dict]) -> None:
        """Send a Telegram message for each new listing, if a bot is wired in."""
        if not new_listings or not self.telegram_bot:
            return
        for idx, listing in enumerate(new_listings, start=1):
            try:
                message = (
                    f"<b>[WG-Company] Neues WG-Zimmer!</b>\n\n"
                    f"🚪 <b>{listing['rooms']}</b>\n"
                    f"📏 {listing['size']}\n"
                    f"💰 {listing['price']}\n"
                    f"📍 {listing['address']}\n\n"
                    f"👉 <a href=\"{listing['link']}\">Zum Angebot</a>"
                )
                # Schedule on the bot's own loop if it has one, else the current loop.
                loop = self.telegram_bot.event_loop or asyncio.get_running_loop()
                asyncio.run_coroutine_threadsafe(self.telegram_bot._send_message(message), loop)
                await asyncio.sleep(0.5)
            except Exception as e:
                logger.error(f"[WG] Telegram failed for listing {idx}: {str(e)[:50]}")

    async def run(self):
        """Poll the site forever, notifying and persisting on each cycle."""
        await self.init_browser()
        while True:
            listings = await self.fetch_listings()
            previous = self.load_previous_listings()
            new_listings = self.find_new_listings(listings, previous)
            if new_listings:
                logger.info(f"[WGCOMPANY] Found {len(new_listings)} new listing(s)")
                self.log_listing_times(new_listings)
                # Fetch details and save to contacts for each new listing
                for listing in new_listings:
                    details = await self.fetch_listing_details(listing['link'])
                    await self.save_to_contacts(listing, details)
                    await asyncio.sleep(1)  # Be polite with requests
                await self.notify_new_listings(new_listings)
            else:
                logger.info("[WGCOMPANY] No new listings")
            if listings:
                # Skip saving when the fetch returned nothing, so a transient
                # error doesn't wipe the snapshot and re-notify everything later.
                self.save_listings(listings)
            await asyncio.sleep(self.refresh_minutes * 60)
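

if __name__ == "__main__":
    # Minimal standalone sketch (assumption: wohnbot normally wires in a
    # Telegram bot elsewhere). With telegram_bot=None, notifications are
    # skipped but fetching, timing logs, and contacts.csv still work.
    logging.basicConfig(level=logging.INFO)
    asyncio.run(WGCompanyNotifier(refresh_minutes=10).run())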