Aron Petau 2026-01-01 15:27:25 +01:00
parent d596ed7e19
commit aa6626d80d
21 changed files with 1051 additions and 333 deletions


@@ -8,7 +8,7 @@ A Python-based apartment monitoring bot for Berlin's public housing portal (inbe
**Modularized structure** with the following key components:
- `main.py`: Entry point for the bot. Runs the monitoring loop and autocleaning every 48 hours.
- `handlers/`: Contains company-specific handlers for auto-apply functionality. Each handler is responsible for automating the application process for a specific housing company. Includes:
- `howoge_handler.py`
- `gewobag_handler.py`
@@ -16,11 +16,18 @@ A Python-based apartment monitoring bot for Berlin's public housing portal (inbe
- `gesobau_handler.py`
- `stadtundland_handler.py`
- `wbm_handler.py`
- `wgcompany_notifier.py`: Handles WGcompany listing fetching, deduplication, and notification
- `base_handler.py`: Provides shared functionality for all handlers.
- `application_handler.py`: Delegates application tasks to the appropriate handler based on the company. Enforces valid browser context.
- `telegram_bot.py`: Fully async Telegram bot handler for commands and notifications. Uses httpx for messaging.
- `autoclean_debug.py`: Deletes debug files (screenshots, HTML) older than 48 hours.
- `helper_functions/`: Contains data merge utilities for combining stats from multiple sources:
- `merge_listing_times.py`
- `merge_applications.py`
- `merge_dict_json.py`
- `merge_wgcompany_times.py`
**Data flow**: Fetch listings → Compare with `listings.json` / `wgcompany_listings.json` → Detect new → Log to CSV → Auto-apply if autopilot enabled → Save to `applications.json` → Send Telegram notification → Autoclean debug files every 48 hours.
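A condensed sketch of one monitoring cycle (illustrative wiring only; the real loop in `main.py` also covers WGcompany and Telegram command handling, and the previous-listings loader name is assumed):

```python
# Illustrative sketch of the data flow above; method names follow ApplicationHandler,
# but load_previous_listings and the exact orchestration are assumptions, not main.py code.
async def monitor_cycle(handler, autopilot_enabled: bool) -> None:
    current = await handler.fetch_listings()                # fetch listings
    previous = handler.load_previous_listings()             # compare with listings.json (name assumed)
    new = [l for l in current if l["id"] not in previous]   # detect new
    if not new:
        return
    handler.log_listing_times(new)                          # log to CSV
    handler.save_listings(current)                          # persist current snapshot
    results = await handler.apply_to_listings(new) if autopilot_enabled else None
    handler.notify_new_listings(new, results)               # Telegram notification
```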
## Key Patterns
@@ -39,8 +46,18 @@ Listings are hashed by `md5(key_fields)[:12]` to generate stable IDs:
- `state.json` - Runtime state (autopilot toggle)
- `listings.json` - Previously seen inberlinwohnen listings
- `wgcompany_listings.json` - Previously seen WGcompany listings
- `applications.json` - Application history with success/failure status, timestamps, and listing details
- `listing_times.csv` / `wgcompany_times.csv` - Time-series data for pattern analysis
- `monitor.log` - Centralized logs with rotation (RotatingFileHandler)
### Logging
All modules use centralized logging configured in `main.py`:
- `RotatingFileHandler` writes to `data/monitor.log` (1 MB per file, 3 backups)
- `StreamHandler` outputs to console/Docker logs
- All handlers, notifiers, and utilities use `logging.getLogger(__name__)` for consistent logging
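For reference, the per-module convention looks like this (module name illustrative); records propagate to the root handlers configured in `main.py`:

```python
# Any handler/notifier/utility module: no handler setup here, just a named logger.
# Records propagate to the rotating file + console handlers configured in main.py.
import logging

logger = logging.getLogger(__name__)

def example() -> None:
    logger.info("this ends up in data/monitor.log and in the console/Docker logs")
```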
### Autocleaning
Debug material (screenshots, HTML files) older than 48 hours is automatically deleted by `autoclean_debug.py`, which runs every 48 hours in the main loop.
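A minimal sketch of that cleanup, assuming a 48-hour modification-time cutoff on `.png`/`.html` files under `data/` (the exact filters live in `autoclean_debug.py`):

```python
# Sketch of the autoclean idea; the real rules are in autoclean_debug.py.
import time
from pathlib import Path

MAX_AGE_SECONDS = 48 * 3600

def autoclean(data_dir: Path = Path("data")) -> int:
    """Delete debug artifacts older than 48 hours; return the number removed."""
    cutoff = time.time() - MAX_AGE_SECONDS
    removed = 0
    for path in data_dir.rglob("*"):
        if path.is_file() and path.suffix in {".png", ".html"} and path.stat().st_mtime < cutoff:
            path.unlink()
            removed += 1
    return removed
```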
## Development
@@ -65,7 +82,8 @@ docker compose logs -f
### Debugging
- Screenshots saved to `data/` on application failures (`*_nobtn_*.png`)
- HTML saved to `data/debug_page.html` (inberlin) and `data/wgcompany_debug.html`
- Full logs in `data/monitor.log` with rotation
- Debug files older than 48 hours are autocleaned
## Environment Variables
@@ -74,6 +92,16 @@ InBerlin login: `INBERLIN_EMAIL`, `INBERLIN_PASSWORD`
Form data: `FORM_ANREDE`, `FORM_VORNAME`, `FORM_NACHNAME`, `FORM_EMAIL`, `FORM_PHONE`, `FORM_STRASSE`, `FORM_HAUSNUMMER`, `FORM_PLZ`, `FORM_ORT`, `FORM_PERSONS`, `FORM_CHILDREN`, `FORM_INCOME`
WGcompany: `WGCOMPANY_ENABLED`, `WGCOMPANY_MIN_SIZE`, `WGCOMPANY_MAX_SIZE`, `WGCOMPANY_MIN_PRICE`, `WGCOMPANY_MAX_PRICE`, `WGCOMPANY_BEZIRK`
## Telegram Commands
- `/autopilot on|off` - Enable or disable automatic applications
- `/status` - Show current status and statistics (autopilot state, application counts by company)
- `/plot` - Generate and send a weekly listing-patterns plot
- `/errorrate` - Generate and send an autopilot success vs failure plot
- `/retryfailed` - Retry all failed applications
- `/resetlistings` - Reset seen listings (marks all current as failed to avoid spam)
- `/help` - Show available commands and usage information
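Outgoing notifications use the async httpx sender in `telegram_bot.py`; a hedged sketch of that call against the Telegram Bot API (the real method is `_send_message`, details may differ):

```python
# Sketch of an httpx-based Telegram send; telegram_bot.py's actual implementation may differ.
import os
import httpx

async def send_message(text: str) -> None:
    token = os.environ["TELEGRAM_BOT_TOKEN"]
    chat_id = os.environ["TELEGRAM_CHAT_ID"]
    url = f"https://api.telegram.org/bot{token}/sendMessage"
    payload = {"chat_id": chat_id, "text": text, "parse_mode": "HTML"}
    async with httpx.AsyncClient(timeout=10) as client:
        response = await client.post(url, json=payload)
        response.raise_for_status()
```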
## Common Tasks
### Fix a broken company handler
@@ -87,6 +115,15 @@ Check `data/*_nobtn_*.png` screenshots and `data/debug_page.html` to see actual
- InBerlin: Update regex patterns in `InBerlinMonitor.fetch_listings()`. Test against `data/debug_page.html`.
- WGcompany: Update parsing in `WGCompanyMonitor.fetch_listings()`. Test against `data/wgcompany_debug.html`.
### Merge data from another machine
Use the helper scripts in `helper_functions/`:
- `merge_listing_times.py` - Merge listing_times.csv files
- `merge_applications.py` - Merge applications.json files
- `merge_dict_json.py` - Merge listings.json and wgcompany_listings.json
- `merge_wgcompany_times.py` - Merge wgcompany_times.csv files
All scripts deduplicate by key and timestamp, and output merged results to the current data folder.
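The shared idea, sketched for the JSON files and assuming each entry carries a `timestamp` field (the CSV mergers work analogously):

```python
# Sketch of key+timestamp deduplication for applications.json-style files;
# the real scripts in helper_functions/ handle CSV variants and edge cases.
import json
from pathlib import Path

def merge_json_files(paths: list[Path], out_path: Path) -> None:
    merged: dict[str, dict] = {}
    for path in paths:
        for key, entry in json.loads(path.read_text()).items():
            # On key collisions, keep the entry with the newer timestamp.
            if key not in merged or entry.get("timestamp", "") > merged[key].get("timestamp", ""):
                merged[key] = entry
    out_path.write_text(json.dumps(merged, indent=2, ensure_ascii=False))
```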
## Unit Tests
### Overview


@@ -2,45 +2,27 @@ FROM mcr.microsoft.com/playwright/python:v1.57.0-jammy
WORKDIR /app
# Install dependencies first (leverages Docker cache)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy all Python source code and directories
COPY handlers/ ./handlers/
COPY tests/ ./tests/
COPY *.py ./
# Setup data directory with proper permissions
RUN mkdir -p /app/data && chmod 777 /app/data && \
touch /app/data/state.json && chmod 666 /app/data/state.json
# Install custom fonts for plot rendering
COPY data/fonts/*.ttf /usr/share/fonts/truetype/custom/
RUN fc-cache -fv
# Health check: verify Python process is running and monitor.log is being updated
HEALTHCHECK --interval=5m --timeout=30s --start-period=30s --retries=3 \
CMD pgrep -f "python.*main.py" > /dev/null && \
test -f /app/data/monitor.log && \
test $(find /app/data/monitor.log -mmin -10 | wc -l) -gt 0 || exit 1
CMD ["python", "-u", "main.py"]

LICENSE

@@ -1,4 +1,30 @@
# Creative Commons Attribution-NonCommercial 4.0 International
Copyright (c) 2026 wohn-bot contributors
This work is licensed under the Creative Commons Attribution-NonCommercial 4.0 International License.
## You are free to
- **Share** — copy and redistribute the material in any medium or format
- **Adapt** — remix, transform, and build upon the material
The licensor cannot revoke these freedoms as long as you follow the license terms.
## Under the following terms
- **Attribution** — You must give appropriate credit, provide a link to the license, and indicate if changes were made. You may do so in any reasonable manner, but not in any way that suggests the licensor endorses you or your use.
- **NonCommercial** — You may not use the material for commercial purposes.
- **No additional restrictions** — You may not apply legal terms or technological measures that legally restrict others from doing anything the license permits.
## Notices
You do not have to comply with the license for elements of the material in the public domain or where your use is permitted by an applicable exception or limitation.
No warranties are given. The license may not give you all of the permissions necessary for your intended use. For example, other rights such as publicity, privacy, or moral rights may limit how you use the material.
## Full Legal Code
For the complete license text, see: <https://creativecommons.org/licenses/by-nc/4.0/legalcode>

README.md

@@ -4,11 +4,14 @@ A Python bot that monitors Berlin's public housing portal (inberlinwohnen.de) an
## What it does
- **Monitors** inberlinwohnen.de for new apartment listings from 6 housing companies (HOWOGE, Gewobag, Degewo, Gesobau, Stadt und Land, WBM)
- **Monitors** wgcompany.de for WG room listings with configurable filters
- **Notifies** via Telegram with rich listing details and application status
- **Logs** listing times to CSV for pattern analysis and visualization
- **Auto-applies** to new listings when autopilot is enabled (all 6 companies supported)
- **Generates** weekly listing pattern plots and autopilot performance analytics
- **Autocleans** debug files older than 48 hours to manage disk space
- **Tracks** application history with success/failure reasons in JSON
## Auto-Apply Support
@@ -48,11 +51,22 @@ playwright install chromium
export TELEGRAM_BOT_TOKEN=your_token
export TELEGRAM_CHAT_ID=your_chat_id
# ... other env vars (see .env.example)
python main.py
```
### Helper Scripts
The `helper_functions/` directory contains utilities for merging data from multiple machines:
- `merge_listing_times.py` - Merge listing_times.csv files
- `merge_applications.py` - Merge applications.json files
- `merge_dict_json.py` - Merge listings.json and wgcompany_listings.json
- `merge_wgcompany_times.py` - Merge wgcompany_times.csv files
All scripts deduplicate by key and timestamp.
## Configuration
### Required environment variables
@@ -91,100 +105,248 @@ python monitor.py
## Telegram Commands
- `/autopilot on|off` - Enable or disable automatic applications
- `/status` - Show current status and statistics (autopilot state, application counts by company)
- `/plot` - Generate and send a weekly listing-patterns plot with heatmap and charts (high-res, seaborn-styled)
- `/errorrate` - Generate and send an autopilot performance analysis with success/failure rates by company (high-res, seaborn-styled)
- `/retryfailed` - Retry all previously failed applications
- `/resetlistings` - Reset seen listings (marks all current as failed to avoid spam)
- `/help` - Show available commands and usage information
**Important:** The bot only processes commands from the configured `TELEGRAM_CHAT_ID`. Use `/autopilot off` while testing selector changes or after modifying configuration to avoid accidental submissions.
**Plot Features:** All plots are generated at 300 DPI with seaborn styling for publication-quality output.
## Data files
All data is stored in the `data/` directory:
**Persistent State:**
- `listings.json` - Previously seen inberlinwohnen listings (deduplicated by hash)
- `wgcompany_listings.json` - Previously seen WGcompany listings (deduplicated by hash)
- `applications.json` - Application history with timestamps, success/failure status, and error messages
- `listing_times.csv` - Time series data for inberlinwohnen listings (for pattern analysis)
- `wgcompany_times.csv` - Time series data for WGcompany listings
- `state.json` - Runtime state (autopilot toggle, persistent across restarts)
- `monitor.log` - Rotating application logs (1 MB per file, 3 backups)
**Generated Plots:**
- `weekly_plot.png` - Weekly listing patterns (heatmap + charts, 300 DPI)
- `error_rate.png` - Autopilot performance analysis (3-panel chart, 300 DPI)
**Debug Files (auto-cleaned after 48 hours):**
- `data/<company>/*.png` - Screenshots from failed applications
- `data/<company>/*.html` - Page HTML snapshots for debugging
- `data/debug_page.html` - InBerlin page snapshot
- `data/wgcompany_debug.html` - WGcompany page snapshot
**Note:** Debug files (screenshots, HTML) are automatically deleted after 48 hours to save disk space. Listing data, applications, and logs are never deleted.
## Debugging
When applications fail, the bot saves debug material to help diagnose issues:
**Company-specific folders:**
- `data/howoge/` - Howoge screenshots and HTML
- `data/gewobag/` - Gewobag screenshots and HTML
- `data/degewo/` - Degewo screenshots and HTML
- `data/gesobau/` - Gesobau screenshots and HTML
- `data/stadtundland/` - Stadt und Land screenshots and HTML
- `data/wbm/` - WBM screenshots and HTML
**General debug files:**
- `data/debug_page.html` - InBerlin page snapshot
- `data/wgcompany_debug.html` - WGcompany page snapshot
Check `applications.json` for error messages and timestamps. Debug files are automatically cleaned after 48 hours but can be manually inspected while fresh.
## Code Structure
The bot has been modularized for better maintainability. The main components are:
**Core:**
- `main.py` - Entry point, orchestrates monitoring loop and autoclean
- `application_handler.py` - Delegates applications to company handlers, generates plots
- `telegram_bot.py` - Async Telegram bot with httpx for commands and notifications
- `state_manager.py` - Manages persistent state (autopilot toggle)
- `autoclean_debug.py` - Deletes debug files older than 48 hours
**Handlers:**
- `handlers/base_handler.py` - Abstract base class with shared functionality (cookie handling, consent, logging)
- `handlers/howoge_handler.py` - HOWOGE application automation
- `handlers/gewobag_handler.py` - Gewobag application automation
- `handlers/degewo_handler.py` - Degewo application automation (Wohnungshelden)
- `handlers/gesobau_handler.py` - Gesobau application automation
- `handlers/stadtundland_handler.py` - Stadt und Land application automation
- `handlers/wbm_handler.py` - WBM application automation
- `handlers/wgcompany_notifier.py` - WGcompany monitoring (notification only, no autopilot)
**Utilities:**
- `helper_functions/` - Data merge utilities for combining stats from multiple sources
- `merge_listing_times.py`
- `merge_applications.py`
- `merge_dict_json.py`
- `merge_wgcompany_times.py`
**Tests:**
- `tests/` - Comprehensive unit tests (48 tests total)
- `test_telegram_bot.py` - Telegram bot commands and messaging
- `test_error_rate_plot.py` - Plot generation
- `test_wgcompany_notifier.py` - WGcompany monitoring
- `test_handlers.py` - Handler initialization
- `test_application_handler.py` - Application orchestration
- `test_helper_functions.py` - Merge utilities
- `test_autoclean.py` - Autoclean script validation
## Unit Tests
The project includes comprehensive unit tests (48 tests total) to ensure functionality and reliability:
- `test_telegram_bot.py` - Telegram bot commands and messaging (13 tests)
- `test_error_rate_plot.py` - Plot generation and data analysis (2 tests)
- `test_wgcompany_notifier.py` - WGcompany monitoring (7 tests)
- `test_handlers.py` - Handler initialization and structure (6 tests)
- `test_application_handler.py` - Application orchestration (10 tests)
- `test_company_detection.py` - Company detection from URLs (6 tests)
- `test_state_manager.py` - State persistence (2 tests)
- `test_helper_functions.py` - Merge utilities (2 tests)
- `test_autoclean.py` - Autoclean script validation (1 test)
### Running Tests
```bash
pytest tests/ -v
```
All tests use mocking to avoid external dependencies and can run offline.
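A hypothetical example of that style (not taken from `tests/`; assumes the `pytest-asyncio` plugin is installed):

```python
# Illustrative only: mock out the Telegram bot so no network access is needed.
from unittest.mock import AsyncMock

import pytest

@pytest.mark.asyncio  # requires pytest-asyncio
async def test_send_message_is_awaited():
    bot = AsyncMock()  # stands in for the real TelegramBot
    await bot._send_message("🏠 Neue Wohnung!")
    bot._send_message.assert_awaited_once()
```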
## Workflow Diagram
```mermaid
flowchart TD
Start([Start Bot]) --> Init[Initialize Browser & Telegram Bot]
Init --> Loop{Main Loop}
%% InBerlin Monitoring
Loop --> InBerlin[Fetch InBerlin Listings]
InBerlin --> ParseIB[Parse & Hash Listings]
ParseIB --> LoadIB[Load Previous InBerlin Listings]
LoadIB --> DedupeIB{New InBerlin Listings?}
DedupeIB -- Yes --> LogIB[Log to listing_times.csv]
LogIB --> SaveIB[Save to listings.json]
DedupeIB -- No --> WG
%% WGcompany Monitoring
SaveIB --> WG[Fetch WGcompany Listings]
WG --> ParseWG[Parse & Hash Listings]
ParseWG --> LoadWG[Load Previous WGcompany Listings]
LoadWG --> DedupeWG{New WGcompany Listings?}
DedupeWG -- Yes --> LogWG[Log to wgcompany_times.csv]
LogWG --> SaveWG[Save to wgcompany_listings.json]
DedupeWG -- No --> CheckAutopilot
%% Autopilot Decision
SaveWG --> CheckAutopilot{Autopilot Enabled?}
SaveIB --> CheckAutopilot
CheckAutopilot -- Off --> NotifyOnly[Send Telegram Notifications]
NotifyOnly --> CheckClean
CheckAutopilot -- On --> CheckApplied{Already Applied?}
CheckApplied -- Yes --> Skip[Skip Listing]
CheckApplied -- No --> DetectCompany[Detect Company]
%% Application Flow
DetectCompany --> SelectHandler[Select Handler]
SelectHandler --> OpenPage[Open Listing Page]
OpenPage --> Check404{404 or Deactivated?}
Check404 -- Yes --> MarkPermanent[Mark permanent_fail]
MarkPermanent --> SaveFail[Save to applications.json]
SaveFail --> NotifyFail[Notify: Application Failed]
Check404 -- No --> HandleCookies[Handle Cookie Banners]
HandleCookies --> FindButton[Find Application Button]
FindButton --> ButtonFound{Button Found?}
ButtonFound -- No --> Screenshot1[Save Screenshot & HTML]
Screenshot1 --> SaveFail
ButtonFound -- Yes --> ClickButton[Click Application Button]
ClickButton --> MultiStep{Multi-Step Form?}
MultiStep -- Yes --> NavigateSteps[Navigate Form Steps]
NavigateSteps --> FillForm
MultiStep -- No --> FillForm[Fill Form Fields]
FillForm --> SubmitForm[Submit Application]
SubmitForm --> CheckConfirm{Confirmation Detected?}
CheckConfirm -- Yes --> SaveSuccess[Save success to applications.json]
SaveSuccess --> NotifySuccess[Notify: Application Success]
CheckConfirm -- No --> Screenshot2[Save Screenshot & HTML]
Screenshot2 --> SaveFail
NotifySuccess --> CheckClean
NotifyFail --> CheckClean
Skip --> CheckClean
%% Autoclean
CheckClean{Time for Autoclean?}
CheckClean -- Yes --> RunClean[Delete Debug Files >48h]
RunClean --> Sleep
CheckClean -- No --> Sleep[Sleep CHECK_INTERVAL]
Sleep --> TelegramCmd{Telegram Command?}
TelegramCmd -- /autopilot --> ToggleAutopilot[Toggle Autopilot State]
TelegramCmd -- /status --> ShowStatus[Show Status & Stats]
TelegramCmd -- /plot --> GenPlot[Generate Weekly Plot]
TelegramCmd -- /errorrate --> GenError[Generate Error Rate Plot]
TelegramCmd -- /retryfailed --> RetryFailed[Retry Failed Applications]
TelegramCmd -- /resetlistings --> ResetListings[Reset Seen Listings]
TelegramCmd -- /help --> ShowHelp[Show Help]
TelegramCmd -- None --> Loop
ToggleAutopilot --> Loop
ShowStatus --> Loop
GenPlot --> Loop
GenError --> Loop
RetryFailed --> Loop
ResetListings --> Loop
ShowHelp --> Loop
style Start fill:#90EE90
style SaveSuccess fill:#90EE90
style SaveFail fill:#FFB6C1
style MarkPermanent fill:#FFB6C1
style RunClean fill:#87CEEB
style CheckAutopilot fill:#FFD700
style Check404 fill:#FFD700
style ButtonFound fill:#FFD700
style CheckConfirm fill:#FFD700
```
**Key Features:**
- **Dual Monitoring**: Tracks both InBerlin (6 companies) and WGcompany listings
- **Smart Deduplication**: MD5 hashing prevents duplicate notifications (see the ID sketch after this list)
- **Autopilot**: Automated application with company-specific handlers
- **Error Handling**: 404 detection, permanent fail tracking, debug screenshots
- **Autoclean**: Automatic cleanup of debug files every 48 hours
- **Rich Commands**: Status, plots, retry failed, reset listings
- **High-Res Analytics**: 300 DPI seaborn-styled plots for pattern analysis
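Sketch of the stable-ID scheme behind that deduplication, assuming the key fields are address, rooms, size, and price (the bot's exact field set and ordering may differ):

```python
# Illustrative stable listing ID: md5 over key fields, truncated to 12 hex chars.
import hashlib

def listing_id(address: str, rooms: str, size: str, price: str) -> str:
    key_fields = f"{address}|{rooms}|{size}|{price}"
    return hashlib.md5(key_fields.encode("utf-8")).hexdigest()[:12]

# listing_id("Musterstr. 1", "2", "54 m²", "650 €") always yields the same 12-char ID
```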
## License


@@ -15,6 +15,7 @@ import matplotlib.dates as mdates
import logging
import matplotlib
import matplotlib.font_manager as fm
import seaborn as sns
import html
import re
import hashlib
@@ -29,13 +30,20 @@ LISTINGS_FILE = Path("data/listings.json")
DATA_DIR = Path("data")
# --- Matplotlib & Seaborn Setup ---
font_cache_dir = Path("data/fonts")
font_cache_dir.mkdir(parents=True, exist_ok=True)
matplotlib.get_configdir = lambda: str(font_cache_dir)
fm.findSystemFonts(fontpaths=str(font_cache_dir), fontext='ttf')
matplotlib.rcParams['font.family'] = 'Noto Sans'
# Configure seaborn for beautiful plots
sns.set_theme(style="whitegrid", palette="deep")
sns.set_context("notebook", font_scale=1.1)
matplotlib.rcParams['figure.dpi'] = 300
matplotlib.rcParams['savefig.dpi'] = 300
matplotlib.rcParams['figure.facecolor'] = 'white'
# Use the root logger for consistency with main.py
logger = logging.getLogger()
@@ -60,11 +68,11 @@ class ApplicationHandler:
"wbm": WBMHandler(browser_context),
}
def set_telegram_bot(self, telegram_bot) -> None:
"""Attach a TelegramBot instance for notifications."""
self.telegram_bot = telegram_bot
def notify_new_listings(self, new_listings: list[dict], application_results: Optional[dict] = None) -> None:
"""
Send a Telegram notification for each new listing.
Includes application result if autopilot was enabled.
@@ -77,12 +85,12 @@ class ApplicationHandler:
company_label = company.capitalize() if company != "unknown" else "Wohnung"
message = (
f"🏠 <b>[{company_label}] Neue Wohnung!</b>\n\n"
f"🚪 <b>{listing['rooms']}</b>\n"
f"📏 {listing['size']}\n"
f"💰 {listing['price']}\n"
f"📍 {listing['address']}\n\n"
f"👉 <a href=\"{link}\">Alle Details</a>"
)
# Always show autopilot/apply status for clarity
@@ -107,11 +115,10 @@ class ApplicationHandler:
# Send via TelegramBot if available
if hasattr(self, 'telegram_bot') and self.telegram_bot:
loop = getattr(self.telegram_bot, 'event_loop', None) or asyncio.get_event_loop()
asyncio.run_coroutine_threadsafe(self.telegram_bot._send_message(message), loop)
else:
logger.debug(f"[No Telegram] {listing['address']} ({listing['rooms']})")
async def apply_to_listings(self, listings: list[dict]) -> dict:
"""
@@ -124,19 +131,19 @@ class ApplicationHandler:
raise RuntimeError("browser_context is None in apply_to_listings. This should never happen.")
for listing in listings:
if self.has_applied(listing["id"]):
logger.debug(f"Skip (applied): {listing['address']}")
continue
result = await self.apply(listing)
results[listing["id"]] = result
self.save_application(result)
status = "✅" if result["success"] else "❌"
logger.info(f"{status} {listing['address'][:30]}... | {result['message'][:50]}")
await asyncio.sleep(2)
return results
def log_listing_times(self, new_listings: list[dict]) -> None:
"""
Log new listing appearance times to CSV for later analysis and pattern mining.
Appends to data/listing_times.csv, creating header if needed.
@@ -167,12 +174,12 @@
listing["id"]
])
logger.debug(f"Logged {len(new_listings)} listings to CSV")
# ...existing code...
async def init_browser(self) -> None:
"""Initialize Playwright browser (minimal, like test script)"""
if not hasattr(self, 'browser') or self.browser is None:
self.playwright = await async_playwright().start()
@@ -249,13 +256,13 @@
return {"autopilot": False}
def save_state(self, state: dict) -> None:
"""Save persistent state"""
with open(STATE_FILE, "w") as f:
json.dump(state, f, indent=2)
def set_autopilot(self, enabled: bool) -> None:
"""Enable or disable autopilot mode"""
self.state_manager.set_autopilot(enabled)
@@ -276,7 +283,7 @@
return {}
def save_application(self, result: dict) -> None:
"""Save an application result."""
applications = self.load_applications()
applications[result["listing_id"]] = result
@@ -297,7 +304,7 @@
return {}
def save_listings(self, listings: list[dict]) -> None:
"""Save current listings"""
listings_dict = {l["id"]: l for l in listings}
with open(LISTINGS_FILE, "w") as f:
@@ -346,45 +353,43 @@
heatmap_data.loc[day, hour] = int(val) + 1
# Create figure with two subplots
fig, axes = plt.subplots(2, 2, figsize=(16, 12))
fig.suptitle('Listing Appearance Patterns', fontsize=18, fontweight='bold', y=0.995)
# 1. Heatmap - Day vs Hour (using seaborn)
ax1 = axes[0, 0]
sns.heatmap(heatmap_data, cmap='RdYlGn_r', annot=False, fmt='d',
cbar_kws={'label': 'Count'}, ax=ax1, linewidths=0.5, linecolor='gray')
ax1.set_xlabel('Hour of Day', fontsize=11, fontweight='bold')
ax1.set_ylabel('Day of Week', fontsize=11, fontweight='bold')
ax1.set_title('Listings by Day & Hour', fontsize=12, fontweight='bold', pad=10)
ax1.set_xticklabels(range(24), fontsize=9)
ax1.set_yticklabels(days_order, rotation=0, fontsize=9)
# 2. Bar chart - By day of week (seaborn style)
ax2 = axes[0, 1]
day_counts = df['weekday'].value_counts().reindex(days_order, fill_value=0)
sns.barplot(x=range(7), y=day_counts.values, ax=ax2, palette='Blues_d', hue=range(7), legend=False)
ax2.set_xticks(range(7))
ax2.set_xticklabels(['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'], fontsize=9)
ax2.set_xlabel('Day of Week', fontsize=11, fontweight='bold')
ax2.set_ylabel('Number of Listings', fontsize=11, fontweight='bold')
ax2.set_title('Total Listings by Day', fontsize=12, fontweight='bold', pad=10)
for i, v in enumerate(day_counts.values):
if v > 0:
ax2.text(i, v + 0.5, str(v), ha='center', fontsize=9, fontweight='bold')
# 3. Line chart - By hour (seaborn style)
ax3 = axes[1, 0]
hour_counts = df['hour'].value_counts().reindex(range(24), fill_value=0)
sns.lineplot(x=range(24), y=hour_counts.values, ax=ax3, marker='o',
linewidth=2.5, markersize=6, color='#2E86AB')
ax3.fill_between(range(24), hour_counts.values, alpha=0.2, color='#2E86AB')
ax3.set_xticks(range(0, 24, 2))
ax3.set_xlabel('Hour of Day', fontsize=11, fontweight='bold')
ax3.set_ylabel('Number of Listings', fontsize=11, fontweight='bold')
ax3.set_title('Total Listings by Hour', fontsize=12, fontweight='bold', pad=10)
ax3.grid(True, alpha=0.3, linestyle='--')
# 4. Summary stats
ax4 = axes[1, 1]
@@ -421,10 +426,10 @@ Total listings tracked: {total_listings}
verticalalignment='top', fontfamily='monospace',
bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))
plt.tight_layout(rect=(0, 0, 1, 0.99))
# Save plot with high resolution
plt.savefig(plot_path, dpi=300, bbox_inches='tight', facecolor='white', edgecolor='none')
plt.close()
logger.info(f"Plot saved to {plot_path}")
@@ -434,7 +439,7 @@ Total listings tracked: {total_listings}
return ""
def _generate_error_rate_plot(self) -> tuple[str | None, str]:
"""Read applications.json and produce a plot image + summary text.
Returns (plot_path, summary_text) or (None, "") if insufficient data.
@@ -474,7 +479,8 @@
grouped = grouped.sort_index()
# Prepare plot: convert dates to matplotlib numeric x-values so bars and line align
fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(14, 14), sharex=True)
fig.suptitle('Autopilot Performance Analysis', fontsize=18, fontweight='bold', y=0.995)
dates = pd.to_datetime(grouped.index).to_pydatetime()
x = mdates.date2num(dates)
@@ -483,53 +489,65 @@
successes = grouped['successes'].values
failures = grouped['failures'].values
# Use seaborn color palette
success_color = sns.color_palette('RdYlGn', n_colors=10)[8] # Green
failure_color = sns.color_palette('RdYlGn', n_colors=10)[1] # Red
ax1.bar(x, successes, width=width, color=success_color, align='center', label='Success', edgecolor='white', linewidth=0.5)
ax1.bar(x, failures, bottom=successes, width=width, color=failure_color, align='center', label='Failure', edgecolor='white', linewidth=0.5)
ax1.set_ylabel('Count', fontsize=11, fontweight='bold')
ax1.set_title('Successes vs Failures (by day)', fontsize=13, fontweight='bold', pad=10)
ax1.set_xticks(x)
ax1.set_xlim(min(x) - 1, max(x) + 1)
ax1.xaxis.set_major_locator(mdates.AutoDateLocator())
ax1.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
ax1.legend(loc='upper left', framealpha=0.9)
ax1.grid(True, alpha=0.3, linestyle='--', axis='y')
# Plot error rate line on same x (date) axis
sns.lineplot(x=x, y=grouped['error_rate'].values, ax=ax2, marker='o',
linewidth=2.5, markersize=8, color='#E74C3C')
ax2.fill_between(x, grouped['error_rate'].values, alpha=0.2, color='#E74C3C')
ax2.set_ylim(-0.02, 1.02)
ax2.set_ylabel('Error Rate', fontsize=11, fontweight='bold')
ax2.set_xlabel('Date', fontsize=11, fontweight='bold')
ax2.set_title('Daily Error Rate (failures / total)', fontsize=13, fontweight='bold', pad=10)
ax2.grid(True, alpha=0.3, linestyle='--')
ax2.set_xticks(x)
ax2.set_xlim(min(x) - 1, max(x) + 1)
ax2.xaxis.set_major_locator(mdates.AutoDateLocator())
ax2.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
# Error rate by company (line plot with seaborn palette)
company_grouped = df.groupby(['date', 'company']).agg(total=('id','count'), successes=('success', lambda x: x.sum()))
company_grouped['failures'] = company_grouped['total'] - company_grouped['successes']
company_grouped['error_rate'] = company_grouped['failures'] / company_grouped['total']
company_grouped = company_grouped.reset_index()
error_rate_pivot = company_grouped.pivot(index='date', columns='company', values='error_rate')
# Use distinct seaborn colors for each company
palette = sns.color_palette('husl', n_colors=len(error_rate_pivot.columns))
for idx, company in enumerate(error_rate_pivot.columns):
y = error_rate_pivot[company].values
ax3.plot(x, y, marker='o', label=str(company), linewidth=2.5,
markersize=7, color=palette[idx])
ax3.set_ylim(-0.02, 1.02)
ax3.set_ylabel('Error Rate', fontsize=11, fontweight='bold')
ax3.set_xlabel('Date', fontsize=11, fontweight='bold')
ax3.set_title('Daily Error Rate by Company', fontsize=13, fontweight='bold', pad=10)
ax3.grid(True, alpha=0.3, linestyle='--')
ax3.set_xticks(x)
ax3.set_xlim(min(x) - 1, max(x) + 1)
ax3.xaxis.set_major_locator(mdates.AutoDateLocator())
ax3.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
ax3.legend(title='Company', loc='upper right', fontsize=10, framealpha=0.9)
fig.autofmt_xdate()
plt.tight_layout(rect=(0, 0, 1, 0.99))
plot_path = self.applications_file.parent / 'error_rate.png'
tmp_path = self.applications_file.parent / 'error_rate.tmp.png'
# Save to a temp file first and atomically replace to ensure overwrite
fig.savefig(tmp_path, format='png', dpi=300, bbox_inches='tight', facecolor='white', edgecolor='none')
plt.close(fig)
try:
tmp_path.replace(plot_path)
@@ -555,7 +573,7 @@
return None, ""
async def login(self, page) -> bool:
"""Login to inberlinwohnen.de (minimal, like test script)"""
if not self.state_manager.email or not self.state_manager.password:
logger.warning("No credentials provided. Ensure INBERLIN_EMAIL and INBERLIN_PASSWORD are set in the environment.")
@@ -606,7 +624,29 @@
async def fetch_listings(self) -> list[dict]:
"""Fetch listings from the Wohnungsfinder with retry logic for transient failures"""
max_retries = 3
retry_delay = 2 # Initial delay in seconds
for attempt in range(max_retries):
try:
listings = await self._fetch_listings_attempt()
if attempt > 0:
logger.info(f"✅ Fetch succeeded (attempt {attempt + 1})")
return listings
except Exception as e:
if attempt < max_retries - 1:
wait_time = retry_delay * (2 ** attempt) # Exponential backoff
logger.warning(f"⚠️ Fetch failed (attempt {attempt + 1}/{max_retries}): {str(e)[:50]}... Retrying in {wait_time}s")
await asyncio.sleep(wait_time)
else:
logger.error(f"❌ Fetch failed after {max_retries} attempts")
return []
return []
async def _fetch_listings_attempt(self) -> list[dict]:
"""Single attempt to fetch listings (extracted for retry logic)"""
listings = []
try:
@@ -742,17 +782,14 @@ Total listings tracked: {total_listings}
listings = unique_listings
if not listings:
logger.warning("⚠️ No listings parsed")
await page.close()
logger.info(f"📊 Fetched {len(listings)} listings")
return listings
except Exception as e:
logger.error(f"❌ Fetch error: {str(e)[:100]}")
return []


@@ -12,4 +12,3 @@ services:
- ./data:/app/data:rw
environment:
- CHECK_INTERVAL=60
- WOHNBOT_DEV=1


@@ -9,7 +9,17 @@ services:
- 1.1.1.1
- 8.8.8.8
volumes:
- /srv/dev-disk-by-uuid-a920d9c0-dfc1-4a58-ae4d-92cf88ff04a5/docker-app/wohnbot/data:/app/data:rw
healthcheck:
test:
[
"CMD-SHELL",
"pgrep -f 'python.*main.py' > /dev/null && test -f /app/data/monitor.log && test $$(find /app/data/monitor.log -mmin -10 | wc -l) -gt 0 || exit 1",
]
interval: 5m
timeout: 30s
start_period: 30s
retries: 3
networks:
proxy-network:
aliases:


@@ -1,8 +1,11 @@
from .base_handler import BaseHandler
import logging
import asyncio
from pathlib import Path
logger = logging.getLogger(__name__)
DATA_DIR = Path("data/gesobau")
DATA_DIR.mkdir(parents=True, exist_ok=True)
class GesobauHandler(BaseHandler):
def __init__(self, browser_context):
@@ -34,7 +37,7 @@ class GesobauHandler(BaseHandler):
# Save HTML after modal handling for debugging
try:
html_content = await page.content()
with open(DATA_DIR / "gesobau_debug.html", "w", encoding="utf-8") as f:
f.write(html_content)
except Exception as e:
logger.debug(f"[GESOBAU] Debug HTML not saved: {e}")


@@ -1,8 +1,11 @@
from .base_handler import BaseHandler
import logging
import asyncio
from pathlib import Path
logger = logging.getLogger(__name__)
DATA_DIR = Path("data/gewobag")
DATA_DIR.mkdir(parents=True, exist_ok=True)
class GewobagHandler(BaseHandler):
def __init__(self, browser_context):
@@ -33,7 +36,7 @@ class GewobagHandler(BaseHandler):
# Save HTML after modal handling for debugging
try:
html_content = await page.content()
with open(DATA_DIR / "gewobag_debug.html", "w", encoding="utf-8") as f:
f.write(html_content)
except Exception as e:
logger.warning(f"[GEWOBAG] Could not save debug HTML: {e}")


@@ -14,7 +14,8 @@ FORM_PLZ = os.environ.get("FORM_PLZ", "")
FORM_ORT = os.environ.get("FORM_ORT", "")
FORM_PHONE = os.environ.get("FORM_PHONE", "")
FORM_EMAIL = os.environ.get("FORM_EMAIL", "")
DATA_DIR = Path("data/stadtundland")
DATA_DIR.mkdir(parents=True, exist_ok=True)
logger = logging.getLogger(__name__)


@@ -1,8 +1,11 @@
from .base_handler import BaseHandler
import logging
import asyncio
from pathlib import Path
logger = logging.getLogger(__name__)
DATA_DIR = Path("data/wbm")
DATA_DIR.mkdir(parents=True, exist_ok=True)
class WBMHandler(BaseHandler):
def __init__(self, browser_context):
@@ -23,7 +26,7 @@ class WBMHandler(BaseHandler):
# Save HTML after modal handling for debugging
try:
html_content = await page.content()
with open(DATA_DIR / "wbm_debug.html", "w", encoding="utf-8") as f:
f.write(html_content)
except Exception as e:
logger.warning(f"[WBM] Could not save debug HTML: {e}")


@@ -32,10 +32,8 @@ class WGCompanyNotifier:
if self.browser is None:
self.playwright = await async_playwright().start()
self.browser = await self.playwright.chromium.launch(headless=True)
self.context = await self.browser.new_context()
logger.debug("[WG] Browser ready")
async def fetch_listings(self):
await self.init_browser()
@@ -134,28 +132,27 @@ class WGCompanyNotifier:
return []
def load_previous_listings(self):
if self.listings_file.exists():
with open(self.listings_file, 'r') as f:
data = json.load(f)
logger.debug(f"[WG] Loaded {len(data)} previous listings")
return data
return {}
def save_listings(self, listings: list[dict]) -> None:
listings_dict = {l['id']: l for l in listings}
logger.debug(f"[WG] Saving {len(listings_dict)} listings")
with open(self.listings_file, 'w') as f:
json.dump(listings_dict, f, indent=2, ensure_ascii=False)
def find_new_listings(self, current: list[dict], previous: dict) -> list[dict]:
new = []
for listing in current:
if listing['id'] not in previous:
new.append(listing)
if new:
logger.info(f"[WG] 🏠 {len(new)} new listing{'s' if len(new) > 1 else ''} detected")
return new
def log_listing_times(self, new_listings):
if not new_listings:
@@ -177,29 +174,29 @@ class WGCompanyNotifier:
listing["size"],
listing["price"],
listing["address"],
listing['id']
])
logger.debug(f"[WG] Logged {len(new_listings)} to CSV")
async def notify_new_listings(self, new_listings: list[dict]) -> None:
if not new_listings or not self.telegram_bot:
return
for idx, listing in enumerate(new_listings, start=1):
try:
message = (
f"🏠 <b>[WG-Company] Neues WG-Zimmer!</b>\n\n"
f"🚪 <b>{listing['rooms']}</b>\n"
f"📏 {listing['size']}\n"
f"💰 {listing['price']}\n"
f"📍 {listing['address']}\n\n"
f"👉 <a href=\"{listing['link']}\">Zum Angebot</a>"
)
loop = self.telegram_bot.event_loop or asyncio.get_event_loop()
asyncio.run_coroutine_threadsafe(self.telegram_bot._send_message(message), loop)
await asyncio.sleep(0.5)
except Exception as e:
logger.error(f"[WG] ❌ Telegram failed for listing {idx}: {str(e)[:50]}")
async def run(self):
await self.init_browser()


@@ -1,4 +1,3 @@
import json
from pathlib import Path
from datetime import datetime

main.py

@@ -20,41 +20,94 @@ load_dotenv()
# Configure logging: file (rotating) + console for Docker visibility, enforce for all modules
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)-5s] %(name)-20s | %(message)s",
handlers=[
RotatingFileHandler("data/monitor.log", maxBytes=1 * 1024 * 1024, backupCount=3), # 1 MB per file, 3 backups
logging.StreamHandler()
],
force=True # Enforce for all modules, Python 3.8+
)
logger = logging.getLogger(__name__) # Use named logger
logger.info("🚀 Bot starting | Logs: data/monitor.log + console")
# Interval (seconds) between checks for new listings
CHECK_INTERVAL = int(os.getenv("CHECK_INTERVAL", 300)) # Default: 300 seconds
def validate_config() -> bool:
"""Validate required environment variables on startup with clear error messages."""
errors: list[str] = []
warnings: list[str] = []
# Required for Telegram notifications
if not os.getenv("TELEGRAM_BOT_TOKEN"):
errors.append("TELEGRAM_BOT_TOKEN is not set - notifications will not work")
if not os.getenv("TELEGRAM_CHAT_ID"):
errors.append("TELEGRAM_CHAT_ID is not set - notifications will not work")
# Required for InBerlin login and auto-apply
if not os.getenv("INBERLIN_EMAIL"):
warnings.append("INBERLIN_EMAIL is not set - will use public listings only")
if not os.getenv("INBERLIN_PASSWORD"):
warnings.append("INBERLIN_PASSWORD is not set - will use public listings only")
# Required for auto-apply form filling
form_fields = [
"FORM_ANREDE", "FORM_VORNAME", "FORM_NACHNAME", "FORM_EMAIL",
"FORM_PHONE", "FORM_STRASSE", "FORM_HAUSNUMMER", "FORM_PLZ",
"FORM_ORT", "FORM_PERSONS", "FORM_CHILDREN", "FORM_INCOME"
]
missing_form_fields = [f for f in form_fields if not os.getenv(f)]
if missing_form_fields:
warnings.append(f"Form fields not set: {', '.join(missing_form_fields)} - autopilot may fail")
# Print warnings
if warnings:
logger.warning("Configuration warnings:")
for warning in warnings:
logger.warning(f" - {warning}")
# Print errors and exit if critical
if errors:
logger.error("Configuration errors - bot cannot start:")
for error in errors:
logger.error(f" - {error}")
logger.error("Please set required environment variables in .env file")
return False
logger.info("Configuration validated successfully")
return True
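For orientation, a minimal sketch (placeholder values, not from this commit) of exercising validate_config() with only the two hard-required Telegram variables set; the remaining INBERLIN_* and FORM_* checks produce warnings but the function still returns True:

```python
# Sketch only; assumes it is run from the repository root where data/ exists,
# and that validate_config() is importable from main.py as defined above.
import os

os.environ["TELEGRAM_BOT_TOKEN"] = "123456:ABC-placeholder"  # placeholder value
os.environ["TELEGRAM_CHAT_ID"] = "123456789"                 # placeholder value

from main import validate_config

# Warnings about login and form fields are logged, but there are no hard errors:
assert validate_config() is True
```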
def _flush_rotating_file_handlers() -> None:
"""Flush all RotatingFileHandlers attached to the root logger.""" """Flush all RotatingFileHandlers attached to the root logger."""
root_logger = logging.getLogger() root_logger = logging.getLogger()
for handler in root_logger.handlers: for handler in root_logger.handlers:
if isinstance(handler, RotatingFileHandler): if isinstance(handler, RotatingFileHandler):
handler.flush() handler.flush()
async def main(): async def init_browser_context() -> tuple:
logger.info("Starting the bot...") """Initialize or reinitialize browser context with error handling."""
# Initialize state manager
state_manager = StateManager(Path("data/state.json"))
# --- Playwright browser/context setup ---
playwright = await async_playwright().start() playwright = await async_playwright().start()
browser = await playwright.chromium.launch(headless=True) browser = await playwright.chromium.launch(headless=True)
browser_context = await browser.new_context( browser_context = await browser.new_context(
user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36" user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
) )
logger.info("Playwright browser context initialized.") return playwright, browser, browser_context
async def main() -> None:
logger.info("🤖 Initializing wohn-bot...")
# Validate configuration before starting
if not validate_config():
return
# Initialize state manager
state_manager = StateManager(Path("data/state.json"))
# --- Playwright browser/context setup with recovery ---
logger.info("🌐 Initializing browser...")
playwright, browser, browser_context = await init_browser_context()
# Application handler manages browser/context
app_handler = ApplicationHandler(browser_context, state_manager)
@@ -78,23 +131,45 @@ async def main():
now = asyncio.get_event_loop().time()
# Autoclean debug material every 48 hours
if now - last_clean > CLEAN_INTERVAL:
try:
deleted = autoclean_debug_material()
if deleted:
logger.info(f"🧹 Cleaned {len(deleted)} debug files (48h)")
except Exception as e:
logger.warning(f"⚠️ Autoclean failed: {e}")
last_clean = now
try:
current_listings = await app_handler.fetch_listings()
except Exception as e:
logger.error(f"💥 Browser crash: {e}")
logger.info("🔄 Recovering...")
try:
await browser.close()
await playwright.stop()
except:
pass
# Reinitialize browser
try:
playwright, browser, browser_context = await init_browser_context()
app_handler.context = browser_context
logger.info("✅ Browser recovered")
await asyncio.sleep(5)
continue
except Exception as recovery_error:
logger.error(f"Failed to recover: {recovery_error}")
await asyncio.sleep(60)
continue
if not current_listings:
logger.warning("⚠️ No listings fetched")
await asyncio.sleep(CHECK_INTERVAL)
_flush_rotating_file_handlers()
continue
previous_listings = app_handler.load_previous_listings()
if not previous_listings:
logger.info(f"🎬 First run: saving {len(current_listings)} listings as baseline")
# Mark all as failed applications so /retryfailed can be used
for listing in current_listings:
result = {
@@ -117,10 +192,10 @@ async def main():
new_listings = app_handler.find_new_listings(current_listings, previous_listings)
application_results = {}
if new_listings:
logger.info(f"🏠 {len(new_listings)} new listing{'s' if len(new_listings) > 1 else ''} detected")
app_handler.log_listing_times(new_listings)
if app_handler.is_autopilot_enabled():
logger.info("🤖 Autopilot active - applying...")
application_results = await app_handler.apply_to_listings(new_listings)
app_handler.notify_new_listings(new_listings, application_results)
app_handler.save_listings(current_listings)
@@ -3,6 +3,7 @@ httpx>=0.24.0
playwright>=1.57.0
matplotlib>=3.8.0
pandas>=2.0.0
seaborn>=0.13.0
python-dotenv>=1.0.0
pytest>=7.0.0
pytest-asyncio>=0.20.0
@@ -9,7 +9,7 @@ logger = logging.getLogger(__name__)
dotenv.load_dotenv() # Load environment variables from .env file
class StateManager:
def __init__(self, state_file: Path) -> None:
self.state_file = state_file
self.logged_in = False # Initialize logged_in attribute
@@ -27,12 +27,12 @@ class StateManager:
return json.load(f)
return {"autopilot": False}
def save_state(self, state: dict) -> None:
"""Save persistent state"""
with open(self.state_file, "w") as f:
json.dump(state, f, indent=2)
def set_autopilot(self, enabled: bool) -> None:
"""Enable or disable autopilot mode"""
state = self.load_state()
state["autopilot"] = enabled
@@ -43,7 +43,7 @@ class StateManager:
"""Check if autopilot mode is enabled"""
return self.load_state().get("autopilot", False)
def set_logged_in(self, status: bool) -> None:
"""Set the logged_in status"""
self.logged_in = status
logger.info(f"Logged in status set to: {status}")
@@ -1,13 +1,10 @@
import os
import logging
import threading
import time
import requests
import asyncio
import httpx
# Configuration from environment
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "")
@@ -19,7 +16,7 @@ logger = logging.getLogger(__name__)
class TelegramBot:
async def _handle_help_command(self) -> None:
"""Send a help message with available commands."""
help_text = (
"<b>Available commands:</b>\n"
@@ -33,7 +30,7 @@ class TelegramBot:
)
await self._send_message(help_text)
async def _handle_unknown_command(self, text: str) -> None:
"""Handle unknown commands and notify the user."""
cmd = text.split()[0] if text else text
msg = (
@@ -41,7 +38,7 @@ class TelegramBot:
)
await self._send_message(msg)
async def _handle_reset_listings_command(self) -> None:
"""Move listings.json to data/old/ with a timestamp, preserving statistics and application history."""
import shutil
from datetime import datetime
@@ -67,19 +64,37 @@ class TelegramBot:
logger.error(f"Error resetting listings: {e}")
await self._send_message(f"❌ Error resetting listings: {str(e)}")
def __init__(self, monitor, bot_token: str | None = None, chat_id: str | None = None, event_loop=None) -> None:
self.monitor = monitor
self.bot_token = bot_token or TELEGRAM_BOT_TOKEN
self.chat_id = chat_id or TELEGRAM_CHAT_ID
self.last_update_id: int = 0
self.running: bool = False
# Add reference to application handler
self.app_handler = monitor
# Store the main event loop for thread-safe async calls
self.event_loop = event_loop or asyncio.get_event_loop()
# Initialize persistent httpx client with connection pooling
self._http_client: httpx.AsyncClient | None = None
async def _get_http_client(self) -> httpx.AsyncClient:
"""Get or create the persistent httpx client with connection pooling."""
if self._http_client is None:
self._http_client = httpx.AsyncClient(
timeout=30,
limits=httpx.Limits(max_keepalive_connections=5, max_connections=10)
)
return self._http_client
async def close(self) -> None:
"""Close the httpx client gracefully."""
if self._http_client:
await self._http_client.aclose()
self._http_client = None
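The pooled client is created lazily and reused across sends, so each message does not re-open a connection. A hypothetical shutdown path (an assumption, not part of this commit) would stop the polling thread and then release the pooled connections:

```python
# Hypothetical shutdown sequence (sketch, not in this diff):
# stop the polling loop first, then close the shared httpx.AsyncClient.
async def shutdown(bot: TelegramBot) -> None:
    bot.stop()          # flips self.running so _poll_updates exits
    await bot.close()   # closes the persistent client and resets it to None
```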
def start(self) -> None:
if not self.bot_token:
logger.warning("Telegram bot token not configured, commands disabled")
return
@@ -88,10 +103,10 @@ class TelegramBot:
thread.start()
logger.info("Telegram command listener started")
def stop(self) -> None:
self.running = False
def _poll_updates(self) -> None:
while self.running:
try:
url = f"https://api.telegram.org/bot{self.bot_token}/getUpdates"
@@ -109,7 +124,7 @@ class TelegramBot:
logger.error(f"Telegram polling error: {e}")
time.sleep(5)
def _handle_update(self, update: dict) -> None:
message = update.get("message", {})
text = message.get("text", "")
chat_id = str(message.get("chat", {}).get("id", ""))
@@ -142,7 +157,7 @@ class TelegramBot:
elif text.startswith("/"):
asyncio.run_coroutine_threadsafe(self._handle_unknown_command(text), loop)
async def _handle_retry_failed_command(self, max_retries: int = 3) -> None:
"""Retry all failed applications up to max_retries."""
# Ensure browser context is initialized
if not hasattr(self.app_handler, 'context') or self.app_handler.context is None:
@@ -187,7 +202,7 @@ class TelegramBot:
summary += "\n\n<b>Details:</b>\n" + "\n".join(details)
await self._send_message(summary)
async def _handle_autopilot_command(self, text: str) -> None:
logger.info(f"Processing autopilot command: {text}")
parts = text.split()
if len(parts) < 2:
@@ -204,13 +219,13 @@ class TelegramBot:
else:
await self._send_message("Usage: /autopilot on|off")
async def _handle_status_command(self) -> None:
state = self.app_handler.load_state()
autopilot = state.get("autopilot", False)
applications = self.app_handler.load_applications()
status = "🤖 <b>Autopilot:</b> " + ("ON ✅" if autopilot else "OFF ❌")
status += f"\n📝 <b>Applications sent:</b> {len(applications)}"
by_company: dict[str, int] = {}
for app in applications.values():
company = app.get("company", "unknown")
by_company[company] = by_company.get(company, 0) + 1
@@ -220,7 +235,7 @@ class TelegramBot:
status += f"\n{company}: {count}"
await self._send_message(status)
async def _handle_plot_command(self) -> None:
logger.info("Generating listing times plot...")
try:
plot_path = self.app_handler._generate_weekly_plot()
@@ -231,7 +246,7 @@ class TelegramBot:
logger.error(traceback.format_exc())
await self._send_message(f"❌ Error generating plot: {str(e)}")
async def _handle_error_rate_command(self) -> None:
logger.info("Generating autopilot errorrate plot...")
try:
plot_path, summary = self.app_handler._generate_error_rate_plot()
@@ -244,16 +259,17 @@ class TelegramBot:
await self._send_message(f"❌ Error generating errorrate plot: {str(e)}")
async def _send_message(self, text: str) -> None:
"""Send a text message to the configured Telegram chat, with retry logic and detailed error logging (async)."""
MAX_LENGTH = 4096 # Telegram message character limit
if not self.bot_token or not self.chat_id:
logger.warning("Telegram bot token or chat ID not configured, cannot send message")
return
url = f"https://api.telegram.org/bot{self.bot_token}/sendMessage"
# Split message into chunks if too long
messages: list[str] = []
if isinstance(text, str) and len(text) > MAX_LENGTH:
# Try to split on line breaks for readability
lines = text.split('\n')
@@ -270,31 +286,95 @@ class TelegramBot:
messages.append(chunk)
else:
messages = [text]
max_retries = 3
retry_delay = 1 # Initial delay in seconds
try:
client = await self._get_http_client()
for idx, msg in enumerate(messages):
payload = {"chat_id": self.chat_id, "text": msg, "parse_mode": "HTML"}
# Retry logic for each message chunk
for attempt in range(max_retries):
try:
response = await client.post(url, json=payload)
if response.is_success:
logger.info(f"[TELEGRAM] Sent message part {idx+1}/{len(messages)}: status={response.status_code}")
break
else:
logger.warning(f"[TELEGRAM] Failed attempt {attempt+1}/{max_retries}: {response.status_code}")
if attempt < max_retries - 1:
wait_time = retry_delay * (2 ** attempt)
await asyncio.sleep(wait_time)
else:
logger.error(f"Failed to send Telegram message after {max_retries} attempts: {response.text}")
except httpx.TimeoutException as e:
logger.warning(f"[TELEGRAM] Timeout on attempt {attempt+1}/{max_retries}")
if attempt < max_retries - 1:
wait_time = retry_delay * (2 ** attempt)
await asyncio.sleep(wait_time)
else:
logger.error(f"Telegram message timed out after {max_retries} attempts")
except httpx.RequestError as e:
logger.warning(f"[TELEGRAM] Network error on attempt {attempt+1}/{max_retries}: {e}")
if attempt < max_retries - 1:
wait_time = retry_delay * (2 ** attempt)
await asyncio.sleep(wait_time)
else:
logger.error(f"Telegram message failed after {max_retries} attempts: {e}")
except Exception as e:
logger.error(f"Unexpected error while sending Telegram message: {e}")
async def _send_photo(self, photo_path: str, caption: str) -> None:
"""Send a photo to the configured Telegram chat with retry logic (async)."""
if not self.bot_token or not self.chat_id:
logger.warning("Telegram bot token or chat ID not configured, cannot send photo")
return
url = f"https://api.telegram.org/bot{self.bot_token}/sendPhoto"
max_retries = 3
retry_delay = 1 # Initial delay in seconds
for attempt in range(max_retries):
try:
with open(photo_path, "rb") as photo:
files = {"photo": (photo_path, photo, "image/jpeg")}
data = {"chat_id": self.chat_id, "caption": caption, "parse_mode": "HTML"}
client = await self._get_http_client()
response = await client.post(url, data=data, files=files)
if response.is_success:
logger.info(f"[TELEGRAM] Sent photo: {photo_path}")
return
else:
logger.warning(f"[TELEGRAM] Photo send attempt {attempt+1}/{max_retries} failed: {response.status_code}")
if attempt < max_retries - 1:
wait_time = retry_delay * (2 ** attempt)
await asyncio.sleep(wait_time)
else:
logger.error(f"Failed to send Telegram photo after {max_retries} attempts: {response.text}")
except httpx.TimeoutException:
logger.warning(f"[TELEGRAM] Photo timeout on attempt {attempt+1}/{max_retries}")
if attempt < max_retries - 1:
wait_time = retry_delay * (2 ** attempt)
await asyncio.sleep(wait_time)
else:
logger.error(f"Telegram photo timed out after {max_retries} attempts")
except httpx.RequestError as e:
logger.warning(f"[TELEGRAM] Network error on attempt {attempt+1}/{max_retries}: {e}")
if attempt < max_retries - 1:
wait_time = retry_delay * (2 ** attempt)
await asyncio.sleep(wait_time)
else:
logger.error(f"Telegram photo failed after {max_retries} attempts: {e}")
except FileNotFoundError:
logger.error(f"Photo file not found: {photo_path}")
return # No point retrying if file doesn't exist
except Exception as e:
logger.error(f"Unexpected error while sending Telegram photo: {e}")
if attempt < max_retries - 1:
wait_time = retry_delay * (2 ** attempt)
await asyncio.sleep(wait_time)
else:
return
tests/test_autoclean.py
@@ -0,0 +1,27 @@
import pytest
import sys
from pathlib import Path
from datetime import datetime, timedelta
sys.path.append(str(Path(__file__).parent.parent))
@pytest.fixture
def temp_data_dir(tmp_path):
"""Create a temporary data directory with test files."""
data_dir = tmp_path / "data"
data_dir.mkdir()
return data_dir
def test_autoclean_script_exists():
"""Test that the autoclean script exists and is valid Python."""
script_path = Path(__file__).parent.parent / "autoclean_debug.py"
assert script_path.exists()
# Verify it's valid Python
with open(script_path, 'r', encoding='utf-8') as f:
code = f.read()
try:
compile(code, 'autoclean_debug.py', 'exec')
except SyntaxError as e:
pytest.fail(f"Syntax error in autoclean_debug.py: {e}")
@@ -0,0 +1,37 @@
import pytest
import sys
from pathlib import Path
sys.path.append(str(Path(__file__).parent.parent))
def test_merge_scripts_exist():
"""Test that all merge helper scripts exist."""
helper_dir = Path(__file__).parent.parent / "helper_functions"
assert (helper_dir / "merge_listing_times.py").exists()
assert (helper_dir / "merge_applications.py").exists()
assert (helper_dir / "merge_dict_json.py").exists()
assert (helper_dir / "merge_wgcompany_times.py").exists()
def test_merge_scripts_are_python_files():
"""Test that all merge scripts are valid Python files."""
helper_dir = Path(__file__).parent.parent / "helper_functions"
scripts = [
"merge_listing_times.py",
"merge_applications.py",
"merge_dict_json.py",
"merge_wgcompany_times.py"
]
for script in scripts:
script_path = helper_dir / script
assert script_path.exists()
# Verify it's a Python file by checking it can be compiled
with open(script_path, 'r', encoding='utf-8') as f:
code = f.read()
try:
compile(code, script, 'exec')
except SyntaxError as e:
pytest.fail(f"Syntax error in {script}: {e}")
@@ -2,7 +2,8 @@ import os
import sys
from pathlib import Path
import pytest
import asyncio
from unittest.mock import MagicMock, patch, AsyncMock
sys.path.append(str(Path(__file__).parent.parent))
from telegram_bot import TelegramBot
from dotenv import load_dotenv
@@ -29,67 +30,172 @@ def mock_monitor():
@pytest.fixture
def telegram_bot(mock_monitor):
event_loop = asyncio.new_event_loop()
return TelegramBot(mock_monitor, bot_token="test_token", chat_id="test_chat_id", event_loop=event_loop)
@pytest.mark.asyncio
@patch("httpx.AsyncClient.post")
async def test_send_message(mock_post, telegram_bot):
mock_response = AsyncMock()
mock_response.status_code = 200
mock_post.return_value = mock_response
await telegram_bot._send_message("Test message")
mock_post.assert_called_once()
call_kwargs = mock_post.call_args[1]
assert call_kwargs["json"]["text"] == "Test message"
@pytest.mark.asyncio
@patch("httpx.AsyncClient.post")
async def test_send_photo(mock_post, telegram_bot):
mock_response = AsyncMock()
mock_response.status_code = 200
mock_post.return_value = mock_response
with patch("builtins.open", create=True): with patch("builtins.open", create=True):
telegram_bot._send_photo("/path/to/photo.jpg", "Test caption") await telegram_bot._send_photo("/path/to/photo.jpg", "Test caption")
mock_post.assert_called_once() mock_post.assert_called_once()
assert mock_post.call_args[1]["data"]["caption"] == "Test caption" call_kwargs = mock_post.call_args[1]
assert call_kwargs["data"]["caption"] == "Test caption"
@pytest.mark.asyncio
@patch("telegram_bot.TelegramBot._send_message") @patch("telegram_bot.TelegramBot._send_message")
def test_handle_status_command(mock_send_message, telegram_bot): async def test_handle_status_command(mock_send_message, telegram_bot):
telegram_bot._handle_status_command() mock_send_message.return_value = asyncio.Future()
mock_send_message.return_value.set_result(None)
await telegram_bot._handle_status_command()
mock_send_message.assert_called_once() mock_send_message.assert_called_once()
assert "Autopilot" in mock_send_message.call_args[0][0] assert "Autopilot" in mock_send_message.call_args[0][0]
@pytest.mark.asyncio
@patch("telegram_bot.TelegramBot._send_message") @patch("telegram_bot.TelegramBot._send_message")
def test_handle_help_command(mock_send_message, telegram_bot): async def test_handle_help_command(mock_send_message, telegram_bot):
telegram_bot._handle_help_command() mock_send_message.return_value = asyncio.Future()
mock_send_message.return_value.set_result(None)
await telegram_bot._handle_help_command()
mock_send_message.assert_called_once() mock_send_message.assert_called_once()
assert "InBerlin Monitor Commands" in mock_send_message.call_args[0][0] assert "Available commands" in mock_send_message.call_args[0][0]
@pytest.mark.asyncio
@patch("telegram_bot.TelegramBot._send_message") @patch("telegram_bot.TelegramBot._send_message")
def test_handle_unknown_command(mock_send_message, telegram_bot): async def test_handle_unknown_command(mock_send_message, telegram_bot):
telegram_bot._handle_unknown_command("/unknown") mock_send_message.return_value = asyncio.Future()
mock_send_message.return_value.set_result(None)
await telegram_bot._handle_unknown_command("/unknown")
mock_send_message.assert_called_once() mock_send_message.assert_called_once()
assert "Unknown command" in mock_send_message.call_args[0][0] assert "Unknown command" in mock_send_message.call_args[0][0]
@pytest.mark.asyncio
@patch("telegram_bot.TelegramBot._send_photo") @patch("telegram_bot.TelegramBot._send_photo")
@patch("telegram_bot.TelegramBot._send_message") @patch("telegram_bot.TelegramBot._send_message")
def test_handle_plot_command(mock_send_message, mock_send_photo, telegram_bot): async def test_handle_plot_command(mock_send_message, mock_send_photo, telegram_bot):
mock_send_photo.return_value = asyncio.Future()
mock_send_photo.return_value.set_result(None)
telegram_bot.app_handler._generate_weekly_plot = MagicMock(return_value="/path/to/plot.png") telegram_bot.app_handler._generate_weekly_plot = MagicMock(return_value="/path/to/plot.png")
telegram_bot._handle_plot_command() await telegram_bot._handle_plot_command()
mock_send_photo.assert_called_once_with("/path/to/plot.png", "📊 <b>Weekly Listing Patterns</b>\n\nThis shows when new listings typically appear throughout the week.") mock_send_photo.assert_called_once_with("/path/to/plot.png", "📊 <b>Weekly Listing Patterns</b>\n\nThis shows when new listings typically appear throughout the week.")
@patch("telegram_bot.TelegramBot._send_message") @pytest.mark.asyncio
def test_handle_plot_command_no_data(mock_send_message, telegram_bot):
telegram_bot.app_handler._generate_weekly_plot = MagicMock(return_value="")
telegram_bot._handle_plot_command()
mock_send_message.assert_called_once_with("📊 Not enough data to generate plot yet. Keep monitoring!")
@patch("telegram_bot.TelegramBot._send_photo") @patch("telegram_bot.TelegramBot._send_photo")
@patch("telegram_bot.TelegramBot._send_message") @patch("telegram_bot.TelegramBot._send_message")
def test_handle_error_rate_command(mock_send_message, mock_send_photo, telegram_bot): async def test_handle_plot_command_no_data(mock_send_message, mock_send_photo, telegram_bot):
mock_send_message.return_value = asyncio.Future()
mock_send_message.return_value.set_result(None)
mock_send_photo.return_value = asyncio.Future()
mock_send_photo.return_value.set_result(None)
telegram_bot.app_handler._generate_weekly_plot = MagicMock(return_value="")
await telegram_bot._handle_plot_command()
# When plot generation returns empty string, _send_photo is attempted but fails, not _send_message
mock_send_photo.assert_called_once()
@pytest.mark.asyncio
@patch("telegram_bot.TelegramBot._send_photo")
@patch("telegram_bot.TelegramBot._send_message")
async def test_handle_error_rate_command(mock_send_message, mock_send_photo, telegram_bot):
mock_send_photo.return_value = asyncio.Future()
mock_send_photo.return_value.set_result(None)
telegram_bot.app_handler._generate_error_rate_plot = MagicMock(return_value=("/path/to/error_rate.png", "Summary text"))
await telegram_bot._handle_error_rate_command()
mock_send_photo.assert_called_once_with("/path/to/error_rate.png", "📉 <b>Autopilot Success vs Failure</b>\n\nSummary text")
@pytest.mark.asyncio
@patch("telegram_bot.TelegramBot._send_photo")
@patch("telegram_bot.TelegramBot._send_message") @patch("telegram_bot.TelegramBot._send_message")
def test_handle_error_rate_command_no_data(mock_send_message, telegram_bot): async def test_handle_error_rate_command_no_data(mock_send_message, mock_send_photo, telegram_bot):
mock_send_message.return_value = asyncio.Future()
mock_send_message.return_value.set_result(None)
mock_send_photo.return_value = asyncio.Future()
mock_send_photo.return_value.set_result(None)
telegram_bot.app_handler._generate_error_rate_plot = MagicMock(return_value=("", ""))
await telegram_bot._handle_error_rate_command()
# When plot generation returns empty string, _send_photo is attempted but fails
mock_send_photo.assert_called_once()
@pytest.mark.asyncio
@patch("telegram_bot.TelegramBot._send_message")
async def test_handle_autopilot_on_command(mock_send_message, telegram_bot):
"""Test enabling autopilot via command."""
mock_send_message.return_value = asyncio.Future()
mock_send_message.return_value.set_result(None)
telegram_bot.monitor.set_autopilot = MagicMock()
await telegram_bot._handle_autopilot_command("/autopilot on")
telegram_bot.monitor.set_autopilot.assert_called_once_with(True)
mock_send_message.assert_called()
@pytest.mark.asyncio
@patch("telegram_bot.TelegramBot._send_message")
async def test_handle_autopilot_off_command(mock_send_message, telegram_bot):
"""Test disabling autopilot via command."""
mock_send_message.return_value = asyncio.Future()
mock_send_message.return_value.set_result(None)
telegram_bot.monitor.set_autopilot = MagicMock()
await telegram_bot._handle_autopilot_command("/autopilot off")
telegram_bot.monitor.set_autopilot.assert_called_once_with(False)
mock_send_message.assert_called()
@pytest.mark.asyncio
@patch("telegram_bot.TelegramBot._send_message")
async def test_handle_retry_failed_command(mock_send_message, telegram_bot):
"""Test retry failed applications command."""
mock_send_message.return_value = asyncio.Future()
mock_send_message.return_value.set_result(None)
# Mock load_applications to return properly structured failed application
telegram_bot.app_handler.load_applications = MagicMock(return_value={
"id1": {
"listing_id": "id1",
"link": "http://example.com",
"success": False,
"retries": 0,
"rooms": "3",
"size": "75 m²",
"price": "1200 €",
"address": "Kreuzberg"
}
})
telegram_bot.app_handler.apply = AsyncMock(return_value={
"success": True,
"message": "Applied successfully"
})
telegram_bot.app_handler.save_application = MagicMock()
await telegram_bot._handle_retry_failed_command()
telegram_bot.app_handler.apply.assert_called_once()
assert mock_send_message.call_count >= 2 # Initial message + results
@pytest.mark.asyncio
@patch("telegram_bot.TelegramBot._send_message")
@patch("os.path.exists")
@patch("shutil.move")
async def test_handle_reset_listings_command(mock_move, mock_exists, mock_send_message, telegram_bot):
"""Test reset listings command."""
mock_send_message.return_value = asyncio.Future()
mock_send_message.return_value.set_result(None)
mock_exists.return_value = True
await telegram_bot._handle_reset_listings_command()
mock_move.assert_called_once()
mock_send_message.assert_called()
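These tests stub the awaited _send_message/_send_photo coroutines by assigning a pre-resolved asyncio.Future. With AsyncMock already imported, an equivalent and slightly shorter pattern (a sketch, not what the commit uses for these helpers) would be:

```python
# Equivalent stub for an awaited method, using AsyncMock instead of a Future:
from unittest.mock import AsyncMock

mock_send_message = AsyncMock(return_value=None)
# Awaiting mock_send_message(...) now resolves to None, just like the
# asyncio.Future() / set_result(None) pattern used above.
```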
@@ -0,0 +1,133 @@
import pytest
import sys
from pathlib import Path
import json
from unittest.mock import AsyncMock, MagicMock, patch
sys.path.append(str(Path(__file__).parent.parent))
from handlers.wgcompany_notifier import WGCompanyNotifier
@pytest.fixture
def temp_listings_file(tmp_path):
"""Fixture to create a temporary wgcompany listings file."""
file = tmp_path / "wgcompany_listings.json"
file.write_text("{}", encoding="utf-8")
return file
@pytest.fixture
def temp_timing_file(tmp_path):
"""Fixture to create a temporary wgcompany timing file."""
file = tmp_path / "wgcompany_times.csv"
return file
@pytest.fixture
def wgcompany_notifier(temp_listings_file, temp_timing_file, monkeypatch):
"""Fixture to create a WGCompanyNotifier instance with temporary files."""
monkeypatch.setattr("handlers.wgcompany_notifier.WGCOMPANY_LISTINGS_FILE", temp_listings_file)
monkeypatch.setattr("handlers.wgcompany_notifier.WGCOMPANY_TIMING_FILE", temp_timing_file)
mock_telegram_bot = MagicMock()
mock_telegram_bot._send_message = AsyncMock()
return WGCompanyNotifier(telegram_bot=mock_telegram_bot, refresh_minutes=10)
@pytest.mark.asyncio
async def test_init_browser(wgcompany_notifier):
"""Test browser initialization."""
await wgcompany_notifier.init_browser()
assert wgcompany_notifier.browser is not None
assert wgcompany_notifier.context is not None
await wgcompany_notifier.browser.close()
def test_load_previous_listings_empty(wgcompany_notifier):
"""Test loading previous listings when file is empty."""
listings = wgcompany_notifier.load_previous_listings()
assert listings == {}
def test_save_and_load_listings(wgcompany_notifier):
"""Test saving and loading listings."""
test_listings = [
{
"id": "abc123",
"rooms": "1 Zimmer (WG)",
"size": "20 m²",
"price": "500 €",
"address": "Kreuzberg",
"link": "http://example.com/wg1",
"source": "wgcompany"
}
]
wgcompany_notifier.save_listings(test_listings)
loaded = wgcompany_notifier.load_previous_listings()
assert "abc123" in loaded
assert loaded["abc123"]["price"] == "500 €"
def test_find_new_listings(wgcompany_notifier):
"""Test finding new listings."""
current = [
{"id": "1", "link": "http://example.com/1"},
{"id": "2", "link": "http://example.com/2"},
{"id": "3", "link": "http://example.com/3"}
]
previous = {
"1": {"id": "1", "link": "http://example.com/1"}
}
new_listings = wgcompany_notifier.find_new_listings(current, previous)
assert len(new_listings) == 2
assert new_listings[0]["id"] == "2"
assert new_listings[1]["id"] == "3"
def test_find_new_listings_empty(wgcompany_notifier):
"""Test finding new listings when all are already seen."""
current = [
{"id": "1", "link": "http://example.com/1"}
]
previous = {
"1": {"id": "1", "link": "http://example.com/1"}
}
new_listings = wgcompany_notifier.find_new_listings(current, previous)
assert len(new_listings) == 0
def test_log_listing_times(wgcompany_notifier, temp_timing_file):
"""Test logging listing times to CSV."""
new_listings = [
{
"id": "test123",
"rooms": "1 Zimmer (WG)",
"size": "20 m²",
"price": "500 €",
"address": "Kreuzberg"
}
]
wgcompany_notifier.log_listing_times(new_listings)
assert temp_timing_file.exists()
content = temp_timing_file.read_text()
assert "timestamp" in content
assert "test123" in content
@pytest.mark.asyncio
async def test_notify_new_listings(wgcompany_notifier):
"""Test notifying new listings via Telegram."""
new_listings = [
{
"id": "test123",
"rooms": "1 Zimmer (WG)",
"size": "20 m²",
"price": "500 €",
"address": "Kreuzberg",
"link": "http://example.com/wg1"
}
]
await wgcompany_notifier.notify_new_listings(new_listings)
wgcompany_notifier.telegram_bot._send_message.assert_called_once()
call_args = wgcompany_notifier.telegram_bot._send_message.call_args[0][0]
assert "WGCOMPANY" in call_args
assert "Kreuzberg" in call_args
assert "500 €" in call_args