import os import json import hashlib import logging import asyncio import re import html import threading import time import csv from datetime import datetime from pathlib import Path import requests import pandas as pd import matplotlib matplotlib.use('Agg') # Use non-interactive backend import matplotlib.pyplot as plt from playwright.async_api import async_playwright # Configuration from environment TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "") TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "") INBERLIN_EMAIL = os.environ.get("INBERLIN_EMAIL", "") INBERLIN_PASSWORD = os.environ.get("INBERLIN_PASSWORD", "") CHECK_INTERVAL = int(os.environ.get("CHECK_INTERVAL", "300")) # seconds (5 minutes) # Form data for applications FORM_ANREDE = os.environ.get("FORM_ANREDE", "Herr") FORM_VORNAME = os.environ.get("FORM_VORNAME", "Aron") FORM_NACHNAME = os.environ.get("FORM_NACHNAME", "Petau") FORM_EMAIL = os.environ.get("FORM_EMAIL", "aron@petau.net") FORM_PHONE = os.environ.get("FORM_PHONE", "017695773688") FORM_PERSONS = os.environ.get("FORM_PERSONS", "1") FORM_CHILDREN = os.environ.get("FORM_CHILDREN", "0") FORM_INCOME = os.environ.get("FORM_INCOME", "1600") DATA_DIR = Path("/data") LISTINGS_FILE = DATA_DIR / "listings.json" LOG_FILE = DATA_DIR / "monitor.log" TIMING_FILE = DATA_DIR / "listing_times.csv" STATE_FILE = DATA_DIR / "state.json" APPLICATIONS_FILE = DATA_DIR / "applications.json" # Setup logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", handlers=[ logging.FileHandler(LOG_FILE), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) class TelegramBot: """Handle Telegram commands for controlling the monitor""" def __init__(self, monitor): self.monitor = monitor self.last_update_id = 0 self.running = False def start(self): if not TELEGRAM_BOT_TOKEN: logger.warning("Telegram bot token not configured, commands disabled") return self.running = True thread = threading.Thread(target=self._poll_updates, daemon=True) thread.start() logger.info("Telegram command listener started") def stop(self): self.running = False def _poll_updates(self): while self.running: try: url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/getUpdates" params = {"offset": self.last_update_id + 1, "timeout": 30} response = requests.get(url, params=params, timeout=35) if response.ok: data = response.json() if data.get("ok") and data.get("result"): for update in data["result"]: self.last_update_id = update["update_id"] self._handle_update(update) except requests.exceptions.Timeout: continue except Exception as e: logger.error(f"Telegram polling error: {e}") time.sleep(5) def _handle_update(self, update): message = update.get("message", {}) text = message.get("text", "") chat_id = str(message.get("chat", {}).get("id", "")) if chat_id != TELEGRAM_CHAT_ID: logger.debug(f"Ignoring message from unknown chat: {chat_id}") return logger.info(f"Received Telegram command: {text}") if text.startswith("/autopilot"): self._handle_autopilot_command(text) elif text == "/status": self._handle_status_command() elif text == "/help": self._handle_help_command() elif text == "/plot": self._handle_plot_command() else: logger.debug(f"Unknown command: {text}") def _handle_autopilot_command(self, text): logger.info(f"Processing autopilot command: {text}") parts = text.split() if len(parts) < 2: self._send_message("Usage: /autopilot on|off") return action = parts[1].lower() if action == "on": logger.info("Enabling autopilot mode") self.monitor.set_autopilot(True) self._send_message("🤖 Autopilot ENABLED\n\nI will automatically apply to new listings!") elif action == "off": self.monitor.set_autopilot(False) self._send_message("🛑 Autopilot DISABLED\n\nI will only notify you of new listings.") else: self._send_message("Usage: /autopilot on|off") def _handle_status_command(self): state = self.monitor.load_state() autopilot = state.get("autopilot", False) applications = self.monitor.load_applications() status = "🤖 Autopilot: " + ("ON ✅" if autopilot else "OFF ❌") status += f"\n📝 Applications sent: {len(applications)}" by_company = {} for app in applications.values(): company = app.get("company", "unknown") by_company[company] = by_company.get(company, 0) + 1 if by_company: status += "\n\nBy company:" for company, count in sorted(by_company.items()): status += f"\n • {company}: {count}" self._send_message(status) def _handle_help_command(self): help_text = """🏠 InBerlin Monitor Commands /autopilot on - Enable automatic applications /autopilot off - Disable automatic applications /status - Show current status and stats /plot - Show weekly listing patterns /help - Show this help message When autopilot is ON, I will automatically apply to new listings.""" self._send_message(help_text) def _handle_plot_command(self): """Generate and send a plot of listing times""" logger.info("Generating listing times plot...") try: plot_path = self._generate_weekly_plot() if plot_path: self._send_photo(plot_path, "📊 Weekly Listing Patterns\n\nThis shows when new listings typically appear throughout the week.") else: self._send_message("📊 Not enough data to generate plot yet. Keep monitoring!") except Exception as e: logger.error(f"Error generating plot: {e}") import traceback logger.error(traceback.format_exc()) self._send_message(f"❌ Error generating plot: {str(e)}") def _generate_weekly_plot(self) -> str: """Generate a heatmap of listings by day of week and hour""" if not TIMING_FILE.exists(): logger.warning("No timing data file found") return None try: df = pd.read_csv(TIMING_FILE) if len(df) < 1: logger.warning("Timing file is empty") return None logger.info(f"Loaded {len(df)} listing records for plot") # Create day-hour matrix days_order = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday'] # Count listings per day and hour heatmap_data = pd.DataFrame(0, index=days_order, columns=range(24)) for _, row in df.iterrows(): day = row['weekday'] hour = int(row['hour']) if day in days_order: heatmap_data.loc[day, hour] += 1 # Create figure with two subplots fig, axes = plt.subplots(2, 2, figsize=(14, 10)) fig.suptitle('Listing Appearance Patterns', fontsize=16, fontweight='bold') # 1. Heatmap - Day vs Hour ax1 = axes[0, 0] im = ax1.imshow(heatmap_data.values, cmap='YlOrRd', aspect='auto') ax1.set_xticks(range(24)) ax1.set_xticklabels(range(24), fontsize=8) ax1.set_yticks(range(7)) ax1.set_yticklabels(days_order) ax1.set_xlabel('Hour of Day') ax1.set_ylabel('Day of Week') ax1.set_title('Listings by Day & Hour') plt.colorbar(im, ax=ax1, label='Count') # 2. Bar chart - By day of week ax2 = axes[0, 1] day_counts = df['weekday'].value_counts().reindex(days_order, fill_value=0) colors = plt.cm.Blues(day_counts / day_counts.max() if day_counts.max() > 0 else day_counts) bars = ax2.bar(range(7), day_counts.values, color=colors) ax2.set_xticks(range(7)) ax2.set_xticklabels(['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']) ax2.set_xlabel('Day of Week') ax2.set_ylabel('Number of Listings') ax2.set_title('Total Listings by Day') for i, v in enumerate(day_counts.values): if v > 0: ax2.text(i, v + 0.1, str(v), ha='center', fontsize=9) # 3. Line chart - By hour ax3 = axes[1, 0] hour_counts = df['hour'].value_counts().reindex(range(24), fill_value=0) ax3.plot(range(24), hour_counts.values, marker='o', linewidth=2, markersize=4, color='#2E86AB') ax3.fill_between(range(24), hour_counts.values, alpha=0.3, color='#2E86AB') ax3.set_xticks(range(0, 24, 2)) ax3.set_xlabel('Hour of Day') ax3.set_ylabel('Number of Listings') ax3.set_title('Total Listings by Hour') ax3.grid(True, alpha=0.3) # 4. Summary stats ax4 = axes[1, 1] ax4.axis('off') # Calculate best times best_day = day_counts.idxmax() if day_counts.max() > 0 else "N/A" best_hour = hour_counts.idxmax() if hour_counts.max() > 0 else "N/A" total_listings = len(df) # Find peak combinations peak_combo = heatmap_data.stack().idxmax() if heatmap_data.values.max() > 0 else ("N/A", "N/A") stats_text = f"""📊 Summary Statistics Total listings tracked: {total_listings} 🏆 Best day: {best_day} ⏰ Best hour: {best_hour}:00 🎯 Peak time: {peak_combo[0]} at {peak_combo[1]}:00 📈 Average per day: {total_listings/7:.1f} 📅 Data collection period: From: {df['timestamp'].min()[:10] if 'timestamp' in df.columns else 'N/A'} To: {df['timestamp'].max()[:10] if 'timestamp' in df.columns else 'N/A'} """ ax4.text(0.1, 0.9, stats_text, transform=ax4.transAxes, fontsize=11, verticalalignment='top', fontfamily='monospace', bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5)) plt.tight_layout() # Save plot plot_path = DATA_DIR / "weekly_plot.png" plt.savefig(plot_path, dpi=150, bbox_inches='tight') plt.close() logger.info(f"Plot saved to {plot_path}") return str(plot_path) except Exception as e: logger.error(f"Error creating plot: {e}") import traceback logger.error(traceback.format_exc()) return None def _send_message(self, text): try: url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage" data = {"chat_id": TELEGRAM_CHAT_ID, "text": text, "parse_mode": "HTML", "disable_web_page_preview": True} requests.post(url, data=data) except Exception as e: logger.error(f"Failed to send Telegram message: {e}") def _send_photo(self, photo_path: str, caption: str = ""): """Send a photo via Telegram""" try: url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendPhoto" with open(photo_path, 'rb') as photo: files = {'photo': photo} data = {"chat_id": TELEGRAM_CHAT_ID, "caption": caption, "parse_mode": "HTML"} response = requests.post(url, data=data, files=files) if response.ok: logger.info(f"Photo sent successfully: {photo_path}") else: logger.error(f"Failed to send photo: {response.text}") except Exception as e: logger.error(f"Failed to send Telegram photo: {e}") class ApplicationHandler: """Handle automatic applications to different housing companies""" def __init__(self, browser_context): self.context = browser_context async def apply(self, listing: dict) -> dict: link = listing.get("link", "") company = self._detect_company(link) logger.info(f"Starting application process for {company}: {listing['address']}") logger.info(f"Listing details - ID: {listing['id']}, Rooms: {listing['rooms']}, Price: {listing['price']}") logger.info(f"Detail link: {link}") result = {"listing_id": listing["id"], "company": company, "link": link, "timestamp": datetime.now().isoformat(), "success": False, "message": "", "address": listing.get("address", ""), "rooms": listing.get("rooms", ""), "price": listing.get("price", "")} try: if company == "howoge": result = await self._apply_howoge(listing, result) elif company == "gewobag": result = await self._apply_gewobag(listing, result) elif company == "degewo": result = await self._apply_degewo(listing, result) elif company == "gesobau": result = await self._apply_gesobau(listing, result) elif company == "stadtundland": result = await self._apply_stadtundland(listing, result) elif company == "wbm": result = await self._apply_wbm(listing, result) else: result["message"] = f"Unknown company: {company}" logger.warning(f"No application handler for company: {company}") except Exception as e: result["message"] = str(e) logger.error(f"Application error for {company}: {e}") import traceback logger.error(traceback.format_exc()) # Log final result status = "SUCCESS" if result["success"] else "FAILED" logger.info(f"Application {status} for {listing['address']} ({company}): {result['message']}") return result def _detect_company(self, link: str) -> str: if "howoge.de" in link: return "howoge" elif "gewobag.de" in link: return "gewobag" elif "degewo.de" in link: return "degewo" elif "gesobau.de" in link: return "gesobau" elif "stadtundland.de" in link: return "stadtundland" elif "wbm.de" in link: return "wbm" return "unknown" async def _apply_howoge(self, listing: dict, result: dict) -> dict: page = await self.context.new_page() try: logger.info(f"[HOWOGE] Opening page: {listing['link']}") await page.goto(listing["link"], wait_until="networkidle") logger.info("[HOWOGE] Page loaded") await asyncio.sleep(2) # Handle cookies try: cookie_btn = await page.query_selector('button:has-text("Akzeptieren"), button:has-text("Alle akzeptieren")') if cookie_btn and await cookie_btn.is_visible(): await cookie_btn.click() logger.info("[HOWOGE] Dismissed cookie banner") await asyncio.sleep(1) except: pass # Look for "Besichtigung vereinbaren" button logger.info("[HOWOGE] Looking for 'Besichtigung vereinbaren' button...") apply_btn = await page.query_selector('a:has-text("Besichtigung vereinbaren"), button:has-text("Besichtigung vereinbaren"), a:has-text("Anfragen"), button:has-text("Anfragen")') if apply_btn and await apply_btn.is_visible(): logger.info("[HOWOGE] Found application button, clicking...") await apply_btn.click() await asyncio.sleep(3) await page.wait_for_load_state("networkidle") logger.info("[HOWOGE] Clicked button, waiting for form...") # Screenshot after clicking screenshot_path = DATA_DIR / f"howoge_form_{listing['id']}.png" await page.screenshot(path=str(screenshot_path)) logger.info(f"[HOWOGE] Saved form screenshot to {screenshot_path}") # Fill in the contact form # Look for name fields (Vorname, Nachname) vorname_field = await page.query_selector('input[name*="vorname" i], input[name*="firstname" i], input[placeholder*="Vorname" i], input#vorname') nachname_field = await page.query_selector('input[name*="nachname" i], input[name*="lastname" i], input[name*="surname" i], input[placeholder*="Nachname" i], input#nachname') email_field = await page.query_selector('input[type="email"], input[name*="email" i], input[name*="mail" i]') form_filled = False if vorname_field: await vorname_field.fill(FORM_VORNAME) logger.info(f"[HOWOGE] Filled Vorname: {FORM_VORNAME}") form_filled = True if nachname_field: await nachname_field.fill(FORM_NACHNAME) logger.info(f"[HOWOGE] Filled Nachname: {FORM_NACHNAME}") form_filled = True if email_field: await email_field.fill(FORM_EMAIL) logger.info(f"[HOWOGE] Filled Email: {FORM_EMAIL}") form_filled = True # Also look for phone field phone_field = await page.query_selector('input[type="tel"], input[name*="telefon" i], input[name*="phone" i]') if phone_field: await phone_field.fill(FORM_PHONE) logger.info(f"[HOWOGE] Filled Phone: {FORM_PHONE}") # Screenshot after filling form screenshot_path2 = DATA_DIR / f"howoge_filled_{listing['id']}.png" await page.screenshot(path=str(screenshot_path2)) logger.info(f"[HOWOGE] Saved filled form screenshot to {screenshot_path2}") if form_filled: # Look for submit button submit_btn = await page.query_selector('button[type="submit"], input[type="submit"], button:has-text("Absenden"), button:has-text("Senden"), button:has-text("Anfrage")') if submit_btn and await submit_btn.is_visible(): logger.info("[HOWOGE] Found submit button, clicking...") await submit_btn.click() await asyncio.sleep(3) await page.wait_for_load_state("networkidle") # Screenshot after submit screenshot_path3 = DATA_DIR / f"howoge_submitted_{listing['id']}.png" await page.screenshot(path=str(screenshot_path3)) logger.info(f"[HOWOGE] Saved post-submit screenshot to {screenshot_path3}") content = await page.content() if "erfolgreich" in content.lower() or "gesendet" in content.lower() or "danke" in content.lower(): result["success"] = True result["message"] = "Application submitted successfully" logger.info("[HOWOGE] Success! Confirmation message detected") else: result["success"] = True result["message"] = "Form submitted, awaiting confirmation" logger.info("[HOWOGE] Form submitted but no clear confirmation") else: result["success"] = False result["message"] = "Form filled but no submit button found" logger.warning("[HOWOGE] Could not find submit button") else: result["success"] = False result["message"] = "Could not find form fields to fill" logger.warning("[HOWOGE] No form fields found") else: result["message"] = "No application button found" logger.warning("[HOWOGE] Could not find 'Besichtigung vereinbaren' button") # Save screenshot for debugging screenshot_path = DATA_DIR / f"howoge_nobtn_{listing['id']}.png" await page.screenshot(path=str(screenshot_path)) # Log all buttons on page for debugging buttons = await page.query_selector_all('button, a.btn, a[class*="button"]') for btn in buttons[:10]: try: text = await btn.inner_text() logger.info(f"[HOWOGE] Found button: {text[:50]}") except: pass except Exception as e: result["message"] = f"Error: {str(e)}" logger.error(f"[HOWOGE] Exception: {str(e)}") import traceback logger.error(traceback.format_exc()) finally: await page.close() return result async def _apply_gewobag(self, listing: dict, result: dict) -> dict: page = await self.context.new_page() try: logger.info(f"[GEWOBAG] Opening page: {listing['link']}") await page.goto(listing["link"], wait_until="networkidle") logger.info("[GEWOBAG] Page loaded") await asyncio.sleep(2) try: cookie_btn = await page.query_selector('#CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll, button:has-text("Alle akzeptieren")') if cookie_btn and await cookie_btn.is_visible(): await cookie_btn.click() logger.info("[GEWOBAG] Dismissed cookie banner") await asyncio.sleep(1) except: pass logger.info("[GEWOBAG] Looking for application button...") apply_btn = await page.query_selector('a:has-text("Kontakt"), button:has-text("Anfrage"), a.btn:has-text("Anfragen")') if apply_btn and await apply_btn.is_visible(): logger.info("[GEWOBAG] Found application button, clicking...") await apply_btn.click() await asyncio.sleep(2) screenshot_path = DATA_DIR / f"gewobag_{listing['id']}.png" await page.screenshot(path=str(screenshot_path)) logger.info(f"[GEWOBAG] Saved screenshot to {screenshot_path}") result["success"] = True result["message"] = "Application page opened" else: result["message"] = "No application button found" logger.warning("[GEWOBAG] Could not find application button") screenshot_path = DATA_DIR / f"gewobag_nobtn_{listing['id']}.png" await page.screenshot(path=str(screenshot_path)) except Exception as e: result["message"] = f"Error: {str(e)}" logger.error(f"[GEWOBAG] Exception: {str(e)}") finally: await page.close() return result async def _apply_degewo(self, listing: dict, result: dict) -> dict: page = await self.context.new_page() try: logger.info(f"[DEGEWO] Opening page: {listing['link']}") await page.goto(listing["link"], wait_until="networkidle") logger.info("[DEGEWO] Page loaded") await asyncio.sleep(2) try: cookie_btn = await page.query_selector('button:has-text("Alle akzeptieren"), #CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll') if cookie_btn and await cookie_btn.is_visible(): await cookie_btn.click() logger.info("[DEGEWO] Dismissed cookie banner") await asyncio.sleep(1) except: pass logger.info("[DEGEWO] Looking for kontaktieren button...") apply_btn = await page.query_selector('a:has-text("kontaktieren"), button:has-text("kontaktieren"), a:has-text("Kontaktieren"), button:has-text("Kontaktieren")') if apply_btn and await apply_btn.is_visible(): logger.info("[DEGEWO] Found kontaktieren button, clicking...") await apply_btn.click() await asyncio.sleep(3) # Fill out the contact form logger.info("[DEGEWO] Filling out contact form...") # Anrede - select from env try: anrede_select = await page.query_selector('select[name*="anrede"], select[name*="salutation"], select[id*="anrede"]') if anrede_select: await anrede_select.select_option(label=FORM_ANREDE) logger.info(f"[DEGEWO] Selected Anrede: {FORM_ANREDE}") else: # Try radio button anrede_radio = await page.query_selector(f'input[type="radio"][value="{FORM_ANREDE}"], label:has-text("{FORM_ANREDE}") input[type="radio"]') if anrede_radio: await anrede_radio.click() logger.info(f"[DEGEWO] Clicked Anrede radio: {FORM_ANREDE}") except Exception as e: logger.warning(f"[DEGEWO] Could not set Anrede: {e}") # Vorname try: vorname_field = await page.query_selector('input[name*="vorname"], input[name*="firstname"], input[id*="vorname"], input[placeholder*="Vorname"]') if vorname_field: await vorname_field.fill(FORM_VORNAME) logger.info(f"[DEGEWO] Filled Vorname: {FORM_VORNAME}") except Exception as e: logger.warning(f"[DEGEWO] Could not fill Vorname: {e}") # Nachname try: nachname_field = await page.query_selector('input[name*="nachname"], input[name*="lastname"], input[id*="nachname"], input[placeholder*="Nachname"]') if nachname_field: await nachname_field.fill(FORM_NACHNAME) logger.info(f"[DEGEWO] Filled Nachname: {FORM_NACHNAME}") except Exception as e: logger.warning(f"[DEGEWO] Could not fill Nachname: {e}") # E-Mail try: email_field = await page.query_selector('input[type="email"], input[name*="email"], input[name*="mail"], input[id*="email"]') if email_field: await email_field.fill(FORM_EMAIL) logger.info(f"[DEGEWO] Filled E-Mail: {FORM_EMAIL}") except Exception as e: logger.warning(f"[DEGEWO] Could not fill E-Mail: {e}") # Telefonnummer try: tel_field = await page.query_selector('input[type="tel"], input[name*="telefon"], input[name*="phone"], input[id*="telefon"]') if tel_field: await tel_field.fill(FORM_PHONE) logger.info(f"[DEGEWO] Filled Telefonnummer: {FORM_PHONE}") except Exception as e: logger.warning(f"[DEGEWO] Could not handle Telefon: {e}") # Anzahl einziehende Personen try: personen_field = await page.query_selector('input[name*="personen"], input[name*="persons"], input[id*="personen"], select[name*="personen"]') if personen_field: tag_name = await personen_field.evaluate("el => el.tagName.toLowerCase()") if tag_name == "select": await personen_field.select_option(FORM_PERSONS) else: await personen_field.fill(FORM_PERSONS) logger.info(f"[DEGEWO] Set Anzahl Personen: {FORM_PERSONS}") except Exception as e: logger.warning(f"[DEGEWO] Could not set Personen: {e}") # davon Anzahl Kinder try: kinder_field = await page.query_selector('input[name*="kinder"], input[name*="children"], input[id*="kinder"], select[name*="kinder"]') if kinder_field: tag_name = await kinder_field.evaluate("el => el.tagName.toLowerCase()") if tag_name == "select": await kinder_field.select_option(FORM_CHILDREN) else: await kinder_field.fill(FORM_CHILDREN) logger.info(f"[DEGEWO] Set Anzahl Kinder: {FORM_CHILDREN}") except Exception as e: logger.warning(f"[DEGEWO] Could not set Kinder: {e}") # Monatliches Haushaltsnettoeinkommen try: einkommen_field = await page.query_selector('input[name*="einkommen"], input[name*="income"], input[id*="einkommen"], select[name*="einkommen"]') if einkommen_field: tag_name = await einkommen_field.evaluate("el => el.tagName.toLowerCase()") if tag_name == "select": # Try to select by value or index try: await einkommen_field.select_option(FORM_INCOME) except: # Fallback to first non-empty option options = await einkommen_field.query_selector_all("option") if len(options) > 1: await einkommen_field.select_option(index=1) else: await einkommen_field.fill(FORM_INCOME) logger.info(f"[DEGEWO] Set Einkommen: {FORM_INCOME}") except Exception as e: logger.warning(f"[DEGEWO] Could not set Einkommen: {e}") # "Für mich selbst" selection try: selbst_radio = await page.query_selector('input[type="radio"][value*="selbst"], input[type="radio"][value*="myself"], label:has-text("Für mich selbst") input') if selbst_radio: await selbst_radio.click() logger.info("[DEGEWO] Selected: Für mich selbst") except Exception as e: logger.warning(f"[DEGEWO] Could not set 'Für mich selbst': {e}") # Accept data privacy checkbox try: checkbox = await page.query_selector('input[type="checkbox"][name*="datenschutz"], input[type="checkbox"][name*="privacy"], input[type="checkbox"][name*="consent"]') if checkbox and not await checkbox.is_checked(): await checkbox.click() logger.info("[DEGEWO] Checked privacy/consent checkbox") except Exception as e: logger.warning(f"[DEGEWO] Could not check consent: {e}") await asyncio.sleep(1) # Take screenshot before submitting screenshot_path = DATA_DIR / f"degewo_form_{listing['id']}.png" await page.screenshot(path=str(screenshot_path), full_page=True) logger.info(f"[DEGEWO] Saved form screenshot to {screenshot_path}") # Submit the form try: submit_btn = await page.query_selector('button[type="submit"], input[type="submit"], button:has-text("Absenden"), button:has-text("Senden")') if submit_btn and await submit_btn.is_visible(): await submit_btn.click() logger.info("[DEGEWO] Clicked submit button") await asyncio.sleep(3) # Take screenshot after submission screenshot_path = DATA_DIR / f"degewo_submitted_{listing['id']}.png" await page.screenshot(path=str(screenshot_path), full_page=True) logger.info(f"[DEGEWO] Saved submission screenshot to {screenshot_path}") result["success"] = True result["message"] = "Application submitted" else: result["success"] = True result["message"] = "Form filled, submit button not found" logger.warning("[DEGEWO] Submit button not found") except Exception as e: result["success"] = True result["message"] = f"Form filled, submit error: {str(e)}" logger.warning(f"[DEGEWO] Submit error: {e}") else: result["message"] = "No kontaktieren button found" logger.warning("[DEGEWO] Could not find kontaktieren button") screenshot_path = DATA_DIR / f"degewo_nobtn_{listing['id']}.png" await page.screenshot(path=str(screenshot_path), full_page=True) except Exception as e: result["message"] = f"Error: {str(e)}" logger.error(f"[DEGEWO] Exception: {str(e)}") import traceback logger.error(traceback.format_exc()) finally: await page.close() return result async def _apply_gesobau(self, listing: dict, result: dict) -> dict: page = await self.context.new_page() try: logger.info(f"[GESOBAU] Opening page: {listing['link']}") await page.goto(listing["link"], wait_until="networkidle") logger.info("[GESOBAU] Page loaded") await asyncio.sleep(2) try: cookie_btn = await page.query_selector('button:has-text("Akzeptieren"), button:has-text("Alle akzeptieren")') if cookie_btn and await cookie_btn.is_visible(): await cookie_btn.click() logger.info("[GESOBAU] Dismissed cookie banner") await asyncio.sleep(1) except: pass logger.info("[GESOBAU] Looking for application button...") apply_btn = await page.query_selector('a:has-text("Anfragen"), button:has-text("Interesse"), a:has-text("Kontakt")') if apply_btn and await apply_btn.is_visible(): logger.info("[GESOBAU] Found application button, clicking...") await apply_btn.click() await asyncio.sleep(2) screenshot_path = DATA_DIR / f"gesobau_{listing['id']}.png" await page.screenshot(path=str(screenshot_path)) logger.info(f"[GESOBAU] Saved screenshot to {screenshot_path}") result["success"] = True result["message"] = "Application page opened" else: result["message"] = "No application button found" logger.warning("[GESOBAU] Could not find application button") screenshot_path = DATA_DIR / f"gesobau_nobtn_{listing['id']}.png" await page.screenshot(path=str(screenshot_path)) except Exception as e: result["message"] = f"Error: {str(e)}" logger.error(f"[GESOBAU] Exception: {str(e)}") finally: await page.close() return result async def _apply_stadtundland(self, listing: dict, result: dict) -> dict: page = await self.context.new_page() try: logger.info(f"[STADTUNDLAND] Opening page: {listing['link']}") await page.goto(listing["link"], wait_until="networkidle") logger.info("[STADTUNDLAND] Page loaded") await asyncio.sleep(2) try: cookie_btn = await page.query_selector('button:has-text("Akzeptieren"), button:has-text("Alle akzeptieren")') if cookie_btn and await cookie_btn.is_visible(): await cookie_btn.click() logger.info("[STADTUNDLAND] Dismissed cookie banner") await asyncio.sleep(1) except: pass logger.info("[STADTUNDLAND] Looking for application button...") apply_btn = await page.query_selector('a:has-text("Anfragen"), button:has-text("Bewerben"), a:has-text("Interesse")') if apply_btn and await apply_btn.is_visible(): logger.info("[STADTUNDLAND] Found application button, clicking...") await apply_btn.click() await asyncio.sleep(2) screenshot_path = DATA_DIR / f"stadtundland_{listing['id']}.png" await page.screenshot(path=str(screenshot_path)) logger.info(f"[STADTUNDLAND] Saved screenshot to {screenshot_path}") result["success"] = True result["message"] = "Application page opened" else: result["message"] = "No application button found" logger.warning("[STADTUNDLAND] Could not find application button") screenshot_path = DATA_DIR / f"stadtundland_nobtn_{listing['id']}.png" await page.screenshot(path=str(screenshot_path)) except Exception as e: result["message"] = f"Error: {str(e)}" logger.error(f"[STADTUNDLAND] Exception: {str(e)}") finally: await page.close() return result async def _apply_wbm(self, listing: dict, result: dict) -> dict: page = await self.context.new_page() try: logger.info(f"[WBM] Opening page: {listing['link']}") await page.goto(listing["link"], wait_until="networkidle") logger.info("[WBM] Page loaded") await asyncio.sleep(2) try: cookie_btn = await page.query_selector('button:has-text("Akzeptieren"), button:has-text("Alle akzeptieren")') if cookie_btn and await cookie_btn.is_visible(): await cookie_btn.click() logger.info("[WBM] Dismissed cookie banner") await asyncio.sleep(1) except: pass logger.info("[WBM] Looking for application button...") apply_btn = await page.query_selector('a:has-text("Anfragen"), button:has-text("Interesse"), a:has-text("Bewerben")') if apply_btn and await apply_btn.is_visible(): logger.info("[WBM] Found application button, clicking...") await apply_btn.click() await asyncio.sleep(2) screenshot_path = DATA_DIR / f"wbm_{listing['id']}.png" await page.screenshot(path=str(screenshot_path)) logger.info(f"[WBM] Saved screenshot to {screenshot_path}") result["success"] = True result["message"] = "Application page opened" else: result["message"] = "No application button found" logger.warning("[WBM] Could not find application button") screenshot_path = DATA_DIR / f"wbm_nobtn_{listing['id']}.png" await page.screenshot(path=str(screenshot_path)) except Exception as e: result["message"] = f"Error: {str(e)}" logger.error(f"[WBM] Exception: {str(e)}") finally: await page.close() return result class InBerlinMonitor: def __init__(self): self.browser = None self.context = None self.logged_in = False self.application_handler = None async def init_browser(self): """Initialize Playwright browser""" if self.browser is None: self.playwright = await async_playwright().start() self.browser = await self.playwright.chromium.launch(headless=True) self.context = await self.browser.new_context( user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36" ) self.application_handler = ApplicationHandler(self.context) logger.info("Browser initialized") def load_state(self) -> dict: """Load persistent state""" if STATE_FILE.exists(): with open(STATE_FILE, "r") as f: return json.load(f) return {"autopilot": False} def save_state(self, state: dict): """Save persistent state""" with open(STATE_FILE, "w") as f: json.dump(state, f, indent=2) def set_autopilot(self, enabled: bool): """Enable or disable autopilot mode""" state = self.load_state() state["autopilot"] = enabled self.save_state(state) logger.info(f"Autopilot {'enabled' if enabled else 'disabled'}") def is_autopilot_enabled(self) -> bool: """Check if autopilot mode is enabled""" return self.load_state().get("autopilot", False) def load_applications(self) -> dict: """Load application history""" if APPLICATIONS_FILE.exists(): with open(APPLICATIONS_FILE, "r") as f: return json.load(f) return {} def save_application(self, result: dict): """Save an application result""" applications = self.load_applications() applications[result["listing_id"]] = result with open(APPLICATIONS_FILE, "w") as f: json.dump(applications, f, indent=2, ensure_ascii=False) def has_applied(self, listing_id: str) -> bool: """Check if we've already applied to this listing""" return listing_id in self.load_applications() async def dismiss_cookie_modal(self, page): """Dismiss the privacy/cookie consent modal if present""" try: # Wait a bit for modal to appear await asyncio.sleep(2) # Try to find and click the accept button in the privacy modal # Look for common accept button patterns in German accept_selectors = [ 'button:has-text("Akzeptieren")', 'button:has-text("Alle akzeptieren")', 'button:has-text("Accept")', 'button:has-text("Zustimmen")', '[x-show="showPrivacyModal"] button', '.privacy-modal button', 'button.accept-cookies', # More specific to inberlinwohnen 'div[x-show="showPrivacyModal"] button:first-of-type', ] for selector in accept_selectors: try: button = await page.query_selector(selector) if button and await button.is_visible(): await button.click() logger.info(f"Clicked cookie accept button: {selector}") await asyncio.sleep(1) return True except: continue # Try clicking any visible button in the modal overlay modal = await page.query_selector('div[x-show="showPrivacyModal"]') if modal: buttons = await modal.query_selector_all('button') for btn in buttons: if await btn.is_visible(): text = await btn.inner_text() logger.info(f"Found modal button: {text}") # Click the first button (usually accept) await btn.click() await asyncio.sleep(1) return True logger.info("No cookie modal found or already dismissed") return False except Exception as e: logger.debug(f"Cookie modal handling: {e}") return False async def login(self) -> bool: """Login to inberlinwohnen.de""" if not INBERLIN_EMAIL or not INBERLIN_PASSWORD: logger.warning("No credentials provided, using public listings") return False try: page = await self.context.new_page() await page.goto("https://www.inberlinwohnen.de/login", wait_until="networkidle") # Handle cookie/privacy modal first await self.dismiss_cookie_modal(page) # Fill login form await page.fill('input[name="email"], input[type="email"]', INBERLIN_EMAIL) await page.fill('input[name="password"], input[type="password"]', INBERLIN_PASSWORD) # Click submit button await page.click('button[type="submit"], input[type="submit"]') # Wait for navigation await page.wait_for_load_state("networkidle") await asyncio.sleep(2) # Check if login successful if "mein-bereich" in page.url or await page.query_selector('text="Abmelden"'): logger.info("Login successful") self.logged_in = True await page.close() return True else: logger.error(f"Login failed - ended up at {page.url}") await page.close() return False except Exception as e: logger.error(f"Login error: {e}") return False async def fetch_listings(self) -> list[dict]: """Fetch listings from the Wohnungsfinder""" listings = [] try: page = await self.context.new_page() # Use personal Wohnungsfinder when logged in to see filtered listings if self.logged_in: url = "https://www.inberlinwohnen.de/mein-bereich/wohnungsfinder" else: url = "https://www.inberlinwohnen.de/wohnungsfinder/" logger.info(f"Fetching listings from {url}") await page.goto(url, wait_until="networkidle") # Handle cookie modal if not logged in if not self.logged_in: await self.dismiss_cookie_modal(page) # Wait for dynamic content to load - look for listing text pattern try: await page.wait_for_selector('text=/\\d,\\d\\s*Zimmer/', timeout=15000) logger.info("Listings content loaded") except: logger.warning("Timeout waiting for listings content") # Additional wait for initial listings to render await asyncio.sleep(2) # Collect all listings content by clicking through pagination all_content = "" page_num = 1 max_pages = 10 # Safety limit while page_num <= max_pages: # Get current page content current_content = await page.content() all_content += current_content # Check for "next page" button (Livewire pagination) next_btn = await page.query_selector('[wire\\:click*="nextPage"]') if next_btn and await next_btn.is_visible(): await next_btn.click() await asyncio.sleep(2) # Wait for Livewire to update page_num += 1 else: break logger.info(f"Collected content from {page_num} page(s)") content = all_content # Debug: save HTML to file for inspection debug_path = DATA_DIR / "debug_page.html" with open(debug_path, "w", encoding="utf-8") as f: f.write(content) logger.info(f"Saved debug HTML to {debug_path}") # Debug: Log page title and check for listing count count_match = re.search(r'(\d+)\s*Wohnungen? für Sie gefunden', content) if count_match: logger.info(f"Page shows {count_match.group(1)} listings available") # Also check for "Zeige X bis Y von Z Angeboten" show_match = re.search(r'Zeige \d+ bis \d+ von (\d+) Angeboten', content) if show_match: logger.info(f"Page shows {show_match.group(1)} total offers") # Decode HTML entities and JSON escaped slashes for extraction content_decoded = html.unescape(content) content_decoded = content_decoded.replace('\\/', '/') # Build flatId -> deeplink mapping from wire:snapshot JSON data # Format in HTML: "deeplink":"https://...","flatId":12345 deeplink_pattern = r'"deeplink":"(https://[^"]+)","flatId":(\d+)' deeplink_matches = re.findall(deeplink_pattern, content_decoded) id_to_link = {flat_id: link for link, flat_id in deeplink_matches} logger.info(f"Found {len(id_to_link)} deeplink mappings") # Extract listings from button elements with aria-label # Format: @click="open !== 12345 ..." aria-label="Wohnungsangebot - 2,0 Zimmer, 53,01 m², 494,38 € Kaltmiete | Adresse" button_pattern = r'@click="open !== (\d+)[^"]*"[^>]*aria-label="Wohnungsangebot - ([^"]+)"' button_matches = re.findall(button_pattern, content_decoded) logger.info(f"Found {len(button_matches)} listing buttons") for flat_id, listing_text in button_matches: # Parse listing text: "2,0 Zimmer, 53,01 m², 494,38 € Kaltmiete | Rhinstraße 4, 10315 Lichtenberg" parts_match = re.match(r'(\d,\d)\s*Zimmer,\s*([\d,]+)\s*m²,\s*([\d.,]+)\s*€\s*(?:Kaltmiete\s*)?\|\s*(.+)', listing_text) if not parts_match: continue rooms, size, price, address = parts_match.groups() rooms = rooms.strip() address = address.strip() if len(address) < 5: continue # Get the deeplink for this flat detail_link = id_to_link.get(flat_id, url) listing_id = hashlib.md5(f"{rooms}{size}{price}{address}".encode()).hexdigest()[:12] listings.append({ "id": listing_id, "rooms": f"{rooms} Zimmer", "size": f"{size} m²", "price": f"{price} €", "address": address, "link": detail_link, "fetched_at": datetime.now().isoformat() }) # Deduplicate by id seen_ids = set() unique_listings = [] for listing in listings: if listing["id"] not in seen_ids: seen_ids.add(listing["id"]) unique_listings.append(listing) listings = unique_listings await page.close() logger.info(f"Fetched {len(listings)} unique listings") return listings except Exception as e: logger.error(f"Error fetching listings: {e}") import traceback logger.error(traceback.format_exc()) return [] def load_previous_listings(self) -> dict: """Load previously saved listings""" if LISTINGS_FILE.exists(): with open(LISTINGS_FILE, "r") as f: return json.load(f) return {} def save_listings(self, listings: list[dict]): """Save current listings""" listings_dict = {l["id"]: l for l in listings} with open(LISTINGS_FILE, "w") as f: json.dump(listings_dict, f, indent=2, ensure_ascii=False) def find_new_listings(self, current: list[dict], previous: dict) -> list[dict]: """Find listings that are new since last check""" new = [] for listing in current: if listing["id"] not in previous: new.append(listing) return new def send_telegram(self, message: str): """Send notification via Telegram""" if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID: logger.warning("Telegram not configured, skipping notification") return try: url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage" data = { "chat_id": TELEGRAM_CHAT_ID, "text": message, "parse_mode": "HTML", "disable_web_page_preview": True } response = requests.post(url, data=data) if response.ok: logger.info("Telegram notification sent") else: logger.error(f"Telegram error: {response.text}") except Exception as e: logger.error(f"Telegram error: {e}") def log_listing_times(self, new_listings: list[dict]): """Log new listing appearance times to CSV for later analysis""" if not new_listings: return import csv file_exists = TIMING_FILE.exists() with open(TIMING_FILE, "a", newline="", encoding="utf-8") as f: writer = csv.writer(f) if not file_exists: writer.writerow(["timestamp", "weekday", "hour", "minute", "rooms", "size", "price", "address", "listing_id"]) now = datetime.now() for listing in new_listings: writer.writerow([ now.isoformat(), now.strftime("%A"), # Weekday name now.hour, now.minute, listing["rooms"], listing["size"], listing["price"], listing["address"], listing["id"] ]) logger.info(f"Logged {len(new_listings)} listing times to CSV") def notify_new_listings(self, new_listings: list[dict], application_results: dict = None): """Send individual notification for each new listing""" if not new_listings: return for listing in new_listings: link = listing.get('link', 'https://www.inberlinwohnen.de/wohnungsfinder/') message = f"🏠 Neue Wohnung!\n\n" message += f"🚪 {listing['rooms']}\n" message += f"📐 {listing['size']}\n" message += f"💰 {listing['price']}\n" message += f"📍 {listing['address']}\n\n" message += f"👉 Alle Details" # Add autopilot status if application was attempted if application_results and listing["id"] in application_results: result = application_results[listing["id"]] if result["success"]: message += f"\n\n🤖 Auto-applied! ({result['company']})" if result["message"]: message += f"\n{result['message']}" else: message += f"\n\n⚠️ Auto-apply failed ({result['company']})" if result["message"]: message += f"\n{result['message']}" self.send_telegram(message) time.sleep(0.5) async def apply_to_listings(self, listings: list[dict]) -> dict: """Apply to multiple listings, returns results dict""" results = {} for listing in listings: if self.has_applied(listing["id"]): logger.info(f"Already applied to {listing['id']}, skipping") continue result = await self.application_handler.apply(listing) results[listing["id"]] = result self.save_application(result) status = "✅" if result["success"] else "❌" logger.info(f"Application {status}: {listing['address']} - {result['message']}") await asyncio.sleep(2) return results def check(self): """Run a single check for new listings""" logger.info("Starting check...") # Login if credentials provided if not self.logged_in and INBERLIN_EMAIL: asyncio.get_event_loop().run_until_complete(self._async_login()) # Fetch current listings current_listings = asyncio.get_event_loop().run_until_complete(self._async_fetch()) if not current_listings: logger.warning("No listings fetched") return # Load previous listings previous_listings = self.load_previous_listings() # First run - just save baseline if not previous_listings: logger.info(f"First run - saving {len(current_listings)} listings as baseline") self.save_listings(current_listings) return # Find new listings new_listings = self.find_new_listings(current_listings, previous_listings) application_results = {} if new_listings: logger.info(f"Found {len(new_listings)} new listing(s)") self.log_listing_times(new_listings) # Apply automatically if autopilot is enabled if self.is_autopilot_enabled(): logger.info("Autopilot enabled - applying to listings...") application_results = asyncio.get_event_loop().run_until_complete( self._async_apply(new_listings) ) self.notify_new_listings(new_listings, application_results) else: logger.info("No new listings") # Save current state self.save_listings(current_listings) async def _async_login(self): await self.init_browser() await self.login() async def _async_fetch(self): await self.init_browser() return await self.fetch_listings() async def _async_apply(self, listings: list[dict]): await self.init_browser() return await self.apply_to_listings(listings) def main(): """Main entry point""" # Ensure data directory exists DATA_DIR.mkdir(parents=True, exist_ok=True) monitor = InBerlinMonitor() # Start Telegram command listener telegram_bot = TelegramBot(monitor) telegram_bot.start() logger.info(f"inberlin-monitor started (interval: {CHECK_INTERVAL}s)") logger.info(f"Autopilot: {'ENABLED' if monitor.is_autopilot_enabled() else 'DISABLED'}") while True: try: monitor.check() except Exception as e: logger.error(f"Check failed: {e}") time.sleep(CHECK_INTERVAL) if __name__ == "__main__": main()