Coverage for health / db / repository.py: 0%
78 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-02 17:44 +0800
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-02 17:44 +0800
1"""
2Data access layer for health database.
4Provides CRUD operations for sync records, sync state, and data indexes.
5"""
7import sqlite3
8from datetime import date, datetime
9from typing import List, Optional, Dict, Any
10from pathlib import Path
12from health.db.schema import get_connection
13from health.utils.logging_config import setup_logger
15logger = setup_logger(__name__)
18class HealthRepository:
19 """Repository for health data database operations."""
21 def __init__(self, db_path: Optional[Path] = None) -> None:
22 """Initialize repository.
24 Args:
25 db_path: Optional path to database file
26 """
27 self.db_path = db_path
29 def _get_conn(self) -> sqlite3.Connection:
30 """Get database connection.
32 Returns:
33 SQLite connection
34 """
35 return get_connection(self.db_path)
37 # ============ Sync Records ============
39 def create_sync_record(
40 self,
41 data_type: str,
42 start_date: date,
43 end_date: date,
44 status: str,
45 records_synced: int = 0,
46 error_message: Optional[str] = None,
47 ) -> int:
48 """Create a new sync record.
50 Args:
51 data_type: Type of data synced
52 start_date: Start date of sync range
53 end_date: End date of sync range
54 status: Sync status ('success', 'failed', 'partial')
55 records_synced: Number of records successfully synced
56 error_message: Optional error message if sync failed
58 Returns:
59 ID of created record
60 """
61 with self._get_conn() as conn:
62 cursor = conn.cursor()
63 cursor.execute(
64 """
65 INSERT INTO sync_records
66 (data_type, start_date, end_date, status, records_synced, error_message)
67 VALUES (?, ?, ?, ?, ?, ?)
68 """,
69 (
70 data_type,
71 start_date.isoformat(),
72 end_date.isoformat(),
73 status,
74 records_synced,
75 error_message,
76 ),
77 )
78 conn.commit()
79 return cursor.lastrowid
81 def get_sync_records(
82 self,
83 data_type: Optional[str] = None,
84 limit: int = 100,
85 ) -> List[Dict[str, Any]]:
86 """Get sync records, optionally filtered by data type.
88 Args:
89 data_type: Optional data type filter
90 limit: Maximum number of records to return
92 Returns:
93 List of sync record dictionaries
94 """
95 with self._get_conn() as conn:
96 cursor = conn.cursor()
98 if data_type:
99 cursor.execute(
100 """
101 SELECT * FROM sync_records
102 WHERE data_type = ?
103 ORDER BY created_at DESC
104 LIMIT ?
105 """,
106 (data_type, limit),
107 )
108 else:
109 cursor.execute(
110 """
111 SELECT * FROM sync_records
112 ORDER BY created_at DESC
113 LIMIT ?
114 """,
115 (limit,),
116 )
118 return [dict(row) for row in cursor.fetchall()]
120 # ============ Last Sync State ============
122 def get_last_sync_date(self, data_type: str) -> Optional[date]:
123 """Get the last sync date for a data type.
125 Args:
126 data_type: Type of data
128 Returns:
129 Last sync date, or None if never synced
130 """
131 with self._get_conn() as conn:
132 cursor = conn.cursor()
133 cursor.execute(
134 "SELECT last_sync_date FROM last_sync_state WHERE data_type = ?",
135 (data_type,),
136 )
137 row = cursor.fetchone()
139 if row:
140 return datetime.fromisoformat(row["last_sync_date"]).date()
141 return None
143 def update_last_sync_state(
144 self,
145 data_type: str,
146 last_sync_date: date,
147 total_records: Optional[int] = None,
148 ) -> None:
149 """Update the last sync state for a data type.
151 Args:
152 data_type: Type of data
153 last_sync_date: Last successfully synced date
154 total_records: Optional total record count to set
155 """
156 with self._get_conn() as conn:
157 cursor = conn.cursor()
159 # Check if record exists
160 cursor.execute(
161 "SELECT total_records FROM last_sync_state WHERE data_type = ?",
162 (data_type,),
163 )
164 row = cursor.fetchone()
166 if row:
167 # Update existing
168 if total_records is not None:
169 cursor.execute(
170 """
171 UPDATE last_sync_state
172 SET last_sync_date = ?, total_records = ?, updated_at = CURRENT_TIMESTAMP
173 WHERE data_type = ?
174 """,
175 (last_sync_date.isoformat(), total_records, data_type),
176 )
177 else:
178 cursor.execute(
179 """
180 UPDATE last_sync_state
181 SET last_sync_date = ?, updated_at = CURRENT_TIMESTAMP
182 WHERE data_type = ?
183 """,
184 (last_sync_date.isoformat(), data_type),
185 )
186 else:
187 # Insert new
188 cursor.execute(
189 """
190 INSERT INTO last_sync_state (data_type, last_sync_date, total_records)
191 VALUES (?, ?, ?)
192 """,
193 (data_type, last_sync_date.isoformat(), total_records or 0),
194 )
196 conn.commit()
198 def get_all_last_sync_states(self) -> Dict[str, Dict[str, Any]]:
199 """Get last sync state for all data types.
201 Returns:
202 Dictionary mapping data_type to state info
203 """
204 with self._get_conn() as conn:
205 cursor = conn.cursor()
206 cursor.execute("SELECT * FROM last_sync_state")
208 result = {}
209 for row in cursor.fetchall():
210 result[row["data_type"]] = {
211 "last_sync_date": row["last_sync_date"],
212 "total_records": row["total_records"],
213 "updated_at": row["updated_at"],
214 }
216 return result
218 # ============ Daily Metrics Index ============
220 def index_daily_metric(
221 self,
222 metric_type: str,
223 metric_date: date,
224 file_path: Path,
225 has_data: bool = True,
226 ) -> None:
227 """Index a daily metric file.
229 Args:
230 metric_type: Type of metric
231 metric_date: Date of the metric
232 file_path: Path to JSON file
233 has_data: Whether the file contains valid data
234 """
235 with self._get_conn() as conn:
236 cursor = conn.cursor()
237 cursor.execute(
238 """
239 INSERT OR REPLACE INTO daily_metrics_index
240 (metric_type, date, file_path, has_data)
241 VALUES (?, ?, ?, ?)
242 """,
243 (
244 metric_type,
245 metric_date.isoformat(),
246 str(file_path),
247 has_data,
248 ),
249 )
250 conn.commit()
252 def get_daily_metric_path(
253 self, metric_type: str, metric_date: date
254 ) -> Optional[str]:
255 """Get file path for a daily metric.
257 Args:
258 metric_type: Type of metric
259 metric_date: Date of the metric
261 Returns:
262 File path string, or None if not found
263 """
264 with self._get_conn() as conn:
265 cursor = conn.cursor()
266 cursor.execute(
267 """
268 SELECT file_path FROM daily_metrics_index
269 WHERE metric_type = ? AND date = ?
270 """,
271 (metric_type, metric_date.isoformat()),
272 )
273 row = cursor.fetchone()
275 return row["file_path"] if row else None
277 # ============ Activity Index ============
279 def index_activity(
280 self,
281 activity_id: str,
282 activity_type: str,
283 activity_date: date,
284 file_path: Path,
285 duration_seconds: Optional[int] = None,
286 distance_meters: Optional[float] = None,
287 ) -> None:
288 """Index an activity file.
290 Args:
291 activity_id: Unique activity ID
292 activity_type: Type of activity
293 activity_date: Date of the activity
294 file_path: Path to JSON file
295 duration_seconds: Optional activity duration
296 distance_meters: Optional activity distance
297 """
298 with self._get_conn() as conn:
299 cursor = conn.cursor()
300 cursor.execute(
301 """
302 INSERT OR REPLACE INTO activity_index
303 (activity_id, activity_type, date, duration_seconds, distance_meters, file_path)
304 VALUES (?, ?, ?, ?, ?, ?)
305 """,
306 (
307 activity_id,
308 activity_type,
309 activity_date.isoformat(),
310 duration_seconds,
311 distance_meters,
312 str(file_path),
313 ),
314 )
315 conn.commit()
317 def get_activities_by_date_range(
318 self, start_date: date, end_date: date
319 ) -> List[Dict[str, Any]]:
320 """Get all activities within a date range.
322 Args:
323 start_date: Start date
324 end_date: End date
326 Returns:
327 List of activity index records
328 """
329 with self._get_conn() as conn:
330 cursor = conn.cursor()
331 cursor.execute(
332 """
333 SELECT * FROM activity_index
334 WHERE date >= ? AND date <= ?
335 ORDER BY date DESC
336 """,
337 (start_date.isoformat(), end_date.isoformat()),
338 )
340 return [dict(row) for row in cursor.fetchall()]
342 def activity_exists(self, activity_id: str) -> bool:
343 """Check if an activity is already indexed.
345 Args:
346 activity_id: Activity ID to check
348 Returns:
349 True if activity exists in index
350 """
351 with self._get_conn() as conn:
352 cursor = conn.cursor()
353 cursor.execute(
354 "SELECT 1 FROM activity_index WHERE activity_id = ?",
355 (activity_id,),
356 )
357 return cursor.fetchone() is not None