Coverage for health / analytics / engine.py: 0%
103 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-02 17:44 +0800
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-02 17:44 +0800
1import pandas as pd
2import numpy as np
3from datetime import date, timedelta
4from typing import Dict, Any, List, Optional
5from health.services.query import HealthDataQuery
6from health.services.manual_log_storage import ManualLogStorage
7from health.utils.logging_config import setup_logger
9logger = setup_logger(__name__)
class HealthAnalyst:
    """
    Advanced analytics engine for health data using Pandas.

    Builds one row-per-day DataFrame from the metric query service and the
    manual log storage, then derives correlations, trends, and lifestyle
    comparisons from it. Every ``analyze_*`` / ``compare_*`` method returns a
    plain dict; problems are reported through an ``"error"`` or ``"msg"`` key
    rather than by raising.
    """

    # Daily metrics requested from the query service; each becomes one column.
    _METRICS = ("sleep", "heart_rate", "stress", "body_battery", "steps", "hrv", "rhr")

    # Keys probed (in order) on dict-shaped points to find the metric value.
    # NOTE(review): no key here matches "steps" or "hrv" payloads, so those
    # metrics stay NaN for dict-shaped points -- confirm the schema and extend.
    _VALUE_KEYS = (
        "average_heart_rate",
        "overall_sleep_score",
        "resting_heart_rate",
        "average_stress_level",
        "charged",
    )

    def __init__(self):
        self.query = HealthDataQuery()
        self.manual_storage = ManualLogStorage()

    @staticmethod
    def _date_key(raw: Any) -> Optional[str]:
        """Normalise a date-like value to the ISO ``YYYY-MM-DD`` key used per row."""
        if not raw:
            return None
        if isinstance(raw, date):
            # Rebuild as a plain date so datetime instances drop their time part.
            return date(raw.year, raw.month, raw.day).isoformat()
        return str(raw)

    @classmethod
    def _point_date(cls, point: Any) -> Optional[str]:
        """Read the calendar date of a metric point (mapping- or attribute-style)."""
        raw = None
        getter = getattr(point, "get", None)
        if callable(getter):
            raw = getter("calendar_date") or getter("date")
        if not raw:
            # Object-style points have no .get(); fall back to attributes.
            raw = getattr(point, "calendar_date", None) or getattr(point, "date", None)
        return cls._date_key(raw)

    @classmethod
    def _point_value(cls, point: Any) -> Any:
        """Extract the metric value from a point, or NaN when none is recognised."""
        if hasattr(point, "value"):
            return point.value
        if isinstance(point, dict):
            for key in cls._VALUE_KEYS:
                if key in point:
                    return point[key]
        return np.nan

    def get_dataframe(self, days: int = 30) -> pd.DataFrame:
        """
        Fetch all relevant data for the last N days and merge into a single DataFrame.

        Args:
            days: Lookback window; the frame covers ``today - days`` .. ``today``
                inclusive, one row per calendar day.

        Returns:
            DataFrame indexed by ``date`` (datetime64, ascending). A metric or
            manual-log column is present only if at least one point/log fell
            inside the window; days without a value hold NaN. An empty frame is
            returned when the window contains no days (negative ``days``).
        """
        end_date = date.today()
        start_date = end_date - timedelta(days=days)

        # 1. Seed one row per day so every date appears even without data.
        data_map: Dict[str, Dict[str, Any]] = {}
        curr = start_date
        while curr <= end_date:
            key = curr.isoformat()
            data_map[key] = {"date": key}
            curr += timedelta(days=1)

        if not data_map:
            # Negative lookback: nothing to fetch, and building the frame
            # below would fail on the missing 'date' column.
            return pd.DataFrame(index=pd.DatetimeIndex([], name="date"))

        # 2. Daily metrics (Steps, Sleep, HR, Stress, Body Battery, ...).
        # One range query per metric; points are merged into rows by date.
        for metric in self._METRICS:
            points = self.query.get_metric_range(metric, start_date, end_date)
            for p in points or []:
                d_str = self._point_date(p)
                if d_str in data_map:
                    data_map[d_str][metric] = self._point_value(p)

        # 3. Manual logs (Alcohol, Fasting, Supplements).
        logs = self.manual_storage.get_logs_in_range(start_date, end_date)
        for log in logs or []:
            d_str = self._date_key(log.log_date)
            if d_str not in data_map:
                continue
            row = data_map[d_str]
            alcohol_entries = log.alcohol_entries or []
            supplement_entries = log.supplement_entries or []

            # Alcohol (Boolean & Amount): amount is the number of entries.
            row["alcohol_units"] = sum(1 for _ in alcohol_entries)
            row["has_alcohol"] = 1 if alcohol_entries else 0

            # Fasting (Categorical)
            row["fasting_mode"] = log.fasting_mode or "Normal"

            # Supplements (One-hotish checks)
            row["has_magnesium"] = 1 if any(
                "magnesium" in s.supplement_name.lower() for s in supplement_entries
            ) else 0

        # 4. Convert to DataFrame
        df = pd.DataFrame(list(data_map.values()))
        df["date"] = pd.to_datetime(df["date"])
        df = df.sort_values("date").set_index("date")

        # Metric values may arrive as None or strings; force numeric columns so
        # downstream mean/rolling/corr calls do not trip over object dtype.
        for metric in self._METRICS:
            if metric in df.columns:
                df[metric] = pd.to_numeric(df[metric], errors="coerce")
        return df

    def analyze_recovery_correlations(self, days: int = 90) -> Dict[str, Any]:
        """
        Analyze impact of Alcohol on Next-Day Recovery (Sleep, RHR).

        Alcohol logged on day T is compared against the sleep score and
        resting heart rate recorded for day T+1.

        Returns:
            ``{"alcohol_impact": {...}}`` when both alcohol and sober days
            exist, ``{}`` when either group is empty (including when no manual
            logs were recorded), or ``{"error": ...}`` for an empty frame.
        """
        df = self.get_dataframe(days)
        if df.empty:
            return {"error": "No data available"}

        # Columns that never received data are absent from the frame; reindex
        # materialises them as all-NaN instead of raising KeyError below.
        df = df.reindex(columns=["sleep", "rhr", "has_alcohol"])

        # Align Feature: Alcohol(T) with Target: Sleep(T+1), RHR(T+1).
        next_sleep = df["sleep"].shift(-1)
        next_rhr = df["rhr"].shift(-1)

        drank = df["has_alcohol"] == 1
        sober = df["has_alcohol"] == 0
        n_alcohol = int(drank.sum())
        n_sober = int(sober.sum())

        results: Dict[str, Any] = {}
        if n_alcohol > 0 and n_sober > 0:
            sleep_alcohol = next_sleep[drank].mean()
            sleep_sober = next_sleep[sober].mean()
            results["alcohol_impact"] = {
                "sample_size_alcohol": n_alcohol,
                "sample_size_sober": n_sober,
                "avg_sleep_alcohol": float(sleep_alcohol),
                "avg_sleep_sober": float(sleep_sober),
                "avg_rhr_alcohol": float(next_rhr[drank].mean()),
                "avg_rhr_sober": float(next_rhr[sober].mean()),
                "sleep_diff": float(sleep_alcohol - sleep_sober),
            }

        return results

    def analyze_fitness_trends(self, days: int = 90) -> Dict[str, Any]:
        """
        Analyze the resting-heart-rate trend over the window.

        Compares the first and last days that have an RHR reading, preferring
        the 7-day rolling mean and falling back to the raw reading when the
        rolling window is incomplete.

        Returns:
            Dict with ``period_days``, ``start_rhr_7d_avg``, ``end_rhr_7d_avg``
            and ``trend`` ("improving" when RHR dropped, "declining" when it
            rose, "stable" when unchanged); or a ``msg`` dict when there are
            10 or fewer readings.
        """
        df = self.get_dataframe(days)
        if df.empty:
            return {"error": "No data"}

        if "rhr" not in df.columns:
            return {"msg": "Not enough data for trend analysis"}

        rhr = df["rhr"]
        rhr_roll = rhr.rolling(7).mean()

        # Keep only days that actually have a reading.
        has_reading = rhr.notna()
        valid_rhr = rhr[has_reading]
        valid_roll = rhr_roll[has_reading]

        if len(valid_rhr) > 10:
            # Simple endpoint comparison (not robust but indicative).
            first_roll, last_roll = valid_roll.iloc[0], valid_roll.iloc[-1]
            start_rhr = first_roll if not np.isnan(first_roll) else valid_rhr.iloc[0]
            end_rhr = last_roll if not np.isnan(last_roll) else valid_rhr.iloc[-1]

            if end_rhr < start_rhr:
                trend = "improving"
            elif end_rhr > start_rhr:
                trend = "declining"
            else:
                trend = "stable"

            return {
                "period_days": days,
                "start_rhr_7d_avg": float(start_rhr),
                "end_rhr_7d_avg": float(end_rhr),
                "trend": trend,
            }

        return {"msg": "Not enough data for trend analysis"}

    def analyze_lifestyle_impact(self, days: int = 30) -> Dict[str, Any]:
        """
        Analyze Fasting: mean sleep score per fasting mode.

        Returns:
            ``{"fasting_sleep_scores": {mode: mean_sleep}}``; the inner dict is
            empty when no fasting mode or no sleep data exists in the window.
        """
        df = self.get_dataframe(days)
        if df.empty:
            return {"error": "No data"}

        results: Dict[str, Any] = {}

        # Compare "Normal" vs other modes. Either column is missing when no
        # log / no sleep point landed in the window.
        if "fasting_mode" in df.columns and "sleep" in df.columns:
            fasting_stats = df.groupby("fasting_mode")["sleep"].mean().to_dict()
        else:
            fasting_stats = {}
        results["fasting_sleep_scores"] = fasting_stats

        return results

    def compare_groups(self, condition_col: str, target_col: str, days: int = 90) -> Dict[str, Any]:
        """
        Compare average of 'target_col' when 'condition_col' is non-zero vs zero.
        Example: Sleep Score when has_magnesium=1 vs 0.

        Rows whose target is missing are excluded, so the reported sample
        sizes match the values the averages were computed from.

        Args:
            condition_col: Column splitting the days (>0 vs ==0).
            target_col: Column whose mean is compared between the groups.
            days: Lookback window.

        Returns:
            Comparison dict, or ``{"error": ...}`` when a column is missing or
            one of the groups has no usable rows.
        """
        df = self.get_dataframe(days)
        if df.empty:
            return {"error": "No data"}

        if condition_col not in df.columns or target_col not in df.columns:
            return {"error": f"Columns not found: {condition_col} or {target_col}"}

        # Coerce so categorical/text columns yield NaN instead of a TypeError
        # on the numeric comparisons below.
        condition = pd.to_numeric(df[condition_col], errors="coerce")
        target = pd.to_numeric(df[target_col], errors="coerce")

        group_true = target[condition > 0].dropna()
        group_false = target[condition == 0].dropna()

        if len(group_true) == 0 or len(group_false) == 0:
            return {"error": "One group has no data (e.g. never took supplement)"}

        avg_true = group_true.mean()
        avg_false = group_false.mean()
        diff_pct = ((avg_true - avg_false) / avg_false) * 100 if avg_false != 0 else 0

        return {
            "condition": condition_col,
            "target": target_col,
            "avg_with_condition": float(avg_true),
            "avg_without_condition": float(avg_false),
            "sample_with": len(group_true),
            "sample_without": len(group_false),
            "difference_pct": float(diff_pct),
            # NOTE: "better" only means "higher"; for metrics where lower is
            # healthier (e.g. rhr) the caller must invert the reading.
            "verdict": "better" if avg_true > avg_false else "worse",
        }

    def analyze_lagged_correlation(self, driver: str, target: str, lag: int = 1, days: int = 90) -> Dict[str, Any]:
        """
        Analyze if 'driver' on Day T correlates with 'target' on Day T+lag.
        Example: Alcohol (T) -> Sleep Score (T+1).

        Args:
            driver: Column name of the driver (e.g., 'alcohol_units', 'stress').
            target: Column name of the target (e.g., 'sleep', 'hrv', 'rhr').
            lag: Days to shift target (1 means next day).
            days: Lookback window.

        Returns:
            Dict with the Pearson correlation (0.0 when undefined, e.g. a
            constant series), the number of paired days, and a human-readable
            message; or ``{"error": ...}`` when a column is missing or fewer
            than 5 paired days exist.
        """
        df = self.get_dataframe(days)
        if df.empty:
            return {"error": "No data"}

        # Verify columns exist
        if driver not in df.columns or target not in df.columns:
            return {"error": f"Columns not found: {driver} or {target}"}

        # lag=1 aligns Driver(T) with Target(T+1). Separate keys keep this
        # correct even when driver and target name the same column.
        pairs = pd.DataFrame({
            "driver": pd.to_numeric(df[driver], errors="coerce"),
            "target": pd.to_numeric(df[target], errors="coerce").shift(-lag),
        }).dropna()

        if len(pairs) < 5:
            return {"error": "Not enough data points (<5)"}

        corr = pairs["driver"].corr(pairs["target"])
        # Undefined correlation (zero variance) is reported as 0.0 in both the
        # numeric field and the message so they never disagree.
        corr_value = 0.0 if pd.isna(corr) else float(corr)

        return {
            "driver": driver,
            "target": target,
            "lag_days": lag,
            "correlation": corr_value,
            "sample_size": len(pairs),
            "msg": f" Correlation {corr_value:.2f} (1.0 is perfect positive, -1.0 is perfect negative)",
        }