wohnbot/monitor.py
Aron ce66fc1933 Fix Degewo auto-apply (Wohnungshelden iframe), update dependencies, cleanup for production
- Fix Degewo handler to work with Wohnungshelden iframe portal
- Update playwright to >=1.57.0
- Add proper form field selectors for Wohnungshelden
- Fix success status bug (was marking failed submissions as success)
- Clean up .env.example (remove real credentials)
- Update README with housing company support table
- Add BOTFATHER_COMMANDS.txt for easy bot setup
- Add copilot-instructions.md for development context
2025-12-09 11:30:17 +01:00

1836 lines
81 KiB
Python

import os
import json
import hashlib
import logging
import asyncio
import re
import html
import threading
import time
import csv
from datetime import datetime
from pathlib import Path
import requests
import pandas as pd
import matplotlib
matplotlib.use('Agg') # Use non-interactive backend
import matplotlib.pyplot as plt
from playwright.async_api import async_playwright
# Configuration from environment
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "")
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "")
INBERLIN_EMAIL = os.environ.get("INBERLIN_EMAIL", "")
INBERLIN_PASSWORD = os.environ.get("INBERLIN_PASSWORD", "")
CHECK_INTERVAL = int(os.environ.get("CHECK_INTERVAL", "300"))  # seconds (5 minutes)

# WGcompany search configuration
WGCOMPANY_ENABLED = os.environ.get("WGCOMPANY_ENABLED", "true").lower() == "true"
WGCOMPANY_MIN_SIZE = os.environ.get("WGCOMPANY_MIN_SIZE", "")  # min room size m²
WGCOMPANY_MAX_SIZE = os.environ.get("WGCOMPANY_MAX_SIZE", "")  # max room size m²
WGCOMPANY_MIN_PRICE = os.environ.get("WGCOMPANY_MIN_PRICE", "")  # min rent €
WGCOMPANY_MAX_PRICE = os.environ.get("WGCOMPANY_MAX_PRICE", "")  # max rent €
WGCOMPANY_BEZIRK = os.environ.get("WGCOMPANY_BEZIRK", "0")  # 0=egal, or specific district code
WGCOMPANY_AGE = os.environ.get("WGCOMPANY_AGE", "")  # your age (for WG matching)
WGCOMPANY_SMOKER = os.environ.get("WGCOMPANY_SMOKER", "")  # NR=Nichtraucher, R=Raucher, empty=egal

# Form data for applications
FORM_ANREDE = os.environ.get("FORM_ANREDE", "")
FORM_VORNAME = os.environ.get("FORM_VORNAME", "")
FORM_NACHNAME = os.environ.get("FORM_NACHNAME", "")
FORM_EMAIL = os.environ.get("FORM_EMAIL", "")
FORM_PHONE = os.environ.get("FORM_PHONE", "")
FORM_STRASSE = os.environ.get("FORM_STRASSE", "")
FORM_HAUSNUMMER = os.environ.get("FORM_HAUSNUMMER", "")
FORM_PLZ = os.environ.get("FORM_PLZ", "")
FORM_ORT = os.environ.get("FORM_ORT", "")
FORM_PERSONS = os.environ.get("FORM_PERSONS", "1")
FORM_CHILDREN = os.environ.get("FORM_CHILDREN", "0")
FORM_INCOME = os.environ.get("FORM_INCOME", "")

# Persistent data locations (all below DATA_DIR)
DATA_DIR = Path("/data")
LISTINGS_FILE = DATA_DIR / "listings.json"
LOG_FILE = DATA_DIR / "monitor.log"
TIMING_FILE = DATA_DIR / "listing_times.csv"
STATE_FILE = DATA_DIR / "state.json"
APPLICATIONS_FILE = DATA_DIR / "applications.json"

# WGcompany specific files
WGCOMPANY_LISTINGS_FILE = DATA_DIR / "wgcompany_listings.json"
WGCOMPANY_TIMING_FILE = DATA_DIR / "wgcompany_times.csv"

# Setup logging
# The log file lives inside DATA_DIR, so the directory must exist before the
# FileHandler is created; otherwise importing this module fails with
# FileNotFoundError on a fresh data directory.
DATA_DIR.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        # Log messages contain listing text (umlauts) and emoji; pin the file
        # encoding instead of relying on the process locale.
        logging.FileHandler(LOG_FILE, encoding="utf-8"),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)
class TelegramBot:
    """Handle Telegram commands for controlling the monitor.

    The bot long-polls the Telegram ``getUpdates`` endpoint on a daemon
    thread and reacts to ``/autopilot``, ``/status``, ``/plot`` and ``/help``
    messages that come from ``TELEGRAM_CHAT_ID``. Messages from any other
    chat are ignored.

    Args:
        monitor: Object providing ``set_autopilot(bool)``, ``load_state()``
            and ``load_applications()``; these are the only members used here.
    """

    # Seconds to wait before polling again after a failed getUpdates call.
    _RETRY_DELAY = 5
    # Timeout (seconds) for outgoing sendMessage / sendPhoto requests, so a
    # stalled connection cannot block the polling thread forever.
    _SEND_TIMEOUT = 30
    _PHOTO_TIMEOUT = 60

    def __init__(self, monitor):
        self.monitor = monitor
        self.last_update_id = 0
        self.running = False

    def start(self):
        """Start the background polling thread (no-op without a bot token)."""
        if not TELEGRAM_BOT_TOKEN:
            logger.warning("Telegram bot token not configured, commands disabled")
            return
        self.running = True
        thread = threading.Thread(target=self._poll_updates, daemon=True)
        thread.start()
        logger.info("Telegram command listener started")

    def stop(self):
        """Ask the polling loop to exit after its current iteration."""
        self.running = False

    @staticmethod
    def _redact(text) -> str:
        """Return ``str(text)`` with the bot token masked.

        Exception messages raised by ``requests`` include the request URL,
        which embeds the bot token; mask it before it reaches the log file.
        """
        text = str(text)
        if TELEGRAM_BOT_TOKEN:
            text = text.replace(TELEGRAM_BOT_TOKEN, "***")
        return text

    def _poll_updates(self):
        """Long-poll Telegram for updates until ``stop()`` is called."""
        url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/getUpdates"
        while self.running:
            try:
                params = {"offset": self.last_update_id + 1, "timeout": 30}
                response = requests.get(url, params=params, timeout=35)
                if not response.ok:
                    # An error response returns immediately; back off instead
                    # of re-polling in a tight loop.
                    logger.error(f"Telegram getUpdates failed: HTTP {response.status_code}")
                    time.sleep(self._RETRY_DELAY)
                    continue
                data = response.json()
                if data.get("ok") and data.get("result"):
                    for update in data["result"]:
                        # Advance the offset before handling so a failing
                        # handler does not cause the update to be re-delivered.
                        self.last_update_id = update["update_id"]
                        self._handle_update(update)
            except requests.exceptions.Timeout:
                continue
            except Exception as e:
                logger.error(f"Telegram polling error: {self._redact(e)}")
                time.sleep(self._RETRY_DELAY)

    def _handle_update(self, update):
        """Dispatch one Telegram update to the matching command handler."""
        message = update.get("message", {})
        text = message.get("text", "")
        chat_id = str(message.get("chat", {}).get("id", ""))
        if chat_id != TELEGRAM_CHAT_ID:
            logger.debug(f"Ignoring message from unknown chat: {chat_id}")
            return
        logger.info(f"Received Telegram command: {text}")
        if text.startswith("/autopilot"):
            self._handle_autopilot_command(text)
        elif text == "/status":
            self._handle_status_command()
        elif text == "/help":
            self._handle_help_command()
        elif text == "/plot":
            self._handle_plot_command()
        elif text.startswith("/"):
            self._handle_unknown_command(text)

    def _handle_autopilot_command(self, text):
        """Handle ``/autopilot on|off``; anything else replies with usage."""
        logger.info(f"Processing autopilot command: {text}")
        parts = text.split()
        if len(parts) < 2:
            self._send_message("Usage: /autopilot on|off")
            return
        action = parts[1].lower()
        if action == "on":
            logger.info("Enabling autopilot mode")
            self.monitor.set_autopilot(True)
            self._send_message("🤖 <b>Autopilot ENABLED</b>\n\nI will automatically apply to new listings!")
        elif action == "off":
            logger.info("Disabling autopilot mode")
            self.monitor.set_autopilot(False)
            self._send_message("🛑 <b>Autopilot DISABLED</b>\n\nI will only notify you of new listings.")
        else:
            self._send_message("Usage: /autopilot on|off")

    def _handle_status_command(self):
        """Reply with the autopilot state and application counts per company."""
        state = self.monitor.load_state()
        autopilot = state.get("autopilot", False)
        applications = self.monitor.load_applications()
        status = "🤖 <b>Autopilot:</b> " + ("ON ✅" if autopilot else "OFF ❌")
        status += f"\n📝 <b>Applications sent:</b> {len(applications)}"
        by_company = {}
        for app in applications.values():
            company = app.get("company", "unknown")
            by_company[company] = by_company.get(company, 0) + 1
        if by_company:
            status += "\n\n<b>By company:</b>"
            for company, count in sorted(by_company.items()):
                # Company names are read back from disk; escape them because
                # the message is sent with parse_mode=HTML.
                status += f"\n{html.escape(str(company))}: {count}"
        self._send_message(status)

    def _handle_help_command(self):
        """Reply with the list of supported commands."""
        help_text = "\n".join((
            "🏠 <b>InBerlin Monitor Commands</b>",
            "/autopilot on - Enable automatic applications",
            "/autopilot off - Disable automatic applications",
            "/status - Show current status and stats",
            "/plot - Show weekly listing patterns",
            "/help - Show this help message",
            "When autopilot is ON, I will automatically apply to new listings.",
        ))
        self._send_message(help_text)

    def _handle_unknown_command(self, text):
        """Reply that the command (first word of ``text``) is not recognised."""
        cmd = text.split()[0] if text else text
        # The command is user input echoed into an HTML-formatted message;
        # unescaped "<" or "&" would make Telegram reject the whole message.
        safe_cmd = html.escape(cmd)
        self._send_message(f"❓ Unknown command: <code>{safe_cmd}</code>\n\nUse /help to see available commands.")

    def _handle_plot_command(self):
        """Generate and send a plot of listing times"""
        logger.info("Generating listing times plot...")
        try:
            plot_path = self._generate_weekly_plot()
            if plot_path:
                self._send_photo(plot_path, "📊 <b>Weekly Listing Patterns</b>\n\nThis shows when new listings typically appear throughout the week.")
            else:
                self._send_message("📊 Not enough data to generate plot yet. Keep monitoring!")
        except Exception as e:
            logger.error(f"Error generating plot: {e}")
            import traceback
            logger.error(traceback.format_exc())
            # Exception text may contain markup characters; escape for HTML mode.
            self._send_message(f"❌ Error generating plot: {html.escape(str(e))}")

    def _generate_weekly_plot(self) -> "str | None":
        """Generate a heatmap of listings by day of week and hour.

        Reads ``TIMING_FILE`` (columns ``weekday`` and ``hour``, optionally
        ``timestamp``) and writes ``weekly_plot.png`` into ``DATA_DIR``.

        Returns:
            The path of the saved PNG as a string, or ``None`` when there is
            no timing data or plotting failed.
        """
        if not TIMING_FILE.exists():
            logger.warning("No timing data file found")
            return None
        fig = None
        try:
            df = pd.read_csv(TIMING_FILE)
            if len(df) < 1:
                logger.warning("Timing file is empty")
                return None
            logger.info(f"Loaded {len(df)} listing records for plot")

            # Create day-hour matrix
            days_order = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
            heatmap_data = pd.DataFrame(0, index=days_order, columns=range(24))
            for _, row in df.iterrows():
                day = row['weekday']
                hour = int(row['hour'])
                # Skip out-of-range hours: assigning to a missing column would
                # silently enlarge the matrix and break the fixed 24-tick axis.
                if day in days_order and 0 <= hour < 24:
                    heatmap_data.loc[day, hour] += 1

            fig, axes = plt.subplots(2, 2, figsize=(14, 10))
            fig.suptitle('Listing Appearance Patterns', fontsize=16, fontweight='bold')

            # 1. Heatmap - Day vs Hour
            ax1 = axes[0, 0]
            im = ax1.imshow(heatmap_data.values, cmap='YlOrRd', aspect='auto')
            ax1.set_xticks(range(24))
            ax1.set_xticklabels(range(24), fontsize=8)
            ax1.set_yticks(range(7))
            ax1.set_yticklabels(days_order)
            ax1.set_xlabel('Hour of Day')
            ax1.set_ylabel('Day of Week')
            ax1.set_title('Listings by Day & Hour')
            fig.colorbar(im, ax=ax1, label='Count')

            # 2. Bar chart - By day of week
            ax2 = axes[0, 1]
            day_counts = df['weekday'].value_counts().reindex(days_order, fill_value=0)
            colors = plt.cm.Blues(day_counts / day_counts.max() if day_counts.max() > 0 else day_counts)
            ax2.bar(range(7), day_counts.values, color=colors)
            ax2.set_xticks(range(7))
            ax2.set_xticklabels(['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'])
            ax2.set_xlabel('Day of Week')
            ax2.set_ylabel('Number of Listings')
            ax2.set_title('Total Listings by Day')
            for i, v in enumerate(day_counts.values):
                if v > 0:
                    ax2.text(i, v + 0.1, str(v), ha='center', fontsize=9)

            # 3. Line chart - By hour
            ax3 = axes[1, 0]
            hour_counts = df['hour'].value_counts().reindex(range(24), fill_value=0)
            ax3.plot(range(24), hour_counts.values, marker='o', linewidth=2, markersize=4, color='#2E86AB')
            ax3.fill_between(range(24), hour_counts.values, alpha=0.3, color='#2E86AB')
            ax3.set_xticks(range(0, 24, 2))
            ax3.set_xlabel('Hour of Day')
            ax3.set_ylabel('Number of Listings')
            ax3.set_title('Total Listings by Hour')
            ax3.grid(True, alpha=0.3)

            # 4. Summary stats
            ax4 = axes[1, 1]
            ax4.axis('off')
            best_day = day_counts.idxmax() if day_counts.max() > 0 else "N/A"
            best_hour = hour_counts.idxmax() if hour_counts.max() > 0 else "N/A"
            total_listings = len(df)
            peak_combo = heatmap_data.stack().idxmax() if heatmap_data.values.max() > 0 else ("N/A", "N/A")
            if 'timestamp' in df.columns:
                # str() keeps the slice valid even if the column holds
                # non-string values (e.g. NaN for a blank cell).
                period_from = str(df['timestamp'].min())[:10]
                period_to = str(df['timestamp'].max())[:10]
            else:
                period_from = period_to = 'N/A'
            stats_text = "\n".join((
                "📊 Summary Statistics",
                f"Total listings tracked: {total_listings}",
                f"🏆 Best day: {best_day}",
                f"⏰ Best hour: {best_hour}:00",
                f"🎯 Peak time: {peak_combo[0]} at {peak_combo[1]}:00",
                f"📈 Average per day: {total_listings/7:.1f}",
                "📅 Data collection period:",
                f"From: {period_from}",
                f"To: {period_to}",
            )) + "\n"
            ax4.text(0.1, 0.9, stats_text, transform=ax4.transAxes, fontsize=11,
                     verticalalignment='top', fontfamily='monospace',
                     bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))
            fig.tight_layout()

            # Save plot
            plot_path = DATA_DIR / "weekly_plot.png"
            fig.savefig(plot_path, dpi=150, bbox_inches='tight')
            logger.info(f"Plot saved to {plot_path}")
            return str(plot_path)
        except Exception as e:
            logger.error(f"Error creating plot: {e}")
            import traceback
            logger.error(traceback.format_exc())
            return None
        finally:
            # Always release the figure, including when plotting raised;
            # pyplot keeps every unclosed figure alive.
            if fig is not None:
                plt.close(fig)

    def _send_message(self, text):
        """Send an HTML-formatted text message to ``TELEGRAM_CHAT_ID``.

        Failures are logged and never raised.
        """
        try:
            url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
            data = {"chat_id": TELEGRAM_CHAT_ID, "text": text, "parse_mode": "HTML", "disable_web_page_preview": True}
            response = requests.post(url, data=data, timeout=self._SEND_TIMEOUT)
            if not response.ok:
                logger.error(f"Failed to send Telegram message: {response.text}")
        except Exception as e:
            logger.error(f"Failed to send Telegram message: {self._redact(e)}")

    def _send_photo(self, photo_path: str, caption: str = ""):
        """Send a photo via Telegram"""
        try:
            url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendPhoto"
            with open(photo_path, 'rb') as photo:
                files = {'photo': photo}
                data = {"chat_id": TELEGRAM_CHAT_ID, "caption": caption, "parse_mode": "HTML"}
                response = requests.post(url, data=data, files=files, timeout=self._PHOTO_TIMEOUT)
            if response.ok:
                logger.info(f"Photo sent successfully: {photo_path}")
            else:
                logger.error(f"Failed to send photo: {response.text}")
        except Exception as e:
            logger.error(f"Failed to send Telegram photo: {self._redact(e)}")
class ApplicationHandler:
"""Handle automatic applications to different housing companies"""
def __init__(self, browser_context):
    """Store the browser context used to open pages for applications.

    Args:
        browser_context: Object whose ``new_page()`` coroutine is awaited by
            the ``_apply_*`` methods (presumably a Playwright
            ``BrowserContext`` -- confirm against the caller).
    """
    self.context = browser_context
async def apply(self, listing: dict) -> dict:
    """Apply to a listing using the handler for its housing company.

    The company is derived from the listing's ``link``. Errors raised by a
    handler are caught and reported in the result instead of propagating.

    Args:
        listing: Listing record. ``id`` is required; ``link``, ``address``,
            ``rooms`` and ``price`` are optional and default to ``""``.

    Returns:
        A result dict with the keys ``listing_id``, ``company``, ``link``,
        ``timestamp``, ``success``, ``message``, ``address``, ``rooms`` and
        ``price``.

    Raises:
        KeyError: If ``listing`` has no ``id``.
    """
    link = listing.get("link", "")
    company = self._detect_company(link)
    # address/rooms/price are optional in the result below, so read them the
    # same way for logging; indexing them directly raised KeyError before any
    # handler had a chance to run.
    address = listing.get("address", "")
    rooms = listing.get("rooms", "")
    price = listing.get("price", "")
    result = {
        "listing_id": listing["id"],
        "company": company,
        "link": link,
        "timestamp": datetime.now().isoformat(),
        "success": False,
        "message": "",
        "address": address,
        "rooms": rooms,
        "price": price,
    }
    logger.info(f"Starting application process for {company}: {address}")
    logger.info(f"Listing details - ID: {listing['id']}, Rooms: {rooms}, Price: {price}")
    logger.info(f"Detail link: {link}")

    supported = ("howoge", "gewobag", "degewo", "gesobau", "stadtundland", "wbm")
    try:
        if company in supported:
            # Look the handler up lazily so a missing handler only affects
            # listings of that company (and is reported via the except below).
            handler = getattr(self, f"_apply_{company}")
            result = await handler(listing, result)
        else:
            result["message"] = f"Unknown company: {company}"
            logger.warning(f"No application handler for company: {company}")
    except Exception as e:
        result["message"] = str(e)
        logger.error(f"Application error for {company}: {e}")
        import traceback
        logger.error(traceback.format_exc())

    # Log final result
    status = "SUCCESS" if result["success"] else "FAILED"
    logger.info(f"Application {status} for {address} ({company}): {result['message']}")
    return result
def _detect_company(self, link: str) -> str:
if "howoge.de" in link: return "howoge"
elif "gewobag.de" in link: return "gewobag"
elif "degewo.de" in link: return "degewo"
elif "gesobau.de" in link: return "gesobau"
elif "stadtundland.de" in link: return "stadtundland"
elif "wbm.de" in link: return "wbm"
return "unknown"
async def _apply_howoge(self, listing: dict, result: dict) -> dict:
    """Open a HOWOGE listing, fill the viewing-request form and submit it.

    Steps: dismiss cookie/consent banners, click the first visible
    "Besichtigung vereinbaren"/"Anfragen" control, fill first name, last
    name, e-mail and phone from the ``FORM_*`` settings, then click submit.
    Screenshots of each stage are written to ``DATA_DIR``.

    Args:
        listing: Listing record; ``link`` and ``id`` are read.
        result: Result dict to update; ``success`` and ``message`` are set.

    Returns:
        The same ``result`` dict. Errors are reported through
        ``result["message"]`` and never raised; the page is always closed.
    """
    page = await self.context.new_page()
    try:
        logger.info(f"[HOWOGE] Opening page: {listing['link']}")
        await page.goto(listing["link"], wait_until="networkidle")
        logger.info("[HOWOGE] Page loaded")
        await asyncio.sleep(2)

        # Handle cookies (best effort). Catch Exception rather than using a
        # bare except so task cancellation is not swallowed here.
        try:
            cookie_btn = await page.query_selector('button:has-text("Akzeptieren"), button:has-text("Alle akzeptieren")')
            if cookie_btn and await cookie_btn.is_visible():
                await cookie_btn.click()
                logger.info("[HOWOGE] Dismissed cookie banner")
                await asyncio.sleep(1)
        except Exception as e:
            logger.debug(f"[HOWOGE] Cookie banner handling skipped: {e}")

        # Try to handle consent manager (consentmanager.net)
        try:
            consent_selectors = [
                '#cmpbntyestxt', '.cmpboxbtnyes', 'a.cmpboxbtn.cmpboxbtnyes',
                '#cmpwelcomebtnyes', '.cmptxt_btn_yes'
            ]
            for sel in consent_selectors:
                consent_btn = await page.query_selector(sel)
                if consent_btn and await consent_btn.is_visible():
                    await consent_btn.click()
                    logger.info("[HOWOGE] Dismissed consent manager")
                    await asyncio.sleep(1)
                    break
        except Exception as e:
            logger.debug(f"[HOWOGE] Consent manager handling skipped: {e}")

        # Look for "Besichtigung vereinbaren" button
        # HOWOGE has multiple buttons with same text - only one is visible
        logger.info("[HOWOGE] Looking for 'Besichtigung vereinbaren' button...")
        # Use href selector first - more reliable than text matching
        selectors = [
            'a[href*="besichtigung-vereinbaren"]',
            'a:has-text("Besichtigung vereinbaren")',
            'button:has-text("Besichtigung vereinbaren")',
            'a:has-text("Anfragen")',
            'button:has-text("Anfragen")'
        ]
        apply_btn = None
        for sel in selectors:
            all_btns = await page.query_selector_all(sel)
            logger.info(f"[HOWOGE] Selector '{sel}' found {len(all_btns)} matches")
            # Find first visible button
            for btn in all_btns:
                try:
                    if await btn.is_visible():
                        apply_btn = btn
                        logger.info(f"[HOWOGE] Found visible button with selector '{sel}'")
                        break
                except Exception:
                    # Element may have been detached; try the next match.
                    continue
            if apply_btn:
                break

        if not apply_btn:
            result["message"] = "No application button found"
            logger.warning("[HOWOGE] Could not find 'Besichtigung vereinbaren' button")
            # Save screenshot for debugging
            screenshot_path = DATA_DIR / f"howoge_nobtn_{listing['id']}.png"
            await page.screenshot(path=str(screenshot_path))
            # Log the first buttons on the page for debugging
            buttons = await page.query_selector_all('button, a.btn, a[class*="button"]')
            for btn in buttons[:10]:
                try:
                    text = await btn.inner_text()
                    logger.info(f"[HOWOGE] Found button: {text[:50]}")
                except Exception:
                    continue
            return result

        # Scroll the button into view and click
        logger.info("[HOWOGE] Found application button, scrolling into view...")
        await apply_btn.scroll_into_view_if_needed()
        await asyncio.sleep(0.5)
        logger.info("[HOWOGE] Clicking button...")
        await apply_btn.click()
        await asyncio.sleep(3)
        await page.wait_for_load_state("networkidle")
        logger.info("[HOWOGE] Clicked button, waiting for form...")

        # Screenshot after clicking
        screenshot_path = DATA_DIR / f"howoge_form_{listing['id']}.png"
        await page.screenshot(path=str(screenshot_path))
        logger.info(f"[HOWOGE] Saved form screenshot to {screenshot_path}")

        # Fill in the contact form: name fields (Vorname, Nachname) and e-mail
        vorname_field = await page.query_selector('input[name*="vorname" i], input[name*="firstname" i], input[placeholder*="Vorname" i], input#vorname')
        nachname_field = await page.query_selector('input[name*="nachname" i], input[name*="lastname" i], input[name*="surname" i], input[placeholder*="Nachname" i], input#nachname')
        email_field = await page.query_selector('input[type="email"], input[name*="email" i], input[name*="mail" i]')
        form_filled = False
        if vorname_field:
            await vorname_field.fill(FORM_VORNAME)
            logger.info(f"[HOWOGE] Filled Vorname: {FORM_VORNAME}")
            form_filled = True
        if nachname_field:
            await nachname_field.fill(FORM_NACHNAME)
            logger.info(f"[HOWOGE] Filled Nachname: {FORM_NACHNAME}")
            form_filled = True
        if email_field:
            await email_field.fill(FORM_EMAIL)
            logger.info(f"[HOWOGE] Filled Email: {FORM_EMAIL}")
            form_filled = True

        # Also look for phone field (does not count towards form_filled)
        phone_field = await page.query_selector('input[type="tel"], input[name*="telefon" i], input[name*="phone" i]')
        if phone_field:
            await phone_field.fill(FORM_PHONE)
            logger.info(f"[HOWOGE] Filled Phone: {FORM_PHONE}")

        # Screenshot after filling form
        screenshot_path2 = DATA_DIR / f"howoge_filled_{listing['id']}.png"
        await page.screenshot(path=str(screenshot_path2))
        logger.info(f"[HOWOGE] Saved filled form screenshot to {screenshot_path2}")

        if not form_filled:
            result["success"] = False
            result["message"] = "Could not find form fields to fill"
            logger.warning("[HOWOGE] No form fields found")
            return result

        # Look for submit button
        submit_btn = await page.query_selector('button[type="submit"], input[type="submit"], button:has-text("Absenden"), button:has-text("Senden"), button:has-text("Anfrage")')
        if not (submit_btn and await submit_btn.is_visible()):
            result["success"] = False
            result["message"] = "Form filled but no submit button found"
            logger.warning("[HOWOGE] Could not find submit button")
            return result

        logger.info("[HOWOGE] Found submit button, clicking...")
        await submit_btn.click()
        await asyncio.sleep(3)
        await page.wait_for_load_state("networkidle")

        # Screenshot after submit
        screenshot_path3 = DATA_DIR / f"howoge_submitted_{listing['id']}.png"
        await page.screenshot(path=str(screenshot_path3))
        logger.info(f"[HOWOGE] Saved post-submit screenshot to {screenshot_path3}")

        content = (await page.content()).lower()
        # The submission counts as successful either way; only the message
        # records whether a confirmation text was seen on the page.
        result["success"] = True
        if any(word in content for word in ("erfolgreich", "gesendet", "danke")):
            result["message"] = "Application submitted successfully"
            logger.info("[HOWOGE] Success! Confirmation message detected")
        else:
            result["message"] = "Form submitted, awaiting confirmation"
            logger.info("[HOWOGE] Form submitted but no clear confirmation")
    except Exception as e:
        result["message"] = f"Error: {str(e)}"
        logger.error(f"[HOWOGE] Exception: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
    finally:
        await page.close()
    return result
async def _apply_gewobag(self, listing: dict, result: dict) -> dict:
    """Open a Gewobag listing and click its contact/request button.

    Args:
        listing: Listing record; ``link`` and ``id`` are read.
        result: Result dict to update; ``success`` and ``message`` are set.

    Returns:
        The same ``result`` dict. Errors are reported through
        ``result["message"]`` and never raised; the page is always closed.
    """
    page = await self.context.new_page()
    try:
        logger.info(f"[GEWOBAG] Opening page: {listing['link']}")
        await page.goto(listing["link"], wait_until="networkidle")
        logger.info("[GEWOBAG] Page loaded")
        await asyncio.sleep(2)

        # Dismiss the cookie banner (best effort). Catch Exception rather
        # than using a bare except so task cancellation is not swallowed.
        try:
            cookie_btn = await page.query_selector('#CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll, button:has-text("Alle akzeptieren")')
            if cookie_btn and await cookie_btn.is_visible():
                await cookie_btn.click()
                logger.info("[GEWOBAG] Dismissed cookie banner")
                await asyncio.sleep(1)
        except Exception as e:
            logger.debug(f"[GEWOBAG] Cookie banner handling skipped: {e}")

        logger.info("[GEWOBAG] Looking for application button...")
        apply_btn = await page.query_selector('a:has-text("Kontakt"), button:has-text("Anfrage"), a.btn:has-text("Anfragen")')
        if apply_btn and await apply_btn.is_visible():
            logger.info("[GEWOBAG] Found application button, clicking...")
            await apply_btn.click()
            await asyncio.sleep(2)
            screenshot_path = DATA_DIR / f"gewobag_{listing['id']}.png"
            await page.screenshot(path=str(screenshot_path))
            logger.info(f"[GEWOBAG] Saved screenshot to {screenshot_path}")
            # NOTE(review): no form is filled or submitted here; "success"
            # only means the contact page was opened. Confirm this is the
            # intended meaning for callers that count applications.
            result["success"] = True
            result["message"] = "Application page opened"
        else:
            result["message"] = "No application button found"
            logger.warning("[GEWOBAG] Could not find application button")
            screenshot_path = DATA_DIR / f"gewobag_nobtn_{listing['id']}.png"
            await page.screenshot(path=str(screenshot_path))
    except Exception as e:
        result["message"] = f"Error: {str(e)}"
        logger.error(f"[GEWOBAG] Exception: {str(e)}")
        # Log the traceback as the HOWOGE and Degewo handlers do.
        import traceback
        logger.error(traceback.format_exc())
    finally:
        await page.close()
    return result
async def _apply_degewo(self, listing: dict, result: dict) -> dict:
    """Apply to a Degewo listing through the Wohnungshelden form.

    Degewo uses Wohnungshelden (app.wohnungshelden.de) for their application
    system. The form is loaded in an iframe from a different domain, so the
    iframe URL is opened directly in a separate page, filled from the
    ``FORM_*`` settings and submitted. Screenshots and an HTML dump are
    written to ``DATA_DIR`` for debugging.

    Args:
        listing: Listing record; ``link`` and ``id`` are read.
        result: Result dict to update; ``success`` and ``message`` are set.

    Returns:
        The same ``result`` dict. Errors are reported through
        ``result["message"]`` and never raised; all pages are closed.
    """
    page = await self.context.new_page()
    try:
        logger.info(f"[DEGEWO] Opening page: {listing['link']}")
        await page.goto(listing["link"], wait_until="networkidle")
        logger.info("[DEGEWO] Page loaded")
        await asyncio.sleep(2)

        # Dismiss cookie banner (best effort). Catch Exception rather than
        # using a bare except so task cancellation is not swallowed.
        try:
            cookie_btn = await page.query_selector('button:has-text("Alle akzeptieren"), #CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll')
            if cookie_btn and await cookie_btn.is_visible():
                await cookie_btn.click()
                logger.info("[DEGEWO] Dismissed cookie banner")
                await asyncio.sleep(1)
        except Exception as e:
            logger.debug(f"[DEGEWO] Cookie banner handling skipped: {e}")

        logger.info("[DEGEWO] Looking for kontaktieren button...")
        apply_btn = await page.query_selector('a:has-text("kontaktieren"), button:has-text("kontaktieren"), a:has-text("Kontaktieren"), button:has-text("Kontaktieren")')
        if not (apply_btn and await apply_btn.is_visible()):
            result["message"] = "No kontaktieren button found"
            logger.warning("[DEGEWO] Could not find kontaktieren button")
            screenshot_path = DATA_DIR / f"degewo_nobtn_{listing['id']}.png"
            await page.screenshot(path=str(screenshot_path), full_page=True)
            return result

        logger.info("[DEGEWO] Found kontaktieren button, clicking...")
        await apply_btn.click()
        await asyncio.sleep(3)

        # Find the Wohnungshelden iframe and get its URL to navigate directly
        iframe_element = await page.query_selector('iframe[src*="wohnungshelden.de"]')
        if not iframe_element:
            logger.warning("[DEGEWO] Wohnungshelden iframe not found, trying direct form...")
            # Take screenshot and save HTML for debugging
            screenshot_path = DATA_DIR / f"degewo_noiframe_{listing['id']}.png"
            await page.screenshot(path=str(screenshot_path), full_page=True)
            html_content = await page.content()
            html_path = DATA_DIR / "degewo_debug.html"
            html_path.write_text(html_content, encoding='utf-8')
            result["success"] = False
            result["message"] = "Wohnungshelden iframe not found on page"
            return result

        iframe_url = await iframe_element.get_attribute('src')
        logger.info(f"[DEGEWO] Found Wohnungshelden iframe: {iframe_url}")

        # Navigate to the iframe URL directly in a new page for full access
        iframe_page = await self.context.new_page()
        try:
            await iframe_page.goto(iframe_url, wait_until="networkidle")
            await asyncio.sleep(2)
            logger.info("[DEGEWO] Loaded Wohnungshelden application page")

            # Take screenshot of the Wohnungshelden form
            screenshot_path = DATA_DIR / f"degewo_wohnungshelden_{listing['id']}.png"
            await iframe_page.screenshot(path=str(screenshot_path), full_page=True)
            logger.info(f"[DEGEWO] Saved Wohnungshelden screenshot to {screenshot_path}")

            # Save HTML for debugging
            html_content = await iframe_page.content()
            html_path = DATA_DIR / f"degewo_wohnungshelden_{listing['id']}.html"
            html_path.write_text(html_content, encoding='utf-8')
            logger.info(f"[DEGEWO] Saved HTML to {html_path}")

            form_filled = await self._degewo_fill_form(iframe_page)
            await asyncio.sleep(1)

            # Take screenshot after filling form
            screenshot_path = DATA_DIR / f"degewo_form_filled_{listing['id']}.png"
            await iframe_page.screenshot(path=str(screenshot_path), full_page=True)
            logger.info(f"[DEGEWO] Saved filled form screenshot to {screenshot_path}")

            if form_filled:
                await self._degewo_submit_form(iframe_page, listing, result)
            else:
                # Nothing could be filled in: do not submit an empty form and
                # report it as a success (same rule as the HOWOGE handler).
                result["success"] = False
                result["message"] = "Could not find form fields to fill"
                logger.warning("[DEGEWO] No form fields found")
        finally:
            await iframe_page.close()
    except Exception as e:
        result["message"] = f"Error: {str(e)}"
        logger.error(f"[DEGEWO] Exception: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
    finally:
        await page.close()
    return result

async def _degewo_fill_input(self, form_page, selector: str, value: str, label: str) -> bool:
    """Fill one Wohnungshelden text input; return True if it was filled."""
    try:
        field = await form_page.query_selector(selector)
        if field:
            await field.fill(value)
            logger.info(f"[DEGEWO] Filled {label}: {value}")
            return True
    except Exception as e:
        logger.warning(f"[DEGEWO] Could not fill {label}: {e}")
    return False

async def _degewo_choose_option(self, form_page, dropdown_selector: str, option_selector: str,
                                selected_log: str, label: str) -> bool:
    """Open an ng-select dropdown and click an option; return True if chosen."""
    try:
        dropdown = await form_page.query_selector(dropdown_selector)
        if dropdown:
            # Click on the dropdown to open it, then give it time to render
            await dropdown.click()
            await asyncio.sleep(0.5)
            option = await form_page.query_selector(option_selector)
            if option:
                await option.click()
                logger.info(selected_log)
                return True
    except Exception as e:
        logger.warning(f"[DEGEWO] Could not set {label}: {e}")
    return False

async def _degewo_fill_form(self, form_page) -> bool:
    """Fill every known Wohnungshelden field; return True if any was set.

    The form uses specific IDs: #firstName, #lastName, #email, etc. Every
    field is attempted even when an earlier one fails.
    """
    outcomes = [
        # Anrede (Salutation): select "Herr" or "Frau" based on FORM_ANREDE
        await self._degewo_choose_option(
            form_page,
            '#salutation-dropdown, ng-select[id*="salutation"]',
            f'.ng-option:has-text("{FORM_ANREDE}")',
            f"[DEGEWO] Selected Anrede: {FORM_ANREDE}",
            "Anrede",
        ),
        await self._degewo_fill_input(form_page, '#firstName', FORM_VORNAME, "Vorname"),
        await self._degewo_fill_input(form_page, '#lastName', FORM_NACHNAME, "Nachname"),
        await self._degewo_fill_input(form_page, '#email', FORM_EMAIL, "E-Mail"),
        await self._degewo_fill_input(form_page, 'input[id*="telefonnummer"]', FORM_PHONE, "Telefon"),
        # Number of persons moving in
        await self._degewo_fill_input(form_page, 'input[id*="numberPersonsTotal"]', FORM_PERSONS, "Anzahl Personen"),
        # "Für sich selbst" dropdown: select "Für mich selbst"
        await self._degewo_choose_option(
            form_page,
            'ng-select[id*="fuer_wen"]',
            '.ng-option:has-text("Für mich selbst"), .ng-option:has-text("selbst")',
            "[DEGEWO] Selected: Für mich selbst",
            "'Für sich selbst'",
        ),
    ]
    return any(outcomes)

async def _degewo_submit_form(self, form_page, listing: dict, result: dict) -> None:
    """Click the first visible submit control and record the outcome in ``result``."""
    try:
        # Look for submit button with various patterns
        submit_selectors = [
            'button[type="submit"]',
            'input[type="submit"]',
            'button:has-text("Absenden")',
            'button:has-text("Senden")',
            'button:has-text("Anfrage")',
            'button:has-text("Bewerben")',
            'button:has-text("Submit")',
            '.btn-primary',
            '.submit-btn',
        ]
        submit_btn = None
        for selector in submit_selectors:
            candidate = await form_page.query_selector(selector)
            if candidate and await candidate.is_visible():
                logger.info(f"[DEGEWO] Found submit button with selector: {selector}")
                submit_btn = candidate
                break

        if submit_btn is None:
            # Submit button not found - this is a failure
            result["success"] = False
            result["message"] = "Wohnungshelden form loaded but submit button not found"
            logger.warning("[DEGEWO] Submit button not found in Wohnungshelden form")
            return

        await submit_btn.click()
        logger.info("[DEGEWO] Clicked submit button")
        await asyncio.sleep(3)

        # Take screenshot after submission
        screenshot_path = DATA_DIR / f"degewo_submitted_{listing['id']}.png"
        await form_page.screenshot(path=str(screenshot_path), full_page=True)
        logger.info(f"[DEGEWO] Saved submission screenshot to {screenshot_path}")
        result["success"] = True
        result["message"] = "Application submitted via Wohnungshelden"
    except Exception as e:
        result["success"] = False
        result["message"] = f"Wohnungshelden submit error: {str(e)}"
        logger.warning(f"[DEGEWO] Submit error: {e}")
async def _apply_gesobau(self, listing: dict, result: dict) -> dict:
    """Open a Gesobau listing and click its contact/request button.

    Args:
        listing: Listing record; ``link`` and ``id`` are read.
        result: Result dict to update; ``success`` and ``message`` are set.

    Returns:
        The same ``result`` dict. Errors are reported through
        ``result["message"]`` and never raised; the page is always closed.
    """
    page = await self.context.new_page()
    try:
        logger.info(f"[GESOBAU] Opening page: {listing['link']}")
        await page.goto(listing["link"], wait_until="networkidle")
        logger.info("[GESOBAU] Page loaded")
        await asyncio.sleep(2)

        # Dismiss the cookie banner (best effort). Catch Exception rather
        # than using a bare except so task cancellation is not swallowed.
        try:
            cookie_btn = await page.query_selector('button:has-text("Akzeptieren"), button:has-text("Alle akzeptieren")')
            if cookie_btn and await cookie_btn.is_visible():
                await cookie_btn.click()
                logger.info("[GESOBAU] Dismissed cookie banner")
                await asyncio.sleep(1)
        except Exception as e:
            logger.debug(f"[GESOBAU] Cookie banner handling skipped: {e}")

        logger.info("[GESOBAU] Looking for application button...")
        apply_btn = await page.query_selector('a:has-text("Anfragen"), button:has-text("Interesse"), a:has-text("Kontakt")')
        if apply_btn and await apply_btn.is_visible():
            logger.info("[GESOBAU] Found application button, clicking...")
            await apply_btn.click()
            await asyncio.sleep(2)
            screenshot_path = DATA_DIR / f"gesobau_{listing['id']}.png"
            await page.screenshot(path=str(screenshot_path))
            logger.info(f"[GESOBAU] Saved screenshot to {screenshot_path}")
            # NOTE(review): no form is filled or submitted here; "success"
            # only means the contact page was opened. Confirm this is the
            # intended meaning for callers that count applications.
            result["success"] = True
            result["message"] = "Application page opened"
        else:
            result["message"] = "No application button found"
            logger.warning("[GESOBAU] Could not find application button")
            screenshot_path = DATA_DIR / f"gesobau_nobtn_{listing['id']}.png"
            await page.screenshot(path=str(screenshot_path))
    except Exception as e:
        result["message"] = f"Error: {str(e)}"
        logger.error(f"[GESOBAU] Exception: {str(e)}")
        # Log the traceback as the HOWOGE and Degewo handlers do.
        import traceback
        logger.error(traceback.format_exc())
    finally:
        await page.close()
    return result
async def _apply_stadtundland(self, listing: dict, result: dict) -> dict:
    """Fill in and submit the contact form embedded in a Stadt und Land listing.

    The form sits directly on the listing page. Fields are filled from the
    module-level ``FORM_*`` settings, the Datenschutz and Provision
    checkboxes are ticked, and the form is submitted if at least one of
    Vorname, Nachname or E-Mail could be filled. Screenshots are written to
    ``DATA_DIR`` before and after submitting.

    ``result["success"]`` is set to ``True`` only when the submit button was
    actually clicked; a filled-in form that could not be sent is reported as
    a failure.

    Args:
        listing: Listing record; ``listing["link"]`` and ``listing["id"]`` are read.
        result: Result record updated in place (``success`` and ``message``).

    Returns:
        The same ``result`` dict that was passed in.
    """
    page = await self.context.new_page()

    async def fill_field(label: str, selector: str, value: str, skip_if_empty: bool) -> bool:
        """Fill the first input matching ``selector``; return True if it was filled."""
        try:
            field = await page.query_selector(selector)
            if field and (value or not skip_if_empty):
                await field.fill(value)
                logger.info(f"[STADTUNDLAND] Filled {label}: {value}")
                return True
        except Exception as e:
            logger.warning(f"[STADTUNDLAND] Could not fill {label}: {e}")
        return False

    async def tick_checkbox(label: str, selector: str) -> None:
        """Tick the first checkbox matching ``selector`` unless it is already checked."""
        try:
            checkbox = await page.query_selector(selector)
            if checkbox and not await checkbox.is_checked():
                await checkbox.click()
                logger.info(f"[STADTUNDLAND] Checked {label} checkbox")
        except Exception as e:
            logger.warning(f"[STADTUNDLAND] Could not check {label}: {e}")

    try:
        logger.info(f"[STADTUNDLAND] Opening page: {listing['link']}")
        await page.goto(listing["link"], wait_until="networkidle")
        logger.info("[STADTUNDLAND] Page loaded")
        await asyncio.sleep(2)

        # Best effort only; ``Exception`` (not a bare ``except``) keeps task
        # cancellation and KeyboardInterrupt propagating.
        try:
            cookie_btn = await page.query_selector(
                'button:has-text("Akzeptieren"), button:has-text("Alle akzeptieren")'
            )
            if cookie_btn and await cookie_btn.is_visible():
                await cookie_btn.click()
                logger.info("[STADTUNDLAND] Dismissed cookie banner")
                await asyncio.sleep(1)
        except Exception as e:
            logger.debug(f"[STADTUNDLAND] Cookie banner handling skipped: {e}")

        # Stadt und Land has the contact form directly on the page
        logger.info("[STADTUNDLAND] Looking for contact form fields...")

        # (label, selector, value, marks the form as filled, skip when value is empty)
        fields = [
            ("Vorname",
             'input[name*="vorname" i], input[placeholder*="Vorname" i], input#vorname',
             FORM_VORNAME, True, False),
            ("Nachname",
             'input[name*="nachname" i], input[placeholder*="Nachname" i], input#nachname',
             FORM_NACHNAME, True, False),
            ("Telefon",
             'input[name*="telefon" i], input[type="tel"], input[placeholder*="Telefon" i]',
             FORM_PHONE, False, False),
            ("E-Mail",
             'input[type="email"], input[name*="email" i], input[name*="mail" i]',
             FORM_EMAIL, True, False),
            ("Straße",
             'input[name*="strasse" i], input[name*="straße" i], input[placeholder*="Straße" i], input#strasse',
             FORM_STRASSE, False, True),
            ("Hausnummer",
             'input[name*="hausnummer" i], input[name*="hausnr" i], input[placeholder*="Hausnummer" i], input#hausnummer',
             FORM_HAUSNUMMER, False, True),
            ("PLZ",
             'input[name*="plz" i], input[placeholder*="PLZ" i], input#plz',
             FORM_PLZ, False, True),
            ("Ort",
             'input[name*="ort" i], input[placeholder*="Ort" i], input#ort',
             FORM_ORT, False, True),
        ]
        form_filled = False
        for label, selector, value, marks_filled, skip_if_empty in fields:
            if await fill_field(label, selector, value, skip_if_empty) and marks_filled:
                form_filled = True

        await tick_checkbox(
            "Datenschutz",
            'input[type="checkbox"][name*="datenschutz" i], input[type="checkbox"][name*="privacy" i]',
        )
        await tick_checkbox("Provision", 'input[type="checkbox"][name*="provision" i]')
        await asyncio.sleep(1)

        # Screenshot before submitting
        screenshot_path = DATA_DIR / f"stadtundland_form_{listing['id']}.png"
        await page.screenshot(path=str(screenshot_path), full_page=True)
        logger.info(f"[STADTUNDLAND] Saved form screenshot to {screenshot_path}")

        if form_filled:
            # Tracks whether the click itself went through, so an error in
            # the follow-up screenshot does not hide a sent application.
            submitted = False
            try:
                submit_btn = await page.query_selector(
                    'button[type="submit"], input[type="submit"], button:has-text("prüfen"), button:has-text("Absenden"), button:has-text("Senden")'
                )
                if submit_btn and await submit_btn.is_visible():
                    await submit_btn.click()
                    submitted = True
                    logger.info("[STADTUNDLAND] Clicked submit button")
                    await asyncio.sleep(3)
                    # Screenshot after submission
                    screenshot_path = DATA_DIR / f"stadtundland_submitted_{listing['id']}.png"
                    await page.screenshot(path=str(screenshot_path), full_page=True)
                    logger.info(f"[STADTUNDLAND] Saved submission screenshot to {screenshot_path}")
                    result["success"] = True
                    result["message"] = "Application submitted"
                else:
                    # Nothing was sent, so this must not count as a success.
                    result["success"] = False
                    result["message"] = "Form filled, submit button not found"
                    logger.warning("[STADTUNDLAND] Submit button not found")
            except Exception as e:
                result["success"] = submitted
                result["message"] = f"Form filled, submit error: {str(e)}"
                logger.warning(f"[STADTUNDLAND] Submit error: {e}")
        else:
            result["message"] = "No form fields found"
            logger.warning("[STADTUNDLAND] Could not find form fields")
            screenshot_path = DATA_DIR / f"stadtundland_nobtn_{listing['id']}.png"
            await page.screenshot(path=str(screenshot_path))
    except Exception as e:
        result["message"] = f"Error: {str(e)}"
        # logger.exception records the traceback as well as the message.
        logger.exception(f"[STADTUNDLAND] Exception: {str(e)}")
    finally:
        await page.close()
    return result
async def _apply_wbm(self, listing: dict, result: dict) -> dict:
    """Open a WBM listing and click its application button.

    This handler only opens the application page: no form is filled in or
    submitted. A screenshot is written to ``DATA_DIR`` both when the button
    was clicked and when no button could be found.

    Args:
        listing: Listing record; ``listing["link"]`` and ``listing["id"]`` are read.
        result: Result record updated in place (``success`` and ``message``).

    Returns:
        The same ``result`` dict that was passed in.
    """
    page = await self.context.new_page()
    try:
        logger.info(f"[WBM] Opening page: {listing['link']}")
        await page.goto(listing["link"], wait_until="networkidle")
        logger.info("[WBM] Page loaded")
        await asyncio.sleep(2)

        # Best effort only: a missing or unclickable cookie banner must not
        # abort the application. ``Exception`` (not a bare ``except``) keeps
        # task cancellation and KeyboardInterrupt propagating.
        try:
            cookie_btn = await page.query_selector(
                'button:has-text("Akzeptieren"), button:has-text("Alle akzeptieren")'
            )
            if cookie_btn and await cookie_btn.is_visible():
                await cookie_btn.click()
                logger.info("[WBM] Dismissed cookie banner")
                await asyncio.sleep(1)
        except Exception as e:
            logger.debug(f"[WBM] Cookie banner handling skipped: {e}")

        logger.info("[WBM] Looking for application button...")
        apply_btn = await page.query_selector(
            'a:has-text("Anfragen"), button:has-text("Interesse"), a:has-text("Bewerben")'
        )
        if apply_btn and await apply_btn.is_visible():
            logger.info("[WBM] Found application button, clicking...")
            await apply_btn.click()
            await asyncio.sleep(2)
            screenshot_path = DATA_DIR / f"wbm_{listing['id']}.png"
            await page.screenshot(path=str(screenshot_path))
            logger.info(f"[WBM] Saved screenshot to {screenshot_path}")
            result["success"] = True
            result["message"] = "Application page opened"
        else:
            result["message"] = "No application button found"
            logger.warning("[WBM] Could not find application button")
            screenshot_path = DATA_DIR / f"wbm_nobtn_{listing['id']}.png"
            await page.screenshot(path=str(screenshot_path))
    except Exception as e:
        result["message"] = f"Error: {str(e)}"
        # logger.exception records the traceback as well as the message.
        logger.exception(f"[WBM] Exception: {str(e)}")
    finally:
        await page.close()
    return result
class InBerlinMonitor:
    """Watch the inberlinwohnen.de Wohnungsfinder and report new listings.

    Listings are scraped with a headless Playwright browser, compared with
    the snapshot saved on disk, announced via Telegram and, when autopilot
    is enabled, passed to ``ApplicationHandler`` for automatic application.
    """

    # Seconds to wait for the Telegram API; without a timeout a stalled
    # request would block the whole monitoring loop.
    TELEGRAM_TIMEOUT = 30

    # Format in HTML: "deeplink":"https://...","flatId":12345
    _DEEPLINK_PATTERN = r'"deeplink":"(https://[^"]+)","flatId":(\d+)'
    # Format: @click="open !== 12345 ..." aria-label="Wohnungsangebot - 2,0 Zimmer, ... | Adresse"
    _BUTTON_PATTERN = r'@click="open !== (\d+)[^"]*"[^>]*aria-label="Wohnungsangebot - ([^"]+)"'
    # Listing text: "2,0 Zimmer, 53,01 m², 494,38 € Kaltmiete | Rhinstraße 4, 10315 Lichtenberg"
    _LISTING_TEXT_PATTERN = r'(\d,\d)\s*Zimmer,\s*([\d,]+)\s*m²,\s*([\d.,]+)\s*€\s*(?:Kaltmiete\s*)?\|\s*(.+)'

    def __init__(self):
        self.browser = None
        self.context = None
        self.logged_in = False
        self.application_handler = None

    async def init_browser(self):
        """Start Playwright and create the browser context (no-op if already started)."""
        if self.browser is None:
            self.playwright = await async_playwright().start()
            self.browser = await self.playwright.chromium.launch(headless=True)
            self.context = await self.browser.new_context(
                user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
            )
            self.application_handler = ApplicationHandler(self.context)
            logger.info("Browser initialized")

    @staticmethod
    async def _close_page(page):
        """Close ``page`` if it was opened, logging instead of raising on failure."""
        if page is None:
            return
        try:
            await page.close()
        except Exception as e:
            logger.debug(f"Could not close page: {e}")

    def load_state(self) -> dict:
        """Return the persistent state, or the default state if none is stored."""
        if STATE_FILE.exists():
            with open(STATE_FILE, "r", encoding="utf-8") as f:
                return json.load(f)
        return {"autopilot": False}

    def save_state(self, state: dict):
        """Write the persistent state to ``STATE_FILE``."""
        with open(STATE_FILE, "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2)

    def set_autopilot(self, enabled: bool):
        """Enable or disable autopilot mode and persist the choice."""
        state = self.load_state()
        state["autopilot"] = enabled
        self.save_state(state)
        logger.info(f"Autopilot {'enabled' if enabled else 'disabled'}")

    def is_autopilot_enabled(self) -> bool:
        """Return True if autopilot mode is enabled in the stored state."""
        return self.load_state().get("autopilot", False)

    def load_applications(self) -> dict:
        """Return the application history keyed by listing id."""
        if APPLICATIONS_FILE.exists():
            with open(APPLICATIONS_FILE, "r", encoding="utf-8") as f:
                return json.load(f)
        return {}

    def save_application(self, result: dict):
        """Store one application result under ``result["listing_id"]``."""
        applications = self.load_applications()
        applications[result["listing_id"]] = result
        # ensure_ascii=False writes umlauts verbatim, so the encoding must be explicit.
        with open(APPLICATIONS_FILE, "w", encoding="utf-8") as f:
            json.dump(applications, f, indent=2, ensure_ascii=False)

    def has_applied(self, listing_id: str) -> bool:
        """Return True if an application result is already stored for this listing."""
        return listing_id in self.load_applications()

    async def dismiss_cookie_modal(self, page):
        """Dismiss the privacy/cookie consent modal if present.

        Returns:
            True if a button was clicked, False otherwise (including on error).
        """
        try:
            # Wait a bit for modal to appear
            await asyncio.sleep(2)

            # Common accept-button patterns, tried in order.
            accept_selectors = [
                'button:has-text("Akzeptieren")',
                'button:has-text("Alle akzeptieren")',
                'button:has-text("Accept")',
                'button:has-text("Zustimmen")',
                '[x-show="showPrivacyModal"] button',
                '.privacy-modal button',
                'button.accept-cookies',
                # More specific to inberlinwohnen
                'div[x-show="showPrivacyModal"] button:first-of-type',
            ]
            for selector in accept_selectors:
                try:
                    button = await page.query_selector(selector)
                    if button and await button.is_visible():
                        await button.click()
                        logger.info(f"Clicked cookie accept button: {selector}")
                        await asyncio.sleep(1)
                        return True
                except Exception:
                    # A selector that fails is skipped; the next one is tried.
                    continue

            # Fall back to the first visible button inside the modal overlay.
            modal = await page.query_selector('div[x-show="showPrivacyModal"]')
            if modal:
                buttons = await modal.query_selector_all('button')
                for btn in buttons:
                    if await btn.is_visible():
                        text = await btn.inner_text()
                        logger.info(f"Found modal button: {text}")
                        # Click the first button (usually accept)
                        await btn.click()
                        await asyncio.sleep(1)
                        return True

            logger.info("No cookie modal found or already dismissed")
            return False
        except Exception as e:
            logger.debug(f"Cookie modal handling: {e}")
            return False

    async def login(self) -> bool:
        """Log in to inberlinwohnen.de with the configured credentials.

        Returns:
            True on success; False if credentials are missing, the login was
            rejected, or an error occurred.
        """
        if not INBERLIN_EMAIL or not INBERLIN_PASSWORD:
            logger.warning("No credentials provided, using public listings")
            return False

        page = None
        try:
            page = await self.context.new_page()
            await page.goto("https://www.inberlinwohnen.de/login", wait_until="networkidle")
            # Handle cookie/privacy modal first
            await self.dismiss_cookie_modal(page)

            await page.fill('input[name="email"], input[type="email"]', INBERLIN_EMAIL)
            await page.fill('input[name="password"], input[type="password"]', INBERLIN_PASSWORD)
            await page.click('button[type="submit"], input[type="submit"]')

            await page.wait_for_load_state("networkidle")
            await asyncio.sleep(2)

            if "mein-bereich" in page.url or await page.query_selector('text="Abmelden"'):
                logger.info("Login successful")
                self.logged_in = True
                return True
            logger.error(f"Login failed - ended up at {page.url}")
            return False
        except Exception as e:
            logger.error(f"Login error: {e}")
            return False
        finally:
            # Close the page on every path so a failed login does not leak it.
            await self._close_page(page)

    @classmethod
    def _extract_deeplinks(cls, content_decoded: str) -> dict:
        """Map flatId (as string) to its deeplink URL from the wire:snapshot JSON data."""
        return {
            flat_id: link
            for link, flat_id in re.findall(cls._DEEPLINK_PATTERN, content_decoded)
        }

    @classmethod
    def _build_listings(cls, button_matches: list, id_to_link: dict, fallback_url: str) -> list[dict]:
        """Turn ``(flat_id, aria-label text)`` pairs into unique listing records.

        Entries whose text does not match the expected listing format, or
        whose address is shorter than five characters, are dropped. When the
        same listing id occurs more than once the first occurrence is kept.
        """
        unique = {}
        for flat_id, listing_text in button_matches:
            parts_match = re.match(cls._LISTING_TEXT_PATTERN, listing_text)
            if not parts_match:
                continue
            rooms, size, price, address = parts_match.groups()
            rooms = rooms.strip()
            address = address.strip()
            if len(address) < 5:
                continue
            listing_id = hashlib.md5(f"{rooms}{size}{price}{address}".encode()).hexdigest()[:12]
            unique.setdefault(listing_id, {
                "id": listing_id,
                "rooms": f"{rooms} Zimmer",
                "size": size,
                "price": price,
                "address": address,
                # Fall back to the overview URL when no deeplink is known.
                "link": id_to_link.get(flat_id, fallback_url),
                "fetched_at": datetime.now().isoformat(),
            })
        return list(unique.values())

    async def fetch_listings(self) -> list[dict]:
        """Fetch all listings from the Wohnungsfinder, following pagination.

        Returns:
            Unique listing records, or an empty list if anything failed.
        """
        page = None
        try:
            page = await self.context.new_page()

            # Use personal Wohnungsfinder when logged in to see filtered listings
            if self.logged_in:
                url = "https://www.inberlinwohnen.de/mein-bereich/wohnungsfinder"
            else:
                url = "https://www.inberlinwohnen.de/wohnungsfinder/"
            logger.info(f"Fetching listings from {url}")
            await page.goto(url, wait_until="networkidle")

            # Handle cookie modal if not logged in
            if not self.logged_in:
                await self.dismiss_cookie_modal(page)

            # Wait for dynamic content to load - look for listing text pattern
            try:
                await page.wait_for_selector('text=/\\d,\\d\\s*Zimmer/', timeout=15000)
                logger.info("Listings content loaded")
            except Exception:
                logger.warning("Timeout waiting for listings content")

            # Additional wait for initial listings to render
            await asyncio.sleep(2)

            # Collect the HTML of every result page by clicking through pagination.
            page_chunks = []
            max_pages = 10  # Safety limit
            while len(page_chunks) < max_pages:
                page_chunks.append(await page.content())
                # Check for "next page" button (Livewire pagination)
                next_btn = await page.query_selector('[wire\\:click*="nextPage"]')
                if not (next_btn and await next_btn.is_visible()):
                    break
                if len(page_chunks) >= max_pages:
                    # Limit reached: do not navigate to a page that is never read.
                    break
                await next_btn.click()
                await asyncio.sleep(2)  # Wait for Livewire to update
            logger.info(f"Collected content from {len(page_chunks)} page(s)")
            content = "".join(page_chunks)

            # Debug: save HTML to file for inspection
            debug_path = DATA_DIR / "debug_page.html"
            with open(debug_path, "w", encoding="utf-8") as f:
                f.write(content)
            logger.info(f"Saved debug HTML to {debug_path}")

            # Debug: log the listing counts the page itself reports
            count_match = re.search(r'(\d+)\s*Wohnungen? für Sie gefunden', content)
            if count_match:
                logger.info(f"Page shows {count_match.group(1)} listings available")
            show_match = re.search(r'Zeige \d+ bis \d+ von (\d+) Angeboten', content)
            if show_match:
                logger.info(f"Page shows {show_match.group(1)} total offers")

            # Decode HTML entities and JSON escaped slashes for extraction
            content_decoded = html.unescape(content).replace('\\/', '/')

            id_to_link = self._extract_deeplinks(content_decoded)
            logger.info(f"Found {len(id_to_link)} deeplink mappings")

            button_matches = re.findall(self._BUTTON_PATTERN, content_decoded)
            logger.info(f"Found {len(button_matches)} listing buttons")

            listings = self._build_listings(button_matches, id_to_link, url)
            logger.info(f"Fetched {len(listings)} unique listings")
            return listings
        except Exception as e:
            # logger.exception records the traceback as well as the message.
            logger.exception(f"Error fetching listings: {e}")
            return []
        finally:
            # Close the page on every path so a failed fetch does not leak it.
            await self._close_page(page)

    def load_previous_listings(self) -> dict:
        """Return the previously saved listings keyed by id."""
        if LISTINGS_FILE.exists():
            with open(LISTINGS_FILE, "r", encoding="utf-8") as f:
                return json.load(f)
        return {}

    def save_listings(self, listings: list[dict]):
        """Save the current listings keyed by id."""
        listings_dict = {item["id"]: item for item in listings}
        # ensure_ascii=False writes umlauts verbatim, so the encoding must be explicit.
        with open(LISTINGS_FILE, "w", encoding="utf-8") as f:
            json.dump(listings_dict, f, indent=2, ensure_ascii=False)

    def find_new_listings(self, current: list[dict], previous: dict) -> list[dict]:
        """Return the listings in ``current`` whose id is not a key of ``previous``."""
        return [listing for listing in current if listing["id"] not in previous]

    def send_telegram(self, message: str):
        """Send ``message`` (Telegram HTML markup) to the configured chat.

        Errors are logged, never raised.
        """
        if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
            logger.warning("Telegram not configured, skipping notification")
            return
        try:
            url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
            data = {
                "chat_id": TELEGRAM_CHAT_ID,
                "text": message,
                "parse_mode": "HTML",
                "disable_web_page_preview": True
            }
            response = requests.post(url, data=data, timeout=self.TELEGRAM_TIMEOUT)
            if response.ok:
                logger.info("Telegram notification sent")
            else:
                logger.error(f"Telegram error: {response.text}")
        except Exception as e:
            logger.error(f"Telegram error: {e}")

    def log_listing_times(self, new_listings: list[dict]):
        """Append the appearance time of each new listing to the timing CSV."""
        if not new_listings:
            return
        file_exists = TIMING_FILE.exists()
        with open(TIMING_FILE, "a", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            if not file_exists:
                writer.writerow(["timestamp", "weekday", "hour", "minute", "rooms", "size", "price", "address", "listing_id"])
            now = datetime.now()
            for listing in new_listings:
                writer.writerow([
                    now.isoformat(),
                    now.strftime("%A"),  # Weekday name
                    now.hour,
                    now.minute,
                    listing["rooms"],
                    listing["size"],
                    listing["price"],
                    listing["address"],
                    listing["id"],
                ])
        logger.info(f"Logged {len(new_listings)} listing times to CSV")

    @staticmethod
    def _format_listing_message(listing: dict, result: dict = None) -> str:
        """Build the Telegram HTML message for one listing.

        All scraped values and the application message are HTML-escaped:
        the message is sent with ``parse_mode="HTML"``, so a stray ``<`` or
        ``&`` (common in error texts) would otherwise break the markup.

        Args:
            listing: Listing record to announce.
            result: Optional auto-apply result for this listing.
        """
        def esc(value) -> str:
            return html.escape(str(value), quote=False)

        link = listing.get('link', 'https://www.inberlinwohnen.de/wohnungsfinder/')
        message = "🏠 <b>Neue Wohnung!</b>\n\n"
        message += f"🚪 <b>{esc(listing['rooms'])}</b>\n"
        message += f"📐 {esc(listing['size'])}\n"
        message += f"💰 {esc(listing['price'])}\n"
        message += f"📍 {esc(listing['address'])}\n\n"
        message += f"👉 <a href=\"{html.escape(str(link), quote=True)}\">Alle Details</a>"

        # Add autopilot status if application was attempted
        if result:
            if result["success"]:
                message += f"\n\n🤖 <b>Auto-applied!</b> ({esc(result['company'])})"
            else:
                message += f"\n\n⚠️ <b>Auto-apply failed</b> ({esc(result['company'])})"
            if result["message"]:
                message += f"\n<i>{esc(result['message'])}</i>"
        return message

    def notify_new_listings(self, new_listings: list[dict], application_results: dict = None):
        """Send one Telegram notification per new listing.

        Args:
            new_listings: Listings to announce.
            application_results: Optional auto-apply results keyed by listing id.
        """
        if not new_listings:
            return
        for listing in new_listings:
            result = None
            if application_results and listing["id"] in application_results:
                result = application_results[listing["id"]]
            self.send_telegram(self._format_listing_message(listing, result))
            time.sleep(0.5)

    async def apply_to_listings(self, listings: list[dict]) -> dict:
        """Apply to each listing not applied to before.

        Returns:
            Application results keyed by listing id (skipped listings are absent).
        """
        results = {}
        for listing in listings:
            if self.has_applied(listing["id"]):
                logger.info(f"Already applied to {listing['id']}, skipping")
                continue
            result = await self.application_handler.apply(listing)
            results[listing["id"]] = result
            self.save_application(result)
            status = "succeeded" if result["success"] else "failed"
            logger.info(f"Application {status}: {listing['address']} - {result['message']}")
            await asyncio.sleep(2)
        return results

    def check(self):
        """Run a single check: fetch, diff against the saved snapshot, apply, notify."""
        logger.info("Starting check...")
        # The same loop is reused for every call because the Playwright
        # objects stay alive between checks.
        loop = asyncio.get_event_loop()

        # Login if credentials provided
        if not self.logged_in and INBERLIN_EMAIL:
            loop.run_until_complete(self._async_login())

        current_listings = loop.run_until_complete(self._async_fetch())
        if not current_listings:
            logger.warning("No listings fetched")
            return

        previous_listings = self.load_previous_listings()

        # First run - just save baseline
        if not previous_listings:
            logger.info(f"First run - saving {len(current_listings)} listings as baseline")
            self.save_listings(current_listings)
            return

        new_listings = self.find_new_listings(current_listings, previous_listings)
        application_results = {}
        if new_listings:
            logger.info(f"Found {len(new_listings)} new listing(s)")
            self.log_listing_times(new_listings)
            # Apply automatically if autopilot is enabled
            if self.is_autopilot_enabled():
                logger.info("Autopilot enabled - applying to listings...")
                application_results = loop.run_until_complete(
                    self._async_apply(new_listings)
                )
            self.notify_new_listings(new_listings, application_results)
        else:
            logger.info("No new listings")

        # Save current state
        self.save_listings(current_listings)

    async def _async_login(self):
        """Ensure the browser is running, then log in."""
        await self.init_browser()
        await self.login()

    async def _async_fetch(self):
        """Ensure the browser is running, then fetch listings."""
        await self.init_browser()
        return await self.fetch_listings()

    async def _async_apply(self, listings: list[dict]):
        """Ensure the browser is running, then apply to ``listings``."""
        await self.init_browser()
        return await self.apply_to_listings(listings)
class WGCompanyMonitor:
    """Monitor WGcompany.de for new WG room listings"""

    # Seconds to wait for the Telegram API; without a timeout a stalled
    # request would block the whole monitoring loop.
    TELEGRAM_TIMEOUT = 30

    # Berlin district names looked for in a result row; the first entry
    # (in this order) that occurs in the row text is used as the location.
    _BEZIRKE = (
        "Kreuzberg", "Neukölln", "Friedrichshain", "Prenzlauer Berg",
        "Mitte", "Wedding", "Charlottenburg", "Schöneberg", "Tempelhof",
        "Steglitz", "Wilmersdorf", "Pankow", "Lichtenberg", "Treptow",
        "Köpenick", "Reinickendorf", "Spandau", "Zehlendorf", "Moabit",
    )

    def __init__(self):
        self.browser = None
        self.context = None

    async def init_browser(self):
        """Start Playwright and create the browser context (no-op if already started)."""
        if self.browser is None:
            self.playwright = await async_playwright().start()
            self.browser = await self.playwright.chromium.launch(headless=True)
            self.context = await self.browser.new_context(
                user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
            )
            logger.info("[WGCOMPANY] Browser initialized")

    @classmethod
    def _build_listing(cls, href: str, row_text: str) -> dict:
        """Build one listing record from a result link and the text of its row.

        Price and size are the first ``<digits> €`` and ``<digits> m²``
        occurrences in ``row_text`` ("?" when absent); the address is the
        first known district found in the text, or "Berlin". Relative links
        are made absolute against www.wgcompany.de.
        """
        # Extract price from row text (e.g., "350 €" or "350€")
        price_match = re.search(r'(\d+)\s*€', row_text)
        price = price_match.group(1) if price_match else "?"

        # Extract size (e.g., "15 m²" or "15m²")
        size_match = re.search(r'(\d+)\s*m²', row_text)
        size = size_match.group(1) if size_match else "?"

        row_lower = row_text.lower()
        location = next((bez for bez in cls._BEZIRKE if bez.lower() in row_lower), "Berlin")

        # Make absolute URL
        if not href.startswith("http"):
            if href.startswith("/"):
                href = f"http://www.wgcompany.de{href}"
            else:
                href = f"http://www.wgcompany.de/cgi-bin/{href}"

        # Generate unique ID from link and key details
        listing_id = hashlib.md5(f"{href}{price}{size}".encode()).hexdigest()[:12]
        return {
            "id": listing_id,
            "rooms": "1 Zimmer (WG)",
            "size": size,
            "price": price,
            "address": location,
            "link": href,
            "source": "wgcompany",
            "fetched_at": datetime.now().isoformat(),
        }

    async def fetch_listings(self) -> list[dict]:
        """Run the wgcompany.de simple search and return the unique result listings.

        Returns:
            Unique listing records, or an empty list if anything failed.
        """
        page = None
        try:
            page = await self.context.new_page()

            # Use simple search page: st=1 (Berlin), mi=10 (simple WG search), li=100
            search_url = "http://www.wgcompany.de/cgi-bin/seite?st=1&mi=10&li=100"
            logger.info(f"[WGCOMPANY] Loading search page: {search_url}")
            await page.goto(search_url, wait_until="networkidle")
            await asyncio.sleep(2)

            # Fill search form - field names from simple search:
            # c = Min. Größe (min size m²)
            # a = Max. Miete (max rent €)
            # l = Alter (age)
            # e = Bezirk (district select)
            if WGCOMPANY_MIN_SIZE:
                min_size_field = await page.query_selector('input[name="c"]')
                if min_size_field:
                    await min_size_field.fill(WGCOMPANY_MIN_SIZE)
                    logger.info(f"[WGCOMPANY] Set min size: {WGCOMPANY_MIN_SIZE}")

            if WGCOMPANY_MAX_PRICE:
                max_price_field = await page.query_selector('input[name="a"]')
                if max_price_field:
                    await max_price_field.fill(WGCOMPANY_MAX_PRICE)
                    logger.info(f"[WGCOMPANY] Set max rent: {WGCOMPANY_MAX_PRICE}")

            if WGCOMPANY_AGE:
                age_field = await page.query_selector('input[name="l"]')
                if age_field:
                    await age_field.fill(WGCOMPANY_AGE)
                    logger.info(f"[WGCOMPANY] Set age: {WGCOMPANY_AGE}")

            # Smoker filter (o = RaucherIn: NR=Nichtraucher, R=Raucher)
            if WGCOMPANY_SMOKER:
                smoker_select = await page.query_selector('select[name="o"]')
                if smoker_select:
                    await smoker_select.select_option(WGCOMPANY_SMOKER)
                    logger.info(f"[WGCOMPANY] Set smoker: {WGCOMPANY_SMOKER}")

            # District selection (e = Bezirk, multi-select)
            # Leave as default "egal" (all districts) unless specified
            if WGCOMPANY_BEZIRK and WGCOMPANY_BEZIRK != "0":
                bezirk_select = await page.query_selector('select[name="e"]')
                if bezirk_select:
                    await bezirk_select.select_option(WGCOMPANY_BEZIRK)
                    logger.info(f"[WGCOMPANY] Set district: {WGCOMPANY_BEZIRK}")

            # Submit the search form
            submit_btn = await page.query_selector('input[type="submit"][value*="finde"], input[type="submit"]')
            if submit_btn:
                logger.info("[WGCOMPANY] Submitting search form...")
                await submit_btn.click()
                await page.wait_for_load_state("networkidle")
                await asyncio.sleep(2)

            # Save the results page for debugging
            content = await page.content()
            debug_path = DATA_DIR / "wgcompany_debug.html"
            with open(debug_path, "w", encoding="utf-8") as f:
                f.write(content)
            logger.info(f"[WGCOMPANY] Saved debug HTML to {debug_path}")

            # Listing detail links: wg.pl?...function=wgzeigen...
            listing_links = await page.query_selector_all('a[href*="wg.pl"][href*="wgzeigen"]')
            logger.info(f"[WGCOMPANY] Found {len(listing_links)} listing links")

            unique = {}
            for link_elem in listing_links:
                try:
                    href = await link_elem.get_attribute("href")
                    if not href:
                        continue
                    # Get surrounding text/row for listing details
                    parent = await link_elem.evaluate_handle("el => el.closest('tr') || el.parentElement")
                    row_text = await parent.evaluate("el => el.innerText") if parent else ""
                    listing = self._build_listing(href, row_text)
                    # Deduplicate by id, keeping the first occurrence.
                    unique.setdefault(listing["id"], listing)
                except Exception as e:
                    logger.debug(f"[WGCOMPANY] Error parsing listing: {e}")
                    continue

            listings = list(unique.values())
            logger.info(f"[WGCOMPANY] Fetched {len(listings)} unique listings")
            return listings
        except Exception as e:
            # logger.exception records the traceback as well as the message.
            logger.exception(f"[WGCOMPANY] Error fetching listings: {e}")
            return []
        finally:
            # Close the page on every path so a failed fetch does not leak it.
            if page is not None:
                try:
                    await page.close()
                except Exception as e:
                    logger.debug(f"[WGCOMPANY] Could not close page: {e}")

    def load_previous_listings(self) -> dict:
        """Return the previously saved WGcompany listings keyed by id."""
        if WGCOMPANY_LISTINGS_FILE.exists():
            with open(WGCOMPANY_LISTINGS_FILE, "r", encoding="utf-8") as f:
                return json.load(f)
        return {}

    def save_listings(self, listings: list[dict]):
        """Save the current WGcompany listings keyed by id."""
        listings_dict = {item["id"]: item for item in listings}
        # ensure_ascii=False writes umlauts verbatim, so the encoding must be explicit.
        with open(WGCOMPANY_LISTINGS_FILE, "w", encoding="utf-8") as f:
            json.dump(listings_dict, f, indent=2, ensure_ascii=False)

    def find_new_listings(self, current: list[dict], previous: dict) -> list[dict]:
        """Return the listings in ``current`` whose id is not a key of ``previous``."""
        return [listing for listing in current if listing["id"] not in previous]

    def send_telegram(self, message: str):
        """Send ``message`` (Telegram HTML markup) to the configured chat.

        Errors are logged, never raised.
        """
        if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
            logger.warning("[WGCOMPANY] Telegram not configured, skipping notification")
            return
        try:
            url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
            data = {
                "chat_id": TELEGRAM_CHAT_ID,
                "text": message,
                "parse_mode": "HTML",
                "disable_web_page_preview": True
            }
            response = requests.post(url, data=data, timeout=self.TELEGRAM_TIMEOUT)
            if response.ok:
                logger.info("[WGCOMPANY] Telegram notification sent")
            else:
                logger.error(f"[WGCOMPANY] Telegram error: {response.text}")
        except Exception as e:
            logger.error(f"[WGCOMPANY] Telegram error: {e}")

    def log_listing_times(self, new_listings: list[dict]):
        """Append the appearance time of each new WGcompany listing to the timing CSV."""
        if not new_listings:
            return
        file_exists = WGCOMPANY_TIMING_FILE.exists()
        with open(WGCOMPANY_TIMING_FILE, "a", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            if not file_exists:
                writer.writerow(["timestamp", "weekday", "hour", "minute", "rooms", "size", "price", "address", "listing_id"])
            now = datetime.now()
            for listing in new_listings:
                writer.writerow([
                    now.isoformat(),
                    now.strftime("%A"),
                    now.hour,
                    now.minute,
                    listing["rooms"],
                    listing["size"],
                    listing["price"],
                    listing["address"],
                    listing["id"],
                ])
        logger.info(f"[WGCOMPANY] Logged {len(new_listings)} listing times to CSV")

    @staticmethod
    def _format_listing_message(listing: dict) -> str:
        """Build the Telegram HTML message for one WGcompany listing.

        Values are HTML-escaped because the message is sent with
        ``parse_mode="HTML"``; listing URLs contain ``&`` in their query
        string, which must be written as ``&amp;`` inside the ``href``.
        """
        def esc(value) -> str:
            return html.escape(str(value), quote=False)

        message = "🏠 <b>Neues WG-Zimmer!</b> (WGcompany)\n\n"
        message += f"🚪 <b>{esc(listing['rooms'])}</b>\n"
        message += f"📐 {esc(listing['size'])}\n"
        message += f"💰 {esc(listing['price'])}\n"
        message += f"📍 {esc(listing['address'])}\n\n"
        message += f"👉 <a href=\"{html.escape(str(listing['link']), quote=True)}\">Zum Angebot</a>"
        return message

    def notify_new_listings(self, new_listings: list[dict]):
        """Send one Telegram notification per new WGcompany listing."""
        if not new_listings:
            return
        for listing in new_listings:
            self.send_telegram(self._format_listing_message(listing))
            time.sleep(0.5)

    def check(self):
        """Run a single check: fetch, diff against the saved snapshot, notify."""
        logger.info("[WGCOMPANY] Starting check...")

        current_listings = asyncio.get_event_loop().run_until_complete(self._async_fetch())
        if not current_listings:
            logger.warning("[WGCOMPANY] No listings fetched")
            return

        previous_listings = self.load_previous_listings()

        # First run - just save baseline
        if not previous_listings:
            logger.info(f"[WGCOMPANY] First run - saving {len(current_listings)} listings as baseline")
            self.save_listings(current_listings)
            return

        new_listings = self.find_new_listings(current_listings, previous_listings)
        if new_listings:
            logger.info(f"[WGCOMPANY] Found {len(new_listings)} new listing(s)")
            self.log_listing_times(new_listings)
            self.notify_new_listings(new_listings)
        else:
            logger.info("[WGCOMPANY] No new listings")

        # Save current state
        self.save_listings(current_listings)

    async def _async_fetch(self):
        """Ensure the browser is running, then fetch listings."""
        await self.init_browser()
        return await self.fetch_listings()
def main():
    """Start the Telegram bot and poll the enabled monitors forever.

    Creates the data directory, builds the monitors, starts the Telegram
    command listener and then runs every monitor's ``check()`` once per
    ``CHECK_INTERVAL`` seconds. A failing check is logged and does not stop
    the loop.
    """
    # Ensure data directory exists
    DATA_DIR.mkdir(parents=True, exist_ok=True)

    inberlin = InBerlinMonitor()
    wgcompany = WGCompanyMonitor() if WGCOMPANY_ENABLED else None

    # The command listener is bound to the InBerlin monitor.
    bot = TelegramBot(inberlin)
    bot.start()

    logger.info(f"Monitor started (interval: {CHECK_INTERVAL}s)")
    logger.info(f"InBerlin Autopilot: {'ENABLED' if inberlin.is_autopilot_enabled() else 'DISABLED'}")
    logger.info(f"WGcompany: {'ENABLED' if WGCOMPANY_ENABLED else 'DISABLED'}")

    # (label used in the error log, monitor) in the order they are checked.
    scheduled = [("InBerlin", inberlin)]
    if wgcompany:
        scheduled.append(("WGcompany", wgcompany))

    while True:
        for label, monitor in scheduled:
            try:
                monitor.check()
            except Exception as exc:
                logger.error(f"{label} check failed: {exc}")
        time.sleep(CHECK_INTERVAL)


if __name__ == "__main__":
    main()