mostly working shape?
This commit is contained in:
parent
3057cda8d3
commit
540a3cc884
10 changed files with 462 additions and 183 deletions
|
|
@ -108,7 +108,8 @@ class ApplicationHandler:
|
|||
# Send via TelegramBot if available
|
||||
if hasattr(self, 'telegram_bot') and self.telegram_bot:
|
||||
logger.info(f"Notifying Telegram: {listing['address']} ({listing['rooms']}, {listing['size']}, {listing['price']})")
|
||||
self.telegram_bot._send_message(message)
|
||||
loop = getattr(self.telegram_bot, 'event_loop', None) or asyncio.get_event_loop()
|
||||
asyncio.run_coroutine_threadsafe(self.telegram_bot._send_message(message), loop)
|
||||
else:
|
||||
logger.info(f"[TELEGRAM] Would send message for: {listing['address']} ({listing['rooms']}, {listing['size']}, {listing['price']})")
|
||||
|
||||
|
|
@ -313,69 +314,124 @@ class ApplicationHandler:
|
|||
|
||||
|
||||
def _generate_weekly_plot(self) -> str:
|
||||
"""Generate a heatmap of listings by day of week and hour. Always returns a plot path, even if no data."""
|
||||
"""Generate a heatmap, bar chart, line chart, and summary of listings by day/hour, like monitor.py."""
|
||||
plot_path = DATA_DIR / "weekly_plot.png"
|
||||
try:
|
||||
if not TIMING_FILE.exists():
|
||||
logger.warning("No timing file found for weekly plot. Generating empty plot.")
|
||||
# Generate empty plot
|
||||
fig, ax = plt.subplots(figsize=(10, 6))
|
||||
ax.set_xticks(range(24))
|
||||
ax.set_yticks(range(7))
|
||||
ax.set_xticklabels([f"{h}:00" for h in range(24)], rotation=90)
|
||||
ax.set_yticklabels(["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"])
|
||||
ax.set_title("Listings Heatmap (No Data)")
|
||||
ax.text(0.5, 0.5, "No data available", fontsize=18, ha='center', va='center', transform=ax.transAxes, color='gray')
|
||||
plt.savefig(plot_path)
|
||||
plt.close(fig)
|
||||
return str(plot_path)
|
||||
logger.warning("No timing data file found")
|
||||
return ""
|
||||
|
||||
df = pd.read_csv(TIMING_FILE, parse_dates=["timestamp"])
|
||||
if df.empty:
|
||||
logger.warning("Timing file is empty. Generating empty plot.")
|
||||
fig, ax = plt.subplots(figsize=(10, 6))
|
||||
ax.set_xticks(range(24))
|
||||
ax.set_yticks(range(7))
|
||||
ax.set_xticklabels([f"{h}:00" for h in range(24)], rotation=90)
|
||||
ax.set_yticklabels(["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"])
|
||||
ax.set_title("Listings Heatmap (No Data)")
|
||||
ax.text(0.5, 0.5, "No data available", fontsize=18, ha='center', va='center', transform=ax.transAxes, color='gray')
|
||||
plt.savefig(plot_path)
|
||||
plt.close(fig)
|
||||
return str(plot_path)
|
||||
df = pd.read_csv(TIMING_FILE)
|
||||
if len(df) < 1:
|
||||
logger.warning("Timing file is empty")
|
||||
return ""
|
||||
|
||||
df["day_of_week"] = df["timestamp"].dt.dayofweek
|
||||
df["hour"] = df["timestamp"].dt.hour
|
||||
heatmap_data = df.groupby(["day_of_week", "hour"]).size().unstack(fill_value=0)
|
||||
logger.info(f"Loaded {len(df)} listing records for plot")
|
||||
|
||||
fig, ax = plt.subplots(figsize=(10, 6))
|
||||
cax = ax.matshow(heatmap_data, cmap="YlGnBu", aspect="auto")
|
||||
fig.colorbar(cax)
|
||||
# Create day-hour matrix
|
||||
days_order = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
|
||||
|
||||
ax.set_xticks(range(24))
|
||||
ax.set_yticks(range(7))
|
||||
ax.set_xticklabels([f"{h}:00" for h in range(24)], rotation=90)
|
||||
ax.set_yticklabels(["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"])
|
||||
# Count listings per day and hour
|
||||
heatmap_data = pd.DataFrame(0, index=days_order, columns=range(24))
|
||||
|
||||
ax.set_title("Listings Heatmap (Day of Week vs Hour)")
|
||||
for _, row in df.iterrows():
|
||||
day = row['weekday']
|
||||
hour = int(row['hour'])
|
||||
if day in days_order:
|
||||
# Use pd.to_numeric to ensure value is numeric before incrementing
|
||||
val = pd.to_numeric(heatmap_data.loc[day, hour], errors='coerce')
|
||||
if pd.isna(val):
|
||||
heatmap_data.loc[day, hour] = 1
|
||||
else:
|
||||
heatmap_data.loc[day, hour] = int(val) + 1
|
||||
|
||||
plt.savefig(plot_path)
|
||||
plt.close(fig)
|
||||
logger.info(f"Weekly plot saved to {plot_path}")
|
||||
# Create figure with two subplots
|
||||
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
|
||||
fig.suptitle('Listing Appearance Patterns', fontsize=16, fontweight='bold')
|
||||
|
||||
# 1. Heatmap - Day vs Hour
|
||||
ax1 = axes[0, 0]
|
||||
im = ax1.imshow(heatmap_data.values, cmap='YlOrRd', aspect='auto')
|
||||
ax1.set_xticks(range(24))
|
||||
ax1.set_xticklabels(range(24), fontsize=8)
|
||||
ax1.set_yticks(range(7))
|
||||
ax1.set_yticklabels(days_order)
|
||||
ax1.set_xlabel('Hour of Day')
|
||||
ax1.set_ylabel('Day of Week')
|
||||
ax1.set_title('Listings by Day & Hour')
|
||||
plt.colorbar(im, ax=ax1, label='Count')
|
||||
|
||||
# 2. Bar chart - By day of week
|
||||
ax2 = axes[0, 1]
|
||||
day_counts = df['weekday'].value_counts().reindex(days_order, fill_value=0)
|
||||
colors = plt.cm.get_cmap('Blues')(day_counts / day_counts.max() if day_counts.max() > 0 else day_counts)
|
||||
bars = ax2.bar(range(7), day_counts.values, color=colors)
|
||||
ax2.set_xticks(range(7))
|
||||
ax2.set_xticklabels(['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'])
|
||||
ax2.set_xlabel('Day of Week')
|
||||
ax2.set_ylabel('Number of Listings')
|
||||
ax2.set_title('Total Listings by Day')
|
||||
for i, v in enumerate(day_counts.values):
|
||||
if v > 0:
|
||||
ax2.text(i, v + 0.1, str(v), ha='center', fontsize=9)
|
||||
|
||||
# 3. Line chart - By hour
|
||||
ax3 = axes[1, 0]
|
||||
hour_counts = df['hour'].value_counts().reindex(range(24), fill_value=0)
|
||||
ax3.plot(range(24), hour_counts.values, marker='o', linewidth=2, markersize=4, color='#2E86AB')
|
||||
ax3.fill_between(range(24), hour_counts.values, alpha=0.3, color='#2E86AB')
|
||||
ax3.set_xticks(range(0, 24, 2))
|
||||
ax3.set_xlabel('Hour of Day')
|
||||
ax3.set_ylabel('Number of Listings')
|
||||
ax3.set_title('Total Listings by Hour')
|
||||
ax3.grid(True, alpha=0.3)
|
||||
|
||||
# 4. Summary stats
|
||||
ax4 = axes[1, 1]
|
||||
ax4.axis('off')
|
||||
|
||||
# Calculate best times
|
||||
best_day = day_counts.idxmax() if day_counts.max() > 0 else "N/A"
|
||||
best_hour = hour_counts.idxmax() if hour_counts.max() > 0 else "N/A"
|
||||
total_listings = len(df)
|
||||
|
||||
# Find peak combinations
|
||||
peak_combo = heatmap_data.stack().idxmax() if heatmap_data.values.max() > 0 else ("N/A", "N/A")
|
||||
|
||||
# Fix: Ensure peak_combo is iterable
|
||||
if isinstance(peak_combo, tuple) and len(peak_combo) == 2:
|
||||
stats_text = f"🎯 Peak time: {peak_combo[0]} at {peak_combo[1]}:00"
|
||||
else:
|
||||
stats_text = "🎯 Peak time: N/A"
|
||||
|
||||
stats_text = f"""📊 Summary Statistics
|
||||
|
||||
Total listings tracked: {total_listings}
|
||||
|
||||
🏆 Best day: {best_day}
|
||||
⏰ Best hour: {best_hour}:00
|
||||
{stats_text}
|
||||
|
||||
📈 Average per day: {total_listings/7:.1f}
|
||||
📅 Data collection period:
|
||||
From: {df['timestamp'].min()[:10] if 'timestamp' in df.columns else 'N/A'}
|
||||
To: {df['timestamp'].max()[:10] if 'timestamp' in df.columns else 'N/A'}
|
||||
"""
|
||||
ax4.text(0.1, 0.9, stats_text, transform=ax4.transAxes, fontsize=11,
|
||||
verticalalignment='top', fontfamily='monospace',
|
||||
bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))
|
||||
|
||||
plt.tight_layout()
|
||||
|
||||
# Save plot
|
||||
plt.savefig(plot_path, dpi=150, bbox_inches='tight')
|
||||
plt.close()
|
||||
|
||||
logger.info(f"Plot saved to {plot_path}")
|
||||
return str(plot_path)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to generate weekly plot: {e}")
|
||||
# Always generate a fallback empty plot
|
||||
fig, ax = plt.subplots(figsize=(10, 6))
|
||||
ax.set_xticks(range(24))
|
||||
ax.set_yticks(range(7))
|
||||
ax.set_xticklabels([f"{h}:00" for h in range(24)], rotation=90)
|
||||
ax.set_yticklabels(["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"])
|
||||
ax.set_title("Listings Heatmap (Error)")
|
||||
ax.text(0.5, 0.5, "Plot error", fontsize=18, ha='center', va='center', transform=ax.transAxes, color='red')
|
||||
plt.savefig(plot_path)
|
||||
plt.close(fig)
|
||||
return str(plot_path)
|
||||
logger.error(f"Error creating plot: {e}")
|
||||
return ""
|
||||
|
||||
|
||||
def _generate_error_rate_plot(self):
|
||||
|
|
@ -383,6 +439,8 @@ class ApplicationHandler:
|
|||
|
||||
Returns (plot_path, summary_text) or (None, "") if insufficient data.
|
||||
"""
|
||||
import matplotlib.dates as mdates
|
||||
from pathlib import Path
|
||||
if not self.applications_file.exists():
|
||||
logger.warning("No applications.json found for errorrate plot")
|
||||
return None, ""
|
||||
|
|
@ -390,25 +448,21 @@ class ApplicationHandler:
|
|||
try:
|
||||
with open(self.applications_file, 'r', encoding='utf-8') as f:
|
||||
apps = json.load(f)
|
||||
|
||||
if not apps:
|
||||
logger.warning("No application data available for errorrate plot")
|
||||
return None, ""
|
||||
|
||||
# Convert to DataFrame
|
||||
rows = []
|
||||
for _id, rec in apps.items():
|
||||
rows.append({
|
||||
"id": _id,
|
||||
"ts": pd.to_datetime(rec.get("timestamp")),
|
||||
"success": rec.get("success", False),
|
||||
"company": rec.get("company", "unknown")
|
||||
})
|
||||
|
||||
ts = rec.get('timestamp')
|
||||
try:
|
||||
dt = pd.to_datetime(ts)
|
||||
except Exception:
|
||||
dt = pd.NaT
|
||||
rows.append({'id': _id, 'company': rec.get('company'), 'success': bool(rec.get('success')), 'ts': dt})
|
||||
df = pd.DataFrame(rows)
|
||||
df = df.dropna(subset=['ts'])
|
||||
if df.empty:
|
||||
logger.warning("No valid data for errorrate plot")
|
||||
return None, ""
|
||||
|
||||
df['date'] = df['ts'].dt.floor('D')
|
||||
|
|
@ -419,28 +473,83 @@ class ApplicationHandler:
|
|||
# Ensure index is sorted by date for plotting
|
||||
grouped = grouped.sort_index()
|
||||
|
||||
# Prepare plot
|
||||
fig, ax = plt.subplots(figsize=(10, 6))
|
||||
ax.plot(grouped.index, grouped['error_rate'], marker='o', color='red', label='Error Rate')
|
||||
ax.set_title('Autopilot Error Rate Over Time')
|
||||
ax.set_xlabel('Date')
|
||||
ax.set_ylabel('Error Rate')
|
||||
ax.legend()
|
||||
ax.grid(True)
|
||||
# Prepare plot: convert dates to matplotlib numeric x-values so bars and line align
|
||||
fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(12, 12), sharex=True)
|
||||
|
||||
# Save plot to the same directory as the applications file
|
||||
dates = pd.to_datetime(grouped.index).to_pydatetime()
|
||||
x = mdates.date2num(dates)
|
||||
width = 0.6 # width in days for bars
|
||||
|
||||
successes = grouped['successes'].values
|
||||
failures = grouped['failures'].values
|
||||
|
||||
ax1.bar(x, successes, width=width, color='#2E8B57', align='center')
|
||||
ax1.bar(x, failures, bottom=successes, width=width, color='#C44A4A', align='center')
|
||||
ax1.set_ylabel('Count')
|
||||
ax1.set_title('Autopilot: Successes vs Failures (by day)')
|
||||
ax1.set_xticks(x)
|
||||
ax1.set_xlim(min(x) - 1, max(x) + 1)
|
||||
ax1.xaxis.set_major_locator(mdates.AutoDateLocator())
|
||||
ax1.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
|
||||
|
||||
# Plot error rate line on same x (date) axis
|
||||
ax2.plot(x, grouped['error_rate'].values, marker='o', color='#3333AA', linewidth=2)
|
||||
ax2.set_ylim(-0.02, 1.02)
|
||||
ax2.set_ylabel('Error rate')
|
||||
ax2.set_xlabel('Date')
|
||||
ax2.set_title('Daily Error Rate (failures / total)')
|
||||
ax2.grid(True, alpha=0.3)
|
||||
ax2.set_xticks(x)
|
||||
ax2.set_xlim(min(x) - 1, max(x) + 1)
|
||||
ax2.xaxis.set_major_locator(mdates.AutoDateLocator())
|
||||
ax2.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
|
||||
|
||||
# Error rate by company (line plot)
|
||||
company_grouped = df.groupby(['date', 'company']).agg(total=('id','count'), successes=('success', lambda x: x.sum()))
|
||||
company_grouped['failures'] = company_grouped['total'] - company_grouped['successes']
|
||||
company_grouped['error_rate'] = company_grouped['failures'] / company_grouped['total']
|
||||
company_grouped = company_grouped.reset_index()
|
||||
error_rate_pivot = company_grouped.pivot(index='date', columns='company', values='error_rate')
|
||||
for company in error_rate_pivot.columns:
|
||||
y = error_rate_pivot[company].values
|
||||
ax3.plot(x, y, marker='o', label=str(company))
|
||||
ax3.set_ylim(-0.02, 1.02)
|
||||
ax3.set_ylabel('Error rate')
|
||||
ax3.set_xlabel('Date')
|
||||
ax3.set_title('Daily Error Rate by Company')
|
||||
ax3.grid(True, alpha=0.3)
|
||||
ax3.set_xticks(x)
|
||||
ax3.set_xlim(min(x) - 1, max(x) + 1)
|
||||
ax3.xaxis.set_major_locator(mdates.AutoDateLocator())
|
||||
ax3.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
|
||||
ax3.legend(title='Company', loc='upper right', fontsize='small')
|
||||
|
||||
fig.autofmt_xdate()
|
||||
plt.tight_layout()
|
||||
plot_path = self.applications_file.parent / 'error_rate.png'
|
||||
plt.savefig(plot_path)
|
||||
tmp_path = self.applications_file.parent / 'error_rate.tmp.png'
|
||||
# Save to a temp file first and atomically replace to ensure overwrite
|
||||
fig.savefig(tmp_path, format='png')
|
||||
plt.close(fig)
|
||||
try:
|
||||
tmp_path.replace(plot_path)
|
||||
except Exception:
|
||||
# Fallback: try removing existing and renaming
|
||||
try:
|
||||
if plot_path.exists():
|
||||
plot_path.unlink()
|
||||
tmp_path.rename(plot_path)
|
||||
except Exception:
|
||||
logger.exception(f"Failed to write plot to {plot_path}")
|
||||
|
||||
# Summary
|
||||
total_attempts = int(grouped['total'].sum())
|
||||
total_success = int(grouped['successes'].sum())
|
||||
total_fail = int(grouped['failures'].sum())
|
||||
overall_error = (total_fail / total_attempts) if total_attempts > 0 else 0.0
|
||||
overall_error = (total_fail / total_attempts) if total_attempts>0 else 0.0
|
||||
summary = f"<b>Total attempts:</b> {total_attempts}\n<b>Successes:</b> {total_success}\n<b>Failures:</b> {total_fail}\n<b>Overall error rate:</b> {overall_error:.1%}"
|
||||
|
||||
return plot_path, summary
|
||||
return str(plot_path), summary
|
||||
except Exception as e:
|
||||
logger.exception(f"Failed to generate error rate plot: {e}")
|
||||
return None, ""
|
||||
|
|
|
|||
|
|
@ -11,5 +11,5 @@ services:
|
|||
volumes:
|
||||
- ./data:/app/data:rw
|
||||
environment:
|
||||
- CHECK_INTERVAL=30
|
||||
- CHECK_INTERVAL=60
|
||||
- WOHNBOT_DEV=1
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ class DegewoHandler(BaseHandler):
|
|||
self.context = browser_context
|
||||
|
||||
async def apply(self, listing: dict, result: dict) -> dict:
|
||||
import os
|
||||
DATA_DIR = Path("data/degewo")
|
||||
DATA_DIR.mkdir(parents=True, exist_ok=True)
|
||||
page = await self.context.new_page()
|
||||
|
|
@ -60,31 +61,173 @@ class DegewoHandler(BaseHandler):
|
|||
await asyncio.sleep(3)
|
||||
|
||||
# Degewo uses Wohnungshelden iframe for the application form
|
||||
# Find the iframe and get its URL to navigate directly
|
||||
iframe_element = await page.query_selector('iframe[src*="wohnungshelden.de"]')
|
||||
if iframe_element:
|
||||
iframe_url = await iframe_element.get_attribute('src')
|
||||
logger.info(f"[DEGEWO] Found Wohnungshelden iframe: {iframe_url}")
|
||||
|
||||
# Navigate to the iframe URL directly in a new page for full access
|
||||
iframe_page = await self.context.new_page()
|
||||
try:
|
||||
await iframe_page.goto(iframe_url, wait_until="networkidle")
|
||||
await asyncio.sleep(2)
|
||||
logger.info("[DEGEWO] Loaded Wohnungshelden application page")
|
||||
# TODO: Implement form-filling and submission logic here
|
||||
|
||||
# Screenshot and HTML for debugging
|
||||
screenshot_path = DATA_DIR / f"degewo_wohnungshelden_{listing['id']}.png"
|
||||
await iframe_page.screenshot(path=str(screenshot_path), full_page=True)
|
||||
logger.info(f"[DEGEWO] Saved Wohnungshelden screenshot to {screenshot_path}")
|
||||
html_content = await iframe_page.content()
|
||||
html_path = DATA_DIR / f"degewo_wohnungshelden_{listing['id']}.html"
|
||||
with open(html_path, 'w', encoding='utf-8') as f:
|
||||
f.write(html_content)
|
||||
logger.info(f"[DEGEWO] Saved HTML to {html_path}")
|
||||
|
||||
# Fill out Wohnungshelden form
|
||||
form_filled = False
|
||||
# Anrede (Salutation)
|
||||
try:
|
||||
salutation_dropdown = await iframe_page.query_selector('#salutation-dropdown, ng-select[id*="salutation"]')
|
||||
if salutation_dropdown:
|
||||
await salutation_dropdown.click()
|
||||
await asyncio.sleep(0.5)
|
||||
anrede_option = await iframe_page.query_selector(f'.ng-option:has-text("{os.environ.get("FORM_ANREDE", "Herr")}")')
|
||||
if anrede_option:
|
||||
await anrede_option.click()
|
||||
logger.info(f"[DEGEWO] Selected Anrede: {os.environ.get('FORM_ANREDE', 'Herr')}")
|
||||
form_filled = True
|
||||
except Exception as e:
|
||||
logger.warning(f"[DEGEWO] Could not set Anrede: {e}")
|
||||
|
||||
# Vorname
|
||||
try:
|
||||
vorname_field = await iframe_page.query_selector('#firstName')
|
||||
if vorname_field:
|
||||
await vorname_field.fill(os.environ.get("FORM_VORNAME", "Max"))
|
||||
logger.info(f"[DEGEWO] Filled Vorname: {os.environ.get('FORM_VORNAME', 'Max')}")
|
||||
form_filled = True
|
||||
except Exception as e:
|
||||
logger.warning(f"[DEGEWO] Could not fill Vorname: {e}")
|
||||
|
||||
# Nachname
|
||||
try:
|
||||
nachname_field = await iframe_page.query_selector('#lastName')
|
||||
if nachname_field:
|
||||
await nachname_field.fill(os.environ.get("FORM_NACHNAME", "Mustermann"))
|
||||
logger.info(f"[DEGEWO] Filled Nachname: {os.environ.get('FORM_NACHNAME', 'Mustermann')}")
|
||||
form_filled = True
|
||||
except Exception as e:
|
||||
logger.warning(f"[DEGEWO] Could not fill Nachname: {e}")
|
||||
|
||||
# E-Mail
|
||||
try:
|
||||
email_field = await iframe_page.query_selector('#email')
|
||||
if email_field:
|
||||
await email_field.fill(os.environ.get("FORM_EMAIL", "test@example.com"))
|
||||
logger.info(f"[DEGEWO] Filled E-Mail: {os.environ.get('FORM_EMAIL', 'test@example.com')}")
|
||||
form_filled = True
|
||||
except Exception as e:
|
||||
logger.warning(f"[DEGEWO] Could not fill E-Mail: {e}")
|
||||
|
||||
# Telefonnummer
|
||||
try:
|
||||
tel_field = await iframe_page.query_selector('input[id*="telefonnummer"]')
|
||||
if tel_field:
|
||||
await tel_field.fill(os.environ.get("FORM_PHONE", "0123456789"))
|
||||
logger.info(f"[DEGEWO] Filled Telefon: {os.environ.get('FORM_PHONE', '0123456789')}")
|
||||
form_filled = True
|
||||
except Exception as e:
|
||||
logger.warning(f"[DEGEWO] Could not fill Telefon: {e}")
|
||||
|
||||
# Anzahl einziehende Personen
|
||||
try:
|
||||
personen_field = await iframe_page.query_selector('input[id*="numberPersonsTotal"]')
|
||||
if personen_field:
|
||||
await personen_field.fill(os.environ.get("FORM_PERSONS", "1"))
|
||||
logger.info(f"[DEGEWO] Filled Anzahl Personen: {os.environ.get('FORM_PERSONS', '1')}")
|
||||
form_filled = True
|
||||
except Exception as e:
|
||||
logger.warning(f"[DEGEWO] Could not fill Anzahl Personen: {e}")
|
||||
|
||||
# "Für sich selbst" dropdown
|
||||
try:
|
||||
selbst_dropdown = await iframe_page.query_selector('ng-select[id*="fuer_wen"]')
|
||||
if selbst_dropdown:
|
||||
await selbst_dropdown.click()
|
||||
await asyncio.sleep(0.5)
|
||||
selbst_option = await iframe_page.query_selector('.ng-option:has-text("Für mich selbst"), .ng-option:has-text("selbst")')
|
||||
if selbst_option:
|
||||
await selbst_option.click()
|
||||
logger.info("[DEGEWO] Selected: Für mich selbst")
|
||||
form_filled = True
|
||||
except Exception as e:
|
||||
logger.warning(f"[DEGEWO] Could not set 'Für sich selbst': {e}")
|
||||
|
||||
await asyncio.sleep(1)
|
||||
|
||||
# Take screenshot after filling form
|
||||
screenshot_path = DATA_DIR / f"degewo_form_filled_{listing['id']}.png"
|
||||
await iframe_page.screenshot(path=str(screenshot_path), full_page=True)
|
||||
logger.info(f"[DEGEWO] Saved filled form screenshot to {screenshot_path}")
|
||||
|
||||
# Try to submit
|
||||
try:
|
||||
submit_selectors = [
|
||||
'button[type="submit"]',
|
||||
'input[type="submit"]',
|
||||
'button:has-text("Absenden")',
|
||||
'button:has-text("Senden")',
|
||||
'button:has-text("Anfrage")',
|
||||
'button:has-text("Bewerben")',
|
||||
'button:has-text("Submit")',
|
||||
'.btn-primary',
|
||||
'.submit-btn',
|
||||
]
|
||||
|
||||
submit_btn = None
|
||||
for selector in submit_selectors:
|
||||
submit_btn = await iframe_page.query_selector(selector)
|
||||
if submit_btn and await submit_btn.is_visible():
|
||||
logger.info(f"[DEGEWO] Found submit button with selector: {selector}")
|
||||
break
|
||||
submit_btn = None
|
||||
|
||||
if submit_btn:
|
||||
await submit_btn.click()
|
||||
logger.info("[DEGEWO] Clicked submit button")
|
||||
await asyncio.sleep(3)
|
||||
|
||||
# Take screenshot after submission
|
||||
screenshot_path = DATA_DIR / f"degewo_submitted_{listing['id']}.png"
|
||||
await iframe_page.screenshot(path=str(screenshot_path), full_page=True)
|
||||
logger.info(f"[DEGEWO] Saved submission screenshot to {screenshot_path}")
|
||||
|
||||
result["success"] = True
|
||||
result["message"] = "Application submitted via Wohnungshelden"
|
||||
else:
|
||||
result["success"] = False
|
||||
result["message"] = "Wohnungshelden form loaded but submit button not found"
|
||||
logger.warning("[DEGEWO] Submit button not found in Wohnungshelden form")
|
||||
except Exception as e:
|
||||
result["success"] = False
|
||||
result["message"] = f"Wohnungshelden submit error: {str(e)}"
|
||||
logger.warning(f"[DEGEWO] Submit error: {e}")
|
||||
finally:
|
||||
await iframe_page.close()
|
||||
else:
|
||||
# No iframe found - try the old approach (fallback for different page structure)
|
||||
logger.warning("[DEGEWO] Wohnungshelden iframe not found, trying direct form...")
|
||||
# TODO: Implement fallback logic here
|
||||
screenshot_path = DATA_DIR / f"degewo_noiframe_{listing['id']}.png"
|
||||
await page.screenshot(path=str(screenshot_path), full_page=True)
|
||||
html_content = await page.content()
|
||||
html_path = DATA_DIR / "degewo_debug.html"
|
||||
with open(html_path, 'w', encoding='utf-8') as f:
|
||||
f.write(html_content)
|
||||
result["success"] = False
|
||||
result["message"] = "Wohnungshelden iframe not found on page"
|
||||
else:
|
||||
result["message"] = "No kontaktieren button found"
|
||||
logger.warning("[DEGEWO] Could not find kontaktieren button")
|
||||
screenshot_path = DATA_DIR / f"degewo_nobtn_{listing['id']}.png"
|
||||
await page.screenshot(path=str(screenshot_path), full_page=True)
|
||||
|
||||
await page.close()
|
||||
return result
|
||||
except Exception as e:
|
||||
|
|
|
|||
|
|
@ -38,6 +38,7 @@ class WGCompanyNotifier:
|
|||
logger.info("[WGCOMPANY] Browser initialized")
|
||||
|
||||
async def fetch_listings(self):
|
||||
await self.init_browser()
|
||||
listings = []
|
||||
try:
|
||||
page = await self.context.new_page()
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
requests>=2.31.0
|
||||
httpx>=0.24.0
|
||||
playwright>=1.57.0
|
||||
matplotlib>=3.8.0
|
||||
pandas>=2.0.0
|
||||
|
|
|
|||
185
telegram_bot.py
185
telegram_bot.py
|
|
@ -1,4 +1,5 @@
|
|||
|
||||
|
||||
import os
|
||||
import logging
|
||||
import threading
|
||||
|
|
@ -7,6 +8,7 @@ import requests
|
|||
import asyncio
|
||||
|
||||
|
||||
|
||||
# Configuration from environment
|
||||
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "")
|
||||
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "")
|
||||
|
|
@ -16,9 +18,30 @@ logger = logging.getLogger(__name__)
|
|||
|
||||
|
||||
class TelegramBot:
|
||||
"""Handle Telegram commands for controlling the monitor"""
|
||||
|
||||
def _handle_reset_listings_command(self):
|
||||
async def _handle_help_command(self):
|
||||
"""Send a help message with available commands."""
|
||||
help_text = (
|
||||
"<b>Available commands:</b>\n"
|
||||
"/autopilot on|off - Enable/disable autopilot\n"
|
||||
"/status - Show current status\n"
|
||||
"/plot - Show weekly listing pattern plot\n"
|
||||
"/errorrate - Show autopilot error rate plot\n"
|
||||
"/retryfailed - Retry failed applications\n"
|
||||
"/resetlistings - Reset listings file\n"
|
||||
"/help - Show this help message"
|
||||
)
|
||||
await self._send_message(help_text)
|
||||
|
||||
async def _handle_unknown_command(self, text):
|
||||
"""Handle unknown commands and notify the user."""
|
||||
cmd = text.split()[0] if text else text
|
||||
msg = (
|
||||
f"❓ Unknown command: <code>{cmd}</code>\n\nUse /help to see available commands."
|
||||
)
|
||||
await self._send_message(msg)
|
||||
|
||||
async def _handle_reset_listings_command(self):
|
||||
"""Move listings.json to data/old/ with a timestamp, preserving statistics and application history."""
|
||||
import shutil
|
||||
from datetime import datetime
|
||||
|
|
@ -29,15 +52,20 @@ class TelegramBot:
|
|||
# Ensure old_dir exists
|
||||
os.makedirs(old_dir, exist_ok=True)
|
||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
dest_path = os.path.join(old_dir, f"listings_{timestamp}.json")
|
||||
dest_path = os.path.join(
|
||||
old_dir, f"listings_{timestamp}.json"
|
||||
)
|
||||
shutil.move(listings_path, dest_path)
|
||||
msg = f"🗑️ <b>Listings reset:</b>\n<code>listings.json</code> moved to <code>old/listings_{timestamp}.json</code>."
|
||||
msg = (
|
||||
f"🗑️ <b>Listings reset:</b>\n<code>listings.json</code> moved to "
|
||||
f"<code>old/listings_{timestamp}.json</code>."
|
||||
)
|
||||
else:
|
||||
msg = "ℹ️ No listings file found to move."
|
||||
self._send_message(msg)
|
||||
await self._send_message(msg)
|
||||
except Exception as e:
|
||||
logger.error(f"Error resetting listings: {e}")
|
||||
self._send_message(f"❌ Error resetting listings: {str(e)}")
|
||||
await self._send_message(f"❌ Error resetting listings: {str(e)}")
|
||||
|
||||
def __init__(self, monitor, bot_token=None, chat_id=None, event_loop=None):
|
||||
self.monitor = monitor
|
||||
|
|
@ -89,44 +117,31 @@ class TelegramBot:
|
|||
logger.debug(f"Ignoring message from unknown chat: {chat_id}")
|
||||
return
|
||||
logger.info(f"Received Telegram command: {text}")
|
||||
loop = self.event_loop
|
||||
if text.startswith("/autopilot"):
|
||||
self._handle_autopilot_command(text)
|
||||
asyncio.run_coroutine_threadsafe(self._handle_autopilot_command(text), loop)
|
||||
elif text == "/status":
|
||||
self._handle_status_command()
|
||||
asyncio.run_coroutine_threadsafe(self._handle_status_command(), loop)
|
||||
elif text == "/help":
|
||||
self._handle_help_command()
|
||||
asyncio.run_coroutine_threadsafe(self._handle_help_command(), loop)
|
||||
elif text == "/plot":
|
||||
self._handle_plot_command()
|
||||
asyncio.run_coroutine_threadsafe(self._handle_plot_command(), loop)
|
||||
elif text == "/errorrate":
|
||||
self._handle_error_rate_command()
|
||||
asyncio.run_coroutine_threadsafe(self._handle_error_rate_command(), loop)
|
||||
elif text == "/retryfailed":
|
||||
# Schedule coroutine on the main event loop for thread safety
|
||||
fut = asyncio.run_coroutine_threadsafe(
|
||||
self._handle_retry_failed_command(max_retries=TELEGRAM_MAX_RETRIES),
|
||||
self.event_loop
|
||||
loop
|
||||
)
|
||||
# Optionally, wait for result or handle exceptions
|
||||
try:
|
||||
fut.result()
|
||||
except Exception as e:
|
||||
logger.error(f"/retryfailed command failed: {e}")
|
||||
elif text == "/resetlistings":
|
||||
self._handle_reset_listings_command()
|
||||
asyncio.run_coroutine_threadsafe(self._handle_reset_listings_command(), loop)
|
||||
elif text.startswith("/"):
|
||||
self._handle_unknown_command(text)
|
||||
def _handle_reset_listings_command(self):
|
||||
"""Delete listings.json (not wgcompany_listings.json), but preserve statistics and application history."""
|
||||
try:
|
||||
listings_path = os.path.join("data", "listings.json")
|
||||
if os.path.exists(listings_path):
|
||||
os.remove(listings_path)
|
||||
msg = "🗑️ <b>Listings reset:</b>\n<code>listings.json</code> deleted."
|
||||
else:
|
||||
msg = "ℹ️ No listings file found to delete."
|
||||
self._send_message(msg)
|
||||
except Exception as e:
|
||||
logger.error(f"Error resetting listings: {e}")
|
||||
self._send_message(f"❌ Error resetting listings: {str(e)}")
|
||||
asyncio.run_coroutine_threadsafe(self._handle_unknown_command(text), loop)
|
||||
|
||||
async def _handle_retry_failed_command(self, max_retries: int = 3):
|
||||
"""Retry all failed applications up to max_retries."""
|
||||
# Ensure browser context is initialized
|
||||
|
|
@ -137,11 +152,11 @@ class TelegramBot:
|
|||
if hasattr(self.app_handler, 'context') and hasattr(self.app_handler, 'handlers'):
|
||||
for handler in self.app_handler.handlers.values():
|
||||
handler.context = self.app_handler.context
|
||||
self._send_message(f"🔄 Retrying failed applications (max retries: {max_retries})...")
|
||||
applications = self.app_handler.load_applications()
|
||||
failed = [app for app in applications.values() if not app.get("success") and app.get("retries", 0) < max_retries]
|
||||
await self._send_message(f"🔄 Retrying {len(failed)} failed applications (max retries: {max_retries})...")
|
||||
if not failed:
|
||||
self._send_message("✅ No failed applications to retry (or all reached max retries).")
|
||||
await self._send_message("✅ No failed applications to retry (or all reached max retries).")
|
||||
return
|
||||
results = {}
|
||||
details = []
|
||||
|
|
@ -170,26 +185,26 @@ class TelegramBot:
|
|||
summary = f"🔄 Retried {len(results)} failed applications.\n✅ Success: {n_success}\n❌ Still failed: {n_fail}"
|
||||
if details:
|
||||
summary += "\n\n<b>Details:</b>\n" + "\n".join(details)
|
||||
self._send_message(summary)
|
||||
await self._send_message(summary)
|
||||
|
||||
def _handle_autopilot_command(self, text):
|
||||
async def _handle_autopilot_command(self, text):
|
||||
logger.info(f"Processing autopilot command: {text}")
|
||||
parts = text.split()
|
||||
if len(parts) < 2:
|
||||
self._send_message("Usage: /autopilot on|off")
|
||||
await self._send_message("Usage: /autopilot on|off")
|
||||
return
|
||||
action = parts[1].lower()
|
||||
if action == "on":
|
||||
logger.info("Enabling autopilot mode")
|
||||
self.monitor.set_autopilot(True)
|
||||
self._send_message("🤖 <b>Autopilot ENABLED</b>\n\nI will automatically apply to new listings!")
|
||||
await self._send_message("🤖 <b>Autopilot ENABLED</b>\n\nI will automatically apply to new listings!")
|
||||
elif action == "off":
|
||||
self.monitor.set_autopilot(False)
|
||||
self._send_message("🛑 <b>Autopilot DISABLED</b>\n\nI will only notify you of new listings.")
|
||||
await self._send_message("🛑 <b>Autopilot DISABLED</b>\n\nI will only notify you of new listings.")
|
||||
else:
|
||||
self._send_message("Usage: /autopilot on|off")
|
||||
await self._send_message("Usage: /autopilot on|off")
|
||||
|
||||
def _handle_status_command(self):
|
||||
async def _handle_status_command(self):
|
||||
state = self.app_handler.load_state()
|
||||
autopilot = state.get("autopilot", False)
|
||||
applications = self.app_handler.load_applications()
|
||||
|
|
@ -203,89 +218,83 @@ class TelegramBot:
|
|||
status += "\n\n<b>By company:</b>"
|
||||
for company, count in sorted(by_company.items()):
|
||||
status += f"\n • {company}: {count}"
|
||||
self._send_message(status)
|
||||
await self._send_message(status)
|
||||
|
||||
def _handle_plot_command(self):
|
||||
async def _handle_plot_command(self):
|
||||
logger.info("Generating listing times plot...")
|
||||
try:
|
||||
plot_path = self.app_handler._generate_weekly_plot()
|
||||
self._send_photo(plot_path, "\U0001f4ca <b>Weekly Listing Patterns</b>\n\nThis shows when new listings typically appear throughout the week.")
|
||||
await self._send_photo(plot_path, "\U0001f4ca <b>Weekly Listing Patterns</b>\n\nThis shows when new listings typically appear throughout the week.")
|
||||
except Exception as e:
|
||||
logger.error(f"Error generating plot: {e}")
|
||||
import traceback
|
||||
logger.error(traceback.format_exc())
|
||||
self._send_message(f"\u274c Error generating plot: {str(e)}")
|
||||
cmd = text.split()[0] if text else text
|
||||
self._send_message(f"❓ Unknown command: <code>{cmd}</code>\n\nUse /help to see available commands.")
|
||||
await self._send_message(f"\u274c Error generating plot: {str(e)}")
|
||||
|
||||
def _handle_error_rate_command(self):
|
||||
async def _handle_error_rate_command(self):
|
||||
logger.info("Generating autopilot errorrate plot...")
|
||||
try:
|
||||
plot_path, summary = self.app_handler._generate_error_rate_plot()
|
||||
caption = "📉 <b>Autopilot Success vs Failure</b>\n\n" + summary
|
||||
self._send_photo(plot_path, caption)
|
||||
await self._send_photo(plot_path, caption)
|
||||
except Exception as e:
|
||||
logger.error(f"Error generating errorrate plot: {e}")
|
||||
import traceback
|
||||
logger.error(traceback.format_exc())
|
||||
self._send_message(f"❌ Error generating errorrate plot: {str(e)}")
|
||||
await self._send_message(f"❌ Error generating errorrate plot: {str(e)}")
|
||||
|
||||
def _handle_plot_command(self):
|
||||
logger.info("Generating listing times plot...")
|
||||
try:
|
||||
plot_path = self.app_handler._generate_weekly_plot()
|
||||
if plot_path:
|
||||
self._send_photo(plot_path, "📊 <b>Weekly Listing Patterns</b>\n\nThis shows when new listings typically appear throughout the week.")
|
||||
else:
|
||||
self._send_message("📊 Not enough data to generate plot yet. Keep monitoring!")
|
||||
except Exception as e:
|
||||
logger.error(f"Error generating plot: {e}")
|
||||
import traceback
|
||||
logger.error(traceback.format_exc())
|
||||
self._send_message(f"❌ Error generating plot: {str(e)}")
|
||||
|
||||
def _send_message(self, text):
|
||||
"""Send a text message to the configured Telegram chat, with detailed error logging."""
|
||||
async def _send_message(self, text):
|
||||
"""Send a text message to the configured Telegram chat, with detailed error logging (async)."""
|
||||
import httpx
|
||||
MAX_LENGTH = 4096 # Telegram message character limit
|
||||
if not self.bot_token or not self.chat_id:
|
||||
logger.warning("Telegram bot token or chat ID not configured, cannot send message")
|
||||
return
|
||||
url = f"https://api.telegram.org/bot{self.bot_token}/sendMessage"
|
||||
payload = {"chat_id": self.chat_id, "text": text, "parse_mode": "HTML"}
|
||||
# Split message into chunks if too long
|
||||
messages = []
|
||||
if isinstance(text, str) and len(text) > MAX_LENGTH:
|
||||
# Try to split on line breaks for readability
|
||||
lines = text.split('\n')
|
||||
chunk = ""
|
||||
for line in lines:
|
||||
if len(chunk) + len(line) + 1 > MAX_LENGTH:
|
||||
messages.append(chunk)
|
||||
chunk = line
|
||||
else:
|
||||
if chunk:
|
||||
chunk += "\n"
|
||||
chunk += line
|
||||
if chunk:
|
||||
messages.append(chunk)
|
||||
else:
|
||||
messages = [text]
|
||||
try:
|
||||
response = requests.post(url, json=payload, timeout=10)
|
||||
logger.info(f"[TELEGRAM] Sent message: status={response.status_code}, ok={response.ok}, response={response.text}")
|
||||
if not response.ok:
|
||||
async with httpx.AsyncClient(timeout=10) as client:
|
||||
for idx, msg in enumerate(messages):
|
||||
payload = {"chat_id": self.chat_id, "text": msg, "parse_mode": "HTML"}
|
||||
response = await client.post(url, json=payload)
|
||||
logger.info(f"[TELEGRAM] Sent message part {idx+1}/{len(messages)}: status={response.status_code}, ok={response.is_success}")
|
||||
if not response.is_success:
|
||||
logger.error(f"Failed to send Telegram message: {response.text}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error while sending Telegram message: {e}")
|
||||
import traceback
|
||||
logger.error(traceback.format_exc())
|
||||
|
||||
def _send_photo(self, photo_path, caption):
|
||||
"""Send a photo to the configured Telegram chat."""
|
||||
async def _send_photo(self, photo_path, caption):
|
||||
"""Send a photo to the configured Telegram chat (async)."""
|
||||
import httpx
|
||||
if not self.bot_token or not self.chat_id:
|
||||
logger.warning("Telegram bot token or chat ID not configured, cannot send photo")
|
||||
return
|
||||
url = f"https://api.telegram.org/bot{self.bot_token}/sendPhoto"
|
||||
with open(photo_path, "rb") as photo:
|
||||
payload = {"chat_id": self.chat_id, "caption": caption, "parse_mode": "HTML"}
|
||||
files = {"photo": photo}
|
||||
try:
|
||||
response = requests.post(url, data=payload, files=files, timeout=10)
|
||||
if not response.ok:
|
||||
with open(photo_path, "rb") as photo:
|
||||
files = {"photo": (photo_path, photo, "image/jpeg")}
|
||||
data = {"chat_id": self.chat_id, "caption": caption, "parse_mode": "HTML"}
|
||||
async with httpx.AsyncClient(timeout=10) as client:
|
||||
response = await client.post(url, data=data, files=files)
|
||||
if not response.is_success:
|
||||
logger.error(f"Failed to send Telegram photo: {response.text}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error while sending Telegram photo: {e}")
|
||||
|
||||
def _generate_error_rate_plot(self):
|
||||
"""Generate and send a plot showing success vs failure ratio for autopilot applications."""
|
||||
logger.info("Generating autopilot errorrate plot...")
|
||||
try:
|
||||
plot_path, summary = self.app_handler._generate_error_rate_plot()
|
||||
if plot_path:
|
||||
self._send_photo(plot_path, caption=summary)
|
||||
else:
|
||||
self._send_message("No data available to generate the error rate plot.")
|
||||
except Exception as e:
|
||||
logger.error(f"Error generating errorrate plot: {e}")
|
||||
self._send_message(f"❌ Error generating errorrate plot: {str(e)}")
|
||||
|
|
@ -16,12 +16,18 @@ def temp_applications_file(tmp_path):
|
|||
@pytest.fixture
|
||||
def application_handler(temp_applications_file, monkeypatch):
|
||||
"""Fixture to create an ApplicationHandler instance with a temporary applications file."""
|
||||
class DummyContext: pass
|
||||
class DummyStateManager: pass
|
||||
monkeypatch.setattr("application_handler.APPLICATIONS_FILE", temp_applications_file)
|
||||
return ApplicationHandler(browser_context=None, state_manager=None)
|
||||
return ApplicationHandler(browser_context=DummyContext(), state_manager=DummyStateManager(), applications_file=temp_applications_file)
|
||||
|
||||
|
||||
import types
|
||||
class DummyContext: pass
|
||||
class DummyStateManager: pass
|
||||
|
||||
def test_detect_company_domains():
|
||||
handler = ApplicationHandler(browser_context=None, state_manager=None)
|
||||
handler = ApplicationHandler(browser_context=DummyContext(), state_manager=DummyStateManager())
|
||||
assert handler._detect_company('https://howoge.de/abc') == 'howoge'
|
||||
assert handler._detect_company('https://www.howoge.de/abc') == 'howoge'
|
||||
assert handler._detect_company('https://portal.gewobag.de/') == 'gewobag'
|
||||
|
|
@ -32,7 +38,7 @@ def test_detect_company_domains():
|
|||
assert handler._detect_company('https://wbm.de/') == 'wbm'
|
||||
|
||||
def test_detect_company_path_fallback():
|
||||
handler = ApplicationHandler(browser_context=None, state_manager=None)
|
||||
handler = ApplicationHandler(browser_context=DummyContext(), state_manager=DummyStateManager())
|
||||
assert handler._detect_company('https://example.com/howoge/abc') == 'howoge'
|
||||
assert handler._detect_company('https://foo.bar/gewobag') == 'gewobag'
|
||||
assert handler._detect_company('https://foo.bar/degewo') == 'degewo'
|
||||
|
|
@ -41,7 +47,7 @@ def test_detect_company_path_fallback():
|
|||
assert handler._detect_company('https://foo.bar/wbm') == 'wbm'
|
||||
|
||||
def test_detect_company_unknown():
|
||||
handler = ApplicationHandler(browser_context=None, state_manager=None)
|
||||
handler = ApplicationHandler(browser_context=DummyContext(), state_manager=DummyStateManager())
|
||||
assert handler._detect_company('https://example.com/') == 'unknown'
|
||||
assert handler._detect_company('') == 'unknown'
|
||||
assert handler._detect_company(None) == 'unknown'
|
||||
|
|
|
|||
|
|
@ -13,7 +13,8 @@ class DummyStateManager:
|
|||
|
||||
def make_handler():
|
||||
# context is not used for _detect_company
|
||||
return ApplicationHandler(browser_context=None, state_manager=DummyStateManager())
|
||||
class DummyContext: pass
|
||||
return ApplicationHandler(browser_context=DummyContext(), state_manager=DummyStateManager())
|
||||
|
||||
def test_detect_company_domains():
|
||||
handler = make_handler()
|
||||
|
|
|
|||
|
|
@ -24,17 +24,23 @@ class DummyStateManager:
|
|||
def is_autopilot_enabled(self): return False
|
||||
|
||||
|
||||
|
||||
@patch("matplotlib.pyplot.savefig")
|
||||
def test_generate_error_rate_plot_no_data(mock_savefig, temp_applications_file):
|
||||
handler = ApplicationHandler(None, DummyStateManager(), applications_file=temp_applications_file)
|
||||
class DummyContext: pass
|
||||
handler = ApplicationHandler(DummyContext(), DummyStateManager(), applications_file=temp_applications_file)
|
||||
plot_path, summary = handler._generate_error_rate_plot()
|
||||
assert plot_path is None or plot_path == ""
|
||||
assert summary == ""
|
||||
|
||||
|
||||
@patch("matplotlib.pyplot.savefig")
|
||||
|
||||
from unittest.mock import patch
|
||||
|
||||
@patch("matplotlib.figure.Figure.savefig")
|
||||
def test_generate_error_rate_plot_with_data(mock_savefig, temp_applications_file):
|
||||
handler = ApplicationHandler(None, DummyStateManager(), applications_file=temp_applications_file)
|
||||
class DummyContext: pass
|
||||
handler = ApplicationHandler(DummyContext(), DummyStateManager(), applications_file=temp_applications_file)
|
||||
# Write valid data to the temp applications file
|
||||
temp_applications_file.write_text('''
|
||||
{
|
||||
|
|
@ -48,4 +54,4 @@ def test_generate_error_rate_plot_with_data(mock_savefig, temp_applications_file
|
|||
assert "Successes" in summary
|
||||
assert "Failures" in summary
|
||||
assert "Overall error rate" in summary
|
||||
mock_savefig.assert_called_once()
|
||||
mock_savefig.assert_called()
|
||||
|
|
@ -1,3 +1,6 @@
|
|||
import sys
|
||||
from pathlib import Path
|
||||
sys.path.append(str(Path(__file__).parent.parent))
|
||||
import pytest
|
||||
from handlers.howoge_handler import HowogeHandler
|
||||
from handlers.gewobag_handler import GewobagHandler
|
||||
|
|
@ -16,7 +19,7 @@ class MockBaseHandler(BaseHandler):
|
|||
async def test_howoge_handler():
|
||||
context = AsyncMock()
|
||||
handler = HowogeHandler(context)
|
||||
listing = {"link": "https://www.howoge.de/example"}
|
||||
listing = {"link": "https://www.howoge.de/example", "id": "testid"}
|
||||
result = {"success": False}
|
||||
await handler.apply(listing, result)
|
||||
assert "success" in result
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue