from datetime import datetime
from handlers.base_handler import BaseHandler
from handlers.howoge_handler import HowogeHandler
from handlers.gewobag_handler import GewobagHandler
from handlers.degewo_handler import DegewoHandler
from handlers.gesobau_handler import GesobauHandler
from handlers.stadtundland_handler import StadtUndLandHandler
from handlers.wbm_handler import WBMHandler
import json
from pathlib import Path
import pandas as pd
from typing import Optional
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import logging
import matplotlib
import matplotlib.font_manager as fm
import seaborn as sns
import html
import re
import hashlib
import asyncio
from playwright.async_api import async_playwright
import os

STATE_FILE = Path("data/state.json")
APPLICATIONS_FILE = Path("data/applications.json")
TIMING_FILE = Path("data/listing_times.csv")
LISTINGS_FILE = Path("data/listings.json")
DATA_DIR = Path("data")

# --- Matplotlib & Seaborn Setup ---
font_cache_dir = Path("data/fonts")
font_cache_dir.mkdir(parents=True, exist_ok=True)
matplotlib.get_configdir = lambda: str(font_cache_dir)
fm.findSystemFonts(fontpaths=str(font_cache_dir), fontext='ttf')
matplotlib.rcParams['font.family'] = 'Noto Sans'

# Configure seaborn for beautiful plots
sns.set_theme(style="whitegrid", palette="deep")
sns.set_context("notebook", font_scale=1.1)
matplotlib.rcParams['figure.dpi'] = 300
matplotlib.rcParams['savefig.dpi'] = 300
matplotlib.rcParams['figure.facecolor'] = 'white'

# Use the root logger for consistency with main.py
logger = logging.getLogger()


class ApplicationHandler:
    """
    Main handler for apartment monitoring, application automation, and notification logic.
    Handles browser automation, listing extraction, application delegation, and Telegram notifications.
    """

    def __init__(self, browser_context, state_manager, applications_file: Optional[Path] = None):
        if browser_context is None:
            raise ValueError("browser_context must not be None. ApplicationHandler requires a valid Playwright context.")
        self.context = browser_context
        self.state_manager = state_manager
        self.applications_file = applications_file or APPLICATIONS_FILE
        self.handlers = {
            "howoge": HowogeHandler(browser_context),
            "gewobag": GewobagHandler(browser_context),
            "degewo": DegewoHandler(browser_context),
            "gesobau": GesobauHandler(browser_context),
            "stadtundland": StadtUndLandHandler(browser_context),
            "wbm": WBMHandler(browser_context),
        }

    def set_telegram_bot(self, telegram_bot) -> None:
        """Attach a TelegramBot instance for notifications."""
        self.telegram_bot = telegram_bot

    def notify_new_listings(self, new_listings: list[dict], application_results: Optional[dict] = None) -> None:
        """
        Send a Telegram notification for each new listing.
        Includes the application result if autopilot was enabled.
""" for listing in new_listings: link = listing.get('link', 'https://www.inberlinwohnen.de/wohnungsfinder/') company = self._detect_company(link) if company == "wgcompany": continue # skip WGCompany listings for main handler company_label = company.capitalize() if company != "unknown" else "Wohnung" message = ( f"[{company_label}] Neue Wohnung!\n\n" f"🚪 {listing['rooms']}\n" f"📏 {listing['size']}\n" f"💰 {listing['price']}\n" f"📍 {listing['address']}\n\n" f"👉 Alle Details" ) # Always show autopilot/apply status for clarity if application_results is not None: if listing["id"] in application_results: result = application_results[listing["id"]] # Skip already-applied listings (no notification needed) if result.get("skipped"): logger.debug(f"Skip notification for already-applied: {listing['address']}") continue # Skip to next listing if result["success"]: message += f"\n\n\ud83e\udd16 Auto-applied! ({result['company']})" if result["message"]: message += f"\n{result['message']}" else: # Handler attempted but failed fail_msg = result.get("message") or "Unknown error during application." message += f"\n\n\u26a0\ufe0f Auto-apply failed ({result['company']})" message += f"\nReason: {html.escape(fail_msg)}" else: # Should not happen if logic is correct, but fallback # Save as failed so /retryfailed can retry later message += "\n\n\u2139\ufe0f No application attempted (internal logic error)" failed_result = { "listing_id": listing["id"], "company": company, "link": link, "timestamp": listing.get("timestamp", ""), "success": False, "message": "Internal logic error: listing not in application_results", "address": listing.get("address", ""), "rooms": listing.get("rooms", ""), "price": listing.get("price", ""), "retries": 0 } self.save_application(failed_result) logger.warning(f"[INTERNAL ERROR] Saved as failed: {listing['id']} - {listing.get('address', '')}") else: # Autopilot was off or not attempted at all message += "\n\n\u2139\ufe0f No application attempted (autopilot off)" # Send via TelegramBot if available if hasattr(self, 'telegram_bot') and self.telegram_bot: loop = getattr(self.telegram_bot, 'event_loop', None) or asyncio.get_event_loop() asyncio.run_coroutine_threadsafe(self.telegram_bot._send_message(message), loop) else: logger.debug(f"[No Telegram] {listing['address']} ({listing['rooms']})") async def apply_to_listings(self, listings: list[dict]) -> dict: """ Apply to multiple listings (autopilot mode). Returns a dict of application results keyed by listing ID. """ results = {} # Fail fast if context is ever None (should never happen) if self.context is None: raise RuntimeError("browser_context is None in apply_to_listings. 
        for listing in listings:
            # Check if we've already successfully applied
            applications = self.load_applications()
            if listing["id"] in applications:
                app = applications[listing["id"]]
                if app.get("success", False):
                    # Check if it's the same listing (same link) or a reused ID
                    if app.get("link") == listing.get("link"):
                        logger.debug(f"Skip (applied): {listing['address']}")
                        # Mark as skipped so notify_new_listings knows not to send a notification
                        results[listing["id"]] = {
                            "listing_id": listing["id"],
                            "skipped": True,  # Flag to prevent duplicate notifications
                        }
                        continue
                    else:
                        # Same ID but different link - the company reused the ID for a new listing
                        logger.info(f"Reused ID detected for {listing['address']}: old link={app.get('link')}, new link={listing.get('link')}")
                        # Treat as a new listing and apply below

            result = await self.apply(listing)
            results[listing["id"]] = result
            self.save_application(result)
            status = "[SUCCESS]" if result["success"] else "[FAILED]"
            logger.info(f"{status} {listing['address'][:30]}... | {result['message'][:50]}")
            await asyncio.sleep(2)
        return results

    def log_listing_times(self, new_listings: list[dict]) -> None:
        """
        Log new listing appearance times to CSV for later analysis and pattern mining.
        Appends to data/listing_times.csv, creating the header if needed.
        """
        if not new_listings:
            return
        import csv
        file_exists = TIMING_FILE.exists()
        with open(TIMING_FILE, "a", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            if not file_exists:
                writer.writerow(["timestamp", "weekday", "hour", "minute", "rooms", "size", "price", "address", "listing_id"])
            now = datetime.now()
            for listing in new_listings:
                writer.writerow([
                    now.isoformat(),
                    now.strftime("%A"),  # Weekday name
                    now.hour,
                    now.minute,
                    listing["rooms"],
                    listing["size"],
                    listing["price"],
                    listing["address"],
                    listing["id"]
                ])
        logger.debug(f"Logged {len(new_listings)} listings to CSV")

    # ...existing code...
    async def init_browser(self) -> None:
        """Initialize Playwright browser (minimal, like test script)"""
        if not hasattr(self, 'browser') or self.browser is None:
            self.playwright = await async_playwright().start()
            self.browser = await self.playwright.chromium.launch(headless=True)
            self.context = await self.browser.new_context(
                user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
            )
            logger.info("Browser initialized (minimal context)")
            self.application_handler = ApplicationHandler(self.context, self.state_manager)

    async def apply(self, listing: dict) -> dict:
        company = self._detect_company(listing.get("link", ""))
        handler = self.handlers.get(company)
        result = {
            "listing_id": listing.get("id"),
            "company": company,
            "link": listing.get("link"),
            "timestamp": datetime.now().isoformat(),
            "success": False,
            "message": "",
            "address": listing.get("address", ""),
            "rooms": listing.get("rooms", ""),
            "price": listing.get("price", "")
        }
        if handler:
            result = await handler.apply(listing, result)
        else:
            result["message"] = f"No handler found for company: {company}"
        return result

    def _detect_company(self, link: str) -> str:
        """Robust company detection logic, matching monitor.py as closely as possible."""
        link = (link or "").lower()
        # Remove URL scheme and www for easier matching
        link = re.sub(r"^https?://(www\.)?", "", link)
        # Use domain-based matching, including subdomains
        if re.search(r"howoge\.de", link):
            return "howoge"
        if re.search(r"gewobag\.de", link):
            return "gewobag"
        if re.search(r"degewo\.de", link):
            return "degewo"
        if re.search(r"gesobau\.de", link):
            return "gesobau"
        if re.search(r"stadt-und-land\.de|stadtundland\.de", link):
            return "stadtundland"
        if re.search(r"wbm\.de", link):
            return "wbm"
        # Also check for the company in the path or query (legacy/edge cases)
        if re.search(r"howoge", link):
            return "howoge"
        if re.search(r"gewobag", link):
            return "gewobag"
        if re.search(r"degewo", link):
            return "degewo"
        if re.search(r"gesobau", link):
            return "gesobau"
        if re.search(r"stadt-und-land|stadtundland", link):
            return "stadtundland"
        if re.search(r"wbm", link):
            return "wbm"
        return "unknown"

    def load_state(self) -> dict:
        """Load persistent state"""
        if STATE_FILE.exists():
            with open(STATE_FILE, "r") as f:
                return json.load(f)
        return {"autopilot": False}

    def save_state(self, state: dict) -> None:
        """Save persistent state"""
        with open(STATE_FILE, "w") as f:
            json.dump(state, f, indent=2)

    def set_autopilot(self, enabled: bool) -> None:
        """Enable or disable autopilot mode"""
        self.state_manager.set_autopilot(enabled)

    def is_autopilot_enabled(self) -> bool:
        """Check if autopilot mode is enabled"""
        return self.state_manager.is_autopilot_enabled()

    def load_applications(self) -> dict:
        """Load application history."""
        if self.applications_file.exists():
            try:
                with open(self.applications_file, "r", encoding="utf-8") as f:
                    return json.load(f)
            except json.JSONDecodeError:
                logger.error("Failed to decode applications file. Returning empty history.")
        return {}

    def save_application(self, result: dict) -> None:
        """Save an application result."""
        applications = self.load_applications()
        applications[result["listing_id"]] = result
        with open(self.applications_file, "w", encoding="utf-8") as f:
            json.dump(applications, f, indent=2, ensure_ascii=False)

    def has_applied(self, listing_id: str) -> bool:
        """
        Check if we've successfully applied to this listing.
        Only returns True if the application was successful.
        Failed applications can be retried.
""" applications = self.load_applications() if listing_id not in applications: return False app = applications[listing_id] # Only skip if application was successful # Failed applications (success=False) should be retried return app.get("success", False) def load_previous_listings(self) -> dict: """Load previously saved listings""" if LISTINGS_FILE.exists(): with open(LISTINGS_FILE, "r") as f: return json.load(f) return {} def save_listings(self, listings: list[dict]) -> None: """Save current listings""" listings_dict = {l["id"]: l for l in listings} with open(LISTINGS_FILE, "w") as f: json.dump(listings_dict, f, indent=2, ensure_ascii=False) def find_new_listings(self, current: list[dict], previous: dict) -> list[dict]: """Find listings that are new since last check""" new = [] for listing in current: if listing["id"] not in previous: new.append(listing) return new def _generate_weekly_plot(self) -> str: """Generate a heatmap, bar chart, line chart, and summary of listings by day/hour, like monitor.py.""" plot_path = DATA_DIR / "weekly_plot.png" try: if not TIMING_FILE.exists(): logger.warning("No timing data file found") return "" df = pd.read_csv(TIMING_FILE) if len(df) < 1: logger.warning("Timing file is empty") return "" logger.info(f"Loaded {len(df)} listing records for plot") # Create day-hour matrix days_order = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday'] # Count listings per day and hour heatmap_data = pd.DataFrame(0, index=days_order, columns=range(24)) for _, row in df.iterrows(): day = row['weekday'] hour = int(row['hour']) if day in days_order: # Use pd.to_numeric to ensure value is numeric before incrementing val = pd.to_numeric(heatmap_data.loc[day, hour], errors='coerce') if pd.isna(val): heatmap_data.loc[day, hour] = 1 else: heatmap_data.loc[day, hour] = int(val) + 1 # Create figure with two subplots fig, axes = plt.subplots(2, 2, figsize=(16, 12)) fig.suptitle('Listing Appearance Patterns', fontsize=18, fontweight='bold', y=0.995) # 1. Heatmap - Day vs Hour (using seaborn) ax1 = axes[0, 0] sns.heatmap(heatmap_data, cmap='RdYlGn_r', annot=False, fmt='d', cbar_kws={'label': 'Count'}, ax=ax1, linewidths=0.5, linecolor='gray') ax1.set_xlabel('Hour of Day', fontsize=11, fontweight='bold') ax1.set_ylabel('Day of Week', fontsize=11, fontweight='bold') ax1.set_title('Listings by Day & Hour', fontsize=12, fontweight='bold', pad=10) ax1.set_xticklabels(range(24), fontsize=9) ax1.set_yticklabels(days_order, rotation=0, fontsize=9) # 2. Bar chart - By day of week (seaborn style) ax2 = axes[0, 1] day_counts = df['weekday'].value_counts().reindex(days_order, fill_value=0) sns.barplot(x=range(7), y=day_counts.values, ax=ax2, palette='Blues_d', hue=range(7), legend=False) ax2.set_xticks(range(7)) ax2.set_xticklabels(['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'], fontsize=9) ax2.set_xlabel('Day of Week', fontsize=11, fontweight='bold') ax2.set_ylabel('Number of Listings', fontsize=11, fontweight='bold') ax2.set_title('Total Listings by Day', fontsize=12, fontweight='bold', pad=10) for i, v in enumerate(day_counts.values): if v > 0: ax2.text(i, v + 0.5, str(v), ha='center', fontsize=9, fontweight='bold') # 3. 
            # 3. Line chart - By hour (seaborn style)
            ax3 = axes[1, 0]
            hour_counts = df['hour'].value_counts().reindex(range(24), fill_value=0)
            sns.lineplot(x=range(24), y=hour_counts.values, ax=ax3, marker='o',
                         linewidth=2.5, markersize=6, color='#2E86AB')
            ax3.fill_between(range(24), hour_counts.values, alpha=0.2, color='#2E86AB')
            ax3.set_xticks(range(0, 24, 2))
            ax3.set_xlabel('Hour of Day', fontsize=11, fontweight='bold')
            ax3.set_ylabel('Number of Listings', fontsize=11, fontweight='bold')
            ax3.set_title('Total Listings by Hour', fontsize=12, fontweight='bold', pad=10)
            ax3.grid(True, alpha=0.3, linestyle='--')

            # 4. Summary stats
            ax4 = axes[1, 1]
            ax4.axis('off')

            # Calculate best times
            best_day = day_counts.idxmax() if day_counts.max() > 0 else "N/A"
            best_hour = hour_counts.idxmax() if hour_counts.max() > 0 else "N/A"
            total_listings = len(df)

            # Find the peak (day, hour) combination; stack().idxmax() returns a tuple
            peak_combo = heatmap_data.stack().idxmax() if heatmap_data.values.max() > 0 else ("N/A", "N/A")
            if isinstance(peak_combo, tuple) and len(peak_combo) == 2:
                peak_text = f"🎯 Peak time: {peak_combo[0]} at {peak_combo[1]}:00"
            else:
                peak_text = "🎯 Peak time: N/A"

            stats_text = f"""Summary Statistics

Total listings tracked: {total_listings}
🏆 Best day: {best_day}
⏰ Best hour: {best_hour}:00
{peak_text}
📈 Average per day: {total_listings/7:.1f}

📅 Data collection period:
From: {df['timestamp'].min()[:10] if 'timestamp' in df.columns else 'N/A'}
To:   {df['timestamp'].max()[:10] if 'timestamp' in df.columns else 'N/A'}
"""
            ax4.text(0.1, 0.9, stats_text, transform=ax4.transAxes, fontsize=11,
                     verticalalignment='top', fontfamily='monospace',
                     bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))

            plt.tight_layout(rect=(0, 0, 1, 0.99))

            # Save plot with high resolution
            plt.savefig(plot_path, dpi=300, bbox_inches='tight', facecolor='white', edgecolor='none')
            plt.close()
            logger.info(f"Plot saved to {plot_path}")
            return str(plot_path)
        except Exception as e:
            logger.error(f"Error creating plot: {e}")
            return ""

    def _generate_error_rate_plot(self) -> tuple[str | None, str]:
        """Read applications.json and produce a plot image + summary text.
        Returns (plot_path, summary_text) or (None, "") if there is insufficient data.
""" import matplotlib.dates as mdates from pathlib import Path if not self.applications_file.exists(): logger.warning("No applications.json found for errorrate plot") return None, "" try: with open(self.applications_file, 'r', encoding='utf-8') as f: apps = json.load(f) if not apps: return None, "" # Convert to DataFrame rows = [] for _id, rec in apps.items(): ts = rec.get('timestamp') try: dt = pd.to_datetime(ts) except Exception: dt = pd.NaT rows.append({'id': _id, 'company': rec.get('company'), 'success': bool(rec.get('success')), 'ts': dt}) df = pd.DataFrame(rows) df = df.dropna(subset=['ts']) if df.empty: return None, "" df['date'] = df['ts'].dt.floor('D') grouped = df.groupby('date').agg(total=('id','count'), successes=('success', lambda x: x.sum())) grouped['failures'] = grouped['total'] - grouped['successes'] grouped['error_rate'] = grouped['failures'] / grouped['total'] # Ensure index is sorted by date for plotting grouped = grouped.sort_index() # Prepare plot: convert dates to matplotlib numeric x-values so bars and line align fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(14, 14), sharex=True) fig.suptitle('Autopilot Performance Analysis', fontsize=18, fontweight='bold', y=0.995) dates = pd.to_datetime(grouped.index).to_pydatetime() x = mdates.date2num(dates) width = 0.6 # width in days for bars successes = grouped['successes'].values failures = grouped['failures'].values # Use seaborn color palette success_color = sns.color_palette('RdYlGn', n_colors=10)[8] # Green failure_color = sns.color_palette('RdYlGn', n_colors=10)[1] # Red ax1.bar(x, successes, width=width, color=success_color, align='center', label='Success', edgecolor='white', linewidth=0.5) ax1.bar(x, failures, bottom=successes, width=width, color=failure_color, align='center', label='Failure', edgecolor='white', linewidth=0.5) ax1.set_ylabel('Count', fontsize=11, fontweight='bold') ax1.set_title('Successes vs Failures (by day)', fontsize=13, fontweight='bold', pad=10) ax1.set_xticks(x) ax1.set_xlim(min(x) - 1, max(x) + 1) ax1.xaxis.set_major_locator(mdates.AutoDateLocator()) ax1.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d')) ax1.legend(loc='upper left', framealpha=0.9) ax1.grid(True, alpha=0.3, linestyle='--', axis='y') # Plot error rate line on same x (date) axis sns.lineplot(x=x, y=grouped['error_rate'].values, ax=ax2, marker='o', linewidth=2.5, markersize=8, color='#E74C3C') ax2.fill_between(x, grouped['error_rate'].values, alpha=0.2, color='#E74C3C') ax2.set_ylim(-0.02, 1.02) ax2.set_ylabel('Error Rate', fontsize=11, fontweight='bold') ax2.set_xlabel('Date', fontsize=11, fontweight='bold') ax2.set_title('Daily Error Rate (failures / total)', fontsize=13, fontweight='bold', pad=10) ax2.grid(True, alpha=0.3, linestyle='--') ax2.set_xticks(x) ax2.set_xlim(min(x) - 1, max(x) + 1) ax2.xaxis.set_major_locator(mdates.AutoDateLocator()) ax2.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d')) # Error rate by company (line plot with seaborn palette) company_grouped = df.groupby(['date', 'company']).agg(total=('id','count'), successes=('success', lambda x: x.sum())) company_grouped['failures'] = company_grouped['total'] - company_grouped['successes'] company_grouped['error_rate'] = company_grouped['failures'] / company_grouped['total'] company_grouped = company_grouped.reset_index() error_rate_pivot = company_grouped.pivot(index='date', columns='company', values='error_rate') # Use distinct seaborn colors for each company palette = sns.color_palette('husl', n_colors=len(error_rate_pivot.columns)) 
            for idx, company in enumerate(error_rate_pivot.columns):
                y = error_rate_pivot[company].values
                ax3.plot(x, y, marker='o', label=str(company), linewidth=2.5, markersize=7, color=palette[idx])
            ax3.set_ylim(-0.02, 1.02)
            ax3.set_ylabel('Error Rate', fontsize=11, fontweight='bold')
            ax3.set_xlabel('Date', fontsize=11, fontweight='bold')
            ax3.set_title('Daily Error Rate by Company', fontsize=13, fontweight='bold', pad=10)
            ax3.grid(True, alpha=0.3, linestyle='--')
            ax3.set_xticks(x)
            ax3.set_xlim(min(x) - 1, max(x) + 1)
            ax3.xaxis.set_major_locator(mdates.AutoDateLocator())
            ax3.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
            ax3.legend(title='Company', loc='upper right', fontsize=10, framealpha=0.9)

            fig.autofmt_xdate()
            plt.tight_layout(rect=(0, 0, 1, 0.99))

            plot_path = self.applications_file.parent / 'error_rate.png'
            tmp_path = self.applications_file.parent / 'error_rate.tmp.png'
            # Save to a temp file first and atomically replace to ensure overwrite
            fig.savefig(tmp_path, format='png', dpi=300, bbox_inches='tight', facecolor='white', edgecolor='none')
            plt.close(fig)
            try:
                tmp_path.replace(plot_path)
            except Exception:
                # Fallback: try removing the existing file and renaming
                try:
                    if plot_path.exists():
                        plot_path.unlink()
                    tmp_path.rename(plot_path)
                except Exception:
                    logger.exception(f"Failed to write plot to {plot_path}")

            # Summary
            total_attempts = int(grouped['total'].sum())
            total_success = int(grouped['successes'].sum())
            total_fail = int(grouped['failures'].sum())
            overall_error = (total_fail / total_attempts) if total_attempts > 0 else 0.0
            summary = (
                f"Total attempts: {total_attempts}\n"
                f"Successes: {total_success}\n"
                f"Failures: {total_fail}\n"
                f"Overall error rate: {overall_error:.1%}"
            )
            return str(plot_path), summary
        except Exception as e:
            logger.exception(f"Failed to generate error rate plot: {e}")
            return None, ""

    async def login(self, page) -> bool:
        """Login to inberlinwohnen.de (minimal, like test script)"""
        if not self.state_manager.email or not self.state_manager.password:
            logger.warning("No credentials provided. Ensure INBERLIN_EMAIL and INBERLIN_PASSWORD are set in the environment.")
            return False
        try:
            logger.info("Navigating to login page...")
            login_response = await page.goto("https://www.inberlinwohnen.de/login", wait_until="networkidle")
            logger.info(f"Login page status: {login_response.status if login_response else 'No response'}")
            await asyncio.sleep(2)

            # Dismiss cookie/privacy modal before login
            logger.info("Attempting to dismiss cookie/privacy modal before login...")
            await self.dismiss_cookie_modal(page)
            logger.info("Cookie/privacy modal dismissed.")

            # Fill login form (if present)
            logger.info("Filling in login credentials...")
            await page.fill('input[name="email"], input[type="email"]', self.state_manager.email)
            await page.fill('input[name="password"], input[type="password"]', self.state_manager.password)
            logger.info("Login credentials filled.")

            # Click submit button
            logger.info("Submitting login form...")
            await page.click('button[type="submit"], input[type="submit"]', timeout=30000)
            logger.info("Clicked submit, waiting for navigation...")
            try:
                await page.wait_for_load_state("networkidle", timeout=30000)
                logger.info(f"After login, page url: {page.url}")
                logger.info(f"After login, page content length: {len(await page.content())}")
            except Exception as e:
                logger.error(f"Timeout or error after login submit: {e}")
            await asyncio.sleep(2)

            # Check if login was successful
            logger.info("Checking if login was successful...")
            if "mein-bereich" in page.url or await page.query_selector('text="Abmelden"'):
                logger.info("Login successful.")
                return True
            else:
                logger.error(f"Login failed - ended up at {page.url}")
                return False
        except Exception as e:
            logger.error(f"Login error: {e}")
            logger.debug("Exception occurred during login", exc_info=True)
            return False

    async def fetch_listings(self) -> list[dict]:
        """Fetch listings from the Wohnungsfinder with retry logic for transient failures"""
        max_retries = 3
        retry_delay = 2  # Initial delay in seconds
        for attempt in range(max_retries):
            try:
                listings = await self._fetch_listings_attempt()
                if attempt > 0:
                    logger.info(f"Fetch succeeded (attempt {attempt + 1})")
                return listings
            except Exception as e:
                if attempt < max_retries - 1:
                    wait_time = retry_delay * (2 ** attempt)  # Exponential backoff
                    logger.warning(f"Fetch failed (attempt {attempt + 1}/{max_retries}): {str(e)[:50]}... Retrying in {wait_time}s")
                    await asyncio.sleep(wait_time)
                else:
                    logger.error(f"Fetch failed after {max_retries} attempts")
                    return []
        return []

    async def _fetch_listings_attempt(self) -> list[dict]:
        """Single attempt to fetch listings (extracted for retry logic)"""
        listings = []
        try:
            page = await self.context.new_page()

            # Attempt login if not already logged in
            if not self.state_manager.logged_in:
                login_success = await self.login(page)
                if login_success:
                    self.state_manager.logged_in = True
                else:
                    logger.warning("Login failed. Proceeding with public listings.")

            # Select the correct URL after the login check
            if self.state_manager.logged_in:
                url = "https://www.inberlinwohnen.de/mein-bereich/wohnungsfinder"
            else:
                url = "https://www.inberlinwohnen.de/wohnungsfinder/"
            logger.info(f"Fetching listings from {url}")

            # Navigate to the page with a longer wait condition (networkidle) for slow connections
            logger.info("Navigating to listings page with extended timeout...")
            await page.goto(url, wait_until="networkidle", timeout=20000)

            # Check if the page is a download
            if "download" in page.url or page.url.endswith(".pdf"):
                logger.error("Page redirected to a download. Aborting.")
Aborting.") return [] # Handle cookie modal if not logged in if not self.state_manager.logged_in: await self.dismiss_cookie_modal(page) # Wait a short time for the page to render, but do not block on any selector await asyncio.sleep(2) # Collect all listings content by clicking through pagination all_content = "" page_num = 1 max_pages = 10 # Safety limit while page_num <= max_pages: # Get current page content current_content = await page.content() all_content += current_content # Check for "next page" button (Livewire pagination) next_btn = await page.query_selector('[wire\\:click*="nextPage"]') if next_btn and await next_btn.is_visible(): await next_btn.click() await asyncio.sleep(2) # Wait for Livewire to update page_num += 1 else: break logger.info(f"Collected content from {page_num} page(s)") content = all_content # Debug: save HTML to file for inspection debug_path = DATA_DIR / "debug_page.html" with open(debug_path, "w", encoding="utf-8") as f: f.write(content) logger.info(f"Saved debug HTML to {debug_path}") # Debug: Log page title and check for listing count count_match = re.search(r'(\\d+)\\s*Wohnungen? für Sie gefunden', content) if count_match: logger.info(f"Page shows {count_match.group(1)} listings available") # Also check for "Zeige X bis Y von Z Angeboten" show_match = re.search(r'Zeige \\d+ bis \\d+ von (\\d+) Angeboten', content) if show_match: logger.info(f"Page shows {show_match.group(1)} total offers") # Decode HTML entities and JSON escaped slashes for extraction content_decoded = html.unescape(content) content_decoded = content_decoded.replace('\\/', '/') # Build flatId -> deeplink mapping from wire:snapshot JSON data (monitor.py logic) # Format in HTML: "deeplink":"https://...","flatId":12345 deeplink_pattern = r'"deeplink":"(https://[^"]+)","flatId":(\d+)' deeplink_matches = re.findall(deeplink_pattern, content_decoded) # Use string keys for flatId to match button extraction id_to_link = {str(flat_id): link for link, flat_id in deeplink_matches} logger.info(f"Found {len(id_to_link)} deeplink mappings") # --- Extraction logic copied from monitor.py for robustness --- # Extract listings from button elements with aria-label # Format: @click="open !== 12345 ..." 
aria-label="Wohnungsangebot - 2,0 Zimmer, 53,01 m², 494,38 € Kaltmiete | Adresse" button_pattern = r'@click="open !== (\d+)[^\"]*"[^>]*aria-label="Wohnungsangebot - ([^"]+)' button_matches = re.findall(button_pattern, content_decoded) logger.info(f"Found {len(button_matches)} listing buttons (monitor.py pattern)") for flat_id, listing_text in button_matches: # Parse listing text: "2,0 Zimmer, 53,01 m², 494,38 € Kaltmiete | Rhinstraße 4, 10315 Lichtenberg" parts_match = re.match(r'(\d,\d)\s*Zimmer,\s*([\d,.]+)\s*m²,\s*([\d.,]+)\s*€\s*(?:Kaltmiete)?\s*\|\s*(.+)', listing_text) if not parts_match: continue rooms, size, price, address = parts_match.groups() rooms = rooms.strip() address = address.strip() if len(address) < 5: continue # Get the deeplink for this flat (monitor.py logic: flat_id as string) detail_link = id_to_link.get(str(flat_id), url) listing_id = hashlib.md5(f"{rooms}{size}{price}{address}".encode()).hexdigest()[:12] listings.append({ "id": listing_id, "rooms": f"{rooms} Zimmer", "size": f"{size} m²", "price": f"{price} €", "address": address, "link": detail_link, "fetched_at": datetime.now().isoformat() }) # Deduplicate by id seen_ids = set() unique_listings = [] for listing in listings: if listing["id"] not in seen_ids: seen_ids.add(listing["id"]) unique_listings.append(listing) listings = unique_listings if not listings: logger.warning("No listings parsed") await page.close() logger.info(f"Fetched {len(listings)} listings") return listings except Exception as e: logger.error(f"Fetch error: {str(e)[:100]}") return [] async def dismiss_cookie_modal(self, page): """Dismiss the privacy/cookie consent modal if present""" try: # Wait a bit for modal to appear await asyncio.sleep(2) # Try to find and click the accept button in the privacy modal # Look for common accept button patterns in German accept_selectors = [ 'button:has-text("Akzeptieren")', 'button:has-text("Alle akzeptieren")', 'button:has-text("Accept")', 'button:has-text("Zustimmen")', '[x-show="showPrivacyModal"] button', '.privacy-modal button', 'button.accept-cookies', # More specific to inberlinwohnen 'div[x-show="showPrivacyModal"] button:first-of-type', ] for selector in accept_selectors: try: button = await page.query_selector(selector) if button and await button.is_visible(): await button.click() logger.info(f"Clicked cookie accept button: {selector}") await asyncio.sleep(1) return True except: continue # Try clicking any visible button in the modal overlay modal = await page.query_selector('div[x-show="showPrivacyModal"]') if modal: buttons = await modal.query_selector_all('button') for btn in buttons: if await btn.is_visible(): text = await btn.inner_text() logger.info(f"Found modal button: {text}") # Click the first button (usually accept) await btn.click() await asyncio.sleep(1) return True logger.info("No cookie modal found or already dismissed") return False except Exception as e: logger.debug(f"Cookie modal handling: {e}") return False