Coverage for health / services / data_sync.py: 0%
185 statements
« prev ^ index » next — coverage.py v7.13.4, created at 2026-03-02 17:44 +0800
1"""
2Data synchronization orchestration service.
4Coordinates Garmin API data fetching, storage, and incremental sync logic.
5"""
7from datetime import date, timedelta
8from typing import List, Dict, Any, Optional, Tuple
9from pathlib import Path
11from health import config
12from health.services.garmin_client import GarminHealthClient
13from health.services.storage import HealthStorage
14from health.db.repository import HealthRepository
15from health.utils.exceptions import SyncError, GarminAPIError
16from health.utils.date_utils import (
17 parse_date,
18 date_range,
19 split_date_range,
20 get_yesterday,
21 format_date,
22)
23from health.utils.logging_config import setup_logger
25logger = setup_logger(__name__)
class HealthDataSync:
    """Service for orchestrating health data synchronization."""

    def __init__(
        self,
        client: Optional[GarminHealthClient] = None,
        storage: Optional[HealthStorage] = None,
        repo: Optional[HealthRepository] = None,
    ) -> None:
        """Initialize sync service.

        Each collaborator can be injected (useful for testing); any that is
        omitted is constructed with its defaults.

        Args:
            client: Optional Garmin client (creates new if not provided)
            storage: Optional storage service (creates new if not provided)
            repo: Optional repository (creates new if not provided)
        """
        # Truthiness checks (not `is None`) mirror the original `or`
        # semantics: a falsy argument is replaced by a fresh default.
        self.client = client if client else GarminHealthClient()
        self.storage = storage if storage else HealthStorage()
        self.repo = repo if repo else HealthRepository()
    def authenticate(self) -> None:
        """Authenticate with Garmin Connect.

        Thin delegation to the client's own authenticate call, wrapped in
        progress logging; holds no credentials itself.

        Raises:
            GarminAuthError: If authentication fails
        """
        logger.info("Authenticating with Garmin...")
        self.client.authenticate()
        logger.info("Authentication successful")
58 def sync_daily_metric(
59 self, metric_type: str, target_date: date, force: bool = False
60 ) -> bool:
61 """Sync a single daily metric for a specific date.
63 Args:
64 metric_type: Type of metric to sync
65 target_date: Date to sync
66 force: If True, re-sync even if data already exists
68 Returns:
69 True if data was synced, False if skipped or no data available
70 """
71 # Check if already synced (unless forcing)
72 if not force and self.storage.metric_exists(metric_type, target_date):
73 logger.debug(f"Skipping {metric_type} for {target_date} (already synced)")
74 return False
76 # Get the appropriate fetch method
77 fetch_method = getattr(self.client, f"fetch_{metric_type}", None)
78 if not fetch_method:
79 logger.warning(f"No fetch method for metric type: {metric_type}")
80 return False
82 try:
83 # Fetch data from Garmin
84 data = fetch_method(target_date)
86 if not data:
87 logger.debug(f"No {metric_type} data available for {target_date}")
88 return False
90 # Save to storage
91 self.storage.save_daily_metric(data, metric_type)
92 logger.info(f"✓ Synced {metric_type} for {target_date}")
93 return True
95 except GarminAPIError as e:
96 logger.error(f"API error syncing {metric_type} for {target_date}: {e}")
97 raise
98 except Exception as e:
99 logger.error(f"Error syncing {metric_type} for {target_date}: {e}")
100 return False
102 def sync_daily_metrics_range(
103 self,
104 metric_type: str,
105 start_date: date,
106 end_date: date,
107 force: bool = False,
108 ) -> Dict[str, int]:
109 """Sync a daily metric for a date range.
111 Args:
112 metric_type: Type of metric to sync
113 start_date: Start date (inclusive)
114 end_date: End date (inclusive)
115 force: If True, re-sync existing data
117 Returns:
118 Statistics dictionary with sync counts
119 """
120 logger.info(
121 f"Syncing {metric_type} from {start_date} to {end_date} "
122 f"({(end_date - start_date).days + 1} days)"
123 )
125 stats = {"synced": 0, "skipped": 0, "errors": 0}
127 for current_date in date_range(start_date, end_date):
128 try:
129 if self.sync_daily_metric(metric_type, current_date, force=force):
130 stats["synced"] += 1
131 else:
132 stats["skipped"] += 1
134 except GarminAPIError as e:
135 logger.error(f"API error on {current_date}: {e}")
136 stats["errors"] += 1
137 # Continue with other dates even if one fails
138 continue
140 logger.info(
141 f"{metric_type} sync complete: "
142 f"{stats['synced']} synced, {stats['skipped']} skipped, "
143 f"{stats['errors']} errors"
144 )
146 return stats
148 def sync_activities_range(
149 self, start_date: date, end_date: date, force: bool = False
150 ) -> Dict[str, int]:
151 """Sync activities for a date range.
153 Args:
154 start_date: Start date
155 end_date: End date
156 force: If True, re-sync existing activities
158 Returns:
159 Statistics dictionary
160 """
161 logger.info(
162 f"Syncing activities from {start_date} to {end_date}"
163 )
165 stats = {"synced": 0, "skipped": 0, "errors": 0}
167 try:
168 # Fetch all activities in the range
169 activities = self.client.fetch_activities(start_date, end_date)
171 logger.info(f"Found {len(activities)} activities")
173 for activity in activities:
174 try:
175 # Check if already synced
176 if not force and self.storage.activity_exists(activity.activity_id):
177 logger.debug(
178 f"Skipping activity {activity.activity_id} (already synced)"
179 )
180 stats["skipped"] += 1
181 continue
183 # Save activity
184 self.storage.save_activity(activity)
185 stats["synced"] += 1
186 logger.info(
187 f"✓ Synced activity {activity.activity_id} "
188 f"({activity.activity_type}) on {activity.date}"
189 )
191 except Exception as e:
192 logger.error(f"Error saving activity {activity.activity_id}: {e}")
193 stats["errors"] += 1
194 continue
196 except GarminAPIError as e:
197 logger.error(f"API error fetching activities: {e}")
198 stats["errors"] += 1
200 logger.info(
201 f"Activity sync complete: "
202 f"{stats['synced']} synced, {stats['skipped']} skipped, "
203 f"{stats['errors']} errors"
204 )
206 return stats
208 def sync_all_metrics(
209 self,
210 start_date: date,
211 end_date: date,
212 metric_types: Optional[List[str]] = None,
213 force: bool = False,
214 ) -> Dict[str, Dict[str, int]]:
215 """Sync all (or specified) metrics for a date range.
217 Args:
218 start_date: Start date
219 end_date: End date
220 metric_types: Optional list of metric types to sync (defaults to all)
221 force: If True, re-sync existing data
223 Returns:
224 Dictionary mapping metric type to sync statistics
225 """
226 # Default to all configured metric types except activities
227 if not metric_types:
228 metric_types = [
229 mt
230 for mt in config.DATA_TYPE_CONFIG.keys()
231 if mt != "activities" # Handle activities separately
232 ]
233 else:
234 # Even if metric_types is provided, filter out activities to handle separately
235 metric_types = [mt for mt in metric_types if mt != "activities"]
237 logger.info(
238 f"Syncing {len(metric_types)} metric types from {start_date} to {end_date}"
239 )
241 results = {}
243 for metric_type in metric_types:
244 try:
245 stats = self.sync_daily_metrics_range(
246 metric_type, start_date, end_date, force=force
247 )
248 results[metric_type] = stats
250 # Update last sync state
251 if stats["synced"] > 0:
252 self.repo.update_last_sync_state(
253 metric_type, end_date, total_records=stats["synced"]
254 )
256 except Exception as e:
257 logger.error(f"Failed to sync {metric_type}: {e}")
258 results[metric_type] = {"synced": 0, "skipped": 0, "errors": 1}
259 continue
261 # Sync activities (always, regardless of metric_types parameter)
262 try:
263 activity_stats = self.sync_activities_range(start_date, end_date, force=force)
264 results["activities"] = activity_stats
266 if activity_stats["synced"] > 0:
267 self.repo.update_last_sync_state(
268 "activities", end_date, total_records=activity_stats["synced"]
269 )
271 except Exception as e:
272 logger.error(f"Failed to sync activities: {e}")
273 results["activities"] = {"synced": 0, "skipped": 0, "errors": 1}
275 # Create sync record
276 total_synced = sum(r.get("synced", 0) for r in results.values())
277 total_errors = sum(r.get("errors", 0) for r in results.values())
279 status = "success" if total_errors == 0 else ("partial" if total_synced > 0 else "failed")
281 self.repo.create_sync_record(
282 data_type="all_metrics",
283 start_date=start_date,
284 end_date=end_date,
285 status=status,
286 records_synced=total_synced,
287 error_message=f"{total_errors} errors" if total_errors > 0 else None,
288 )
290 return results
292 def sync_incremental(
293 self,
294 metric_types: Optional[List[str]] = None,
295 until_date: Optional[date] = None,
296 ) -> Dict[str, Dict[str, int]]:
297 """Perform incremental sync for all metrics since last sync.
299 Args:
300 metric_types: Optional list of metric types to sync
301 until_date: Optional end date (defaults to yesterday)
303 Returns:
304 Dictionary mapping metric type to sync statistics
305 """
306 if not until_date:
307 until_date = get_yesterday()
309 if not metric_types:
310 metric_types = list(config.DATA_TYPE_CONFIG.keys())
312 logger.info(f"Starting incremental sync (until {until_date})")
314 results = {}
316 for metric_type in metric_types:
317 try:
318 # Get last sync date
319 last_sync = self.repo.get_last_sync_date(metric_type)
321 # Determine start_date for historical sync
322 if last_sync:
323 start_date = last_sync + timedelta(days=1)
324 else:
325 # First sync - use default start date
326 start_date = parse_date(config.HISTORICAL_START_DATE)
327 logger.info(
328 f"{metric_type} not synced before, starting from {start_date}"
329 )
331 stats = {"synced": 0, "skipped": 0, "errors": 0}
332 today = date.today()
333 yesterday = today - timedelta(days=1)
335 # 1. Sync Historical Data (start_date to min(until_date, yesterday))
336 # Only if we have a valid range before today
337 historical_end = min(until_date, yesterday)
339 if start_date <= historical_end:
340 if metric_type == "activities":
341 h_stats = self.sync_activities_range(start_date, historical_end, force=False)
342 else:
343 h_stats = self.sync_daily_metrics_range(
344 metric_type, start_date, historical_end, force=False
345 )
347 stats["synced"] += h_stats.get("synced", 0)
348 stats["skipped"] += h_stats.get("skipped", 0)
349 stats["errors"] += h_stats.get("errors", 0)
351 # 2. Sync Today (if requested) - Always Force
352 # If until_date includes today, we force sync today regardless of last_sync
353 if until_date >= today:
354 logger.info(f"Force syncing {metric_type} for today ({today})")
355 if metric_type == "activities":
356 t_stats = self.sync_activities_range(today, today, force=True)
357 else:
358 t_stats = self.sync_daily_metrics_range(
359 metric_type, today, today, force=True
360 )
362 stats["synced"] += t_stats.get("synced", 0)
363 stats["skipped"] += t_stats.get("skipped", 0)
364 stats["errors"] += t_stats.get("errors", 0)
366 # Log if skipped everything
367 if stats["synced"] == 0 and stats["errors"] == 0 and stats["skipped"] == 0:
368 if start_date > historical_end and until_date < today:
369 logger.info(f"{metric_type} already up to date")
371 results[metric_type] = stats
373 # Update last sync state if we synced anything
374 if stats["synced"] > 0:
375 self.repo.update_last_sync_state(metric_type, until_date)
377 except Exception as e:
378 logger.error(f"Incremental sync failed for {metric_type}: {e}")
379 results[metric_type] = {"synced": 0, "skipped": 0, "errors": 1}
380 continue
382 return results
384 def backfill_historical(
385 self,
386 start_date: date,
387 end_date: Optional[date] = None,
388 metric_types: Optional[List[str]] = None,
389 batch_size: int = config.DEFAULT_BATCH_SIZE_DAYS,
390 ) -> Dict[str, Dict[str, int]]:
391 """Backfill historical data in batches.
393 Args:
394 start_date: Start date for backfill
395 end_date: End date (defaults to yesterday)
396 metric_types: Optional list of metric types
397 batch_size: Days per batch (default: 30)
399 Returns:
400 Aggregated sync statistics
401 """
402 if not end_date:
403 end_date = get_yesterday()
405 if not metric_types:
406 metric_types = list(config.DATA_TYPE_CONFIG.keys())
408 logger.info(
409 f"Starting historical backfill from {start_date} to {end_date} "
410 f"({(end_date - start_date).days + 1} days, batch size: {batch_size})"
411 )
413 # Split into batches
414 batches = split_date_range(start_date, end_date, batch_size)
415 logger.info(f"Processing {len(batches)} batches")
417 # Aggregate results
418 aggregated_results: Dict[str, Dict[str, int]] = {
419 mt: {"synced": 0, "skipped": 0, "errors": 0} for mt in metric_types
420 }
422 for i, (batch_start, batch_end) in enumerate(batches, 1):
423 logger.info(
424 f"\n📦 Batch {i}/{len(batches)}: {batch_start} to {batch_end}"
425 )
427 batch_results = self.sync_all_metrics(
428 batch_start, batch_end, metric_types=metric_types
429 )
431 # Aggregate statistics
432 for metric_type, stats in batch_results.items():
433 if metric_type not in aggregated_results:
434 aggregated_results[metric_type] = {"synced": 0, "skipped": 0, "errors": 0}
436 aggregated_results[metric_type]["synced"] += stats.get("synced", 0)
437 aggregated_results[metric_type]["skipped"] += stats.get("skipped", 0)
438 aggregated_results[metric_type]["errors"] += stats.get("errors", 0)
440 logger.info("\n✅ Historical backfill complete!")
441 logger.info("\n📊 Final Statistics:")
442 for metric_type, stats in aggregated_results.items():
443 logger.info(
444 f" {metric_type}: {stats['synced']} synced, "
445 f"{stats['skipped']} skipped, {stats['errors']} errors"
446 )
448 return aggregated_results
450 def get_sync_status(self) -> Dict[str, Any]:
451 """Get current sync status for all data types.
453 Returns:
454 Dictionary with sync status information
455 """
456 status = {}
457 all_states = self.repo.get_all_last_sync_states()
459 for data_type in config.DATA_TYPE_CONFIG.keys():
460 if data_type in all_states:
461 state = all_states[data_type]
462 status[data_type] = {
463 "last_sync_date": state["last_sync_date"],
464 "total_records": state["total_records"],
465 "updated_at": state["updated_at"],
466 "status": "synced",
467 }
468 else:
469 status[data_type] = {
470 "last_sync_date": None,
471 "total_records": 0,
472 "updated_at": None,
473 "status": "never_synced",
474 }
476 return status