# coverage.py report artifact — slack_bot/dispatcher.py: 0% of 314 statements covered
# (coverage.py v7.13.4, generated 2026-03-02 17:44 +0800)
1from typing import Optional, List, Dict, Any
2from slack_sdk import WebClient
3import os
5from health.utils.logging_config import setup_logger
6from slack_bot.llm.gemini import GeminiLLM
7from slack_bot.context.storage import ContextStorage
8from slack_bot.tools.registry import TOOLS_SCHEMA, TOOL_FUNCTIONS
9from slack_bot.tools.groups import get_tool_preset
11logger = setup_logger(__name__)
class MessageDispatcher:
    """Routes incoming Slack messages to Gemini and executes tools.

    Responsibilities:
      * hold the Gemini LLM client and the active tool set,
      * download and optimize image attachments from Slack,
      * apply a keyword-based "safety override" when the LLM fails to
        emit tool calls (a known issue with some local proxies),
      * execute requested tools, run a second analysis round over the
        tool output, and post the (possibly chunked) reply to Slack.
    """

    def __init__(self, bot_token: Optional[str] = None, system_instruction: Optional[str] = None, tools: Optional[List[Dict]] = None, tool_mode: str = "light"):
        """
        Initialize dispatcher.

        Args:
            bot_token: Optional specific Slack token (falls back to the
                SLACK_BOT_TOKEN environment variable).
            system_instruction: Custom system prompt (e.g. for Shell Bot).
            tools: specific tools list to use (if None, uses preset based on tool_mode).
            tool_mode: 'none' (no tools), 'light' (5 tools), 'standard' (10 tools),
                'full' (15 tools), 'all' (21 tools).
        """
        self.llm = GeminiLLM(system_instruction=system_instruction)

        # Use tool preset if tools not explicitly provided
        if tools is not None:
            self.tools = tools
            self.tool_functions = TOOL_FUNCTIONS
        elif tool_mode == "none":
            self.tools = None
            self.tool_functions = {}
            logger.info("Running in NO-TOOLS mode (pure text)")
        else:
            self.tools, self.tool_functions = get_tool_preset(tool_mode)
            logger.info(f"Using tool preset: {tool_mode} ({len(self.tools)} tools)")

        # Initialize Slack WebClient for posting replies
        self.bot_token = bot_token or os.environ.get("SLACK_BOT_TOKEN")
        self.client = WebClient(token=self.bot_token)

        logger.info("Dispatcher initialized with Gemini")

    def dispatch(self, message_text: str, channel_id: str, user_id: str, response_ts: Optional[str] = None, request_id: str = "N/A", files: Optional[List[Dict]] = None) -> None:
        """
        Process a message end-to-end and post the response(s) to Slack.

        Args:
            message_text: Raw user message text.
            channel_id: Slack channel to reply into (also keys the context store).
            user_id: Slack user id of the sender (logging only).
            response_ts: Timestamp of a placeholder ("Thinking...") message to
                update in place; if None, all chunks are posted as new messages.
            request_id: Correlation id used as a log prefix.
            files: Slack file dicts attached to the message (images are downloaded).
        """
        import time
        t0 = time.time()

        prefix = f"[{request_id}]"
        logger.info(f"{prefix} Processing message in {channel_id} from {user_id}")
        if files:
            logger.info(f"{prefix} Message contains {len(files)} files")

        # 1. Initialize Context
        storage = ContextStorage(channel_id)

        # Handle "clear" command
        if message_text.strip().lower() in ["clear", "reset", "清除"]:
            storage.clear()
            self.client.chat_postMessage(channel=channel_id, text="🧹 Context cleared. (对话历史已清除)")
            return

        storage.add_message("user", message_text)

        # Download images if any
        image_data_list = []
        download_failed = False
        if files:
            image_data_list = self._download_files(files, prefix)
            # Anti-Hallucination: Check if download failed
            if len(image_data_list) == 0:
                failure_msg = "\n[SYSTEM WARNING: User uploaded an image but it FAILED to download/process. You CANNOT see the image. DO NOT GUESS. Inform the user that the image access failed.]"
                logger.warning(f"{prefix} Image download failed! Appending warning to LLM.")
                message_text += failure_msg
                download_failed = True

        try:
            # 2. Get conversation context
            context = storage.get_context()
            logger.debug(f"{prefix} Context size: {len(context)}")

            # 3. Process with Gemini (with tools)
            # CRITICAL: If image download failed, disable tools to prevent "400 INVALID_ARGUMENT"
            # (which happens when mixing text-only warning prompts with complex tool schemas in some proxies)
            #
            # NOTE: gemini-3-flash has a bug where it returns empty response with tools,
            # but we use SAFETY OVERRIDE below to manually trigger tools based on keywords
            current_tools = None if download_failed else self.tools

            logger.info(f"{prefix} Calling Gemini for initial response...")
            logger.info(f"{prefix} Request config: model={self.llm.get_model_name()}, tools={len(current_tools) if current_tools else 0}, context={len(context)-1} msgs")
            t_llm_start = time.time()
            # context[:-1] excludes the user message we just stored; it is
            # passed separately as `message`.
            response_text, tool_calls = self.llm.generate_response(
                message=message_text,
                context=context[:-1],
                tools=current_tools,
                images=image_data_list
            )
            # Defensive check for None/empty response
            if response_text is None:
                logger.error(f"{prefix} Gemini returned None for response_text!")
                response_text = ""
            elif not response_text.strip():
                logger.warning(f"{prefix} Gemini returned empty response_text! (This usually means the proxy has issues)")

            # Initialize tool_calls
            if tool_calls is None:
                tool_calls = []

            # Log what we got back
            logger.info(f"{prefix} Gemini response: text={len(response_text)} chars, tool_calls={len(tool_calls) if tool_calls else 0}")

            # --- SAFETY OVERRIDE ---
            # If the LLM failed to trigger tools (common with local proxies + tools bug),
            # manually detect intent and force tool calls based on keywords.
            if not tool_calls:
                logger.info(f"{prefix} No tool calls from Gemini - checking SAFETY OVERRIDE keywords...")
                tool_calls = self._safety_override(message_text, prefix)
            else:
                logger.info(f"{prefix} Tool calls successfully generated by Gemini (no SAFETY OVERRIDE needed)")

            tool_count = len(tool_calls) if tool_calls else 0
            logger.info(f"{prefix} Gemini response in {time.time()-t_llm_start:.2f}s: {len(response_text)} chars, {tool_count} tools")

            # 4. Execute tools if requested
            tool_results = []
            if tool_calls:
                logger.info(f"{prefix} Executing {len(tool_calls)} tools")
                tool_results = self._run_tool_calls(tool_calls, channel_id, prefix)

                # 4b. Second round: Get final response with tool results
                logger.info(f"{prefix} Requesting final analysis...")

                tool_names = ", ".join([tr['tool'] for tr in tool_results])

                # Format specific tool results for context
                tool_results_text = "\n".join([
                    f"Tool '{tr['tool']}' (Args: {tr['args']}) returned:\n{tr['result']}"
                    for tr in tool_results
                ])

                # Inject thoughts into memory
                storage.add_message("assistant", f"I checked: {tool_names}", model="gemini")

                # Stronger Prompt for Analysis
                analysis_prompt = (
                    f"Here are the execution results from the tools:\n{tool_results_text}\n\n"
                    f"CRITICAL INSTRUCTION: You represent the 'Butler' bot. You must now answer the user's original question based ONLY on the above results.\n"
                    f"1. Summarize the key data points found (e.g., steps, sleep score).\n"
                    f"2. Provide direct, natural language insights.\n"
                    f"3. If the tool indicated an error, explain it to the user.\n\n"
                    f"Do NOT output raw JSON. Write a helpful paragraph."
                )

                # Do NOT add this massive data blob to persistent storage.
                # Just use it for this generation context.
                final_context = storage.get_context()

                t_llm_2_start = time.time()
                response_text, _ = self.llm.generate_response(
                    message=analysis_prompt,
                    context=final_context,
                    tools=None
                )
                logger.info(f"{prefix} Final analysis received in {time.time()-t_llm_2_start:.2f}s")

                # Fallback when the second round produced nothing usable
                if not response_text or not response_text.strip():
                    logger.warning(f"{prefix} Empty analysis received!")
                    if files:
                        response_text = "⚠️ 图片已接收,但 AI 模型未能返回描述。这可能是由于网络波动或模型暂时无法识别该图像,请稍后重试。"
                    else:
                        response_text = "✅ 数据已同步,但我似乎暂时无法生成文字分析。请参考上方的工具执行结果。"

            # 5. Format and send response
            # Relaxing internal limit to allow for multi-message splitting (max 3 messages of ~4000 chars)
            APP_LIMIT = 12000
            full_response = self._format_response(response_text, tool_results, max_length=APP_LIMIT)

            # Split into chunks (Slack hard-limits a message around 4000 chars)
            chunks = self._split_text(full_response, max_chunk_size=3800)

            # Limit to 3 chunks as requested
            if len(chunks) > 3:
                chunks = chunks[:3]
                chunks[-1] += "\n...(remaining content truncated)"

            # Post to Slack
            if response_ts:
                # Update the first message (Thinking...)
                try:
                    self.client.chat_update(channel=channel_id, ts=response_ts, text=chunks[0])
                    logger.info(f"{prefix} 📤 Message updated {response_ts}")
                except Exception as e:
                    logger.warning(f"{prefix} Update failed, posting new: {e}")
                    self.client.chat_postMessage(channel=channel_id, text=chunks[0])

                # Post subsequent chunks as new messages
                for chunk in chunks[1:]:
                    # Small delay to ensure order
                    time.sleep(0.5)
                    self.client.chat_postMessage(channel=channel_id, text=chunk)
                    logger.info(f"{prefix} 📤 Continuation message posted")
            else:
                # Post all as new messages
                for i, chunk in enumerate(chunks):
                    if i > 0:
                        time.sleep(0.5)
                    self.client.chat_postMessage(channel=channel_id, text=chunk)
                logger.info(f"{prefix} 📤 {len(chunks)} message(s) posted")

            # 6. Save to Context
            storage.add_message("assistant", full_response, model="gemini")
            logger.info(f"{prefix} Dispatch completed in {time.time()-t0:.2f}s")

        except Exception as e:
            logger.error(f"{prefix} Dispatch failed: {e}", exc_info=True)
            try:
                self.client.chat_postMessage(
                    channel=channel_id,
                    text=f"⚠️ Internal Error: {str(e)}"
                )
                logger.info("Error message posted to Slack")
            except Exception as slack_error:
                logger.error(f"Failed to post error message to Slack: {slack_error}", exc_info=True)

    def _safety_override(self, message_text: str, prefix: str) -> List[Dict[str, Any]]:
        """Keyword-based fallback when Gemini returned no tool calls.

        Detects intent (Garmin sync / health query / web search) from the
        message text and returns forced tool-call dicts. Diet-logging intent
        is detected and logged only — no tool is forced for it.

        Args:
            prefix: Log prefix (request correlation id).

        Returns:
            List of {"name": ..., "args": ...} tool-call dicts (possibly empty).
        """
        tool_calls: List[Dict[str, Any]] = []
        lower_msg = message_text.lower()

        # 1. Sync Garmin data
        if any(k in lower_msg for k in ["sync", "update", "fetch", "同步", "拉取", "更新"]) and \
           any(k in lower_msg for k in ["garmin", "佳明", "data", "数据"]):
            logger.warning(f"{prefix} 🔧 SAFETY OVERRIDE: Forcing sync_garmin tool")
            from datetime import timedelta
            from health.utils.time_utils import get_cst_today
            # Try to infer sync target date from user message keywords
            if any(k in lower_msg for k in ["昨天", "昨晚", "昨日", "yesterday", "last night"]):
                _sync_date = (get_cst_today() - timedelta(days=1)).isoformat()
            elif any(k in lower_msg for k in ["前天", "day before yesterday"]):
                _sync_date = (get_cst_today() - timedelta(days=2)).isoformat()
            else:
                _sync_date = get_cst_today().isoformat()  # today in CST
            logger.warning(f"{prefix} 🔧 SAFETY OVERRIDE: sync target_date={_sync_date}")
            tool_calls.append({
                "name": "sync_garmin",
                "args": {"target_date": _sync_date}
            })

        # 2. Query health data (sleep, steps, activities, etc.)
        elif any(k in lower_msg for k in ["睡眠", "sleep", "步数", "steps", "心率", "heart", "hrv", "运动", "activity", "workout", "锻炼", "昨天", "yesterday", "今天", "today", "上午", "下午", "morning", "afternoon", "查询", "query", "变化", "趋势", "trend"]):
            from datetime import timedelta
            from health.utils.time_utils import get_cst_today

            # CRITICAL: First check for single-day keywords to avoid false positives
            has_single_day_keyword = any(k in lower_msg for k in [
                "昨天", "昨晚", "昨日", "last night", "yesterday",
                "今天", "今晚", "今日", "today", "tonight",
                "前天", "前晚", "day before yesterday",
                "大前天", "three days ago"
            ])

            # Check if asking for time-range trends (past X days/weeks/months)
            is_trend_query = (not has_single_day_keyword) and any(k in lower_msg for k in [
                "过去", "最近", "近", "past", "recent",
                "变化", "趋势", "trend", "change", "历史", "history",
                "个月", "month", "周", "week", "年", "year",
            ])

            today_cst = get_cst_today()
            if is_trend_query:
                # Parse time range from query
                import re
                end_date = today_cst.strftime("%Y-%m-%d")
                days_ago = 30  # default window

                # Each pattern has two alternative capture groups (CN / EN);
                # exactly one is non-None on a match. First match wins.
                numeric_patterns = [
                    (r'(\d+)\s*个?月|(\d+)\s*month', 30),   # months
                    (r'(\d+)\s*周|(\d+)\s*week', 7),        # weeks
                    (r'(\d+)\s*天|(\d+)\s*day', 1),         # days
                    (r'(\d+)\s*年|(\d+)\s*year', 365),      # years
                ]
                for pattern, multiplier in numeric_patterns:
                    m = re.search(pattern, lower_msg)
                    if m:
                        days_ago = int(m.group(1) or m.group(2)) * multiplier
                        break
                else:
                    # Non-numeric Chinese phrases (三 is not an ASCII digit,
                    # so "三个月" never matches the regexes above)
                    if "半年" in lower_msg or "six month" in lower_msg:
                        days_ago = 180
                    elif "三个月" in lower_msg or "three month" in lower_msg:
                        days_ago = 90

                start_date = (today_cst - timedelta(days=days_ago)).strftime("%Y-%m-%d")

                # Determine metric type
                metric_type = "rhr"  # default
                if any(k in lower_msg for k in ["hrv", "心率变异"]):
                    metric_type = "hrv"
                elif any(k in lower_msg for k in ["睡眠", "sleep"]):
                    metric_type = "sleep"
                elif any(k in lower_msg for k in ["步数", "steps"]):
                    metric_type = "steps"
                elif any(k in lower_msg for k in ["压力", "stress"]):
                    metric_type = "stress"

                logger.warning(f"{prefix} 🔧 SAFETY OVERRIDE: Forcing get_metric_history for {metric_type} ({start_date} to {end_date})")
                tool_calls.append({
                    "name": "get_metric_history",
                    "args": {
                        "metric_type": metric_type,
                        "start_date": start_date,
                        "end_date": end_date
                    }
                })
            else:
                # Single-day query — all dates computed in CST (UTC+8)
                if any(k in lower_msg for k in ["昨天", "昨晚", "昨日", "last night", "yesterday"]):
                    target = (today_cst - timedelta(days=1)).strftime("%Y-%m-%d")
                elif any(k in lower_msg for k in ["前天", "前晚", "day before yesterday"]):
                    target = (today_cst - timedelta(days=2)).strftime("%Y-%m-%d")
                else:
                    target = today_cst.strftime("%Y-%m-%d")

                # Check if specifically asking for activity history
                if any(k in lower_msg for k in ["运动", "activity", "workout", "锻炼"]):
                    logger.warning(f"{prefix} 🔧 SAFETY OVERRIDE: Forcing get_activity_history for {target}")
                    tool_calls.append({
                        "name": "get_activity_history",
                        "args": {"start_date": target, "end_date": target}
                    })
                else:
                    logger.warning(f"{prefix} 🔧 SAFETY OVERRIDE: Forcing get_daily_detailed_stats for {target}")
                    tool_calls.append({
                        "name": "get_daily_detailed_stats",
                        "args": {"target_date": target}
                    })

        # 3. Web search
        elif any(k in lower_msg for k in ["搜索", "search", "查一下", "look up", "最新", "latest"]):
            logger.warning(f"{prefix} 🔧 SAFETY OVERRIDE: Forcing search_web")
            tool_calls.append({
                "name": "search_web",
                "args": {"query": message_text}
            })

        # 4. Detect diet logging intent (for logging, not forcing tool call yet)
        food_items = ["鸡", "肉", "菜", "蛋", "饭", "面", "粉", "汤", "鱼", "虾", "蟹", "牛", "猪", "羊", "豆", "奶"]
        meal_indicators = ["晚餐", "午餐", "早餐", "夜宵", "零食", "breakfast", "lunch", "dinner", "snack"]
        has_ate_pattern = any(k in lower_msg for k in ["吃了", "ate", "吃的", "eating"])
        has_meal_indicator = any(k in lower_msg for k in meal_indicators)
        has_food_item = any(k in lower_msg for k in food_items)

        if (has_ate_pattern and has_food_item) or has_meal_indicator:
            logger.warning(f"{prefix} Detected diet logging intent: ate={has_ate_pattern}, meal={has_meal_indicator}, food={has_food_item}")
            # Don't force tool - let user confirm first or LLM will handle in second round
        elif not tool_calls:
            # Fix: only report "no match" when no branch above forced a tool;
            # previously this log fired even after a successful override.
            logger.info(f"{prefix} No SAFETY OVERRIDE match found - will return empty response or text")

        return tool_calls

    def _run_tool_calls(self, tool_calls: List[Dict[str, Any]], channel_id: str, prefix: str) -> List[Dict[str, Any]]:
        """Execute each requested tool and collect stringified results.

        Unknown tool names are logged and skipped; exceptions from a tool
        are captured as "❌ ..." result entries so the analysis round can
        explain the failure to the user.

        Returns:
            List of {"tool", "args", "result"} dicts, in call order.
        """
        import time
        tool_results: List[Dict[str, Any]] = []

        for tool_call in tool_calls:
            tool_name = tool_call["name"]
            tool_args = tool_call["args"]
            logger.info(f"{prefix} Tool: {tool_name} Args: {tool_args}")

            if tool_name not in self.tool_functions:
                logger.warning(f"{prefix} Unknown tool: {tool_name}")
                continue

            t_tool_start = time.time()
            try:
                # Inject channel_id for shell tool
                if tool_name == "execute_shell":
                    tool_args["channel_id"] = channel_id

                result = self.tool_functions[tool_name](**tool_args)

                # Safety Truncation: Prevent massive JSONs from blowing up context
                str_result = str(result)
                if len(str_result) > 8000:
                    str_result = str_result[:8000] + "... (truncated)"

                tool_results.append({
                    "tool": tool_name,
                    "args": tool_args,
                    "result": str_result
                })
                logger.info(f"{prefix} ✓ {tool_name} finished in {time.time()-t_tool_start:.2f}s")
            except Exception as e:
                error_msg = f"Error executing {tool_name}: {str(e)}"
                logger.error(f"{prefix} {error_msg}")
                tool_results.append({
                    "tool": tool_name,
                    "args": tool_args,
                    "result": f"❌ {error_msg}"
                })

        return tool_results

    def _split_text(self, text: str, max_chunk_size: int = 3800) -> List[str]:
        """Split text into chunks avoiding breaking code blocks if possible.

        When a split falls inside a fenced ``` block, the fence is closed at
        the end of the chunk and re-opened (with a "(...continued)" marker)
        at the start of the next one.
        """
        if len(text) <= max_chunk_size:
            return [text]

        chunks = []
        current_chunk = ""
        lines = text.split('\n')

        in_code_block = False

        for line in lines:
            # Check for code block toggle
            if line.strip().startswith('```'):
                in_code_block = not in_code_block

            # If adding this line exceeds chunk size
            if len(current_chunk) + len(line) + 1 > max_chunk_size:
                # Close code block if open
                if in_code_block:
                    current_chunk += "\n```"

                chunks.append(current_chunk)

                # Start new chunk
                current_chunk = ""
                # Re-open code block if it was open
                if in_code_block:
                    current_chunk += "```\n" + "(...continued)\n"

            if current_chunk:
                current_chunk += "\n"
            current_chunk += line

        if current_chunk:
            chunks.append(current_chunk)

        return chunks

    def _format_response(self, text: str, tool_results: list, max_length: int = 39900) -> str:
        """
        Format the response with tool execution results.

        Args:
            text: The main response text
            tool_results: List of tool execution results
            max_length: Maximum allowed message length (default for new messages)

        Returns:
            Formatted response string, guaranteed to be under max_length
        """
        parts = []

        # Add tool execution results first (with smart truncation)
        if tool_results:
            parts.append("🛠️ *Tool Executions:*")
            for tr in tool_results:
                # Format args nicely
                args_str = ", ".join(f"{k}={v}" for k, v in tr['args'].items())

                # Truncate individual tool result if needed (more aggressive for multiple tools)
                result_str = str(tr['result'])
                # Allow larger tool outputs since we have multi-message support
                max_per_tool = 5000
                if len(result_str) > max_per_tool:
                    result_str = result_str[:max_per_tool] + "... (truncated)"

                parts.append(f"• `{tr['tool']}({args_str})`: {result_str}")
            parts.append("")  # Blank line

        # Add text response with formatting
        if text:
            from slack_bot.utils.mrkdwn import SlackFormatter
            formatted_text = SlackFormatter.convert(text)
            logger.info(f"Formatted text (len {len(text)} -> {len(formatted_text)})")
            parts.append(formatted_text)

        # Join all parts
        if parts:
            full_response = "\n".join(parts)
        else:
            # No tools and no response - this is unusual
            logger.warning("No response text and no tool results - returning error message")

            # Enhanced debugging info for empty Gemini responses
            debug_info = f"\n\n🔍 调试信息:\n- 响应文本长度: {len(text)} 字符\n- 工具调用数: {len(tool_results)}\n- 模型: {self.llm.get_model_name()}"

            full_response = (
                "⚠️ 抱歉,我似乎没有理解你的请求。请尝试更具体的描述,或者直接说明你想查询什么数据(例如:今天的步数、昨天的睡眠、上午的运动等)。"
                + debug_info
            )

        # Final safety check: ensure we're under the limit
        if len(full_response) > max_length:
            # Fix: capture the original length BEFORE truncating, otherwise
            # the log reports the already-truncated size.
            original_length = len(full_response)
            truncation_msg = "\n\n⚠️ (Response truncated due to Slack length limit)"
            allowed_length = max_length - len(truncation_msg)
            full_response = full_response[:allowed_length] + truncation_msg
            logger.warning(f"Final response truncated from {original_length} to {max_length} chars")

        return full_response

    def _download_files(self, files: List[Dict], prefix: str) -> List[Dict[str, Any]]:
        """Download image attachments from Slack and normalize them to JPEG.

        Non-image files are skipped. Each image is flattened to RGB,
        resized to at most 1024px on its longest side, and re-encoded as
        JPEG (quality 85) to keep LLM payloads small.

        Returns:
            List of {"mime_type": "image/jpeg", "data": bytes} dicts.
        """
        import requests
        import io
        from PIL import Image

        token = self.bot_token
        downloaded_files: List[Dict[str, Any]] = []

        for file_info in files:
            logger.info(f"{prefix} found file: {file_info.get('name')} Type: {file_info.get('mimetype')} URL_P: {bool(file_info.get('url_private'))}")

            # Only images are supported
            if 'mimetype' not in file_info or not file_info['mimetype'].startswith('image/'):
                continue

            # Fix: removed a dead duplicated download block and a vestigial
            # url_private check that skipped files exposing only
            # url_private_download.
            url = file_info.get('url_private_download') or file_info.get('url_private')
            if not url:
                continue

            # Use Session to handle cookies/redirects better (plain header
            # auth can be stripped by redirects)
            session = requests.Session()
            session.headers.update({'Authorization': f'Bearer {token}'})

            # Debug Auth (Masked)
            token_masked = f"{token[:5]}...{token[-5:]}" if token else "None"
            logger.info(f"{prefix} Attempting download from {url} using Session (Token: {token_masked})")

            try:
                response = session.get(url, timeout=10)

                # Log final URL to check for login page redirect
                if response.url != url:
                    logger.info(f"{prefix} Redirected to: {response.url}")

                if response.status_code != 200:
                    logger.error(f"{prefix} Failed to download file: status {response.status_code}")
                    continue

                # DEBUG: Check for HTML (Login page)
                ct = response.headers.get('Content-Type', 'unknown')
                if 'html' in ct.lower():
                    logger.error(f"{prefix} Download returned HTML (likely Login Page) instead of image. Auth failed.")
                    continue

                # Process image: Resize and Compress
                try:
                    img = Image.open(io.BytesIO(response.content))
                except Exception:
                    logger.error(f"{prefix} PIL Open Failed! Content preamble: {response.content[:200]!r}")
                    continue  # Skip this file

                # Convert to RGB if necessary (e.g., PNG with alpha):
                # composite transparent images onto a white background
                if img.mode in ('RGBA', 'LA') or (img.mode == 'P' and 'transparency' in img.info):
                    bg = Image.new('RGB', img.size, (255, 255, 255))
                    if img.mode != 'RGBA':
                        img = img.convert('RGBA')
                    bg.paste(img, mask=img.split()[3])  # alpha channel as mask
                    img = bg
                elif img.mode != 'RGB':
                    img = img.convert('RGB')

                # Resize if too large (max dimension 1024)
                max_dim = 1024
                if max(img.size) > max_dim:
                    img.thumbnail((max_dim, max_dim), Image.Resampling.LANCZOS)

                # Compress to JPEG
                buf = io.BytesIO()
                img.save(buf, format='JPEG', quality=85)
                optimized_data = buf.getvalue()

                logger.info(f"{prefix} Downloaded and optimized image: {file_info.get('name')} "
                            f"({len(response.content)/1024:.1f}KB -> {len(optimized_data)/1024:.1f}KB)")

                downloaded_files.append({
                    "mime_type": "image/jpeg",  # Always normalize to JPEG for consistency
                    "data": optimized_data
                })
            except Exception as e:
                logger.error(f"{prefix} Error downloading file: {e}")

        return downloaded_files