Coverage for health/services/data_sync.py: 0%

185 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-02 17:44 +0800

1""" 

2Data synchronization orchestration service. 

3 

4Coordinates Garmin API data fetching, storage, and incremental sync logic. 

5""" 

6 

7from datetime import date, timedelta 

8from typing import List, Dict, Any, Optional, Tuple 

9from pathlib import Path 

10 

11from health import config 

12from health.services.garmin_client import GarminHealthClient 

13from health.services.storage import HealthStorage 

14from health.db.repository import HealthRepository 

15from health.utils.exceptions import SyncError, GarminAPIError 

16from health.utils.date_utils import ( 

17 parse_date, 

18 date_range, 

19 split_date_range, 

20 get_yesterday, 

21 format_date, 

22) 

23from health.utils.logging_config import setup_logger 

24 

25logger = setup_logger(__name__) 

26 

27 

class HealthDataSync:
    """Service for orchestrating health data synchronization.

    Ties together the Garmin API client (fetching), the storage service
    (persistence), and the repository (sync-state bookkeeping) to provide
    single-day, date-range, incremental, and batched historical-backfill
    sync flows.
    """

    def __init__(
        self,
        client: Optional[GarminHealthClient] = None,
        storage: Optional[HealthStorage] = None,
        repo: Optional[HealthRepository] = None,
    ) -> None:
        """Initialize sync service.

        Args:
            client: Optional Garmin client (creates new if not provided)
            storage: Optional storage service (creates new if not provided)
            repo: Optional repository (creates new if not provided)
        """
        self.client = client or GarminHealthClient()
        self.storage = storage or HealthStorage()
        self.repo = repo or HealthRepository()

    @staticmethod
    def _new_stats() -> Dict[str, int]:
        """Return a zeroed per-metric statistics dictionary."""
        return {"synced": 0, "skipped": 0, "errors": 0}

    @staticmethod
    def _merge_stats(dest: Dict[str, int], src: Dict[str, int]) -> None:
        """Add the counters from *src* into *dest* in place."""
        for key in ("synced", "skipped", "errors"):
            dest[key] += src.get(key, 0)

    def _sync_range(
        self, metric_type: str, start_date: date, end_date: date, force: bool
    ) -> Dict[str, int]:
        """Dispatch a range sync to the activity or daily-metric path.

        Activities are fetched in bulk per range; every other metric type
        is fetched day by day.
        """
        if metric_type == "activities":
            return self.sync_activities_range(start_date, end_date, force=force)
        return self.sync_daily_metrics_range(
            metric_type, start_date, end_date, force=force
        )

    def authenticate(self) -> None:
        """Authenticate with Garmin Connect.

        Raises:
            GarminAuthError: If authentication fails
        """
        logger.info("Authenticating with Garmin...")
        self.client.authenticate()
        logger.info("Authentication successful")

    def sync_daily_metric(
        self, metric_type: str, target_date: date, force: bool = False
    ) -> bool:
        """Sync a single daily metric for a specific date.

        Args:
            metric_type: Type of metric to sync
            target_date: Date to sync
            force: If True, re-sync even if data already exists

        Returns:
            True if data was synced, False if skipped, no data was
            available, or a non-API error occurred (non-API errors are
            logged and swallowed so range syncs can continue).

        Raises:
            GarminAPIError: Re-raised so range-level callers can count it
                as an error rather than a skip.
        """
        # Check if already synced (unless forcing)
        if not force and self.storage.metric_exists(metric_type, target_date):
            logger.debug(f"Skipping {metric_type} for {target_date} (already synced)")
            return False

        # Resolve the per-metric fetch method by naming convention
        # (e.g. metric_type "sleep" -> client.fetch_sleep).
        fetch_method = getattr(self.client, f"fetch_{metric_type}", None)
        if not fetch_method:
            logger.warning(f"No fetch method for metric type: {metric_type}")
            return False

        try:
            # Fetch data from Garmin
            data = fetch_method(target_date)

            if not data:
                logger.debug(f"No {metric_type} data available for {target_date}")
                return False

            # Save to storage
            self.storage.save_daily_metric(data, metric_type)
            logger.info(f"✓ Synced {metric_type} for {target_date}")
            return True

        except GarminAPIError as e:
            logger.error(f"API error syncing {metric_type} for {target_date}: {e}")
            raise
        except Exception as e:
            # Deliberate best-effort: storage/parse errors must not abort
            # a multi-day sync, so log and report "not synced".
            logger.error(f"Error syncing {metric_type} for {target_date}: {e}")
            return False

    def sync_daily_metrics_range(
        self,
        metric_type: str,
        start_date: date,
        end_date: date,
        force: bool = False,
    ) -> Dict[str, int]:
        """Sync a daily metric for a date range.

        Args:
            metric_type: Type of metric to sync
            start_date: Start date (inclusive)
            end_date: End date (inclusive)
            force: If True, re-sync existing data

        Returns:
            Statistics dictionary with "synced", "skipped" and "errors"
            counts. Note: only GarminAPIError counts toward "errors";
            other per-day failures are swallowed by sync_daily_metric
            and show up under "skipped".
        """
        logger.info(
            f"Syncing {metric_type} from {start_date} to {end_date} "
            f"({(end_date - start_date).days + 1} days)"
        )

        stats = self._new_stats()

        for current_date in date_range(start_date, end_date):
            try:
                if self.sync_daily_metric(metric_type, current_date, force=force):
                    stats["synced"] += 1
                else:
                    stats["skipped"] += 1

            except GarminAPIError as e:
                logger.error(f"API error on {current_date}: {e}")
                stats["errors"] += 1
                # Continue with other dates even if one fails
                continue

        logger.info(
            f"{metric_type} sync complete: "
            f"{stats['synced']} synced, {stats['skipped']} skipped, "
            f"{stats['errors']} errors"
        )

        return stats

    def sync_activities_range(
        self, start_date: date, end_date: date, force: bool = False
    ) -> Dict[str, int]:
        """Sync activities for a date range.

        Activities are fetched in one bulk API call for the whole range,
        then persisted one by one.

        Args:
            start_date: Start date
            end_date: End date
            force: If True, re-sync existing activities

        Returns:
            Statistics dictionary
        """
        logger.info(
            f"Syncing activities from {start_date} to {end_date}"
        )

        stats = self._new_stats()

        try:
            # Fetch all activities in the range
            activities = self.client.fetch_activities(start_date, end_date)

            logger.info(f"Found {len(activities)} activities")

            for activity in activities:
                try:
                    # Check if already synced
                    if not force and self.storage.activity_exists(activity.activity_id):
                        logger.debug(
                            f"Skipping activity {activity.activity_id} (already synced)"
                        )
                        stats["skipped"] += 1
                        continue

                    # Save activity
                    self.storage.save_activity(activity)
                    stats["synced"] += 1
                    logger.info(
                        f"✓ Synced activity {activity.activity_id} "
                        f"({activity.activity_type}) on {activity.date}"
                    )

                except Exception as e:
                    # One bad activity must not abort the rest of the batch.
                    logger.error(f"Error saving activity {activity.activity_id}: {e}")
                    stats["errors"] += 1
                    continue

        except GarminAPIError as e:
            # The bulk fetch failed entirely; record a single error.
            logger.error(f"API error fetching activities: {e}")
            stats["errors"] += 1

        logger.info(
            f"Activity sync complete: "
            f"{stats['synced']} synced, {stats['skipped']} skipped, "
            f"{stats['errors']} errors"
        )

        return stats

    def sync_all_metrics(
        self,
        start_date: date,
        end_date: date,
        metric_types: Optional[List[str]] = None,
        force: bool = False,
    ) -> Dict[str, Dict[str, int]]:
        """Sync all (or specified) metrics for a date range.

        Activities are always synced in addition to the daily metrics,
        regardless of the metric_types filter.

        Args:
            start_date: Start date
            end_date: End date
            metric_types: Optional list of metric types to sync (defaults to all)
            force: If True, re-sync existing data

        Returns:
            Dictionary mapping metric type to sync statistics
        """
        # Default to all configured metric types except activities
        if not metric_types:
            metric_types = [
                mt
                for mt in config.DATA_TYPE_CONFIG.keys()
                if mt != "activities"  # Handle activities separately
            ]
        else:
            # Even if metric_types is provided, filter out activities to handle separately
            metric_types = [mt for mt in metric_types if mt != "activities"]

        logger.info(
            f"Syncing {len(metric_types)} metric types from {start_date} to {end_date}"
        )

        results = {}

        for metric_type in metric_types:
            try:
                stats = self.sync_daily_metrics_range(
                    metric_type, start_date, end_date, force=force
                )
                results[metric_type] = stats

                # Update last sync state
                if stats["synced"] > 0:
                    self.repo.update_last_sync_state(
                        metric_type, end_date, total_records=stats["synced"]
                    )

            except Exception as e:
                logger.error(f"Failed to sync {metric_type}: {e}")
                results[metric_type] = {"synced": 0, "skipped": 0, "errors": 1}
                continue

        # Sync activities (always, regardless of metric_types parameter)
        try:
            activity_stats = self.sync_activities_range(start_date, end_date, force=force)
            results["activities"] = activity_stats

            if activity_stats["synced"] > 0:
                self.repo.update_last_sync_state(
                    "activities", end_date, total_records=activity_stats["synced"]
                )

        except Exception as e:
            logger.error(f"Failed to sync activities: {e}")
            results["activities"] = {"synced": 0, "skipped": 0, "errors": 1}

        # Create sync record summarizing the whole run
        total_synced = sum(r.get("synced", 0) for r in results.values())
        total_errors = sum(r.get("errors", 0) for r in results.values())

        status = "success" if total_errors == 0 else ("partial" if total_synced > 0 else "failed")

        self.repo.create_sync_record(
            data_type="all_metrics",
            start_date=start_date,
            end_date=end_date,
            status=status,
            records_synced=total_synced,
            error_message=f"{total_errors} errors" if total_errors > 0 else None,
        )

        return results

    def sync_incremental(
        self,
        metric_types: Optional[List[str]] = None,
        until_date: Optional[date] = None,
    ) -> Dict[str, Dict[str, int]]:
        """Perform incremental sync for all metrics since last sync.

        Historical days (through yesterday) are synced non-forced so
        existing data is skipped; today's data, if requested via
        until_date, is always force-synced because it is still changing.

        Args:
            metric_types: Optional list of metric types to sync
            until_date: Optional end date (defaults to yesterday)

        Returns:
            Dictionary mapping metric type to sync statistics
        """
        if not until_date:
            until_date = get_yesterday()

        if not metric_types:
            metric_types = list(config.DATA_TYPE_CONFIG.keys())

        logger.info(f"Starting incremental sync (until {until_date})")

        results = {}

        for metric_type in metric_types:
            try:
                # Get last sync date
                last_sync = self.repo.get_last_sync_date(metric_type)

                # Determine start_date for historical sync
                if last_sync:
                    start_date = last_sync + timedelta(days=1)
                else:
                    # First sync - use default start date
                    start_date = parse_date(config.HISTORICAL_START_DATE)
                    logger.info(
                        f"{metric_type} not synced before, starting from {start_date}"
                    )

                stats = self._new_stats()
                today = date.today()
                yesterday = today - timedelta(days=1)

                # 1. Sync historical data (start_date to min(until_date, yesterday)),
                #    only if a valid range before today exists.
                historical_end = min(until_date, yesterday)

                if start_date <= historical_end:
                    self._merge_stats(
                        stats,
                        self._sync_range(metric_type, start_date, historical_end, force=False),
                    )

                # 2. Sync today (if requested) - always force, since today's
                #    data is partial and may have changed since an earlier run.
                if until_date >= today:
                    logger.info(f"Force syncing {metric_type} for today ({today})")
                    self._merge_stats(
                        stats,
                        self._sync_range(metric_type, today, today, force=True),
                    )

                # Log if nothing at all happened (both ranges were empty)
                if stats["synced"] == 0 and stats["errors"] == 0 and stats["skipped"] == 0:
                    if start_date > historical_end and until_date < today:
                        logger.info(f"{metric_type} already up to date")

                results[metric_type] = stats

                # Update last sync state if we synced anything.
                # Pass total_records for consistency with sync_all_metrics.
                if stats["synced"] > 0:
                    self.repo.update_last_sync_state(
                        metric_type, until_date, total_records=stats["synced"]
                    )

            except Exception as e:
                logger.error(f"Incremental sync failed for {metric_type}: {e}")
                results[metric_type] = {"synced": 0, "skipped": 0, "errors": 1}
                continue

        return results

    def backfill_historical(
        self,
        start_date: date,
        end_date: Optional[date] = None,
        metric_types: Optional[List[str]] = None,
        batch_size: int = config.DEFAULT_BATCH_SIZE_DAYS,
    ) -> Dict[str, Dict[str, int]]:
        """Backfill historical data in batches.

        Args:
            start_date: Start date for backfill
            end_date: End date (defaults to yesterday)
            metric_types: Optional list of metric types
            batch_size: Days per batch (default: config.DEFAULT_BATCH_SIZE_DAYS)

        Returns:
            Aggregated sync statistics per metric type
        """
        if not end_date:
            end_date = get_yesterday()

        if not metric_types:
            metric_types = list(config.DATA_TYPE_CONFIG.keys())

        logger.info(
            f"Starting historical backfill from {start_date} to {end_date} "
            f"({(end_date - start_date).days + 1} days, batch size: {batch_size})"
        )

        # Split into batches to keep individual API sessions bounded
        batches = split_date_range(start_date, end_date, batch_size)
        logger.info(f"Processing {len(batches)} batches")

        # Aggregate results across batches
        aggregated_results: Dict[str, Dict[str, int]] = {
            mt: self._new_stats() for mt in metric_types
        }

        for i, (batch_start, batch_end) in enumerate(batches, 1):
            logger.info(
                f"\n📦 Batch {i}/{len(batches)}: {batch_start} to {batch_end}"
            )

            batch_results = self.sync_all_metrics(
                batch_start, batch_end, metric_types=metric_types
            )

            # Aggregate statistics. sync_all_metrics may return types we
            # did not seed (it always includes "activities"), so create
            # missing entries on the fly.
            for metric_type, stats in batch_results.items():
                if metric_type not in aggregated_results:
                    aggregated_results[metric_type] = self._new_stats()

                self._merge_stats(aggregated_results[metric_type], stats)

        logger.info("\n✅ Historical backfill complete!")
        logger.info("\n📊 Final Statistics:")
        for metric_type, stats in aggregated_results.items():
            logger.info(
                f"  {metric_type}: {stats['synced']} synced, "
                f"{stats['skipped']} skipped, {stats['errors']} errors"
            )

        return aggregated_results

    def get_sync_status(self) -> Dict[str, Any]:
        """Get current sync status for all data types.

        Returns:
            Dictionary with sync status information, keyed by data type;
            types never synced get a "never_synced" placeholder entry.
        """
        status = {}
        all_states = self.repo.get_all_last_sync_states()

        for data_type in config.DATA_TYPE_CONFIG.keys():
            if data_type in all_states:
                state = all_states[data_type]
                status[data_type] = {
                    "last_sync_date": state["last_sync_date"],
                    "total_records": state["total_records"],
                    "updated_at": state["updated_at"],
                    "status": "synced",
                }
            else:
                status[data_type] = {
                    "last_sync_date": None,
                    "total_records": 0,
                    "updated_at": None,
                    "status": "never_synced",
                }

        return status