Coverage for health/db/repository.py: 0%

78 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-02 17:44 +0800

1""" 

2Data access layer for health database. 

3 

4Provides CRUD operations for sync records, sync state, and data indexes. 

5""" 

6 

7import sqlite3 

8from datetime import date, datetime 

9from typing import List, Optional, Dict, Any 

10from pathlib import Path 

11 

12from health.db.schema import get_connection 

13from health.utils.logging_config import setup_logger 

14 

15logger = setup_logger(__name__) 

16 

17 

class _ClosingConnection:
    """Context manager that runs a transaction and then closes the connection.

    A ``sqlite3.Connection`` used directly in a ``with`` statement only
    commits or rolls back; it does not close the connection. This wrapper
    delegates commit/rollback to the connection's own context-manager
    protocol and then always calls ``close()``.
    """

    def __init__(self, conn: sqlite3.Connection) -> None:
        self._conn = conn

    def __enter__(self) -> sqlite3.Connection:
        self._conn.__enter__()
        return self._conn

    def __exit__(self, exc_type, exc_value, traceback) -> bool:
        try:
            # Let sqlite3 commit on success or roll back on error.
            self._conn.__exit__(exc_type, exc_value, traceback)
        finally:
            self._conn.close()
        # Never suppress an exception raised inside the ``with`` body.
        return False


class HealthRepository:
    """Repository for health data database operations.

    Provides CRUD helpers for the ``sync_records``, ``last_sync_state``,
    ``daily_metrics_index`` and ``activity_index`` tables.

    Each public method obtains a connection through ``_get_conn()``, runs
    its statements in a single transaction and closes the connection before
    returning.

    NOTE(review): closing assumes ``get_connection`` returns a fresh
    connection on every call; confirm it does not hand out a shared one.
    Rows are read by column name, so the connection is expected to use a
    name-addressable row factory such as ``sqlite3.Row``.
    """

    def __init__(self, db_path: Optional[Path] = None) -> None:
        """Initialize repository.

        Args:
            db_path: Optional path to database file; passed unchanged to
                ``get_connection``.
        """
        self.db_path = db_path

    def _get_conn(self) -> sqlite3.Connection:
        """Get database connection.

        Returns:
            SQLite connection obtained from ``get_connection(self.db_path)``.
        """
        return get_connection(self.db_path)

    def _connection(self) -> _ClosingConnection:
        """Return a context manager yielding a connection that is closed on exit.

        The transaction is committed when the ``with`` body succeeds and
        rolled back when it raises; the connection is closed in both cases.
        """
        return _ClosingConnection(self._get_conn())

    # ============ Sync Records ============

    def create_sync_record(
        self,
        data_type: str,
        start_date: date,
        end_date: date,
        status: str,
        records_synced: int = 0,
        error_message: Optional[str] = None,
    ) -> int:
        """Create a new sync record.

        Args:
            data_type: Type of data synced
            start_date: Start date of sync range (stored as ISO string)
            end_date: End date of sync range (stored as ISO string)
            status: Sync status ('success', 'failed', 'partial')
            records_synced: Number of records successfully synced
            error_message: Optional error message if sync failed

        Returns:
            ID of created record
        """
        with self._connection() as conn:
            cursor = conn.execute(
                """
                INSERT INTO sync_records
                (data_type, start_date, end_date, status, records_synced, error_message)
                VALUES (?, ?, ?, ?, ?, ?)
                """,
                (
                    data_type,
                    start_date.isoformat(),
                    end_date.isoformat(),
                    status,
                    records_synced,
                    error_message,
                ),
            )
            return cursor.lastrowid

    def get_sync_records(
        self,
        data_type: Optional[str] = None,
        limit: int = 100,
    ) -> List[Dict[str, Any]]:
        """Get sync records, optionally filtered by data type.

        Records are ordered by ``created_at`` descending.

        Args:
            data_type: Optional data type filter; a falsy value (``None`` or
                an empty string) disables the filter
            limit: Maximum number of records to return

        Returns:
            List of sync record dictionaries
        """
        with self._connection() as conn:
            if data_type:
                cursor = conn.execute(
                    """
                    SELECT * FROM sync_records
                    WHERE data_type = ?
                    ORDER BY created_at DESC
                    LIMIT ?
                    """,
                    (data_type, limit),
                )
            else:
                cursor = conn.execute(
                    """
                    SELECT * FROM sync_records
                    ORDER BY created_at DESC
                    LIMIT ?
                    """,
                    (limit,),
                )
            return [dict(row) for row in cursor.fetchall()]

    # ============ Last Sync State ============

    def get_last_sync_date(self, data_type: str) -> Optional[date]:
        """Get the last sync date for a data type.

        Args:
            data_type: Type of data

        Returns:
            Last sync date, or None if there is no state row for the data
            type or its stored date is NULL
        """
        with self._connection() as conn:
            row = conn.execute(
                "SELECT last_sync_date FROM last_sync_state WHERE data_type = ?",
                (data_type,),
            ).fetchone()
            stored = row["last_sync_date"] if row else None

        if stored is None:
            return None
        # datetime.fromisoformat accepts both date-only and full timestamps.
        return datetime.fromisoformat(stored).date()

    def update_last_sync_state(
        self,
        data_type: str,
        last_sync_date: date,
        total_records: Optional[int] = None,
    ) -> None:
        """Update the last sync state for a data type.

        Inserts a new row when none exists for ``data_type``; otherwise
        updates the existing row.

        Args:
            data_type: Type of data
            last_sync_date: Last successfully synced date
            total_records: Optional total record count to set. When None, an
                existing row keeps its current count and a new row gets 0.
        """
        iso_date = last_sync_date.isoformat()

        with self._connection() as conn:
            existing = conn.execute(
                "SELECT 1 FROM last_sync_state WHERE data_type = ?",
                (data_type,),
            ).fetchone()

            if existing is None:
                conn.execute(
                    """
                    INSERT INTO last_sync_state (data_type, last_sync_date, total_records)
                    VALUES (?, ?, ?)
                    """,
                    (data_type, iso_date, total_records or 0),
                )
            elif total_records is not None:
                conn.execute(
                    """
                    UPDATE last_sync_state
                    SET last_sync_date = ?, total_records = ?, updated_at = CURRENT_TIMESTAMP
                    WHERE data_type = ?
                    """,
                    (iso_date, total_records, data_type),
                )
            else:
                conn.execute(
                    """
                    UPDATE last_sync_state
                    SET last_sync_date = ?, updated_at = CURRENT_TIMESTAMP
                    WHERE data_type = ?
                    """,
                    (iso_date, data_type),
                )

    def get_all_last_sync_states(self) -> Dict[str, Dict[str, Any]]:
        """Get last sync state for all data types.

        Returns:
            Dictionary mapping data_type to a dict with the keys
            ``last_sync_date``, ``total_records`` and ``updated_at`` as
            stored in the database
        """
        with self._connection() as conn:
            rows = conn.execute("SELECT * FROM last_sync_state").fetchall()
            return {
                row["data_type"]: {
                    "last_sync_date": row["last_sync_date"],
                    "total_records": row["total_records"],
                    "updated_at": row["updated_at"],
                }
                for row in rows
            }

    # ============ Daily Metrics Index ============

    def index_daily_metric(
        self,
        metric_type: str,
        metric_date: date,
        file_path: Path,
        has_data: bool = True,
    ) -> None:
        """Index a daily metric file.

        Uses ``INSERT OR REPLACE``, so re-indexing a conflicting entry
        overwrites it.

        Args:
            metric_type: Type of metric
            metric_date: Date of the metric
            file_path: Path to JSON file (stored as a string)
            has_data: Whether the file contains valid data
        """
        with self._connection() as conn:
            conn.execute(
                """
                INSERT OR REPLACE INTO daily_metrics_index
                (metric_type, date, file_path, has_data)
                VALUES (?, ?, ?, ?)
                """,
                (
                    metric_type,
                    metric_date.isoformat(),
                    str(file_path),
                    has_data,
                ),
            )

    def get_daily_metric_path(
        self, metric_type: str, metric_date: date
    ) -> Optional[str]:
        """Get file path for a daily metric.

        Args:
            metric_type: Type of metric
            metric_date: Date of the metric

        Returns:
            File path string, or None if not found
        """
        with self._connection() as conn:
            row = conn.execute(
                """
                SELECT file_path FROM daily_metrics_index
                WHERE metric_type = ? AND date = ?
                """,
                (metric_type, metric_date.isoformat()),
            ).fetchone()
            return row["file_path"] if row else None

    # ============ Activity Index ============

    def index_activity(
        self,
        activity_id: str,
        activity_type: str,
        activity_date: date,
        file_path: Path,
        duration_seconds: Optional[int] = None,
        distance_meters: Optional[float] = None,
    ) -> None:
        """Index an activity file.

        Uses ``INSERT OR REPLACE``, so re-indexing a conflicting entry
        overwrites it.

        Args:
            activity_id: Unique activity ID
            activity_type: Type of activity
            activity_date: Date of the activity
            file_path: Path to JSON file (stored as a string)
            duration_seconds: Optional activity duration
            distance_meters: Optional activity distance
        """
        with self._connection() as conn:
            conn.execute(
                """
                INSERT OR REPLACE INTO activity_index
                (activity_id, activity_type, date, duration_seconds, distance_meters, file_path)
                VALUES (?, ?, ?, ?, ?, ?)
                """,
                (
                    activity_id,
                    activity_type,
                    activity_date.isoformat(),
                    duration_seconds,
                    distance_meters,
                    str(file_path),
                ),
            )

    def get_activities_by_date_range(
        self, start_date: date, end_date: date
    ) -> List[Dict[str, Any]]:
        """Get all activities within a date range (both ends inclusive).

        Args:
            start_date: Start date
            end_date: End date

        Returns:
            List of activity index records, ordered by date descending
        """
        with self._connection() as conn:
            cursor = conn.execute(
                """
                SELECT * FROM activity_index
                WHERE date >= ? AND date <= ?
                ORDER BY date DESC
                """,
                (start_date.isoformat(), end_date.isoformat()),
            )
            return [dict(row) for row in cursor.fetchall()]

    def activity_exists(self, activity_id: str) -> bool:
        """Check if an activity is already indexed.

        Args:
            activity_id: Activity ID to check

        Returns:
            True if activity exists in index
        """
        with self._connection() as conn:
            row = conn.execute(
                "SELECT 1 FROM activity_index WHERE activity_id = ?",
                (activity_id,),
            ).fetchone()
            return row is not None