"""Shared base class for per-company apartment application handlers.

Besides the abstract ``apply`` hook, the base class bundles helpers for
dismissing cookie/consent banners, logging in to inberlinwohnen.de,
scraping the Wohnungsfinder listing pages and saving debug artifacts
(screenshots / HTML dumps) below ``DATA_DIR``.
"""

import asyncio
import hashlib
import html
import logging
import re
import traceback
from abc import ABC, abstractmethod
from datetime import datetime
from pathlib import Path

from playwright.async_api import Page

logger = logging.getLogger(__name__)

# Directory that receives debug HTML dumps and screenshots.
DATA_DIR = Path("data")

# flatId -> deeplink pairs embedded in the page's wire:snapshot JSON data.
_DEEPLINK_RE = re.compile(r'"deeplink":"(https://[^"]+)","flatId":(\d+)')

# Listing toggle buttons: captures the flat id and the aria-label summary.
_BUTTON_RE = re.compile(
    r'@click="open !== (\d+)[^\"]*"[^>]*aria-label="Wohnungsangebot - ([^\"]+)'
)

# Summary text such as "2,0 Zimmer, 54,30 m², 512,40 € Kaltmiete | Street 1".
# NOTE: this is a raw string, so whitespace is `\s` and the literal pipe is
# `\|`; doubling the backslashes would require literal backslashes in the
# text and turn the pipe into a regex alternation.
_LISTING_TEXT_RE = re.compile(
    r'(\d,\d)\s*Zimmer,\s*([\d,]+)\s*m²,\s*([\d.,]+)\s*€\s*(?:Kaltmiete\s*)?\|\s*(.+)'
)


class BaseHandler(ABC):
    """Abstract base for handlers that apply to listings of one company.

    Subclasses implement :meth:`apply`; everything else is shared tooling.
    """

    # Selectors tried, in order, to dismiss a cookie banner.
    COOKIE_SELECTORS = (
        'button:has-text("Akzeptieren")',
        'button:has-text("Alle akzeptieren")',
        '#CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll',
    )

    # Selectors tried, in order, to dismiss a consent-manager banner.
    CONSENT_SELECTORS = (
        '#cmpbntyestxt',
        '.cmpboxbtnyes',
        'a.cmpboxbtn.cmpboxbtnyes',
        '#cmpwelcomebtnyes',
        '.cmptxt_btn_yes',
    )

    # Safety limit for the number of result pages collected per fetch.
    MAX_PAGES = 10

    def __init__(self, context, email=None, password=None):
        """Store the browser context and optional login credentials.

        Args:
            context: Object providing ``new_page()`` (presumably a Playwright
                ``BrowserContext`` -- confirm against the caller).
            email: Login e-mail; ``None`` means "not logged in".
            password: Login password; ``None`` means "not logged in".
        """
        self.context = context
        self.email = email
        self.password = password

    @abstractmethod
    async def apply(self, listing: dict, result: dict) -> dict:
        """Abstract method to handle the application process for a specific company."""
        pass

    async def _dismiss_banner(self, page: Page, selectors, dismissed_name: str,
                              failure_name: str) -> None:
        """Click the first visible element matching one of ``selectors``.

        Best-effort: any error is logged as a warning and swallowed so that a
        missing or broken banner never aborts the surrounding workflow.
        """
        try:
            for sel in selectors:
                button = await page.query_selector(sel)
                if button and await button.is_visible():
                    await button.click()
                    logger.info(f"[BaseHandler] Dismissed {dismissed_name}")
                    # Give the overlay a moment to disappear.
                    await asyncio.sleep(1)
                    break
        except Exception as e:
            logger.warning(f"[BaseHandler] Failed to handle {failure_name}: {e}")

    async def handle_cookies(self, page: Page):
        """Handle cookie banners if present."""
        await self._dismiss_banner(
            page, self.COOKIE_SELECTORS, "cookie banner", "cookies"
        )

    async def handle_consent(self, page: Page):
        """Handle consent manager banners if present."""
        await self._dismiss_banner(
            page, self.CONSENT_SELECTORS, "consent manager", "consent manager"
        )

    async def log_listing_details(self, listing: dict):
        """Log details of the listing being processed."""
        logger.info(f"[BaseHandler] Processing listing: {listing}")

    async def login(self, page: Page):
        """Log in to inberlinwohnen.de.

        Returns:
            ``True`` when the post-login page looks like the personal area,
            ``False`` when credentials are missing, the login was rejected or
            any error occurred (errors are logged, never raised).
        """
        if not self.email or not self.password:
            logger.warning("No credentials provided, using public listings")
            return False

        try:
            await page.goto("https://www.inberlinwohnen.de/login", wait_until="networkidle")

            # The cookie/privacy modal would otherwise cover the form.
            await self.handle_cookies(page)

            await page.fill('input[name="email"], input[type="email"]', self.email)
            await page.fill('input[name="password"], input[type="password"]', self.password)
            await page.click('button[type="submit"], input[type="submit"]')

            await page.wait_for_load_state("networkidle")
            await asyncio.sleep(2)

            # Success is detected by the target URL or a visible logout link.
            if "mein-bereich" in page.url or await page.query_selector('text="Abmelden"'):
                logger.info("Login successful")
                return True

            logger.error(f"Login failed - ended up at {page.url}")
            return False
        except Exception as e:
            logger.error(f"Login error: {e}")
            return False

    async def _collect_paginated_content(self, page: Page) -> str:
        """Return the concatenated HTML of up to ``MAX_PAGES`` result pages."""
        pages = []
        for page_num in range(1, self.MAX_PAGES + 1):
            pages.append(await page.content())
            if page_num == self.MAX_PAGES:
                # Limit reached: do not click on to a page we will not read.
                break
            next_btn = await page.query_selector('[wire\\:click*="nextPage"]')
            if not (next_btn and await next_btn.is_visible()):
                break
            await next_btn.click()
            await asyncio.sleep(2)  # Wait for Livewire to update

        logger.info(f"Collected content from {len(pages)} page(s)")
        return "".join(pages)

    @staticmethod
    def _write_debug_html(content: str) -> None:
        """Dump ``content`` to ``DATA_DIR/debug_page.html`` (best-effort)."""
        debug_path = DATA_DIR / "debug_page.html"
        try:
            debug_path.parent.mkdir(parents=True, exist_ok=True)
            debug_path.write_text(content, encoding="utf-8")
        except OSError as e:
            # A failed debug dump must not discard the scraped listings.
            logger.warning(f"Could not save debug HTML to {debug_path}: {e}")
        else:
            logger.info(f"Saved debug HTML to {debug_path}")

    @staticmethod
    def _parse_listings(content: str, fallback_url: str) -> list[dict]:
        """Extract unique listings from raw page HTML.

        Args:
            content: Raw HTML (possibly several pages concatenated).
            fallback_url: Link used for listings without a known deeplink.

        Returns:
            One dict per distinct listing with the keys ``id``, ``rooms``,
            ``size``, ``price``, ``address``, ``link`` and ``fetched_at``.
            The first occurrence of each ``id`` wins.
        """
        # Decode HTML entities and JSON escaped slashes for extraction.
        decoded = html.unescape(content).replace('\\/', '/')

        id_to_link = {flat_id: link for link, flat_id in _DEEPLINK_RE.findall(decoded)}
        logger.info(f"Found {len(id_to_link)} deeplink mappings")

        button_matches = _BUTTON_RE.findall(decoded)
        logger.info(f"Found {len(button_matches)} listing buttons")

        listings = []
        seen_ids = set()
        for flat_id, listing_text in button_matches:
            parts_match = _LISTING_TEXT_RE.match(listing_text)
            if not parts_match:
                continue

            rooms, size, price, address = parts_match.groups()
            rooms = rooms.strip()
            address = address.strip()
            if len(address) < 5:
                # Too short to be a real address; treat as a parse artifact.
                continue

            # The id is a stable fingerprint of the listing's visible data.
            listing_id = hashlib.md5(
                f"{rooms}{size}{price}{address}".encode()
            ).hexdigest()[:12]
            if listing_id in seen_ids:
                continue
            seen_ids.add(listing_id)

            listings.append({
                "id": listing_id,
                "rooms": f"{rooms} Zimmer",
                "size": f"{size} m²",
                "price": f"{price} €",
                "address": address,
                "link": id_to_link.get(flat_id, fallback_url),
                "fetched_at": datetime.now().isoformat(),
            })
        return listings

    async def fetch_listings(self, logged_in: bool) -> list[dict]:
        """Fetch listings from the Wohnungsfinder.

        Args:
            logged_in: Use the personal (filtered) Wohnungsfinder when true,
                the public one otherwise.

        Returns:
            The unique listings found, or ``[]`` when anything goes wrong
            (the error and traceback are logged).
        """
        # Use personal Wohnungsfinder when logged in to see filtered listings.
        url = (
            "https://www.inberlinwohnen.de/mein-bereich/wohnungsfinder"
            if logged_in
            else "https://www.inberlinwohnen.de/wohnungsfinder/"
        )

        page = None
        try:
            page = await self.context.new_page()
            logger.info(f"Fetching listings from {url}")
            await page.goto(url, wait_until="networkidle")

            # The cookie modal only shows up for anonymous sessions.
            if not logged_in:
                await self.handle_cookies(page)

            # Wait for dynamic content to load - look for listing text pattern.
            try:
                await page.wait_for_selector('text=/\\d,\\d\\s*Zimmer/', timeout=15000)
                logger.info("Listings content loaded")
            except Exception:
                # Not a bare except: cancellation must still propagate.
                logger.warning("Timeout waiting for listings content")

            # Additional wait for initial listings to render.
            await asyncio.sleep(2)

            all_content = await self._collect_paginated_content(page)
            self._write_debug_html(all_content)

            listings = self._parse_listings(all_content, url)
            logger.info(f"Fetched {len(listings)} unique listings")
            return listings
        except Exception as e:
            logger.error(f"Error fetching listings: {e}")
            logger.error(traceback.format_exc())
            return []
        finally:
            # Always release the page, including on the error path.
            if page is not None:
                try:
                    await page.close()
                except Exception as close_error:
                    logger.debug(f"Error closing listings page: {close_error}")

    async def save_screenshot(self, page: Page, filename):
        """Save a screenshot of the current page below ``DATA_DIR``."""
        screenshot_path = DATA_DIR / filename
        screenshot_path.parent.mkdir(parents=True, exist_ok=True)
        await page.screenshot(path=str(screenshot_path))
        logger.info(f"Saved screenshot to {screenshot_path}")

    async def save_html(self, page: Page, filename):
        """Save the HTML content of the current page below ``DATA_DIR``."""
        html_path = DATA_DIR / filename
        content = await page.content()
        html_path.parent.mkdir(parents=True, exist_ok=True)
        html_path.write_text(content, encoding="utf-8")
        logger.info(f"Saved HTML to {html_path}")

    async def log_buttons(self, page: Page):
        """Log the text (first 50 chars) of up to ten buttons on the page."""
        buttons = await page.query_selector_all('button, a.btn, a[class*="button"]')
        for btn in buttons[:10]:
            try:
                text = await btn.inner_text()
                logger.info(f"Found button: {text[:50]}")
            except Exception as e:
                logger.debug(f"Error logging button text: {e}")

    async def handle_exception(self, e):
        """Log an exception together with its own traceback.

        The traceback is taken from ``e`` itself, so the output is meaningful
        even when this is called outside an active ``except`` block.
        """
        logger.error(f"Exception: {str(e)}")
        logger.error(
            "".join(traceback.format_exception(type(e), e, e.__traceback__))
        )