2026-01-01 15:27:25 +01:00
|
|
|
import pytest
|
|
|
|
|
import sys
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
import json
|
|
|
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
|
sys.path.append(str(Path(__file__).parent.parent))
|
|
|
|
|
from handlers.wgcompany_notifier import WGCompanyNotifier
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
def temp_listings_file(tmp_path):
    """Provide a temporary wgcompany listings file seeded with an empty JSON object."""
    listings_path = tmp_path / "wgcompany_listings.json"
    # Start from "{}" so the file exists and holds valid JSON.
    listings_path.write_text("{}", encoding="utf-8")
    return listings_path
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
def temp_timing_file(tmp_path):
    """Provide the path for a temporary wgcompany timing CSV.

    Only the path is returned; this fixture does not create the file.
    """
    return tmp_path / "wgcompany_times.csv"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
def wgcompany_notifier(temp_listings_file, temp_timing_file, monkeypatch):
    """Build a WGCompanyNotifier wired to temporary files and a mocked Telegram bot.

    The module-level file constants in ``handlers.wgcompany_notifier`` are
    patched to the temporary paths, and the bot's ``_send_message`` is an
    ``AsyncMock`` so it can be awaited and inspected.
    """
    patched_constants = {
        "handlers.wgcompany_notifier.WGCOMPANY_LISTINGS_FILE": temp_listings_file,
        "handlers.wgcompany_notifier.WGCOMPANY_TIMING_FILE": temp_timing_file,
    }
    for target, replacement in patched_constants.items():
        monkeypatch.setattr(target, replacement)

    bot = MagicMock()
    bot._send_message = AsyncMock()
    return WGCompanyNotifier(telegram_bot=bot, refresh_minutes=10)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_init_browser(wgcompany_notifier):
    """Test that init_browser() populates both ``browser`` and ``context``.

    The browser is closed in a ``finally`` block so that a failing assertion
    does not leave it open for the rest of the test session.
    """
    await wgcompany_notifier.init_browser()
    try:
        assert wgcompany_notifier.browser is not None
        assert wgcompany_notifier.context is not None
    finally:
        # Guard against a missing/None browser so cleanup never masks the
        # assertion error that actually explains the failure.
        browser = getattr(wgcompany_notifier, "browser", None)
        if browser is not None:
            await browser.close()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_load_previous_listings_empty(wgcompany_notifier):
    """Loading previous listings yields an empty dict when nothing has been saved."""
    assert wgcompany_notifier.load_previous_listings() == {}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_save_and_load_listings(wgcompany_notifier):
    """A saved listing can be loaded back, keyed by its id."""
    listing = {
        "id": "abc123",
        "rooms": "1 Zimmer (WG)",
        "size": "20 m²",
        "price": "500 €",
        "address": "Kreuzberg",
        "link": "http://example.com/wg1",
        "source": "wgcompany",
    }

    wgcompany_notifier.save_listings([listing])
    stored = wgcompany_notifier.load_previous_listings()

    assert "abc123" in stored
    assert stored["abc123"]["price"] == "500 €"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_find_new_listings(wgcompany_notifier):
    """Only listings whose id is absent from the previous set are reported, in order."""
    already_seen = {
        "1": {"id": "1", "link": "http://example.com/1"},
    }
    fetched = [
        {"id": "1", "link": "http://example.com/1"},
        {"id": "2", "link": "http://example.com/2"},
        {"id": "3", "link": "http://example.com/3"},
    ]

    fresh = wgcompany_notifier.find_new_listings(fetched, already_seen)

    assert len(fresh) == 2
    # Listing "1" is filtered out; "2" and "3" keep their fetch order.
    assert fresh[0]["id"] == "2"
    assert fresh[1]["id"] == "3"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_find_new_listings_empty(wgcompany_notifier):
    """No listings are reported as new when every fetched id was already seen."""
    already_seen = {"1": {"id": "1", "link": "http://example.com/1"}}
    fetched = [{"id": "1", "link": "http://example.com/1"}]

    fresh = wgcompany_notifier.find_new_listings(fetched, already_seen)

    assert len(fresh) == 0
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_log_listing_times(wgcompany_notifier, temp_timing_file):
    """Logging a new listing creates the timing CSV containing a header and the listing id."""
    entry = {
        "id": "test123",
        "rooms": "1 Zimmer (WG)",
        "size": "20 m²",
        "price": "500 €",
        "address": "Kreuzberg",
    }

    wgcompany_notifier.log_listing_times([entry])

    assert temp_timing_file.exists()
    csv_text = temp_timing_file.read_text()
    # "timestamp" is expected from the header row, "test123" from the data row.
    for expected in ("timestamp", "test123"):
        assert expected in csv_text
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_notify_new_listings(wgcompany_notifier):
    """A single new listing triggers exactly one Telegram message with its key details."""
    listing = {
        "id": "test123",
        "rooms": "1 Zimmer (WG)",
        "size": "20 m²",
        "price": "500 €",
        "address": "Kreuzberg",
        "link": "http://example.com/wg1",
    }

    await wgcompany_notifier.notify_new_listings([listing])

    send_message = wgcompany_notifier.telegram_bot._send_message
    send_message.assert_called_once()

    # The message text is the first positional argument of the call.
    message = send_message.call_args[0][0]
    for fragment in ("WGCOMPANY", "Kreuzberg", "500 €"):
        assert fragment in message
|
2026-01-11 12:05:06 +01:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_no_save_on_empty_fetch(wgcompany_notifier):
    """Test that saved listings are still intact when no save follows an empty fetch.

    NOTE(review): this test does not call run(); it only confirms that
    listings written by save_listings() are still returned by a later
    load_previous_listings() when nothing overwrites them. The guard that
    skips save_listings([]) on an empty fetch is not exercised here.
    """
    # First save some listings
    existing_listings = [
        {"id": "1", "link": "http://example.com/1", "price": "500 €"},
        {"id": "2", "link": "http://example.com/2", "price": "600 €"},
    ]
    wgcompany_notifier.save_listings(existing_listings)

    # Verify they were saved
    loaded = wgcompany_notifier.load_previous_listings()
    assert len(loaded) == 2

    # Simulate an empty fetch: nothing is saved, so a second load must
    # still see both previously stored listings.
    previous = wgcompany_notifier.load_previous_listings()
    assert len(previous) == 2
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_no_save_on_suspiciously_small_fetch(wgcompany_notifier):
    """Check the small-fetch threshold arithmetic and that stored listings stay intact.

    100 listings are saved, then a 10-item fetch is compared against the
    50% threshold; no save is issued for the small fetch, and the stored
    100 listings must still load afterwards.
    """
    # Seed the store with a large set of listings.
    stored_listings = []
    for idx in range(100):
        stored_listings.append(
            {"id": str(idx), "link": f"http://example.com/{idx}", "price": "500 €"}
        )
    wgcompany_notifier.save_listings(stored_listings)

    # Confirm the seed data was written.
    before = wgcompany_notifier.load_previous_listings()
    assert len(before) == 100

    # A fetch of only 10 listings is 10% of the previous count.
    tiny_fetch = []
    for idx in range(10):
        tiny_fetch.append(
            {"id": str(idx), "link": f"http://example.com/{idx}", "price": "500 €"}
        )

    # Threshold condition: len(listings) < len(previous) * 0.5
    # Here 10 < 100 * 0.5 == 50, so the save is expected to be skipped.
    assert len(tiny_fetch) < len(before) * 0.5

    # The previously stored data must be untouched.
    after = wgcompany_notifier.load_previous_listings()
    assert len(after) == 100
|