internal logic error

This commit is contained in:
Aron Petau 2026-01-04 22:51:46 +01:00
parent 044ef111ac
commit cc40121e46
2 changed files with 141 additions and 7 deletions

View file

@ -308,17 +308,17 @@ class ApplicationHandler:
def has_applied(self, listing_id: str) -> bool: def has_applied(self, listing_id: str) -> bool:
""" """
Check if we've already applied to this listing. Check if we've successfully applied to this listing.
Excludes baseline entries from first run (not auto-applied). Only returns True if application was successful.
Failed applications can be retried.
""" """
applications = self.load_applications() applications = self.load_applications()
if listing_id not in applications: if listing_id not in applications:
return False return False
app = applications[listing_id] app = applications[listing_id]
# If message contains "First run, not auto-applied", treat as not applied # Only skip if application was successful
if "First run, not auto-applied" in app.get("message", ""): # Failed applications (success=False) should be retried
return False return app.get("success", False)
return True
def load_previous_listings(self) -> dict: def load_previous_listings(self) -> dict:

View file

@ -2,6 +2,7 @@ import asyncio
import logging import logging
import hashlib import hashlib
import re import re
import csv
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
import json import json
@ -14,6 +15,7 @@ logger = logging.getLogger(__name__)
WGCOMPANY_LISTINGS_FILE = Path("data/wgcompany_listings.json") WGCOMPANY_LISTINGS_FILE = Path("data/wgcompany_listings.json")
WGCOMPANY_TIMING_FILE = Path("data/wgcompany_times.csv") WGCOMPANY_TIMING_FILE = Path("data/wgcompany_times.csv")
CONTACTS_FILE = Path("data/contacts.csv")
# Environment variables for search filters # Environment variables for search filters
WGCOMPANY_MIN_SIZE = os.environ.get("WGCOMPANY_MIN_SIZE", "") WGCOMPANY_MIN_SIZE = os.environ.get("WGCOMPANY_MIN_SIZE", "")
@ -157,10 +159,76 @@ class WGCompanyNotifier:
logger.info(f"[WG] {len(new)} new listing{'s' if len(new) > 1 else ''} detected") logger.info(f"[WG] {len(new)} new listing{'s' if len(new) > 1 else ''} detected")
return new return new
async def fetch_listing_details(self, listing_url: str) -> dict:
    """Fetch detailed information from a listing page including email.

    Args:
        listing_url: URL of the WGcompany listing detail page.

    Returns:
        Dict with keys "email", "contact_person", "address",
        "description", "wg_name". Values default to "" when a field
        cannot be extracted. Never raises: any error is logged and the
        (possibly partial) details dict is returned.
    """
    details = {
        "email": "",
        "contact_person": "",
        "address": "",
        "description": "",
        "wg_name": ""
    }
    page = None
    try:
        # Explicit check instead of assert: asserts are stripped under -O.
        if self.context is None:
            raise RuntimeError("Browser context not initialized")
        page = await self.context.new_page()
        await page.goto(listing_url, wait_until="networkidle")
        await asyncio.sleep(1)

        content = await page.content()

        # Email: prefer an explicitly labelled address, fall back to any
        # email-looking token in the raw HTML.
        email_patterns = [
            r'[Ee]-?[Mm]ail[:\s]+([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})',
            r'([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})'
        ]
        for pattern in email_patterns:
            email_match = re.search(pattern, content)
            if email_match:
                details["email"] = email_match.group(1)
                break

        # WG name is encoded in the listing URL query string (wg=<name>).
        wg_match = re.search(r'wg=([^&]+)', listing_url)
        if wg_match:
            details["wg_name"] = wg_match.group(1)

        text = await page.inner_text('body')

        # Address: try a street-like token first (e.g. "Musterstraße 12"),
        # then labelled "Adresse:"/"Lage:" fields.
        addr_patterns = [
            r'((?:[A-ZÄÖÜ][a-zäöüß]+(?:straße|str\.|platz|weg|allee))\s*\d+)',
            r'Adresse[:\s]+([^\n]+)',
            r'Lage[:\s]+([^\n]+)'
        ]
        for pattern in addr_patterns:
            addr_match = re.search(pattern, text, re.IGNORECASE)
            if addr_match:
                details["address"] = addr_match.group(1).strip()
                break

        # Contact person: a capitalized name following a German contact label.
        contact_patterns = [
            r'Kontakt[:\s]+([A-ZÄÖÜ][a-zäöüß]+(?:\s+[A-ZÄÖÜ][a-zäöüß]+)?)',
            r'Ansprechpartner[:\s]+([A-ZÄÖÜ][a-zäöüß]+(?:\s+[A-ZÄÖÜ][a-zäöüß]+)?)',
        ]
        for pattern in contact_patterns:
            contact_match = re.search(pattern, text)
            if contact_match:
                details["contact_person"] = contact_match.group(1).strip()
                break

        logger.debug(f"[WG] Fetched details: email={details['email']}, wg={details['wg_name']}")
    except Exception as e:
        logger.error(f"[WG] Error fetching listing details: {e}")
    finally:
        # Always close the page, even when goto/extraction fails part-way,
        # so repeated errors don't leak browser pages.
        if page is not None:
            try:
                await page.close()
            except Exception:
                pass
    return details
def log_listing_times(self, new_listings): def log_listing_times(self, new_listings):
if not new_listings: if not new_listings:
return return
import csv
file_exists = WGCOMPANY_TIMING_FILE.exists() file_exists = WGCOMPANY_TIMING_FILE.exists()
with open(WGCOMPANY_TIMING_FILE, "a", newline="", encoding="utf-8") as f: with open(WGCOMPANY_TIMING_FILE, "a", newline="", encoding="utf-8") as f:
writer = csv.writer(f) writer = csv.writer(f)
@ -181,6 +249,65 @@ class WGCompanyNotifier:
]) ])
logger.debug(f"[WG] Logged {len(new_listings)} to CSV") logger.debug(f"[WG] Logged {len(new_listings)} to CSV")
async def save_to_contacts(self, listing: dict, details: dict) -> None:
    """Save new listing to contacts.csv with details.

    Deduplicates on the listing link. Best-effort: any error is logged
    and swallowed so a CSV problem never aborts the monitoring run.

    Args:
        listing: Scraped listing dict; must contain "link", may contain
            "size", "rooms", "price", "address".
        details: Details dict as produced by fetch_listing_details().
    """
    fieldnames = [
        'Name', 'ContactPerson', 'Platform', 'DateContacted', 'Address',
        'ListingLink', 'ContactMethod', 'Response', 'FollowUpDate',
        'PreferredMoveIn', 'Notes', 'Status'
    ]
    try:
        # Treat an empty (0-byte) file as missing: a bare `exists()` check
        # would skip the header and leave appended rows headerless.
        has_data = CONTACTS_FILE.exists() and CONTACTS_FILE.stat().st_size > 0

        # Collect already-recorded listing links to avoid duplicates.
        existing_urls = set()
        if has_data:
            with open(CONTACTS_FILE, 'r', newline='', encoding='utf-8') as f:
                reader = csv.DictReader(f)
                for row in reader:
                    if row.get('ListingLink'):
                        existing_urls.add(row['ListingLink'])

        if listing['link'] in existing_urls:
            logger.debug(f"[WG] Listing already in contacts: {listing['link']}")
            return

        wg_name = details.get('wg_name', '')
        contact_person = details.get('contact_person', '')
        address_full = details.get('address', '') or listing.get('address', '')
        # Room/size/price summary goes into the free-text Notes column.
        notes = f"{listing.get('size', '')} / {listing.get('rooms', '')}; {listing.get('price', '')}"

        row = {
            'Name': f"WG {wg_name}" if wg_name else "WG (unnamed)",
            'ContactPerson': contact_person,
            'Platform': 'WGcompany',
            'DateContacted': '',  # Empty - user will fill when contacting
            'Address': address_full,
            'ListingLink': listing['link'],
            'ContactMethod': f"email: {details.get('email', '')}" if details.get('email') else 'wgcompany message',
            'Response': '',
            'FollowUpDate': '',
            'PreferredMoveIn': '',
            'Notes': notes,
            'Status': 'open'
        }

        # Append the row, writing the header first on a fresh/empty file.
        with open(CONTACTS_FILE, 'a', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            if not has_data:
                writer.writeheader()
            writer.writerow(row)
        logger.info(f"[WG] Saved to contacts.csv: {row['Name']} - {details.get('email', 'no email')}")
    except Exception as e:
        logger.error(f"[WG] Error saving to contacts: {e}")
async def notify_new_listings(self, new_listings: list[dict]) -> None: async def notify_new_listings(self, new_listings: list[dict]) -> None:
if not new_listings or not self.telegram_bot: if not new_listings or not self.telegram_bot:
return return
@ -210,6 +337,13 @@ class WGCompanyNotifier:
if new_listings: if new_listings:
logger.info(f"[WGCOMPANY] Found {len(new_listings)} new listing(s)") logger.info(f"[WGCOMPANY] Found {len(new_listings)} new listing(s)")
self.log_listing_times(new_listings) self.log_listing_times(new_listings)
# Fetch details and save to contacts for each new listing
for listing in new_listings:
details = await self.fetch_listing_details(listing['link'])
await self.save_to_contacts(listing, details)
await asyncio.sleep(1) # Be polite with requests
await self.notify_new_listings(new_listings) await self.notify_new_listings(new_listings)
else: else:
logger.info("[WGCOMPANY] No new listings") logger.info("[WGCOMPANY] No new listings")