Coverage for health / analytics / engine.py: 0%

103 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-02 17:44 +0800

1import pandas as pd 

2import numpy as np 

3from datetime import date, timedelta 

4from typing import Dict, Any, List, Optional 

5from health.services.query import HealthDataQuery 

6from health.services.manual_log_storage import ManualLogStorage 

7from health.utils.logging_config import setup_logger 

8 

# Module-level logger, obtained from the project's setup_logger helper and
# named after this module.
9logger = setup_logger(__name__) 

10 

class HealthAnalyst:
    """
    Advanced analytics engine for health data using Pandas.
    Calculates correlations, trends, and lifestyle impacts.

    Every analysis method builds a fresh daily DataFrame through
    ``get_dataframe`` and returns a plain ``dict``. Problems such as missing
    data are reported through an ``"error"`` or ``"msg"`` key instead of an
    exception.
    """

    # Metrics requested from HealthDataQuery; each one becomes a column
    # named after the metric.
    _METRICS = ("sleep", "heart_rate", "stress", "body_battery", "steps", "hrv", "rhr")

    # Keys probed, in this order, when a data point is a plain dict.
    # The first key present wins, regardless of which metric was requested.
    _VALUE_KEYS = (
        "average_heart_rate",
        "overall_sleep_score",
        "resting_heart_rate",
        "average_stress_level",
        "charged",
    )

    def __init__(self):
        self.query = HealthDataQuery()
        self.manual_storage = ManualLogStorage()

    @staticmethod
    def _date_key(raw: Any) -> Optional[str]:
        """Normalise a date-like value to the ``YYYY-MM-DD`` row key, or None."""
        if not raw:
            return None
        # date / datetime objects expose isoformat(); anything else is
        # treated as text. Only the leading date part is relevant for the
        # per-day rows, so a trailing time component is cut off.
        text = raw.isoformat() if hasattr(raw, "isoformat") else str(raw)
        return text[:10]

    @staticmethod
    def _point_date(point: Any) -> Optional[str]:
        """Return the row key of a metric point (mapping or attribute style)."""
        for name in ("calendar_date", "date"):
            if hasattr(point, "get"):
                raw = point.get(name)
            else:
                raw = getattr(point, name, None)
            if raw:
                return HealthAnalyst._date_key(raw)
        return None

    @staticmethod
    def _point_value(point: Any) -> Any:
        """Extract the scalar value of a metric point; NaN when none is found."""
        if hasattr(point, "value"):
            return point.value
        if isinstance(point, dict):
            for key in HealthAnalyst._VALUE_KEYS:
                if key in point:
                    return point[key]
        return np.nan

    @staticmethod
    def _with_columns(df: pd.DataFrame, columns: List[str]) -> pd.DataFrame:
        """Return ``df`` with every name in ``columns`` present (NaN-filled if new)."""
        missing = [name for name in columns if name not in df.columns]
        if not missing:
            return df
        padded = df.copy()
        for name in missing:
            padded[name] = np.nan
        return padded

    def get_dataframe(self, days: int = 30) -> pd.DataFrame:
        """
        Fetch all relevant data for the last N days and merge into a single DataFrame.

        Args:
            days: Lookback window; the frame covers today and the ``days``
                preceding days (``days + 1`` rows).

        Returns:
            A DataFrame indexed by date (datetime), one row per calendar day.
            A metric or manual-log column exists only if at least one value
            was found for it in the window. An empty DataFrame is returned
            when the window contains no days (negative ``days``).
        """
        end_date = date.today()
        start_date = end_date - timedelta(days=days)

        # One row per calendar day, keyed by ISO date string.
        data_map: Dict[str, Dict[str, Any]] = {}
        curr = start_date
        while curr <= end_date:
            key = curr.isoformat()
            data_map[key] = {"date": key}
            curr += timedelta(days=1)

        if not data_map:
            # Nothing to fetch; callers treat an empty frame as "no data".
            return pd.DataFrame()

        # 1. Daily metrics (sleep, heart rate, stress, body battery, steps,
        #    HRV, RHR), fetched one metric at a time and merged by date.
        for metric in self._METRICS:
            points = self.query.get_metric_range(metric, start_date, end_date) or []
            for point in points:
                day = self._point_date(point)
                if day in data_map:
                    data_map[day][metric] = self._point_value(point)

        # 2. Manual logs (alcohol, fasting, supplements).
        logs = self.manual_storage.get_logs_in_range(start_date, end_date) or []
        for log in logs:
            day = self._date_key(log.log_date)
            if day not in data_map:
                continue
            row = data_map[day]

            # "alcohol_units" is the number of logged alcohol entries.
            drinks = list(log.alcohol_entries or [])
            row["alcohol_units"] = len(drinks)
            row["has_alcohol"] = 1 if drinks else 0

            # Fasting (categorical); days without a mode count as "Normal".
            row["fasting_mode"] = log.fasting_mode or "Normal"

            # Supplements (one-hot style flag).
            row["has_magnesium"] = int(
                any(
                    "magnesium" in (entry.supplement_name or "").lower()
                    for entry in (log.supplement_entries or [])
                )
            )

        df = pd.DataFrame(list(data_map.values()))
        df["date"] = pd.to_datetime(df["date"])
        return df.sort_values("date").set_index("date")

    def analyze_recovery_correlations(self, days: int = 90) -> Dict[str, Any]:
        """
        Analyze impact of Alcohol on Next-Day Recovery (Sleep, HRV, RHR).

        Alcohol logged on day T is compared with the recovery metrics stored
        on day T+1. The original author's note: the sleep record is dated to
        the morning it ends, so alcohol on Jan 1 affects the sleep of Jan 2.

        Returns:
            ``{"error": ...}`` when no data is available; otherwise a dict
            that contains ``"alcohol_impact"`` only when both alcohol and
            sober days exist. Averages are NaN when a metric has no data.
        """
        df = self.get_dataframe(days)
        if df.empty:
            return {"error": "No data available"}

        # Missing columns (e.g. no manual logs at all) must not raise.
        df = self._with_columns(df, ["sleep", "rhr", "hrv", "has_alcohol"])

        # Feature: Alcohol(T); Target: metric(T+1). Rows are consecutive
        # days, so shifting by one row is "next day".
        next_day = df[["sleep", "rhr", "hrv"]].shift(-1)

        drank = df["has_alcohol"] == 1
        sober = df["has_alcohol"] == 0

        results: Dict[str, Any] = {}
        if drank.any() and sober.any():
            sleep_alcohol = float(next_day.loc[drank, "sleep"].mean())
            sleep_sober = float(next_day.loc[sober, "sleep"].mean())
            results["alcohol_impact"] = {
                "sample_size_alcohol": int(drank.sum()),
                "sample_size_sober": int(sober.sum()),
                "avg_sleep_alcohol": sleep_alcohol,
                "avg_sleep_sober": sleep_sober,
                "avg_rhr_alcohol": float(next_day.loc[drank, "rhr"].mean()),
                "avg_rhr_sober": float(next_day.loc[sober, "rhr"].mean()),
                "sleep_diff": sleep_alcohol - sleep_sober,
                "avg_hrv_alcohol": float(next_day.loc[drank, "hrv"].mean()),
                "avg_hrv_sober": float(next_day.loc[sober, "hrv"].mean()),
            }

        return results

    def analyze_fitness_trends(self, days: int = 90) -> Dict[str, Any]:
        """
        Analyze the resting-heart-rate trend over the period.

        Compares the mean RHR of the first seven calendar days that have
        data with the mean RHR of the last seven. More than 10 days with an
        RHR reading are required.

        Returns:
            ``{"error": ...}`` when no data is available, ``{"msg": ...}``
            when there are too few readings, otherwise the two averages and
            a ``"trend"`` of ``"improving"`` (RHR went down) or
            ``"declining"``.
        """
        df = self.get_dataframe(days)
        if df.empty:
            return {"error": "No data"}

        if "rhr" in df.columns:
            rhr = pd.to_numeric(df["rhr"], errors="coerce").dropna()
        else:
            rhr = pd.Series(dtype=float)

        if len(rhr) > 10:
            # A row-based rolling(7) window is never complete at the first
            # valid reading, so the windows are anchored on the first and
            # last dates that actually have data (label slices are
            # inclusive, hence 6 days for a 7-day window).
            window = pd.Timedelta(days=6)
            start_rhr = rhr.loc[: rhr.index[0] + window].mean()
            end_rhr = rhr.loc[rhr.index[-1] - window:].mean()

            return {
                "period_days": days,
                "start_rhr_7d_avg": float(start_rhr),
                "end_rhr_7d_avg": float(end_rhr),
                "trend": "improving" if end_rhr < start_rhr else "declining",
            }

        return {"msg": "Not enough data for trend analysis"}

    def analyze_lifestyle_impact(self, days: int = 30) -> Dict[str, Any]:
        """
        Analyze the impact of fasting on sleep.

        Only fasting is analysed here; supplement flags such as
        ``has_magnesium`` can be examined with ``compare_groups``.

        Returns:
            ``{"error": ...}`` when no data is available, otherwise
            ``{"fasting_sleep_scores": {mode: mean sleep value}}``. The
            mapping is empty when no fasting modes were logged.
        """
        df = self.get_dataframe(days)
        if df.empty:
            return {"error": "No data"}

        df = self._with_columns(df, ["fasting_mode", "sleep"])

        # Mean sleep value per fasting mode ("Normal" vs other modes).
        per_mode = df.groupby("fasting_mode")["sleep"].mean()
        return {
            "fasting_sleep_scores": {mode: float(score) for mode, score in per_mode.items()}
        }

    def compare_groups(self, condition_col: str, target_col: str, days: int = 90) -> Dict[str, Any]:
        """
        Compare average of 'target_col' when 'condition_col' is non-zero vs zero.
        Example: Sleep Score when has_magnesium=1 vs 0.

        Both columns are read as numbers; values that are not numeric are
        ignored. Days without a target value are excluded from both the
        averages and the sample sizes.

        Returns:
            ``{"error": ...}`` when data or columns are missing or one group
            is empty; otherwise the two averages, sample sizes, percentage
            difference and a ``"verdict"`` that is ``"better"`` when the
            average with the condition is higher, else ``"worse"``.
        """
        df = self.get_dataframe(days)
        if df.empty:
            return {"error": "No data"}

        if condition_col not in df.columns or target_col not in df.columns:
            return {"error": f"Columns not found: {condition_col} or {target_col}"}

        condition = pd.to_numeric(df[condition_col], errors="coerce")
        target = pd.to_numeric(df[target_col], errors="coerce")

        group_true = target[condition > 0].dropna()
        group_false = target[condition == 0].dropna()

        if len(group_true) == 0 or len(group_false) == 0:
            return {"error": "One group has no data (e.g. never took supplement)"}

        avg_true = group_true.mean()
        avg_false = group_false.mean()
        diff_pct = ((avg_true - avg_false) / avg_false) * 100 if avg_false != 0 else 0

        return {
            "condition": condition_col,
            "target": target_col,
            "avg_with_condition": float(avg_true),
            "avg_without_condition": float(avg_false),
            "sample_with": len(group_true),
            "sample_without": len(group_false),
            "difference_pct": float(diff_pct),
            "verdict": "better" if avg_true > avg_false else "worse",
        }

    def analyze_lagged_correlation(self, driver: str, target: str, lag: int = 1, days: int = 90) -> Dict[str, Any]:
        """
        Analyze if 'driver' on Day T correlates with 'target' on Day T+lag.
        Example: Alcohol (T) -> Sleep Score (T+1).

        Args:
            driver: Column name of the driver (e.g., 'alcohol_units', 'stress').
            target: Column name of the target (e.g., 'sleep', 'hrv', 'rhr').
            lag: Days to shift target (1 means next day).
            days: Lookback window.

        Returns:
            ``{"error": ...}`` when data or columns are missing or fewer than
            5 paired observations exist; otherwise the correlation, the
            sample size and a readable message. An undefined correlation
            (e.g. a constant column) is reported as 0.0 in both the value
            and the message.
        """
        df = self.get_dataframe(days)
        if df.empty:
            return {"error": "No data"}

        # Verify columns exist
        if driver not in df.columns or target not in df.columns:
            return {"error": f"Columns not found: {driver} or {target}"}

        # lag=1 aligns Driver(T) with Target(T+1). Non-numeric values are
        # coerced to NaN and dropped with the other incomplete pairs.
        paired = pd.DataFrame(
            {
                "driver": pd.to_numeric(df[driver], errors="coerce"),
                "target": pd.to_numeric(df[target], errors="coerce").shift(-lag),
            }
        ).dropna()

        if len(paired) < 5:
            return {"error": "Not enough data points (<5)"}

        corr = paired["driver"].corr(paired["target"])
        corr_value = 0.0 if pd.isna(corr) else float(corr)

        return {
            "driver": driver,
            "target": target,
            "lag_days": lag,
            "correlation": corr_value,
            "sample_size": len(paired),
            "msg": f" Correlation {corr_value:.2f} (1.0 is perfect positive, -1.0 is perfect negative)",
        }