2255 lines
102 KiB
Python
2255 lines
102 KiB
Python
import os
|
|
import json
|
|
import hashlib
|
|
import logging
|
|
import asyncio
|
|
import re
|
|
import html
|
|
import threading
|
|
import time
|
|
import csv
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
|
|
import requests
|
|
import pandas as pd
|
|
import matplotlib
|
|
matplotlib.use('Agg') # Use non-interactive backend
|
|
import matplotlib.pyplot as plt
|
|
from playwright.async_api import async_playwright
|
|
|
|
# Configuration from environment.
# Every setting is read once at import time; unset variables fall back to the
# default given as the second argument of os.environ.get().
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "")  # empty disables the command listener
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "")  # only messages from this chat are handled
INBERLIN_EMAIL = os.environ.get("INBERLIN_EMAIL", "")
INBERLIN_PASSWORD = os.environ.get("INBERLIN_PASSWORD", "")
CHECK_INTERVAL = int(os.environ.get("CHECK_INTERVAL", "300"))  # seconds (5 minutes); non-numeric values raise ValueError at import

# WGcompany search configuration (all values are kept as strings)
WGCOMPANY_ENABLED = os.environ.get("WGCOMPANY_ENABLED", "true").lower() == "true"
WGCOMPANY_MIN_SIZE = os.environ.get("WGCOMPANY_MIN_SIZE", "")  # min room size m²
WGCOMPANY_MAX_SIZE = os.environ.get("WGCOMPANY_MAX_SIZE", "")  # max room size m²
WGCOMPANY_MIN_PRICE = os.environ.get("WGCOMPANY_MIN_PRICE", "")  # min rent €
WGCOMPANY_MAX_PRICE = os.environ.get("WGCOMPANY_MAX_PRICE", "")  # max rent €
WGCOMPANY_BEZIRK = os.environ.get("WGCOMPANY_BEZIRK", "0")  # 0=egal (any), or specific district code
WGCOMPANY_AGE = os.environ.get("WGCOMPANY_AGE", "")  # your age (for WG matching)
WGCOMPANY_SMOKER = os.environ.get("WGCOMPANY_SMOKER", "")  # NR=Nichtraucher (non-smoker), R=Raucher (smoker), empty=egal (any)

# Form data for applications (typed into the housing companies' contact forms)
FORM_ANREDE = os.environ.get("FORM_ANREDE", "")  # salutation
FORM_VORNAME = os.environ.get("FORM_VORNAME", "")  # first name
FORM_NACHNAME = os.environ.get("FORM_NACHNAME", "")  # last name
FORM_EMAIL = os.environ.get("FORM_EMAIL", "")
FORM_PHONE = os.environ.get("FORM_PHONE", "")
FORM_STRASSE = os.environ.get("FORM_STRASSE", "")  # street
FORM_HAUSNUMMER = os.environ.get("FORM_HAUSNUMMER", "")  # house number
FORM_PLZ = os.environ.get("FORM_PLZ", "")  # postal code
FORM_ORT = os.environ.get("FORM_ORT", "")  # city
FORM_PERSONS = os.environ.get("FORM_PERSONS", "1")
FORM_CHILDREN = os.environ.get("FORM_CHILDREN", "0")
FORM_INCOME = os.environ.get("FORM_INCOME", "")

# Persistent storage: all state, logs and screenshots live under DATA_DIR.
DATA_DIR = Path("/data")
LISTINGS_FILE = DATA_DIR / "listings.json"
LOG_FILE = DATA_DIR / "monitor.log"
TIMING_FILE = DATA_DIR / "listing_times.csv"  # read by the weekly-pattern plot
STATE_FILE = DATA_DIR / "state.json"
APPLICATIONS_FILE = DATA_DIR / "applications.json"  # read by the error-rate plot

# WGcompany specific files
WGCOMPANY_LISTINGS_FILE = DATA_DIR / "wgcompany_listings.json"
|
|
|
|
|
|
def _remove_old_pngs(cutoff: datetime, png_hours: int) -> None:
    """Delete every ``*.png`` directly inside DATA_DIR last modified before `cutoff`."""
    removed_pngs = 0
    for p in DATA_DIR.glob("*.png"):
        try:
            # fromtimestamp() yields naive local time, so `cutoff` must be local too.
            if datetime.fromtimestamp(p.stat().st_mtime) < cutoff:
                p.unlink()
                removed_pngs += 1
        except Exception:
            logger.exception(f"Error while checking/removing PNG: {p}")
    if removed_pngs:
        logger.info(f"Removed {removed_pngs} PNG(s) older than {png_hours} hours")


def _prune_log_file(cutoff: datetime) -> int:
    """Rewrite LOG_FILE keeping only records stamped at or after `cutoff`.

    Returns the number of lines kept. No logging happens inside this function
    because the file handlers' locks are held while the file is swapped.
    """
    # Expect logging lines starting with 'YYYY-MM-DD HH:MM:SS,ms - '
    ts_pattern = re.compile(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d+)\s+-\s+")

    # Handlers currently writing to LOG_FILE. Their locks are held for the
    # whole rewrite so no record is written to the file being replaced.
    target = os.path.abspath(LOG_FILE)
    handlers = [
        h for h in logging.getLogger().handlers
        if isinstance(h, logging.FileHandler) and h.baseFilename == target
    ]
    for h in handlers:
        h.acquire()
    try:
        for h in handlers:
            h.flush()

        kept_lines = []
        keep_current = True  # lines before the first timestamp are kept
        with open(LOG_FILE, "r", encoding="utf-8", errors="ignore") as f:
            for line in f:
                m = ts_pattern.match(line)
                if m:
                    try:
                        ts = datetime.strptime(m.group(1), "%Y-%m-%d %H:%M:%S,%f")
                        keep_current = ts >= cutoff
                    except ValueError:
                        # If parsing fails, keep the line
                        keep_current = True
                # Lines without a timestamp (e.g. traceback continuation lines)
                # share the fate of the record they belong to, so tracebacks of
                # expired records do not pile up forever.
                if keep_current:
                    kept_lines.append(line)

        tmp = LOG_FILE.with_suffix(".tmp")
        with open(tmp, "w", encoding="utf-8") as f:
            f.writelines(kept_lines)

        # Close the handlers' streams before swapping the file. Otherwise they
        # keep writing to the old (now unlinked) file and new records never
        # reach LOG_FILE. FileHandler.emit() reopens the stream lazily.
        for h in handlers:
            if h.stream is not None:
                h.stream.close()
                h.stream = None
        tmp.replace(LOG_FILE)
    finally:
        for h in handlers:
            h.release()
    return len(kept_lines)


def _cleanup_old_files(png_hours: int = 24, log_days: int = 7):
    """Remove PNG files older than `png_hours` and prune log lines older than `log_days` days.

    Runs best-effort: every failure is logged to the module logger and never
    propagates to the caller.

    Args:
        png_hours: Maximum age, in hours, of ``*.png`` files kept in DATA_DIR.
        log_days: Maximum age, in days, of records kept in LOG_FILE.
    """
    try:
        # Naive *local* time: both file mtimes (datetime.fromtimestamp) and the
        # logging module's %(asctime)s stamps are local, so a UTC "now" would
        # shift both cutoffs by the UTC offset.
        now = datetime.now()

        # Remove old PNGs in DATA_DIR
        _remove_old_pngs(now - timedelta(hours=png_hours), png_hours)

        # Prune logfile lines older than log_days
        if LOG_FILE.exists():
            try:
                kept = _prune_log_file(now - timedelta(days=log_days))
                logger.info(f"Pruned logfile, kept {kept} lines from last {log_days} days")
            except Exception:
                logger.exception("Error while pruning logfile")
    except Exception:
        logger.exception("Unexpected error in cleanup task")
|
|
WGCOMPANY_TIMING_FILE = DATA_DIR / "wgcompany_times.csv"

# Setup logging
# The FileHandler opens LOG_FILE immediately, so the data directory must exist
# before basicConfig() runs.
DATA_DIR.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        # Explicit UTF-8: log messages contain emoji/umlauts and the log pruner
        # reads and rewrites this file as UTF-8; the locale default may differ.
        logging.FileHandler(LOG_FILE, encoding="utf-8"),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class TelegramBot:
    """Handle Telegram commands for controlling the monitor.

    Updates are fetched with long polling on a daemon thread. Only messages
    whose chat id equals TELEGRAM_CHAT_ID are acted upon; replies are sent to
    that same chat using HTML parse mode.
    """

    def __init__(self, monitor):
        """Store the monitor whose state the commands read and change.

        Args:
            monitor: Object providing ``set_autopilot(bool)``, ``load_state()``
                and ``load_applications()`` (the methods called by the
                command handlers below).
        """
        self.monitor = monitor
        self.last_update_id = 0  # highest update_id seen; polling asks for +1
        self.running = False

    @staticmethod
    def _redact(text) -> str:
        """Return `text` as a string with the bot token masked.

        Exceptions raised by ``requests`` can include the request URL, which
        embeds the token, so anything derived from them is redacted before it
        is logged.
        """
        text = str(text)
        if TELEGRAM_BOT_TOKEN:
            text = text.replace(TELEGRAM_BOT_TOKEN, "<token>")
        return text

    def start(self):
        """Start the polling thread, or log a warning if no token is configured."""
        if not TELEGRAM_BOT_TOKEN:
            logger.warning("Telegram bot token not configured, commands disabled")
            return
        self.running = True
        thread = threading.Thread(target=self._poll_updates, daemon=True)
        thread.start()
        logger.info("Telegram command listener started")

    def stop(self):
        """Ask the polling loop to exit after its current request."""
        self.running = False

    def _poll_updates(self):
        """Long-poll ``getUpdates`` and dispatch each update until stopped."""
        while self.running:
            try:
                url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/getUpdates"
                params = {"offset": self.last_update_id + 1, "timeout": 30}
                response = requests.get(url, params=params, timeout=35)
                if not response.ok:
                    # Back off: an error response returns immediately, so
                    # without a pause this loop would hammer the API.
                    logger.error(f"Telegram polling failed with HTTP {response.status_code}")
                    time.sleep(5)
                    continue
                data = response.json()
                if data.get("ok") and data.get("result"):
                    for update in data["result"]:
                        # Advance first so a failing update is not re-fetched.
                        self.last_update_id = update["update_id"]
                        self._handle_update(update)
            except requests.exceptions.Timeout:
                continue
            except Exception as e:
                logger.error(f"Telegram polling error: {self._redact(e)}")
                time.sleep(5)

    def _handle_update(self, update):
        """Route one update to the matching command handler.

        Messages from any chat other than TELEGRAM_CHAT_ID are ignored.
        """
        message = update.get("message", {})
        text = message.get("text", "")
        chat_id = str(message.get("chat", {}).get("id", ""))
        if chat_id != TELEGRAM_CHAT_ID:
            logger.debug(f"Ignoring message from unknown chat: {chat_id}")
            return
        logger.info(f"Received Telegram command: {text}")

        # /autopilot takes an argument, so it is matched by prefix.
        if text.startswith("/autopilot"):
            self._handle_autopilot_command(text)
            return

        exact_commands = {
            "/status": self._handle_status_command,
            "/help": self._handle_help_command,
            "/plot": self._handle_plot_command,
            "/errorrate": self._handle_error_rate_command,
        }
        handler = exact_commands.get(text)
        if handler is not None:
            handler()
        elif text.startswith("/"):
            self._handle_unknown_command(text)

    def _handle_autopilot_command(self, text):
        """Handle ``/autopilot on|off``; anything else gets a usage reply."""
        logger.info(f"Processing autopilot command: {text}")
        parts = text.split()
        if len(parts) < 2:
            self._send_message("Usage: /autopilot on|off")
            return
        action = parts[1].lower()
        if action == "on":
            logger.info("Enabling autopilot mode")
            self.monitor.set_autopilot(True)
            self._send_message("🤖 <b>Autopilot ENABLED</b>\n\nI will automatically apply to new listings!")
        elif action == "off":
            logger.info("Disabling autopilot mode")
            self.monitor.set_autopilot(False)
            self._send_message("🛑 <b>Autopilot DISABLED</b>\n\nI will only notify you of new listings.")
        else:
            self._send_message("Usage: /autopilot on|off")

    def _handle_status_command(self):
        """Reply with the autopilot flag and application counts per company."""
        state = self.monitor.load_state()
        autopilot = state.get("autopilot", False)
        applications = self.monitor.load_applications()
        status = "🤖 <b>Autopilot:</b> " + ("ON ✅" if autopilot else "OFF ❌")
        status += f"\n📝 <b>Applications sent:</b> {len(applications)}"
        by_company = {}
        for app in applications.values():
            company = app.get("company", "unknown")
            by_company[company] = by_company.get(company, 0) + 1
        if by_company:
            status += "\n\n<b>By company:</b>"
            for company, count in sorted(by_company.items()):
                status += f"\n • {company}: {count}"
        self._send_message(status)

    def _handle_help_command(self):
        """Reply with the list of supported commands."""
        help_text = "\n".join([
            "🏠 <b>InBerlin Monitor Commands</b>",
            "",
            "/autopilot on - Enable automatic applications",
            "/autopilot off - Disable automatic applications",
            "/status - Show current status and stats",
            "/plot - Show weekly listing patterns",
            "/errorrate - Show autopilot success vs failure plot",
            "/help - Show this help message",
            "",
            "When autopilot is ON, I will automatically apply to new listings.",
        ])
        self._send_message(help_text)

    def _handle_unknown_command(self, text):
        """Tell the user that the command (first word of `text`) is not known."""
        cmd = text.split()[0] if text else text
        # The reply is parsed as HTML, so the user-supplied command is escaped.
        self._send_message(
            f"❓ Unknown command: <code>{html.escape(cmd)}</code>\n\nUse /help to see available commands."
        )

    def _handle_error_rate_command(self):
        """Generate and send a plot showing success vs failure ratio for autopilot applications."""
        logger.info("Generating autopilot errorrate plot...")
        try:
            plot_path, summary = self._generate_error_rate_plot()
            if plot_path:
                caption = "📉 <b>Autopilot Success vs Failure</b>\n\n" + summary
                self._send_photo(plot_path, caption)
            else:
                self._send_message("📉 Not enough application data to generate errorrate plot.")
        except Exception as e:
            logger.exception(f"Error generating errorrate plot: {self._redact(e)}")
            # Escaped because the message is sent with parse_mode=HTML.
            self._send_message(f"❌ Error generating errorrate plot: {html.escape(self._redact(e))}")

    def _generate_error_rate_plot(self):
        """Read applications.json and produce a plot image + summary text.

        Returns (plot_path, summary_text) or (None, "") if insufficient data
        or if anything goes wrong while reading or plotting.
        """
        if not APPLICATIONS_FILE.exists():
            logger.warning("No applications.json found for errorrate plot")
            return None, ""

        fig = None
        try:
            with open(APPLICATIONS_FILE, 'r', encoding='utf-8') as f:
                apps = json.load(f)
            if not apps:
                return None, ""

            # Convert to DataFrame
            rows = []
            for _id, rec in apps.items():
                ts = rec.get('timestamp')
                try:
                    dt = pd.to_datetime(ts)
                except Exception:
                    dt = pd.NaT
                rows.append({'id': _id, 'company': rec.get('company'), 'success': bool(rec.get('success')), 'ts': dt})
            df = pd.DataFrame(rows)
            df = df.dropna(subset=['ts'])
            if df.empty:
                return None, ""

            df['date'] = df['ts'].dt.floor('D')
            grouped = df.groupby('date').agg(total=('id', 'count'), successes=('success', lambda x: x.sum()))
            grouped['failures'] = grouped['total'] - grouped['successes']
            grouped['error_rate'] = grouped['failures'] / grouped['total']

            # Prepare plot
            fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8), sharex=True)
            grouped[['successes', 'failures']].plot(kind='bar', stacked=True, ax=ax1, color=['#2E8B57', '#C44A4A'])
            ax1.set_ylabel('Count')
            ax1.set_title('Autopilot: Successes vs Failures (by day)')

            # pandas draws bars at integer positions 0..n-1. The x axis is
            # shared, so the line must use the same positions; plotting it
            # against the date index would put both series on incompatible
            # scales.
            positions = list(range(len(grouped)))
            ax2.plot(positions, grouped['error_rate'].values, marker='o', color='#3333AA')
            ax2.set_xticks(positions)
            ax2.set_xticklabels([d.strftime('%Y-%m-%d') for d in grouped.index], rotation=45, ha='right')
            ax2.set_ylim(0, 1)
            ax2.set_ylabel('Error rate')
            ax2.set_xlabel('Date')
            ax2.set_title('Daily Error Rate (failures / total)')

            fig.tight_layout()
            plot_path = DATA_DIR / 'error_rate.png'
            fig.savefig(plot_path)

            # Summary
            total_attempts = int(grouped['total'].sum())
            total_success = int(grouped['successes'].sum())
            total_fail = int(grouped['failures'].sum())
            overall_error = (total_fail / total_attempts) if total_attempts > 0 else 0.0
            summary = f"<b>Total attempts:</b> {total_attempts}\n<b>Successes:</b> {total_success}\n<b>Failures:</b> {total_fail}\n<b>Overall error rate:</b> {overall_error:.1%}"

            return str(plot_path), summary
        except Exception as e:
            logger.exception(f"Failed to generate error rate plot: {e}")
            return None, ""
        finally:
            # Always release the figure, including when plotting raised.
            if fig is not None:
                plt.close(fig)

    def _handle_plot_command(self):
        """Generate and send a plot of listing times"""
        logger.info("Generating listing times plot...")
        try:
            plot_path = self._generate_weekly_plot()
            if plot_path:
                self._send_photo(plot_path, "📊 <b>Weekly Listing Patterns</b>\n\nThis shows when new listings typically appear throughout the week.")
            else:
                self._send_message("📊 Not enough data to generate plot yet. Keep monitoring!")
        except Exception as e:
            logger.exception(f"Error generating plot: {self._redact(e)}")
            # Escaped because the message is sent with parse_mode=HTML.
            self._send_message(f"❌ Error generating plot: {html.escape(self._redact(e))}")

    def _generate_weekly_plot(self) -> "str | None":
        """Generate a 2x2 figure of listing counts by weekday and hour.

        Reads TIMING_FILE, which must provide ``weekday`` and ``hour`` columns
        (``timestamp`` is optional and only used for the summary panel).

        Returns the path of the saved PNG, or None when there is no data or
        plotting failed.
        """
        if not TIMING_FILE.exists():
            logger.warning("No timing data file found")
            return None

        fig = None
        try:
            df = pd.read_csv(TIMING_FILE)
            if len(df) < 1:
                logger.warning("Timing file is empty")
                return None

            logger.info(f"Loaded {len(df)} listing records for plot")

            # Create day-hour matrix
            days_order = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']

            # Count listings per day and hour
            heatmap_data = pd.DataFrame(0, index=days_order, columns=range(24))
            for day, raw_hour in zip(df['weekday'], df['hour']):
                hour = int(raw_hour)
                if day in days_order:
                    heatmap_data.loc[day, hour] += 1

            # Create figure with four panels
            fig, axes = plt.subplots(2, 2, figsize=(14, 10))
            fig.suptitle('Listing Appearance Patterns', fontsize=16, fontweight='bold')

            # 1. Heatmap - Day vs Hour
            ax1 = axes[0, 0]
            im = ax1.imshow(heatmap_data.values, cmap='YlOrRd', aspect='auto')
            ax1.set_xticks(range(24))
            ax1.set_xticklabels(range(24), fontsize=8)
            ax1.set_yticks(range(7))
            ax1.set_yticklabels(days_order)
            ax1.set_xlabel('Hour of Day')
            ax1.set_ylabel('Day of Week')
            ax1.set_title('Listings by Day & Hour')
            fig.colorbar(im, ax=ax1, label='Count')

            # 2. Bar chart - By day of week
            ax2 = axes[0, 1]
            day_counts = df['weekday'].value_counts().reindex(days_order, fill_value=0)
            colors = plt.cm.Blues(day_counts / day_counts.max() if day_counts.max() > 0 else day_counts)
            ax2.bar(range(7), day_counts.values, color=colors)
            ax2.set_xticks(range(7))
            ax2.set_xticklabels(['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'])
            ax2.set_xlabel('Day of Week')
            ax2.set_ylabel('Number of Listings')
            ax2.set_title('Total Listings by Day')
            for i, v in enumerate(day_counts.values):
                if v > 0:
                    ax2.text(i, v + 0.1, str(v), ha='center', fontsize=9)

            # 3. Line chart - By hour
            ax3 = axes[1, 0]
            hour_counts = df['hour'].value_counts().reindex(range(24), fill_value=0)
            ax3.plot(range(24), hour_counts.values, marker='o', linewidth=2, markersize=4, color='#2E86AB')
            ax3.fill_between(range(24), hour_counts.values, alpha=0.3, color='#2E86AB')
            ax3.set_xticks(range(0, 24, 2))
            ax3.set_xlabel('Hour of Day')
            ax3.set_ylabel('Number of Listings')
            ax3.set_title('Total Listings by Hour')
            ax3.grid(True, alpha=0.3)

            # 4. Summary stats
            ax4 = axes[1, 1]
            ax4.axis('off')

            # Calculate best times
            best_day = day_counts.idxmax() if day_counts.max() > 0 else "N/A"
            best_hour = hour_counts.idxmax() if hour_counts.max() > 0 else "N/A"
            total_listings = len(df)

            # Find peak combinations
            peak_combo = heatmap_data.stack().idxmax() if heatmap_data.values.max() > 0 else ("N/A", "N/A")

            # Collection period; missing timestamps are dropped so that
            # min()/max() compare strings only.
            date_from = date_to = 'N/A'
            if 'timestamp' in df.columns:
                stamps = df['timestamp'].dropna().astype(str)
                if not stamps.empty:
                    date_from = stamps.min()[:10]
                    date_to = stamps.max()[:10]

            stats_text = "\n".join([
                "📊 Summary Statistics",
                "",
                f"Total listings tracked: {total_listings}",
                "",
                f"🏆 Best day: {best_day}",
                f"⏰ Best hour: {best_hour}:00",
                f"🎯 Peak time: {peak_combo[0]} at {peak_combo[1]}:00",
                "",
                f"📈 Average per day: {total_listings/7:.1f}",
                "📅 Data collection period:",
                f"From: {date_from}",
                f"To: {date_to}",
                "",
            ])
            ax4.text(0.1, 0.9, stats_text, transform=ax4.transAxes, fontsize=11,
                     verticalalignment='top', fontfamily='monospace',
                     bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))

            fig.tight_layout()

            # Save plot
            plot_path = DATA_DIR / "weekly_plot.png"
            fig.savefig(plot_path, dpi=150, bbox_inches='tight')

            logger.info(f"Plot saved to {plot_path}")
            return str(plot_path)
        except Exception as e:
            logger.exception(f"Error creating plot: {e}")
            return None
        finally:
            # Always release the figure, including when plotting raised.
            if fig is not None:
                plt.close(fig)

    def _send_message(self, text):
        """Send `text` (HTML parse mode) to TELEGRAM_CHAT_ID; failures are only logged."""
        try:
            url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
            data = {"chat_id": TELEGRAM_CHAT_ID, "text": text, "parse_mode": "HTML", "disable_web_page_preview": True}
            # A timeout keeps a stalled connection from blocking the polling
            # thread indefinitely (requests has no default timeout).
            response = requests.post(url, data=data, timeout=15)
            if not response.ok:
                logger.error(f"Failed to send Telegram message: {response.text}")
        except Exception as e:
            logger.error(f"Failed to send Telegram message: {self._redact(e)}")

    def _send_photo(self, photo_path: str, caption: str = ""):
        """Send the image at `photo_path` with an HTML `caption`; failures are only logged."""
        try:
            url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendPhoto"
            with open(photo_path, 'rb') as photo:
                files = {'photo': photo}
                data = {"chat_id": TELEGRAM_CHAT_ID, "caption": caption, "parse_mode": "HTML"}
                response = requests.post(url, data=data, files=files, timeout=60)
            if response.ok:
                logger.info(f"Photo sent successfully: {photo_path}")
            else:
                logger.error(f"Failed to send photo: {response.text}")
        except Exception as e:
            logger.error(f"Failed to send Telegram photo: {self._redact(e)}")
|
|
|
|
|
|
class ApplicationHandler:
|
|
"""Handle automatic applications to different housing companies"""
|
|
|
|
def __init__(self, browser_context):
    """Keep the browser context used to open a new page for each application.

    Args:
        browser_context: Object whose ``new_page()`` coroutine is awaited by
            the ``_apply_*`` methods (presumably a Playwright BrowserContext;
            confirm against the caller).
    """
    self.context = browser_context
|
|
|
|
async def apply(self, listing: dict) -> dict:
    """Apply to `listing` through the handler of the company that hosts it.

    The company is detected from the listing's link and the matching
    ``_apply_<company>`` coroutine is awaited. Handler exceptions are caught,
    logged and reported through the result's ``message``.

    Args:
        listing: Listing record. ``id`` is required; ``link``, ``address``,
            ``rooms`` and ``price`` are optional.

    Returns:
        Result dict with keys ``listing_id``, ``company``, ``link``,
        ``timestamp``, ``success``, ``message``, ``address``, ``rooms`` and
        ``price``.

    Raises:
        KeyError: If `listing` has no ``id``.
    """
    link = listing.get("link", "")
    company = self._detect_company(link)
    listing_id = listing["id"]
    # Optional fields are read with .get() so that a sparse listing cannot
    # make the log lines below raise before any handler runs.
    address = listing.get("address", "")
    rooms = listing.get("rooms", "")
    price = listing.get("price", "")

    logger.info(f"Starting application process for {company}: {address}")
    logger.info(f"Listing details - ID: {listing_id}, Rooms: {rooms}, Price: {price}")
    logger.info(f"Detail link: {link}")

    result = {
        "listing_id": listing_id,
        "company": company,
        "link": link,
        "timestamp": datetime.now().isoformat(),
        "success": False,
        "message": "",
        "address": address,
        "rooms": rooms,
        "price": price,
    }

    # Companies that have an `_apply_<company>` coroutine on this class.
    supported = ("howoge", "gewobag", "degewo", "gesobau", "stadtundland", "wbm")
    try:
        if company in supported:
            handler = getattr(self, f"_apply_{company}")
            result = await handler(listing, result)
        else:
            result["message"] = f"Unknown company: {company}"
            logger.warning(f"No application handler for company: {company}")
    except Exception as e:
        result["message"] = str(e)
        logger.exception(f"Application error for {company}: {e}")

    # Log final result
    status = "SUCCESS" if result["success"] else "FAILED"
    logger.info(f"Application {status} for {address} ({company}): {result['message']}")
    return result
|
|
|
|
def _detect_company(self, link: str) -> str:
|
|
if "howoge.de" in link: return "howoge"
|
|
elif "gewobag.de" in link: return "gewobag"
|
|
elif "degewo.de" in link: return "degewo"
|
|
elif "gesobau.de" in link: return "gesobau"
|
|
elif "stadtundland.de" in link: return "stadtundland"
|
|
elif "wbm.de" in link: return "wbm"
|
|
return "unknown"
|
|
|
|
async def _apply_howoge(self, listing: dict, result: dict) -> dict:
    """Submit a viewing request on a HOWOGE listing page.

    Flow: open the listing, dismiss cookie/consent banners, click the
    "Besichtigung vereinbaren" button, walk the checkbox + "Weiter" steps
    until the contact form appears, fill it from the FORM_* settings, submit,
    and look for a confirmation keyword in the resulting page.

    Screenshots of every step are written to DATA_DIR for debugging.

    Args:
        listing: Listing record; ``link`` and ``id`` are read.
        result: Result dict to update; ``success`` and ``message`` are set.

    Returns:
        The same `result` dict. Errors are logged and reported in
        ``result["message"]`` rather than raised.
    """
    page = await self.context.new_page()
    try:
        logger.info(f"[HOWOGE] Opening page: {listing['link']}")
        await page.goto(listing["link"], wait_until="networkidle")
        logger.info("[HOWOGE] Page loaded")
        await asyncio.sleep(2)

        # Handle cookies (best effort). `except Exception` rather than a bare
        # `except`, so task cancellation (asyncio.CancelledError) and
        # KeyboardInterrupt are not swallowed.
        try:
            cookie_btn = await page.query_selector('button:has-text("Akzeptieren"), button:has-text("Alle akzeptieren")')
            if cookie_btn and await cookie_btn.is_visible():
                await cookie_btn.click()
                logger.info("[HOWOGE] Dismissed cookie banner")
                await asyncio.sleep(1)
        except Exception as e:
            logger.debug(f"[HOWOGE] Cookie banner handling failed: {e}")

        # Try to handle consent manager (consentmanager.net)
        try:
            consent_selectors = [
                '#cmpbntyestxt', '.cmpboxbtnyes', 'a.cmpboxbtn.cmpboxbtnyes',
                '#cmpwelcomebtnyes', '.cmptxt_btn_yes'
            ]
            for sel in consent_selectors:
                consent_btn = await page.query_selector(sel)
                if consent_btn and await consent_btn.is_visible():
                    await consent_btn.click()
                    logger.info("[HOWOGE] Dismissed consent manager")
                    await asyncio.sleep(1)
                    break
        except Exception as e:
            logger.debug(f"[HOWOGE] Consent manager handling failed: {e}")

        # Look for "Besichtigung vereinbaren" button
        # HOWOGE has multiple buttons with same text - only one is visible
        logger.info("[HOWOGE] Looking for 'Besichtigung vereinbaren' button...")

        # Use href selector - more reliable than text matching
        selectors = [
            'a[href*="besichtigung-vereinbaren"]',
            'a:has-text("Besichtigung vereinbaren")',
            'button:has-text("Besichtigung vereinbaren")',
            'a:has-text("Anfragen")',
            'button:has-text("Anfragen")'
        ]

        apply_btn = None
        for sel in selectors:
            all_btns = await page.query_selector_all(sel)
            logger.info(f"[HOWOGE] Selector '{sel}' found {len(all_btns)} matches")
            # Find first visible button
            for btn in all_btns:
                try:
                    if await btn.is_visible():
                        apply_btn = btn
                        logger.info(f"[HOWOGE] Found visible button with selector '{sel}'")
                        break
                except Exception as e:
                    logger.debug(f"[HOWOGE] Visibility check failed: {e}")
            if apply_btn:
                break

        if apply_btn:
            # Scroll the button into view and click
            logger.info("[HOWOGE] Found application button, scrolling into view...")
            await apply_btn.scroll_into_view_if_needed()
            await asyncio.sleep(0.5)
            logger.info("[HOWOGE] Clicking button...")
            await apply_btn.click()
            await asyncio.sleep(3)
            await page.wait_for_load_state("networkidle")
            logger.info("[HOWOGE] Clicked button, starting multi-step form process...")

            # HOWOGE has a multi-step form (typically 3-4 steps):
            # Each step has a checkbox that must be clicked, then "Weiter" button
            # Final step has the actual contact form

            max_steps = 6  # safety limit
            for step in range(1, max_steps + 1):
                logger.info(f"[HOWOGE] Processing step {step}")

                # Scroll down to reveal checkboxes
                await page.evaluate("window.scrollBy(0, 300)")
                await asyncio.sleep(0.5)

                # Check if we've reached the form (email field is visible)
                email_field = await page.query_selector('input[name*="email" i]')
                if email_field and await email_field.is_visible():
                    logger.info("[HOWOGE] Email field is visible - form is ready!")
                    break

                # Find and click any visible unchecked checkboxes
                checkboxes = await page.query_selector_all('input[type="checkbox"]')
                clicked_checkbox = False
                for checkbox in checkboxes:
                    try:
                        if await checkbox.is_visible() and not await checkbox.is_checked():
                            # Use JavaScript click to avoid viewport issues
                            await checkbox.evaluate("el => el.click()")
                            clicked_checkbox = True
                            logger.info(f"[HOWOGE] Clicked checkbox in step {step}")
                            await asyncio.sleep(0.5)
                    except Exception as e:
                        logger.debug(f"[HOWOGE] Checkbox click failed: {e}")

                if clicked_checkbox:
                    await asyncio.sleep(1)  # Wait for page to update after checkbox

                # Screenshot this step
                screenshot_path = DATA_DIR / f"howoge_step{step}_{listing['id']}.png"
                await page.screenshot(path=str(screenshot_path), full_page=True)

                # Look for visible "Weiter" button and click it
                weiter_btns = await page.query_selector_all('button:has-text("Weiter")')
                weiter_clicked = False
                for btn in weiter_btns:
                    try:
                        if await btn.is_visible():
                            await btn.click()
                            weiter_clicked = True
                            logger.info(f"[HOWOGE] Clicked 'Weiter' button in step {step}")
                            await asyncio.sleep(2)
                            await page.wait_for_load_state("networkidle")
                            break
                    except Exception as e:
                        logger.debug(f"[HOWOGE] Weiter click failed: {e}")

                if not weiter_clicked and not clicked_checkbox:
                    logger.warning(f"[HOWOGE] No action possible in step {step}, breaking")
                    break

            # Now try to fill the form
            logger.info("[HOWOGE] Attempting to fill form fields...")

            # Look for name fields - HOWOGE uses firstName/lastName
            vorname_field = await page.query_selector('input[name*="firstName" i], input[name*="vorname" i]')
            nachname_field = await page.query_selector('input[name*="lastName" i], input[name*="nachname" i]')
            email_field = await page.query_selector('input[type="email"], input[name*="email" i]')

            # True as soon as any one of the three fields below was filled.
            form_filled = False
            if vorname_field and await vorname_field.is_visible():
                await vorname_field.fill(FORM_VORNAME)
                logger.info(f"[HOWOGE] Filled Vorname: {FORM_VORNAME}")
                form_filled = True
            else:
                logger.warning("[HOWOGE] Vorname field not found or not visible")

            if nachname_field and await nachname_field.is_visible():
                await nachname_field.fill(FORM_NACHNAME)
                logger.info(f"[HOWOGE] Filled Nachname: {FORM_NACHNAME}")
                form_filled = True
            else:
                logger.warning("[HOWOGE] Nachname field not found or not visible")

            if email_field and await email_field.is_visible():
                await email_field.fill(FORM_EMAIL)
                logger.info(f"[HOWOGE] Filled Email: {FORM_EMAIL}")
                form_filled = True
            else:
                logger.warning("[HOWOGE] Email field not found or not visible")

            # Also look for phone field
            phone_field = await page.query_selector('input[type="tel"], input[name*="telefon" i], input[name*="phone" i]')
            if phone_field and await phone_field.is_visible():
                await phone_field.fill(FORM_PHONE)
                logger.info(f"[HOWOGE] Filled Phone: {FORM_PHONE}")

            # Screenshot after filling form
            screenshot_path2 = DATA_DIR / f"howoge_filled_{listing['id']}.png"
            await page.screenshot(path=str(screenshot_path2), full_page=True)
            logger.info(f"[HOWOGE] Saved filled form screenshot to {screenshot_path2}")

            if form_filled:
                # Look for submit button - HOWOGE uses "Anfrage senden"
                # Try specific selectors first, then fall back
                submit_btn = None
                for selector in ['button:has-text("Anfrage senden")', 'button:has-text("Absenden")', 'button:has-text("Senden")']:
                    btn = await page.query_selector(selector)
                    if btn and await btn.is_visible():
                        submit_btn = btn
                        logger.info(f"[HOWOGE] Found submit button with selector: {selector}")
                        break

                if submit_btn:
                    logger.info("[HOWOGE] Found submit button, clicking...")
                    await submit_btn.click()
                    await asyncio.sleep(3)
                    await page.wait_for_load_state("networkidle")

                    # Screenshot after submit
                    screenshot_path3 = DATA_DIR / f"howoge_submitted_{listing['id']}.png"
                    await page.screenshot(path=str(screenshot_path3))
                    logger.info(f"[HOWOGE] Saved post-submit screenshot to {screenshot_path3}")

                    # Heuristic: any confirmation keyword anywhere in the page
                    # HTML counts as success.
                    content = (await page.content()).lower()
                    confirmation_words = ("erfolgreich", "gesendet", "danke", "bestätigung")
                    if any(word in content for word in confirmation_words):
                        result["success"] = True
                        result["message"] = "Application submitted successfully"
                        logger.info("[HOWOGE] Success! Confirmation message detected")
                    else:
                        result["success"] = False
                        result["message"] = "Form submitted but no confirmation detected"
                        logger.warning("[HOWOGE] Form submitted but no clear confirmation")
                else:
                    result["success"] = False
                    result["message"] = "Form filled but no submit button found"
                    logger.warning("[HOWOGE] Could not find submit button")
            else:
                result["success"] = False
                result["message"] = "Could not find form fields to fill after navigating steps"
                logger.warning("[HOWOGE] No form fields found after multi-step navigation")
        else:
            result["message"] = "No application button found"
            logger.warning("[HOWOGE] Could not find 'Besichtigung vereinbaren' button")
            # Save screenshot for debugging
            screenshot_path = DATA_DIR / f"howoge_nobtn_{listing['id']}.png"
            await page.screenshot(path=str(screenshot_path))
            # Log all buttons on page for debugging
            buttons = await page.query_selector_all('button, a.btn, a[class*="button"]')
            for btn in buttons[:10]:
                try:
                    text = await btn.inner_text()
                    logger.info(f"[HOWOGE] Found button: {text[:50]}")
                except Exception as e:
                    logger.debug(f"[HOWOGE] Could not read button text: {e}")
    except Exception as e:
        result["message"] = f"Error: {str(e)}"
        logger.exception(f"[HOWOGE] Exception: {str(e)}")
    finally:
        await page.close()
    return result
|
|
|
|
async def _apply_gewobag(self, listing: dict, result: dict) -> dict:
    """
    Apply to a Gewobag listing via its embedded Wohnungshelden form.

    Gewobag uses Wohnungshelden (app.wohnungshelden.de) for their application system.
    The application form is embedded in an iframe on the listing page.
    We navigate directly to the iframe URL to fill the form.

    Args:
        listing: Listing record; ``listing["link"]`` is opened and
            ``listing["id"]`` is used to name the debug screenshots.
        result: Result record; ``success`` and ``message`` are set in place.

    Returns:
        The same ``result`` dict that was passed in.
    """
    page = await self.context.new_page()
    try:
        logger.info(f"[GEWOBAG] Opening page: {listing['link']}")
        await page.goto(listing["link"], wait_until="networkidle")
        logger.info("[GEWOBAG] Page loaded")
        await asyncio.sleep(2)

        # Best-effort cookie banner dismissal. Only Exception is swallowed so
        # that task cancellation (CancelledError) still propagates.
        try:
            cookie_btn = await page.query_selector('#CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll, button:has-text("Alle akzeptieren")')
            if cookie_btn and await cookie_btn.is_visible():
                await cookie_btn.click()
                logger.info("[GEWOBAG] Dismissed cookie banner")
                await asyncio.sleep(1)
        except Exception as e:
            logger.debug(f"[GEWOBAG] Could not dismiss cookie banner: {e}")

        # Gewobag has Wohnungshelden iframe directly on the page
        logger.info("[GEWOBAG] Looking for Wohnungshelden iframe...")
        iframe_element = await page.query_selector('iframe[src*="wohnungshelden.de"]')

        if not iframe_element:
            result["success"] = False
            result["message"] = "No Wohnungshelden iframe found"
            logger.warning("[GEWOBAG] No Wohnungshelden iframe found")
            screenshot_path = DATA_DIR / f"gewobag_nobtn_{listing['id']}.png"
            await page.screenshot(path=str(screenshot_path))
            return result

        iframe_url = await iframe_element.get_attribute('src')
        logger.info(f"[GEWOBAG] Found Wohnungshelden iframe: {iframe_url}")

        # Navigate to the iframe URL directly in a new page
        iframe_page = await self.context.new_page()
        try:
            await iframe_page.goto(iframe_url, wait_until="networkidle")
            await asyncio.sleep(2)
            logger.info("[GEWOBAG] Loaded Wohnungshelden application page")

            # Take screenshot
            screenshot_path = DATA_DIR / f"gewobag_wohnungshelden_{listing['id']}.png"
            await iframe_page.screenshot(path=str(screenshot_path), full_page=True)
            logger.info("[GEWOBAG] Saved Wohnungshelden screenshot")

            # Fill out Wohnungshelden form (same fields as Degewo).
            # form_filled becomes True as soon as any single field was set.
            form_filled = False

            # Anrede (Salutation) - ng-select dropdown
            try:
                salutation_dropdown = await iframe_page.query_selector('#salutation-dropdown, ng-select[id*="salutation"]')
                if salutation_dropdown:
                    await salutation_dropdown.click()
                    await asyncio.sleep(0.5)
                    anrede_option = await iframe_page.query_selector(f'.ng-option:has-text("{FORM_ANREDE}")')
                    if anrede_option:
                        await anrede_option.click()
                        logger.info(f"[GEWOBAG] Selected Anrede: {FORM_ANREDE}")
                        form_filled = True
            except Exception as e:
                logger.warning(f"[GEWOBAG] Could not set Anrede: {e}")

            # Plain text inputs: (selector, value, label used in log messages).
            # Gewobag uses #phone-number and formly_*_gesamtzahl for the last two.
            text_fields = (
                ('#firstName', FORM_VORNAME, "Vorname"),
                ('#lastName', FORM_NACHNAME, "Nachname"),
                ('#email', FORM_EMAIL, "E-Mail"),
                ('#phone-number, input[id*="telefonnummer"], input[id*="phone"]', FORM_PHONE, "Telefon"),
                ('input[id*="gesamtzahl"], input[id*="numberPersonsTotal"]', FORM_PERSONS, "Anzahl Personen"),
            )
            for selector, value, label in text_fields:
                # Each field is independent: a failure on one must not stop the rest.
                try:
                    field = await iframe_page.query_selector(selector)
                    if field:
                        await field.fill(value)
                        logger.info(f"[GEWOBAG] Filled {label}: {value}")
                        form_filled = True
                except Exception as e:
                    logger.warning(f"[GEWOBAG] Could not fill {label}: {e}")

            await asyncio.sleep(1)

            # Screenshot after filling
            screenshot_path = DATA_DIR / f"gewobag_filled_{listing['id']}.png"
            await iframe_page.screenshot(path=str(screenshot_path), full_page=True)
            logger.info("[GEWOBAG] Saved filled form screenshot")

            if not form_filled:
                result["success"] = False
                result["message"] = "No form fields found in Wohnungshelden"
                logger.warning("[GEWOBAG] Could not find form fields")
                return result

            # Try to submit
            try:
                submit_selectors = [
                    'button[type="submit"]',
                    'button:has-text("Absenden")',
                    'button:has-text("Senden")',
                    'button:has-text("Anfrage")',
                    '.btn-primary',
                ]

                # First selector that resolves to a visible element wins.
                submit_btn = None
                for selector in submit_selectors:
                    candidate = await iframe_page.query_selector(selector)
                    if candidate and await candidate.is_visible():
                        logger.info(f"[GEWOBAG] Found submit button: {selector}")
                        submit_btn = candidate
                        break

                if submit_btn:
                    await submit_btn.click()
                    logger.info("[GEWOBAG] Clicked submit button")
                    await asyncio.sleep(3)

                    # Screenshot after submission
                    screenshot_path = DATA_DIR / f"gewobag_submitted_{listing['id']}.png"
                    await iframe_page.screenshot(path=str(screenshot_path), full_page=True)
                    logger.info("[GEWOBAG] Saved submission screenshot")

                    result["success"] = True
                    result["message"] = "Application submitted via Wohnungshelden"
                else:
                    result["success"] = False
                    result["message"] = "Form filled but submit button not found"
                    logger.warning("[GEWOBAG] Submit button not found")
            except Exception as e:
                result["success"] = False
                result["message"] = f"Submit error: {str(e)}"
                logger.warning(f"[GEWOBAG] Submit error: {e}")
        finally:
            await iframe_page.close()
    except Exception as e:
        result["success"] = False
        result["message"] = f"Error: {str(e)}"
        logger.error(f"[GEWOBAG] Exception: {str(e)}")
    finally:
        await page.close()
    return result
|
|
|
|
async def _apply_degewo(self, listing: dict, result: dict) -> dict:
    """
    Apply to a Degewo listing via its Wohnungshelden form.

    Degewo uses Wohnungshelden (app.wohnungshelden.de) for their application system.
    The application form is loaded in an iframe from a different domain, so
    after clicking the "kontaktieren" button the iframe URL is opened directly
    in a separate page and the form is filled there.

    Args:
        listing: Listing record; ``listing["link"]`` is opened and
            ``listing["id"]`` is used to name the debug artifacts.
        result: Result record; ``success`` and ``message`` are set in place.

    Returns:
        The same ``result`` dict that was passed in.
    """
    page = await self.context.new_page()
    try:
        logger.info(f"[DEGEWO] Opening page: {listing['link']}")
        await page.goto(listing["link"], wait_until="networkidle")
        logger.info("[DEGEWO] Page loaded")
        await asyncio.sleep(2)

        # Dismiss cookie banner (best effort). Only Exception is swallowed so
        # that task cancellation (CancelledError) still propagates.
        try:
            cookie_btn = await page.query_selector('button:has-text("Alle akzeptieren"), #CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll')
            if cookie_btn and await cookie_btn.is_visible():
                await cookie_btn.click()
                logger.info("[DEGEWO] Dismissed cookie banner")
                await asyncio.sleep(1)
        except Exception as e:
            logger.debug(f"[DEGEWO] Could not dismiss cookie banner: {e}")

        logger.info("[DEGEWO] Looking for kontaktieren button...")
        apply_btn = await page.query_selector('a:has-text("kontaktieren"), button:has-text("kontaktieren"), a:has-text("Kontaktieren"), button:has-text("Kontaktieren")')
        if not (apply_btn and await apply_btn.is_visible()):
            result["message"] = "No kontaktieren button found"
            logger.warning("[DEGEWO] Could not find kontaktieren button")
            screenshot_path = DATA_DIR / f"degewo_nobtn_{listing['id']}.png"
            await page.screenshot(path=str(screenshot_path), full_page=True)
            return result

        logger.info("[DEGEWO] Found kontaktieren button, clicking...")
        await apply_btn.click()
        await asyncio.sleep(3)

        # Degewo uses Wohnungshelden iframe for the application form
        # Find the iframe and get its URL to navigate directly
        iframe_element = await page.query_selector('iframe[src*="wohnungshelden.de"]')
        if not iframe_element:
            logger.warning("[DEGEWO] Wohnungshelden iframe not found, trying direct form...")

            # Take screenshot for debugging
            screenshot_path = DATA_DIR / f"degewo_noiframe_{listing['id']}.png"
            await page.screenshot(path=str(screenshot_path), full_page=True)

            # Save HTML for debugging
            html_content = await page.content()
            html_path = DATA_DIR / "degewo_debug.html"
            with open(html_path, 'w', encoding='utf-8') as f:
                f.write(html_content)

            result["success"] = False
            result["message"] = "Wohnungshelden iframe not found on page"
            return result

        iframe_url = await iframe_element.get_attribute('src')
        logger.info(f"[DEGEWO] Found Wohnungshelden iframe: {iframe_url}")

        # Navigate to the iframe URL directly in a new page for full access
        iframe_page = await self.context.new_page()
        try:
            await iframe_page.goto(iframe_url, wait_until="networkidle")
            await asyncio.sleep(2)
            logger.info("[DEGEWO] Loaded Wohnungshelden application page")

            # Take screenshot of the Wohnungshelden form
            screenshot_path = DATA_DIR / f"degewo_wohnungshelden_{listing['id']}.png"
            await iframe_page.screenshot(path=str(screenshot_path), full_page=True)
            logger.info(f"[DEGEWO] Saved Wohnungshelden screenshot to {screenshot_path}")

            # Save HTML for debugging
            html_content = await iframe_page.content()
            html_path = DATA_DIR / f"degewo_wohnungshelden_{listing['id']}.html"
            with open(html_path, 'w', encoding='utf-8') as f:
                f.write(html_content)
            logger.info(f"[DEGEWO] Saved HTML to {html_path}")

            # Fill out Wohnungshelden form
            # The form uses specific IDs: #firstName, #lastName, #email, etc.
            # form_filled becomes True as soon as any single field was set.
            form_filled = False

            # Anrede (Salutation) - ng-select dropdown
            try:
                # Click on the salutation dropdown to open it
                salutation_dropdown = await iframe_page.query_selector('#salutation-dropdown, ng-select[id*="salutation"]')
                if salutation_dropdown:
                    await salutation_dropdown.click()
                    await asyncio.sleep(0.5)
                    # Select the option whose text matches FORM_ANREDE
                    anrede_option = await iframe_page.query_selector(f'.ng-option:has-text("{FORM_ANREDE}")')
                    if anrede_option:
                        await anrede_option.click()
                        logger.info(f"[DEGEWO] Selected Anrede: {FORM_ANREDE}")
                        form_filled = True
            except Exception as e:
                logger.warning(f"[DEGEWO] Could not set Anrede: {e}")

            # Plain text inputs: (selector, value, label used in log messages).
            text_fields = (
                ('#firstName', FORM_VORNAME, "Vorname"),
                ('#lastName', FORM_NACHNAME, "Nachname"),
                ('#email', FORM_EMAIL, "E-Mail"),
                ('input[id*="telefonnummer"]', FORM_PHONE, "Telefon"),
                ('input[id*="numberPersonsTotal"]', FORM_PERSONS, "Anzahl Personen"),
            )
            for selector, value, label in text_fields:
                # Each field is independent: a failure on one must not stop the rest.
                try:
                    field = await iframe_page.query_selector(selector)
                    if field:
                        await field.fill(value)
                        logger.info(f"[DEGEWO] Filled {label}: {value}")
                        form_filled = True
                except Exception as e:
                    logger.warning(f"[DEGEWO] Could not fill {label}: {e}")

            # "Für sich selbst" dropdown
            try:
                selbst_dropdown = await iframe_page.query_selector('ng-select[id*="fuer_wen"]')
                if selbst_dropdown:
                    await selbst_dropdown.click()
                    await asyncio.sleep(0.5)
                    # Select "Für mich selbst"
                    selbst_option = await iframe_page.query_selector('.ng-option:has-text("Für mich selbst"), .ng-option:has-text("selbst")')
                    if selbst_option:
                        await selbst_option.click()
                        logger.info("[DEGEWO] Selected: Für mich selbst")
                        form_filled = True
            except Exception as e:
                logger.warning(f"[DEGEWO] Could not set 'Für sich selbst': {e}")

            await asyncio.sleep(1)

            # Take screenshot after filling form
            screenshot_path = DATA_DIR / f"degewo_form_filled_{listing['id']}.png"
            await iframe_page.screenshot(path=str(screenshot_path), full_page=True)
            logger.info(f"[DEGEWO] Saved filled form screenshot to {screenshot_path}")

            # Do not submit (and report success for) a form in which no field
            # could be set; this mirrors the check in the Gewobag handler.
            if not form_filled:
                result["success"] = False
                result["message"] = "No form fields found in Wohnungshelden"
                logger.warning("[DEGEWO] Could not find form fields")
                return result

            # Try to submit
            try:
                # Look for submit button with various patterns
                submit_selectors = [
                    'button[type="submit"]',
                    'input[type="submit"]',
                    'button:has-text("Absenden")',
                    'button:has-text("Senden")',
                    'button:has-text("Anfrage")',
                    'button:has-text("Bewerben")',
                    'button:has-text("Submit")',
                    '.btn-primary',
                    '.submit-btn',
                ]

                # First selector that resolves to a visible element wins.
                submit_btn = None
                for selector in submit_selectors:
                    candidate = await iframe_page.query_selector(selector)
                    if candidate and await candidate.is_visible():
                        logger.info(f"[DEGEWO] Found submit button with selector: {selector}")
                        submit_btn = candidate
                        break

                if submit_btn:
                    await submit_btn.click()
                    logger.info("[DEGEWO] Clicked submit button")
                    await asyncio.sleep(3)

                    # Take screenshot after submission
                    screenshot_path = DATA_DIR / f"degewo_submitted_{listing['id']}.png"
                    await iframe_page.screenshot(path=str(screenshot_path), full_page=True)
                    logger.info(f"[DEGEWO] Saved submission screenshot to {screenshot_path}")

                    result["success"] = True
                    result["message"] = "Application submitted via Wohnungshelden"
                else:
                    # Submit button not found - this is a failure
                    result["success"] = False
                    result["message"] = "Wohnungshelden form loaded but submit button not found"
                    logger.warning("[DEGEWO] Submit button not found in Wohnungshelden form")
            except Exception as e:
                result["success"] = False
                result["message"] = f"Wohnungshelden submit error: {str(e)}"
                logger.warning(f"[DEGEWO] Submit error: {e}")
        finally:
            await iframe_page.close()
    except Exception as e:
        result["message"] = f"Error: {str(e)}"
        logger.error(f"[DEGEWO] Exception: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
    finally:
        await page.close()
    return result
|
|
|
|
async def _apply_gesobau(self, listing: dict, result: dict) -> dict:
    """
    Open a Gesobau listing and click through to its application page.

    Submission is not implemented: the handler only clicks the application
    button, saves a screenshot and reports ``success = False``.

    Args:
        listing: Listing record; ``listing["link"]`` is opened and
            ``listing["id"]`` is used to name the screenshots.
        result: Result record; ``message`` (and on the button-found path
            ``success``) is set in place.

    Returns:
        The same ``result`` dict that was passed in.
    """
    page = await self.context.new_page()
    try:
        logger.info(f"[GESOBAU] Opening page: {listing['link']}")
        await page.goto(listing["link"], wait_until="networkidle")
        logger.info("[GESOBAU] Page loaded")
        await asyncio.sleep(2)

        # Best-effort cookie banner dismissal. Only Exception is swallowed so
        # that task cancellation (CancelledError) still propagates.
        try:
            cookie_btn = await page.query_selector('button:has-text("Akzeptieren"), button:has-text("Alle akzeptieren")')
            if cookie_btn and await cookie_btn.is_visible():
                await cookie_btn.click()
                logger.info("[GESOBAU] Dismissed cookie banner")
                await asyncio.sleep(1)
        except Exception as e:
            logger.debug(f"[GESOBAU] Could not dismiss cookie banner: {e}")

        logger.info("[GESOBAU] Looking for application button...")
        apply_btn = await page.query_selector('a:has-text("Anfragen"), button:has-text("Interesse"), a:has-text("Kontakt")')
        if apply_btn and await apply_btn.is_visible():
            logger.info("[GESOBAU] Found application button, clicking...")
            await apply_btn.click()
            await asyncio.sleep(2)

            screenshot_path = DATA_DIR / f"gesobau_{listing['id']}.png"
            await page.screenshot(path=str(screenshot_path))
            logger.info(f"[GESOBAU] Saved screenshot to {screenshot_path}")

            # The form itself is never filled or sent, so this is not a success.
            result["success"] = False
            result["message"] = "Application page opened but not submitted (not implemented)"
        else:
            result["message"] = "No application button found"
            logger.warning("[GESOBAU] Could not find application button")
            screenshot_path = DATA_DIR / f"gesobau_nobtn_{listing['id']}.png"
            await page.screenshot(path=str(screenshot_path))
    except Exception as e:
        result["message"] = f"Error: {str(e)}"
        logger.error(f"[GESOBAU] Exception: {str(e)}")
    finally:
        await page.close()
    return result
|
|
|
|
async def _apply_stadtundland(self, listing: dict, result: dict) -> dict:
    """
    Apply to a Stadt und Land listing through the form on the listing page.

    Stadt und Land has an embedded contact form directly on their listing page.
    No iframe - the form fields are directly accessible.
    Fields: name, surname, street, houseNo, postalCode, city, phone, email
    Checkboxes: privacy, provision
    Submit: "Eingaben prüfen", followed by a final submit button.

    Args:
        listing: Listing record; ``listing["link"]`` is opened and
            ``listing["id"]`` is used to name the debug screenshots.
        result: Result record; ``success`` and ``message`` are set in place.

    Returns:
        The same ``result`` dict that was passed in.
    """
    page = await self.context.new_page()
    try:
        logger.info(f"[STADTUNDLAND] Opening page: {listing['link']}")
        await page.goto(listing["link"], wait_until="networkidle")
        logger.info("[STADTUNDLAND] Page loaded")
        await asyncio.sleep(2)

        # Dismiss cookie banner (best effort). Only Exception is swallowed so
        # that task cancellation (CancelledError) still propagates.
        try:
            cookie_btn = await page.query_selector('button:has-text("Akzeptieren"), button:has-text("Alle akzeptieren")')
            if cookie_btn and await cookie_btn.is_visible():
                await cookie_btn.click()
                logger.info("[STADTUNDLAND] Dismissed cookie banner")
                await asyncio.sleep(1)
        except Exception as e:
            logger.debug(f"[STADTUNDLAND] Could not dismiss cookie banner: {e}")

        # Scroll down to the contact form
        await page.evaluate("window.scrollBy(0, 500)")
        await asyncio.sleep(0.5)

        # Take initial screenshot
        screenshot_path = DATA_DIR / f"stadtundland_page_{listing['id']}.png"
        await page.screenshot(path=str(screenshot_path), full_page=True)
        logger.info(f"[STADTUNDLAND] Saved page screenshot to {screenshot_path}")

        # Fill out the embedded form directly.
        # form_filled becomes True as soon as any single text field was set.
        form_filled = False

        # (input name attribute, value, label used in log messages)
        text_fields = (
            ("name", FORM_VORNAME, "Vorname"),
            ("surname", FORM_NACHNAME, "Nachname"),
            ("street", FORM_STRASSE, "Straße"),
            ("houseNo", FORM_HAUSNUMMER, "Hausnummer"),
            ("postalCode", FORM_PLZ, "PLZ"),
            ("city", FORM_ORT, "Ort"),
            ("phone", FORM_PHONE, "Telefon"),
            ("email", FORM_EMAIL, "E-Mail"),
        )
        for field_name, value, label in text_fields:
            # Each field is independent: a failure on one must not stop the rest.
            try:
                field = await page.query_selector(f'input[name="{field_name}"]')
                if field and await field.is_visible():
                    await field.fill(value)
                    logger.info(f"[STADTUNDLAND] Filled {label}: {value}")
                    form_filled = True
            except Exception as e:
                logger.warning(f"[STADTUNDLAND] Could not fill {label}: {e}")

        # Tick the privacy and (optional) provision checkboxes if not yet ticked.
        for checkbox_name in ("privacy", "provision"):
            try:
                checkbox = await page.query_selector(f'input[name="{checkbox_name}"]')
                if checkbox and await checkbox.is_visible():
                    if not await checkbox.is_checked():
                        await checkbox.click()
                        logger.info(f"[STADTUNDLAND] Clicked {checkbox_name} checkbox")
            except Exception as e:
                logger.warning(f"[STADTUNDLAND] Could not click {checkbox_name} checkbox: {e}")

        await asyncio.sleep(1)

        # Take screenshot after filling form
        screenshot_path = DATA_DIR / f"stadtundland_filled_{listing['id']}.png"
        await page.screenshot(path=str(screenshot_path), full_page=True)
        logger.info(f"[STADTUNDLAND] Saved filled form screenshot to {screenshot_path}")

        if not form_filled:
            result["success"] = False
            result["message"] = "No form fields found on page"
            logger.warning("[STADTUNDLAND] Could not find form fields")
            return result

        # Submit form
        try:
            # Step 1: Click "Eingaben prüfen" button
            pruefen_btn = await page.query_selector('button:has-text("Eingaben prüfen")')
            if not (pruefen_btn and await pruefen_btn.is_visible()):
                result["success"] = False
                result["message"] = "Form filled but 'Eingaben prüfen' button not found"
                logger.warning("[STADTUNDLAND] 'Eingaben prüfen' button not found")
                return result

            await pruefen_btn.click()
            logger.info("[STADTUNDLAND] Clicked 'Eingaben prüfen' button")
            await asyncio.sleep(2)
            await page.wait_for_load_state("networkidle")

            # Take screenshot after validation
            screenshot_path = DATA_DIR / f"stadtundland_validated_{listing['id']}.png"
            await page.screenshot(path=str(screenshot_path), full_page=True)
            logger.info("[STADTUNDLAND] Saved validation screenshot")

            # Step 2: Click the final submit button
            final_submit_selectors = [
                'button:has-text("Absenden")',
                'button:has-text("Senden")',
                'button:has-text("Anfrage senden")',
                'button:has-text("Bestätigen")',
                'button[type="submit"]',
            ]

            # First selector that resolves to a visible element wins.
            final_btn = None
            for selector in final_submit_selectors:
                candidate = await page.query_selector(selector)
                if candidate and await candidate.is_visible():
                    logger.info(f"[STADTUNDLAND] Found final submit button: {selector}")
                    final_btn = candidate
                    break

            if not final_btn:
                result["success"] = False
                result["message"] = "Validated but final submit button not found"
                logger.warning("[STADTUNDLAND] Final submit button not found")
                return result

            await final_btn.click()
            logger.info("[STADTUNDLAND] Clicked final submit button")
            await asyncio.sleep(3)
            await page.wait_for_load_state("networkidle")

            # Take screenshot after final submission
            screenshot_path = DATA_DIR / f"stadtundland_submitted_{listing['id']}.png"
            await page.screenshot(path=str(screenshot_path), full_page=True)
            logger.info("[STADTUNDLAND] Saved submission screenshot")

            # Check for confirmation message. The click is treated as a
            # success either way; only the message and log line differ.
            content = (await page.content()).lower()
            confirmation_markers = ("erfolgreich", "gesendet", "danke", "bestätigung")
            result["success"] = True
            if any(marker in content for marker in confirmation_markers):
                result["message"] = "Application submitted successfully"
                logger.info("[STADTUNDLAND] Success! Confirmation message detected")
            else:
                result["message"] = "Form submitted"
                logger.info("[STADTUNDLAND] Form submitted")
        except Exception as e:
            result["success"] = False
            result["message"] = f"Submit error: {str(e)}"
            logger.warning(f"[STADTUNDLAND] Submit error: {e}")
    except Exception as e:
        result["success"] = False
        result["message"] = f"Error: {str(e)}"
        logger.error(f"[STADTUNDLAND] Exception: {str(e)}")
    finally:
        await page.close()
    return result
|
|
|
|
async def _apply_wbm(self, listing: dict, result: dict) -> dict:
    """
    Open a WBM listing and click through to its application page.

    Submission is not implemented: the handler only clicks the application
    button, saves a screenshot and reports ``success = False``.

    Args:
        listing: Listing record; ``listing["link"]`` is opened and
            ``listing["id"]`` is used to name the screenshots.
        result: Result record; ``message`` (and on the button-found path
            ``success``) is set in place.

    Returns:
        The same ``result`` dict that was passed in.
    """
    page = await self.context.new_page()
    try:
        logger.info(f"[WBM] Opening page: {listing['link']}")
        await page.goto(listing["link"], wait_until="networkidle")
        logger.info("[WBM] Page loaded")
        await asyncio.sleep(2)

        # Best-effort cookie banner dismissal. Only Exception is swallowed so
        # that task cancellation (CancelledError) still propagates.
        try:
            cookie_btn = await page.query_selector('button:has-text("Akzeptieren"), button:has-text("Alle akzeptieren")')
            if cookie_btn and await cookie_btn.is_visible():
                await cookie_btn.click()
                logger.info("[WBM] Dismissed cookie banner")
                await asyncio.sleep(1)
        except Exception as e:
            logger.debug(f"[WBM] Could not dismiss cookie banner: {e}")

        logger.info("[WBM] Looking for application button...")
        apply_btn = await page.query_selector('a:has-text("Anfragen"), button:has-text("Interesse"), a:has-text("Bewerben")')
        if apply_btn and await apply_btn.is_visible():
            logger.info("[WBM] Found application button, clicking...")
            await apply_btn.click()
            await asyncio.sleep(2)

            screenshot_path = DATA_DIR / f"wbm_{listing['id']}.png"
            await page.screenshot(path=str(screenshot_path))
            logger.info(f"[WBM] Saved screenshot to {screenshot_path}")

            # The form itself is never filled or sent, so this is not a success.
            result["success"] = False
            result["message"] = "Application page opened but not submitted (not implemented)"
        else:
            result["message"] = "No application button found"
            logger.warning("[WBM] Could not find application button")
            screenshot_path = DATA_DIR / f"wbm_nobtn_{listing['id']}.png"
            await page.screenshot(path=str(screenshot_path))
    except Exception as e:
        result["message"] = f"Error: {str(e)}"
        logger.error(f"[WBM] Exception: {str(e)}")
    finally:
        await page.close()
    return result
|
|
|
|
|
|
class InBerlinMonitor:
|
|
def __init__(self):
|
|
self.browser = None
|
|
self.context = None
|
|
self.logged_in = False
|
|
self.application_handler = None
|
|
|
|
async def init_browser(self):
    """Start Playwright and set up the browser, context and application handler.

    Does nothing when ``self.browser`` is already set.
    """
    if self.browser is not None:
        return

    self.playwright = await async_playwright().start()
    chromium = self.playwright.chromium
    self.browser = await chromium.launch(headless=True)

    # Context is created with a fixed desktop user-agent string.
    user_agent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
    self.context = await self.browser.new_context(user_agent=user_agent)

    self.application_handler = ApplicationHandler(self.context)
    logger.info("Browser initialized")
|
|
|
|
def load_state(self) -> dict:
    """Return the persisted state, or the default state if no file exists."""
    if not STATE_FILE.exists():
        # Nothing stored yet: autopilot defaults to off.
        return {"autopilot": False}

    with open(STATE_FILE, "r") as state_fh:
        return json.load(state_fh)
|
|
|
|
def save_state(self, state: dict):
    """Write ``state`` to STATE_FILE as indented JSON, replacing its contents."""
    with open(STATE_FILE, "w") as state_fh:
        json.dump(state, state_fh, indent=2)
|
|
|
|
def set_autopilot(self, enabled: bool):
    """Persist the autopilot flag and log the new setting."""
    current = self.load_state()
    current["autopilot"] = enabled
    self.save_state(current)

    status = "enabled" if enabled else "disabled"
    logger.info(f"Autopilot {status}")
|
|
|
|
def is_autopilot_enabled(self) -> bool:
|
|
"""Check if autopilot mode is enabled"""
|
|
return self.load_state().get("autopilot", False)
|
|
|
|
def load_applications(self) -> dict:
    """Load application history.

    Returns:
        The parsed contents of APPLICATIONS_FILE, or an empty dict when the
        file does not exist.
    """
    if APPLICATIONS_FILE.exists():
        # Read as UTF-8 to match save_application(), which writes
        # non-ASCII characters unescaped.
        with open(APPLICATIONS_FILE, "r", encoding="utf-8") as f:
            return json.load(f)
    return {}


def save_application(self, result: dict):
    """Save an application result.

    The result is stored under ``result["listing_id"]``, replacing any earlier
    entry for that listing, and the whole history is rewritten to
    APPLICATIONS_FILE.

    Args:
        result: Application result; must contain the key ``listing_id``.
    """
    applications = self.load_applications()
    applications[result["listing_id"]] = result
    # ensure_ascii=False emits umlauts etc. verbatim, so the encoding is
    # pinned to UTF-8 instead of depending on the platform locale.
    with open(APPLICATIONS_FILE, "w", encoding="utf-8") as f:
        json.dump(applications, f, indent=2, ensure_ascii=False)
|
|
|
|
def has_applied(self, listing_id: str) -> bool:
|
|
"""Check if we've already applied to this listing"""
|
|
return listing_id in self.load_applications()
|
|
|
|
async def dismiss_cookie_modal(self, page) -> bool:
    """Dismiss the privacy/cookie consent modal if present.

    Tries a list of known accept-button selectors first, then falls back to
    clicking the first visible button inside the privacy modal.

    Args:
        page: The Playwright page to operate on.

    Returns:
        True if a button was clicked, False if nothing was found or an error
        occurred.
    """
    # Common accept button patterns in German, most generic first.
    accept_selectors = [
        'button:has-text("Akzeptieren")',
        'button:has-text("Alle akzeptieren")',
        'button:has-text("Accept")',
        'button:has-text("Zustimmen")',
        '[x-show="showPrivacyModal"] button',
        '.privacy-modal button',
        'button.accept-cookies',
        # More specific to inberlinwohnen
        'div[x-show="showPrivacyModal"] button:first-of-type',
    ]

    try:
        # Wait a bit for modal to appear
        await asyncio.sleep(2)

        for selector in accept_selectors:
            # A failing selector just moves on to the next one. Only
            # Exception is caught so task cancellation still propagates.
            try:
                button = await page.query_selector(selector)
                if button and await button.is_visible():
                    await button.click()
                    logger.info(f"Clicked cookie accept button: {selector}")
                    await asyncio.sleep(1)
                    return True
            except Exception:
                continue

        # Try clicking any visible button in the modal overlay
        modal = await page.query_selector('div[x-show="showPrivacyModal"]')
        if modal:
            buttons = await modal.query_selector_all('button')
            for btn in buttons:
                if await btn.is_visible():
                    text = await btn.inner_text()
                    logger.info(f"Found modal button: {text}")
                    # Click the first button (usually accept)
                    await btn.click()
                    await asyncio.sleep(1)
                    return True

        logger.info("No cookie modal found or already dismissed")
        return False
    except Exception as e:
        logger.debug(f"Cookie modal handling: {e}")
        return False
|
|
|
|
async def login(self) -> bool:
    """Login to inberlinwohnen.de.

    Sets ``self.logged_in`` to True on success. The page opened for the login
    is always closed again, including when a step raises.

    Returns:
        True if the login succeeded; False if no credentials are configured,
        the login was rejected, or an error occurred.
    """
    if not INBERLIN_EMAIL or not INBERLIN_PASSWORD:
        logger.warning("No credentials provided, using public listings")
        return False

    page = None
    try:
        page = await self.context.new_page()
        await page.goto("https://www.inberlinwohnen.de/login", wait_until="networkidle")

        # Handle cookie/privacy modal first
        await self.dismiss_cookie_modal(page)

        # Fill login form
        await page.fill('input[name="email"], input[type="email"]', INBERLIN_EMAIL)
        await page.fill('input[name="password"], input[type="password"]', INBERLIN_PASSWORD)

        # Click submit button
        await page.click('button[type="submit"], input[type="submit"]')

        # Wait for navigation
        await page.wait_for_load_state("networkidle")
        await asyncio.sleep(2)

        # Check if login successful: either the URL contains "mein-bereich"
        # or an "Abmelden" element is present.
        if "mein-bereich" in page.url or await page.query_selector('text="Abmelden"'):
            logger.info("Login successful")
            self.logged_in = True
            return True

        logger.error(f"Login failed - ended up at {page.url}")
        return False
    except Exception as e:
        logger.error(f"Login error: {e}")
        return False
    finally:
        # Close the page on every path so a failed login does not leak it.
        if page is not None:
            try:
                await page.close()
            except Exception as e:
                logger.debug(f"Could not close login page: {e}")
|
|
|
|
async def fetch_listings(self) -> list[dict]:
    """Fetch listings from the Wohnungsfinder.

    Loads the personal Wohnungsfinder when ``self.logged_in`` is true and the
    public one otherwise, clicks through the Livewire pagination (at most 10
    pages), and extracts listings from the accumulated HTML with regular
    expressions. The combined HTML is also written to
    ``DATA_DIR / "debug_page.html"`` for inspection.

    Returns:
        A list of unique listing dicts with the keys ``id``, ``rooms``,
        ``size``, ``price``, ``address``, ``link`` and ``fetched_at``.
        An empty list is returned if anything goes wrong (the error and its
        traceback are logged, never raised).
    """
    listings = []
    page = None

    try:
        page = await self.context.new_page()

        # Use personal Wohnungsfinder when logged in to see filtered listings
        if self.logged_in:
            url = "https://www.inberlinwohnen.de/mein-bereich/wohnungsfinder"
        else:
            url = "https://www.inberlinwohnen.de/wohnungsfinder/"

        logger.info(f"Fetching listings from {url}")
        await page.goto(url, wait_until="networkidle")

        # Handle cookie modal if not logged in
        if not self.logged_in:
            await self.dismiss_cookie_modal(page)

        # Wait for dynamic content to load - look for listing text pattern.
        # A timeout here is not fatal: we still try to parse whatever rendered.
        try:
            await page.wait_for_selector('text=/\\d,\\d\\s*Zimmer/', timeout=15000)
            logger.info("Listings content loaded")
        except Exception:
            logger.warning("Timeout waiting for listings content")

        # Additional wait for initial listings to render
        await asyncio.sleep(2)

        # Collect all listings content by clicking through pagination
        page_chunks = []
        page_num = 1
        max_pages = 10  # Safety limit

        while page_num <= max_pages:
            # Get current page content
            page_chunks.append(await page.content())

            # Check for "next page" button (Livewire pagination)
            next_btn = await page.query_selector('[wire\\:click*="nextPage"]')
            if next_btn and await next_btn.is_visible():
                await next_btn.click()
                await asyncio.sleep(2)  # Wait for Livewire to update
                page_num += 1
            else:
                break

        # Report the number of pages actually captured (page_num can be one
        # past the last captured page when the safety limit ends the loop).
        logger.info(f"Collected content from {len(page_chunks)} page(s)")
        content = "".join(page_chunks)

        # Debug: save HTML to file for inspection
        debug_path = DATA_DIR / "debug_page.html"
        with open(debug_path, "w", encoding="utf-8") as f:
            f.write(content)
        logger.info(f"Saved debug HTML to {debug_path}")

        # Debug: check for listing count
        count_match = re.search(r'(\d+)\s*Wohnungen? für Sie gefunden', content)
        if count_match:
            logger.info(f"Page shows {count_match.group(1)} listings available")

        # Also check for "Zeige X bis Y von Z Angeboten"
        show_match = re.search(r'Zeige \d+ bis \d+ von (\d+) Angeboten', content)
        if show_match:
            logger.info(f"Page shows {show_match.group(1)} total offers")

        # Decode HTML entities and JSON escaped slashes for extraction
        content_decoded = html.unescape(content)
        content_decoded = content_decoded.replace('\\/', '/')

        # Build flatId -> deeplink mapping from wire:snapshot JSON data
        # Format in HTML: "deeplink":"https://...","flatId":12345
        deeplink_pattern = r'"deeplink":"(https://[^"]+)","flatId":(\d+)'
        deeplink_matches = re.findall(deeplink_pattern, content_decoded)
        id_to_link = {flat_id: link for link, flat_id in deeplink_matches}
        logger.info(f"Found {len(id_to_link)} deeplink mappings")

        # Extract listings from button elements with aria-label
        # Format: @click="open !== 12345 ..." aria-label="Wohnungsangebot - 2,0 Zimmer, 53,01 m², 494,38 € Kaltmiete | Adresse"
        button_pattern = r'@click="open !== (\d+)[^"]*"[^>]*aria-label="Wohnungsangebot - ([^"]+)"'
        button_matches = re.findall(button_pattern, content_decoded)
        logger.info(f"Found {len(button_matches)} listing buttons")

        for flat_id, listing_text in button_matches:
            # Parse listing text: "2,0 Zimmer, 53,01 m², 494,38 € Kaltmiete | Rhinstraße 4, 10315 Lichtenberg"
            parts_match = re.match(r'(\d,\d)\s*Zimmer,\s*([\d,]+)\s*m²,\s*([\d.,]+)\s*€\s*(?:Kaltmiete\s*)?\|\s*(.+)', listing_text)
            if not parts_match:
                continue

            rooms, size, price, address = parts_match.groups()
            rooms = rooms.strip()
            address = address.strip()

            # Skip entries whose address is too short to be meaningful
            if len(address) < 5:
                continue

            # Get the deeplink for this flat; fall back to the overview URL
            detail_link = id_to_link.get(flat_id, url)

            # The id is derived from the visible attributes, not from flat_id
            listing_id = hashlib.md5(f"{rooms}{size}{price}{address}".encode()).hexdigest()[:12]

            listings.append({
                "id": listing_id,
                "rooms": f"{rooms} Zimmer",
                "size": f"{size} m²",
                "price": f"{price} €",
                "address": address,
                "link": detail_link,
                "fetched_at": datetime.now().isoformat()
            })

        # Deduplicate by id, keeping the first occurrence and original order
        unique_by_id = {}
        for listing in listings:
            unique_by_id.setdefault(listing["id"], listing)
        listings = list(unique_by_id.values())

        logger.info(f"Fetched {len(listings)} unique listings")
        return listings

    except Exception as e:
        logger.error(f"Error fetching listings: {e}")
        import traceback
        logger.error(traceback.format_exc())
        return []

    finally:
        # Close the page on every path so failed fetches do not accumulate
        # open pages in the long-lived browser context.
        if page is not None:
            try:
                await page.close()
            except Exception as close_error:
                logger.debug(f"Failed to close listings page: {close_error}")
|
|
|
|
def load_previous_listings(self) -> dict:
    """Load previously saved listings.

    Returns:
        The dict stored in ``LISTINGS_FILE``, or an empty dict when the file
        does not exist, cannot be decoded as JSON, or does not contain a
        JSON object. An empty result makes the caller treat the next check
        as a fresh baseline instead of failing on every cycle.
    """
    if not LISTINGS_FILE.exists():
        return {}

    try:
        with open(LISTINGS_FILE, "r") as f:
            data = json.load(f)
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        # A truncated/corrupt state file would otherwise raise on every
        # check and never be overwritten by a subsequent save.
        logger.warning(f"Ignoring unreadable listings file {LISTINGS_FILE}: {e}")
        return {}

    if not isinstance(data, dict):
        logger.warning(f"Ignoring listings file {LISTINGS_FILE}: expected a JSON object")
        return {}
    return data
|
|
|
|
def save_listings(self, listings: list[dict]):
    """Write the given listings to ``LISTINGS_FILE`` as JSON keyed by listing id.

    When several entries share an ``"id"``, the last one wins.
    """
    by_id = {}
    for entry in listings:
        by_id[entry["id"]] = entry

    with open(LISTINGS_FILE, "w") as out:
        json.dump(by_id, out, indent=2, ensure_ascii=False)
|
|
|
|
def find_new_listings(self, current: list[dict], previous: dict) -> list[dict]:
    """Return the entries of ``current`` whose ``"id"`` is not a key of ``previous``.

    The relative order of ``current`` is preserved.
    """
    return [entry for entry in current if entry["id"] not in previous]
|
|
|
|
def send_telegram(self, message: str):
    """Send notification via Telegram.

    Posts ``message`` (HTML parse mode, link previews disabled) to the
    Bot API ``sendMessage`` endpoint for ``TELEGRAM_CHAT_ID``. This is
    best-effort: missing configuration is logged as a warning, and HTTP or
    network failures are logged as errors; nothing is raised.

    Args:
        message: Message text; it is sent with ``parse_mode`` set to ``HTML``.
    """
    if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
        logger.warning("Telegram not configured, skipping notification")
        return

    try:
        url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
        data = {
            "chat_id": TELEGRAM_CHAT_ID,
            "text": message,
            "parse_mode": "HTML",
            "disable_web_page_preview": True
        }
        # requests applies no timeout by default; without one a stalled
        # connection would block the caller indefinitely.
        response = requests.post(url, data=data, timeout=15)
        if response.ok:
            logger.info("Telegram notification sent")
        else:
            logger.error(f"Telegram error: {response.text}")
    except Exception as e:
        logger.error(f"Telegram error: {e}")
|
|
|
|
def log_listing_times(self, new_listings: list[dict]):
    """Log new listing appearance times to CSV for later analysis.

    Appends one row per listing to ``TIMING_FILE``; all rows of one call
    share the same timestamp. The header row is written when the file is
    missing or empty. Does nothing for an empty ``new_listings``.

    Args:
        new_listings: Listing dicts providing ``rooms``, ``size``, ``price``,
            ``address`` and ``id``.
    """
    if not new_listings:
        return

    # Also emit the header for a file that exists but has no content yet
    # (e.g. pre-created or truncated), not only for a missing file.
    needs_header = not TIMING_FILE.exists() or TIMING_FILE.stat().st_size == 0
    now = datetime.now()

    with open(TIMING_FILE, "a", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        if needs_header:
            writer.writerow(["timestamp", "weekday", "hour", "minute", "rooms", "size", "price", "address", "listing_id"])

        writer.writerows(
            [
                now.isoformat(),
                now.strftime("%A"),  # Weekday name
                now.hour,
                now.minute,
                listing["rooms"],
                listing["size"],
                listing["price"],
                listing["address"],
                listing["id"],
            ]
            for listing in new_listings
        )

    logger.info(f"Logged {len(new_listings)} listing times to CSV")
|
|
|
|
def notify_new_listings(self, new_listings: list[dict], application_results: dict = None):
    """Send individual notification for each new listing.

    Builds one HTML-formatted message per listing and hands it to
    ``self.send_telegram``. All values taken from listings and application
    results are HTML-escaped, because the message is sent in HTML parse mode
    and a raw ``&``, ``<`` or ``>`` in an address or URL would otherwise
    produce invalid markup.

    Args:
        new_listings: Listing dicts providing ``id``, ``rooms``, ``size``,
            ``price``, ``address`` and optionally ``link``.
        application_results: Optional mapping of listing id to a result dict
            with ``success``, ``company`` and ``message``; when an entry
            exists for a listing, its auto-apply status is appended.
    """
    if not new_listings:
        return

    last_index = len(new_listings) - 1
    for index, listing in enumerate(new_listings):
        link = html.escape(str(listing.get('link', 'https://www.inberlinwohnen.de/wohnungsfinder/')))
        rooms = html.escape(str(listing['rooms']))
        size = html.escape(str(listing['size']))
        price = html.escape(str(listing['price']))
        address = html.escape(str(listing['address']))

        message = (
            "🏠 <b>Neue Wohnung!</b>\n\n"
            f"🚪 <b>{rooms}</b>\n"
            f"📐 {size}\n"
            f"💰 {price}\n"
            f"📍 {address}\n\n"
            f"👉 <a href=\"{link}\">Alle Details</a>"
        )

        # Add autopilot status if application was attempted
        if application_results and listing["id"] in application_results:
            result = application_results[listing["id"]]
            company = html.escape(str(result['company']))
            if result["success"]:
                message += f"\n\n🤖 <b>Auto-applied!</b> ({company})"
            else:
                message += f"\n\n⚠️ <b>Auto-apply failed</b> ({company})"
            if result["message"]:
                message += f"\n<i>{html.escape(str(result['message']))}</i>"

        self.send_telegram(message)

        # Pause between consecutive messages only; there is nothing to
        # space out after the final one.
        if index < last_index:
            time.sleep(0.5)
|
|
|
|
async def apply_to_listings(self, listings: list[dict]) -> dict:
    """Apply to each listing that has not been applied to yet.

    Listings for which ``self.has_applied`` is true are skipped. Every other
    listing is passed to ``self.application_handler.apply``; the outcome is
    stored via ``self.save_application`` and followed by a two-second pause.

    Returns:
        Mapping of listing id to the result returned by the application
        handler, containing only the listings that were attempted.
    """
    outcomes = {}

    for item in listings:
        listing_id = item["id"]

        if not self.has_applied(listing_id):
            outcome = await self.application_handler.apply(item)
            outcomes[listing_id] = outcome
            self.save_application(outcome)

            marker = "✅" if outcome["success"] else "❌"
            logger.info(f"Application {marker}: {item['address']} - {outcome['message']}")
            await asyncio.sleep(2)
        else:
            logger.info(f"Already applied to {listing_id}, skipping")

    return outcomes
|
|
|
|
def check(self):
    """Run a single check for new listings.

    Steps: log in if needed, fetch the current listings, compare them with
    the saved state, record and notify about new ones (applying first when
    autopilot is enabled), then save the current state. The very first run
    (no saved state) only stores a baseline and sends nothing. If no
    listings could be fetched, the saved state is left untouched.
    """
    logger.info("Starting check...")

    # Reuse the thread's event loop across checks. asyncio.get_event_loop()
    # raises RuntimeError when no loop is set (always in non-main threads,
    # and in the main thread on newer Python versions), so create and
    # register one in that case.
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    # Login if credentials provided
    if not self.logged_in and INBERLIN_EMAIL:
        loop.run_until_complete(self._async_login())

    # Fetch current listings
    current_listings = loop.run_until_complete(self._async_fetch())
    if not current_listings:
        logger.warning("No listings fetched")
        return

    # Load previous listings
    previous_listings = self.load_previous_listings()

    # First run - just save baseline
    if not previous_listings:
        logger.info(f"First run - saving {len(current_listings)} listings as baseline")
        self.save_listings(current_listings)
        return

    # Find new listings
    new_listings = self.find_new_listings(current_listings, previous_listings)

    application_results = {}
    if new_listings:
        logger.info(f"Found {len(new_listings)} new listing(s)")
        self.log_listing_times(new_listings)

        # Apply automatically if autopilot is enabled
        if self.is_autopilot_enabled():
            logger.info("Autopilot enabled - applying to listings...")
            application_results = loop.run_until_complete(
                self._async_apply(new_listings)
            )

        self.notify_new_listings(new_listings, application_results)
    else:
        logger.info("No new listings")

    # Save current state
    self.save_listings(current_listings)
|
|
|
|
async def _async_login(self):
|
|
await self.init_browser()
|
|
await self.login()
|
|
|
|
async def _async_fetch(self):
|
|
await self.init_browser()
|
|
return await self.fetch_listings()
|
|
|
|
async def _async_apply(self, listings: list[dict]):
|
|
await self.init_browser()
|
|
return await self.apply_to_listings(listings)
|
|
|
|
|
|
class WGCompanyMonitor:
    """Monitor WGcompany.de for new WG room listings.

    Each ``check()`` submits the site's simple search form with the
    ``WGCOMPANY_*`` settings, parses the result links, compares them with the
    state saved in ``WGCOMPANY_LISTINGS_FILE`` and sends one Telegram message
    per new listing.
    """

    # District names looked for (case-insensitively, in this order) in the
    # text around a result link; the first hit becomes the listing's address.
    _DISTRICTS = (
        "Kreuzberg", "Neukölln", "Friedrichshain", "Prenzlauer Berg",
        "Mitte", "Wedding", "Charlottenburg", "Schöneberg", "Tempelhof",
        "Steglitz", "Wilmersdorf", "Pankow", "Lichtenberg", "Treptow",
        "Köpenick", "Reinickendorf", "Spandau", "Zehlendorf", "Moabit",
    )

    def __init__(self):
        # All three are created lazily by init_browser().
        self.playwright = None
        self.browser = None
        self.context = None

    async def init_browser(self):
        """Initialize the Playwright browser and context once; later calls are no-ops."""
        if self.browser is None:
            self.playwright = await async_playwright().start()
            self.browser = await self.playwright.chromium.launch(headless=True)
            self.context = await self.browser.new_context(
                user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
            )
            logger.info("[WGCOMPANY] Browser initialized")

    async def _apply_search_filters(self, page):
        """Fill the simple-search form on ``page`` from the WGCOMPANY_* settings.

        Field names of the simple search form:
            c = Min. Größe (min size m²), a = Max. Miete (max rent €),
            l = Alter (age), o = RaucherIn (NR/R), e = Bezirk (district).
        Empty settings, and form fields that are not found, are skipped.
        """
        text_filters = (
            (WGCOMPANY_MIN_SIZE, 'input[name="c"]', f"[WGCOMPANY] Set min size: {WGCOMPANY_MIN_SIZE} m²"),
            (WGCOMPANY_MAX_PRICE, 'input[name="a"]', f"[WGCOMPANY] Set max rent: {WGCOMPANY_MAX_PRICE} €"),
            (WGCOMPANY_AGE, 'input[name="l"]', f"[WGCOMPANY] Set age: {WGCOMPANY_AGE}"),
        )
        for value, selector, log_line in text_filters:
            if not value:
                continue
            field = await page.query_selector(selector)
            if field:
                await field.fill(value)
                logger.info(log_line)

        # District "0" means "egal" (all districts): leave the default selected.
        district = WGCOMPANY_BEZIRK if WGCOMPANY_BEZIRK != "0" else ""
        select_filters = (
            (WGCOMPANY_SMOKER, 'select[name="o"]', f"[WGCOMPANY] Set smoker: {WGCOMPANY_SMOKER}"),
            (district, 'select[name="e"]', f"[WGCOMPANY] Set district: {WGCOMPANY_BEZIRK}"),
        )
        for value, selector, log_line in select_filters:
            if not value:
                continue
            select = await page.query_selector(selector)
            if select:
                await select.select_option(value)
                logger.info(log_line)

    async def _extract_listing(self, link_elem):
        """Build a listing dict from one result link, or return ``None`` if it has no href.

        Price, size and district are taken from the text of the enclosing
        table row (or the parent element when there is no row); values that
        cannot be found default to ``"?"`` / ``"Berlin"``.
        """
        href = await link_elem.get_attribute("href")
        if not href:
            return None

        # Get surrounding text/row for listing details
        parent = await link_elem.evaluate_handle("el => el.closest('tr') || el.parentElement")
        row_text = await parent.evaluate("el => el.innerText") if parent else ""

        # Extract price from row text (e.g., "350 €" or "350€")
        price_match = re.search(r'(\d+)\s*€', row_text)
        price = price_match.group(1) + " €" if price_match else "?"

        # Extract size (e.g., "15 m²" or "15m²")
        size_match = re.search(r'(\d+)\s*m²', row_text)
        size = size_match.group(1) + " m²" if size_match else "?"

        # Extract district/location: first known district mentioned in the row
        row_text_lower = row_text.lower()
        location = "Berlin"
        for bez in self._DISTRICTS:
            if bez.lower() in row_text_lower:
                location = bez
                break

        # Make absolute URL
        if not href.startswith("http"):
            href = f"http://www.wgcompany.de{href}" if href.startswith("/") else f"http://www.wgcompany.de/cgi-bin/{href}"

        # Generate unique ID from link and key details
        listing_id = hashlib.md5(f"{href}{price}{size}".encode()).hexdigest()[:12]

        return {
            "id": listing_id,
            "rooms": "1 Zimmer (WG)",
            "size": size,
            "price": price,
            "address": location,
            "link": href,
            "source": "wgcompany",
            "fetched_at": datetime.now().isoformat()
        }

    async def fetch_listings(self) -> list[dict]:
        """Fetch WG listings from wgcompany.de search.

        Loads the simple search page, applies the configured filters, submits
        the form, writes the result HTML to
        ``DATA_DIR / "wgcompany_debug.html"`` and parses every listing link.

        Returns:
            Unique listing dicts (first occurrence per id, original order).
            An empty list is returned on any error; the error and its
            traceback are logged, never raised.
        """
        page = None

        try:
            page = await self.context.new_page()

            # Use simple search page: st=1 (Berlin), mi=10 (simple WG search), li=100
            search_url = "http://www.wgcompany.de/cgi-bin/seite?st=1&mi=10&li=100"
            logger.info(f"[WGCOMPANY] Loading search page: {search_url}")
            await page.goto(search_url, wait_until="networkidle")
            await asyncio.sleep(2)

            await self._apply_search_filters(page)

            # Submit the search form
            submit_btn = await page.query_selector('input[type="submit"][value*="finde"], input[type="submit"]')
            if submit_btn:
                logger.info("[WGCOMPANY] Submitting search form...")
                await submit_btn.click()
                await page.wait_for_load_state("networkidle")
                await asyncio.sleep(2)

            # Get results page content and save it for debugging
            content = await page.content()
            debug_path = DATA_DIR / "wgcompany_debug.html"
            with open(debug_path, "w", encoding="utf-8") as f:
                f.write(content)
            logger.info(f"[WGCOMPANY] Saved debug HTML to {debug_path}")

            # Listing detail links have the form wg.pl?...function=wgzeigen...
            listing_links = await page.query_selector_all('a[href*="wg.pl"][href*="wgzeigen"]')
            logger.info(f"[WGCOMPANY] Found {len(listing_links)} listing links")

            # Keyed by id to deduplicate while keeping first-seen order
            unique_by_id = {}
            for link_elem in listing_links:
                try:
                    listing = await self._extract_listing(link_elem)
                except Exception as e:
                    # One unparsable row must not discard the other results
                    logger.debug(f"[WGCOMPANY] Error parsing listing: {e}")
                    continue
                if listing is not None:
                    unique_by_id.setdefault(listing["id"], listing)

            listings = list(unique_by_id.values())
            logger.info(f"[WGCOMPANY] Fetched {len(listings)} unique listings")
            return listings

        except Exception as e:
            logger.error(f"[WGCOMPANY] Error fetching listings: {e}")
            import traceback
            logger.error(traceback.format_exc())
            return []

        finally:
            # Close the page on every path so failed fetches do not
            # accumulate open pages in the long-lived browser context.
            if page is not None:
                try:
                    await page.close()
                except Exception as close_error:
                    logger.debug(f"[WGCOMPANY] Failed to close page: {close_error}")

    def load_previous_listings(self) -> dict:
        """Load previously saved WGcompany listings.

        Returns an empty dict when the file is missing, is not valid JSON, or
        does not hold a JSON object, so the caller starts a fresh baseline
        instead of failing on every check.
        """
        if not WGCOMPANY_LISTINGS_FILE.exists():
            return {}

        try:
            with open(WGCOMPANY_LISTINGS_FILE, "r") as f:
                data = json.load(f)
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            logger.warning(f"[WGCOMPANY] Ignoring unreadable listings file {WGCOMPANY_LISTINGS_FILE}: {e}")
            return {}

        if not isinstance(data, dict):
            logger.warning(f"[WGCOMPANY] Ignoring listings file {WGCOMPANY_LISTINGS_FILE}: expected a JSON object")
            return {}
        return data

    def save_listings(self, listings: list[dict]):
        """Save current WGcompany listings as JSON keyed by listing id."""
        listings_dict = {listing["id"]: listing for listing in listings}
        with open(WGCOMPANY_LISTINGS_FILE, "w") as f:
            json.dump(listings_dict, f, indent=2, ensure_ascii=False)

    def find_new_listings(self, current: list[dict], previous: dict) -> list[dict]:
        """Return the entries of ``current`` whose id is not a key of ``previous``."""
        return [listing for listing in current if listing["id"] not in previous]

    def send_telegram(self, message: str):
        """Send notification via Telegram (HTML parse mode, best-effort).

        Missing configuration and HTTP/network failures are logged; nothing
        is raised.
        """
        if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
            logger.warning("[WGCOMPANY] Telegram not configured, skipping notification")
            return

        try:
            url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
            data = {
                "chat_id": TELEGRAM_CHAT_ID,
                "text": message,
                "parse_mode": "HTML",
                "disable_web_page_preview": True
            }
            # requests applies no timeout by default; without one a stalled
            # connection would block the caller indefinitely.
            response = requests.post(url, data=data, timeout=15)
            if response.ok:
                logger.info("[WGCOMPANY] Telegram notification sent")
            else:
                logger.error(f"[WGCOMPANY] Telegram error: {response.text}")
        except Exception as e:
            logger.error(f"[WGCOMPANY] Telegram error: {e}")

    def log_listing_times(self, new_listings: list[dict]):
        """Log new WGcompany listing appearance times to CSV.

        Appends one row per listing to ``WGCOMPANY_TIMING_FILE``; the header
        row is written when the file is missing or empty.
        """
        if not new_listings:
            return

        needs_header = (
            not WGCOMPANY_TIMING_FILE.exists()
            or WGCOMPANY_TIMING_FILE.stat().st_size == 0
        )
        now = datetime.now()

        with open(WGCOMPANY_TIMING_FILE, "a", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            if needs_header:
                writer.writerow(["timestamp", "weekday", "hour", "minute", "rooms", "size", "price", "address", "listing_id"])

            writer.writerows(
                [
                    now.isoformat(),
                    now.strftime("%A"),
                    now.hour,
                    now.minute,
                    listing["rooms"],
                    listing["size"],
                    listing["price"],
                    listing["address"],
                    listing["id"],
                ]
                for listing in new_listings
            )

        logger.info(f"[WGCOMPANY] Logged {len(new_listings)} listing times to CSV")

    def notify_new_listings(self, new_listings: list[dict]):
        """Send individual notification for each new WGcompany listing.

        Listing values are HTML-escaped because the message is sent in HTML
        parse mode; listing URLs contain ``&`` in their query string.
        """
        if not new_listings:
            return

        last_index = len(new_listings) - 1
        for index, listing in enumerate(new_listings):
            rooms = html.escape(str(listing['rooms']))
            size = html.escape(str(listing['size']))
            price = html.escape(str(listing['price']))
            address = html.escape(str(listing['address']))
            link = html.escape(str(listing['link']))

            message = (
                "🏠 <b>Neues WG-Zimmer!</b> (WGcompany)\n\n"
                f"🚪 <b>{rooms}</b>\n"
                f"📐 {size}\n"
                f"💰 {price}\n"
                f"📍 {address}\n\n"
                f"👉 <a href=\"{link}\">Zum Angebot</a>"
            )

            self.send_telegram(message)

            # Pause between consecutive messages only
            if index < last_index:
                time.sleep(0.5)

    def check(self):
        """Run a single check for new WGcompany listings.

        The first run (no saved state) only stores a baseline. If no listings
        could be fetched, the saved state is left untouched.
        """
        logger.info("[WGCOMPANY] Starting check...")

        # asyncio.get_event_loop() raises RuntimeError when no loop is set
        # (non-main threads, and the main thread on newer Python versions);
        # create and register one in that case so it is reused afterwards.
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        # Fetch current listings
        current_listings = loop.run_until_complete(self._async_fetch())
        if not current_listings:
            logger.warning("[WGCOMPANY] No listings fetched")
            return

        # Load previous listings
        previous_listings = self.load_previous_listings()

        # First run - just save baseline
        if not previous_listings:
            logger.info(f"[WGCOMPANY] First run - saving {len(current_listings)} listings as baseline")
            self.save_listings(current_listings)
            return

        # Find new listings
        new_listings = self.find_new_listings(current_listings, previous_listings)

        if new_listings:
            logger.info(f"[WGCOMPANY] Found {len(new_listings)} new listing(s)")
            self.log_listing_times(new_listings)
            self.notify_new_listings(new_listings)
        else:
            logger.info("[WGCOMPANY] No new listings")

        # Save current state
        self.save_listings(current_listings)

    async def _async_fetch(self):
        """Ensure the browser is initialised, then return the fetched listings."""
        await self.init_browser()
        return await self.fetch_listings()
|
|
|
|
|
|
def main():
    """Main entry point: set up the monitors and poll them forever.

    Each iteration runs the InBerlin check, an hourly cleanup, the optional
    WGcompany check, and then sleeps for ``CHECK_INTERVAL`` seconds. A
    failure in any step is logged and does not stop the loop.
    """
    # Ensure data directory exists
    DATA_DIR.mkdir(parents=True, exist_ok=True)

    # Initialize monitors (WGcompany only when enabled)
    inberlin_monitor = InBerlinMonitor()
    wgcompany_monitor = None
    if WGCOMPANY_ENABLED:
        wgcompany_monitor = WGCompanyMonitor()

    # Start Telegram command listener
    bot = TelegramBot(inberlin_monitor)
    bot.start()

    logger.info(f"Monitor started (interval: {CHECK_INTERVAL}s)")
    autopilot_state = 'ENABLED' if inberlin_monitor.is_autopilot_enabled() else 'DISABLED'
    logger.info(f"InBerlin Autopilot: {autopilot_state}")
    wgcompany_state = 'ENABLED' if WGCOMPANY_ENABLED else 'DISABLED'
    logger.info(f"WGcompany: {wgcompany_state}")

    # Timestamp of the last cleanup; 0 makes the first iteration clean up
    last_cleanup = 0
    cleanup_interval = 3600  # every hour

    while True:
        # Check InBerlinWohnen
        try:
            inberlin_monitor.check()
        except Exception as e:
            logger.error(f"InBerlin check failed: {e}")

        # Periodic cleanup: remove PNGs older than 24h and prune logs older than 7 days
        try:
            seconds_since_cleanup = time.time() - last_cleanup
            if seconds_since_cleanup > cleanup_interval:
                logger.info("Running periodic cleanup (old PNGs, prune logs)")
                _cleanup_old_files(png_hours=24, log_days=7)
                last_cleanup = time.time()
        except Exception:
            logger.exception("Cleanup failed")

        # Check WGcompany
        if wgcompany_monitor is not None:
            try:
                wgcompany_monitor.check()
            except Exception as e:
                logger.error(f"WGcompany check failed: {e}")

        time.sleep(CHECK_INTERVAL)
|
|
|
|
|
|
# Start the monitor loop only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|