import pytest import sys from pathlib import Path import json from unittest.mock import AsyncMock, MagicMock, patch sys.path.append(str(Path(__file__).parent.parent)) from handlers.wgcompany_notifier import WGCompanyNotifier @pytest.fixture def temp_listings_file(tmp_path): """Fixture to create a temporary wgcompany listings file.""" file = tmp_path / "wgcompany_listings.json" file.write_text("{}", encoding="utf-8") return file @pytest.fixture def temp_timing_file(tmp_path): """Fixture to create a temporary wgcompany timing file.""" file = tmp_path / "wgcompany_times.csv" return file @pytest.fixture def wgcompany_notifier(temp_listings_file, temp_timing_file, monkeypatch): """Fixture to create a WGCompanyNotifier instance with temporary files.""" monkeypatch.setattr("handlers.wgcompany_notifier.WGCOMPANY_LISTINGS_FILE", temp_listings_file) monkeypatch.setattr("handlers.wgcompany_notifier.WGCOMPANY_TIMING_FILE", temp_timing_file) mock_telegram_bot = MagicMock() mock_telegram_bot._send_message = AsyncMock() return WGCompanyNotifier(telegram_bot=mock_telegram_bot, refresh_minutes=10) @pytest.mark.asyncio async def test_init_browser(wgcompany_notifier): """Test browser initialization.""" await wgcompany_notifier.init_browser() assert wgcompany_notifier.browser is not None assert wgcompany_notifier.context is not None await wgcompany_notifier.browser.close() def test_load_previous_listings_empty(wgcompany_notifier): """Test loading previous listings when file is empty.""" listings = wgcompany_notifier.load_previous_listings() assert listings == {} def test_save_and_load_listings(wgcompany_notifier): """Test saving and loading listings.""" test_listings = [ { "id": "abc123", "rooms": "1 Zimmer (WG)", "size": "20 m²", "price": "500 €", "address": "Kreuzberg", "link": "http://example.com/wg1", "source": "wgcompany" } ] wgcompany_notifier.save_listings(test_listings) loaded = wgcompany_notifier.load_previous_listings() assert "abc123" in loaded assert loaded["abc123"]["price"] == "500 €" def test_find_new_listings(wgcompany_notifier): """Test finding new listings.""" current = [ {"id": "1", "link": "http://example.com/1"}, {"id": "2", "link": "http://example.com/2"}, {"id": "3", "link": "http://example.com/3"} ] previous = { "1": {"id": "1", "link": "http://example.com/1"} } new_listings = wgcompany_notifier.find_new_listings(current, previous) assert len(new_listings) == 2 assert new_listings[0]["id"] == "2" assert new_listings[1]["id"] == "3" def test_find_new_listings_empty(wgcompany_notifier): """Test finding new listings when all are already seen.""" current = [ {"id": "1", "link": "http://example.com/1"} ] previous = { "1": {"id": "1", "link": "http://example.com/1"} } new_listings = wgcompany_notifier.find_new_listings(current, previous) assert len(new_listings) == 0 def test_log_listing_times(wgcompany_notifier, temp_timing_file): """Test logging listing times to CSV.""" new_listings = [ { "id": "test123", "rooms": "1 Zimmer (WG)", "size": "20 m²", "price": "500 €", "address": "Kreuzberg" } ] wgcompany_notifier.log_listing_times(new_listings) assert temp_timing_file.exists() content = temp_timing_file.read_text() assert "timestamp" in content assert "test123" in content @pytest.mark.asyncio async def test_notify_new_listings(wgcompany_notifier): """Test notifying new listings via Telegram.""" new_listings = [ { "id": "test123", "rooms": "1 Zimmer (WG)", "size": "20 m²", "price": "500 €", "address": "Kreuzberg", "link": "http://example.com/wg1" } ] await wgcompany_notifier.notify_new_listings(new_listings) wgcompany_notifier.telegram_bot._send_message.assert_called_once() call_args = wgcompany_notifier.telegram_bot._send_message.call_args[0][0] assert "WGCOMPANY" in call_args assert "Kreuzberg" in call_args assert "500 €" in call_args def test_no_save_on_empty_fetch(wgcompany_notifier): """Test that empty fetch results don't overwrite existing listings.""" # First save some listings existing_listings = [ {"id": "1", "link": "http://example.com/1", "price": "500 €"}, {"id": "2", "link": "http://example.com/2", "price": "600 €"} ] wgcompany_notifier.save_listings(existing_listings) # Verify they were saved loaded = wgcompany_notifier.load_previous_listings() assert len(loaded) == 2 # Simulate empty fetch - should not save # The run() method should skip save_listings() when fetch returns 0 # We test this by ensuring the file is not modified import time before_mtime = Path(wgcompany_notifier.load_previous_listings.__self__.__class__.__module__).parent / "data" / "wgcompany_listings.json" # Just verify the logic directly empty_listings = [] previous = wgcompany_notifier.load_previous_listings() # The fix ensures we don't call save_listings([]) if len(listings) == 0 # This test confirms the loaded data persists assert len(previous) == 2 def test_no_save_on_suspiciously_small_fetch(wgcompany_notifier): """Test that suspiciously small fetch results don't overwrite existing listings.""" # First save many listings existing_listings = [ {"id": str(i), "link": f"http://example.com/{i}", "price": "500 €"} for i in range(100) ] wgcompany_notifier.save_listings(existing_listings) # Verify they were saved loaded = wgcompany_notifier.load_previous_listings() assert len(loaded) == 100 # Simulate fetching only 10 listings (10% of previous, less than 50% threshold) # The run() method should skip save to prevent data loss small_fetch = [{"id": str(i), "link": f"http://example.com/{i}", "price": "500 €"} for i in range(10)] # The fix checks: len(listings) < len(previous) * 0.5 # 10 < 100 * 0.5 = 10 < 50 = True, so save should be skipped assert len(small_fetch) < len(loaded) * 0.5 # Verify previous data still intact loaded_again = wgcompany_notifier.load_previous_listings() assert len(loaded_again) == 100