working app

This commit is contained in:
Aron Petau 2025-12-29 22:46:10 +01:00
parent 8e69e30387
commit 3057cda8d3
12 changed files with 708 additions and 232 deletions

View file

@ -1,14 +1,15 @@
Autopilot bot command list for @BotFather
Use @BotFather -> /setcommands and paste the following lines exactly (one per line):
Use @BotFather -> setcommands and paste the following lines exactly (one per line):
/autopilot - Enable or disable automatic applications. Usage: `/autopilot on` or `/autopilot off`
/status - Show current status and statistics (autopilot state, application counts by company)
/plot - Show weekly listing patterns (image)
/errorrate - Show autopilot success vs failure plot (image)
/retryfailed - Retry all failed applications up to 3 times
/help - Show help and command usage
autopilot - Enable or disable automatic applications. Usage: autopilot on or autopilot off
status - Show current status and statistics (autopilot state, application counts by company)
plot - Show weekly listing patterns (image)
errorrate - Show autopilot success vs failure plot (image)
retryfailed - Retry all failed applications up to 3 times
resetlistings - Delete all seen listings (forces re-check of all flats, does not affect stats or WGcompany)
help - Show help and command usage
Example: send `/setcommands` to @BotFather, then paste the above lines and confirm.

View file

@ -156,19 +156,32 @@ Ensure all dependencies are installed and the environment is configured correctl
## Workflow Diagram
```mermaid
graph TD
A[Start] --> B[Fetch Listings]
B --> C{New Listings?}
C -->|Yes| D[Log to CSV]
C -->|Yes| E[Send Telegram Notification]
C -->|Yes| F{Autopilot Enabled?}
F -->|Yes| G[Auto-Apply to Listings]
F -->|No| H[Save to Applications.json]
C -->|No| I[End]
D --> I
E --> I
G --> H
H --> I
flowchart TD
A([Start]) --> B[Fetch Listings]
B --> C[Load Previous Listings]
C --> D[Deduplicate: Find New Listings]
D --> E{Any New Listings?}
E -- No --> Z1([Sleep & Wait])
E -- Yes --> F[Log New Listings to CSV]
F --> G[Save Current Listings]
G --> H[Check Autopilot State]
H -- Off --> I[Send Telegram Notification (No Apply)]
H -- On --> J[Attempt Auto-Apply to Each New Listing]
J --> K{Application Success?}
K -- Yes --> L[Log Success, Save to applications.json]
K -- No --> M[Log Failure, Save to applications.json]
L --> N[Send Telegram Notification (Success)]
M --> O[Send Telegram Notification (Failure)]
N --> P([Sleep & Wait])
O --> P
I --> P
Z1 --> B
P --> B
%% Details for error/debugging
J --> Q{Handler Error?}
Q -- Yes --> R[Save Screenshot/HTML for Debug]
R --> M
Q -- No --> K
```
This diagram illustrates the workflow of the bot, from fetching listings to logging, notifying, and optionally applying to new listings.

View file

@ -45,7 +45,9 @@ class ApplicationHandler:
Handles browser automation, listing extraction, application delegation, and Telegram notifications.
"""
def __init__(self, browser_context, state_manager, applications_file: Path = None):
def __init__(self, browser_context, state_manager, applications_file: Optional[Path] = None):
if browser_context is None:
raise ValueError("browser_context must not be None. ApplicationHandler requires a valid Playwright context.")
self.context = browser_context
self.state_manager = state_manager
self.applications_file = applications_file or APPLICATIONS_FILE
@ -72,6 +74,7 @@ class ApplicationHandler:
company = self._detect_company(link)
if company == "wgcompany":
continue # skip WGCompany listings for main handler
company_label = company.capitalize() if company != "unknown" else "Wohnung"
message = (
f"\ud83c\udfe0 <b>[{company_label}] Neue Wohnung!</b>\n\n"
@ -108,9 +111,6 @@ class ApplicationHandler:
self.telegram_bot._send_message(message)
else:
logger.info(f"[TELEGRAM] Would send message for: {listing['address']} ({listing['rooms']}, {listing['size']}, {listing['price']})")
self.telegram_bot._send_message(message)
else:
logger.info(f"[TELEGRAM] Would send message for: {listing['address']} ({listing['rooms']}, {listing['size']}, {listing['price']})")
async def apply_to_listings(self, listings: list[dict]) -> dict:
"""
@ -118,6 +118,9 @@ class ApplicationHandler:
Returns a dict of application results keyed by listing ID.
"""
results = {}
# Fail fast if context is ever None (should never happen)
if self.context is None:
raise RuntimeError("browser_context is None in apply_to_listings. This should never happen.")
for listing in listings:
if self.has_applied(listing["id"]):
logger.info(f"Already applied to {listing['id']} ({listing['address']}), skipping.")
@ -131,6 +134,7 @@ class ApplicationHandler:
return results
def log_listing_times(self, new_listings: list[dict]):
"""
Log new listing appearance times to CSV for later analysis and pattern mining.
@ -164,31 +168,7 @@ class ApplicationHandler:
logger.info(f"Logged {len(new_listings)} new listing times to CSV.")
def __init__(self, browser_context, state_manager):
self.context = browser_context
self.state_manager = state_manager
self.handlers = {
"howoge": HowogeHandler(browser_context),
"gewobag": GewobagHandler(browser_context),
"degewo": DegewoHandler(browser_context),
"gesobau": GesobauHandler(browser_context),
"stadtundland": StadtUndLandHandler(browser_context),
"wbm": WBMHandler(browser_context),
}
self.applications_file = applications_file or APPLICATIONS_FILE
def __init__(self, browser_context, state_manager, applications_file: Path = None):
self.context = browser_context
self.state_manager = state_manager
self.applications_file = applications_file or APPLICATIONS_FILE
self.handlers = {
"howoge": HowogeHandler(browser_context),
"gewobag": GewobagHandler(browser_context),
"degewo": DegewoHandler(browser_context),
"gesobau": GesobauHandler(browser_context),
"stadtundland": StadtUndLandHandler(browser_context),
"wbm": WBMHandler(browser_context),
}
# ...existing code...
async def init_browser(self):
@ -333,16 +313,39 @@ class ApplicationHandler:
def _generate_weekly_plot(self) -> str:
"""Generate a heatmap of listings by day of week and hour"""
if not TIMING_FILE.exists():
logger.warning("No timing file found for weekly plot")
return ""
"""Generate a heatmap of listings by day of week and hour. Always returns a plot path, even if no data."""
plot_path = DATA_DIR / "weekly_plot.png"
try:
if not TIMING_FILE.exists():
logger.warning("No timing file found for weekly plot. Generating empty plot.")
# Generate empty plot
fig, ax = plt.subplots(figsize=(10, 6))
ax.set_xticks(range(24))
ax.set_yticks(range(7))
ax.set_xticklabels([f"{h}:00" for h in range(24)], rotation=90)
ax.set_yticklabels(["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"])
ax.set_title("Listings Heatmap (No Data)")
ax.text(0.5, 0.5, "No data available", fontsize=18, ha='center', va='center', transform=ax.transAxes, color='gray')
plt.savefig(plot_path)
plt.close(fig)
return str(plot_path)
df = pd.read_csv(TIMING_FILE, parse_dates=["timestamp"])
if df.empty:
logger.warning("Timing file is empty. Generating empty plot.")
fig, ax = plt.subplots(figsize=(10, 6))
ax.set_xticks(range(24))
ax.set_yticks(range(7))
ax.set_xticklabels([f"{h}:00" for h in range(24)], rotation=90)
ax.set_yticklabels(["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"])
ax.set_title("Listings Heatmap (No Data)")
ax.text(0.5, 0.5, "No data available", fontsize=18, ha='center', va='center', transform=ax.transAxes, color='gray')
plt.savefig(plot_path)
plt.close(fig)
return str(plot_path)
df["day_of_week"] = df["timestamp"].dt.dayofweek
df["hour"] = df["timestamp"].dt.hour
heatmap_data = df.groupby(["day_of_week", "hour"]).size().unstack(fill_value=0)
fig, ax = plt.subplots(figsize=(10, 6))
@ -356,15 +359,23 @@ class ApplicationHandler:
ax.set_title("Listings Heatmap (Day of Week vs Hour)")
plot_path = DATA_DIR / "weekly_plot.png"
plt.savefig(plot_path)
plt.close(fig)
logger.info(f"Weekly plot saved to {plot_path}")
return str(plot_path)
except Exception as e:
logger.error(f"Failed to generate weekly plot: {e}")
return ""
# Always generate a fallback empty plot
fig, ax = plt.subplots(figsize=(10, 6))
ax.set_xticks(range(24))
ax.set_yticks(range(7))
ax.set_xticklabels([f"{h}:00" for h in range(24)], rotation=90)
ax.set_yticklabels(["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"])
ax.set_title("Listings Heatmap (Error)")
ax.text(0.5, 0.5, "Plot error", fontsize=18, ha='center', va='center', transform=ax.transAxes, color='red')
plt.savefig(plot_path)
plt.close(fig)
return str(plot_path)
def _generate_error_rate_plot(self):

View file

View file

@ -1,6 +1,9 @@
from .base_handler import BaseHandler
from handlers.base_handler import BaseHandler
import logging
import asyncio
import os
from pathlib import Path
logger = logging.getLogger(__name__)
@ -9,13 +12,16 @@ class DegewoHandler(BaseHandler):
self.context = browser_context
async def apply(self, listing: dict, result: dict) -> dict:
DATA_DIR = Path("data/degewo")
DATA_DIR.mkdir(parents=True, exist_ok=True)
page = await self.context.new_page()
try:
logger.info(f"[DEGEWO] Open: {listing['link']}")
logger.info(f"[DEGEWO] Opening page: {listing['link']}")
response = await page.goto(listing["link"], wait_until="networkidle")
logger.info("[DEGEWO] Page loaded")
await asyncio.sleep(2)
# Detect 404 by status or page title
# 404 detection
status = response.status if response else None
page_title = await page.title()
if status == 404 or (page_title and "404" in page_title):
@ -23,63 +29,67 @@ class DegewoHandler(BaseHandler):
result["success"] = False
result["message"] = "Listing is no longer available (404). Application impossible. Will not retry."
result["permanent_fail"] = True
return result
# Always handle cookies and consent before anything else
await self.handle_cookies(page)
await self.handle_consent(page)
# Save HTML after modal handling for debugging
try:
html_content = await page.content()
with open("data/degewo_debug.html", "w", encoding="utf-8") as f:
f.write(html_content)
except Exception as e:
logger.debug(f"[DEGEWO] Debug HTML not saved: {e}")
logger.info("[DEGEWO] Searching for application button...")
selectors = [
'a.btn',
'button.btn',
'a:has-text("Bewerben")',
'button:has-text("Bewerben")',
'a:has-text("Anfrage")',
'button:has-text("Anfrage")',
'a:has-text("Kontakt")',
'button:has-text("Kontakt")',
]
apply_btn = None
for sel in selectors:
all_btns = await page.query_selector_all(sel)
logger.debug(f"[DEGEWO] Selector '{sel}': {len(all_btns)} matches")
for btn in all_btns:
try:
if await btn.is_visible():
btn_text = (await btn.inner_text()).lower()
if any(x in btn_text for x in ["drucken", "merken", "zurück"]):
continue
apply_btn = btn
logger.info(f"[DEGEWO] Found visible application button: {sel} [{btn_text}]")
break
except Exception as e:
logger.debug(f"[DEGEWO] Button visibility error: {e}")
if apply_btn:
break
if apply_btn:
await apply_btn.scroll_into_view_if_needed()
await asyncio.sleep(0.5)
await apply_btn.click()
await asyncio.sleep(2)
result["success"] = True
result["message"] = "Application submitted successfully."
else:
logger.warning("[DEGEWO] No application button found.")
result["message"] = "No application button found."
except Exception as e:
result["message"] = f"Error during application: {e}"
logger.error(f"[DEGEWO] Application error: {e}")
finally:
await page.close()
return result
# Check for 'INSERAT DEAKTIVIERT' (deactivated listing)
page_content = await page.content()
if "INSERAT DEAKTIVIERT" in page_content or "Inserat deaktiviert" in page_content:
logger.warning("[DEGEWO] Listing is deactivated (INSERAT DEAKTIVIERT detected), treating as 404")
result["success"] = False
result["message"] = "Listing deactivated (404)"
result["deactivated"] = True # Mark for removal from retries
await page.close()
return result
# Dismiss cookie banner
try:
cookie_btn = await page.query_selector('button:has-text("Alle akzeptieren"), #CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll')
if cookie_btn and await cookie_btn.is_visible():
await cookie_btn.click()
logger.info("[DEGEWO] Dismissed cookie banner")
await asyncio.sleep(1)
except Exception as e:
logger.debug(f"[DEGEWO] Cookie banner dismiss failed: {e}")
logger.info("[DEGEWO] Looking for kontaktieren button...")
apply_btn = await page.query_selector('a:has-text("kontaktieren"), button:has-text("kontaktieren"), a:has-text("Kontaktieren"), button:has-text("Kontaktieren")')
if apply_btn and await apply_btn.is_visible():
logger.info("[DEGEWO] Found kontaktieren button, clicking...")
await apply_btn.click()
await asyncio.sleep(3)
# Degewo uses Wohnungshelden iframe for the application form
# Find the iframe and get its URL to navigate directly
iframe_element = await page.query_selector('iframe[src*="wohnungshelden.de"]')
if iframe_element:
iframe_url = await iframe_element.get_attribute('src')
logger.info(f"[DEGEWO] Found Wohnungshelden iframe: {iframe_url}")
# Navigate to the iframe URL directly in a new page for full access
iframe_page = await self.context.new_page()
try:
await iframe_page.goto(iframe_url, wait_until="networkidle")
await asyncio.sleep(2)
logger.info("[DEGEWO] Loaded Wohnungshelden application page")
# TODO: Implement form-filling and submission logic here
finally:
await iframe_page.close()
else:
# No iframe found - try the old approach (fallback for different page structure)
logger.warning("[DEGEWO] Wohnungshelden iframe not found, trying direct form...")
# TODO: Implement fallback logic here
else:
result["message"] = "No kontaktieren button found"
logger.warning("[DEGEWO] Could not find kontaktieren button")
screenshot_path = DATA_DIR / f"degewo_nobtn_{listing['id']}.png"
await page.screenshot(path=str(screenshot_path), full_page=True)
await page.close()
return result
except Exception as e:
result["success"] = False
result["message"] = f"Error: {str(e)}"
logger.error(f"[DEGEWO] Exception: {str(e)}")
await page.close()
return result

View file

@ -11,10 +11,22 @@ class GesobauHandler(BaseHandler):
async def apply(self, listing: dict, result: dict) -> dict:
page = await self.context.new_page()
try:
logger.info(f"[GESOBAU] Open: {listing['link']}")
await page.goto(listing["link"], wait_until="networkidle")
logger.info(f"[GESOBAU] Opening page: {listing['link']}")
response = await page.goto(listing["link"], wait_until="networkidle")
logger.info("[GESOBAU] Page loaded")
await asyncio.sleep(2)
# 404 detection
status = response.status if response else None
page_title = await page.title()
if status == 404 or (page_title and "404" in page_title):
logger.warning(f"[GESOBAU] Listing is down (404): {listing['link']}")
result["success"] = False
result["message"] = "Listing is no longer available (404). Application impossible. Will not retry."
result["permanent_fail"] = True
await page.close()
return result
# Always handle cookies and consent before anything else
await self.handle_cookies(page)
await self.handle_consent(page)
@ -63,8 +75,47 @@ class GesobauHandler(BaseHandler):
await asyncio.sleep(0.5)
await apply_btn.click()
await asyncio.sleep(2)
# --- Post-click confirmation logic ---
logger.info("[GESOBAU] Clicked application button, checking for confirmation...")
# Save screenshot and HTML after click
try:
await page.screenshot(path="data/gesobau_after_apply.png")
logger.info("[GESOBAU] Saved screenshot after application click.")
except Exception as e:
logger.warning(f"[GESOBAU] Could not save screenshot: {e}")
try:
html_after = await page.content()
with open("data/gesobau_after_apply.html", "w", encoding="utf-8") as f:
f.write(html_after)
logger.info("[GESOBAU] Saved HTML after application click.")
except Exception as e:
logger.warning(f"[GESOBAU] Could not save HTML after apply: {e}")
# Look for confirmation message on the page
confirmation_selectors = [
'text="Vielen Dank"',
'text="Ihre Anfrage wurde gesendet"',
'text="Bestätigung"',
'div:has-text("Vielen Dank")',
'div:has-text("Ihre Anfrage wurde gesendet")',
]
confirmed = False
for sel in confirmation_selectors:
try:
el = await page.query_selector(sel)
if el and await el.is_visible():
logger.info(f"[GESOBAU] Found confirmation element: {sel}")
confirmed = True
break
except Exception as e:
logger.debug(f"[GESOBAU] Error checking confirmation selector {sel}: {e}")
if confirmed:
result["success"] = True
result["message"] = "Application submitted successfully."
result["message"] = "Application submitted and confirmation detected."
else:
logger.warning("[GESOBAU] No confirmation message detected after application click.")
result["success"] = False
result["message"] = "Clicked application button, but no confirmation detected. Check screenshot and HTML."
else:
logger.warning("[GESOBAU] No application button found.")
result["message"] = "No application button found."

View file

@ -90,8 +90,47 @@ class GewobagHandler(BaseHandler):
logger.info("[GEWOBAG] Clicking button...")
await apply_btn.click()
await asyncio.sleep(2)
# --- Post-click confirmation logic ---
logger.info("[GEWOBAG] Clicked application button, checking for confirmation...")
# Save screenshot and HTML after click
try:
await page.screenshot(path="data/gewobag_after_apply.png")
logger.info("[GEWOBAG] Saved screenshot after application click.")
except Exception as e:
logger.warning(f"[GEWOBAG] Could not save screenshot: {e}")
try:
html_after = await page.content()
with open("data/gewobag_after_apply.html", "w", encoding="utf-8") as f:
f.write(html_after)
logger.info("[GEWOBAG] Saved HTML after application click.")
except Exception as e:
logger.warning(f"[GEWOBAG] Could not save HTML after apply: {e}")
# Look for confirmation message on the page
confirmation_selectors = [
'text="Vielen Dank"',
'text="Ihre Anfrage wurde gesendet"',
'text="Bestätigung"',
'div:has-text("Vielen Dank")',
'div:has-text("Ihre Anfrage wurde gesendet")',
]
confirmed = False
for sel in confirmation_selectors:
try:
el = await page.query_selector(sel)
if el and await el.is_visible():
logger.info(f"[GEWOBAG] Found confirmation element: {sel}")
confirmed = True
break
except Exception as e:
logger.debug(f"[GEWOBAG] Error checking confirmation selector {sel}: {e}")
if confirmed:
result["success"] = True
result["message"] = "Application submitted successfully."
result["message"] = "Application submitted and confirmation detected."
else:
logger.warning("[GEWOBAG] No confirmation message detected after application click.")
result["success"] = False
result["message"] = "Clicked application button, but no confirmation detected. Check screenshot and HTML."
else:
result["message"] = "No application button found."
except Exception as e:

View file

@ -9,10 +9,15 @@ class HowogeHandler(BaseHandler):
self.context = browser_context
async def apply(self, listing: dict, result: dict) -> dict:
import os
from pathlib import Path
DATA_DIR = Path("data/howoge")
DATA_DIR.mkdir(parents=True, exist_ok=True)
page = await self.context.new_page()
try:
logger.info(f"[HOWOGE] Open: {listing['link']}")
logger.info(f"[HOWOGE] Opening page: {listing['link']}")
response = await page.goto(listing["link"], wait_until="networkidle")
logger.info("[HOWOGE] Page loaded")
await asyncio.sleep(2)
# Detect 404 by status or page title
@ -23,23 +28,35 @@ class HowogeHandler(BaseHandler):
result["success"] = False
result["message"] = "Listing is no longer available (404). Application impossible. Will not retry."
result["permanent_fail"] = True
await page.close()
return result
# Always handle cookies and consent before anything else
await self.handle_cookies(page)
await self.handle_consent(page)
# Save HTML after modal handling for debugging
# Handle cookies
try:
html_content = await page.content()
with open("data/howoge_debug.html", "w", encoding="utf-8") as f:
f.write(html_content)
except Exception as e:
logger.debug(f"[HOWOGE] Debug HTML not saved: {e}")
cookie_btn = await page.query_selector('button:has-text("Akzeptieren"), button:has-text("Alle akzeptieren")')
if cookie_btn and await cookie_btn.is_visible():
await cookie_btn.click()
logger.info("[HOWOGE] Dismissed cookie banner")
await asyncio.sleep(1)
except: pass
await self.log_listing_details(listing)
# Try to handle consent manager (consentmanager.net)
try:
consent_selectors = [
'#cmpbntyestxt', '.cmpboxbtnyes', 'a.cmpboxbtn.cmpboxbtnyes',
'#cmpwelcomebtnyes', '.cmptxt_btn_yes'
]
for sel in consent_selectors:
consent_btn = await page.query_selector(sel)
if consent_btn and await consent_btn.is_visible():
await consent_btn.click()
logger.info("[HOWOGE] Dismissed consent manager")
await asyncio.sleep(1)
break
except: pass
logger.info("[HOWOGE] Searching for application button...")
# Look for "Besichtigung vereinbaren" button
logger.info("[HOWOGE] Looking for 'Besichtigung vereinbaren' button...")
selectors = [
'a[href*="besichtigung-vereinbaren"]',
'a:has-text("Besichtigung vereinbaren")',
@ -50,32 +67,160 @@ class HowogeHandler(BaseHandler):
apply_btn = None
for sel in selectors:
all_btns = await page.query_selector_all(sel)
logger.debug(f"[HOWOGE] Selector '{sel}': {len(all_btns)} matches")
logger.info(f"[HOWOGE] Selector '{sel}' found {len(all_btns)} matches")
for btn in all_btns:
try:
if await btn.is_visible():
apply_btn = btn
logger.info(f"[HOWOGE] Found visible application button: {sel}")
logger.info(f"[HOWOGE] Found visible button with selector '{sel}'")
break
except Exception as e:
logger.debug(f"[HOWOGE] Button visibility error: {e}")
except:
pass
if apply_btn:
break
if apply_btn:
# Scroll the button into view and click
logger.info("[HOWOGE] Found application button, scrolling into view...")
await apply_btn.scroll_into_view_if_needed()
await asyncio.sleep(0.5)
logger.info("[HOWOGE] Clicking button...")
await apply_btn.click()
await asyncio.sleep(2)
result["success"] = True
result["message"] = "Application submitted successfully."
else:
logger.warning("[HOWOGE] No application button found.")
result["message"] = "No application button found."
await asyncio.sleep(3)
await page.wait_for_load_state("networkidle")
logger.info("[HOWOGE] Clicked button, starting multi-step form process...")
max_steps = 6 # safety limit
for step in range(1, max_steps + 1):
logger.info(f"[HOWOGE] Processing step {step}")
await page.evaluate("window.scrollBy(0, 300)")
await asyncio.sleep(0.5)
email_field = await page.query_selector('input[name*="email" i]')
if email_field and await email_field.is_visible():
logger.info("[HOWOGE] Email field is visible - form is ready!")
break
checkboxes = await page.query_selector_all('input[type="checkbox"]')
clicked_checkbox = False
for checkbox in checkboxes:
try:
if await checkbox.is_visible() and not await checkbox.is_checked():
await checkbox.evaluate("el => el.click()")
clicked_checkbox = True
logger.info(f"[HOWOGE] Clicked checkbox in step {step}")
await asyncio.sleep(0.5)
except Exception as e:
result["message"] = f"Error during application: {e}"
logger.error(f"[HOWOGE] Application error: {e}")
logger.debug(f"[HOWOGE] Checkbox click failed: {e}")
if clicked_checkbox:
await asyncio.sleep(1)
screenshot_path = DATA_DIR / f"step{step}_{listing['id']}.png"
await page.screenshot(path=str(screenshot_path), full_page=True)
weiter_btns = await page.query_selector_all('button:has-text("Weiter")')
weiter_clicked = False
for btn in weiter_btns:
try:
if await btn.is_visible():
await btn.click()
weiter_clicked = True
logger.info(f"[HOWOGE] Clicked 'Weiter' button in step {step}")
await asyncio.sleep(2)
await page.wait_for_load_state("networkidle")
break
except Exception as e:
logger.debug(f"[HOWOGE] Weiter click failed: {e}")
if not weiter_clicked and not clicked_checkbox:
logger.warning(f"[HOWOGE] No action possible in step {step}, breaking")
break
# Now try to fill the form
logger.info("[HOWOGE] Attempting to fill form fields...")
vorname_field = await page.query_selector('input[name*="firstName" i], input[name*="vorname" i]')
nachname_field = await page.query_selector('input[name*="lastName" i], input[name*="nachname" i]')
email_field = await page.query_selector('input[type="email"], input[name*="email" i]')
form_filled = False
if vorname_field and await vorname_field.is_visible():
await vorname_field.fill(os.environ.get("FORM_VORNAME", "Max"))
logger.info(f"[HOWOGE] Filled Vorname: {os.environ.get('FORM_VORNAME', 'Max')}")
form_filled = True
else:
logger.warning("[HOWOGE] Vorname field not found or not visible")
if nachname_field and await nachname_field.is_visible():
await nachname_field.fill(os.environ.get("FORM_NACHNAME", "Mustermann"))
logger.info(f"[HOWOGE] Filled Nachname: {os.environ.get('FORM_NACHNAME', 'Mustermann')}")
form_filled = True
else:
logger.warning("[HOWOGE] Nachname field not found or not visible")
if email_field and await email_field.is_visible():
await email_field.fill(os.environ.get("FORM_EMAIL", "test@example.com"))
logger.info(f"[HOWOGE] Filled Email: {os.environ.get('FORM_EMAIL', 'test@example.com')}")
form_filled = True
else:
logger.warning("[HOWOGE] Email field not found or not visible")
phone_field = await page.query_selector('input[type="tel"], input[name*="telefon" i], input[name*="phone" i]')
if phone_field and await phone_field.is_visible():
await phone_field.fill(os.environ.get("FORM_PHONE", "0123456789"))
logger.info(f"[HOWOGE] Filled Phone: {os.environ.get('FORM_PHONE', '0123456789')}")
screenshot_path2 = DATA_DIR / f"filled_{listing['id']}.png"
await page.screenshot(path=str(screenshot_path2), full_page=True)
logger.info(f"[HOWOGE] Saved filled form screenshot to {screenshot_path2}")
if form_filled:
submit_btn = None
for selector in ['button:has-text("Anfrage senden")', 'button:has-text("Absenden")', 'button:has-text("Senden")']:
btn = await page.query_selector(selector)
if btn and await btn.is_visible():
submit_btn = btn
logger.info(f"[HOWOGE] Found submit button with selector: {selector}")
break
if submit_btn:
logger.info("[HOWOGE] Found submit button, clicking...")
await submit_btn.click()
await asyncio.sleep(3)
await page.wait_for_load_state("networkidle")
screenshot_path3 = DATA_DIR / f"submitted_{listing['id']}.png"
await page.screenshot(path=str(screenshot_path3))
logger.info(f"[HOWOGE] Saved post-submit screenshot to {screenshot_path3}")
content = await page.content()
if "erfolgreich" in content.lower() or "gesendet" in content.lower() or "danke" in content.lower() or "bestätigung" in content.lower():
result["success"] = True
result["message"] = "Application submitted successfully"
logger.info("[HOWOGE] Success! Confirmation message detected")
else:
result["success"] = False
result["message"] = "Form submitted but no confirmation detected"
logger.warning("[HOWOGE] Form submitted but no clear confirmation")
else:
result["success"] = False
result["message"] = "Form filled but no submit button found"
logger.warning("[HOWOGE] Could not find submit button")
else:
result["success"] = False
result["message"] = "Could not find form fields to fill after navigating steps"
logger.warning("[HOWOGE] No form fields found after multi-step navigation")
else:
result["message"] = "No application button found"
logger.warning("[HOWOGE] Could not find 'Besichtigung vereinbaren' button")
screenshot_path = DATA_DIR / f"nobtn_{listing['id']}.png"
await page.screenshot(path=str(screenshot_path))
buttons = await page.query_selector_all('button, a.btn, a[class*="button"]')
for btn in buttons[:10]:
try:
text = await btn.inner_text()
logger.info(f"[HOWOGE] Found button: {text[:50]}")
except:
pass
except Exception as e:
result["message"] = f"Error: {str(e)}"
logger.error(f"[HOWOGE] Exception: {str(e)}")
import traceback
logger.error(traceback.format_exc())
# Save debug HTML on error
debug_html_path = DATA_DIR / f"debug_{listing['id']}.html"
try:
html = await page.content()
with open(debug_html_path, "w", encoding="utf-8") as f:
f.write(html)
logger.info(f"[HOWOGE] Saved debug HTML to {debug_html_path}")
except Exception as html_e:
logger.error(f"[HOWOGE] Failed to save debug HTML: {html_e}")
finally:
await page.close()
return result

View file

@ -1,6 +1,20 @@
from .base_handler import BaseHandler
import logging
import asyncio
import os
from pathlib import Path
# Load environment variables for form fields and data dir
FORM_VORNAME = os.environ.get("FORM_VORNAME", "")
FORM_NACHNAME = os.environ.get("FORM_NACHNAME", "")
FORM_STRASSE = os.environ.get("FORM_STRASSE", "")
FORM_HAUSNUMMER = os.environ.get("FORM_HAUSNUMMER", "")
FORM_PLZ = os.environ.get("FORM_PLZ", "")
FORM_ORT = os.environ.get("FORM_ORT", "")
FORM_PHONE = os.environ.get("FORM_PHONE", "")
FORM_EMAIL = os.environ.get("FORM_EMAIL", "")
DATA_DIR = Path(os.environ.get("DATA_DIR", "data"))
logger = logging.getLogger(__name__)
@ -11,77 +25,178 @@ class StadtUndLandHandler(BaseHandler):
async def apply(self, listing: dict, result: dict) -> dict:
page = await self.context.new_page()
try:
logger.info(f"[STADT UND LAND] Open: {listing['link']}")
logger.info(f"[STADTUNDLAND] Opening page: {listing['link']}")
await page.goto(listing["link"], wait_until="networkidle")
logger.info("[STADTUNDLAND] Page loaded")
await asyncio.sleep(2)
# Always handle cookies and consent before anything else
await self.handle_cookies(page)
await self.handle_consent(page)
# Save HTML after modal handling for debugging
# Dismiss cookie banner
try:
html_content = await page.content()
with open("data/stadtundland_debug.html", "w", encoding="utf-8") as f:
f.write(html_content)
cookie_btn = await page.query_selector('button:has-text("Akzeptieren"), button:has-text("Alle akzeptieren")')
if cookie_btn and await cookie_btn.is_visible():
await cookie_btn.click()
logger.info("[STADTUNDLAND] Dismissed cookie banner")
await asyncio.sleep(1)
except Exception as e:
logger.debug(f"[STADT UND LAND] Debug HTML not saved: {e}")
logger.debug(f"[STADTUNDLAND] Cookie banner dismiss failed: {e}")
# 404/permanent fail detection
error_texts = [
"Hier ist etwas schief gelaufen",
"Leider können wir Ihnen zur Zeit keine Details zu diesem Inserat anzeigen"
]
page_text = await page.text_content('body')
if page_text:
for err in error_texts:
if err in page_text:
logger.warning(f"[STADT UND LAND] Permanent fail: {err}")
result["permanent_fail"] = True
result["message"] = "Listing is no longer available (404 detected on STADT UND LAND)."
await page.close()
return result
# Look for application button (robust selectors)
logger.info("[STADT UND LAND] Searching for application button...")
selectors = [
'a[href*="bewerben"]',
'button:has-text("Bewerben")',
'a:has-text("Bewerben")',
'button.btn',
'a.Button_button__JnZ4E',
'button.Button_button__JnZ4E',
]
apply_btn = None
for sel in selectors:
all_btns = await page.query_selector_all(sel)
logger.debug(f"[STADT UND LAND] Selector '{sel}': {len(all_btns)} matches")
for btn in all_btns:
try:
if await btn.is_visible():
apply_btn = btn
logger.info(f"[STADT UND LAND] Found visible application button: {sel}")
break
except Exception as e:
logger.debug(f"[STADT UND LAND] Button visibility error: {e}")
if apply_btn:
break
if apply_btn:
await apply_btn.scroll_into_view_if_needed()
# Scroll to form
await page.evaluate("window.scrollBy(0, 500)")
await asyncio.sleep(0.5)
await apply_btn.click()
await asyncio.sleep(2)
result["success"] = True
result["message"] = "Application submitted successfully."
else:
logger.warning("[STADT UND LAND] No application button found.")
result["message"] = "No application button found."
# Fill out the embedded form directly
form_filled = False
try:
# Vorname
vorname_field = await page.query_selector('input[name="name"]')
if vorname_field and await vorname_field.is_visible():
await vorname_field.fill(FORM_VORNAME)
logger.info(f"[STADTUNDLAND] Filled Vorname: {FORM_VORNAME}")
form_filled = True
# Nachname
nachname_field = await page.query_selector('input[name="surname"]')
if nachname_field and await nachname_field.is_visible():
await nachname_field.fill(FORM_NACHNAME)
logger.info(f"[STADTUNDLAND] Filled Nachname: {FORM_NACHNAME}")
form_filled = True
# Straße
street_field = await page.query_selector('input[name="street"]')
if street_field and await street_field.is_visible():
await street_field.fill(FORM_STRASSE)
logger.info(f"[STADTUNDLAND] Filled Straße: {FORM_STRASSE}")
form_filled = True
# Hausnummer
house_field = await page.query_selector('input[name="houseNo"]')
if house_field and await house_field.is_visible():
await house_field.fill(FORM_HAUSNUMMER)
logger.info(f"[STADTUNDLAND] Filled Hausnummer: {FORM_HAUSNUMMER}")
form_filled = True
# PLZ
plz_field = await page.query_selector('input[name="postalCode"]')
if plz_field and await plz_field.is_visible():
await plz_field.fill(FORM_PLZ)
logger.info(f"[STADTUNDLAND] Filled PLZ: {FORM_PLZ}")
form_filled = True
# Ort
city_field = await page.query_selector('input[name="city"]')
if city_field and await city_field.is_visible():
await city_field.fill(FORM_ORT)
logger.info(f"[STADTUNDLAND] Filled Ort: {FORM_ORT}")
form_filled = True
# Telefon
phone_field = await page.query_selector('input[name="phone"]')
if phone_field and await phone_field.is_visible():
await phone_field.fill(FORM_PHONE)
logger.info(f"[STADTUNDLAND] Filled Telefon: {FORM_PHONE}")
form_filled = True
# E-Mail
email_field = await page.query_selector('input[name="email"]')
if email_field and await email_field.is_visible():
await email_field.fill(FORM_EMAIL)
logger.info(f"[STADTUNDLAND] Filled E-Mail: {FORM_EMAIL}")
form_filled = True
except Exception as e:
result["message"] = f"Error during application: {e}"
logger.error(f"[STADT UND LAND] Application error: {e}")
logger.warning(f"[STADTUNDLAND] Error filling form fields: {e}")
# Click privacy checkbox
try:
privacy_checkbox = await page.query_selector('input[name="privacy"]')
if privacy_checkbox and await privacy_checkbox.is_visible():
if not await privacy_checkbox.is_checked():
await privacy_checkbox.click()
logger.info("[STADTUNDLAND] Clicked privacy checkbox")
except Exception as e:
logger.warning(f"[STADTUNDLAND] Could not click privacy checkbox: {e}")
# Click provision checkbox (optional)
try:
provision_checkbox = await page.query_selector('input[name="provision"]')
if provision_checkbox and await provision_checkbox.is_visible():
if not await provision_checkbox.is_checked():
await provision_checkbox.click()
logger.info("[STADTUNDLAND] Clicked provision checkbox")
except Exception as e:
logger.warning(f"[STADTUNDLAND] Could not click provision checkbox: {e}")
await asyncio.sleep(1)
# Screenshot after filling form
screenshot_path2 = DATA_DIR / f"stadtundland_filled_{listing['id']}.png"
await page.screenshot(path=str(screenshot_path2), full_page=True)
logger.info(f"[STADTUNDLAND] Saved filled form screenshot to {screenshot_path2}")
# Submit form
if form_filled:
try:
pruefen_btn = await page.query_selector('button:has-text("Eingaben prüfen")')
if pruefen_btn and await pruefen_btn.is_visible():
await pruefen_btn.click()
logger.info("[STADTUNDLAND] Clicked 'Eingaben prüfen' button")
await asyncio.sleep(2)
await page.wait_for_load_state("networkidle")
# Screenshot after validation
screenshot_path3 = DATA_DIR / f"stadtundland_validated_{listing['id']}.png"
await page.screenshot(path=str(screenshot_path3), full_page=True)
logger.info(f"[STADTUNDLAND] Saved validation screenshot to {screenshot_path3}")
# Final submit
final_submit_selectors = [
'button:has-text("Absenden")',
'button:has-text("Senden")',
'button:has-text("Anfrage senden")',
'button:has-text("Bestätigen")',
'button[type="submit"]',
]
final_btn = None
for selector in final_submit_selectors:
btn = await page.query_selector(selector)
if btn and await btn.is_visible():
final_btn = btn
logger.info(f"[STADTUNDLAND] Found final submit button: {selector}")
break
if final_btn:
await final_btn.click()
logger.info("[STADTUNDLAND] Clicked final submit button")
await asyncio.sleep(3)
await page.wait_for_load_state("networkidle")
# Screenshot after submission
screenshot_path4 = DATA_DIR / f"stadtundland_submitted_{listing['id']}.png"
await page.screenshot(path=str(screenshot_path4), full_page=True)
logger.info(f"[STADTUNDLAND] Saved submission screenshot to {screenshot_path4}")
# Check for confirmation
content = await page.content()
if any(word in content.lower() for word in ["erfolgreich", "gesendet", "danke", "bestätigung"]):
result["success"] = True
result["message"] = "Application submitted successfully"
logger.info("[STADTUNDLAND] Success! Confirmation message detected")
else:
result["success"] = True
result["message"] = "Form submitted"
logger.info("[STADTUNDLAND] Form submitted")
else:
result["success"] = False
result["message"] = "Validated but final submit button not found"
logger.warning("[STADTUNDLAND] Final submit button not found")
else:
result["success"] = False
result["message"] = "Form filled but 'Eingaben prüfen' button not found"
logger.warning("[STADTUNDLAND] 'Eingaben prüfen' button not found")
except Exception as e:
result["success"] = False
result["message"] = f"Submit error: {str(e)}"
logger.warning(f"[STADTUNDLAND] Submit error: {e}")
else:
result["success"] = False
result["message"] = "No form fields found on page"
logger.warning("[STADTUNDLAND] Could not find form fields")
except Exception as e:
result["success"] = False
result["message"] = f"Error: {str(e)}"
logger.error(f"[STADTUNDLAND] Exception: {str(e)}")
finally:
await page.close()
return result

View file

@ -119,8 +119,47 @@ class WBMHandler(BaseHandler):
logger.info("[WBM] Clicking application button...")
await apply_btn.click()
await asyncio.sleep(2)
# --- Post-click confirmation logic ---
logger.info("[WBM] Clicked application button, checking for confirmation...")
# Save screenshot and HTML after click
try:
await page.screenshot(path="data/wbm_after_apply.png")
logger.info("[WBM] Saved screenshot after application click.")
except Exception as e:
logger.warning(f"[WBM] Could not save screenshot: {e}")
try:
html_after = await page.content()
with open("data/wbm_after_apply.html", "w", encoding="utf-8") as f:
f.write(html_after)
logger.info("[WBM] Saved HTML after application click.")
except Exception as e:
logger.warning(f"[WBM] Could not save HTML after apply: {e}")
# Look for confirmation message on the page
confirmation_selectors = [
'text="Vielen Dank"',
'text="Ihre Anfrage wurde gesendet"',
'text="Bestätigung"',
'div:has-text("Vielen Dank")',
'div:has-text("Ihre Anfrage wurde gesendet")',
]
confirmed = False
for sel in confirmation_selectors:
try:
el = await page.query_selector(sel)
if el and await el.is_visible():
logger.info(f"[WBM] Found confirmation element: {sel}")
confirmed = True
break
except Exception as e:
logger.debug(f"[WBM] Error checking confirmation selector {sel}: {e}")
if confirmed:
result["success"] = True
result["message"] = "Application button clicked on detail page. (Submission not implemented)"
result["message"] = "Application submitted and confirmation detected."
else:
logger.warning("[WBM] No confirmation message detected after application click.")
result["success"] = False
result["message"] = "Clicked application button, but no confirmation detected. Check screenshot and HTML."
else:
result["message"] = "No application button found on detail page."
except Exception as e:

33
main.py
View file

@ -45,8 +45,17 @@ async def main():
# Initialize state manager
state_manager = StateManager(Path("data/state.json"))
# --- Playwright browser/context setup ---
playwright = await async_playwright().start()
browser = await playwright.chromium.launch(headless=True)
browser_context = await browser.new_context(
user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
)
logger.info("Playwright browser context initialized.")
# Application handler manages browser/context
app_handler = ApplicationHandler(None, state_manager)
app_handler = ApplicationHandler(browser_context, state_manager)
# Set up Telegram bot and inject into handler, passing the main event loop
event_loop = asyncio.get_running_loop()
@ -58,8 +67,6 @@ async def main():
wg_notifier = WGCompanyNotifier(telegram_bot=telegram_bot, refresh_minutes=10)
wg_task = asyncio.create_task(wg_notifier.run())
await app_handler.init_browser()
try:
logger.info(f"Bot is now running. Refreshing every {CHECK_INTERVAL} seconds...")
@ -72,7 +79,22 @@ async def main():
continue
previous_listings = app_handler.load_previous_listings()
if not previous_listings:
logger.info(f"First run - saving {len(current_listings)} listings as baseline")
logger.info(f"First run - saving {len(current_listings)} listings as baseline and marking as failed applications")
# Mark all as failed applications so /retryfailed can be used
for listing in current_listings:
result = {
"listing_id": listing.get("id"),
"company": app_handler._detect_company(listing.get("link", "")),
"link": listing.get("link"),
"timestamp": str(listing.get("timestamp", "")) or str(listing.get("date", "")) or "",
"success": False,
"message": "First run, not auto-applied. Use /retryfailed to attempt.",
"address": listing.get("address", ""),
"rooms": listing.get("rooms", ""),
"price": listing.get("price", ""),
"retries": 0
}
app_handler.save_application(result)
app_handler.save_listings(current_listings)
await asyncio.sleep(CHECK_INTERVAL)
_flush_rotating_file_handlers()
@ -94,8 +116,7 @@ async def main():
except Exception as e:
logger.error(f"[MAIN] Error in main loop: {e}")
finally:
if hasattr(app_handler, 'browser') and app_handler.browser:
await app_handler.browser.close()
await browser.close()
logger.info("Browser closed successfully.")
if __name__ == "__main__":

View file

@ -14,9 +14,31 @@ TELEGRAM_MAX_RETRIES = int(os.environ.get("TELEGRAM_MAX_RETRIES", 3))
logger = logging.getLogger(__name__)
class TelegramBot:
"""Handle Telegram commands for controlling the monitor"""
def _handle_reset_listings_command(self):
"""Move listings.json to data/old/ with a timestamp, preserving statistics and application history."""
import shutil
from datetime import datetime
try:
listings_path = os.path.join("data", "listings.json")
old_dir = os.path.join("data", "old")
if os.path.exists(listings_path):
# Ensure old_dir exists
os.makedirs(old_dir, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
dest_path = os.path.join(old_dir, f"listings_{timestamp}.json")
shutil.move(listings_path, dest_path)
msg = f"🗑️ <b>Listings reset:</b>\n<code>listings.json</code> moved to <code>old/listings_{timestamp}.json</code>."
else:
msg = " No listings file found to move."
self._send_message(msg)
except Exception as e:
logger.error(f"Error resetting listings: {e}")
self._send_message(f"❌ Error resetting listings: {str(e)}")
def __init__(self, monitor, bot_token=None, chat_id=None, event_loop=None):
self.monitor = monitor
self.bot_token = bot_token or TELEGRAM_BOT_TOKEN
@ -88,8 +110,23 @@ class TelegramBot:
fut.result()
except Exception as e:
logger.error(f"/retryfailed command failed: {e}")
elif text == "/resetlistings":
self._handle_reset_listings_command()
elif text.startswith("/"):
self._handle_unknown_command(text)
def _handle_reset_listings_command(self):
"""Delete listings.json (not wgcompany_listings.json), but preserve statistics and application history."""
try:
listings_path = os.path.join("data", "listings.json")
if os.path.exists(listings_path):
os.remove(listings_path)
msg = "🗑️ <b>Listings reset:</b>\n<code>listings.json</code> deleted."
else:
msg = " No listings file found to delete."
self._send_message(msg)
except Exception as e:
logger.error(f"Error resetting listings: {e}")
self._send_message(f"❌ Error resetting listings: {str(e)}")
async def _handle_retry_failed_command(self, max_retries: int = 3):
"""Retry all failed applications up to max_retries."""
# Ensure browser context is initialized
@ -168,19 +205,16 @@ class TelegramBot:
status += f"\n{company}: {count}"
self._send_message(status)
def _handle_help_command(self):
help_text = """🏠 <b>InBerlin Monitor Commands</b>
/autopilot on - Enable automatic applications
/autopilot off - Disable automatic applications
/status - Show current status and stats
/plot - Show weekly listing patterns
/help - Show this help message
When autopilot is ON, I will automatically apply to new listings."""
self._send_message(help_text)
def _handle_unknown_command(self, text):
def _handle_plot_command(self):
logger.info("Generating listing times plot...")
try:
plot_path = self.app_handler._generate_weekly_plot()
self._send_photo(plot_path, "\U0001f4ca <b>Weekly Listing Patterns</b>\n\nThis shows when new listings typically appear throughout the week.")
except Exception as e:
logger.error(f"Error generating plot: {e}")
import traceback
logger.error(traceback.format_exc())
self._send_message(f"\u274c Error generating plot: {str(e)}")
cmd = text.split()[0] if text else text
self._send_message(f"❓ Unknown command: <code>{cmd}</code>\n\nUse /help to see available commands.")
@ -188,11 +222,8 @@ When autopilot is ON, I will automatically apply to new listings."""
logger.info("Generating autopilot errorrate plot...")
try:
plot_path, summary = self.app_handler._generate_error_rate_plot()
if plot_path:
caption = "📉 <b>Autopilot Success vs Failure</b>\n\n" + summary
self._send_photo(plot_path, caption)
else:
self._send_message("📉 Not enough application data to generate errorrate plot.")
except Exception as e:
logger.error(f"Error generating errorrate plot: {e}")
import traceback