import pytest import sys from pathlib import Path import json from unittest.mock import AsyncMock, MagicMock, patch sys.path.append(str(Path(__file__).parent.parent)) from handlers.wgcompany_notifier import WGCompanyNotifier @pytest.fixture def temp_listings_file(tmp_path): """Fixture to create a temporary wgcompany listings file.""" file = tmp_path / "wgcompany_listings.json" file.write_text("{}", encoding="utf-8") return file @pytest.fixture def temp_timing_file(tmp_path): """Fixture to create a temporary wgcompany timing file.""" file = tmp_path / "wgcompany_times.csv" return file @pytest.fixture def wgcompany_notifier(temp_listings_file, temp_timing_file, monkeypatch): """Fixture to create a WGCompanyNotifier instance with temporary files.""" monkeypatch.setattr("handlers.wgcompany_notifier.WGCOMPANY_LISTINGS_FILE", temp_listings_file) monkeypatch.setattr("handlers.wgcompany_notifier.WGCOMPANY_TIMING_FILE", temp_timing_file) mock_telegram_bot = MagicMock() mock_telegram_bot._send_message = AsyncMock() return WGCompanyNotifier(telegram_bot=mock_telegram_bot, refresh_minutes=10) @pytest.mark.asyncio async def test_init_browser(wgcompany_notifier): """Test browser initialization.""" await wgcompany_notifier.init_browser() assert wgcompany_notifier.browser is not None assert wgcompany_notifier.context is not None await wgcompany_notifier.browser.close() def test_load_previous_listings_empty(wgcompany_notifier): """Test loading previous listings when file is empty.""" listings = wgcompany_notifier.load_previous_listings() assert listings == {} def test_save_and_load_listings(wgcompany_notifier): """Test saving and loading listings.""" test_listings = [ { "id": "abc123", "rooms": "1 Zimmer (WG)", "size": "20 m²", "price": "500 €", "address": "Kreuzberg", "link": "http://example.com/wg1", "source": "wgcompany" } ] wgcompany_notifier.save_listings(test_listings) loaded = wgcompany_notifier.load_previous_listings() assert "abc123" in loaded assert loaded["abc123"]["price"] == "500 €" def test_find_new_listings(wgcompany_notifier): """Test finding new listings.""" current = [ {"id": "1", "link": "http://example.com/1"}, {"id": "2", "link": "http://example.com/2"}, {"id": "3", "link": "http://example.com/3"} ] previous = { "1": {"id": "1", "link": "http://example.com/1"} } new_listings = wgcompany_notifier.find_new_listings(current, previous) assert len(new_listings) == 2 assert new_listings[0]["id"] == "2" assert new_listings[1]["id"] == "3" def test_find_new_listings_empty(wgcompany_notifier): """Test finding new listings when all are already seen.""" current = [ {"id": "1", "link": "http://example.com/1"} ] previous = { "1": {"id": "1", "link": "http://example.com/1"} } new_listings = wgcompany_notifier.find_new_listings(current, previous) assert len(new_listings) == 0 def test_log_listing_times(wgcompany_notifier, temp_timing_file): """Test logging listing times to CSV.""" new_listings = [ { "id": "test123", "rooms": "1 Zimmer (WG)", "size": "20 m²", "price": "500 €", "address": "Kreuzberg" } ] wgcompany_notifier.log_listing_times(new_listings) assert temp_timing_file.exists() content = temp_timing_file.read_text() assert "timestamp" in content assert "test123" in content @pytest.mark.asyncio async def test_notify_new_listings(wgcompany_notifier): """Test notifying new listings via Telegram.""" new_listings = [ { "id": "test123", "rooms": "1 Zimmer (WG)", "size": "20 m²", "price": "500 €", "address": "Kreuzberg", "link": "http://example.com/wg1" } ] await wgcompany_notifier.notify_new_listings(new_listings) wgcompany_notifier.telegram_bot._send_message.assert_called_once() call_args = wgcompany_notifier.telegram_bot._send_message.call_args[0][0] assert "WGCOMPANY" in call_args assert "Kreuzberg" in call_args assert "500 €" in call_args