wohnbot/helper_functions/merge_all_data.py

#!/usr/bin/env python3
"""
Merge all data from prod and dev environments.
Handles applications.json, listings.json, wgcompany_listings.json, and CSV files.
For failed applications with duplicates, keeps the earlier timestamp.
"""
import json
from datetime import datetime
from pathlib import Path

import pandas as pd


def parse_timestamp(ts_str):
    """Parse an ISO-format timestamp string into a datetime object."""
    if ts_str:
        try:
            return datetime.fromisoformat(ts_str)
        # fromisoformat raises ValueError on malformed strings and
        # TypeError on non-string input; treat both as "no timestamp"
        except (ValueError, TypeError):
            return None
    return None
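
# Example (standard-library behavior): parse_timestamp("2026-01-08T21:04:43+01:00")
# returns an offset-aware datetime, while parse_timestamp(None) and
# parse_timestamp("not a date") both return None, so callers get one falsy sentinel.
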
def merge_applications(local_path, merge_path, output_path=None):
    """
    Merge two applications.json files, deduplicating by listing_id.

    Special handling: for failed applications that appear in both files,
    keep the entry with the earlier timestamp. For successful applications,
    keep the entry with the more complete data.
    """
    output_path = output_path or local_path
    with open(local_path, encoding='utf-8') as f:
        local = json.load(f)
    with open(merge_path, encoding='utf-8') as f:
        remote = json.load(f)
    merged = {}
    all_keys = set(local.keys()) | set(remote.keys())
    for key in all_keys:
        l_entry = local.get(key)
        r_entry = remote.get(key)
        if l_entry and r_entry:
            # Both environments have this application
            l_success = l_entry.get('success', False)
            r_success = r_entry.get('success', False)
            l_ts = parse_timestamp(l_entry.get('timestamp'))
            r_ts = parse_timestamp(r_entry.get('timestamp'))
            # If both failed, keep the one with the earlier timestamp
            # (avoids the timestamp-corruption bug on re-recorded failures)
            if not l_success and not r_success:
                if l_ts and r_ts:
                    merged[key] = l_entry if l_ts < r_ts else r_entry
                else:
                    # Fallback: prefer whichever entry has a timestamp at all
                    merged[key] = l_entry if l_ts or not r_ts else r_entry
            # If one succeeded and one failed, keep the successful one
            elif l_success and not r_success:
                merged[key] = l_entry
            elif r_success and not l_success:
                merged[key] = r_entry
            # If both succeeded, prefer the entry with more fields,
            # breaking ties by the later timestamp
            else:
                if len(l_entry) > len(r_entry):
                    merged[key] = l_entry
                elif len(r_entry) > len(l_entry):
                    merged[key] = r_entry
                elif l_ts and r_ts:
                    merged[key] = l_entry if l_ts > r_ts else r_entry
                else:
                    # Fallback: prefer whichever entry has a timestamp at all
                    merged[key] = l_entry if l_ts or not r_ts else r_entry
        else:
            # Only one environment has this application
            merged[key] = l_entry or r_entry
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(merged, f, ensure_ascii=False, indent=2)
    print(f"✓ Merged applications: {len(merged)} unique entries → {output_path}")
    return merged
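
# Illustration of the failed-duplicate rule (hypothetical data): given
#   dev:  {"123": {"success": false, "timestamp": "2025-06-02T10:00:00"}}
#   prod: {"123": {"success": false, "timestamp": "2025-06-01T09:00:00"}}
# the merge keeps the prod entry, because its timestamp is earlier.
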
def merge_dict_json(local_path, merge_path, output_path=None, timestamp_field='fetched_at'):
    """
    Merge two dict-based JSON files (keyed by id), deduplicating by key.
    On a duplicate key, keep the entry with the latest timestamp_field value.
    """
    output_path = output_path or local_path
    with open(local_path, encoding='utf-8') as f:
        local = json.load(f)
    with open(merge_path, encoding='utf-8') as f:
        remote = json.load(f)
    merged = {}
    all_keys = set(local.keys()) | set(remote.keys())
    for key in all_keys:
        l_entry = local.get(key)
        r_entry = remote.get(key)
        if l_entry and r_entry:
            l_ts = l_entry.get(timestamp_field)
            r_ts = r_entry.get(timestamp_field)
            if l_ts and r_ts:
                # Plain string comparison: ISO-8601 timestamps sort
                # chronologically as strings as long as both entries use
                # the same format and UTC offset
                merged[key] = l_entry if l_ts > r_ts else r_entry
            else:
                merged[key] = l_entry
        else:
            merged[key] = l_entry or r_entry
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(merged, f, ensure_ascii=False, indent=2)
    print(f"✓ Merged {Path(local_path).name}: {len(merged)} unique entries → {output_path}")
    return merged
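
# Example call (hypothetical paths): merge listings fetched in both
# environments, keeping whichever copy was fetched most recently:
#   merge_dict_json("data/listings.json", "data/to_merge/listings.json",
#                   timestamp_field="fetched_at")
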
def merge_csv_times(local_path, merge_path, output_path=None):
    """
    Merge two CSV files with time-series data, deduplicating by all columns.
    Keeps unique rows based on all column values.
    """
    output_path = output_path or local_path
    local_df = pd.read_csv(local_path)
    remote_df = pd.read_csv(merge_path)
    # Combine and drop duplicates
    merged_df = pd.concat([local_df, remote_df], ignore_index=True)
    merged_df = merged_df.drop_duplicates()
    # Sort by timestamp if present
    if 'timestamp' in merged_df.columns:
        merged_df = merged_df.sort_values('timestamp')
    merged_df.to_csv(output_path, index=False)
    print(f"✓ Merged {Path(local_path).name}: {len(merged_df)} rows → {output_path}")
    return merged_df
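
# Note: drop_duplicates() with no arguments compares every column, so two rows
# recorded at the same time with different values are both kept; only
# byte-identical rows collapse into one.
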
def merge_all_data(local_base_dir="data", merge_base_dir="data/to_merge", output_base_dir=None):
    """
    Main function to merge all data from the prod and dev environments.

    Args:
        local_base_dir: Base directory for local (dev) data
        merge_base_dir: Base directory for the data to merge in (prod)
        output_base_dir: Output directory (defaults to local_base_dir)

    Returns:
        dict: Summary of merge results
    """
    output_base_dir = output_base_dir or local_base_dir
    local_base = Path(local_base_dir)
    merge_base = Path(merge_base_dir)
    output_base = Path(output_base_dir)
    print("=" * 60)
    print("MERGING PROD AND DEV DATA")
    print("=" * 60)
    results = {}
    # 1. Merge applications.json (special handling for failed duplicates)
    if (local_base / "applications.json").exists() and (merge_base / "applications.json").exists():
        results['applications'] = merge_applications(
            str(local_base / "applications.json"),
            str(merge_base / "applications.json"),
            str(output_base / "applications.json")
        )
    # 2. Merge listings.json
    if (local_base / "listings.json").exists() and (merge_base / "listings.json").exists():
        results['listings'] = merge_dict_json(
            str(local_base / "listings.json"),
            str(merge_base / "listings.json"),
            str(output_base / "listings.json"),
            timestamp_field='fetched_at'
        )
    # 3. Merge wgcompany_listings.json
    if (local_base / "wgcompany_listings.json").exists() and (merge_base / "wgcompany_listings.json").exists():
        results['wgcompany_listings'] = merge_dict_json(
            str(local_base / "wgcompany_listings.json"),
            str(merge_base / "wgcompany_listings.json"),
            str(output_base / "wgcompany_listings.json"),
            timestamp_field='fetched_at'
        )
    # 4. Merge listing_times.csv
    if (local_base / "listing_times.csv").exists() and (merge_base / "listing_times.csv").exists():
        results['listing_times'] = merge_csv_times(
            str(local_base / "listing_times.csv"),
            str(merge_base / "listing_times.csv"),
            str(output_base / "listing_times.csv")
        )
    # 5. Merge wgcompany_times.csv
    if (local_base / "wgcompany_times.csv").exists() and (merge_base / "wgcompany_times.csv").exists():
        results['wgcompany_times'] = merge_csv_times(
            str(local_base / "wgcompany_times.csv"),
            str(merge_base / "wgcompany_times.csv"),
            str(output_base / "wgcompany_times.csv")
        )
    print("=" * 60)
    print("MERGE COMPLETE")
    print("=" * 60)
    return results


if __name__ == "__main__":
    # Usage: place prod data in the data/to_merge/ directory, then run this script
    merge_all_data()
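
# Example run with explicit directories (hypothetical layout), useful when the
# merged output should land somewhere other than the dev data directory:
#   merge_all_data(local_base_dir="data",
#                  merge_base_dir="data/to_merge",
#                  output_base_dir="data/merged")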