from datetime import datetime
from handlers.base_handler import BaseHandler
from handlers.howoge_handler import HowogeHandler
from handlers.gewobag_handler import GewobagHandler
from handlers.degewo_handler import DegewoHandler
from handlers.gesobau_handler import GesobauHandler
from handlers.stadtundland_handler import StadtUndLandHandler
from handlers.wbm_handler import WBMHandler
import json
from pathlib import Path
import pandas as pd
from typing import Optional
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import logging
import matplotlib
import matplotlib.font_manager as fm
import seaborn as sns
import html
import re
import hashlib
import asyncio
from playwright.async_api import async_playwright
import os
STATE_FILE = Path("data/state.json")
APPLICATIONS_FILE = Path("data/applications.json")
TIMING_FILE = Path("data/listing_times.csv")
LISTINGS_FILE = Path("data/listings.json")
DATA_DIR = Path("data")
# --- Matplotlib & Seaborn Setup ---
font_cache_dir = Path("data/fonts")
font_cache_dir.mkdir(parents=True, exist_ok=True)
matplotlib.get_configdir = lambda: str(font_cache_dir)
# Register cached fonts with matplotlib: findSystemFonts() only returns paths,
# so each font must be added to the font manager explicitly.
for _font in fm.findSystemFonts(fontpaths=str(font_cache_dir), fontext='ttf'):
    fm.fontManager.addfont(_font)
matplotlib.rcParams['font.family'] = 'Noto Sans'
# Configure seaborn for beautiful plots
sns.set_theme(style="whitegrid", palette="deep")
sns.set_context("notebook", font_scale=1.1)
matplotlib.rcParams['figure.dpi'] = 300
matplotlib.rcParams['savefig.dpi'] = 300
matplotlib.rcParams['figure.facecolor'] = 'white'
# Use the root logger for consistency with main.py
logger = logging.getLogger()
class ApplicationHandler:
"""
Main handler for apartment monitoring, application automation, and notification logic.
Handles browser automation, listing extraction, application delegation, and Telegram notifications.
"""
def __init__(self, browser_context, state_manager, applications_file: Optional[Path] = None):
if browser_context is None:
raise ValueError("browser_context must not be None. ApplicationHandler requires a valid Playwright context.")
self.context = browser_context
self.state_manager = state_manager
        self.applications_file = applications_file or APPLICATIONS_FILE
        self.telegram_bot = None  # attached later via set_telegram_bot()
self.handlers = {
"howoge": HowogeHandler(browser_context),
"gewobag": GewobagHandler(browser_context),
"degewo": DegewoHandler(browser_context),
"gesobau": GesobauHandler(browser_context),
"stadtundland": StadtUndLandHandler(browser_context),
"wbm": WBMHandler(browser_context),
}
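    # Typical wiring (illustrative sketch; `state_manager` and `telegram_bot`
    # are assumed to expose the attributes used by the methods below):
    #
    #   handler = ApplicationHandler(context, state_manager)
    #   handler.set_telegram_bot(telegram_bot)
    #   results = await handler.apply_to_listings(new_listings)
    #   handler.notify_new_listings(new_listings, results)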
def set_telegram_bot(self, telegram_bot) -> None:
"""Attach a TelegramBot instance for notifications."""
self.telegram_bot = telegram_bot
def notify_new_listings(self, new_listings: list[dict], application_results: Optional[dict] = None) -> None:
"""
Send a Telegram notification for each new listing.
Includes application result if autopilot was enabled.
"""
for listing in new_listings:
link = listing.get('link', 'https://www.inberlinwohnen.de/wohnungsfinder/')
company = self._detect_company(link)
if company == "wgcompany":
continue # skip WGCompany listings for main handler
company_label = company.capitalize() if company != "unknown" else "Wohnung"
message = (
f"[{company_label}] Neue Wohnung!\n\n"
f"🚪 {listing['rooms']}\n"
f"📏 {listing['size']}\n"
f"💰 {listing['price']}\n"
f"📍 {listing['address']}\n\n"
f"👉 Alle Details"
)
# Always show autopilot/apply status for clarity
if application_results is not None:
if listing["id"] in application_results:
result = application_results[listing["id"]]
if result["success"]:
message += f"\n\n\ud83e\udd16 Auto-applied! ({result['company']})"
if result["message"]:
message += f"\n{result['message']}"
else:
# Handler attempted but failed
fail_msg = result.get("message") or "Unknown error during application."
message += f"\n\n\u26a0\ufe0f Auto-apply failed ({result['company']})"
message += f"\nReason: {html.escape(fail_msg)}"
else:
# Should not happen if logic is correct, but fallback
message += "\n\n\u2139\ufe0f No application attempted (internal logic error)"
else:
# Autopilot was off or not attempted at all
message += "\n\n\u2139\ufe0f No application attempted (autopilot off)"
# Send via TelegramBot if available
            if self.telegram_bot:
loop = getattr(self.telegram_bot, 'event_loop', None) or asyncio.get_event_loop()
asyncio.run_coroutine_threadsafe(self.telegram_bot._send_message(message), loop)
else:
logger.debug(f"[No Telegram] {listing['address']} ({listing['rooms']})")
async def apply_to_listings(self, listings: list[dict]) -> dict:
"""
Apply to multiple listings (autopilot mode).
Returns a dict of application results keyed by listing ID.
"""
results = {}
# Fail fast if context is ever None (should never happen)
if self.context is None:
raise RuntimeError("browser_context is None in apply_to_listings. This should never happen.")
for listing in listings:
if self.has_applied(listing["id"]):
logger.debug(f"Skip (applied): {listing['address']}")
continue
result = await self.apply(listing)
results[listing["id"]] = result
self.save_application(result)
status = "[SUCCESS]" if result["success"] else "[FAILED]"
logger.info(f"{status} {listing['address'][:30]}... | {result['message'][:50]}")
await asyncio.sleep(2)
return results
def log_listing_times(self, new_listings: list[dict]) -> None:
"""
Log new listing appearance times to CSV for later analysis and pattern mining.
Appends to data/listing_times.csv, creating header if needed.
"""
if not new_listings:
return
        import csv
        file_exists = TIMING_FILE.exists()
with open(TIMING_FILE, "a", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
if not file_exists:
writer.writerow(["timestamp", "weekday", "hour", "minute", "rooms", "size", "price", "address", "listing_id"])
now = datetime.now()
for listing in new_listings:
writer.writerow([
now.isoformat(),
now.strftime("%A"), # Weekday name
now.hour,
now.minute,
listing["rooms"],
listing["size"],
listing["price"],
listing["address"],
listing["id"]
])
logger.debug(f"Logged {len(new_listings)} listings to CSV")
async def init_browser(self) -> None:
"""Initialize Playwright browser (minimal, like test script)"""
if not hasattr(self, 'browser') or self.browser is None:
self.playwright = await async_playwright().start()
self.browser = await self.playwright.chromium.launch(headless=True)
self.context = await self.browser.new_context(
user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
)
logger.info("Browser initialized (minimal context)")
self.application_handler = ApplicationHandler(self.context, self.state_manager)
async def apply(self, listing: dict) -> dict:
company = self._detect_company(listing.get("link", ""))
handler = self.handlers.get(company)
result = {
"listing_id": listing.get("id"),
"company": company,
"link": listing.get("link"),
"timestamp": datetime.now().isoformat(),
"success": False,
"message": "",
"address": listing.get("address", ""),
"rooms": listing.get("rooms", ""),
"price": listing.get("price", "")
}
if handler:
result = await handler.apply(listing, result)
else:
result["message"] = f"No handler found for company: {company}"
return result
def _detect_company(self, link: str) -> str:
"""Robust company detection logic, matching monitor.py as closely as possible."""
link = (link or "").lower()
# Remove URL scheme and www for easier matching
link = re.sub(r"^https?://(www\.)?", "", link)
# Use domain-based matching, including subdomains
if re.search(r"howoge\\.de", link):
return "howoge"
if re.search(r"gewobag\\.de", link):
return "gewobag"
if re.search(r"degewo\\.de", link):
return "degewo"
if re.search(r"gesobau\\.de", link):
return "gesobau"
if re.search(r"stadt-und-land\\.de|stadtundland\\.de", link):
return "stadtundland"
if re.search(r"wbm\\.de", link):
return "wbm"
# Also check for company in the path or query (legacy/edge cases)
if re.search(r"howoge", link):
return "howoge"
if re.search(r"gewobag", link):
return "gewobag"
if re.search(r"degewo", link):
return "degewo"
if re.search(r"gesobau", link):
return "gesobau"
if re.search(r"stadt-und-land|stadtundland", link):
return "stadtundland"
if re.search(r"wbm", link):
return "wbm"
return "unknown"
def load_state(self) -> dict:
"""Load persistent state"""
if STATE_FILE.exists():
with open(STATE_FILE, "r") as f:
return json.load(f)
return {"autopilot": False}
def save_state(self, state: dict) -> None:
"""Save persistent state"""
with open(STATE_FILE, "w") as f:
json.dump(state, f, indent=2)
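    # state.json is a small flat document, e.g. {"autopilot": true} (illustrative).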
def set_autopilot(self, enabled: bool) -> None:
"""Enable or disable autopilot mode"""
self.state_manager.set_autopilot(enabled)
def is_autopilot_enabled(self) -> bool:
"""Check if autopilot mode is enabled"""
return self.state_manager.is_autopilot_enabled()
def load_applications(self) -> dict:
"""Load application history."""
if self.applications_file.exists():
try:
with open(self.applications_file, "r", encoding="utf-8") as f:
return json.load(f)
except json.JSONDecodeError:
logger.error("Failed to decode applications file. Returning empty history.")
return {}
def save_application(self, result: dict) -> None:
"""Save an application result."""
applications = self.load_applications()
applications[result["listing_id"]] = result
with open(self.applications_file, "w", encoding="utf-8") as f:
json.dump(applications, f, indent=2, ensure_ascii=False)
def has_applied(self, listing_id: str) -> bool:
"""
Check if we've already applied to this listing.
Excludes baseline entries from first run (not auto-applied).
"""
applications = self.load_applications()
if listing_id not in applications:
return False
app = applications[listing_id]
# If message contains "First run, not auto-applied", treat as not applied
if "First run, not auto-applied" in app.get("message", ""):
return False
return True
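    # A baseline entry from the first run is treated as "not applied", so
    # autopilot may still apply to that listing on a later pass.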
def load_previous_listings(self) -> dict:
"""Load previously saved listings"""
if LISTINGS_FILE.exists():
with open(LISTINGS_FILE, "r") as f:
return json.load(f)
return {}
def save_listings(self, listings: list[dict]) -> None:
"""Save current listings"""
listings_dict = {l["id"]: l for l in listings}
with open(LISTINGS_FILE, "w") as f:
json.dump(listings_dict, f, indent=2, ensure_ascii=False)
    def find_new_listings(self, current: list[dict], previous: dict) -> list[dict]:
        """Find listings that are new since the last check."""
        return [listing for listing in current if listing["id"] not in previous]
def _generate_weekly_plot(self) -> str:
"""Generate a heatmap, bar chart, line chart, and summary of listings by day/hour, like monitor.py."""
plot_path = DATA_DIR / "weekly_plot.png"
try:
if not TIMING_FILE.exists():
logger.warning("No timing data file found")
return ""
df = pd.read_csv(TIMING_FILE)
            if df.empty:
logger.warning("Timing file is empty")
return ""
logger.info(f"Loaded {len(df)} listing records for plot")
# Create day-hour matrix
days_order = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
# Count listings per day and hour
            heatmap_data = pd.DataFrame(0, index=days_order, columns=range(24))
            for _, row in df.iterrows():
                day = row['weekday']
                hour = int(row['hour'])
                if day in days_order and 0 <= hour < 24:
                    heatmap_data.loc[day, hour] += 1
            # Create figure with four panels (2x2 grid)
fig, axes = plt.subplots(2, 2, figsize=(16, 12))
fig.suptitle('Listing Appearance Patterns', fontsize=18, fontweight='bold', y=0.995)
# 1. Heatmap - Day vs Hour (using seaborn)
ax1 = axes[0, 0]
sns.heatmap(heatmap_data, cmap='RdYlGn_r', annot=False, fmt='d',
cbar_kws={'label': 'Count'}, ax=ax1, linewidths=0.5, linecolor='gray')
ax1.set_xlabel('Hour of Day', fontsize=11, fontweight='bold')
ax1.set_ylabel('Day of Week', fontsize=11, fontweight='bold')
ax1.set_title('Listings by Day & Hour', fontsize=12, fontweight='bold', pad=10)
ax1.set_xticklabels(range(24), fontsize=9)
ax1.set_yticklabels(days_order, rotation=0, fontsize=9)
# 2. Bar chart - By day of week (seaborn style)
ax2 = axes[0, 1]
day_counts = df['weekday'].value_counts().reindex(days_order, fill_value=0)
sns.barplot(x=range(7), y=day_counts.values, ax=ax2, palette='Blues_d', hue=range(7), legend=False)
ax2.set_xticks(range(7))
ax2.set_xticklabels(['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'], fontsize=9)
ax2.set_xlabel('Day of Week', fontsize=11, fontweight='bold')
ax2.set_ylabel('Number of Listings', fontsize=11, fontweight='bold')
ax2.set_title('Total Listings by Day', fontsize=12, fontweight='bold', pad=10)
for i, v in enumerate(day_counts.values):
if v > 0:
ax2.text(i, v + 0.5, str(v), ha='center', fontsize=9, fontweight='bold')
# 3. Line chart - By hour (seaborn style)
ax3 = axes[1, 0]
hour_counts = df['hour'].value_counts().reindex(range(24), fill_value=0)
sns.lineplot(x=range(24), y=hour_counts.values, ax=ax3, marker='o',
linewidth=2.5, markersize=6, color='#2E86AB')
ax3.fill_between(range(24), hour_counts.values, alpha=0.2, color='#2E86AB')
ax3.set_xticks(range(0, 24, 2))
ax3.set_xlabel('Hour of Day', fontsize=11, fontweight='bold')
ax3.set_ylabel('Number of Listings', fontsize=11, fontweight='bold')
ax3.set_title('Total Listings by Hour', fontsize=12, fontweight='bold', pad=10)
ax3.grid(True, alpha=0.3, linestyle='--')
# 4. Summary stats
ax4 = axes[1, 1]
ax4.axis('off')
# Calculate best times
best_day = day_counts.idxmax() if day_counts.max() > 0 else "N/A"
best_hour = hour_counts.idxmax() if hour_counts.max() > 0 else "N/A"
total_listings = len(df)
            # Find the peak day/hour combination
            peak_combo = heatmap_data.stack().idxmax() if heatmap_data.values.max() > 0 else None
            if isinstance(peak_combo, tuple) and len(peak_combo) == 2:
                peak_text = f"🎯 Peak time: {peak_combo[0]} at {peak_combo[1]}:00"
            else:
                peak_text = "🎯 Peak time: N/A"
            stats_text = f"""Summary Statistics
Total listings tracked: {total_listings}
🏆 Best day: {best_day}
⏰ Best hour: {best_hour}:00
{peak_text}
📈 Average per day: {total_listings/7:.1f}
📅 Data collection period:
From: {df['timestamp'].min()[:10] if 'timestamp' in df.columns else 'N/A'}
To: {df['timestamp'].max()[:10] if 'timestamp' in df.columns else 'N/A'}
"""
ax4.text(0.1, 0.9, stats_text, transform=ax4.transAxes, fontsize=11,
verticalalignment='top', fontfamily='monospace',
bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))
plt.tight_layout(rect=(0, 0, 1, 0.99))
# Save plot with high resolution
plt.savefig(plot_path, dpi=300, bbox_inches='tight', facecolor='white', edgecolor='none')
plt.close()
logger.info(f"Plot saved to {plot_path}")
return str(plot_path)
except Exception as e:
logger.error(f"Error creating plot: {e}")
return ""
def _generate_error_rate_plot(self) -> tuple[str | None, str]:
"""Read applications.json and produce a plot image + summary text.
Returns (plot_path, summary_text) or (None, "") if insufficient data.
"""
if not self.applications_file.exists():
logger.warning("No applications.json found for errorrate plot")
return None, ""
try:
with open(self.applications_file, 'r', encoding='utf-8') as f:
apps = json.load(f)
if not apps:
return None, ""
# Convert to DataFrame
rows = []
for _id, rec in apps.items():
ts = rec.get('timestamp')
try:
dt = pd.to_datetime(ts)
except Exception:
dt = pd.NaT
rows.append({'id': _id, 'company': rec.get('company'), 'success': bool(rec.get('success')), 'ts': dt})
df = pd.DataFrame(rows)
df = df.dropna(subset=['ts'])
if df.empty:
return None, ""
df['date'] = df['ts'].dt.floor('D')
grouped = df.groupby('date').agg(total=('id','count'), successes=('success', lambda x: x.sum()))
grouped['failures'] = grouped['total'] - grouped['successes']
grouped['error_rate'] = grouped['failures'] / grouped['total']
# Ensure index is sorted by date for plotting
grouped = grouped.sort_index()
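            # `grouped` now has one row per day (illustrative):
            #             total  successes  failures  error_rate
            # date
            # 2025-01-14      5          3         2        0.40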
# Prepare plot: convert dates to matplotlib numeric x-values so bars and line align
fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(14, 14), sharex=True)
fig.suptitle('Autopilot Performance Analysis', fontsize=18, fontweight='bold', y=0.995)
dates = pd.to_datetime(grouped.index).to_pydatetime()
x = mdates.date2num(dates)
width = 0.6 # width in days for bars
successes = grouped['successes'].values
failures = grouped['failures'].values
# Use seaborn color palette
success_color = sns.color_palette('RdYlGn', n_colors=10)[8] # Green
failure_color = sns.color_palette('RdYlGn', n_colors=10)[1] # Red
ax1.bar(x, successes, width=width, color=success_color, align='center', label='Success', edgecolor='white', linewidth=0.5)
ax1.bar(x, failures, bottom=successes, width=width, color=failure_color, align='center', label='Failure', edgecolor='white', linewidth=0.5)
ax1.set_ylabel('Count', fontsize=11, fontweight='bold')
ax1.set_title('Successes vs Failures (by day)', fontsize=13, fontweight='bold', pad=10)
ax1.set_xticks(x)
ax1.set_xlim(min(x) - 1, max(x) + 1)
ax1.xaxis.set_major_locator(mdates.AutoDateLocator())
ax1.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
ax1.legend(loc='upper left', framealpha=0.9)
ax1.grid(True, alpha=0.3, linestyle='--', axis='y')
# Plot error rate line on same x (date) axis
sns.lineplot(x=x, y=grouped['error_rate'].values, ax=ax2, marker='o',
linewidth=2.5, markersize=8, color='#E74C3C')
ax2.fill_between(x, grouped['error_rate'].values, alpha=0.2, color='#E74C3C')
ax2.set_ylim(-0.02, 1.02)
ax2.set_ylabel('Error Rate', fontsize=11, fontweight='bold')
ax2.set_xlabel('Date', fontsize=11, fontweight='bold')
ax2.set_title('Daily Error Rate (failures / total)', fontsize=13, fontweight='bold', pad=10)
ax2.grid(True, alpha=0.3, linestyle='--')
ax2.set_xticks(x)
ax2.set_xlim(min(x) - 1, max(x) + 1)
ax2.xaxis.set_major_locator(mdates.AutoDateLocator())
ax2.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
# Error rate by company (line plot with seaborn palette)
company_grouped = df.groupby(['date', 'company']).agg(total=('id','count'), successes=('success', lambda x: x.sum()))
company_grouped['failures'] = company_grouped['total'] - company_grouped['successes']
company_grouped['error_rate'] = company_grouped['failures'] / company_grouped['total']
company_grouped = company_grouped.reset_index()
error_rate_pivot = company_grouped.pivot(index='date', columns='company', values='error_rate')
# Use distinct seaborn colors for each company
palette = sns.color_palette('husl', n_colors=len(error_rate_pivot.columns))
for idx, company in enumerate(error_rate_pivot.columns):
y = error_rate_pivot[company].values
ax3.plot(x, y, marker='o', label=str(company), linewidth=2.5,
markersize=7, color=palette[idx])
ax3.set_ylim(-0.02, 1.02)
ax3.set_ylabel('Error Rate', fontsize=11, fontweight='bold')
ax3.set_xlabel('Date', fontsize=11, fontweight='bold')
ax3.set_title('Daily Error Rate by Company', fontsize=13, fontweight='bold', pad=10)
ax3.grid(True, alpha=0.3, linestyle='--')
ax3.set_xticks(x)
ax3.set_xlim(min(x) - 1, max(x) + 1)
ax3.xaxis.set_major_locator(mdates.AutoDateLocator())
ax3.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
ax3.legend(title='Company', loc='upper right', fontsize=10, framealpha=0.9)
fig.autofmt_xdate()
plt.tight_layout(rect=(0, 0, 1, 0.99))
plot_path = self.applications_file.parent / 'error_rate.png'
tmp_path = self.applications_file.parent / 'error_rate.tmp.png'
# Save to a temp file first and atomically replace to ensure overwrite
fig.savefig(tmp_path, format='png', dpi=300, bbox_inches='tight', facecolor='white', edgecolor='none')
plt.close(fig)
try:
tmp_path.replace(plot_path)
except Exception:
# Fallback: try removing existing and renaming
try:
if plot_path.exists():
plot_path.unlink()
tmp_path.rename(plot_path)
except Exception:
logger.exception(f"Failed to write plot to {plot_path}")
# Summary
total_attempts = int(grouped['total'].sum())
total_success = int(grouped['successes'].sum())
total_fail = int(grouped['failures'].sum())
overall_error = (total_fail / total_attempts) if total_attempts>0 else 0.0
summary = f"Total attempts: {total_attempts}\nSuccesses: {total_success}\nFailures: {total_fail}\nOverall error rate: {overall_error:.1%}"
return str(plot_path), summary
except Exception as e:
logger.exception(f"Failed to generate error rate plot: {e}")
return None, ""
async def login(self, page) -> bool:
"""Login to inberlinwohnen.de (minimal, like test script)"""
if not self.state_manager.email or not self.state_manager.password:
logger.warning("No credentials provided. Ensure INBERLIN_EMAIL and INBERLIN_PASSWORD are set in the environment.")
return False
try:
logger.info("Navigating to login page...")
login_response = await page.goto("https://www.inberlinwohnen.de/login", wait_until="networkidle")
logger.info(f"Login page status: {login_response.status if login_response else 'No response'}")
await asyncio.sleep(2)
# Dismiss cookie/privacy modal before login
logger.info("Attempting to dismiss cookie/privacy modal before login...")
await self.dismiss_cookie_modal(page)
logger.info("Cookie/privacy modal dismissed.")
# Fill login form (if present)
logger.info("Filling in login credentials...")
await page.fill('input[name="email"], input[type="email"]', self.state_manager.email)
await page.fill('input[name="password"], input[type="password"]', self.state_manager.password)
logger.info("Login credentials filled.")
# Click submit button
logger.info("Submitting login form...")
            await page.click('button[type="submit"], input[type="submit"]', timeout=30000)
            logger.info("Clicked submit, waiting for navigation...")
try:
await page.wait_for_load_state("networkidle", timeout=30000)
logger.info(f"After login, page url: {page.url}")
logger.info(f"After login, page content length: {len(await page.content())}")
except Exception as e:
logger.error(f"Timeout or error after login submit: {e}")
await asyncio.sleep(2)
# Check if login successful
logger.info("Checking if login was successful...")
if "mein-bereich" in page.url or await page.query_selector('text="Abmelden"'):
logger.info("Login successful.")
return True
else:
logger.error(f"Login failed - ended up at {page.url}")
return False
except Exception as e:
logger.error(f"Login error: {e}")
logger.debug("Exception occurred during login", exc_info=True)
return False
async def fetch_listings(self) -> list[dict]:
"""Fetch listings from the Wohnungsfinder with retry logic for transient failures"""
max_retries = 3
retry_delay = 2 # Initial delay in seconds
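        # With max_retries=3 this waits 2s after the first failure and 4s after
        # the second; a third failure gives up and returns [].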
for attempt in range(max_retries):
try:
listings = await self._fetch_listings_attempt()
if attempt > 0:
logger.info(f"Fetch succeeded (attempt {attempt + 1})")
return listings
except Exception as e:
if attempt < max_retries - 1:
wait_time = retry_delay * (2 ** attempt) # Exponential backoff
logger.warning(f"Fetch failed (attempt {attempt + 1}/{max_retries}): {str(e)[:50]}... Retrying in {wait_time}s")
await asyncio.sleep(wait_time)
else:
logger.error(f"Fetch failed after {max_retries} attempts")
return []
return []
async def _fetch_listings_attempt(self) -> list[dict]:
"""Single attempt to fetch listings (extracted for retry logic)"""
listings = []
try:
page = await self.context.new_page()
# Attempt login if not already logged in
if not self.state_manager.logged_in:
login_success = await self.login(page)
if login_success:
self.state_manager.logged_in = True
else:
logger.warning("Login failed. Proceeding with public listings.")
# Select the correct URL after login check
if self.state_manager.logged_in:
url = "https://www.inberlinwohnen.de/mein-bereich/wohnungsfinder"
else:
url = "https://www.inberlinwohnen.de/wohnungsfinder/"
logger.info(f"Fetching listings from {url}")
# Navigate to the page with a longer wait condition for slow internet
logger.info("Navigating to listings page with extended timeout...")
await page.goto(url, wait_until="networkidle", timeout=20000)
# Check if the page is a download
if "download" in page.url or page.url.endswith(".pdf"):
logger.error("Page redirected to a download. Aborting.")
return []
# Handle cookie modal if not logged in
if not self.state_manager.logged_in:
await self.dismiss_cookie_modal(page)
# Wait a short time for the page to render, but do not block on any selector
await asyncio.sleep(2)
# Collect all listings content by clicking through pagination
all_content = ""
page_num = 1
max_pages = 10 # Safety limit
while page_num <= max_pages:
# Get current page content
current_content = await page.content()
all_content += current_content
# Check for "next page" button (Livewire pagination)
next_btn = await page.query_selector('[wire\\:click*="nextPage"]')
if next_btn and await next_btn.is_visible():
await next_btn.click()
await asyncio.sleep(2) # Wait for Livewire to update
page_num += 1
else:
break
logger.info(f"Collected content from {page_num} page(s)")
content = all_content
# Debug: save HTML to file for inspection
debug_path = DATA_DIR / "debug_page.html"
with open(debug_path, "w", encoding="utf-8") as f:
f.write(content)
logger.info(f"Saved debug HTML to {debug_path}")
# Debug: Log page title and check for listing count
        count_match = re.search(r'(\d+)\s*Wohnungen? für Sie gefunden', content)
if count_match:
logger.info(f"Page shows {count_match.group(1)} listings available")
# Also check for "Zeige X bis Y von Z Angeboten"
        show_match = re.search(r'Zeige \d+ bis \d+ von (\d+) Angeboten', content)
if show_match:
logger.info(f"Page shows {show_match.group(1)} total offers")
# Decode HTML entities and JSON escaped slashes for extraction
content_decoded = html.unescape(content)
content_decoded = content_decoded.replace('\\/', '/')
# Build flatId -> deeplink mapping from wire:snapshot JSON data (monitor.py logic)
# Format in HTML: "deeplink":"https://...","flatId":12345
deeplink_pattern = r'"deeplink":"(https://[^"]+)","flatId":(\d+)'
deeplink_matches = re.findall(deeplink_pattern, content_decoded)
# Use string keys for flatId to match button extraction
id_to_link = {str(flat_id): link for link, flat_id in deeplink_matches}
logger.info(f"Found {len(id_to_link)} deeplink mappings")
# --- Extraction logic copied from monitor.py for robustness ---
# Extract listings from button elements with aria-label
# Format: @click="open !== 12345 ..." aria-label="Wohnungsangebot - 2,0 Zimmer, 53,01 m², 494,38 € Kaltmiete | Adresse"
button_pattern = r'@click="open !== (\d+)[^\"]*"[^>]*aria-label="Wohnungsangebot - ([^"]+)'
button_matches = re.findall(button_pattern, content_decoded)
logger.info(f"Found {len(button_matches)} listing buttons (monitor.py pattern)")
for flat_id, listing_text in button_matches:
# Parse listing text: "2,0 Zimmer, 53,01 m², 494,38 € Kaltmiete | Rhinstraße 4, 10315 Lichtenberg"
parts_match = re.match(r'(\d,\d)\s*Zimmer,\s*([\d,.]+)\s*m²,\s*([\d.,]+)\s*€\s*(?:Kaltmiete)?\s*\|\s*(.+)', listing_text)
if not parts_match:
continue
rooms, size, price, address = parts_match.groups()
rooms = rooms.strip()
address = address.strip()
if len(address) < 5:
continue
# Get the deeplink for this flat (monitor.py logic: flat_id as string)
detail_link = id_to_link.get(str(flat_id), url)
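            # Content-derived ID: the same flat keeps the same 12-char ID across
            # fetches even if the site's internal flatId changes.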
listing_id = hashlib.md5(f"{rooms}{size}{price}{address}".encode()).hexdigest()[:12]
listings.append({
"id": listing_id,
"rooms": f"{rooms} Zimmer",
"size": f"{size} m²",
"price": f"{price} €",
"address": address,
"link": detail_link,
"fetched_at": datetime.now().isoformat()
})
# Deduplicate by id
seen_ids = set()
unique_listings = []
for listing in listings:
if listing["id"] not in seen_ids:
seen_ids.add(listing["id"])
unique_listings.append(listing)
listings = unique_listings
if not listings:
logger.warning("No listings parsed")
await page.close()
logger.info(f"Fetched {len(listings)} listings")
return listings
except Exception as e:
logger.error(f"Fetch error: {str(e)[:100]}")
return []
async def dismiss_cookie_modal(self, page):
"""Dismiss the privacy/cookie consent modal if present"""
try:
# Wait a bit for modal to appear
await asyncio.sleep(2)
# Try to find and click the accept button in the privacy modal
# Look for common accept button patterns in German
accept_selectors = [
'button:has-text("Akzeptieren")',
'button:has-text("Alle akzeptieren")',
'button:has-text("Accept")',
'button:has-text("Zustimmen")',
'[x-show="showPrivacyModal"] button',
'.privacy-modal button',
'button.accept-cookies',
# More specific to inberlinwohnen
'div[x-show="showPrivacyModal"] button:first-of-type',
]
for selector in accept_selectors:
try:
button = await page.query_selector(selector)
if button and await button.is_visible():
await button.click()
logger.info(f"Clicked cookie accept button: {selector}")
await asyncio.sleep(1)
return True
                except Exception:
                    continue
# Try clicking any visible button in the modal overlay
modal = await page.query_selector('div[x-show="showPrivacyModal"]')
if modal:
buttons = await modal.query_selector_all('button')
for btn in buttons:
if await btn.is_visible():
text = await btn.inner_text()
logger.info(f"Found modal button: {text}")
# Click the first button (usually accept)
await btn.click()
await asyncio.sleep(1)
return True
logger.info("No cookie modal found or already dismissed")
return False
except Exception as e:
logger.debug(f"Cookie modal handling: {e}")
return False
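# --- Usage sketch (illustrative; not wired into main.py) ---
# A minimal driver showing how the pieces above fit together. `StateManager`
# is assumed to expose the attributes referenced in this module (email,
# password, logged_in, and the autopilot helpers).
#
#   async def run_once(state_manager):
#       pw = await async_playwright().start()
#       browser = await pw.chromium.launch(headless=True)
#       context = await browser.new_context()
#       handler = ApplicationHandler(context, state_manager)
#       current = await handler.fetch_listings()
#       new = handler.find_new_listings(current, handler.load_previous_listings())
#       handler.log_listing_times(new)
#       results = await handler.apply_to_listings(new) if handler.is_autopilot_enabled() else None
#       handler.notify_new_listings(new, results)
#       handler.save_listings(current)
#       await browser.close()
#       await pw.stop()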