wohnbot/monitor.py

2273 lines
104 KiB
Python
Raw Permalink Normal View History

import os
import json
import hashlib
import logging
import asyncio
import re
import html
import threading
import time
import csv
2025-12-15 15:39:08 +01:00
from datetime import datetime, timedelta
from pathlib import Path
import requests
import pandas as pd
import matplotlib
matplotlib.use('Agg') # Use non-interactive backend
import matplotlib.pyplot as plt
from playwright.async_api import async_playwright
# ---------------------------------------------------------------------------
# Runtime configuration. Every value is read once, at import time, from the
# process environment; unset variables fall back to the defaults shown here.
# ---------------------------------------------------------------------------

# Telegram credentials, portal login and polling cadence
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "")
TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID", "")
INBERLIN_EMAIL = os.getenv("INBERLIN_EMAIL", "")
INBERLIN_PASSWORD = os.getenv("INBERLIN_PASSWORD", "")
CHECK_INTERVAL = int(os.getenv("CHECK_INTERVAL", "300"))  # seconds; default is 5 minutes

# WGcompany search filters (an empty string leaves the filter unset)
WGCOMPANY_ENABLED = os.getenv("WGCOMPANY_ENABLED", "true").lower() == "true"
WGCOMPANY_MIN_SIZE = os.getenv("WGCOMPANY_MIN_SIZE", "")    # minimum room size in m²
WGCOMPANY_MAX_SIZE = os.getenv("WGCOMPANY_MAX_SIZE", "")    # maximum room size in m²
WGCOMPANY_MIN_PRICE = os.getenv("WGCOMPANY_MIN_PRICE", "")  # minimum rent in €
WGCOMPANY_MAX_PRICE = os.getenv("WGCOMPANY_MAX_PRICE", "")  # maximum rent in €
WGCOMPANY_BEZIRK = os.getenv("WGCOMPANY_BEZIRK", "0")       # 0 = any district, else a district code
WGCOMPANY_AGE = os.getenv("WGCOMPANY_AGE", "")              # applicant's age (for WG matching)
WGCOMPANY_SMOKER = os.getenv("WGCOMPANY_SMOKER", "")        # NR = non-smoker, R = smoker, empty = any

# Applicant details typed into the housing companies' contact forms
FORM_ANREDE = os.getenv("FORM_ANREDE", "")
FORM_VORNAME = os.getenv("FORM_VORNAME", "")
FORM_NACHNAME = os.getenv("FORM_NACHNAME", "")
FORM_EMAIL = os.getenv("FORM_EMAIL", "")
FORM_PHONE = os.getenv("FORM_PHONE", "")
FORM_STRASSE = os.getenv("FORM_STRASSE", "")
FORM_HAUSNUMMER = os.getenv("FORM_HAUSNUMMER", "")
FORM_PLZ = os.getenv("FORM_PLZ", "")
FORM_ORT = os.getenv("FORM_ORT", "")
FORM_PERSONS = os.getenv("FORM_PERSONS", "1")
FORM_CHILDREN = os.getenv("FORM_CHILDREN", "0")
FORM_INCOME = os.getenv("FORM_INCOME", "")

# Persistent files; all of them live directly inside DATA_DIR
DATA_DIR = Path("/data")
LISTINGS_FILE = DATA_DIR.joinpath("listings.json")
LOG_FILE = DATA_DIR.joinpath("monitor.log")
TIMING_FILE = DATA_DIR.joinpath("listing_times.csv")
STATE_FILE = DATA_DIR.joinpath("state.json")
APPLICATIONS_FILE = DATA_DIR.joinpath("applications.json")

# WGcompany specific files
WGCOMPANY_LISTINGS_FILE = DATA_DIR.joinpath("wgcompany_listings.json")
2025-12-15 15:39:08 +01:00
# Matches the prefix written by the logging format "%(asctime)s - ...",
# i.e. 'YYYY-MM-DD HH:MM:SS,ms - '.
_LOG_TIMESTAMP_RE = re.compile(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d+)\s+-\s+")


def _select_recent_log_lines(lines, cutoff):
    """Return the lines of all log records stamped at or after `cutoff`.

    Lines without a timestamp prefix (traceback continuations and the like)
    share the fate of the record they follow, so the tail of a pruned record
    is pruned with it. Lines before the first timestamped record, and records
    whose timestamp cannot be parsed, are kept.
    """
    kept = []
    keep_record = True
    for line in lines:
        match = _LOG_TIMESTAMP_RE.match(line)
        if match:
            try:
                stamp = datetime.strptime(match.group(1), "%Y-%m-%d %H:%M:%S,%f")
            except ValueError:
                keep_record = True
            else:
                keep_record = stamp >= cutoff
        if keep_record:
            kept.append(line)
    return kept


def _cleanup_old_files(png_hours: int = 24, log_days: int = 7):
    """Remove PNG files older than `png_hours` and prune log lines older than `log_days` days.

    Runs best-effort: every failure is logged through `logger` and never
    propagates to the caller.

    Args:
        png_hours: PNG files in DATA_DIR whose mtime is older than this many
            hours are deleted.
        log_days: log records in LOG_FILE older than this many days are dropped.
    """
    try:
        # Local naive time on purpose: datetime.fromtimestamp() and the
        # logging module's asctime both produce local time, so a UTC "now"
        # would shift both cutoffs by the UTC offset.
        now = datetime.now()

        # Remove old PNGs in DATA_DIR
        png_cutoff = now - timedelta(hours=png_hours)
        removed_pngs = 0
        for p in DATA_DIR.glob("*.png"):
            try:
                if datetime.fromtimestamp(p.stat().st_mtime) < png_cutoff:
                    p.unlink()
                    removed_pngs += 1
            except Exception:
                logger.exception(f"Error while checking/removing PNG: {p}")
        if removed_pngs:
            logger.info(f"Removed {removed_pngs} PNG(s) older than {png_hours} hours")

        # Prune logfile lines older than log_days
        if LOG_FILE.exists():
            cutoff_log = now - timedelta(days=log_days)
            try:
                with open(LOG_FILE, "r", encoding="utf-8", errors="ignore") as f:
                    kept_lines = _select_recent_log_lines(f, cutoff_log)
                # Rewrite in place instead of renaming a temp file over the
                # log: a handler that already has the log open would keep
                # writing to the replaced (orphaned) file after a rename.
                with open(LOG_FILE, "w", encoding="utf-8") as f:
                    f.writelines(kept_lines)
                logger.info(f"Pruned logfile, kept {len(kept_lines)} lines from last {log_days} days")
            except Exception:
                logger.exception("Error while pruning logfile")
    except Exception:
        logger.exception("Unexpected error in cleanup task")
WGCOMPANY_TIMING_FILE = DATA_DIR / "wgcompany_times.csv"

# Setup logging: every record goes both to LOG_FILE and to the console.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        # Explicit UTF-8 so the file does not depend on the process locale
        # and matches the encoding the log pruner uses to read it back.
        logging.FileHandler(LOG_FILE, encoding="utf-8"),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)
class TelegramBot:
"""Handle Telegram commands for controlling the monitor"""
def __init__(self, monitor):
    """Create a stopped command listener bound to `monitor`.

    Args:
        monitor: object the command handlers delegate to (autopilot
            switching, state and application lookups).
    """
    # Polling is off until start() is called.
    self.running = False
    # Highest Telegram update id handled so far; the next poll asks for id + 1.
    self.last_update_id = 0
    self.monitor = monitor
def start(self):
    """Start polling Telegram for commands on a background daemon thread.

    Does nothing except log a warning when no bot token is configured.
    """
    if TELEGRAM_BOT_TOKEN:
        self.running = True
        poller = threading.Thread(target=self._poll_updates, daemon=True)
        poller.start()
        logger.info("Telegram command listener started")
    else:
        logger.warning("Telegram bot token not configured, commands disabled")
def stop(self) -> None:
    """Clear the `running` flag that the polling loop checks on each iteration."""
    self.running = False
def _poll_updates(self):
    """Long-poll Telegram's getUpdates endpoint until `self.running` is cleared.

    Each received update advances `self.last_update_id` and is passed to
    `_handle_update`. Request timeouts simply start the next poll; any other
    failure (HTTP error, API error, unexpected exception) is logged and
    followed by a 5 second pause so a persistent error cannot turn the loop
    into a busy loop against the API.
    """
    url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/getUpdates"
    while self.running:
        try:
            params = {"offset": self.last_update_id + 1, "timeout": 30}
            # Client timeout is slightly longer than the server-side long poll.
            response = requests.get(url, params=params, timeout=35)
            if not response.ok:
                logger.error(f"Telegram polling failed: HTTP {response.status_code}")
                time.sleep(5)
                continue
            data = response.json()
            if not data.get("ok"):
                logger.error(f"Telegram polling failed: {data.get('description', 'unknown error')}")
                time.sleep(5)
                continue
            for update in data.get("result") or []:
                # Advance the offset first so a failing update is not re-fetched forever.
                self.last_update_id = update["update_id"]
                try:
                    self._handle_update(update)
                except Exception:
                    # One broken command must not delay the rest of the batch.
                    logger.exception(f"Error handling Telegram update {update.get('update_id')}")
        except requests.exceptions.Timeout:
            continue
        except Exception as e:
            logger.error(f"Telegram polling error: {e}")
            time.sleep(5)
def _handle_update(self, update):
    """Route one Telegram update to the matching command handler.

    Updates whose chat id differs from TELEGRAM_CHAT_ID are ignored. Text that
    does not start with "/" is logged but triggers no handler.
    """
    incoming = update.get("message", {})
    body = incoming.get("text", "")
    sender = str(incoming.get("chat", {}).get("id", ""))
    if sender != TELEGRAM_CHAT_ID:
        logger.debug(f"Ignoring message from unknown chat: {sender}")
        return
    logger.info(f"Received Telegram command: {body}")

    # /autopilot carries an argument, so it is matched by prefix.
    if body.startswith("/autopilot"):
        self._handle_autopilot_command(body)
        return

    # Every other known command must match exactly.
    exact_commands = {
        "/status": self._handle_status_command,
        "/help": self._handle_help_command,
        "/plot": self._handle_plot_command,
        "/errorrate": self._handle_error_rate_command,
    }
    handler = exact_commands.get(body)
    if handler is not None:
        handler()
    elif body.startswith("/"):
        self._handle_unknown_command(body)
def _handle_autopilot_command(self, text):
    """Switch autopilot on or off according to the argument of `/autopilot`.

    A missing or unrecognised argument is answered with a usage hint.
    """
    logger.info(f"Processing autopilot command: {text}")
    tokens = text.split()
    mode = tokens[1].lower() if len(tokens) >= 2 else None

    if mode == "on":
        logger.info("Enabling autopilot mode")
        self.monitor.set_autopilot(True)
        self._send_message("🤖 <b>Autopilot ENABLED</b>\n\nI will automatically apply to new listings!")
    elif mode == "off":
        self.monitor.set_autopilot(False)
        self._send_message("🛑 <b>Autopilot DISABLED</b>\n\nI will only notify you of new listings.")
    else:
        self._send_message("Usage: /autopilot on|off")
def _handle_status_command(self):
    """Send the autopilot state and the number of applications, overall and per company."""
    autopilot_on = self.monitor.load_state().get("autopilot", False)
    applications = self.monitor.load_applications()

    # Tally applications per company; records without one count as "unknown".
    per_company = {}
    for record in applications.values():
        name = record.get("company", "unknown")
        per_company[name] = per_company.get(name, 0) + 1

    parts = [
        "🤖 <b>Autopilot:</b> " + ("ON ✅" if autopilot_on else "OFF ❌"),
        f"\n📝 <b>Applications sent:</b> {len(applications)}",
    ]
    if per_company:
        parts.append("\n\n<b>By company:</b>")
        parts.extend(f"\n{name}: {count}" for name, count in sorted(per_company.items()))
    self._send_message("".join(parts))
def _handle_help_command(self):
    """Send the list of supported commands.

    Lists every command the update dispatcher recognises, including
    /errorrate.
    """
    help_lines = [
        "🏠 <b>InBerlin Monitor Commands</b>",
        "/autopilot on - Enable automatic applications",
        "/autopilot off - Disable automatic applications",
        "/status - Show current status and stats",
        "/plot - Show weekly listing patterns",
        "/errorrate - Show autopilot success vs failure rates",
        "/help - Show this help message",
        "When autopilot is ON, I will automatically apply to new listings.",
    ]
    self._send_message("\n".join(help_lines))
def _handle_unknown_command(self, text):
    """Tell the user that the command in `text` is not recognised.

    Only the first whitespace-separated token is echoed back. It is
    HTML-escaped because messages are sent with parse_mode=HTML and the
    token comes straight from user input.
    """
    tokens = text.split() if text else []
    cmd = tokens[0] if tokens else text
    self._send_message(
        f"❓ Unknown command: <code>{html.escape(cmd)}</code>\n\nUse /help to see available commands."
    )
2025-12-15 17:04:14 +01:00
def _handle_error_rate_command(self):
    """Build the autopilot success/failure plot and send it.

    Sends a plain message instead when the generator returns no plot path,
    and an error message when anything raises.
    """
    logger.info("Generating autopilot errorrate plot...")
    try:
        plot_path, summary = self._generate_error_rate_plot()
        if not plot_path:
            self._send_message("📉 Not enough application data to generate errorrate plot.")
            return
        self._send_photo(plot_path, "📉 <b>Autopilot Success vs Failure</b>\n\n" + summary)
    except Exception as e:
        import traceback
        logger.error(f"Error generating errorrate plot: {e}")
        logger.error(traceback.format_exc())
        self._send_message(f"❌ Error generating errorrate plot: {e!s}")
def _generate_error_rate_plot(self):
    """Read applications.json and produce a plot image plus summary text.

    The figure has three stacked panels sharing the date axis: daily
    success/failure counts, the daily overall error rate, and the daily error
    rate per company.

    Returns:
        (plot_path, summary_text), both strings. ("", "") when the file is
        missing, holds no record with a parseable timestamp, or the plot
        could not be produced.
    """
    if not APPLICATIONS_FILE.exists():
        logger.warning("No applications.json found for errorrate plot")
        return "", ""

    fig = None
    try:
        with open(APPLICATIONS_FILE, "r", encoding="utf-8") as f:
            apps = json.load(f)
        if not apps:
            return "", ""

        # Convert to DataFrame; timestamps are parsed one by one so a single
        # malformed value only drops its own record.
        rows = []
        for app_id, rec in apps.items():
            try:
                when = pd.to_datetime(rec.get("timestamp"))
            except Exception:
                when = pd.NaT
            rows.append({
                "id": app_id,
                "company": rec.get("company"),
                "success": bool(rec.get("success")),
                "ts": when,
            })
        df = pd.DataFrame(rows).dropna(subset=["ts"])
        if df.empty:
            return "", ""

        df["date"] = df["ts"].dt.floor("D")
        # groupby silently drops rows with a missing key; label them instead
        # so every date keeps a per-company entry.
        df["company"] = df["company"].fillna("unknown")

        grouped = (
            df.groupby("date")
            .agg(total=("id", "count"), successes=("success", "sum"))
            .sort_index()
        )
        grouped["failures"] = grouped["total"] - grouped["successes"]
        grouped["error_rate"] = grouped["failures"] / grouped["total"]

        by_company = df.groupby(["date", "company"]).agg(
            total=("id", "count"), successes=("success", "sum")
        )
        by_company["error_rate"] = (by_company["total"] - by_company["successes"]) / by_company["total"]
        # Align on the same dates as `grouped` so each series matches `x` in length.
        error_rate_pivot = by_company["error_rate"].unstack("company").reindex(grouped.index)

        # Convert dates to matplotlib numeric x-values so bars and lines align.
        import matplotlib.dates as mdates
        x = mdates.date2num(pd.to_datetime(grouped.index).to_pydatetime())
        width = 0.6  # bar width in days
        successes = grouped["successes"].values
        failures = grouped["failures"].values

        fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(12, 12), sharex=True)

        ax1.bar(x, successes, width=width, color="#2E8B57", align="center")
        ax1.bar(x, failures, bottom=successes, width=width, color="#C44A4A", align="center")
        ax1.set_ylabel("Count")
        ax1.set_title("Autopilot: Successes vs Failures (by day)")

        ax2.plot(x, grouped["error_rate"].values, marker="o", color="#3333AA", linewidth=2)
        ax2.set_ylim(-0.02, 1.02)
        ax2.set_ylabel("Error rate")
        ax2.set_xlabel("Date")
        ax2.set_title("Daily Error Rate (failures / total)")
        ax2.grid(True, alpha=0.3)

        for company in error_rate_pivot.columns:
            ax3.plot(x, error_rate_pivot[company].values, marker="o", label=str(company))
        ax3.set_ylim(-0.02, 1.02)
        ax3.set_ylabel("Error rate")
        ax3.set_xlabel("Date")
        ax3.set_title("Daily Error Rate by Company")
        ax3.grid(True, alpha=0.3)
        ax3.legend(title="Company", loc="upper right", fontsize="small")

        # Same date range, tick locator and tick format on every panel.
        for ax in (ax1, ax2, ax3):
            ax.set_xlim(min(x) - 1, max(x) + 1)
            ax.xaxis.set_major_locator(mdates.AutoDateLocator())
            ax.xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m-%d"))

        fig.autofmt_xdate()
        fig.tight_layout()

        # Save to a temp file first, then replace, so an existing plot is
        # never left half-written.
        plot_path = DATA_DIR / "error_rate.png"
        tmp_path = DATA_DIR / "error_rate.tmp.png"
        fig.savefig(tmp_path, format="png")
        tmp_path.replace(plot_path)

        # Summary
        total_attempts = int(grouped["total"].sum())
        total_success = int(grouped["successes"].sum())
        total_fail = int(grouped["failures"].sum())
        overall_error = (total_fail / total_attempts) if total_attempts > 0 else 0.0
        summary = (
            f"<b>Total attempts:</b> {total_attempts}\n"
            f"<b>Successes:</b> {total_success}\n"
            f"<b>Failures:</b> {total_fail}\n"
            f"<b>Overall error rate:</b> {overall_error:.1%}"
        )
        return str(plot_path), summary
    except Exception as e:
        logger.exception(f"Failed to generate error rate plot: {e}")
        return "", ""
    finally:
        # Release the figure on every path, including failures.
        if fig is not None:
            plt.close(fig)
def _handle_plot_command(self):
    """Build the weekly listing-pattern plot and send it.

    Sends a plain message instead when the generator returns no plot path,
    and an error message when anything raises.
    """
    logger.info("Generating listing times plot...")
    try:
        plot_path = self._generate_weekly_plot()
        if not plot_path:
            self._send_message("📊 Not enough data to generate plot yet. Keep monitoring!")
            return
        caption = "📊 <b>Weekly Listing Patterns</b>\n\nThis shows when new listings typically appear throughout the week."
        self._send_photo(plot_path, caption)
    except Exception as e:
        import traceback
        logger.error(f"Error generating plot: {e}")
        logger.error(traceback.format_exc())
        self._send_message(f"❌ Error generating plot: {e!s}")
def _generate_weekly_plot(self) -> str:
    """Render a 2x2 overview of when listings appear and save it as a PNG.

    Panels: day/hour heatmap, totals per weekday, totals per hour, and a text
    box with summary statistics. Input is TIMING_FILE, read as CSV with
    `weekday` and `hour` columns (and optionally `timestamp`).

    Returns:
        Path of the saved image as a string, or "" when there is no usable
        data or plotting fails.
    """
    if not TIMING_FILE.exists():
        logger.warning("No timing data file found")
        return ""

    fig = None
    try:
        df = pd.read_csv(TIMING_FILE)
        if df.empty:
            logger.warning("Timing file is empty")
            return ""

        # Rows without a numeric hour cannot be placed on any chart; drop
        # them instead of letting one bad row abort the whole plot.
        df = (
            df.assign(hour=pd.to_numeric(df["hour"], errors="coerce"))
            .dropna(subset=["hour"])
            .astype({"hour": int})
        )
        if df.empty:
            logger.warning("Timing file has no rows with a usable hour")
            return ""
        logger.info(f"Loaded {len(df)} listing records for plot")

        days_order = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
        hours = list(range(24))

        # Count listings per (weekday, hour) in one pass; the reindex pins
        # the 7x24 shape and zero-fills combinations that never occurred.
        known_days = df[df["weekday"].isin(days_order)]
        heatmap_data = pd.crosstab(known_days["weekday"], known_days["hour"]).reindex(
            index=days_order, columns=hours, fill_value=0
        )

        fig, axes = plt.subplots(2, 2, figsize=(14, 10))
        fig.suptitle('Listing Appearance Patterns', fontsize=16, fontweight='bold')

        # 1. Heatmap - Day vs Hour
        ax1 = axes[0, 0]
        im = ax1.imshow(heatmap_data.values, cmap='YlOrRd', aspect='auto')
        ax1.set_xticks(range(24))
        ax1.set_xticklabels(range(24), fontsize=8)
        ax1.set_yticks(range(7))
        ax1.set_yticklabels(days_order)
        ax1.set_xlabel('Hour of Day')
        ax1.set_ylabel('Day of Week')
        ax1.set_title('Listings by Day & Hour')
        fig.colorbar(im, ax=ax1, label='Count')

        # 2. Bar chart - By day of week
        ax2 = axes[0, 1]
        day_counts = df['weekday'].value_counts().reindex(days_order, fill_value=0)
        busiest = day_counts.max()
        shade = (day_counts / busiest if busiest > 0 else day_counts.astype(float)).to_numpy()
        # pyplot.get_cmap is used because matplotlib.cm.get_cmap no longer
        # exists in recent matplotlib releases.
        colors = plt.get_cmap('Blues')(shade)
        ax2.bar(range(7), day_counts.values, color=colors)
        ax2.set_xticks(range(7))
        ax2.set_xticklabels(['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'])
        ax2.set_xlabel('Day of Week')
        ax2.set_ylabel('Number of Listings')
        ax2.set_title('Total Listings by Day')
        for i, v in enumerate(day_counts.values):
            if v > 0:
                ax2.text(i, v + 0.1, str(v), ha='center', fontsize=9)

        # 3. Line chart - By hour
        ax3 = axes[1, 0]
        hour_counts = df['hour'].value_counts().reindex(hours, fill_value=0)
        ax3.plot(hours, hour_counts.values, marker='o', linewidth=2, markersize=4, color='#2E86AB')
        ax3.fill_between(hours, hour_counts.values, alpha=0.3, color='#2E86AB')
        ax3.set_xticks(range(0, 24, 2))
        ax3.set_xlabel('Hour of Day')
        ax3.set_ylabel('Number of Listings')
        ax3.set_title('Total Listings by Hour')
        ax3.grid(True, alpha=0.3)

        # 4. Summary stats
        ax4 = axes[1, 1]
        ax4.axis('off')
        total_listings = len(df)
        best_day = day_counts.idxmax() if day_counts.max() > 0 else "N/A"
        best_hour = f"{hour_counts.idxmax()}:00" if hour_counts.max() > 0 else "N/A"
        if heatmap_data.values.max() > 0:
            peak_day, peak_hour = heatmap_data.stack().idxmax()
            peak_text = f"🎯 Peak time: {peak_day} at {peak_hour}:00"
        else:
            peak_text = "🎯 Peak time: N/A"

        # Date range of the data; only the first 10 characters of each
        # timestamp string are shown.
        first_seen = last_seen = 'N/A'
        if 'timestamp' in df.columns:
            stamps = df['timestamp'].dropna().astype(str)
            if not stamps.empty:
                first_seen = stamps.min()[:10]
                last_seen = stamps.max()[:10]

        stats_text = "\n".join([
            "📊 Summary Statistics",
            f"Total listings tracked: {total_listings}",
            f"🏆 Best day: {best_day}",
            f"Best hour: {best_hour}",
            peak_text,
            # Divides by the seven weekdays, not by the number of days covered.
            f"📈 Average per day: {total_listings/7:.1f}",
            "📅 Data collection period:",
            f"From: {first_seen}",
            f"To: {last_seen}",
            "",
        ])
        ax4.text(0.1, 0.9, stats_text, transform=ax4.transAxes, fontsize=11,
                 verticalalignment='top', fontfamily='monospace',
                 bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))

        fig.tight_layout()

        # Save through the figure object rather than pyplot's "current
        # figure" state, which another thread could change.
        plot_path = DATA_DIR / "weekly_plot.png"
        fig.savefig(plot_path, dpi=150, bbox_inches='tight')
        logger.info(f"Plot saved to {plot_path}")
        return str(plot_path)
    except Exception as e:
        logger.error(f"Error creating plot: {e}")
        import traceback
        logger.error(traceback.format_exc())
        return ""
    finally:
        # Release the figure on every path, including failures.
        if fig is not None:
            plt.close(fig)
def _send_message(self, text):
    """Send `text` to TELEGRAM_CHAT_ID as an HTML-formatted message.

    Best-effort: failures are logged and never raised. The request carries a
    timeout so an unresponsive API cannot block the calling thread forever.
    """
    url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
    data = {
        "chat_id": TELEGRAM_CHAT_ID,
        "text": text,
        "parse_mode": "HTML",
        "disable_web_page_preview": True,
    }
    try:
        response = requests.post(url, data=data, timeout=30)
        if not response.ok:
            # e.g. invalid HTML in `text` or a wrong chat id
            logger.error(f"Failed to send Telegram message: {response.text}")
    except Exception as e:
        logger.error(f"Failed to send Telegram message: {e}")
def _send_photo(self, photo_path: str, caption: str = ""):
    """Upload the image at `photo_path` to TELEGRAM_CHAT_ID with an HTML caption.

    Best-effort: a missing file, network failure or API error is logged and
    never raised. The upload carries a timeout so an unresponsive API cannot
    block the calling thread forever.
    """
    url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendPhoto"
    data = {"chat_id": TELEGRAM_CHAT_ID, "caption": caption, "parse_mode": "HTML"}
    try:
        with open(photo_path, 'rb') as photo:
            response = requests.post(url, data=data, files={'photo': photo}, timeout=60)
        if response.ok:
            logger.info(f"Photo sent successfully: {photo_path}")
        else:
            logger.error(f"Failed to send photo: {response.text}")
    except Exception as e:
        logger.error(f"Failed to send Telegram photo: {e}")
class ApplicationHandler:
"""Handle automatic applications to different housing companies"""
def __init__(self, browser_context):
    """Store the browser context from which the per-company handlers open pages.

    Args:
        browser_context: object providing an awaitable `new_page()`.
    """
    self.context = browser_context
async def apply(self, listing: dict) -> dict:
    """Apply to `listing` through the handler of the company its link belongs to.

    Never raises for a failed or malformed application: missing listing keys
    default to "" and any exception from a company handler is captured in the
    result.

    Args:
        listing: dict with "link" and, where available, "id", "address",
            "rooms" and "price".

    Returns:
        Result dict with keys listing_id, company, link, timestamp, success,
        message, address, rooms and price.
    """
    link = listing.get("link", "")
    company = self._detect_company(link)
    listing_id = listing.get("id", "")
    address = listing.get("address", "")
    rooms = listing.get("rooms", "")
    price = listing.get("price", "")

    logger.info(f"Starting application process for {company}: {address}")
    logger.info(f"Listing details - ID: {listing_id}, Rooms: {rooms}, Price: {price}")
    logger.info(f"Detail link: {link}")

    result = {
        "listing_id": listing_id,
        "company": company,
        "link": link,
        "timestamp": datetime.now().isoformat(),
        "success": False,
        "message": "",
        "address": address,
        "rooms": rooms,
        "price": price,
    }

    # Company name -> name of the method that performs the application.
    # Resolved lazily so a handler that does not exist only fails its own company.
    handler_names = {
        "howoge": "_apply_howoge",
        "gewobag": "_apply_gewobag",
        "degewo": "_apply_degewo",
        "gesobau": "_apply_gesobau",
        "stadtundland": "_apply_stadtundland",
        "wbm": "_apply_wbm",
    }
    try:
        handler_name = handler_names.get(company)
        if handler_name is None:
            result["message"] = f"Unknown company: {company}"
            logger.warning(f"No application handler for company: {company}")
        else:
            result = await getattr(self, handler_name)(listing, result)
    except Exception as e:
        result["message"] = str(e)
        logger.error(f"Application error for {company}: {e}")
        import traceback
        logger.error(traceback.format_exc())

    # Log final result
    status = "SUCCESS" if result["success"] else "FAILED"
    logger.info(f"Application {status} for {address} ({company}): {result['message']}")
    return result
def _detect_company(self, link: str) -> str:
    """Return the housing company whose domain occurs in `link`, else "unknown".

    Domains are tried in a fixed order and the first substring match wins.
    """
    known_domains = (
        ("howoge.de", "howoge"),
        ("gewobag.de", "gewobag"),
        ("degewo.de", "degewo"),
        ("gesobau.de", "gesobau"),
        ("stadtundland.de", "stadtundland"),
        ("wbm.de", "wbm"),
    )
    for domain, company in known_domains:
        if domain in link:
            return company
    return "unknown"
async def _apply_howoge(self, listing: dict, result: dict) -> dict:
    """Submit a viewing request for a HOWOGE listing.

    Flow: open the listing, dismiss cookie/consent banners, click the
    "Besichtigung vereinbaren" button, step through the checkbox/"Weiter"
    wizard until the contact form shows, fill it from the FORM_* settings and
    submit it. Screenshots of the intermediate states are written to DATA_DIR.

    Args:
        listing: listing dict; reads "link" and "id".
        result: result dict to update; "success" and "message" are set here.

    Returns:
        The same `result` dict. Errors are recorded in result["message"]
        rather than raised.
    """
    page = await self.context.new_page()
    try:
        logger.info(f"[HOWOGE] Opening page: {listing['link']}")
        await page.goto(listing["link"], wait_until="networkidle")
        logger.info("[HOWOGE] Page loaded")
        await asyncio.sleep(2)

        # Handle cookies. Banner handling is best-effort, but only ordinary
        # errors are suppressed: a bare `except:` would also swallow task
        # cancellation.
        try:
            cookie_btn = await page.query_selector('button:has-text("Akzeptieren"), button:has-text("Alle akzeptieren")')
            if cookie_btn and await cookie_btn.is_visible():
                await cookie_btn.click()
                logger.info("[HOWOGE] Dismissed cookie banner")
                await asyncio.sleep(1)
        except Exception as e:
            logger.debug(f"[HOWOGE] Cookie banner handling failed: {e}")

        # Try to handle consent manager (consentmanager.net)
        try:
            consent_selectors = [
                '#cmpbntyestxt', '.cmpboxbtnyes', 'a.cmpboxbtn.cmpboxbtnyes',
                '#cmpwelcomebtnyes', '.cmptxt_btn_yes'
            ]
            for sel in consent_selectors:
                consent_btn = await page.query_selector(sel)
                if consent_btn and await consent_btn.is_visible():
                    await consent_btn.click()
                    logger.info("[HOWOGE] Dismissed consent manager")
                    await asyncio.sleep(1)
                    break
        except Exception as e:
            logger.debug(f"[HOWOGE] Consent manager handling failed: {e}")

        # Look for "Besichtigung vereinbaren" button.
        # HOWOGE has multiple buttons with the same text - only one is visible.
        logger.info("[HOWOGE] Looking for 'Besichtigung vereinbaren' button...")
        # Use href selector first - more reliable than text matching
        selectors = [
            'a[href*="besichtigung-vereinbaren"]',
            'a:has-text("Besichtigung vereinbaren")',
            'button:has-text("Besichtigung vereinbaren")',
            'a:has-text("Anfragen")',
            'button:has-text("Anfragen")'
        ]
        apply_btn = None
        for sel in selectors:
            all_btns = await page.query_selector_all(sel)
            logger.info(f"[HOWOGE] Selector '{sel}' found {len(all_btns)} matches")
            # Find first visible button
            for btn in all_btns:
                try:
                    if await btn.is_visible():
                        apply_btn = btn
                        logger.info(f"[HOWOGE] Found visible button with selector '{sel}'")
                        break
                except Exception as e:
                    logger.debug(f"[HOWOGE] Visibility check failed: {e}")
            if apply_btn:
                break

        if apply_btn:
            # Scroll the button into view and click
            logger.info("[HOWOGE] Found application button, scrolling into view...")
            await apply_btn.scroll_into_view_if_needed()
            await asyncio.sleep(0.5)
            logger.info("[HOWOGE] Clicking button...")
            await apply_btn.click()
            await asyncio.sleep(3)
            await page.wait_for_load_state("networkidle")
            logger.info("[HOWOGE] Clicked button, starting multi-step form process...")

            # HOWOGE has a multi-step form (typically 3-4 steps):
            # Each step has a checkbox that must be clicked, then "Weiter" button
            # Final step has the actual contact form
            max_steps = 6  # safety limit
            for step in range(1, max_steps + 1):
                logger.info(f"[HOWOGE] Processing step {step}")
                # Scroll down to reveal checkboxes
                await page.evaluate("window.scrollBy(0, 300)")
                await asyncio.sleep(0.5)

                # Check if we've reached the form (email field is visible)
                email_field = await page.query_selector('input[name*="email" i]')
                if email_field and await email_field.is_visible():
                    logger.info("[HOWOGE] Email field is visible - form is ready!")
                    break

                # Find and click any visible unchecked checkboxes
                checkboxes = await page.query_selector_all('input[type="checkbox"]')
                clicked_checkbox = False
                for checkbox in checkboxes:
                    try:
                        if await checkbox.is_visible() and not await checkbox.is_checked():
                            # Use JavaScript click to avoid viewport issues
                            await checkbox.evaluate("el => el.click()")
                            clicked_checkbox = True
                            logger.info(f"[HOWOGE] Clicked checkbox in step {step}")
                            await asyncio.sleep(0.5)
                    except Exception as e:
                        logger.debug(f"[HOWOGE] Checkbox click failed: {e}")
                if clicked_checkbox:
                    await asyncio.sleep(1)  # Wait for page to update after checkbox

                # Screenshot this step
                screenshot_path = DATA_DIR / f"howoge_step{step}_{listing['id']}.png"
                await page.screenshot(path=str(screenshot_path), full_page=True)

                # Look for visible "Weiter" button and click it
                weiter_btns = await page.query_selector_all('button:has-text("Weiter")')
                weiter_clicked = False
                for btn in weiter_btns:
                    try:
                        if await btn.is_visible():
                            await btn.click()
                            weiter_clicked = True
                            logger.info(f"[HOWOGE] Clicked 'Weiter' button in step {step}")
                            await asyncio.sleep(2)
                            await page.wait_for_load_state("networkidle")
                            break
                    except Exception as e:
                        logger.debug(f"[HOWOGE] Weiter click failed: {e}")

                # Neither a checkbox nor "Weiter" could be used: stop stepping.
                if not weiter_clicked and not clicked_checkbox:
                    logger.warning(f"[HOWOGE] No action possible in step {step}, breaking")
                    break

            # Now try to fill the form
            logger.info("[HOWOGE] Attempting to fill form fields...")
            # Look for name fields - HOWOGE uses firstName/lastName
            vorname_field = await page.query_selector('input[name*="firstName" i], input[name*="vorname" i]')
            nachname_field = await page.query_selector('input[name*="lastName" i], input[name*="nachname" i]')
            email_field = await page.query_selector('input[type="email"], input[name*="email" i]')

            # Becomes True as soon as any of first name, last name or email was filled.
            form_filled = False
            if vorname_field and await vorname_field.is_visible():
                await vorname_field.fill(FORM_VORNAME)
                logger.info(f"[HOWOGE] Filled Vorname: {FORM_VORNAME}")
                form_filled = True
            else:
                logger.warning("[HOWOGE] Vorname field not found or not visible")
            if nachname_field and await nachname_field.is_visible():
                await nachname_field.fill(FORM_NACHNAME)
                logger.info(f"[HOWOGE] Filled Nachname: {FORM_NACHNAME}")
                form_filled = True
            else:
                logger.warning("[HOWOGE] Nachname field not found or not visible")
            if email_field and await email_field.is_visible():
                await email_field.fill(FORM_EMAIL)
                logger.info(f"[HOWOGE] Filled Email: {FORM_EMAIL}")
                form_filled = True
            else:
                logger.warning("[HOWOGE] Email field not found or not visible")

            # Also look for phone field (optional; does not affect form_filled)
            phone_field = await page.query_selector('input[type="tel"], input[name*="telefon" i], input[name*="phone" i]')
            if phone_field and await phone_field.is_visible():
                await phone_field.fill(FORM_PHONE)
                logger.info(f"[HOWOGE] Filled Phone: {FORM_PHONE}")

            # Screenshot after filling form
            screenshot_path2 = DATA_DIR / f"howoge_filled_{listing['id']}.png"
            await page.screenshot(path=str(screenshot_path2), full_page=True)
            logger.info(f"[HOWOGE] Saved filled form screenshot to {screenshot_path2}")

            if form_filled:
                # Look for submit button - HOWOGE uses "Anfrage senden"
                # Try specific selectors first, then fall back
                submit_btn = None
                for selector in ['button:has-text("Anfrage senden")', 'button:has-text("Absenden")', 'button:has-text("Senden")']:
                    btn = await page.query_selector(selector)
                    if btn and await btn.is_visible():
                        submit_btn = btn
                        logger.info(f"[HOWOGE] Found submit button with selector: {selector}")
                        break

                if submit_btn:
                    logger.info("[HOWOGE] Found submit button, clicking...")
                    await submit_btn.click()
                    await asyncio.sleep(3)
                    await page.wait_for_load_state("networkidle")

                    # Screenshot after submit
                    screenshot_path3 = DATA_DIR / f"howoge_submitted_{listing['id']}.png"
                    await page.screenshot(path=str(screenshot_path3))
                    logger.info(f"[HOWOGE] Saved post-submit screenshot to {screenshot_path3}")

                    # Success is inferred from confirmation keywords anywhere in the page HTML.
                    content = (await page.content()).lower()
                    if any(word in content for word in ("erfolgreich", "gesendet", "danke", "bestätigung")):
                        result["success"] = True
                        result["message"] = "Application submitted successfully"
                        logger.info("[HOWOGE] Success! Confirmation message detected")
                    else:
                        result["success"] = False
                        result["message"] = "Form submitted but no confirmation detected"
                        logger.warning("[HOWOGE] Form submitted but no clear confirmation")
                else:
                    result["success"] = False
                    result["message"] = "Form filled but no submit button found"
                    logger.warning("[HOWOGE] Could not find submit button")
            else:
                result["success"] = False
                result["message"] = "Could not find form fields to fill after navigating steps"
                logger.warning("[HOWOGE] No form fields found after multi-step navigation")
        else:
            result["message"] = "No application button found"
            logger.warning("[HOWOGE] Could not find 'Besichtigung vereinbaren' button")
            # Save screenshot for debugging
            screenshot_path = DATA_DIR / f"howoge_nobtn_{listing['id']}.png"
            await page.screenshot(path=str(screenshot_path))
            # Log the first buttons on the page for debugging
            buttons = await page.query_selector_all('button, a.btn, a[class*="button"]')
            for btn in buttons[:10]:
                try:
                    text = await btn.inner_text()
                    logger.info(f"[HOWOGE] Found button: {text[:50]}")
                except Exception as e:
                    logger.debug(f"[HOWOGE] Could not read button text: {e}")
    except Exception as e:
        result["message"] = f"Error: {str(e)}"
        logger.error(f"[HOWOGE] Exception: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
    finally:
        await page.close()
    return result
async def _apply_gewobag(self, listing: dict, result: dict) -> dict:
    """
    Apply to a Gewobag listing through its Wohnungshelden application form.

    Gewobag embeds the Wohnungshelden (app.wohnungshelden.de) form in an
    iframe on the listing page. The iframe URL is opened directly in a
    second page, the form is filled from the FORM_* settings and the first
    visible submit button is clicked. Screenshots of each stage are written
    to DATA_DIR.

    Args:
        listing: Listing record; its ``link`` and ``id`` entries are read.
        result: Result record; ``success`` and ``message`` are set in place.

    Returns:
        The ``result`` dict that was passed in.
    """
    # (log label, CSS selector, value) for every plain text input of the form.
    text_fields = [
        ("Vorname", '#firstName', FORM_VORNAME),
        ("Nachname", '#lastName', FORM_NACHNAME),
        ("E-Mail", '#email', FORM_EMAIL),
        # Gewobag uses #phone-number
        ("Telefon", '#phone-number, input[id*="telefonnummer"], input[id*="phone"]', FORM_PHONE),
        # Gewobag uses formly_*_gesamtzahl
        ("Anzahl Personen", 'input[id*="gesamtzahl"], input[id*="numberPersonsTotal"]', FORM_PERSONS),
    ]
    submit_selectors = [
        'button[type="submit"]',
        'button:has-text("Absenden")',
        'button:has-text("Senden")',
        'button:has-text("Anfrage")',
        '.btn-primary',
    ]

    page = await self.context.new_page()
    try:
        logger.info(f"[GEWOBAG] Opening page: {listing['link']}")
        await page.goto(listing["link"], wait_until="networkidle")
        logger.info("[GEWOBAG] Page loaded")
        await asyncio.sleep(2)

        # Best effort: a missing or unclickable cookie banner must not abort
        # the application. Only Exception is caught so that task cancellation
        # still propagates.
        try:
            cookie_btn = await page.query_selector('#CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll, button:has-text("Alle akzeptieren")')
            if cookie_btn and await cookie_btn.is_visible():
                await cookie_btn.click()
                logger.info("[GEWOBAG] Dismissed cookie banner")
                await asyncio.sleep(1)
        except Exception as e:
            logger.debug(f"[GEWOBAG] Cookie banner handling failed: {e}")

        # Gewobag has the Wohnungshelden iframe directly on the page
        logger.info("[GEWOBAG] Looking for Wohnungshelden iframe...")
        iframe_element = await page.query_selector('iframe[src*="wohnungshelden.de"]')
        if not iframe_element:
            result["success"] = False
            result["message"] = "No Wohnungshelden iframe found"
            logger.warning("[GEWOBAG] No Wohnungshelden iframe found")
            screenshot_path = DATA_DIR / f"gewobag_nobtn_{listing['id']}.png"
            await page.screenshot(path=str(screenshot_path))
            return result

        iframe_url = await iframe_element.get_attribute('src')
        logger.info(f"[GEWOBAG] Found Wohnungshelden iframe: {iframe_url}")

        # Navigate to the iframe URL directly in a new page
        iframe_page = await self.context.new_page()
        try:
            await iframe_page.goto(iframe_url, wait_until="networkidle")
            await asyncio.sleep(2)
            logger.info("[GEWOBAG] Loaded Wohnungshelden application page")

            screenshot_path = DATA_DIR / f"gewobag_wohnungshelden_{listing['id']}.png"
            await iframe_page.screenshot(path=str(screenshot_path), full_page=True)
            logger.info("[GEWOBAG] Saved Wohnungshelden screenshot")

            # Becomes True as soon as any single field could be set.
            form_filled = False

            # Anrede (salutation) is an ng-select dropdown: open it, then
            # click the option whose text matches FORM_ANREDE.
            try:
                salutation_dropdown = await iframe_page.query_selector('#salutation-dropdown, ng-select[id*="salutation"]')
                if salutation_dropdown:
                    await salutation_dropdown.click()
                    await asyncio.sleep(0.5)
                    anrede_option = await iframe_page.query_selector(f'.ng-option:has-text("{FORM_ANREDE}")')
                    if anrede_option:
                        await anrede_option.click()
                        logger.info(f"[GEWOBAG] Selected Anrede: {FORM_ANREDE}")
                        form_filled = True
            except Exception as e:
                logger.warning(f"[GEWOBAG] Could not set Anrede: {e}")

            # Each field is filled independently so that one failure does not
            # prevent the remaining fields from being attempted.
            for label, selector, value in text_fields:
                try:
                    field = await iframe_page.query_selector(selector)
                    if field:
                        await field.fill(value)
                        logger.info(f"[GEWOBAG] Filled {label}: {value}")
                        form_filled = True
                except Exception as e:
                    logger.warning(f"[GEWOBAG] Could not fill {label}: {e}")

            await asyncio.sleep(1)
            screenshot_path = DATA_DIR / f"gewobag_filled_{listing['id']}.png"
            await iframe_page.screenshot(path=str(screenshot_path), full_page=True)
            logger.info("[GEWOBAG] Saved filled form screenshot")

            if not form_filled:
                result["success"] = False
                result["message"] = "No form fields found in Wohnungshelden"
                logger.warning("[GEWOBAG] Could not find form fields")
                return result

            try:
                # Use the first selector that matches a visible element.
                submit_btn = None
                for selector in submit_selectors:
                    candidate = await iframe_page.query_selector(selector)
                    if candidate and await candidate.is_visible():
                        logger.info(f"[GEWOBAG] Found submit button: {selector}")
                        submit_btn = candidate
                        break

                if submit_btn:
                    await submit_btn.click()
                    logger.info("[GEWOBAG] Clicked submit button")
                    await asyncio.sleep(3)
                    screenshot_path = DATA_DIR / f"gewobag_submitted_{listing['id']}.png"
                    await iframe_page.screenshot(path=str(screenshot_path), full_page=True)
                    logger.info("[GEWOBAG] Saved submission screenshot")
                    # NOTE(review): success is reported once the button was
                    # clicked; the resulting page is not checked for a
                    # confirmation text.
                    result["success"] = True
                    result["message"] = "Application submitted via Wohnungshelden"
                else:
                    result["success"] = False
                    result["message"] = "Form filled but submit button not found"
                    logger.warning("[GEWOBAG] Submit button not found")
            except Exception as e:
                result["success"] = False
                result["message"] = f"Submit error: {str(e)}"
                logger.warning(f"[GEWOBAG] Submit error: {e}")
        finally:
            await iframe_page.close()
    except Exception as e:
        result["success"] = False
        result["message"] = f"Error: {str(e)}"
        logger.error(f"[GEWOBAG] Exception: {str(e)}")
    finally:
        await page.close()
    return result
async def _apply_degewo(self, listing: dict, result: dict) -> dict:
    """
    Apply to a Degewo listing through its Wohnungshelden application form.

    Degewo loads the Wohnungshelden (app.wohnungshelden.de) form in an
    iframe after the "kontaktieren" button is clicked. The iframe URL is
    opened directly in a second page, the form is filled from the FORM_*
    settings and the first visible submit button is clicked. Screenshots and
    HTML dumps are written to DATA_DIR for debugging.

    Args:
        listing: Listing record; its ``link`` and ``id`` entries are read.
        result: Result record; ``success`` and ``message`` are set in place.

    Returns:
        The ``result`` dict that was passed in.
    """
    # (log label, CSS selector, value) for every plain text input of the form.
    text_fields = [
        ("Vorname", '#firstName', FORM_VORNAME),
        ("Nachname", '#lastName', FORM_NACHNAME),
        ("E-Mail", '#email', FORM_EMAIL),
        ("Telefon", 'input[id*="telefonnummer"]', FORM_PHONE),
        ("Anzahl Personen", 'input[id*="numberPersonsTotal"]', FORM_PERSONS),
    ]
    submit_selectors = [
        'button[type="submit"]',
        'input[type="submit"]',
        'button:has-text("Absenden")',
        'button:has-text("Senden")',
        'button:has-text("Anfrage")',
        'button:has-text("Bewerben")',
        'button:has-text("Submit")',
        '.btn-primary',
        '.submit-btn',
    ]

    page = await self.context.new_page()
    try:
        logger.info(f"[DEGEWO] Opening page: {listing['link']}")
        await page.goto(listing["link"], wait_until="networkidle")
        logger.info("[DEGEWO] Page loaded")
        await asyncio.sleep(2)

        # Dismiss cookie banner (best effort). Only Exception is caught so
        # that task cancellation still propagates.
        try:
            cookie_btn = await page.query_selector('button:has-text("Alle akzeptieren"), #CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll')
            if cookie_btn and await cookie_btn.is_visible():
                await cookie_btn.click()
                logger.info("[DEGEWO] Dismissed cookie banner")
                await asyncio.sleep(1)
        except Exception as e:
            logger.debug(f"[DEGEWO] Cookie banner handling failed: {e}")

        logger.info("[DEGEWO] Looking for kontaktieren button...")
        apply_btn = await page.query_selector('a:has-text("kontaktieren"), button:has-text("kontaktieren"), a:has-text("Kontaktieren"), button:has-text("Kontaktieren")')
        if not (apply_btn and await apply_btn.is_visible()):
            result["success"] = False
            result["message"] = "No kontaktieren button found"
            logger.warning("[DEGEWO] Could not find kontaktieren button")
            screenshot_path = DATA_DIR / f"degewo_nobtn_{listing['id']}.png"
            await page.screenshot(path=str(screenshot_path), full_page=True)
            return result

        logger.info("[DEGEWO] Found kontaktieren button, clicking...")
        await apply_btn.click()
        await asyncio.sleep(3)

        # Degewo uses a Wohnungshelden iframe for the application form;
        # find it and take its URL so the form can be opened directly.
        iframe_element = await page.query_selector('iframe[src*="wohnungshelden.de"]')
        if not iframe_element:
            logger.warning("[DEGEWO] Wohnungshelden iframe not found, trying direct form...")
            # Keep a screenshot and the page HTML for debugging.
            screenshot_path = DATA_DIR / f"degewo_noiframe_{listing['id']}.png"
            await page.screenshot(path=str(screenshot_path), full_page=True)
            html_content = await page.content()
            html_path = DATA_DIR / "degewo_debug.html"
            with open(html_path, 'w', encoding='utf-8') as f:
                f.write(html_content)
            result["success"] = False
            result["message"] = "Wohnungshelden iframe not found on page"
            return result

        iframe_url = await iframe_element.get_attribute('src')
        logger.info(f"[DEGEWO] Found Wohnungshelden iframe: {iframe_url}")

        # Navigate to the iframe URL directly in a new page for full access
        iframe_page = await self.context.new_page()
        try:
            await iframe_page.goto(iframe_url, wait_until="networkidle")
            await asyncio.sleep(2)
            logger.info("[DEGEWO] Loaded Wohnungshelden application page")

            screenshot_path = DATA_DIR / f"degewo_wohnungshelden_{listing['id']}.png"
            await iframe_page.screenshot(path=str(screenshot_path), full_page=True)
            logger.info(f"[DEGEWO] Saved Wohnungshelden screenshot to {screenshot_path}")

            # Save HTML for debugging
            html_content = await iframe_page.content()
            html_path = DATA_DIR / f"degewo_wohnungshelden_{listing['id']}.html"
            with open(html_path, 'w', encoding='utf-8') as f:
                f.write(html_content)
            logger.info(f"[DEGEWO] Saved HTML to {html_path}")

            # Becomes True as soon as any single field could be set.
            form_filled = False

            # Anrede (salutation) is an ng-select dropdown: open it, then
            # click the option whose text matches FORM_ANREDE.
            try:
                salutation_dropdown = await iframe_page.query_selector('#salutation-dropdown, ng-select[id*="salutation"]')
                if salutation_dropdown:
                    await salutation_dropdown.click()
                    await asyncio.sleep(0.5)
                    anrede_option = await iframe_page.query_selector(f'.ng-option:has-text("{FORM_ANREDE}")')
                    if anrede_option:
                        await anrede_option.click()
                        logger.info(f"[DEGEWO] Selected Anrede: {FORM_ANREDE}")
                        form_filled = True
            except Exception as e:
                logger.warning(f"[DEGEWO] Could not set Anrede: {e}")

            # Each field is filled independently so that one failure does not
            # prevent the remaining fields from being attempted.
            for label, selector, value in text_fields:
                try:
                    field = await iframe_page.query_selector(selector)
                    if field:
                        await field.fill(value)
                        logger.info(f"[DEGEWO] Filled {label}: {value}")
                        form_filled = True
                except Exception as e:
                    logger.warning(f"[DEGEWO] Could not fill {label}: {e}")

            # "Für sich selbst" dropdown
            try:
                selbst_dropdown = await iframe_page.query_selector('ng-select[id*="fuer_wen"]')
                if selbst_dropdown:
                    await selbst_dropdown.click()
                    await asyncio.sleep(0.5)
                    selbst_option = await iframe_page.query_selector('.ng-option:has-text("Für mich selbst"), .ng-option:has-text("selbst")')
                    if selbst_option:
                        await selbst_option.click()
                        logger.info("[DEGEWO] Selected: Für mich selbst")
                        form_filled = True
            except Exception as e:
                logger.warning(f"[DEGEWO] Could not set 'Für sich selbst': {e}")

            await asyncio.sleep(1)
            screenshot_path = DATA_DIR / f"degewo_form_filled_{listing['id']}.png"
            await iframe_page.screenshot(path=str(screenshot_path), full_page=True)
            logger.info(f"[DEGEWO] Saved filled form screenshot to {screenshot_path}")

            # Do not submit (and report success for) a form in which nothing
            # could be filled; this mirrors the Gewobag handler.
            if not form_filled:
                result["success"] = False
                result["message"] = "No form fields found in Wohnungshelden"
                logger.warning("[DEGEWO] Could not find form fields")
                return result

            try:
                # Use the first selector that matches a visible element.
                submit_btn = None
                for selector in submit_selectors:
                    candidate = await iframe_page.query_selector(selector)
                    if candidate and await candidate.is_visible():
                        logger.info(f"[DEGEWO] Found submit button with selector: {selector}")
                        submit_btn = candidate
                        break

                if submit_btn:
                    await submit_btn.click()
                    logger.info("[DEGEWO] Clicked submit button")
                    await asyncio.sleep(3)
                    screenshot_path = DATA_DIR / f"degewo_submitted_{listing['id']}.png"
                    await iframe_page.screenshot(path=str(screenshot_path), full_page=True)
                    logger.info(f"[DEGEWO] Saved submission screenshot to {screenshot_path}")
                    # NOTE(review): success is reported once the button was
                    # clicked; the resulting page is not checked for a
                    # confirmation text.
                    result["success"] = True
                    result["message"] = "Application submitted via Wohnungshelden"
                else:
                    # Submit button not found - this is a failure
                    result["success"] = False
                    result["message"] = "Wohnungshelden form loaded but submit button not found"
                    logger.warning("[DEGEWO] Submit button not found in Wohnungshelden form")
            except Exception as e:
                result["success"] = False
                result["message"] = f"Wohnungshelden submit error: {str(e)}"
                logger.warning(f"[DEGEWO] Submit error: {e}")
        finally:
            await iframe_page.close()
    except Exception as e:
        result["success"] = False
        result["message"] = f"Error: {str(e)}"
        logger.error(f"[DEGEWO] Exception: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
    finally:
        await page.close()
    return result
async def _apply_gesobau(self, listing: dict, result: dict) -> dict:
    """
    Open a Gesobau listing and click through to its application page.

    Submitting the application is not implemented: after the application
    button is clicked only a screenshot is saved to DATA_DIR, and the result
    is always reported as unsuccessful.

    Args:
        listing: Listing record; its ``link`` and ``id`` entries are read.
        result: Result record; ``success`` and ``message`` are set in place.

    Returns:
        The ``result`` dict that was passed in.
    """
    page = await self.context.new_page()
    try:
        logger.info(f"[GESOBAU] Opening page: {listing['link']}")
        await page.goto(listing["link"], wait_until="networkidle")
        logger.info("[GESOBAU] Page loaded")
        await asyncio.sleep(2)

        # Best effort: a missing or unclickable cookie banner must not abort
        # the run. Only Exception is caught so that task cancellation still
        # propagates.
        try:
            cookie_btn = await page.query_selector('button:has-text("Akzeptieren"), button:has-text("Alle akzeptieren")')
            if cookie_btn and await cookie_btn.is_visible():
                await cookie_btn.click()
                logger.info("[GESOBAU] Dismissed cookie banner")
                await asyncio.sleep(1)
        except Exception as e:
            logger.debug(f"[GESOBAU] Cookie banner handling failed: {e}")

        logger.info("[GESOBAU] Looking for application button...")
        apply_btn = await page.query_selector('a:has-text("Anfragen"), button:has-text("Interesse"), a:has-text("Kontakt")')
        if apply_btn and await apply_btn.is_visible():
            logger.info("[GESOBAU] Found application button, clicking...")
            await apply_btn.click()
            await asyncio.sleep(2)
            screenshot_path = DATA_DIR / f"gesobau_{listing['id']}.png"
            await page.screenshot(path=str(screenshot_path))
            logger.info(f"[GESOBAU] Saved screenshot to {screenshot_path}")
            result["success"] = False
            result["message"] = "Application page opened but not submitted (not implemented)"
        else:
            result["success"] = False
            result["message"] = "No application button found"
            logger.warning("[GESOBAU] Could not find application button")
            screenshot_path = DATA_DIR / f"gesobau_nobtn_{listing['id']}.png"
            await page.screenshot(path=str(screenshot_path))
    except Exception as e:
        result["success"] = False
        result["message"] = f"Error: {str(e)}"
        logger.error(f"[GESOBAU] Exception: {str(e)}")
    finally:
        await page.close()
    return result
async def _apply_stadtundland(self, listing: dict, result: dict) -> dict:
    """
    Apply to a Stadt und Land listing through the form on the listing page.

    Stadt und Land has an embedded contact form directly on the listing
    page (no iframe). The text inputs name, surname, street, houseNo,
    postalCode, city, phone and email are filled from the FORM_* settings,
    the privacy and provision checkboxes are ticked, "Eingaben prüfen" is
    clicked, and then the first visible final submit button is clicked.
    Screenshots of each stage are written to DATA_DIR.

    Args:
        listing: Listing record; its ``link`` and ``id`` entries are read.
        result: Result record; ``success`` and ``message`` are set in place.

    Returns:
        The ``result`` dict that was passed in.
    """
    # (log label, value of the input's name attribute, value to enter)
    text_fields = [
        ("Vorname", "name", FORM_VORNAME),
        ("Nachname", "surname", FORM_NACHNAME),
        ("Straße", "street", FORM_STRASSE),
        ("Hausnummer", "houseNo", FORM_HAUSNUMMER),
        ("PLZ", "postalCode", FORM_PLZ),
        ("Ort", "city", FORM_ORT),
        ("Telefon", "phone", FORM_PHONE),
        ("E-Mail", "email", FORM_EMAIL),
    ]
    final_submit_selectors = [
        'button:has-text("Absenden")',
        'button:has-text("Senden")',
        'button:has-text("Anfrage senden")',
        'button:has-text("Bestätigen")',
        'button[type="submit"]',
    ]
    confirmation_words = ("erfolgreich", "gesendet", "danke", "bestätigung")

    page = await self.context.new_page()
    try:
        logger.info(f"[STADTUNDLAND] Opening page: {listing['link']}")
        await page.goto(listing["link"], wait_until="networkidle")
        logger.info("[STADTUNDLAND] Page loaded")
        await asyncio.sleep(2)

        # Dismiss cookie banner (best effort). Only Exception is caught so
        # that task cancellation still propagates.
        try:
            cookie_btn = await page.query_selector('button:has-text("Akzeptieren"), button:has-text("Alle akzeptieren")')
            if cookie_btn and await cookie_btn.is_visible():
                await cookie_btn.click()
                logger.info("[STADTUNDLAND] Dismissed cookie banner")
                await asyncio.sleep(1)
        except Exception as e:
            logger.debug(f"[STADTUNDLAND] Cookie banner handling failed: {e}")

        # Scroll down to the contact form
        await page.evaluate("window.scrollBy(0, 500)")
        await asyncio.sleep(0.5)

        screenshot_path = DATA_DIR / f"stadtundland_page_{listing['id']}.png"
        await page.screenshot(path=str(screenshot_path), full_page=True)
        logger.info(f"[STADTUNDLAND] Saved page screenshot to {screenshot_path}")

        # Becomes True as soon as any single text field could be filled.
        form_filled = False
        for label, field_name, value in text_fields:
            try:
                field = await page.query_selector(f'input[name="{field_name}"]')
                if field and await field.is_visible():
                    await field.fill(value)
                    logger.info(f"[STADTUNDLAND] Filled {label}: {value}")
                    form_filled = True
            except Exception as e:
                logger.warning(f"[STADTUNDLAND] Could not fill {label}: {e}")

        # Tick the privacy and provision checkboxes if they are visible and
        # not already checked. These do not count towards form_filled.
        for checkbox_name in ("privacy", "provision"):
            try:
                checkbox = await page.query_selector(f'input[name="{checkbox_name}"]')
                if checkbox and await checkbox.is_visible():
                    if not await checkbox.is_checked():
                        await checkbox.click()
                        logger.info(f"[STADTUNDLAND] Clicked {checkbox_name} checkbox")
            except Exception as e:
                logger.warning(f"[STADTUNDLAND] Could not click {checkbox_name} checkbox: {e}")

        await asyncio.sleep(1)
        screenshot_path = DATA_DIR / f"stadtundland_filled_{listing['id']}.png"
        await page.screenshot(path=str(screenshot_path), full_page=True)
        logger.info(f"[STADTUNDLAND] Saved filled form screenshot to {screenshot_path}")

        if not form_filled:
            result["success"] = False
            result["message"] = "No form fields found on page"
            logger.warning("[STADTUNDLAND] Could not find form fields")
            return result

        try:
            # Step 1: click the "Eingaben prüfen" button
            pruefen_btn = await page.query_selector('button:has-text("Eingaben prüfen")')
            if not (pruefen_btn and await pruefen_btn.is_visible()):
                result["success"] = False
                result["message"] = "Form filled but 'Eingaben prüfen' button not found"
                logger.warning("[STADTUNDLAND] 'Eingaben prüfen' button not found")
                return result

            await pruefen_btn.click()
            logger.info("[STADTUNDLAND] Clicked 'Eingaben prüfen' button")
            await asyncio.sleep(2)
            await page.wait_for_load_state("networkidle")

            screenshot_path = DATA_DIR / f"stadtundland_validated_{listing['id']}.png"
            await page.screenshot(path=str(screenshot_path), full_page=True)
            logger.info("[STADTUNDLAND] Saved validation screenshot")

            # Step 2: click the first visible final submit button
            final_btn = None
            for selector in final_submit_selectors:
                candidate = await page.query_selector(selector)
                if candidate and await candidate.is_visible():
                    logger.info(f"[STADTUNDLAND] Found final submit button: {selector}")
                    final_btn = candidate
                    break

            if not final_btn:
                result["success"] = False
                result["message"] = "Validated but final submit button not found"
                logger.warning("[STADTUNDLAND] Final submit button not found")
                return result

            await final_btn.click()
            logger.info("[STADTUNDLAND] Clicked final submit button")
            await asyncio.sleep(3)
            await page.wait_for_load_state("networkidle")

            screenshot_path = DATA_DIR / f"stadtundland_submitted_{listing['id']}.png"
            await page.screenshot(path=str(screenshot_path), full_page=True)
            logger.info("[STADTUNDLAND] Saved submission screenshot")

            # Check for a confirmation message
            page_text = (await page.content()).lower()
            # NOTE(review): the submission counts as successful even without
            # a confirmation text; only the message differs. Confirm that
            # this is intended.
            result["success"] = True
            if any(word in page_text for word in confirmation_words):
                result["message"] = "Application submitted successfully"
                logger.info("[STADTUNDLAND] Success! Confirmation message detected")
            else:
                result["message"] = "Form submitted"
                logger.info("[STADTUNDLAND] Form submitted")
        except Exception as e:
            result["success"] = False
            result["message"] = f"Submit error: {str(e)}"
            logger.warning(f"[STADTUNDLAND] Submit error: {e}")
    except Exception as e:
        result["success"] = False
        result["message"] = f"Error: {str(e)}"
        logger.error(f"[STADTUNDLAND] Exception: {str(e)}")
    finally:
        await page.close()
    return result
async def _apply_wbm(self, listing: dict, result: dict) -> dict:
    """
    Open a WBM listing and click through to its application page.

    Submitting the application is not implemented: after the application
    button is clicked only a screenshot is saved to DATA_DIR, and the result
    is always reported as unsuccessful.

    Args:
        listing: Listing record; its ``link`` and ``id`` entries are read.
        result: Result record; ``success`` and ``message`` are set in place.

    Returns:
        The ``result`` dict that was passed in.
    """
    page = await self.context.new_page()
    try:
        logger.info(f"[WBM] Opening page: {listing['link']}")
        await page.goto(listing["link"], wait_until="networkidle")
        logger.info("[WBM] Page loaded")
        await asyncio.sleep(2)

        # Best effort: a missing or unclickable cookie banner must not abort
        # the run. Only Exception is caught so that task cancellation still
        # propagates.
        try:
            cookie_btn = await page.query_selector('button:has-text("Akzeptieren"), button:has-text("Alle akzeptieren")')
            if cookie_btn and await cookie_btn.is_visible():
                await cookie_btn.click()
                logger.info("[WBM] Dismissed cookie banner")
                await asyncio.sleep(1)
        except Exception as e:
            logger.debug(f"[WBM] Cookie banner handling failed: {e}")

        logger.info("[WBM] Looking for application button...")
        apply_btn = await page.query_selector('a:has-text("Anfragen"), button:has-text("Interesse"), a:has-text("Bewerben")')
        if apply_btn and await apply_btn.is_visible():
            logger.info("[WBM] Found application button, clicking...")
            await apply_btn.click()
            await asyncio.sleep(2)
            screenshot_path = DATA_DIR / f"wbm_{listing['id']}.png"
            await page.screenshot(path=str(screenshot_path))
            logger.info(f"[WBM] Saved screenshot to {screenshot_path}")
            result["success"] = False
            result["message"] = "Application page opened but not submitted (not implemented)"
        else:
            result["success"] = False
            result["message"] = "No application button found"
            logger.warning("[WBM] Could not find application button")
            screenshot_path = DATA_DIR / f"wbm_nobtn_{listing['id']}.png"
            await page.screenshot(path=str(screenshot_path))
    except Exception as e:
        result["success"] = False
        result["message"] = f"Error: {str(e)}"
        logger.error(f"[WBM] Exception: {str(e)}")
    finally:
        await page.close()
    return result
class InBerlinMonitor:
def __init__(self):
self.browser = None
self.context = None
self.logged_in = False
self.application_handler = None
async def init_browser(self):
    """Start Playwright and a headless Chromium context, once.

    Does nothing when a browser has already been created. Otherwise it
    stores the Playwright handle, the browser, a context with a fixed
    desktop user agent, and an ApplicationHandler bound to that context.
    """
    if self.browser is not None:
        return

    self.playwright = await async_playwright().start()
    chromium = self.playwright.chromium
    self.browser = await chromium.launch(headless=True)
    desktop_user_agent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
    self.context = await self.browser.new_context(user_agent=desktop_user_agent)
    self.application_handler = ApplicationHandler(self.context)
    logger.info("Browser initialized")
def load_state(self) -> dict:
    """Return the persisted state, or the default state if none is stored.

    The default is ``{"autopilot": False}`` and is used whenever STATE_FILE
    does not exist.
    """
    if not STATE_FILE.exists():
        return {"autopilot": False}
    with open(STATE_FILE, "r") as state_fh:
        stored_state = json.load(state_fh)
    return stored_state
def save_state(self, state: dict):
    """Write ``state`` to STATE_FILE as indented JSON, replacing its content."""
    with open(STATE_FILE, "w") as state_fh:
        json.dump(state, state_fh, indent=2)
def set_autopilot(self, enabled: bool):
    """Persist the autopilot flag and log the new setting."""
    current_state = self.load_state()
    current_state["autopilot"] = enabled
    self.save_state(current_state)
    if enabled:
        status = 'enabled'
    else:
        status = 'disabled'
    logger.info(f"Autopilot {status}")
def is_autopilot_enabled(self) -> bool:
    """Return the persisted autopilot flag; False when the key is absent."""
    current_state = self.load_state()
    return current_state.get("autopilot", False)
def load_applications(self) -> dict:
    """Load the application history.

    Returns:
        The JSON content of APPLICATIONS_FILE, or an empty dict when the
        file does not exist.
    """
    if not APPLICATIONS_FILE.exists():
        return {}
    # The history is written with ensure_ascii=False, so it can contain
    # non-ASCII text; read it as UTF-8 instead of the locale default.
    with open(APPLICATIONS_FILE, "r", encoding="utf-8") as f:
        return json.load(f)
def save_application(self, result: dict):
    """Store one application result in the application history.

    The result is stored under its ``listing_id``; an existing entry for
    the same listing is replaced. The whole history is rewritten.

    Args:
        result: Application result; must contain the key ``listing_id``.
    """
    applications = self.load_applications()
    applications[result["listing_id"]] = result
    # ensure_ascii=False emits non-ASCII characters verbatim, so the file
    # encoding must be fixed; the locale default may be unable to encode
    # them.
    with open(APPLICATIONS_FILE, "w", encoding="utf-8") as f:
        json.dump(applications, f, indent=2, ensure_ascii=False)
def has_applied(self, listing_id: str) -> bool:
    """Return True when the application history has an entry for ``listing_id``."""
    history = self.load_applications()
    return listing_id in history
async def dismiss_cookie_modal(self, page):
    """Dismiss the privacy/cookie consent modal if present.

    Tries a list of known accept-button selectors first, then falls back to
    clicking the first visible button inside the privacy modal container.

    Args:
        page: Playwright page on which to look for the modal.

    Returns:
        True when a button was clicked, False when nothing was found or an
        error occurred. Errors are logged and never raised.
    """
    # Common accept-button patterns (German and English) plus the ones
    # specific to inberlinwohnen.
    accept_selectors = [
        'button:has-text("Akzeptieren")',
        'button:has-text("Alle akzeptieren")',
        'button:has-text("Accept")',
        'button:has-text("Zustimmen")',
        '[x-show="showPrivacyModal"] button',
        '.privacy-modal button',
        'button.accept-cookies',
        'div[x-show="showPrivacyModal"] button:first-of-type',
    ]
    try:
        # Wait a bit for the modal to appear
        await asyncio.sleep(2)

        for selector in accept_selectors:
            try:
                button = await page.query_selector(selector)
                if button and await button.is_visible():
                    await button.click()
                    logger.info(f"Clicked cookie accept button: {selector}")
                    await asyncio.sleep(1)
                    return True
            except Exception as e:
                # A failing selector is not fatal; try the next one. Only
                # Exception is caught so task cancellation still propagates.
                logger.debug(f"Cookie selector {selector} failed: {e}")
                continue

        # Fallback: click the first visible button in the modal overlay
        # (usually the accept button).
        modal = await page.query_selector('div[x-show="showPrivacyModal"]')
        if modal:
            for btn in await modal.query_selector_all('button'):
                if await btn.is_visible():
                    text = await btn.inner_text()
                    logger.info(f"Found modal button: {text}")
                    await btn.click()
                    await asyncio.sleep(1)
                    return True

        logger.info("No cookie modal found or already dismissed")
        return False
    except Exception as e:
        logger.debug(f"Cookie modal handling: {e}")
        return False
async def login(self) -> bool:
    """Login to inberlinwohnen.de.

    Returns:
        True when the login succeeded (``self.logged_in`` is then set to
        True); False when no credentials are configured, the login was
        rejected, or an error occurred. Errors are logged and never raised.
    """
    if not INBERLIN_EMAIL or not INBERLIN_PASSWORD:
        logger.warning("No credentials provided, using public listings")
        return False

    page = None
    try:
        page = await self.context.new_page()
        await page.goto("https://www.inberlinwohnen.de/login", wait_until="networkidle")
        # Handle cookie/privacy modal first
        await self.dismiss_cookie_modal(page)

        # Fill and submit the login form
        await page.fill('input[name="email"], input[type="email"]', INBERLIN_EMAIL)
        await page.fill('input[name="password"], input[type="password"]', INBERLIN_PASSWORD)
        await page.click('button[type="submit"], input[type="submit"]')

        # Wait for navigation
        await page.wait_for_load_state("networkidle")
        await asyncio.sleep(2)

        # Success is detected by the URL of the personal area or by the
        # presence of a logout ("Abmelden") element.
        if "mein-bereich" in page.url or await page.query_selector('text="Abmelden"'):
            logger.info("Login successful")
            self.logged_in = True
            return True

        logger.error(f"Login failed - ended up at {page.url}")
        return False
    except Exception as e:
        logger.error(f"Login error: {e}")
        return False
    finally:
        # Close the page on every path, including errors, so failed logins
        # do not leave pages open in the shared context.
        if page is not None:
            try:
                await page.close()
            except Exception as e:
                logger.debug(f"Could not close login page: {e}")
async def fetch_listings(self) -> list[dict]:
    """Fetch listings from the Wohnungsfinder.

    Loads the personal Wohnungsfinder when logged in (the public one
    otherwise), clicks through up to 10 result pages, and extracts the
    listings from the collected HTML with regular expressions.

    Returns:
        A list of listing dicts with the keys ``id``, ``rooms``, ``size``,
        ``price``, ``address``, ``link`` and ``fetched_at``, without
        duplicate ids. An empty list is returned when an error occurs;
        errors are logged and never raised.
    """
    # Listing text looks like:
    # "2,0 Zimmer, 53,01 m², 494,38 € Kaltmiete | Rhinstraße 4, 10315 Lichtenberg"
    listing_text_re = re.compile(
        r'(\d,\d)\s*Zimmer,\s*([\d,]+)\s*m²,\s*([\d.,]+)\s*€\s*(?:Kaltmiete\s*)?\|\s*(.+)'
    )
    max_pages = 10  # Safety limit

    page = None
    try:
        page = await self.context.new_page()
        # Use personal Wohnungsfinder when logged in to see filtered listings
        if self.logged_in:
            url = "https://www.inberlinwohnen.de/mein-bereich/wohnungsfinder"
        else:
            url = "https://www.inberlinwohnen.de/wohnungsfinder/"
        logger.info(f"Fetching listings from {url}")
        await page.goto(url, wait_until="networkidle")

        # Handle cookie modal if not logged in
        if not self.logged_in:
            await self.dismiss_cookie_modal(page)

        # Wait for dynamic content to load - look for listing text pattern.
        # A timeout is not fatal: whatever has rendered is still parsed.
        try:
            await page.wait_for_selector('text=/\\d,\\d\\s*Zimmer/', timeout=15000)
            logger.info("Listings content loaded")
        except Exception:
            logger.warning("Timeout waiting for listings content")

        # Additional wait for initial listings to render
        await asyncio.sleep(2)

        # Collect the HTML of every result page by clicking through the
        # Livewire pagination.
        page_contents = []
        while len(page_contents) < max_pages:
            page_contents.append(await page.content())
            next_btn = await page.query_selector('[wire\\:click*="nextPage"]')
            if not (next_btn and await next_btn.is_visible()):
                break
            await next_btn.click()
            await asyncio.sleep(2)  # Wait for Livewire to update
        logger.info(f"Collected content from {len(page_contents)} page(s)")
        content = "".join(page_contents)

        # Debug: save HTML to file for inspection
        debug_path = DATA_DIR / "debug_page.html"
        with open(debug_path, "w", encoding="utf-8") as f:
            f.write(content)
        logger.info(f"Saved debug HTML to {debug_path}")

        # Debug: log the listing counts the page itself reports
        count_match = re.search(r'(\d+)\s*Wohnungen? für Sie gefunden', content)
        if count_match:
            logger.info(f"Page shows {count_match.group(1)} listings available")
        # Also check for "Zeige X bis Y von Z Angeboten"
        show_match = re.search(r'Zeige \d+ bis \d+ von (\d+) Angeboten', content)
        if show_match:
            logger.info(f"Page shows {show_match.group(1)} total offers")

        # Decode HTML entities and JSON escaped slashes for extraction
        content_decoded = html.unescape(content)
        content_decoded = content_decoded.replace('\\/', '/')

        # Build flatId -> deeplink mapping from wire:snapshot JSON data
        # Format in HTML: "deeplink":"https://...","flatId":12345
        deeplink_pattern = r'"deeplink":"(https://[^"]+)","flatId":(\d+)'
        deeplink_matches = re.findall(deeplink_pattern, content_decoded)
        id_to_link = {flat_id: link for link, flat_id in deeplink_matches}
        logger.info(f"Found {len(id_to_link)} deeplink mappings")

        # Extract listings from button elements with aria-label
        # Format: @click="open !== 12345 ..." aria-label="Wohnungsangebot - 2,0 Zimmer, 53,01 m², 494,38 € Kaltmiete | Adresse"
        button_pattern = r'@click="open !== (\d+)[^"]*"[^>]*aria-label="Wohnungsangebot - ([^"]+)'
        button_matches = re.findall(button_pattern, content_decoded)
        logger.info(f"Found {len(button_matches)} listing buttons")

        listings = []
        seen_ids = set()
        for flat_id, listing_text in button_matches:
            parts_match = listing_text_re.match(listing_text)
            if not parts_match:
                continue
            rooms, size, price, address = parts_match.groups()
            rooms = rooms.strip()
            address = address.strip()
            if len(address) < 5:
                continue

            # The id is derived from the listing's visible data, not from
            # the site's flat id.
            listing_id = hashlib.md5(f"{rooms}{size}{price}{address}".encode()).hexdigest()[:12]
            # The same listing can occur several times in the collected
            # HTML; keep only its first occurrence.
            if listing_id in seen_ids:
                continue
            seen_ids.add(listing_id)

            listings.append({
                "id": listing_id,
                "rooms": f"{rooms} Zimmer",
                "size": f"{size}",
                "price": f"{price}",
                "address": address,
                # Fall back to the search page when no deeplink is known
                "link": id_to_link.get(flat_id, url),
                "fetched_at": datetime.now().isoformat()
            })

        logger.info(f"Fetched {len(listings)} unique listings")
        return listings
    except Exception as e:
        logger.error(f"Error fetching listings: {e}")
        import traceback
        logger.error(traceback.format_exc())
        return []
    finally:
        # Close the page on every path so errors do not leak pages in the
        # shared browser context.
        if page is not None:
            try:
                await page.close()
            except Exception as e:
                logger.debug(f"Could not close listings page: {e}")
def load_previous_listings(self) -> dict:
    """Load the listings saved by the previous check.

    Returns:
        The JSON content of ``LISTINGS_FILE``, or an empty dict when the
        file is missing, cannot be read or does not contain valid JSON.
    """
    if not LISTINGS_FILE.exists():
        return {}
    try:
        with open(LISTINGS_FILE, "r") as f:
            return json.load(f)
    except (OSError, ValueError) as e:
        # A truncated or corrupted state file (for example after an
        # interrupted write) must not break every following check.
        # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
        logger.warning(f"Could not read {LISTINGS_FILE}, starting with an empty state: {e}")
        return {}
def save_listings(self, listings: list[dict]):
    """Write ``listings`` to ``LISTINGS_FILE`` as a JSON object keyed by listing id."""
    by_id = {}
    for entry in listings:
        # A later entry with the same id replaces an earlier one.
        by_id[entry["id"]] = entry
    with open(LISTINGS_FILE, "w") as out:
        json.dump(by_id, out, indent=2, ensure_ascii=False)
def find_new_listings(self, current: list[dict], previous: dict) -> list[dict]:
    """Return the entries of ``current`` whose ``id`` is not a key of ``previous``.

    The order of ``current`` is preserved.
    """
    return [candidate for candidate in current if candidate["id"] not in previous]
def send_telegram(self, message: str):
    """Send ``message`` to the configured Telegram chat.

    The message is posted with ``parse_mode="HTML"`` and link previews
    disabled. Sending is best-effort: when the bot token or chat id is not
    configured the call is skipped with a warning, and any failure is logged
    instead of raised.

    Args:
        message: Message text using Telegram's HTML formatting.
    """
    if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
        logger.warning("Telegram not configured, skipping notification")
        return
    try:
        url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
        data = {
            "chat_id": TELEGRAM_CHAT_ID,
            "text": message,
            "parse_mode": "HTML",
            "disable_web_page_preview": True
        }
        # requests waits indefinitely unless a timeout is given; bound the
        # call so a stalled connection cannot block the monitor.
        response = requests.post(url, data=data, timeout=30)
        if response.ok:
            logger.info("Telegram notification sent")
        else:
            logger.error(f"Telegram error: {response.text}")
    except Exception as e:
        # Exception texts raised by requests can contain the request URL,
        # which embeds the bot token - mask it before it reaches the log.
        detail = str(e).replace(TELEGRAM_BOT_TOKEN, "***")
        logger.error(f"Telegram error: {detail}")
def log_listing_times(self, new_listings: list[dict]):
    """Append one row per new listing to the CSV file ``TIMING_FILE``.

    A header row is written first when the file did not exist before it was
    opened. All rows of one call share the same timestamp columns. Nothing
    happens for an empty ``new_listings``.
    """
    if not new_listings:
        return
    needs_header = not TIMING_FILE.exists()
    with open(TIMING_FILE, "a", newline="", encoding="utf-8") as csv_file:
        out = csv.writer(csv_file)
        if needs_header:
            out.writerow(["timestamp", "weekday", "hour", "minute", "rooms", "size", "price", "address", "listing_id"])
        stamp = datetime.now()
        # "%A" yields the weekday name.
        time_columns = [stamp.isoformat(), stamp.strftime("%A"), stamp.hour, stamp.minute]
        listing_columns = ("rooms", "size", "price", "address", "id")
        out.writerows(
            time_columns + [listing[column] for column in listing_columns]
            for listing in new_listings
        )
    logger.info(f"Logged {len(new_listings)} listing times to CSV")
def notify_new_listings(self, new_listings: list[dict], application_results: dict = None):
    """Send one Telegram message per new listing.

    Args:
        new_listings: Listings to announce; nothing is sent when empty.
        application_results: Optional mapping of listing id to an auto-apply
            result dict (keys ``success``, ``company``, ``message``). When a
            listing has an entry, the outcome is appended to its message.

    A pause of 0.5 seconds follows every message.
    """
    if not new_listings:
        return

    def esc(value) -> str:
        # The message is sent with parse_mode="HTML": scraped text and error
        # messages may contain "<", ">" or "&", which must be escaped so they
        # are not read as markup.
        return html.escape(str(value), quote=False)

    for listing in new_listings:
        link = listing.get('link', 'https://www.inberlinwohnen.de/wohnungsfinder/')
        message = "🏠 <b>Neue Wohnung!</b>\n\n"
        message += f"🚪 <b>{esc(listing['rooms'])}</b>\n"
        message += f"📐 {esc(listing['size'])}\n"
        message += f"💰 {esc(listing['price'])}\n"
        message += f"📍 {esc(listing['address'])}\n\n"
        # Inside the attribute value quotes have to be escaped as well.
        message += f"👉 <a href=\"{html.escape(str(link), quote=True)}\">Alle Details</a>"
        # Add autopilot status if application was attempted
        if application_results and listing["id"] in application_results:
            result = application_results[listing["id"]]
            if result["success"]:
                message += f"\n\n🤖 <b>Auto-applied!</b> ({esc(result['company'])})"
            else:
                message += f"\n\n⚠️ <b>Auto-apply failed</b> ({esc(result['company'])})"
            if result["message"]:
                message += f"\n<i>{esc(result['message'])}</i>"
        self.send_telegram(message)
        time.sleep(0.5)
async def apply_to_listings(self, listings: list[dict]) -> dict:
    """Apply to every listing that has not been applied to before.

    Each result is stored via ``save_application`` and a 2 second pause
    follows every attempt.

    Args:
        listings: Listings to apply to.

    Returns:
        Mapping of listing id to the result returned by
        ``application_handler.apply``. Listings for which ``has_applied``
        is true are skipped and do not appear in the mapping.
    """
    results = {}
    for listing in listings:
        if self.has_applied(listing["id"]):
            logger.info(f"Already applied to {listing['id']}, skipping")
            continue
        result = await self.application_handler.apply(listing)
        results[listing["id"]] = result
        self.save_application(result)
        # Name the outcome so the log line distinguishes success from failure.
        status = "succeeded" if result["success"] else "failed"
        logger.info(f"Application {status}: {listing['address']} - {result['message']}")
        await asyncio.sleep(2)
    return results
def check(self):
    """Run one monitoring cycle.

    Logs in when credentials are configured and no login happened yet,
    fetches the current listings, compares them with the saved state, logs
    and announces new ones (applying first when autopilot is enabled) and
    finally saves the current listings. The very first run only stores a
    baseline; an empty fetch leaves the saved state untouched.
    """
    def run_blocking(make_coro, *args):
        # Look up the event loop first, then create and run the coroutine.
        return asyncio.get_event_loop().run_until_complete(make_coro(*args))

    logger.info("Starting check...")

    # Login if credentials provided
    if not self.logged_in and INBERLIN_EMAIL:
        run_blocking(self._async_login)

    current = run_blocking(self._async_fetch)
    if not current:
        logger.warning("No listings fetched")
        return

    known = self.load_previous_listings()
    if not known:
        # First run - just save baseline
        logger.info(f"First run - saving {len(current)} listings as baseline")
        self.save_listings(current)
        return

    fresh = self.find_new_listings(current, known)
    if not fresh:
        logger.info("No new listings")
    else:
        logger.info(f"Found {len(fresh)} new listing(s)")
        self.log_listing_times(fresh)
        outcomes = {}
        # Apply automatically if autopilot is enabled
        if self.is_autopilot_enabled():
            logger.info("Autopilot enabled - applying to listings...")
            outcomes = run_blocking(self._async_apply, fresh)
        self.notify_new_listings(fresh, outcomes)

    # Save current state
    self.save_listings(current)
async def _async_login(self):
    """Make sure the browser is initialised, then log in."""
    browser_ready = self.init_browser()
    await browser_ready
    await self.login()
async def _async_fetch(self):
    """Make sure the browser is initialised, then return the fetched listings."""
    await self.init_browser()
    fetched = await self.fetch_listings()
    return fetched
async def _async_apply(self, listings: list[dict]):
    """Make sure the browser is initialised, then apply to ``listings`` and return the results."""
    await self.init_browser()
    outcomes = await self.apply_to_listings(listings)
    return outcomes
class WGCompanyMonitor:
    """Monitor WGcompany.de for new WG room listings"""

    def __init__(self):
        """Create a monitor without a running browser.

        The Playwright handle, browser and context are populated by
        ``init_browser``; all three start out as ``None`` so that each
        attribute exists from construction on.
        """
        self.playwright = None
        self.browser = None
        self.context = None
async def init_browser(self):
    """Start Playwright, headless Chromium and a browser context.

    Does nothing when ``self.browser`` is already set.
    """
    if self.browser is not None:
        return
    self.playwright = await async_playwright().start()
    self.browser = await self.playwright.chromium.launch(headless=True)
    desktop_user_agent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
    self.context = await self.browser.new_context(user_agent=desktop_user_agent)
    logger.info("[WGCOMPANY] Browser initialized")
async def fetch_listings(self) -> list[dict]:
    """Fetch WG room listings from the wgcompany.de simple search.

    Opens the search page, fills in the filters configured through the
    ``WGCOMPANY_*`` settings, submits the form, stores the result page as
    ``wgcompany_debug.html`` in ``DATA_DIR`` and scrapes every result link.

    Returns:
        Listing dicts with the keys ``id``, ``rooms``, ``size``, ``price``,
        ``address``, ``link``, ``source`` and ``fetched_at``, de-duplicated
        by ``id``. An empty list is returned when the fetch fails; the error
        is logged, not raised.
    """
    # District names looked for (case-insensitively) in a result row, in
    # this order; the first hit becomes the listing's "address".
    known_districts = (
        "Kreuzberg", "Neukölln", "Friedrichshain", "Prenzlauer Berg",
        "Mitte", "Wedding", "Charlottenburg", "Schöneberg", "Tempelhof",
        "Steglitz", "Wilmersdorf", "Pankow", "Lichtenberg", "Treptow",
        "Köpenick", "Reinickendorf", "Spandau", "Zehlendorf", "Moabit",
    )
    page = None
    try:
        page = await self.context.new_page()
        # Use simple search page: st=1 (Berlin), mi=10 (simple WG search), li=100
        search_url = "http://www.wgcompany.de/cgi-bin/seite?st=1&mi=10&li=100"
        logger.info(f"[WGCOMPANY] Loading search page: {search_url}")
        await page.goto(search_url, wait_until="networkidle")
        await asyncio.sleep(2)

        # Text inputs of the simple search form:
        # c = Min. Größe (min size m²), a = Max. Miete (max rent €), l = Alter (age)
        text_filters = (
            (WGCOMPANY_MIN_SIZE, 'input[name="c"]', "min size"),
            (WGCOMPANY_MAX_PRICE, 'input[name="a"]', "max rent"),
            (WGCOMPANY_AGE, 'input[name="l"]', "age"),
        )
        for value, selector, label in text_filters:
            if not value:
                continue
            field = await page.query_selector(selector)
            if field:
                await field.fill(value)
                logger.info(f"[WGCOMPANY] Set {label}: {value}")

        # Smoker filter (o = RaucherIn: NR=Nichtraucher, R=Raucher)
        if WGCOMPANY_SMOKER:
            smoker_select = await page.query_selector('select[name="o"]')
            if smoker_select:
                await smoker_select.select_option(WGCOMPANY_SMOKER)
                logger.info(f"[WGCOMPANY] Set smoker: {WGCOMPANY_SMOKER}")

        # District selection (e = Bezirk, multi-select)
        # Leave as default "egal" (all districts) unless specified
        if WGCOMPANY_BEZIRK and WGCOMPANY_BEZIRK != "0":
            bezirk_select = await page.query_selector('select[name="e"]')
            if bezirk_select:
                await bezirk_select.select_option(WGCOMPANY_BEZIRK)
                logger.info(f"[WGCOMPANY] Set district: {WGCOMPANY_BEZIRK}")

        # Submit the search form
        submit_btn = await page.query_selector('input[type="submit"][value*="finde"], input[type="submit"]')
        if submit_btn:
            logger.info("[WGCOMPANY] Submitting search form...")
            await submit_btn.click()
            await page.wait_for_load_state("networkidle")
            await asyncio.sleep(2)

        # Keep a copy of the results page for debugging
        content = await page.content()
        debug_path = DATA_DIR / "wgcompany_debug.html"
        with open(debug_path, "w", encoding="utf-8") as f:
            f.write(content)
        logger.info(f"[WGCOMPANY] Saved debug HTML to {debug_path}")

        # Listing detail links: wg.pl?...function=wgzeigen...
        listing_links = await page.query_selector_all('a[href*="wg.pl"][href*="wgzeigen"]')
        logger.info(f"[WGCOMPANY] Found {len(listing_links)} listing links")

        listings = []
        seen_ids = set()
        for link_elem in listing_links:
            try:
                href = await link_elem.get_attribute("href")
                if not href:
                    continue
                # Details are read from the enclosing table row, or from the
                # link's parent element when there is no row.
                parent = await link_elem.evaluate_handle("el => el.closest('tr') || el.parentElement")
                row_text = await parent.evaluate("el => el.innerText") if parent else ""

                # Extract price from row text (e.g., "350 €" or "350€")
                price_match = re.search(r'(\d+)\s*€', row_text)
                price = price_match.group(1) if price_match else "?"
                # Extract size (e.g., "15 m²" or "15m²")
                size_match = re.search(r'(\d+)\s*m²', row_text)
                size = size_match.group(1) if size_match else "?"

                # First known district mentioned in the row, else "Berlin"
                row_text_lower = row_text.lower()
                location = "Berlin"
                for bez in known_districts:
                    if bez.lower() in row_text_lower:
                        location = bez
                        break

                # Make absolute URL
                if not href.startswith("http"):
                    href = f"http://www.wgcompany.de{href}" if href.startswith("/") else f"http://www.wgcompany.de/cgi-bin/{href}"

                # The id is derived from link, price and size; the hashed
                # string must stay unchanged so saved ids keep matching.
                listing_id = hashlib.md5(f"{href}{price}{size}".encode()).hexdigest()[:12]
                if listing_id in seen_ids:
                    # Deduplicate by id, keeping the first occurrence
                    continue
                seen_ids.add(listing_id)
                listings.append({
                    "id": listing_id,
                    "rooms": "1 Zimmer (WG)",
                    "size": size,
                    "price": price,
                    "address": location,
                    "link": href,
                    "source": "wgcompany",
                    "fetched_at": datetime.now().isoformat()
                })
            except Exception as e:
                # One unparsable row must not abort the whole fetch.
                logger.debug(f"[WGCOMPANY] Error parsing listing: {e}")
                continue

        logger.info(f"[WGCOMPANY] Fetched {len(listings)} unique listings")
        return listings
    except Exception as e:
        logger.error(f"[WGCOMPANY] Error fetching listings: {e}")
        import traceback
        logger.error(traceback.format_exc())
        return []
    finally:
        # Close the page on every path so a failed fetch does not leave it
        # open in the browser context that is reused across checks.
        if page is not None:
            try:
                await page.close()
            except Exception as close_error:
                logger.debug(f"[WGCOMPANY] Error closing page: {close_error}")
def load_previous_listings(self) -> dict:
    """Load the WGcompany listings saved by the previous check.

    Returns:
        The JSON content of ``WGCOMPANY_LISTINGS_FILE``, or an empty dict
        when the file is missing, cannot be read or does not contain valid
        JSON.
    """
    if not WGCOMPANY_LISTINGS_FILE.exists():
        return {}
    try:
        with open(WGCOMPANY_LISTINGS_FILE, "r") as f:
            return json.load(f)
    except (OSError, ValueError) as e:
        # A truncated or corrupted state file (for example after an
        # interrupted write) must not break every following check.
        # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
        logger.warning(f"[WGCOMPANY] Could not read {WGCOMPANY_LISTINGS_FILE}, starting with an empty state: {e}")
        return {}
def save_listings(self, listings: list[dict]):
    """Write ``listings`` to ``WGCOMPANY_LISTINGS_FILE`` as a JSON object keyed by listing id."""
    by_id = {}
    for entry in listings:
        # A later entry with the same id replaces an earlier one.
        by_id[entry["id"]] = entry
    with open(WGCOMPANY_LISTINGS_FILE, "w") as out:
        json.dump(by_id, out, indent=2, ensure_ascii=False)
def find_new_listings(self, current: list[dict], previous: dict) -> list[dict]:
    """Return the entries of ``current`` whose ``id`` is not a key of ``previous``.

    The order of ``current`` is preserved.
    """
    return [candidate for candidate in current if candidate["id"] not in previous]
def send_telegram(self, message: str):
    """Send ``message`` to the configured Telegram chat.

    The message is posted with ``parse_mode="HTML"`` and link previews
    disabled. Sending is best-effort: when the bot token or chat id is not
    configured the call is skipped with a warning, and any failure is logged
    instead of raised.

    Args:
        message: Message text using Telegram's HTML formatting.
    """
    if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
        logger.warning("[WGCOMPANY] Telegram not configured, skipping notification")
        return
    try:
        url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
        data = {
            "chat_id": TELEGRAM_CHAT_ID,
            "text": message,
            "parse_mode": "HTML",
            "disable_web_page_preview": True
        }
        # requests waits indefinitely unless a timeout is given; bound the
        # call so a stalled connection cannot block the monitor.
        response = requests.post(url, data=data, timeout=30)
        if response.ok:
            logger.info("[WGCOMPANY] Telegram notification sent")
        else:
            logger.error(f"[WGCOMPANY] Telegram error: {response.text}")
    except Exception as e:
        # Exception texts raised by requests can contain the request URL,
        # which embeds the bot token - mask it before it reaches the log.
        detail = str(e).replace(TELEGRAM_BOT_TOKEN, "***")
        logger.error(f"[WGCOMPANY] Telegram error: {detail}")
def log_listing_times(self, new_listings: list[dict]):
    """Append one row per new WGcompany listing to ``WGCOMPANY_TIMING_FILE``.

    A header row is written first when the file did not exist before it was
    opened. All rows of one call share the same timestamp columns. Nothing
    happens for an empty ``new_listings``.
    """
    if not new_listings:
        return
    needs_header = not WGCOMPANY_TIMING_FILE.exists()
    with open(WGCOMPANY_TIMING_FILE, "a", newline="", encoding="utf-8") as csv_file:
        out = csv.writer(csv_file)
        if needs_header:
            out.writerow(["timestamp", "weekday", "hour", "minute", "rooms", "size", "price", "address", "listing_id"])
        stamp = datetime.now()
        # "%A" yields the weekday name.
        time_columns = [stamp.isoformat(), stamp.strftime("%A"), stamp.hour, stamp.minute]
        listing_columns = ("rooms", "size", "price", "address", "id")
        out.writerows(
            time_columns + [listing[column] for column in listing_columns]
            for listing in new_listings
        )
    logger.info(f"[WGCOMPANY] Logged {len(new_listings)} listing times to CSV")
def notify_new_listings(self, new_listings: list[dict]):
    """Send one Telegram message per new WGcompany listing.

    Args:
        new_listings: Listings to announce; nothing is sent when empty.

    A pause of 0.5 seconds follows every message.
    """
    if not new_listings:
        return

    def esc(value) -> str:
        # The message is sent with parse_mode="HTML": scraped text may
        # contain "<", ">" or "&", which must be escaped so they are not
        # read as markup.
        return html.escape(str(value), quote=False)

    for listing in new_listings:
        message = "🏠 <b>Neues WG-Zimmer!</b> (WGcompany)\n\n"
        message += f"🚪 <b>{esc(listing['rooms'])}</b>\n"
        message += f"📐 {esc(listing['size'])}\n"
        message += f"💰 {esc(listing['price'])}\n"
        message += f"📍 {esc(listing['address'])}\n\n"
        # Inside the attribute value quotes have to be escaped as well.
        message += f"👉 <a href=\"{html.escape(str(listing['link']), quote=True)}\">Zum Angebot</a>"
        self.send_telegram(message)
        time.sleep(0.5)
def check(self):
    """Run one WGcompany monitoring cycle.

    Fetches the current listings, compares them with the saved state, logs
    and announces new ones and finally saves the current listings. The very
    first run only stores a baseline; an empty fetch leaves the saved state
    untouched.
    """
    logger.info("[WGCOMPANY] Starting check...")

    event_loop = asyncio.get_event_loop()
    current = event_loop.run_until_complete(self._async_fetch())
    if not current:
        logger.warning("[WGCOMPANY] No listings fetched")
        return

    known = self.load_previous_listings()
    if not known:
        # First run - just save baseline
        logger.info(f"[WGCOMPANY] First run - saving {len(current)} listings as baseline")
        self.save_listings(current)
        return

    fresh = self.find_new_listings(current, known)
    if not fresh:
        logger.info("[WGCOMPANY] No new listings")
    else:
        logger.info(f"[WGCOMPANY] Found {len(fresh)} new listing(s)")
        self.log_listing_times(fresh)
        self.notify_new_listings(fresh)

    # Save current state
    self.save_listings(current)
async def _async_fetch(self):
    """Make sure the browser is initialised, then return the fetched listings."""
    await self.init_browser()
    fetched = await self.fetch_listings()
    return fetched