Coverage for slack_bot / dispatcher.py: 0%

314 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-02 17:44 +0800

1from typing import Optional, List, Dict, Any 

2from slack_sdk import WebClient 

3import os 

4 

5from health.utils.logging_config import setup_logger 

6from slack_bot.llm.gemini import GeminiLLM 

7from slack_bot.context.storage import ContextStorage 

8from slack_bot.tools.registry import TOOLS_SCHEMA, TOOL_FUNCTIONS 

9from slack_bot.tools.groups import get_tool_preset 

10 

11logger = setup_logger(__name__) 

12 

13 

class MessageDispatcher:
    """Routes incoming Slack messages to Gemini and executes tools."""

    def __init__(self, bot_token: Optional[str] = None, system_instruction: Optional[str] = None, tools: Optional[List[Dict]] = None, tool_mode: str = "light"):
        """
        Build a dispatcher bound to one LLM instance and one Slack client.

        Args:
            bot_token: Explicit Slack bot token; falls back to SLACK_BOT_TOKEN env var.
            system_instruction: Custom system prompt (e.g. for Shell Bot).
            tools: Explicit tool schema list; when None, a preset is chosen via tool_mode.
            tool_mode: 'none' (no tools), 'light' (5 tools), 'standard' (10 tools),
                'full' (15 tools), 'all' (21 tools).
        """
        self.llm = GeminiLLM(system_instruction=system_instruction)

        # Tool selection precedence: an explicit list wins, then 'none'
        # short-circuits to pure-text mode, otherwise load the named preset.
        if tools is not None:
            self.tools, self.tool_functions = tools, TOOL_FUNCTIONS
        elif tool_mode == "none":
            self.tools, self.tool_functions = None, {}
            logger.info("Running in NO-TOOLS mode (pure text)")
        else:
            self.tools, self.tool_functions = get_tool_preset(tool_mode)
            logger.info(f"Using tool preset: {tool_mode} ({len(self.tools)} tools)")

        # Slack WebClient used to post replies back into channels.
        self.bot_token = bot_token or os.environ.get("SLACK_BOT_TOKEN")
        self.client = WebClient(token=self.bot_token)

        logger.info("Dispatcher initialized with Gemini")

45 

46 def dispatch(self, message_text: str, channel_id: str, user_id: str, response_ts: Optional[str] = None, request_id: str = "N/A", files: Optional[List[Dict]] = None) -> None: 

47 """ 

48 Process a message and trigger responses. 

49 """ 

50 import time 

51 t0 = time.time() 

52 

53 prefix = f"[{request_id}]" 

54 logger.info(f"{prefix} Processing message in {channel_id} from {user_id}") 

55 if files: 

56 logger.info(f"{prefix} Message contains {len(files)} files") 

57 

58 # 1. Initialize Context 

59 storage = ContextStorage(channel_id) 

60 

61 # Handle "clear" command 

62 if message_text.strip().lower() in ["clear", "reset", "清除"]: 

63 storage.clear() 

64 self.client.chat_postMessage(channel=channel_id, text="🧹 Context cleared. (对话历史已清除)") 

65 return 

66 

67 storage.add_message("user", message_text) 

68 

69 # Download images if any 

70 image_data_list = [] 

71 download_failed = False 

72 if files: 

73 image_data_list = self._download_files(files, prefix) 

74 # Anti-Hallucination: Check if download failed 

75 if len(image_data_list) == 0: 

76 failure_msg = "\n[SYSTEM WARNING: User uploaded an image but it FAILED to download/process. You CANNOT see the image. DO NOT GUESS. Inform the user that the image access failed.]" 

77 logger.warning(f"{prefix} Image download failed! Appending warning to LLM.") 

78 message_text += failure_msg 

79 download_failed = True 

80 

81 try: 

82 # 2. Get conversation context 

83 context = storage.get_context() 

84 logger.debug(f"{prefix} Context size: {len(context)}") 

85 

86 # 3. Process with Gemini (with tools) 

87 # CRITICAL: If image download failed, disable tools to prevent "400 INVALID_ARGUMENT" 

88 # (which happens when mixing text-only warning prompts with complex tool schemas in some proxies) 

89 # 

90 # NOTE: gemini-3-flash has a bug where it returns empty response with tools, 

91 # but we use SAFETY OVERRIDE below to manually trigger tools based on keywords 

92 current_tools = None if download_failed else self.tools 

93 

94 logger.info(f"{prefix} Calling Gemini for initial response...") 

95 logger.info(f"{prefix} Request config: model={self.llm.get_model_name()}, tools={len(current_tools) if current_tools else 0}, context={len(context)-1} msgs") 

96 t_llm_start = time.time() 

97 response_text, tool_calls = self.llm.generate_response( 

98 message=message_text, 

99 context=context[:-1], 

100 tools=current_tools, 

101 images=image_data_list 

102 ) 

103 # Defensive check for None/empty response 

104 if response_text is None: 

105 logger.error(f"{prefix} Gemini returned None for response_text!") 

106 response_text = "" 

107 elif not response_text.strip(): 

108 logger.warning(f"{prefix} Gemini returned empty response_text! (This usually means the proxy has issues)") 

109 

110 # Initialize tool_calls 

111 if tool_calls is None: 

112 tool_calls = [] 

113 

114 # Log what we got back 

115 logger.info(f"{prefix} Gemini response: text={len(response_text)} chars, tool_calls={len(tool_calls) if tool_calls else 0}") 

116 

117 # --- SAFETY OVERRIDE START --- 

118 # If the LLM failed to trigger tools (common with local proxies + tools bug), 

119 # manually detect intent and force tool calls based on keywords. 

120 # This is a FALLBACK mechanism when Gemini returns empty response or MALFORMED_FUNCTION_CALL. 

121 if not tool_calls: 

122 logger.info(f"{prefix} No tool calls from Gemini - checking SAFETY OVERRIDE keywords...") 

123 lower_msg = message_text.lower() 

124 

125 # 1. Sync Garmin data 

126 if any(k in lower_msg for k in ["sync", "update", "fetch", "同步", "拉取", "更新"]) and \ 

127 any(k in lower_msg for k in ["garmin", "佳明", "data", "数据"]): 

128 logger.warning(f"{prefix} 🔧 SAFETY OVERRIDE: Forcing sync_garmin tool") 

129 from datetime import datetime, timedelta 

130 from health.utils.time_utils import get_cst_today 

131 # Try to infer sync target date from user message keywords 

132 if any(k in lower_msg for k in ["昨天", "昨晚", "昨日", "yesterday", "last night"]): 

133 _sync_date = (get_cst_today() - timedelta(days=1)).isoformat() 

134 elif any(k in lower_msg for k in ["前天", "day before yesterday"]): 

135 _sync_date = (get_cst_today() - timedelta(days=2)).isoformat() 

136 else: 

137 _sync_date = get_cst_today().isoformat() # today in CST 

138 logger.warning(f"{prefix} 🔧 SAFETY OVERRIDE: sync target_date={_sync_date}") 

139 tool_calls.append({ 

140 "name": "sync_garmin", 

141 "args": {"target_date": _sync_date} 

142 }) 

143 

144 # 2. Query health data (sleep, steps, activities, etc.) 

145 elif any(k in lower_msg for k in ["睡眠", "sleep", "步数", "steps", "心率", "heart", "hrv", "运动", "activity", "workout", "锻炼", "昨天", "yesterday", "今天", "today", "上午", "下午", "morning", "afternoon", "查询", "query", "变化", "趋势", "trend"]): 

146 from datetime import timedelta 

147 from health.utils.time_utils import get_cst_now, get_cst_today 

148 

149 # CRITICAL: First check for single-day keywords to avoid false positives 

150 has_single_day_keyword = any(k in lower_msg for k in [ 

151 "昨天", "昨晚", "昨日", "last night", "yesterday", 

152 "今天", "今晚", "今日", "today", "tonight", 

153 "前天", "前晚", "day before yesterday", 

154 "大前天", "three days ago" 

155 ]) 

156 

157 # Check if asking for time-range trends (past X days/weeks/months) 

158 is_trend_query = (not has_single_day_keyword) and any(k in lower_msg for k in [ 

159 "过去", "最近", "近", "past", "recent", 

160 "变化", "趋势", "trend", "change", "历史", "history", 

161 "个月", "month", "周", "week", "年", "year", 

162 ]) 

163 

164 if is_trend_query: 

165 # Parse time range from query 

166 import re 

167 today_cst = get_cst_today() 

168 end_date = today_cst.strftime("%Y-%m-%d") 

169 days_ago = 30 # default 

170 

171 if re.search(r'(\d+)\s*个?月|(\d+)\s*month', lower_msg): 

172 months = int(re.search(r'(\d+)\s*个?月|(\d+)\s*month', lower_msg).group(1) or re.search(r'(\d+)\s*个?月|(\d+)\s*month', lower_msg).group(2)) 

173 days_ago = months * 30 

174 elif re.search(r'(\d+)\s*周|(\d+)\s*week', lower_msg): 

175 weeks = int(re.search(r'(\d+)\s*周|(\d+)\s*week', lower_msg).group(1) or re.search(r'(\d+)\s*周|(\d+)\s*week', lower_msg).group(2)) 

176 days_ago = weeks * 7 

177 elif re.search(r'(\d+)\s*天|(\d+)\s*day', lower_msg): 

178 days_ago = int(re.search(r'(\d+)\s*天|(\d+)\s*day', lower_msg).group(1) or re.search(r'(\d+)\s*天|(\d+)\s*day', lower_msg).group(2)) 

179 elif re.search(r'(\d+)\s*年|(\d+)\s*year', lower_msg): 

180 years = int(re.search(r'(\d+)\s*年|(\d+)\s*year', lower_msg).group(1) or re.search(r'(\d+)\s*年|(\d+)\s*year', lower_msg).group(2)) 

181 days_ago = years * 365 

182 elif "半年" in lower_msg or "six month" in lower_msg: 

183 days_ago = 180 

184 elif "三个月" in lower_msg or "three month" in lower_msg: 

185 days_ago = 90 

186 

187 from datetime import timedelta 

188 start_date = (today_cst - timedelta(days=days_ago)).strftime("%Y-%m-%d") 

189 

190 # Determine metric type 

191 metric_type = "rhr" # default 

192 if any(k in lower_msg for k in ["hrv", "心率变异"]): 

193 metric_type = "hrv" 

194 elif any(k in lower_msg for k in ["睡眠", "sleep"]): 

195 metric_type = "sleep" 

196 elif any(k in lower_msg for k in ["步数", "steps"]): 

197 metric_type = "steps" 

198 elif any(k in lower_msg for k in ["压力", "stress"]): 

199 metric_type = "stress" 

200 

201 logger.warning(f"{prefix} 🔧 SAFETY OVERRIDE: Forcing get_metric_history for {metric_type} ({start_date} to {end_date})") 

202 tool_calls.append({ 

203 "name": "get_metric_history", 

204 "args": { 

205 "metric_type": metric_type, 

206 "start_date": start_date, 

207 "end_date": end_date 

208 } 

209 }) 

210 else: 

211 # Single-day query — all dates computed in CST (UTC+8) 

212 today_cst = get_cst_today() 

213 if any(k in lower_msg for k in ["昨天", "昨晚", "昨日", "last night", "yesterday"]): 

214 target = (today_cst - timedelta(days=1)).strftime("%Y-%m-%d") 

215 elif any(k in lower_msg for k in ["前天", "前晚", "day before yesterday"]): 

216 target = (today_cst - timedelta(days=2)).strftime("%Y-%m-%d") 

217 else: 

218 target = today_cst.strftime("%Y-%m-%d") 

219 

220 # Check if specifically asking for activity history 

221 if any(k in lower_msg for k in ["运动", "activity", "workout", "锻炼"]): 

222 logger.warning(f"{prefix} 🔧 SAFETY OVERRIDE: Forcing get_activity_history for {target}") 

223 tool_calls.append({ 

224 "name": "get_activity_history", 

225 "args": {"start_date": target, "end_date": target} 

226 }) 

227 else: 

228 logger.warning(f"{prefix} 🔧 SAFETY OVERRIDE: Forcing get_daily_detailed_stats for {target}") 

229 tool_calls.append({ 

230 "name": "get_daily_detailed_stats", 

231 "args": {"target_date": target} 

232 }) 

233 

234 # 3. Web search 

235 elif any(k in lower_msg for k in ["搜索", "search", "查一下", "look up", "最新", "latest"]): 

236 query = message_text 

237 logger.warning(f"{prefix} 🔧 SAFETY OVERRIDE: Forcing search_web") 

238 tool_calls.append({ 

239 "name": "search_web", 

240 "args": {"query": query} 

241 }) 

242 

243 # 4. Detect diet logging intent (for logging, not forcing tool call yet) 

244 food_items = ["鸡", "肉", "菜", "蛋", "饭", "面", "粉", "汤", "鱼", "虾", "蟹", "牛", "猪", "羊", "豆", "奶"] 

245 meal_indicators = ["晚餐", "午餐", "早餐", "夜宵", "零食", "breakfast", "lunch", "dinner", "snack"] 

246 has_ate_pattern = any(k in lower_msg for k in ["吃了", "ate", "吃的", "eating"]) 

247 has_meal_indicator = any(k in lower_msg for k in meal_indicators) 

248 has_food_item = any(k in lower_msg for k in food_items) 

249 

250 if (has_ate_pattern and has_food_item) or has_meal_indicator: 

251 logger.warning(f"{prefix} Detected diet logging intent: ate={has_ate_pattern}, meal={has_meal_indicator}, food={has_food_item}") 

252 # Don't force tool - let user confirm first or LLM will handle in second round 

253 else: 

254 logger.info(f"{prefix} No SAFETY OVERRIDE match found - will return empty response or text") 

255 else: 

256 logger.info(f"{prefix} Tool calls successfully generated by Gemini (no SAFETY OVERRIDE needed)") 

257 # --- SAFETY OVERRIDE END --- 

258 

259 tool_count = len(tool_calls) if tool_calls else 0 

260 logger.info(f"{prefix} Gemini response in {time.time()-t_llm_start:.2f}s: {len(response_text)} chars, {tool_count} tools") 

261 

262 # 4. Execute tools if requested 

263 tool_results = [] 

264 if tool_calls: 

265 logger.info(f"{prefix} Executing {len(tool_calls)} tools") 

266 

267 for tool_call in tool_calls: 

268 tool_name = tool_call["name"] 

269 tool_args = tool_call["args"] 

270 logger.info(f"{prefix} Tool: {tool_name} Args: {tool_args}") 

271 

272 t_tool_start = time.time() 

273 if tool_name in self.tool_functions: 

274 try: 

275 # Inject channel_id for shell tool 

276 if tool_name == "execute_shell": 

277 tool_args["channel_id"] = channel_id 

278 

279 result = self.tool_functions[tool_name](**tool_args) 

280 

281 # Safety Truncation: Prevent massive JSONs from blowing up context 

282 str_result = str(result) 

283 if len(str_result) > 8000: 

284 str_result = str_result[:8000] + "... (truncated)" 

285 

286 tool_results.append({ 

287 "tool": tool_name, 

288 "args": tool_args, 

289 "result": str_result 

290 }) 

291 logger.info(f"{prefix} ✓ {tool_name} finished in {time.time()-t_tool_start:.2f}s") 

292 except Exception as e: 

293 error_msg = f"Error executing {tool_name}: {str(e)}" 

294 logger.error(f"{prefix} {error_msg}") 

295 tool_results.append({ 

296 "tool": tool_name, 

297 "args": tool_args, 

298 "result": f"❌ {error_msg}" 

299 }) 

300 else: 

301 logger.warning(f"{prefix} Unknown tool: {tool_name}") 

302 

303 # 4b. Second round: Get final response with tool results 

304 logger.info(f"{prefix} Requesting final analysis...") 

305 

306 tool_names = ", ".join([tr['tool'] for tr in tool_results]) 

307 

308 # Format specific tool results for context 

309 tool_results_text = "\n".join([ 

310 f"Tool '{tr['tool']}' (Args: {tr['args']}) returned:\n{tr['result']}" 

311 for tr in tool_results 

312 ]) 

313 

314 # Inject thoughts into memory 

315 storage.add_message("assistant", f"I checked: {tool_names}", model="gemini") 

316 

317 # Stronger Prompt for Analysis 

318 analysis_prompt = ( 

319 f"Here are the execution results from the tools:\n{tool_results_text}\n\n" 

320 f"CRITICAL INSTRUCTION: You represent the 'Butler' bot. You must now answer the user's original question based ONLY on the above results.\n" 

321 f"1. Summarize the key data points found (e.g., steps, sleep score).\n" 

322 f"2. Provide direct, natural language insights.\n" 

323 f"3. If the tool indicated an error, explain it to the user.\n\n" 

324 f"Do NOT output raw JSON. Write a helpful paragraph." 

325 ) 

326 

327 # Do NOT add this massive data blob to persistent storage. 

328 # Just use it for this generation context. 

329 final_context = storage.get_context() 

330 

331 t_llm_2_start = time.time() 

332 response_text, _ = self.llm.generate_response( 

333 message=analysis_prompt, 

334 context=final_context, 

335 tools=None 

336 ) 

337 logger.info(f"{prefix} Final analysis received in {time.time()-t_llm_2_start:.2f}s") 

338 

339 # Fallback 

340 if not response_text or not response_text.strip(): 

341 logger.warning(f"{prefix} Empty analysis received!") 

342 if files: 

343 response_text = "⚠️ 图片已接收,但 AI 模型未能返回描述。这可能是由于网络波动或模型暂时无法识别该图像,请稍后重试。" 

344 else: 

345 response_text = "✅ 数据已同步,但我似乎暂时无法生成文字分析。请参考上方的工具执行结果。" 

346 

347 # 5. Format and send response 

348 # Relaxing internal limit to allow for multi-message splitting (max 3 messages of ~4000 chars) 

349 APP_LIMIT = 12000 

350 full_response = self._format_response(response_text, tool_results, max_length=APP_LIMIT) 

351 

352 # Split into chunks 

353 chunks = self._split_text(full_response, max_chunk_size=3800) 

354 

355 # Limit to 3 chunks as requested 

356 if len(chunks) > 3: 

357 chunks = chunks[:3] 

358 chunks[-1] += "\n...(remaining content truncated)" 

359 

360 # Post to Slack 

361 if response_ts: 

362 # Update the first message (Thinking...) 

363 try: 

364 self.client.chat_update(channel=channel_id, ts=response_ts, text=chunks[0]) 

365 logger.info(f"{prefix} 📤 Message updated {response_ts}") 

366 except Exception as e: 

367 logger.warning(f"{prefix} Update failed, posting new: {e}") 

368 self.client.chat_postMessage(channel=channel_id, text=chunks[0]) 

369 

370 # Post subsequent chunks as new messages 

371 for chunk in chunks[1:]: 

372 # Small delay to ensure order 

373 time.sleep(0.5) 

374 self.client.chat_postMessage(channel=channel_id, text=chunk) 

375 logger.info(f"{prefix} 📤 Continuation message posted") 

376 

377 else: 

378 # Post all as new messages 

379 for i, chunk in enumerate(chunks): 

380 if i > 0: time.sleep(0.5) 

381 self.client.chat_postMessage(channel=channel_id, text=chunk) 

382 logger.info(f"{prefix} 📤 {len(chunks)} message(s) posted") 

383 

384 

385 # 6. Save to Context 

386 storage.add_message("assistant", full_response, model="gemini") 

387 logger.info(f"{prefix} Dispatch completed in {time.time()-t0:.2f}s") 

388 

389 except Exception as e: 

390 logger.error(f"{prefix} Dispatch failed: {e}", exc_info=True) 

391 try: 

392 self.client.chat_postMessage( 

393 channel=channel_id, 

394 text=f"⚠️ Internal Error: {str(e)}" 

395 ) 

396 logger.info("Error message posted to Slack") 

397 except Exception as slack_error: 

398 logger.error(f"Failed to post error message to Slack: {slack_error}", exc_info=True) 

399 

400 def _split_text(self, text: str, max_chunk_size: int = 3800) -> List[str]: 

401 """Split text into chunks avoiding breaking code blocks if possible.""" 

402 if len(text) <= max_chunk_size: 

403 return [text] 

404 

405 chunks = [] 

406 current_chunk = "" 

407 lines = text.split('\n') 

408 

409 in_code_block = False 

410 

411 for line in lines: 

412 # Check for code block toggle 

413 if line.strip().startswith('```'): 

414 in_code_block = not in_code_block 

415 

416 # If adding this line exceeds chunk size 

417 if len(current_chunk) + len(line) + 1 > max_chunk_size: 

418 # Close code block if open 

419 if in_code_block: 

420 current_chunk += "\n```" 

421 

422 chunks.append(current_chunk) 

423 

424 # Start new chunk 

425 current_chunk = "" 

426 # Re-open code block if it was open 

427 if in_code_block: 

428 current_chunk += "```\n" + "(...continued)\n" 

429 

430 if current_chunk: 

431 current_chunk += "\n" 

432 current_chunk += line 

433 

434 if current_chunk: 

435 chunks.append(current_chunk) 

436 

437 return chunks 

438 

439 def _format_response(self, text: str, tool_results: list, max_length: int = 39900) -> str: 

440 """ 

441 Format the response with tool execution results. 

442 

443 Args: 

444 text: The main response text 

445 tool_results: List of tool execution results 

446 max_length: Maximum allowed message length (default for new messages) 

447 

448 Returns: 

449 Formatted response string, guaranteed to be under max_length 

450 """ 

451 parts = [] 

452 

453 # Add tool execution results first (with smart truncation) 

454 if tool_results: 

455 parts.append("🛠️ *Tool Executions:*") 

456 for tr in tool_results: 

457 # Format args nicely 

458 args_str = ", ".join(f"{k}={v}" for k, v in tr['args'].items()) 

459 

460 # Truncate individual tool result if needed (more aggressive for multiple tools) 

461 result_str = str(tr['result']) 

462 # Allow larger tool outputs since we have multi-message support 

463 max_per_tool = 5000 

464 if len(result_str) > max_per_tool: 

465 result_str = result_str[:max_per_tool] + "... (truncated)" 

466 

467 parts.append(f"• `{tr['tool']}({args_str})`: {result_str}") 

468 parts.append("") # Blank line 

469 

470 # Add text response with formatting 

471 if text: 

472 from slack_bot.utils.mrkdwn import SlackFormatter 

473 formatted_text = SlackFormatter.convert(text) 

474 logger.info(f"Formatted text (len {len(text)} -> {len(formatted_text)})") 

475 parts.append(formatted_text) 

476 

477 # Join all parts 

478 if parts: 

479 full_response = "\n".join(parts) 

480 else: 

481 # No tools and no response - this is unusual 

482 logger.warning("No response text and no tool results - returning error message") 

483 

484 # Enhanced debugging info for empty Gemini responses 

485 debug_info = f"\n\n🔍 调试信息:\n- 响应文本长度: {len(text)} 字符\n- 工具调用数: {len(tool_results)}\n- 模型: {self.llm.get_model_name()}" 

486 

487 full_response = ( 

488 "⚠️ 抱歉,我似乎没有理解你的请求。请尝试更具体的描述,或者直接说明你想查询什么数据(例如:今天的步数、昨天的睡眠、上午的运动等)。" 

489 + debug_info 

490 ) 

491 

492 # Final safety check: ensure we're under the limit 

493 if len(full_response) > max_length: 

494 truncation_msg = "\n\n⚠️ (Response truncated due to Slack length limit)" 

495 allowed_length = max_length - len(truncation_msg) 

496 full_response = full_response[:allowed_length] + truncation_msg 

497 logger.warning(f"Final response truncated from {len(full_response)} to {max_length} chars") 

498 

499 return full_response 

500 

501 def _download_files(self, files: List[Dict], prefix: str) -> List[Dict[str, Any]]: 

502 """Download files from Slack.""" 

503 import requests 

504 

505 token = self.bot_token 

506 from PIL import Image 

507 import io 

508 

509 downloaded_files = [] 

510 

511 for file_info in files: 

512 logger.info(f"{prefix} found file: {file_info.get('name')} Type: {file_info.get('mimetype')} URL_P: {bool(file_info.get('url_private'))}") 

513 # Check if it's an image 

514 if 'mimetype' in file_info and file_info['mimetype'].startswith('image/'): 

515 url = file_info.get('url_private_download') or file_info.get('url_private') 

516 if not url: 

517 continue 

518 

519 headers = {'Authorization': f'Bearer {token}'} 

520 # Debug Auth (Masked) 

521 token_masked = f"{token[:5]}...{token[-5:]}" if token else "None" 

522 logger.info(f"{prefix} Attempting download from {url} with token {token_masked}") 

523 

524 # Try using url_private with token in query param 

525 # This often bypasses redirect auth stripping issues 

526 base_url = file_info.get('url_private') 

527 if not base_url: 

528 continue 

529 

530 # Use Session to handle cookies/redirects better 

531 session = requests.Session() 

532 session.headers.update({'Authorization': f'Bearer {token}'}) 

533 

534 # Debug Auth (Masked) 

535 token_masked = f"{token[:5]}...{token[-5:]}" if token else "None" 

536 logger.info(f"{prefix} Attempting download from {url} using Session (Token: {token_masked})") 

537 

538 try: 

539 response = session.get(url, timeout=10) 

540 

541 # Log final URL to check for login page redirect 

542 if response.url != url: 

543 logger.info(f"{prefix} Redirected to: {response.url}") 

544 

545 if response.status_code == 200: 

546 # DEBUG: Check for HTML (Login page) 

547 ct = response.headers.get('Content-Type', 'unknown') 

548 if 'html' in ct.lower(): 

549 logger.error(f"{prefix} Download returned HTML (likely Login Page) instead of image. Auth failed.") 

550 continue 

551 

552 # Process image: Resize and Compress 

553 try: 

554 img = Image.open(io.BytesIO(response.content)) 

555 except Exception as img_err: 

556 logger.error(f"{prefix} PIL Open Failed! Content preamble: {response.content[:200]!r}") 

557 continue # Skip this file 

558 

559 # Convert to RGB if necessary (e.g., PNG with alpha) 

560 if img.mode in ('RGBA', 'LA') or (img.mode == 'P' and 'transparency' in img.info): 

561 bg = Image.new('RGB', img.size, (255, 255, 255)) 

562 if img.mode != 'RGBA': 

563 img = img.convert('RGBA') 

564 bg.paste(img, mask=img.split()[3]) 

565 img = bg 

566 elif img.mode != 'RGB': 

567 img = img.convert('RGB') 

568 

569 # Resize if too large (max dimension 1024) 

570 max_dim = 1024 

571 if max(img.size) > max_dim: 

572 img.thumbnail((max_dim, max_dim), Image.Resampling.LANCZOS) 

573 

574 # Compress to JPEG 

575 buf = io.BytesIO() 

576 img.save(buf, format='JPEG', quality=85) 

577 optimized_data = buf.getvalue() 

578 

579 logger.info(f"{prefix} Downloaded and optimized image: {file_info.get('name')} " 

580 f"({len(response.content)/1024:.1f}KB -> {len(optimized_data)/1024:.1f}KB)") 

581 

582 downloaded_files.append({ 

583 "mime_type": "image/jpeg", # Always normalize to JPEG for consistency 

584 "data": optimized_data 

585 }) 

586 else: 

587 logger.error(f"{prefix} Failed to download file: status {response.status_code}") 

588 except Exception as e: 

589 logger.error(f"{prefix} Error downloading file: {e}") 

590 

591 return downloaded_files