roughly working again, now dev docker exists

This commit is contained in:
Aron Petau 2025-12-28 19:59:31 +01:00
parent a77a0c0393
commit 155ab39368
26 changed files with 1976 additions and 235 deletions

View file

@ -2,10 +2,12 @@ Autopilot bot command list for @BotFather
Use @BotFather -> /setcommands and paste the following lines exactly (one per line):
/autopilot - Enable or disable automatic applications. Usage: `/autopilot on` or `/autopilot off`
/status - Show current status and statistics (autopilot state, application counts by company)
/plot - Show weekly listing patterns (image)
/errorrate - Show autopilot success vs failure plot (image)
/retryfailed - Retry all failed applications up to 3 times
/help - Show help and command usage
Example: send `/setcommands` to @BotFather, then paste the above lines and confirm.

View file

@ -6,13 +6,37 @@ WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application
COPY main.py .
# Copy the handlers directory into the Docker image
COPY handlers/ ./handlers/
# Create data directory
RUN mkdir -p /data && chmod 777 /data
# Copy application handler
COPY application_handler.py .
# Copy Telegram bot
COPY telegram_bot.py .
# Copy the tests directory
COPY tests/ ./tests/
# Copy state manager
COPY state_manager.py .
# Move the main.py COPY statement to the end to ensure it is updated last
COPY main.py .
# Ensure the data directory exists
RUN mkdir -p /app/data && chmod 777 /app/data
# Ensure the state.json file exists
RUN touch /app/data/state.json && chmod 666 /app/data/state.json
# Copy fonts from the local data/fonts directory into the container
COPY data/fonts/*.ttf /usr/share/fonts/truetype/custom/
# Refresh the font cache to include the new fonts
RUN fc-cache -fv
# Log available fonts for debugging
RUN fc-list
CMD ["python", "-u", "main.py"]

95
LICENSE
View file

@ -1,93 +1,4 @@
Creative Commons Attribution-NonCommercial 4.0 International Public License
This project is licensed under the Creative Commons Attribution-NonCommercial 4.0 International (CC BY-NC 4.0) License.
By exercising the Licensed Rights (defined below), You accept and agree to be bound by the terms and conditions of this Creative Commons Attribution-NonCommercial 4.0 International Public License ("Public License"). To the extent this Public License may be interpreted as a contract, You are granted the Licensed Rights in consideration of Your acceptance of these terms and conditions, and the Licensor grants You such rights in consideration of benefits the Licensor receives from making the Licensed Material available under these terms and conditions.
**Section 1 Definitions.**
- **Adapted Material** means material subject to Copyright and Similar Rights that is derived from or based upon the Licensed Material and in which the Licensed Material is translated, altered, arranged, transformed, or otherwise modified in a manner requiring permission under the Copyright and Similar Rights held by the Licensor. For purposes of this Public License, where the Licensed Material is a musical work, performance, or sound recording, Adapted Material is always produced where the Licensed Material is synched in timed relation with a moving image.
- **Copyright and Similar Rights** means copyright and/or similar rights closely related to copyright including, without limitation, performance, broadcast, sound recording, and Sui Generis Database Rights, without regard to how the rights are labeled or categorized. For purposes of this Public License, the rights specified in Section 2(b)(1)-(2) are not Copyright and Similar Rights.
- **Effective Technological Measures** means those measures that, in the absence of proper authority, may not be circumvented under laws fulfilling obligations under Article 11 of the WIPO Copyright Treaty adopted on December 20, 1996, and/or similar international agreements.
- **Exceptions and Limitations** means fair use, fair dealing, and/or any other exception or limitation to Copyright and Similar Rights that applies to Your use of the Licensed Material.
- **Licensed Material** means the artistic or literary work, database, or other material to which the Licensor applied this Public License.
- **Licensed Rights** means the rights granted to You subject to the terms and conditions of this Public License, which are limited to all Copyright and Similar Rights that apply to Your use of the Licensed Material and that the Licensor has authority to license.
- **Licensor** means the individual(s) or entity(ies) granting rights under this Public License.
- **NonCommercial** means not primarily intended for or directed towards commercial advantage or monetary compensation. For purposes of this Public License, the exchange of the Licensed Material for other material subject to Copyright and Similar Rights by digital file-sharing or similar means is NonCommercial provided there is no payment of monetary compensation in connection with the exchange.
- **Share** means to provide material to the public by any means or process that requires permission under the Licensed Rights, such as reproduction, public display, public performance, distribution, dissemination, communication, or importation, and to make material available to the public including in ways that members of the public may access the material from a place and at a time individually chosen by them.
- **Sui Generis Database Rights** means rights other than copyright resulting from Directive 96/9/EC of the European Parliament and of the Council of 11 March 1996 on the legal protection of databases, as amended and/or succeeded, as well as other essentially equivalent rights anywhere in the world.
- **You** means the individual or entity exercising the Licensed Rights under this Public License. Your has a corresponding meaning.
**Section 2 Scope.**
- **License grant.**
1. Subject to the terms and conditions of this Public License, the Licensor hereby grants You a worldwide, royalty-free, non-sublicensable, non-exclusive, irrevocable license to exercise the Licensed Rights in the Licensed Material to:
- reproduce and Share the Licensed Material, in whole or in part, for NonCommercial purposes only; and
- produce, reproduce, and Share Adapted Material for NonCommercial purposes only.
2. Exceptions and Limitations. For the avoidance of doubt, where Exceptions and Limitations apply to Your use, this Public License does not apply, and You do not need to comply with its terms and conditions.
3. Term. The term of this Public License is specified in Section 6(a).
4. Media and formats; technical modifications allowed. The Licensor authorizes You to exercise the Licensed Rights in all media and formats whether now known or hereafter created, and to make technical modifications necessary to do so. The Licensor waives and/or agrees not to assert any right or authority to forbid You from making technical modifications necessary to exercise the Licensed Rights, including technical modifications necessary to circumvent Effective Technological Measures. For purposes of this Public License, simply making modifications authorized by this Section 2(a)(4) never produces Adapted Material.
5. Downstream recipients.
- Offer from the Licensor Licensed Material. Every recipient of the Licensed Material automatically receives an offer from the Licensor to exercise the Licensed Rights under the terms and conditions of this Public License.
- No downstream restrictions. You may not offer or impose any additional or different terms or conditions on, or apply any Effective Technological Measures to, the Licensed Material if doing so restricts exercise of the Licensed Rights by any recipient of the Licensed Material.
6. No endorsement. Nothing in this Public License constitutes or may be construed as permission to assert or imply that You are, or that Your use of the Licensed Material is, connected with, or sponsored, endorsed, or granted official status by, the Licensor or others designated to receive attribution as provided in Section 3(a)(1)(A)(i).
- **Other rights.**
1. Moral rights, such as the right of integrity, are not licensed under this Public License, nor are publicity, privacy, and/or other similar personality rights; however, to the extent possible, the Licensor waives any such rights held by the Licensor to the limited extent necessary to allow You to exercise the Licensed Rights, but not otherwise.
2. Patent and trademark rights are not licensed under this Public License.
3. To the extent possible, the Licensor waives any right to collect royalties from You for the exercise of the Licensed Rights, whether directly or through a collecting society under any voluntary or waivable statutory or compulsory licensing scheme. In all other cases, the Licensor expressly reserves any right to collect such royalties.
**Section 3 License Conditions.**
Your exercise of the Licensed Rights is expressly made subject to the following conditions.
- **Attribution.**
1. If You Share the Licensed Material (including in modified form), You must:
- retain the following if it is supplied by the Licensor with the Licensed Material:
- identification of the creator(s) of the Licensed Material and any others designated to receive attribution, in any reasonable manner requested by the Licensor (including by pseudonym if designated);
- a copyright notice;
- a notice that refers to this Public License;
- a notice that refers to the disclaimer of warranties;
- a URI or hyperlink to the Licensed Material to the extent reasonably practicable;
- indicate if You modified the Licensed Material and retain an indication of any previous modifications; and
- indicate the Licensed Material is licensed under this Public License, and include the text of, or the URI or hyperlink to, this Public License.
2. You may satisfy the conditions in Section 3(a)(1) in any reasonable manner based on the medium, means, and context in which You Share the Licensed Material. For example, it may be reasonable to satisfy the conditions by providing a URI or hyperlink to a resource that includes the required information.
3. If requested by the Licensor, You must remove any of the information required by Section 3(a)(1)(A) to the extent reasonably practicable.
- **NonCommercial.** You may not exercise the Licensed Rights for commercial purposes.
**Section 4 Sui Generis Database Rights.**
Where the Licensed Rights include Sui Generis Database Rights that apply to Your use of the Licensed Material:
- for the avoidance of doubt, Section 2(a)(1) grants You the right to extract, reuse, reproduce, and Share all or a substantial portion of the contents of the database for NonCommercial purposes only;
- if You include all or a substantial portion of the database contents in a database that is Adapted Material, then the database in which You include the contents may only be Shared under the terms of this Public License; and
- You must comply with the conditions in Section 3(a) if You Share all or a substantial portion of the contents of the database.
For the avoidance of doubt, this Section 4 supplements and does not replace Your obligations under this Public License where the Licensed Rights include other Copyright and Similar Rights.
**Section 5 Disclaimer of Warranties and Limitation of Liability.**
- Unless otherwise separately undertaken by the Licensor, to the extent possible, the Licensor offers the Licensed Material as-is and as-available, and makes no representations or warranties of any kind concerning the Licensed Material, whether express, implied, statutory, or other. This includes, without limitation, warranties of title, merchantability, fitness for a particular purpose, non-infringement, absence of latent or other defects, accuracy, or the presence or absence of errors, whether or not known or discoverable. Where disclaimers of warranties are not allowed in full or in part, this disclaimer may not apply to You.
- To the extent possible, in no event will the Licensor be liable to You on any legal theory (including, without limitation, negligence) or otherwise for any direct, special, indirect, incidental, consequential, punitive, exemplary, or other losses, costs, expenses, or damages arising out of this Public License or use of the Licensed Material, even if the Licensor has been advised of the possibility of such losses, costs, expenses, or damages. Where a limitation of liability is not allowed in full or in part, this limitation may not apply to You.
- The disclaimer of warranties and limitation of liability provided above shall be interpreted in a manner that, to the extent possible, most closely approximates an absolute disclaimer and waiver of all liability.
**Section 6 Term and Termination.**
- This Public License applies for the term of the Copyright and Similar Rights licensed here. However, if You fail to comply with this Public License, then Your rights under this Public License terminate automatically.
- Where Your right to use the Licensed Material has terminated under Section 6(a), it reinstates:
1. automatically as of the date the violation is cured, provided it is cured within 30 days of Your discovery of the violation; or
2. upon express reinstatement by the Licensor.
For the avoidance of doubt, this Section 6(b) does not affect any right the Licensor may have to seek remedies for Your violations of this Public License.
- For the avoidance of doubt, the Licensor may also offer the Licensed Material under separate terms or conditions or stop distributing the Licensed Material at any time; however, doing so will not terminate this Public License.
- Sections 1, 5, 6, 7, and 8 survive termination of this Public License.
**Section 7 Other Terms and Conditions.**
- The Licensor shall not be bound by any additional or different terms or conditions communicated by You unless expressly agreed.
- Any arrangements, understandings, or agreements regarding the Licensed Material not stated herein are separate from and independent of the terms and conditions of this Public License.
**Section 8 Interpretation.**
- For the avoidance of doubt, this Public License does not, and shall not be interpreted to, reduce, limit, restrict, or impose conditions on any use of the Licensed Material that could lawfully be made without permission under this Public License.
- To the extent possible, if any provision of this Public License is deemed unenforceable, it shall be automatically reformed to the minimum extent necessary to make it enforceable. If the provision cannot be reformed, it shall be severed from this Public License without affecting the enforceability of the remaining terms and conditions.
- No term or condition of this Public License will be waived and no failure to comply consented to unless expressly agreed to by the Licensor.
- Nothing in this Public License constitutes or may be interpreted as a limitation upon, or waiver of, any privileges and immunities that apply to the Licensor or You, including from the legal processes of any jurisdiction or authority.
For the full license text, please visit:
https://creativecommons.org/licenses/by-nc/4.0/legalcode

View file

@ -6,10 +6,49 @@ from handlers.degewo_handler import DegewoHandler
from handlers.gesobau_handler import GesobauHandler
from handlers.stadtundland_handler import StadtUndLandHandler
from handlers.wbm_handler import WBMHandler
import json
from pathlib import Path
import pandas as pd
from typing import Optional
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import logging
import matplotlib
import matplotlib.font_manager as fm
import html
import re
import hashlib
import asyncio
from playwright.async_api import async_playwright
import os
STATE_FILE = Path("data/state.json")
APPLICATIONS_FILE = Path("data/applications.json")
TIMING_FILE = Path("data/timing.csv")
LISTINGS_FILE = Path("data/listings.json")
DATA_DIR = Path("data")
# --- Matplotlib Font Setup (for emoji support in plots) ---
font_cache_dir = Path("data/fonts")
font_cache_dir.mkdir(parents=True, exist_ok=True)
matplotlib.get_configdir = lambda: str(font_cache_dir)
fm.findSystemFonts(fontpaths=str(font_cache_dir), fontext='ttf')
matplotlib.rcParams['font.family'] = 'Noto Sans'
# Use the root logger for consistency with main.py
logger = logging.getLogger()
class ApplicationHandler:
def __init__(self, browser_context):
"""
Main handler for apartment monitoring, application automation, and notification logic.
Handles browser automation, listing extraction, application delegation, and Telegram notifications.
"""
def __init__(self, browser_context, state_manager, applications_file: Path = None):
self.context = browser_context
self.state_manager = state_manager
self.applications_file = applications_file or APPLICATIONS_FILE
self.handlers = {
"howoge": HowogeHandler(browser_context),
"gewobag": GewobagHandler(browser_context),
@ -19,6 +58,142 @@ class ApplicationHandler:
"wbm": WBMHandler(browser_context),
}
def set_telegram_bot(self, telegram_bot):
"""Attach a TelegramBot instance for notifications."""
self.telegram_bot = telegram_bot
def notify_new_listings(self, new_listings: list[dict], application_results: Optional[dict] = None):
"""
Send a Telegram notification for each new listing.
Includes application result if autopilot was enabled.
"""
if not new_listings:
return
for listing in new_listings:
link = listing.get('link', 'https://www.inberlinwohnen.de/wohnungsfinder/')
# Detect company for header
company = self._detect_company(link)
company_label = company.capitalize() if company != "unknown" else "Wohnung"
message = (
f"🏠 <b>[{company_label}] Neue Wohnung!</b>\n\n"
f"🚪 <b>{listing['rooms']}</b>\n"
f"📐 {listing['size']}\n"
f"💰 {listing['price']}\n"
f"📍 {listing['address']}\n\n"
f"👉 <a href=\"{link}\">Alle Details</a>"
)
# Add autopilot/apply status if attempted
if application_results and listing["id"] in application_results:
result = application_results[listing["id"]]
if result["success"]:
message += f"\n\n🤖 <b>Auto-applied!</b> ({result['company']})"
if result["message"]:
message += f"\n<i>{result['message']}</i>"
else:
message += f"\n\n⚠️ <b>Auto-apply failed</b> ({result['company']})"
if result["message"]:
message += f"\n<i>{result['message']}</i>"
# Send via TelegramBot if available
if hasattr(self, 'telegram_bot') and self.telegram_bot:
logger.info(f"Notifying Telegram: {listing['address']} ({listing['rooms']}, {listing['size']}, {listing['price']})")
self.telegram_bot._send_message(message)
else:
logger.info(f"[TELEGRAM] Would send message for: {listing['address']} ({listing['rooms']}, {listing['size']}, {listing['price']})")
async def apply_to_listings(self, listings: list[dict]) -> dict:
"""
Apply to multiple listings (autopilot mode).
Returns a dict of application results keyed by listing ID.
"""
results = {}
for listing in listings:
if self.has_applied(listing["id"]):
logger.info(f"Already applied to {listing['id']} ({listing['address']}), skipping.")
continue
result = await self.apply(listing)
results[listing["id"]] = result
self.save_application(result)
status = "" if result["success"] else ""
logger.info(f"Application {status} for {listing['address']}: {result['message']}")
await asyncio.sleep(2)
return results
def log_listing_times(self, new_listings: list[dict]):
"""
Log new listing appearance times to CSV for later analysis and pattern mining.
Appends to data/listing_times.csv, creating header if needed.
"""
if not new_listings:
return
import csv
TIMING_FILE = Path("data/listing_times.csv")
file_exists = TIMING_FILE.exists()
with open(TIMING_FILE, "a", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
if not file_exists:
writer.writerow(["timestamp", "weekday", "hour", "minute", "rooms", "size", "price", "address", "listing_id"])
now = datetime.now()
for listing in new_listings:
writer.writerow([
now.isoformat(),
now.strftime("%A"), # Weekday name
now.hour,
now.minute,
listing["rooms"],
listing["size"],
listing["price"],
listing["address"],
listing["id"]
])
logger.info(f"Logged {len(new_listings)} new listing times to CSV.")
def __init__(self, browser_context, state_manager):
self.context = browser_context
self.state_manager = state_manager
self.handlers = {
"howoge": HowogeHandler(browser_context),
"gewobag": GewobagHandler(browser_context),
"degewo": DegewoHandler(browser_context),
"gesobau": GesobauHandler(browser_context),
"stadtundland": StadtUndLandHandler(browser_context),
"wbm": WBMHandler(browser_context),
}
self.applications_file = applications_file or APPLICATIONS_FILE
def __init__(self, browser_context, state_manager, applications_file: Path = None):
self.context = browser_context
self.state_manager = state_manager
self.applications_file = applications_file or APPLICATIONS_FILE
self.handlers = {
"howoge": HowogeHandler(browser_context),
"gewobag": GewobagHandler(browser_context),
"degewo": DegewoHandler(browser_context),
"gesobau": GesobauHandler(browser_context),
"stadtundland": StadtUndLandHandler(browser_context),
"wbm": WBMHandler(browser_context),
}
async def init_browser(self):
"""Initialize Playwright browser (minimal, like test script)"""
if not hasattr(self, 'browser') or self.browser is None:
self.playwright = await async_playwright().start()
self.browser = await self.playwright.chromium.launch(headless=True)
self.context = await self.browser.new_context(
user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
)
logger.info("Browser initialized (minimal context)")
self.application_handler = ApplicationHandler(self.context, self.state_manager)
async def apply(self, listing: dict) -> dict:
company = self._detect_company(listing.get("link", ""))
handler = self.handlers.get(company)
@ -41,11 +216,463 @@ class ApplicationHandler:
return result
def _detect_company(self, link: str) -> str:
if "howoge.de" in link: return "howoge"
elif "gewobag.de" in link: return "gewobag"
elif "degewo.de" in link: return "degewo"
elif "gesobau.de" in link: return "gesobau"
elif "stadtundland.de" in link: return "stadtundland"
elif "wbm.de" in link: return "wbm"
"""Robust company detection logic, matching monitor.py as closely as possible."""
link = (link or "").lower()
# Remove URL scheme and www for easier matching
link = re.sub(r"^https?://(www\.)?", "", link)
# Use domain-based matching, including subdomains
if re.search(r"howoge\\.de", link):
return "howoge"
if re.search(r"gewobag\\.de", link):
return "gewobag"
if re.search(r"degewo\\.de", link):
return "degewo"
if re.search(r"gesobau\\.de", link):
return "gesobau"
if re.search(r"stadt-und-land\\.de|stadtundland\\.de", link):
return "stadtundland"
if re.search(r"wbm\\.de", link):
return "wbm"
# Also check for company in the path or query (legacy/edge cases)
if re.search(r"howoge", link):
return "howoge"
if re.search(r"gewobag", link):
return "gewobag"
if re.search(r"degewo", link):
return "degewo"
if re.search(r"gesobau", link):
return "gesobau"
if re.search(r"stadt-und-land|stadtundland", link):
return "stadtundland"
if re.search(r"wbm", link):
return "wbm"
return "unknown"
def load_state(self) -> dict:
"""Load persistent state"""
if STATE_FILE.exists():
with open(STATE_FILE, "r") as f:
return json.load(f)
return {"autopilot": False}
def save_state(self, state: dict):
"""Save persistent state"""
with open(STATE_FILE, "w") as f:
json.dump(state, f, indent=2)
def set_autopilot(self, enabled: bool):
"""Enable or disable autopilot mode"""
self.state_manager.set_autopilot(enabled)
def is_autopilot_enabled(self) -> bool:
"""Check if autopilot mode is enabled"""
return self.state_manager.is_autopilot_enabled()
def load_applications(self) -> dict:
"""Load application history."""
if self.applications_file.exists():
try:
with open(self.applications_file, "r", encoding="utf-8") as f:
return json.load(f)
except json.JSONDecodeError:
logger.error("Failed to decode applications file. Returning empty history.")
return {}
def save_application(self, result: dict):
"""Save an application result."""
applications = self.load_applications()
applications[result["listing_id"]] = result
with open(self.applications_file, "w", encoding="utf-8") as f:
json.dump(applications, f, indent=2, ensure_ascii=False)
def has_applied(self, listing_id: str) -> bool:
"""Check if we've already applied to this listing."""
return listing_id in self.load_applications()
def load_previous_listings(self) -> dict:
"""Load previously saved listings"""
if LISTINGS_FILE.exists():
with open(LISTINGS_FILE, "r") as f:
return json.load(f)
return {}
def save_listings(self, listings: list[dict]):
"""Save current listings"""
listings_dict = {l["id"]: l for l in listings}
with open(LISTINGS_FILE, "w") as f:
json.dump(listings_dict, f, indent=2, ensure_ascii=False)
def find_new_listings(self, current: list[dict], previous: dict) -> list[dict]:
"""Find listings that are new since last check"""
new = []
for listing in current:
if listing["id"] not in previous:
new.append(listing)
return new
def _generate_weekly_plot(self) -> str:
"""Generate a heatmap of listings by day of week and hour"""
if not TIMING_FILE.exists():
logger.warning("No timing file found for weekly plot")
return ""
try:
df = pd.read_csv(TIMING_FILE, parse_dates=["timestamp"])
df["day_of_week"] = df["timestamp"].dt.dayofweek
df["hour"] = df["timestamp"].dt.hour
heatmap_data = df.groupby(["day_of_week", "hour"]).size().unstack(fill_value=0)
fig, ax = plt.subplots(figsize=(10, 6))
cax = ax.matshow(heatmap_data, cmap="YlGnBu", aspect="auto")
fig.colorbar(cax)
ax.set_xticks(range(24))
ax.set_yticks(range(7))
ax.set_xticklabels([f"{h}:00" for h in range(24)], rotation=90)
ax.set_yticklabels(["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"])
ax.set_title("Listings Heatmap (Day of Week vs Hour)")
plot_path = DATA_DIR / "weekly_plot.png"
plt.savefig(plot_path)
plt.close(fig)
logger.info(f"Weekly plot saved to {plot_path}")
return str(plot_path)
except Exception as e:
logger.error(f"Failed to generate weekly plot: {e}")
return ""
def _generate_error_rate_plot(self):
"""Read applications.json and produce a plot image + summary text.
Returns (plot_path, summary_text) or (None, "") if insufficient data.
"""
if not self.applications_file.exists():
logger.warning("No applications.json found for errorrate plot")
return None, ""
try:
with open(self.applications_file, 'r', encoding='utf-8') as f:
apps = json.load(f)
if not apps:
logger.warning("No application data available for errorrate plot")
return None, ""
# Convert to DataFrame
rows = []
for _id, rec in apps.items():
rows.append({
"id": _id,
"ts": pd.to_datetime(rec.get("timestamp")),
"success": rec.get("success", False),
"company": rec.get("company", "unknown")
})
df = pd.DataFrame(rows)
df = df.dropna(subset=['ts'])
if df.empty:
logger.warning("No valid data for errorrate plot")
return None, ""
df['date'] = df['ts'].dt.floor('D')
grouped = df.groupby('date').agg(total=('id','count'), successes=('success', lambda x: x.sum()))
grouped['failures'] = grouped['total'] - grouped['successes']
grouped['error_rate'] = grouped['failures'] / grouped['total']
# Ensure index is sorted by date for plotting
grouped = grouped.sort_index()
# Prepare plot
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(grouped.index, grouped['error_rate'], marker='o', color='red', label='Error Rate')
ax.set_title('Autopilot Error Rate Over Time')
ax.set_xlabel('Date')
ax.set_ylabel('Error Rate')
ax.legend()
ax.grid(True)
# Save plot to the same directory as the applications file
plot_path = self.applications_file.parent / 'error_rate.png'
plt.savefig(plot_path)
plt.close(fig)
# Summary
total_attempts = int(grouped['total'].sum())
total_success = int(grouped['successes'].sum())
total_fail = int(grouped['failures'].sum())
overall_error = (total_fail / total_attempts) if total_attempts > 0 else 0.0
summary = f"<b>Total attempts:</b> {total_attempts}\n<b>Successes:</b> {total_success}\n<b>Failures:</b> {total_fail}\n<b>Overall error rate:</b> {overall_error:.1%}"
return plot_path, summary
except Exception as e:
logger.exception(f"Failed to generate error rate plot: {e}")
return None, ""
async def login(self, page):
"""Login to inberlinwohnen.de (minimal, like test script)"""
if not self.state_manager.email or not self.state_manager.password:
logger.warning("No credentials provided. Ensure INBERLIN_EMAIL and INBERLIN_PASSWORD are set in the environment.")
return False
try:
logger.info("Navigating to login page...")
login_response = await page.goto("https://www.inberlinwohnen.de/login", wait_until="networkidle")
logger.info(f"Login page status: {login_response.status if login_response else 'No response'}")
await asyncio.sleep(2)
# Dismiss cookie/privacy modal before login
logger.info("Attempting to dismiss cookie/privacy modal before login...")
await self.dismiss_cookie_modal(page)
logger.info("Cookie/privacy modal dismissed.")
# Fill login form (if present)
logger.info("Filling in login credentials...")
await page.fill('input[name="email"], input[type="email"]', self.state_manager.email)
await page.fill('input[name="password"], input[type="password"]', self.state_manager.password)
logger.info("Login credentials filled.")
# Click submit button
logger.info("Submitting login form...")
submit_response = await page.click('button[type="submit"], input[type="submit"]', timeout=30000)
logger.info(f"Clicked submit, waiting for navigation...")
try:
await page.wait_for_load_state("networkidle", timeout=30000)
logger.info(f"After login, page url: {page.url}")
logger.info(f"After login, page content length: {len(await page.content())}")
except Exception as e:
logger.error(f"Timeout or error after login submit: {e}")
await asyncio.sleep(2)
# Check if login successful
logger.info("Checking if login was successful...")
if "mein-bereich" in page.url or await page.query_selector('text="Abmelden"'):
logger.info("Login successful.")
return True
else:
logger.error(f"Login failed - ended up at {page.url}")
return False
except Exception as e:
logger.error(f"Login error: {e}")
logger.debug("Exception occurred during login", exc_info=True)
return False
async def fetch_listings(self) -> list[dict]:
"""Fetch listings from the Wohnungsfinder"""
listings = []
try:
page = await self.context.new_page()
# Attempt login if not already logged in
if not self.state_manager.logged_in:
login_success = await self.login(page)
if login_success:
self.state_manager.logged_in = True
else:
logger.warning("Login failed. Proceeding with public listings.")
# Select the correct URL after login check
if self.state_manager.logged_in:
url = "https://www.inberlinwohnen.de/mein-bereich/wohnungsfinder"
else:
url = "https://www.inberlinwohnen.de/wohnungsfinder/"
logger.info(f"Fetching listings from {url}")
# Navigate to the page with a longer wait condition for slow internet
logger.info("Navigating to listings page with extended timeout...")
await page.goto(url, wait_until="networkidle", timeout=20000)
# Check if the page is a download
if "download" in page.url or page.url.endswith(".pdf"):
logger.error("Page redirected to a download. Aborting.")
return []
# Handle cookie modal if not logged in
if not self.state_manager.logged_in:
await self.dismiss_cookie_modal(page)
# Wait a short time for the page to render, but do not block on any selector
await asyncio.sleep(2)
# Collect all listings content by clicking through pagination
all_content = ""
page_num = 1
max_pages = 10 # Safety limit
while page_num <= max_pages:
# Get current page content
current_content = await page.content()
all_content += current_content
# Check for "next page" button (Livewire pagination)
next_btn = await page.query_selector('[wire\\:click*="nextPage"]')
if next_btn and await next_btn.is_visible():
await next_btn.click()
await asyncio.sleep(2) # Wait for Livewire to update
page_num += 1
else:
break
logger.info(f"Collected content from {page_num} page(s)")
content = all_content
# Debug: save HTML to file for inspection
debug_path = DATA_DIR / "debug_page.html"
with open(debug_path, "w", encoding="utf-8") as f:
f.write(content)
logger.info(f"Saved debug HTML to {debug_path}")
# Debug: Log page title and check for listing count
count_match = re.search(r'(\\d+)\\s*Wohnungen? für Sie gefunden', content)
if count_match:
logger.info(f"Page shows {count_match.group(1)} listings available")
# Also check for "Zeige X bis Y von Z Angeboten"
show_match = re.search(r'Zeige \\d+ bis \\d+ von (\\d+) Angeboten', content)
if show_match:
logger.info(f"Page shows {show_match.group(1)} total offers")
# Decode HTML entities and JSON escaped slashes for extraction
content_decoded = html.unescape(content)
content_decoded = content_decoded.replace('\\/', '/')
# Build flatId -> deeplink mapping from wire:snapshot JSON data (monitor.py logic)
# Format in HTML: "deeplink":"https://...","flatId":12345
deeplink_pattern = r'"deeplink":"(https://[^"]+)","flatId":(\d+)'
deeplink_matches = re.findall(deeplink_pattern, content_decoded)
# Use string keys for flatId to match button extraction
id_to_link = {str(flat_id): link for link, flat_id in deeplink_matches}
logger.info(f"Found {len(id_to_link)} deeplink mappings")
# --- Extraction logic copied from monitor.py for robustness ---
# Extract listings from button elements with aria-label
# Format: @click="open !== 12345 ..." aria-label="Wohnungsangebot - 2,0 Zimmer, 53,01 m², 494,38 € Kaltmiete | Adresse"
button_pattern = r'@click="open !== (\d+)[^\"]*"[^>]*aria-label="Wohnungsangebot - ([^"]+)'
button_matches = re.findall(button_pattern, content_decoded)
logger.info(f"Found {len(button_matches)} listing buttons (monitor.py pattern)")
for flat_id, listing_text in button_matches:
# Parse listing text: "2,0 Zimmer, 53,01 m², 494,38 € Kaltmiete | Rhinstraße 4, 10315 Lichtenberg"
parts_match = re.match(r'(\d,\d)\s*Zimmer,\s*([\d,.]+)\s*m²,\s*([\d.,]+)\s*€\s*(?:Kaltmiete)?\s*\|\s*(.+)', listing_text)
if not parts_match:
continue
rooms, size, price, address = parts_match.groups()
rooms = rooms.strip()
address = address.strip()
if len(address) < 5:
continue
# Get the deeplink for this flat (monitor.py logic: flat_id as string)
detail_link = id_to_link.get(str(flat_id), url)
listing_id = hashlib.md5(f"{rooms}{size}{price}{address}".encode()).hexdigest()[:12]
listings.append({
"id": listing_id,
"rooms": f"{rooms} Zimmer",
"size": f"{size}",
"price": f"{price}",
"address": address,
"link": detail_link,
"fetched_at": datetime.now().isoformat()
})
# Deduplicate by id
seen_ids = set()
unique_listings = []
for listing in listings:
if listing["id"] not in seen_ids:
seen_ids.add(listing["id"])
unique_listings.append(listing)
listings = unique_listings
if not listings:
logger.warning("No listings found after parsing. Dumping HTML snippet for debugging:")
logger.warning(content[:1000])
await page.close()
logger.info(f"Fetched {len(listings)} unique listings")
return listings
except Exception as e:
logger.error(f"Error fetching listings: {e}")
import traceback
logger.error(traceback.format_exc())
return []
async def dismiss_cookie_modal(self, page):
"""Dismiss the privacy/cookie consent modal if present"""
try:
# Wait a bit for modal to appear
await asyncio.sleep(2)
# Try to find and click the accept button in the privacy modal
# Look for common accept button patterns in German
accept_selectors = [
'button:has-text("Akzeptieren")',
'button:has-text("Alle akzeptieren")',
'button:has-text("Accept")',
'button:has-text("Zustimmen")',
'[x-show="showPrivacyModal"] button',
'.privacy-modal button',
'button.accept-cookies',
# More specific to inberlinwohnen
'div[x-show="showPrivacyModal"] button:first-of-type',
]
for selector in accept_selectors:
try:
button = await page.query_selector(selector)
if button and await button.is_visible():
await button.click()
logger.info(f"Clicked cookie accept button: {selector}")
await asyncio.sleep(1)
return True
except:
continue
# Try clicking any visible button in the modal overlay
modal = await page.query_selector('div[x-show="showPrivacyModal"]')
if modal:
buttons = await modal.query_selector_all('button')
for btn in buttons:
if await btn.is_visible():
text = await btn.inner_text()
logger.info(f"Found modal button: {text}")
# Click the first button (usually accept)
await btn.click()
await asyncio.sleep(1)
return True
logger.info("No cookie modal found or already dismissed")
return False
except Exception as e:
logger.debug(f"Cookie modal handling: {e}")
return False

0
archive/__init__.py Normal file
View file

View file

@ -107,10 +107,14 @@ def generate_error_rate_plot(applications_file: str):
ax3.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
ax3.legend(title='Company', loc='upper right', fontsize='small')
fig.autofmt_xdate()
plot_path = os.path.join(DATA_DIR, 'error_rate.png')
tmp_path = os.path.join(DATA_DIR, 'error_rate.tmp.png')
# Write plot to the same directory as applications_file
out_dir = os.path.dirname(os.path.abspath(applications_file))
os.makedirs(out_dir, exist_ok=True)
plot_path = os.path.join(out_dir, 'error_rate.png')
tmp_path = os.path.join(out_dir, 'error_rate.tmp.png')
fig.savefig(tmp_path, format='png')
plt.close(fig)
try:

15
docker-compose.dev.yml Normal file
View file

@ -0,0 +1,15 @@
services:
wohnbot:
build: .
container_name: wohnbot-dev
restart: unless-stopped
env_file:
- .env
dns:
- 1.1.1.1
- 8.8.8.8
volumes:
- ./data:/app/data:rw
environment:
- CHECK_INTERVAL=30
- WOHNBOT_DEV=1

View file

@ -7,9 +7,6 @@ services:
- .env
volumes:
- /srv/dev-disk-by-uuid-a920d9c0-dfc1-4a58-ae4d-92cf88ff04a5/docker-app/wohnbot/data:/data:rw
dns:
- 1.1.1.1
- 8.8.8.8
networks:
proxy-network:
aliases:

View file

@ -1,12 +1,23 @@
from abc import ABC, abstractmethod
from playwright.async_api import Page
import logging
import asyncio
import html
import re
import hashlib
from datetime import datetime
import traceback
from pathlib import Path
logger = logging.getLogger(__name__)
DATA_DIR = Path("data")
class BaseHandler(ABC):
def __init__(self, context):
def __init__(self, context, email=None, password=None):
self.context = context
self.email = email
self.password = password
@abstractmethod
async def apply(self, listing: dict, result: dict) -> dict:
@ -16,11 +27,18 @@ class BaseHandler(ABC):
async def handle_cookies(self, page: Page):
"""Handle cookie banners if present."""
try:
cookie_btn = await page.query_selector('button:has-text("Akzeptieren"), button:has-text("Alle akzeptieren")')
cookie_selectors = [
'button:has-text("Akzeptieren")',
'button:has-text("Alle akzeptieren")',
'#CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll'
]
for sel in cookie_selectors:
cookie_btn = await page.query_selector(sel)
if cookie_btn and await cookie_btn.is_visible():
await cookie_btn.click()
logger.info("[BaseHandler] Dismissed cookie banner")
await asyncio.sleep(1)
break
except Exception as e:
logger.warning(f"[BaseHandler] Failed to handle cookies: {e}")
@ -40,3 +58,180 @@ class BaseHandler(ABC):
break
except Exception as e:
logger.warning(f"[BaseHandler] Failed to handle consent manager: {e}")
async def log_listing_details(self, listing: dict):
"""Log details of the listing being processed."""
logger.info(f"[BaseHandler] Processing listing: {listing}")
async def login(self, page):
"""Login to inberlinwohnen.de"""
if not self.email or not self.password:
logger.warning("No credentials provided, using public listings")
return False
try:
await page.goto("https://www.inberlinwohnen.de/login", wait_until="networkidle")
# Handle cookie/privacy modal first
await self.handle_cookies(page)
# Fill login form
await page.fill('input[name="email"], input[type="email"]', self.email)
await page.fill('input[name="password"], input[type="password"]', self.password)
# Click submit button
await page.click('button[type="submit"], input[type="submit"]')
# Wait for navigation
await page.wait_for_load_state("networkidle")
await asyncio.sleep(2)
# Check if login successful
if "mein-bereich" in page.url or await page.query_selector('text="Abmelden"'):
logger.info("Login successful")
return True
else:
logger.error(f"Login failed - ended up at {page.url}")
return False
except Exception as e:
logger.error(f"Login error: {e}")
return False
async def fetch_listings(self, logged_in: bool) -> list[dict]:
"""Fetch listings from the Wohnungsfinder"""
listings = []
try:
page = await self.context.new_page()
# Use personal Wohnungsfinder when logged in to see filtered listings
url = "https://www.inberlinwohnen.de/mein-bereich/wohnungsfinder" if logged_in else "https://www.inberlinwohnen.de/wohnungsfinder/"
logger.info(f"Fetching listings from {url}")
await page.goto(url, wait_until="networkidle")
# Handle cookie modal if not logged in
if not logged_in:
await self.handle_cookies(page)
# Wait for dynamic content to load - look for listing text pattern
try:
await page.wait_for_selector('text=/\\d,\\d\\s*Zimmer/', timeout=15000)
logger.info("Listings content loaded")
except:
logger.warning("Timeout waiting for listings content")
# Additional wait for initial listings to render
await asyncio.sleep(2)
# Collect all listings content by clicking through pagination
all_content = ""
page_num = 1
max_pages = 10 # Safety limit
while page_num <= max_pages:
current_content = await page.content()
all_content += current_content
next_btn = await page.query_selector('[wire\\:click*="nextPage"]')
if next_btn and await next_btn.is_visible():
await next_btn.click()
await asyncio.sleep(2) # Wait for Livewire to update
page_num += 1
else:
break
logger.info(f"Collected content from {page_num} page(s)")
# Debug: save HTML to file for inspection
debug_path = DATA_DIR / "debug_page.html"
with open(debug_path, "w", encoding="utf-8") as f:
f.write(all_content)
logger.info(f"Saved debug HTML to {debug_path}")
# Decode HTML entities and JSON escaped slashes for extraction
content_decoded = html.unescape(all_content).replace('\\/', '/')
# Build flatId -> deeplink mapping from wire:snapshot JSON data
deeplink_pattern = r'"deeplink":"(https://[^"]+)","flatId":(\d+)'
deeplink_matches = re.findall(deeplink_pattern, content_decoded)
id_to_link = {flat_id: link for link, flat_id in deeplink_matches}
logger.info(f"Found {len(id_to_link)} deeplink mappings")
# Extract listings from button elements with aria-label
button_pattern = r'@click="open !== (\d+)[^\"]*"[^>]*aria-label="Wohnungsangebot - ([^\"]+)'
button_matches = re.findall(button_pattern, content_decoded)
logger.info(f"Found {len(button_matches)} listing buttons")
for flat_id, listing_text in button_matches:
parts_match = re.match(r'(\d,\d)\\s*Zimmer,\\s*([\d,]+)\\s*m²,\\s*([\d.,]+)\\s*€\\s*(?:Kaltmiete\\s*)?\\|\\s*(.+)', listing_text)
if not parts_match:
continue
rooms, size, price, address = parts_match.groups()
rooms = rooms.strip()
address = address.strip()
if len(address) < 5:
continue
detail_link = id_to_link.get(flat_id, url)
listing_id = hashlib.md5(f"{rooms}{size}{price}{address}".encode()).hexdigest()[:12]
listings.append({
"id": listing_id,
"rooms": f"{rooms} Zimmer",
"size": f"{size}",
"price": f"{price}",
"address": address,
"link": detail_link,
"fetched_at": datetime.now().isoformat()
})
# Deduplicate by id
seen_ids = set()
unique_listings = []
for listing in listings:
if listing["id"] not in seen_ids:
seen_ids.add(listing["id"])
unique_listings.append(listing)
listings = unique_listings
await page.close()
logger.info(f"Fetched {len(listings)} unique listings")
return listings
except Exception as e:
logger.error(f"Error fetching listings: {e}")
import traceback
logger.error(traceback.format_exc())
return []
async def save_screenshot(self, page, filename):
"""Save a screenshot of the current page."""
screenshot_path = DATA_DIR / filename
await page.screenshot(path=str(screenshot_path))
logger.info(f"Saved screenshot to {screenshot_path}")
async def save_html(self, page, filename):
"""Save the HTML content of the current page."""
html_path = DATA_DIR / filename
content = await page.content()
with open(html_path, "w", encoding="utf-8") as f:
f.write(content)
logger.info(f"Saved HTML to {html_path}")
async def log_buttons(self, page):
"""Log the text of buttons on the current page."""
buttons = await page.query_selector_all('button, a.btn, a[class*="button"]')
for btn in buttons[:10]:
try:
text = await btn.inner_text()
logger.info(f"Found button: {text[:50]}")
except Exception as e:
logger.debug(f"Error logging button text: {e}")
async def handle_exception(self, e):
"""Log an exception with traceback."""
logger.error(f"Exception: {str(e)}")
logger.error(traceback.format_exc())

View file

@ -5,50 +5,76 @@ import asyncio
logger = logging.getLogger(__name__)
class DegewoHandler(BaseHandler):
def __init__(self, browser_context):
self.context = browser_context
async def apply(self, listing: dict, result: dict) -> dict:
page = await self.context.new_page()
try:
logger.info(f"[DEGEWO] Opening page: {listing['link']}")
await page.goto(listing["link"], wait_until="networkidle")
logger.info("[DEGEWO] Page loaded")
logger.info(f"[DEGEWO] Open: {listing['link']}")
response = await page.goto(listing["link"], wait_until="networkidle")
await asyncio.sleep(2)
# Handle cookies and consent
# Detect 404 by status or page title
status = response.status if response else None
page_title = await page.title()
if status == 404 or (page_title and "404" in page_title):
logger.warning(f"[DEGEWO] Listing is down (404): {listing['link']}")
result["success"] = False
result["message"] = "Listing is no longer available (404). Application impossible. Will not retry."
result["permanent_fail"] = True
return result
# Always handle cookies and consent before anything else
await self.handle_cookies(page)
await self.handle_consent(page)
# Look for application button
logger.info("[DEGEWO] Looking for application button...")
selectors = [
'a[href*="bewerben"]',
'button:has-text("Bewerben")'
]
# Save HTML after modal handling for debugging
try:
html_content = await page.content()
with open("data/degewo_debug.html", "w", encoding="utf-8") as f:
f.write(html_content)
except Exception as e:
logger.debug(f"[DEGEWO] Debug HTML not saved: {e}")
logger.info("[DEGEWO] Searching for application button...")
selectors = [
'a.btn',
'button.btn',
'a:has-text("Bewerben")',
'button:has-text("Bewerben")',
'a:has-text("Anfrage")',
'button:has-text("Anfrage")',
'a:has-text("Kontakt")',
'button:has-text("Kontakt")',
]
apply_btn = None
for sel in selectors:
all_btns = await page.query_selector_all(sel)
logger.info(f"[DEGEWO] Selector '{sel}' found {len(all_btns)} matches")
logger.debug(f"[DEGEWO] Selector '{sel}': {len(all_btns)} matches")
for btn in all_btns:
try:
if await btn.is_visible():
btn_text = (await btn.inner_text()).lower()
if any(x in btn_text for x in ["drucken", "merken", "zurück"]):
continue
apply_btn = btn
logger.info(f"[DEGEWO] Found visible button with selector '{sel}'")
logger.info(f"[DEGEWO] Found visible application button: {sel} [{btn_text}]")
break
except Exception as e:
logger.warning(f"[DEGEWO] Error checking button visibility: {e}")
logger.debug(f"[DEGEWO] Button visibility error: {e}")
if apply_btn:
break
if apply_btn:
logger.info("[DEGEWO] Found application button, scrolling into view...")
await apply_btn.scroll_into_view_if_needed()
await asyncio.sleep(0.5)
logger.info("[DEGEWO] Clicking button...")
await apply_btn.click()
await asyncio.sleep(2)
result["success"] = True
result["message"] = "Application submitted successfully."
else:
logger.warning("[DEGEWO] No application button found.")
result["message"] = "No application button found."
except Exception as e:
result["message"] = f"Error during application: {e}"

View file

@ -5,50 +5,68 @@ import asyncio
logger = logging.getLogger(__name__)
class GesobauHandler(BaseHandler):
def __init__(self, browser_context):
self.context = browser_context
async def apply(self, listing: dict, result: dict) -> dict:
page = await self.context.new_page()
try:
logger.info(f"[GESOBAU] Opening page: {listing['link']}")
logger.info(f"[GESOBAU] Open: {listing['link']}")
await page.goto(listing["link"], wait_until="networkidle")
logger.info("[GESOBAU] Page loaded")
await asyncio.sleep(2)
# Handle cookies and consent
# Always handle cookies and consent before anything else
await self.handle_cookies(page)
await self.handle_consent(page)
# Save HTML after modal handling for debugging
try:
html_content = await page.content()
with open("data/gesobau_debug.html", "w", encoding="utf-8") as f:
f.write(html_content)
except Exception as e:
logger.debug(f"[GESOBAU] Debug HTML not saved: {e}")
# Tailored 404 detection: Angebot nicht mehr verfügbar
if "Angebot nicht mehr verfügbar" in html_content:
logger.warning("[GESOBAU] Permanent fail: Angebot nicht mehr verfügbar")
result["permanent_fail"] = True
result["message"] = "Listing is no longer available (Angebot nicht mehr verfügbar). Marked as permanent fail."
return result
# Look for application button
logger.info("[GESOBAU] Looking for application button...")
logger.info("[GESOBAU] Searching for application button...")
selectors = [
'a[href*="bewerben"]',
'button:has-text("Bewerben")'
'button:has-text("Bewerben")',
'a:has-text("Bewerben")',
'button.btn',
]
apply_btn = None
for sel in selectors:
all_btns = await page.query_selector_all(sel)
logger.info(f"[GESOBAU] Selector '{sel}' found {len(all_btns)} matches")
logger.debug(f"[GESOBAU] Selector '{sel}': {len(all_btns)} matches")
for btn in all_btns:
try:
if await btn.is_visible():
apply_btn = btn
logger.info(f"[GESOBAU] Found visible button with selector '{sel}'")
logger.info(f"[GESOBAU] Found visible application button: {sel}")
break
except Exception as e:
logger.warning(f"[GESOBAU] Error checking button visibility: {e}")
logger.debug(f"[GESOBAU] Button visibility error: {e}")
if apply_btn:
break
if apply_btn:
logger.info("[GESOBAU] Found application button, scrolling into view...")
await apply_btn.scroll_into_view_if_needed()
await asyncio.sleep(0.5)
logger.info("[GESOBAU] Clicking button...")
await apply_btn.click()
await asyncio.sleep(2)
result["success"] = True
result["message"] = "Application submitted successfully."
else:
logger.warning("[GESOBAU] No application button found.")
result["message"] = "No application button found."
except Exception as e:
result["message"] = f"Error during application: {e}"

View file

@ -5,23 +5,49 @@ import asyncio
logger = logging.getLogger(__name__)
class GewobagHandler(BaseHandler):
def __init__(self, browser_context):
self.context = browser_context
async def apply(self, listing: dict, result: dict) -> dict:
page = await self.context.new_page()
try:
logger.info(f"[GEWOBAG] Opening page: {listing['link']}")
await page.goto(listing["link"], wait_until="networkidle")
response = await page.goto(listing["link"], wait_until="networkidle")
logger.info("[GEWOBAG] Page loaded")
await asyncio.sleep(2)
# Handle cookies and consent
# Detect 404 by status or page title
status = response.status if response else None
page_title = await page.title()
if status == 404 or (page_title and "404" in page_title):
logger.warning(f"[GEWOBAG] Listing is down (404): {listing['link']}")
result["success"] = False
result["message"] = "Listing is no longer available (404). Application impossible. Will not retry."
result["permanent_fail"] = True
return result
# Always handle cookies and consent before anything else
await self.handle_cookies(page)
await self.handle_consent(page)
# Look for application button
# Save HTML after modal handling for debugging
try:
html_content = await page.content()
with open("data/gewobag_debug.html", "w", encoding="utf-8") as f:
f.write(html_content)
except Exception as e:
logger.warning(f"[GEWOBAG] Could not save debug HTML: {e}")
# Log listing details
await self.log_listing_details(listing)
# Look for application button ("Anfrage senden") in tab or footer
logger.info("[GEWOBAG] Looking for application button...")
selectors = [
'a[href*="bewerben"]',
'button:has-text("Bewerben")'
'button.rental-contact',
'button:has-text("Anfrage senden")',
'div.contact-button button',
'iframe#contact-iframe',
]
apply_btn = None
@ -39,6 +65,24 @@ class GewobagHandler(BaseHandler):
if apply_btn:
break
# If not found, check for iframe (Wohnungshelden)
if not apply_btn:
iframe = await page.query_selector('iframe#contact-iframe')
if iframe:
logger.info("[GEWOBAG] Found Wohnungshelden iframe, switching context...")
frame = await iframe.content_frame()
if frame:
# Try to find a submit/apply button in the iframe
iframe_btns = await frame.query_selector_all('button, input[type="submit"]')
for btn in iframe_btns:
try:
if await btn.is_visible():
apply_btn = btn
logger.info("[GEWOBAG] Found visible button in iframe")
break
except Exception as e:
logger.warning(f"[GEWOBAG] Error checking iframe button visibility: {e}")
if apply_btn:
logger.info("[GEWOBAG] Found application button, scrolling into view...")
await apply_btn.scroll_into_view_if_needed()

View file

@ -5,20 +5,41 @@ import asyncio
logger = logging.getLogger(__name__)
class HowogeHandler(BaseHandler):
def __init__(self, browser_context):
self.context = browser_context
async def apply(self, listing: dict, result: dict) -> dict:
page = await self.context.new_page()
try:
logger.info(f"[HOWOGE] Opening page: {listing['link']}")
await page.goto(listing["link"], wait_until="networkidle")
logger.info("[HOWOGE] Page loaded")
logger.info(f"[HOWOGE] Open: {listing['link']}")
response = await page.goto(listing["link"], wait_until="networkidle")
await asyncio.sleep(2)
# Handle cookies and consent
# Detect 404 by status or page title
status = response.status if response else None
page_title = await page.title()
if status == 404 or (page_title and "404" in page_title):
logger.warning(f"[HOWOGE] Listing is down (404): {listing['link']}")
result["success"] = False
result["message"] = "Listing is no longer available (404). Application impossible. Will not retry."
result["permanent_fail"] = True
return result
# Always handle cookies and consent before anything else
await self.handle_cookies(page)
await self.handle_consent(page)
# Look for "Besichtigung vereinbaren" button
logger.info("[HOWOGE] Looking for 'Besichtigung vereinbaren' button...")
# Save HTML after modal handling for debugging
try:
html_content = await page.content()
with open("data/howoge_debug.html", "w", encoding="utf-8") as f:
f.write(html_content)
except Exception as e:
logger.debug(f"[HOWOGE] Debug HTML not saved: {e}")
await self.log_listing_details(listing)
logger.info("[HOWOGE] Searching for application button...")
selectors = [
'a[href*="besichtigung-vereinbaren"]',
'a:has-text("Besichtigung vereinbaren")',
@ -26,32 +47,30 @@ class HowogeHandler(BaseHandler):
'a:has-text("Anfragen")',
'button:has-text("Anfragen")'
]
apply_btn = None
for sel in selectors:
all_btns = await page.query_selector_all(sel)
logger.info(f"[HOWOGE] Selector '{sel}' found {len(all_btns)} matches")
logger.debug(f"[HOWOGE] Selector '{sel}': {len(all_btns)} matches")
for btn in all_btns:
try:
if await btn.is_visible():
apply_btn = btn
logger.info(f"[HOWOGE] Found visible button with selector '{sel}'")
logger.info(f"[HOWOGE] Found visible application button: {sel}")
break
except Exception as e:
logger.warning(f"[HOWOGE] Error checking button visibility: {e}")
logger.debug(f"[HOWOGE] Button visibility error: {e}")
if apply_btn:
break
if apply_btn:
logger.info("[HOWOGE] Found application button, scrolling into view...")
await apply_btn.scroll_into_view_if_needed()
await asyncio.sleep(0.5)
logger.info("[HOWOGE] Clicking button...")
await apply_btn.click()
await asyncio.sleep(2)
result["success"] = True
result["message"] = "Application submitted successfully."
else:
logger.warning("[HOWOGE] No application button found.")
result["message"] = "No application button found."
except Exception as e:
result["message"] = f"Error during application: {e}"

View file

@ -5,50 +5,78 @@ import asyncio
logger = logging.getLogger(__name__)
class StadtUndLandHandler(BaseHandler):
def __init__(self, browser_context):
self.context = browser_context
async def apply(self, listing: dict, result: dict) -> dict:
page = await self.context.new_page()
try:
logger.info(f"[STADT UND LAND] Opening page: {listing['link']}")
logger.info(f"[STADT UND LAND] Open: {listing['link']}")
await page.goto(listing["link"], wait_until="networkidle")
logger.info("[STADT UND LAND] Page loaded")
await asyncio.sleep(2)
# Handle cookies and consent
# Always handle cookies and consent before anything else
await self.handle_cookies(page)
await self.handle_consent(page)
# Look for application button
logger.info("[STADT UND LAND] Looking for application button...")
# Save HTML after modal handling for debugging
try:
html_content = await page.content()
with open("data/stadtundland_debug.html", "w", encoding="utf-8") as f:
f.write(html_content)
except Exception as e:
logger.debug(f"[STADT UND LAND] Debug HTML not saved: {e}")
# 404/permanent fail detection
error_texts = [
"Hier ist etwas schief gelaufen",
"Leider können wir Ihnen zur Zeit keine Details zu diesem Inserat anzeigen"
]
page_text = await page.text_content('body')
if page_text:
for err in error_texts:
if err in page_text:
logger.warning(f"[STADT UND LAND] Permanent fail: {err}")
result["permanent_fail"] = True
result["message"] = "Listing is no longer available (404 detected on STADT UND LAND)."
await page.close()
return result
# Look for application button (robust selectors)
logger.info("[STADT UND LAND] Searching for application button...")
selectors = [
'a[href*="bewerben"]',
'button:has-text("Bewerben")'
'button:has-text("Bewerben")',
'a:has-text("Bewerben")',
'button.btn',
'a.Button_button__JnZ4E',
'button.Button_button__JnZ4E',
]
apply_btn = None
for sel in selectors:
all_btns = await page.query_selector_all(sel)
logger.info(f"[STADT UND LAND] Selector '{sel}' found {len(all_btns)} matches")
logger.debug(f"[STADT UND LAND] Selector '{sel}': {len(all_btns)} matches")
for btn in all_btns:
try:
if await btn.is_visible():
apply_btn = btn
logger.info(f"[STADT UND LAND] Found visible button with selector '{sel}'")
logger.info(f"[STADT UND LAND] Found visible application button: {sel}")
break
except Exception as e:
logger.warning(f"[STADT UND LAND] Error checking button visibility: {e}")
logger.debug(f"[STADT UND LAND] Button visibility error: {e}")
if apply_btn:
break
if apply_btn:
logger.info("[STADT UND LAND] Found application button, scrolling into view...")
await apply_btn.scroll_into_view_if_needed()
await asyncio.sleep(0.5)
logger.info("[STADT UND LAND] Clicking button...")
await apply_btn.click()
await asyncio.sleep(2)
result["success"] = True
result["message"] = "Application submitted successfully."
else:
logger.warning("[STADT UND LAND] No application button found.")
result["message"] = "No application button found."
except Exception as e:
result["message"] = f"Error during application: {e}"

View file

@ -5,34 +5,107 @@ import asyncio
logger = logging.getLogger(__name__)
class WBMHandler(BaseHandler):
def __init__(self, browser_context):
self.context = browser_context
async def apply(self, listing: dict, result: dict) -> dict:
page = await self.context.new_page()
try:
logger.info(f"[WBM] Opening page: {listing['link']}")
logger.info(f"[WBM] Opening listing overview page: {listing['link']}")
await page.goto(listing["link"], wait_until="networkidle")
logger.info("[WBM] Page loaded")
logger.info("[WBM] Overview page loaded")
await asyncio.sleep(2)
# Handle cookies and consent
# Always handle cookies and consent before anything else
await self.handle_cookies(page)
await self.handle_consent(page)
# Look for application button
logger.info("[WBM] Looking for application button...")
selectors = [
'a[href*="bewerben"]',
'button:has-text("Bewerben")'
]
# Save HTML after modal handling for debugging
try:
html_content = await page.content()
with open("data/wbm_debug.html", "w", encoding="utf-8") as f:
f.write(html_content)
except Exception as e:
logger.warning(f"[WBM] Could not save debug HTML: {e}")
# 404/permanent fail detection
error_texts = [
"Keine passenden Angebote gefunden",
"Das Angebot existiert nicht mehr",
"Die gewünschte Seite konnte nicht gefunden werden",
"404",
"Es wurden keine Immobilien gefunden"
]
page_text = await page.text_content('body')
if page_text:
for err in error_texts:
if err in page_text:
result["permanent_fail"] = True
result["message"] = "Listing is no longer available (404 detected on WBM)."
logger.warning(f"[WBM] Permanent fail: {err}")
await page.close()
return result
# Find and follow the 'Details' link to the detail page
logger.info("[WBM] Looking for 'Details' link to open detail page...")
detail_link = None
detail_selectors = [
'a.btn.sign[title="Details"]',
'a.immo-button-cta[title="Details"]',
'a[title="Details"]',
]
for sel in detail_selectors:
links = await page.query_selector_all(sel)
logger.info(f"[WBM] Selector '{sel}' found {len(links)} matches for details link")
for link in links:
try:
if await link.is_visible():
detail_link = link
break
except Exception as e:
logger.warning(f"[WBM] Error checking details link visibility: {e}")
if detail_link:
break
if not detail_link:
result["message"] = "No details link found on overview page."
await page.close()
return result
# Click the details link and wait for navigation
logger.info("[WBM] Clicking details link to open detail page...")
await detail_link.click()
await page.wait_for_load_state("networkidle")
await asyncio.sleep(2)
# Save HTML of detail page for debugging
try:
html_content = await page.content()
with open("data/wbm_detail_debug.html", "w", encoding="utf-8") as f:
f.write(html_content)
except Exception as e:
logger.warning(f"[WBM] Could not save detail debug HTML: {e}")
# Look for application button on detail page
logger.info("[WBM] Looking for application button on detail page...")
selectors = [
'a[href*="expose-anfordern"]',
'a[href*="bewerben"]',
'a:has-text("Anfragen")',
'button:has-text("Interesse")',
'a:has-text("Bewerben")',
'button:has-text("Bewerben")',
'button.btn',
]
apply_btn = None
for sel in selectors:
all_btns = await page.query_selector_all(sel)
logger.info(f"[WBM] Selector '{sel}' found {len(all_btns)} matches")
logger.info(f"[WBM] Selector '{sel}' found {len(all_btns)} matches on detail page")
for btn in all_btns:
try:
if await btn.is_visible():
apply_btn = btn
logger.info(f"[WBM] Found visible button with selector '{sel}'")
logger.info(f"[WBM] Found visible application button with selector '{sel}' on detail page")
break
except Exception as e:
logger.warning(f"[WBM] Error checking button visibility: {e}")
@ -43,13 +116,13 @@ class WBMHandler(BaseHandler):
logger.info("[WBM] Found application button, scrolling into view...")
await apply_btn.scroll_into_view_if_needed()
await asyncio.sleep(0.5)
logger.info("[WBM] Clicking button...")
logger.info("[WBM] Clicking application button...")
await apply_btn.click()
await asyncio.sleep(2)
result["success"] = True
result["message"] = "Application submitted successfully."
result["message"] = "Application button clicked on detail page. (Submission not implemented)"
else:
result["message"] = "No application button found."
result["message"] = "No application button found on detail page."
except Exception as e:
result["message"] = f"Error during application: {e}"
logger.error(f"[WBM] Application error: {e}")

View file

@ -0,0 +1,216 @@
import asyncio
import logging
import hashlib
import re
from datetime import datetime
from pathlib import Path
import json
import os
from playwright.async_api import async_playwright
logger = logging.getLogger(__name__)
WGCOMPANY_LISTINGS_FILE = Path("data/wgcompany_listings.json")
WGCOMPANY_TIMING_FILE = Path("data/wgcompany_times.csv")
# Environment variables for search filters
WGCOMPANY_MIN_SIZE = os.environ.get("WGCOMPANY_MIN_SIZE", "")
WGCOMPANY_MAX_PRICE = os.environ.get("WGCOMPANY_MAX_PRICE", "")
WGCOMPANY_AGE = os.environ.get("WGCOMPANY_AGE", "")
WGCOMPANY_SMOKER = os.environ.get("WGCOMPANY_SMOKER", "")
WGCOMPANY_BEZIRK = os.environ.get("WGCOMPANY_BEZIRK", "0")
class WGCompanyNotifier:
def __init__(self, telegram_bot=None, refresh_minutes=10):
self.browser = None
self.context = None
self.telegram_bot = telegram_bot
self.refresh_minutes = refresh_minutes
async def init_browser(self):
if self.browser is None:
self.playwright = await async_playwright().start()
self.browser = await self.playwright.chromium.launch(headless=True)
self.context = await self.browser.new_context(
user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
)
logger.info("[WGCOMPANY] Browser initialized")
async def fetch_listings(self):
listings = []
try:
page = await self.context.new_page()
search_url = "http://www.wgcompany.de/cgi-bin/seite?st=1&mi=10&li=100"
logger.info(f"[WGCOMPANY] Loading search page: {search_url}")
await page.goto(search_url, wait_until="networkidle")
await asyncio.sleep(2)
if WGCOMPANY_MIN_SIZE:
min_size_field = await page.query_selector('input[name="c"]')
if min_size_field:
await min_size_field.fill(WGCOMPANY_MIN_SIZE)
if WGCOMPANY_MAX_PRICE:
max_price_field = await page.query_selector('input[name="a"]')
if max_price_field:
await max_price_field.fill(WGCOMPANY_MAX_PRICE)
if WGCOMPANY_AGE:
age_field = await page.query_selector('input[name="l"]')
if age_field:
await age_field.fill(WGCOMPANY_AGE)
if WGCOMPANY_SMOKER:
smoker_select = await page.query_selector('select[name="o"]')
if smoker_select:
await smoker_select.select_option(WGCOMPANY_SMOKER)
if WGCOMPANY_BEZIRK and WGCOMPANY_BEZIRK != "0":
bezirk_select = await page.query_selector('select[name="e"]')
if bezirk_select:
await bezirk_select.select_option(WGCOMPANY_BEZIRK)
submit_btn = await page.query_selector('input[type="submit"][value*="finde"], input[type="submit"]')
if submit_btn:
await submit_btn.click()
await page.wait_for_load_state("networkidle")
await asyncio.sleep(2)
content = await page.content()
with open("data/wgcompany_debug.html", "w", encoding="utf-8") as f:
f.write(content)
listing_links = await page.query_selector_all('a[href*="wg.pl"][href*="wgzeigen"]')
logger.info(f"[WGCOMPANY] Found {len(listing_links)} listing links")
for link_elem in listing_links:
try:
href = await link_elem.get_attribute("href")
if not href:
continue
parent = await link_elem.evaluate_handle("el => el.closest('tr') || el.parentElement")
row_text = await parent.evaluate("el => el.innerText") if parent else ""
price_match = re.search(r'(\d+)\s*€', row_text)
price = price_match.group(1) + "" if price_match else "?"
size_match = re.search(r'(\d+)\s*m²', row_text)
size = size_match.group(1) + "" if size_match else "?"
bezirk_patterns = [
"Kreuzberg", "Neukölln", "Friedrichshain", "Prenzlauer Berg",
"Mitte", "Wedding", "Charlottenburg", "Schöneberg", "Tempelhof",
"Steglitz", "Wilmersdorf", "Pankow", "Lichtenberg", "Treptow",
"Köpenick", "Reinickendorf", "Spandau", "Zehlendorf", "Moabit"
]
location = "Berlin"
for bez in bezirk_patterns:
if bez.lower() in row_text.lower():
location = bez
break
if not href.startswith("http"):
href = f"http://www.wgcompany.de{href}" if href.startswith("/") else f"http://www.wgcompany.de/cgi-bin/{href}"
listing_id = hashlib.md5(f"{href}{price}{size}".encode()).hexdigest()[:12]
listings.append({
"id": listing_id,
"rooms": "1 Zimmer (WG)",
"size": size,
"price": price,
"address": location,
"link": href,
"source": "wgcompany",
"fetched_at": datetime.now().isoformat()
})
except Exception as e:
logger.debug(f"[WGCOMPANY] Error parsing listing: {e}")
continue
# Deduplicate
seen_ids = set()
unique_listings = []
for listing in listings:
if listing["id"] not in seen_ids:
seen_ids.add(listing["id"])
unique_listings.append(listing)
await page.close()
logger.info(f"[WGCOMPANY] Fetched {len(unique_listings)} unique listings")
return unique_listings
except Exception as e:
logger.error(f"[WGCOMPANY] Error fetching listings: {e}")
return []
def load_previous_listings(self):
if WGCOMPANY_LISTINGS_FILE.exists():
with open(WGCOMPANY_LISTINGS_FILE, "r") as f:
data = json.load(f)
logger.info(f"[WGCOMPANY] Loaded {len(data)} previous listings from file. IDs: {list(data.keys())[:10]}{'...' if len(data) > 10 else ''}")
return data
logger.info("[WGCOMPANY] No previous listings file found.")
return {}
def save_listings(self, listings):
listings_dict = {l["id"]: l for l in listings}
logger.info(f"[WGCOMPANY] Saving {len(listings_dict)} listings to file. IDs: {list(listings_dict.keys())[:10]}{'...' if len(listings_dict) > 10 else ''}")
with open(WGCOMPANY_LISTINGS_FILE, "w") as f:
json.dump(listings_dict, f, indent=2, ensure_ascii=False)
def find_new_listings(self, current, previous):
current_ids = [l["id"] for l in current]
previous_ids = list(previous.keys())
logger.info(f"[WGCOMPANY] Current listing IDs: {current_ids[:10]}{'...' if len(current_ids) > 10 else ''}")
logger.info(f"[WGCOMPANY] Previous listing IDs: {previous_ids[:10]}{'...' if len(previous_ids) > 10 else ''}")
new_listings = [l for l in current if l["id"] not in previous]
logger.info(f"[WGCOMPANY] Detected {len(new_listings)} new listings (not in previous)")
return new_listings
def log_listing_times(self, new_listings):
if not new_listings:
return
import csv
file_exists = WGCOMPANY_TIMING_FILE.exists()
with open(WGCOMPANY_TIMING_FILE, "a", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
if not file_exists:
writer.writerow(["timestamp", "weekday", "hour", "minute", "rooms", "size", "price", "address", "listing_id"])
now = datetime.now()
for listing in new_listings:
writer.writerow([
now.isoformat(),
now.strftime("%A"),
now.hour,
now.minute,
listing["rooms"],
listing["size"],
listing["price"],
listing["address"],
listing["id"]
])
logger.info(f"[WGCOMPANY] Logged {len(new_listings)} listing times to CSV")
async def notify_new_listings(self, new_listings):
if not new_listings or not self.telegram_bot:
logger.info("[WGCOMPANY] No new listings to notify or Telegram bot not set.")
return
logger.info(f"[WGCOMPANY] Notifying {len(new_listings)} new listing(s) via Telegram")
for idx, listing in enumerate(new_listings, 1):
try:
logger.info(f"[WGCOMPANY] Sending listing {idx}/{len(new_listings)}: {listing['link']} | {listing['rooms']} | {listing['size']} | {listing['price']} | {listing['address']}")
message = f"<b>[WGCOMPANY]</b> <a href=\"{listing['link']}\">{listing['link']}</a>\n"
message += f"🚪 <b>{listing['rooms']}</b>\n"
message += f"📐 {listing['size']}\n"
message += f"💰 {listing['price']}\n"
message += f"📍 {listing['address']}"
await self.telegram_bot._send_message(message)
await asyncio.sleep(0.5)
except Exception as e:
logger.error(f"[WGCOMPANY] Error sending Telegram message for listing {idx}/{len(new_listings)}: {e}")
import traceback
logger.error(traceback.format_exc())
async def run(self):
await self.init_browser()
while True:
listings = await self.fetch_listings()
previous = self.load_previous_listings()
new_listings = self.find_new_listings(listings, previous)
if new_listings:
logger.info(f"[WGCOMPANY] Found {len(new_listings)} new listing(s)")
self.log_listing_times(new_listings)
await self.notify_new_listings(new_listings)
else:
logger.info("[WGCOMPANY] No new listings")
self.save_listings(listings)
await asyncio.sleep(self.refresh_minutes * 60)

99
main.py
View file

@ -2,22 +2,101 @@ import asyncio
from playwright.async_api import async_playwright
from application_handler import ApplicationHandler
from telegram_bot import TelegramBot
from handlers.wgcompany_notifier import WGCompanyNotifier
import logging
from logging.handlers import RotatingFileHandler
import os
from dotenv import load_dotenv
from state_manager import StateManager
from pathlib import Path
# --- Environment & Logging Setup ---
# Load environment variables from .env file
load_dotenv()
# Configure logging: file (rotating) + console for Docker visibility, enforce for all modules
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
RotatingFileHandler("data/monitor.log", maxBytes=1 * 1024 * 1024, backupCount=5), # 1 MB per file, 5 backups
logging.StreamHandler()
],
force=True # Enforce for all modules, Python 3.8+
)
logger = logging.getLogger() # Use root logger for universal logging
logger.info("Logging initialized: outputting to both data/monitor.log and console (Docker logs)")
# Interval (seconds) between checks for new listings
CHECK_INTERVAL = int(os.getenv("CHECK_INTERVAL", 300)) # Default: 300 seconds
def _flush_rotating_file_handlers():
"""Flush all RotatingFileHandlers attached to the root logger."""
root_logger = logging.getLogger()
for handler in root_logger.handlers:
if isinstance(handler, RotatingFileHandler):
handler.flush()
async def main():
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
context = await browser.new_context()
logger.info("Starting the bot...")
# Initialize state manager
state_manager = StateManager(Path("data/state.json"))
# Application handler manages browser/context
app_handler = ApplicationHandler(None, state_manager)
# Set up Telegram bot and inject into handler, passing the main event loop
event_loop = asyncio.get_running_loop()
telegram_bot = TelegramBot(app_handler, event_loop=event_loop)
telegram_bot.start() # Start Telegram command listener for reactivity
app_handler.set_telegram_bot(telegram_bot)
# Start WGCompanyNotifier as a background task
wg_notifier = WGCompanyNotifier(telegram_bot=telegram_bot, refresh_minutes=10)
wg_task = asyncio.create_task(wg_notifier.run())
await app_handler.init_browser()
# Initialize the application
app_handler = ApplicationHandler(context)
bot = TelegramBot(app_handler)
bot.start()
# Keep the bot running
try:
await asyncio.Event().wait()
logger.info(f"Bot is now running. Refreshing every {CHECK_INTERVAL} seconds...")
while True:
current_listings = await app_handler.fetch_listings()
if not current_listings:
logger.warning("No listings fetched")
await asyncio.sleep(CHECK_INTERVAL)
_flush_rotating_file_handlers()
continue
previous_listings = app_handler.load_previous_listings()
if not previous_listings:
logger.info(f"First run - saving {len(current_listings)} listings as baseline")
app_handler.save_listings(current_listings)
await asyncio.sleep(CHECK_INTERVAL)
_flush_rotating_file_handlers()
continue
new_listings = app_handler.find_new_listings(current_listings, previous_listings)
application_results = {}
if new_listings:
logger.info(f"Found {len(new_listings)} new listing(s)")
app_handler.log_listing_times(new_listings)
if app_handler.is_autopilot_enabled():
logger.info("Autopilot enabled - applying to listings...")
application_results = await app_handler.apply_to_listings(new_listings)
app_handler.notify_new_listings(new_listings, application_results)
app_handler.save_listings(current_listings)
await asyncio.sleep(CHECK_INTERVAL)
_flush_rotating_file_handlers()
except (KeyboardInterrupt, SystemExit):
print("Shutting down...")
logger.info("Shutting down...")
except Exception as e:
logger.error(f"[MAIN] Error in main loop: {e}")
finally:
if hasattr(app_handler, 'browser') and app_handler.browser:
await app_handler.browser.close()
logger.info("Browser closed successfully.")
if __name__ == "__main__":
asyncio.run(main())

53
state_manager.py Normal file
View file

@ -0,0 +1,53 @@
import json
from pathlib import Path
import logging
import os
import dotenv
logger = logging.getLogger(__name__)
dotenv.load_dotenv() # Load environment variables from .env file
class StateManager:
def __init__(self, state_file: Path):
self.state_file = state_file
self.logged_in = False # Initialize logged_in attribute
# Load credentials from environment variables
self.email = os.getenv("INBERLIN_EMAIL")
self.password = os.getenv("INBERLIN_PASSWORD")
if not self.email or not self.password:
logger.warning("Email or password not set in environment variables.")
def load_state(self) -> dict:
"""Load persistent state"""
if self.state_file.exists():
with open(self.state_file, "r") as f:
return json.load(f)
return {"autopilot": False}
def save_state(self, state: dict):
"""Save persistent state"""
with open(self.state_file, "w") as f:
json.dump(state, f, indent=2)
def set_autopilot(self, enabled: bool):
"""Enable or disable autopilot mode"""
state = self.load_state()
state["autopilot"] = enabled
self.save_state(state)
logger.info(f"Autopilot {'enabled' if enabled else 'disabled'}")
def is_autopilot_enabled(self) -> bool:
"""Check if autopilot mode is enabled"""
return self.load_state().get("autopilot", False)
def set_logged_in(self, status: bool):
"""Set the logged_in status"""
self.logged_in = status
logger.info(f"Logged in status set to: {status}")
def is_logged_in(self) -> bool:
"""Check the logged_in status"""
return self.logged_in

View file

@ -1,25 +1,34 @@
import os
import logging
import threading
import time
import requests
import asyncio
# Configuration from environment
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "")
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "")
TELEGRAM_MAX_RETRIES = int(os.environ.get("TELEGRAM_MAX_RETRIES", 3))
logger = logging.getLogger(__name__)
class TelegramBot:
"""Handle Telegram commands for controlling the monitor"""
def __init__(self, monitor, bot_token=None, chat_id=None):
def __init__(self, monitor, bot_token=None, chat_id=None, event_loop=None):
self.monitor = monitor
self.bot_token = bot_token or TELEGRAM_BOT_TOKEN
self.chat_id = chat_id or TELEGRAM_CHAT_ID
self.last_update_id = 0
self.running = False
# Add reference to application handler
self.app_handler = monitor
# Store the main event loop for thread-safe async calls
self.event_loop = event_loop or asyncio.get_event_loop()
def start(self):
if not self.bot_token:
logger.warning("Telegram bot token not configured, commands disabled")
@ -68,8 +77,63 @@ class TelegramBot:
self._handle_plot_command()
elif text == "/errorrate":
self._handle_error_rate_command()
elif text == "/retryfailed":
# Schedule coroutine on the main event loop for thread safety
fut = asyncio.run_coroutine_threadsafe(
self._handle_retry_failed_command(max_retries=TELEGRAM_MAX_RETRIES),
self.event_loop
)
# Optionally, wait for result or handle exceptions
try:
fut.result()
except Exception as e:
logger.error(f"/retryfailed command failed: {e}")
elif text.startswith("/"):
self._handle_unknown_command(text)
async def _handle_retry_failed_command(self, max_retries: int = 3):
"""Retry all failed applications up to max_retries."""
# Ensure browser context is initialized
if not hasattr(self.app_handler, 'context') or self.app_handler.context is None:
if hasattr(self.app_handler, 'init_browser'):
await self.app_handler.init_browser()
# After (re-)init, propagate context to all sub-handlers (defensive)
if hasattr(self.app_handler, 'context') and hasattr(self.app_handler, 'handlers'):
for handler in self.app_handler.handlers.values():
handler.context = self.app_handler.context
self._send_message(f"🔄 Retrying failed applications (max retries: {max_retries})...")
applications = self.app_handler.load_applications()
failed = [app for app in applications.values() if not app.get("success") and app.get("retries", 0) < max_retries]
if not failed:
self._send_message("✅ No failed applications to retry (or all reached max retries).")
return
results = {}
details = []
for app in failed:
listing = {
"id": app["listing_id"],
"rooms": app.get("rooms", ""),
"size": app.get("size", ""),
"price": app.get("price", ""),
"address": app.get("address", ""),
"link": app.get("link", "")
}
retries = app.get("retries", 0) + 1
result = await self.app_handler.apply(listing)
result["retries"] = retries
self.app_handler.save_application(result)
results[listing["id"]] = result
status_emoji = "" if result["success"] else ""
details.append(
f"{status_emoji} <b>{result.get('address', '')}</b> ({result.get('company', '')})\n"
f"<code>{result.get('link', '')}</code>\n"
f"<i>{result.get('message', '')}</i>\n"
)
n_success = sum(1 for r in results.values() if r["success"])
n_fail = sum(1 for r in results.values() if not r["success"])
summary = f"🔄 Retried {len(results)} failed applications.\n✅ Success: {n_success}\n❌ Still failed: {n_fail}"
if details:
summary += "\n\n<b>Details:</b>\n" + "\n".join(details)
self._send_message(summary)
def _handle_autopilot_command(self, text):
logger.info(f"Processing autopilot command: {text}")
@ -89,9 +153,9 @@ class TelegramBot:
self._send_message("Usage: /autopilot on|off")
def _handle_status_command(self):
state = self.monitor.load_state()
state = self.app_handler.load_state()
autopilot = state.get("autopilot", False)
applications = self.monitor.load_applications()
applications = self.app_handler.load_applications()
status = "🤖 <b>Autopilot:</b> " + ("ON ✅" if autopilot else "OFF ❌")
status += f"\n📝 <b>Applications sent:</b> {len(applications)}"
by_company = {}
@ -118,13 +182,12 @@ When autopilot is ON, I will automatically apply to new listings."""
def _handle_unknown_command(self, text):
cmd = text.split()[0] if text else text
self._send_message(f"❓ Unknown command: {cmd}")
self._send_message(f"❓ Unknown command: <code>{cmd}</code>\n\nUse /help to see available commands.")
def _handle_error_rate_command(self):
"""Generate and send a plot showing success vs failure ratio for autopilot applications."""
logger.info("Generating autopilot errorrate plot...")
try:
plot_path, summary = self._generate_error_rate_plot()
plot_path, summary = self.app_handler._generate_error_rate_plot()
if plot_path:
caption = "📉 <b>Autopilot Success vs Failure</b>\n\n" + summary
self._send_photo(plot_path, caption)
@ -132,10 +195,26 @@ When autopilot is ON, I will automatically apply to new listings."""
self._send_message("📉 Not enough application data to generate errorrate plot.")
except Exception as e:
logger.error(f"Error generating errorrate plot: {e}")
self._send_message("📉 Error generating error rate plot.")
import traceback
logger.error(traceback.format_exc())
self._send_message(f"❌ Error generating errorrate plot: {str(e)}")
def _handle_plot_command(self):
logger.info("Generating listing times plot...")
try:
plot_path = self.app_handler._generate_weekly_plot()
if plot_path:
self._send_photo(plot_path, "📊 <b>Weekly Listing Patterns</b>\n\nThis shows when new listings typically appear throughout the week.")
else:
self._send_message("📊 Not enough data to generate plot yet. Keep monitoring!")
except Exception as e:
logger.error(f"Error generating plot: {e}")
import traceback
logger.error(traceback.format_exc())
self._send_message(f"❌ Error generating plot: {str(e)}")
def _send_message(self, text):
"""Send a text message to the configured Telegram chat."""
"""Send a text message to the configured Telegram chat, with detailed error logging."""
if not self.bot_token or not self.chat_id:
logger.warning("Telegram bot token or chat ID not configured, cannot send message")
return
@ -143,10 +222,13 @@ When autopilot is ON, I will automatically apply to new listings."""
payload = {"chat_id": self.chat_id, "text": text, "parse_mode": "HTML"}
try:
response = requests.post(url, json=payload, timeout=10)
logger.info(f"[TELEGRAM] Sent message: status={response.status_code}, ok={response.ok}, response={response.text}")
if not response.ok:
logger.error(f"Failed to send Telegram message: {response.text}")
except Exception as e:
logger.error(f"Error while sending Telegram message: {e}")
import traceback
logger.error(traceback.format_exc())
def _send_photo(self, photo_path, caption):
"""Send a photo to the configured Telegram chat."""
@ -165,11 +247,14 @@ When autopilot is ON, I will automatically apply to new listings."""
logger.error(f"Error while sending Telegram photo: {e}")
def _generate_error_rate_plot(self):
"""Placeholder for generating an error rate plot."""
logger.warning("_generate_error_rate_plot is not implemented.")
return None, "Error rate plot generation not implemented."
def _handle_plot_command(self):
"""Placeholder for handling the /plot command."""
logger.warning("_handle_plot_command is not implemented.")
self._send_message("📊 Plot command is not implemented yet.")
"""Generate and send a plot showing success vs failure ratio for autopilot applications."""
logger.info("Generating autopilot errorrate plot...")
try:
plot_path, summary = self.app_handler._generate_error_rate_plot()
if plot_path:
self._send_photo(plot_path, caption=summary)
else:
self._send_message("No data available to generate the error rate plot.")
except Exception as e:
logger.error(f"Error generating errorrate plot: {e}")
self._send_message(f"❌ Error generating errorrate plot: {str(e)}")

View file

@ -0,0 +1,87 @@
import pytest
import json
from pathlib import Path
import sys
from pathlib import Path as _Path
sys.path.append(str(_Path(__file__).parent.parent))
from application_handler import ApplicationHandler
@pytest.fixture
def temp_applications_file(tmp_path):
"""Fixture to create a temporary applications file."""
file = tmp_path / "applications.json"
file.write_text("{}", encoding="utf-8")
return file
@pytest.fixture
def application_handler(temp_applications_file, monkeypatch):
"""Fixture to create an ApplicationHandler instance with a temporary applications file."""
monkeypatch.setattr("application_handler.APPLICATIONS_FILE", temp_applications_file)
return ApplicationHandler(browser_context=None, state_manager=None)
def test_detect_company_domains():
handler = ApplicationHandler(browser_context=None, state_manager=None)
assert handler._detect_company('https://howoge.de/abc') == 'howoge'
assert handler._detect_company('https://www.howoge.de/abc') == 'howoge'
assert handler._detect_company('https://portal.gewobag.de/') == 'gewobag'
assert handler._detect_company('https://degewo.de/') == 'degewo'
assert handler._detect_company('https://gesobau.de/') == 'gesobau'
assert handler._detect_company('https://stadtundland.de/') == 'stadtundland'
assert handler._detect_company('https://stadt-und-land.de/') == 'stadtundland'
assert handler._detect_company('https://wbm.de/') == 'wbm'
def test_detect_company_path_fallback():
handler = ApplicationHandler(browser_context=None, state_manager=None)
assert handler._detect_company('https://example.com/howoge/abc') == 'howoge'
assert handler._detect_company('https://foo.bar/gewobag') == 'gewobag'
assert handler._detect_company('https://foo.bar/degewo') == 'degewo'
assert handler._detect_company('https://foo.bar/gesobau') == 'gesobau'
assert handler._detect_company('https://foo.bar/stadt-und-land') == 'stadtundland'
assert handler._detect_company('https://foo.bar/wbm') == 'wbm'
def test_detect_company_unknown():
handler = ApplicationHandler(browser_context=None, state_manager=None)
assert handler._detect_company('https://example.com/') == 'unknown'
assert handler._detect_company('') == 'unknown'
assert handler._detect_company(None) == 'unknown'
def test_load_applications_empty(application_handler):
"""Test loading applications when the file is empty."""
applications = application_handler.load_applications()
assert applications == {}
def test_save_application(application_handler):
"""Test saving an application."""
result = {
"listing_id": "12345",
"company": "test_company",
"link": "http://example.com",
"timestamp": "2025-12-27T12:00:00",
"success": True,
"message": "Application successful",
"address": "Test Address",
"rooms": "3",
"price": "1000"
}
application_handler.save_application(result)
applications = application_handler.load_applications()
assert "12345" in applications
assert applications["12345"] == result
def test_has_applied(application_handler):
"""Test checking if an application exists."""
result = {
"listing_id": "12345",
"company": "test_company",
"link": "http://example.com",
"timestamp": "2025-12-27T12:00:00",
"success": True,
"message": "Application successful",
"address": "Test Address",
"rooms": "3",
"price": "1000"
}
application_handler.save_application(result)
assert application_handler.has_applied("12345") is True
assert application_handler.has_applied("67890") is False

View file

@ -0,0 +1,44 @@
import pytest
import sys
from pathlib import Path as _Path
sys.path.append(str(_Path(__file__).parent.parent))
from application_handler import ApplicationHandler
class DummyStateManager:
email = None
password = None
logged_in = False
def set_autopilot(self, enabled): pass
def is_autopilot_enabled(self): return False
def make_handler():
# context is not used for _detect_company
return ApplicationHandler(browser_context=None, state_manager=DummyStateManager())
def test_detect_company_domains():
handler = make_handler()
# Domain and subdomain cases
assert handler._detect_company('https://howoge.de/abc') == 'howoge'
assert handler._detect_company('https://www.howoge.de/abc') == 'howoge'
assert handler._detect_company('https://portal.gewobag.de/') == 'gewobag'
assert handler._detect_company('https://degewo.de/') == 'degewo'
assert handler._detect_company('https://gesobau.de/') == 'gesobau'
assert handler._detect_company('https://stadtundland.de/') == 'stadtundland'
assert handler._detect_company('https://stadt-und-land.de/') == 'stadtundland'
assert handler._detect_company('https://wbm.de/') == 'wbm'
def test_detect_company_path_fallback():
handler = make_handler()
# Path/query fallback
assert handler._detect_company('https://example.com/howoge/abc') == 'howoge'
assert handler._detect_company('https://foo.bar/gewobag') == 'gewobag'
assert handler._detect_company('https://foo.bar/degewo') == 'degewo'
assert handler._detect_company('https://foo.bar/gesobau') == 'gesobau'
assert handler._detect_company('https://foo.bar/stadt-und-land') == 'stadtundland'
assert handler._detect_company('https://foo.bar/wbm') == 'wbm'
def test_detect_company_unknown():
handler = make_handler()
assert handler._detect_company('https://example.com/') == 'unknown'
assert handler._detect_company('') == 'unknown'
assert handler._detect_company(None) == 'unknown'

View file

@ -1,35 +1,48 @@
import os
import sys
from pathlib import Path
import pytest
from unittest.mock import patch, mock_open
from archive.test_errorrate_runner import generate_error_rate_plot
from unittest.mock import patch, mock_open, MagicMock
sys.path.append(str(Path(__file__).parent.parent))
from application_handler import ApplicationHandler
@pytest.fixture
def mock_data_dir(tmp_path):
"""Fixture to create a temporary data directory."""
def temp_applications_file(tmp_path):
data_dir = tmp_path / "data"
data_dir.mkdir()
return data_dir
file = data_dir / "applications.json"
file.write_text("{}", encoding="utf-8")
return file
@patch("builtins.open", new_callable=mock_open, read_data="{}")
@patch("os.path.exists", return_value=True)
def test_generate_error_rate_plot_no_data(mock_exists, mock_open, mock_data_dir):
"""Test generate_error_rate_plot with no data."""
plot_path, summary = generate_error_rate_plot(str(mock_data_dir / "applications.json"))
assert plot_path is None
class DummyStateManager:
email = None
password = None
logged_in = False
def set_autopilot(self, enabled): pass
def is_autopilot_enabled(self): return False
@patch("matplotlib.pyplot.savefig")
def test_generate_error_rate_plot_no_data(mock_savefig, temp_applications_file):
handler = ApplicationHandler(None, DummyStateManager(), applications_file=temp_applications_file)
plot_path, summary = handler._generate_error_rate_plot()
assert plot_path is None or plot_path == ""
assert summary == ""
@patch("builtins.open", new_callable=mock_open)
@patch("os.path.exists", return_value=True)
@patch("matplotlib.pyplot.savefig")
def test_generate_error_rate_plot_with_data(mock_savefig, mock_exists, mock_open, mock_data_dir):
"""Test generate_error_rate_plot with valid data."""
mock_open.return_value.read.return_value = """
def test_generate_error_rate_plot_with_data(mock_savefig, temp_applications_file):
handler = ApplicationHandler(None, DummyStateManager(), applications_file=temp_applications_file)
# Write valid data to the temp applications file
temp_applications_file.write_text('''
{
"1": {"timestamp": "2025-12-25T12:00:00", "company": "CompanyA", "success": true},
"2": {"timestamp": "2025-12-26T12:00:00", "company": "CompanyB", "success": false}
}
"""
plot_path, summary = generate_error_rate_plot(str(mock_data_dir / "applications.json"))
''', encoding="utf-8")
plot_path, summary = handler._generate_error_rate_plot()
assert plot_path is not None
assert "Total attempts" in summary
assert "Successes" in summary

View file

@ -5,7 +5,12 @@ from handlers.degewo_handler import DegewoHandler
from handlers.gesobau_handler import GesobauHandler
from handlers.stadtundland_handler import StadtUndLandHandler
from handlers.wbm_handler import WBMHandler
from unittest.mock import AsyncMock
from handlers.base_handler import BaseHandler
from unittest.mock import AsyncMock, MagicMock
class MockBaseHandler(BaseHandler):
async def apply(self, listing: dict, result: dict) -> dict:
return result
@pytest.mark.asyncio
async def test_howoge_handler():
@ -60,3 +65,75 @@ async def test_wbm_handler():
result = {"success": False}
await handler.apply(listing, result)
assert "success" in result
@pytest.mark.asyncio
async def test_handle_cookies():
"""Test the handle_cookies method in BaseHandler."""
context = AsyncMock()
handler = MockBaseHandler(context)
mock_page = AsyncMock()
mock_cookie_btn = AsyncMock()
mock_cookie_btn.is_visible = AsyncMock(return_value=True)
mock_cookie_btn.click = AsyncMock()
mock_page.query_selector = AsyncMock(return_value=mock_cookie_btn)
await handler.handle_cookies(mock_page)
mock_cookie_btn.click.assert_called_once()
@pytest.mark.asyncio
async def test_handle_consent():
"""Test the handle_consent method in BaseHandler."""
context = AsyncMock()
handler = MockBaseHandler(context)
mock_page = AsyncMock()
mock_consent_btn = AsyncMock()
mock_consent_btn.is_visible = AsyncMock(return_value=True)
mock_consent_btn.click = AsyncMock()
mock_page.query_selector = AsyncMock(return_value=mock_consent_btn)
await handler.handle_consent(mock_page)
mock_consent_btn.click.assert_called_once()
@pytest.mark.asyncio
async def test_login():
"""Test the login method in BaseHandler."""
context = AsyncMock()
handler = MockBaseHandler(context, email="test@example.com", password="password123")
mock_page = AsyncMock()
# Mock the page interactions
mock_page.goto = AsyncMock()
mock_page.fill = AsyncMock()
mock_page.click = AsyncMock()
mock_page.wait_for_load_state = AsyncMock()
mock_page.url = "https://www.inberlinwohnen.de/mein-bereich"
mock_page.query_selector = AsyncMock(return_value=AsyncMock(is_visible=AsyncMock(return_value=True)))
result = await handler.login(mock_page)
# Assertions
mock_page.goto.assert_called_once_with("https://www.inberlinwohnen.de/login", wait_until="networkidle")
mock_page.fill.assert_any_call('input[name="email"], input[type="email"]', "test@example.com")
mock_page.fill.assert_any_call('input[name="password"], input[type="password"]', "password123")
mock_page.click.assert_called_once_with('button[type="submit"], input[type="submit"]')
mock_page.wait_for_load_state.assert_called_once_with("networkidle")
assert result is True
# Test for fetch_listings method in BaseHandler
@pytest.mark.asyncio
async def test_fetch_listings():
context = AsyncMock()
handler = MockBaseHandler(context)
# Mock the fetch_listings method
handler.fetch_listings = AsyncMock(return_value=[
{"id": "1", "title": "Listing 1", "price": 1000},
{"id": "2", "title": "Listing 2", "price": 1200}
])
listings = await handler.fetch_listings()
# Assertions
assert len(listings) == 2
assert listings[0]["id"] == "1"
assert listings[1]["title"] == "Listing 2"

View file

@ -0,0 +1,42 @@
import asyncio
import pytest
from playwright.async_api import async_playwright
USER_AGENTS = [
# Chrome on Mac
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
# Chrome on Windows
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
# Firefox on Mac
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:120.0) Gecko/20100101 Firefox/120.0",
# Edge on Windows
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0",
# Safari on Mac
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15",
# iPhone Safari
"Mozilla/5.0 (iPhone; CPU iPhone OS 17_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Mobile/15E148 Safari/604.1",
]
@pytest.mark.asyncio
async def test_inberlin_login_flow():
async with async_playwright() as p:
for ua in USER_AGENTS:
print("\n==============================")
print(f"Testing user agent: {ua}")
browser = await p.chromium.launch(headless=True)
context = await browser.new_context(user_agent=ua)
page = await context.new_page()
try:
print("Navigating to login page...")
login_response = await page.goto("https://www.inberlinwohnen.de/login", wait_until="networkidle")
print(f"Login page status: {login_response.status if login_response else 'No response'}")
print(f"Login page headers: {login_response.headers if login_response else 'No response'}")
await asyncio.sleep(2)
except Exception as e:
print(f"Exception for user agent: {ua}\n{e}")
finally:
await browser.close()
if __name__ == "__main__":
asyncio.run(test_inberlin_login_flow())

View file

@ -0,0 +1,29 @@
import pytest
from pathlib import Path
from state_manager import StateManager
import json
@pytest.fixture
def state_file(tmp_path):
return tmp_path / "state.json"
@pytest.fixture
def state_manager(state_file):
return StateManager(state_file)
def test_load_state_default(state_manager):
state = state_manager.load_state()
assert state == {"autopilot": False}
def test_save_state(state_manager):
state = {"autopilot": True}
state_manager.save_state(state)
loaded_state = state_manager.load_state()
assert loaded_state == state
def test_set_autopilot(state_manager):
state_manager.set_autopilot(True)
assert state_manager.is_autopilot_enabled() is True
state_manager.set_autopilot(False)
assert state_manager.is_autopilot_enabled() is False

View file

@ -1,6 +1,9 @@
import os
import sys
from pathlib import Path
import pytest
from unittest.mock import MagicMock, patch
sys.path.append(str(Path(__file__).parent.parent))
from telegram_bot import TelegramBot
from dotenv import load_dotenv
@ -60,3 +63,33 @@ def test_handle_unknown_command(mock_send_message, telegram_bot):
telegram_bot._handle_unknown_command("/unknown")
mock_send_message.assert_called_once()
assert "Unknown command" in mock_send_message.call_args[0][0]
@patch("telegram_bot.TelegramBot._send_photo")
@patch("telegram_bot.TelegramBot._send_message")
def test_handle_plot_command(mock_send_message, mock_send_photo, telegram_bot):
telegram_bot.app_handler._generate_weekly_plot = MagicMock(return_value="/path/to/plot.png")
telegram_bot._handle_plot_command()
mock_send_photo.assert_called_once_with("/path/to/plot.png", "📊 <b>Weekly Listing Patterns</b>\n\nThis shows when new listings typically appear throughout the week.")
@patch("telegram_bot.TelegramBot._send_message")
def test_handle_plot_command_no_data(mock_send_message, telegram_bot):
telegram_bot.app_handler._generate_weekly_plot = MagicMock(return_value="")
telegram_bot._handle_plot_command()
mock_send_message.assert_called_once_with("📊 Not enough data to generate plot yet. Keep monitoring!")
@patch("telegram_bot.TelegramBot._send_photo")
@patch("telegram_bot.TelegramBot._send_message")
def test_handle_error_rate_command(mock_send_message, mock_send_photo, telegram_bot):
telegram_bot.app_handler._generate_error_rate_plot = MagicMock(return_value=("/path/to/error_rate.png", "Summary text"))
telegram_bot._handle_error_rate_command()
mock_send_photo.assert_called_once_with("/path/to/error_rate.png", "📉 <b>Autopilot Success vs Failure</b>\n\nSummary text")
@patch("telegram_bot.TelegramBot._send_message")
def test_handle_error_rate_command_no_data(mock_send_message, telegram_bot):
telegram_bot.app_handler._generate_error_rate_plot = MagicMock(return_value=("", ""))
telegram_bot._handle_error_rate_command()
mock_send_message.assert_called_once_with("📉 Not enough application data to generate errorrate plot.")