Coverage for slack_bot / obsidian / indexer.py: 0%
112 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-02 17:44 +0800
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-02 17:44 +0800
1import os
2import random
3import re
4from typing import List, Dict, Optional, Set
5from health.utils.logging_config import setup_logger
7logger = setup_logger(__name__)
class ObsidianIndexer:
    """Index the markdown notes of an Obsidian vault held on local disk.

    A scan walks the vault, records every ``.md`` file's modification time and
    lower-cased content in memory, and remembers which notes carry the
    writing-sample / reply-sample tags. The index then backs three queries:
    random tagged samples, naive keyword search, and a "recently modified"
    listing.
    """

    WRITING_SAMPLE_TAG = "#writing_sample"
    REPLY_SAMPLE_TAG = "#reply_sample"

    # Notes longer than this many characters are truncated in the search index
    # so that one huge file cannot blow up memory usage.
    MAX_INDEXED_CHARS = 100000

    # Query tokenizer for mixed English/Chinese input (e.g. "XLSmart遇到了"):
    #   1. English/number/underscore sequences: [a-zA-Z0-9_]+
    #   2. Chinese characters as unigrams:      [\u4e00-\u9fff]
    _TOKEN_PATTERN = re.compile(r'[a-zA-Z0-9_]+|[\u4e00-\u9fff]')

    def __init__(self, vault_path: str):
        """Remember the vault location and start with empty indexes.

        Args:
            vault_path: Path to the vault directory; ``~`` is expanded. A
                falsy or non-directory path is tolerated (a warning is
                logged) so construction never fails; scanning such a vault
                is a no-op.
        """
        self.vault_path: Optional[str] = os.path.expanduser(vault_path) if vault_path else None
        if not self._vault_is_usable():
            logger.warning(f"Obsidian Vault path invalid: {self.vault_path}")

        self.writing_samples: List[str] = []
        self.reply_samples: List[str] = []
        self.file_index: Dict[str, str] = {}  # filepath -> lowercase content (for simple search)
        self.file_metadata: Dict[str, float] = {}  # filepath -> mtime
        self.files_scanned = 0

    def _vault_is_usable(self) -> bool:
        """Return True when ``vault_path`` is set and names an existing directory."""
        # isdir (not exists): a regular file is not a vault that can be walked.
        return bool(self.vault_path) and os.path.isdir(self.vault_path)

    def scan_vault(self):
        """Walk the vault and rebuild every index from scratch.

        Hidden directories (names starting with ``.``, such as ``.obsidian``
        or ``.git``) are skipped. Only files ending in ``.md`` are processed.
        Logs an error and leaves the current indexes untouched when the vault
        path is not usable.
        """
        if not self._vault_is_usable():
            logger.error(f"Cannot scan: Invalid vault path {self.vault_path}")
            return

        logger.info(f"Scanning Obsidian Vault at: {self.vault_path}")
        self.writing_samples = []
        self.reply_samples = []
        self.file_index = {}
        self.file_metadata = {}
        self.files_scanned = 0

        for root, dirs, files in os.walk(self.vault_path):
            # Prune hidden folders in place so os.walk never descends into
            # them; sorting makes the index order (and therefore search
            # tie-breaking) reproducible across runs.
            dirs[:] = sorted(d for d in dirs if not d.startswith('.'))

            for file in sorted(files):
                if file.endswith(".md"):
                    self._process_file(os.path.join(root, file))
                    self.files_scanned += 1

        logger.info(f"Scan complete. Scanned {self.files_scanned} files.")
        logger.info(f"Found {len(self.writing_samples)} writing samples.")
        logger.info(f"Found {len(self.reply_samples)} reply samples.")

    @staticmethod
    def _has_tag(content: str, tag: str) -> bool:
        """Return True when ``tag`` occurs in ``content`` as a complete tag.

        The character after the tag must not be a word character or ``-``,
        so ``#writing_samples`` does not count as ``#writing_sample``. A
        following ``/`` is accepted, so ``#writing_sample/draft`` still
        counts.
        """
        return re.search(re.escape(tag) + r"(?![\w-])", content) is not None

    def _process_file(self, file_path: str):
        """Record one note's mtime, tags and searchable content.

        Failures to stat or read the file are logged and the file is skipped;
        they never abort the scan.
        """
        try:
            # Capture metadata
            self.file_metadata[file_path] = os.path.getmtime(file_path)

            with open(file_path, "r", encoding="utf-8") as f:
                content = f.read()
        except (OSError, ValueError) as e:
            # ValueError covers UnicodeDecodeError for non-UTF-8 files.
            logger.warning(f"Failed to read file {file_path}: {e}")
            return

        # No frontmatter parser needed: the tag may appear anywhere in the text.
        if self._has_tag(content, self.WRITING_SAMPLE_TAG):
            self.writing_samples.append(file_path)

        if self._has_tag(content, self.REPLY_SAMPLE_TAG):
            self.reply_samples.append(file_path)

        # Store for search (naive in-memory index), truncated to bound memory.
        self.file_index[file_path] = content[:self.MAX_INDEXED_CHARS].lower()

    def _sample_contents(self, paths: List[str], count: int) -> List[str]:
        """Read up to ``count`` randomly chosen files from ``paths``."""
        # Clamp on both sides: random.sample raises ValueError for a negative
        # size or one larger than the population.
        sample_size = max(0, min(len(paths), count))
        if sample_size == 0:
            return []
        return self._read_files(random.sample(paths, sample_size))

    def get_writing_samples(self, count: int = 3) -> List[str]:
        """Return the content of up to ``count`` random writing-sample notes.

        Returns an empty list when no samples are indexed or ``count`` is
        zero or negative.
        """
        return self._sample_contents(self.writing_samples, count)

    def get_reply_samples(self, count: int = 3) -> List[str]:
        """Return the content of up to ``count`` random reply-sample notes.

        Returns an empty list when no samples are indexed or ``count`` is
        zero or negative.
        """
        return self._sample_contents(self.reply_samples, count)

    def search(self, query: str, limit: int = 5) -> List[str]:
        """Simple keyword search over the in-memory index.

        RAG strategy: find notes containing the query keywords and return the
        content of the best-scoring ones.

        Args:
            query: Free text; tokenized into ASCII word runs and single
                Chinese characters, case-insensitively.
            limit: Maximum number of notes to return.

        Returns:
            Content of the top matches (each prefixed with a source header),
            best first. Empty when the query yields no tokens, nothing
            matches, or ``limit`` is not positive.
        """
        if not query or limit <= 0:
            return []

        # De-duplicate while keeping order: a repeated token must not be
        # scored twice nor counted as two "unique" matches.
        tokens = list(dict.fromkeys(self._TOKEN_PATTERN.findall(query.lower())))
        if not tokens:
            return []

        logger.debug(f"Search tokens: {tokens}")

        # Naive scoring: count token occurrences
        scored_files = []
        for path, content in self.file_index.items():
            filename = os.path.basename(path).lower()
            score = 0
            unique_matches = 0

            for token in tokens:
                # Filename hits are weighted double. Tokens never contain
                # spaces, so counting the two parts separately equals counting
                # in "filename filename content" without building that string.
                occurrences = 2 * filename.count(token) + content.count(token)
                if occurrences:
                    # Cubic weighting by length to heavily favor longer
                    # keywords (like "XLSmart") over common single characters.
                    score += occurrences * (len(token) ** 3)
                    unique_matches += 1

            # Boost score based on how many unique tokens matched
            if unique_matches > 0:
                scored_files.append((score * (unique_matches ** 2), path))

        # Sort by score desc (stable, so ties keep index order)
        scored_files.sort(key=lambda item: item[0], reverse=True)

        return self._read_files([path for _, path in scored_files[:limit]])

    def get_recent_files(self, days: int = 5, limit: int = 10) -> str:
        """Return a formatted list of files modified in the last ``days`` days.

        Args:
            days: Size of the look-back window, in days.
            limit: Maximum number of files to list; non-positive lists none.

        Returns:
            A markdown bullet list (newest first) with a header line, or a
            "No files modified..." message when nothing qualifies.
        """
        import time

        cutoff_time = time.time() - (days * 86400)

        recent_files = [
            (mtime, path)
            for path, mtime in self.file_metadata.items()
            if mtime >= cutoff_time
        ]

        # Sort by mtime DESC (newest first), then apply the limit. A negative
        # limit is clamped so the slice cannot drop items from the end.
        recent_files.sort(key=lambda item: item[0], reverse=True)
        recent_files = recent_files[:max(limit, 0)]

        if not recent_files:
            return f"No files modified in the last {days} days."

        output = [f"📂 **Updated in last {days} days:**"]
        for mtime, path in recent_files:
            filename = os.path.basename(path)
            date_str = time.strftime('%Y-%m-%d %H:%M', time.localtime(mtime))
            output.append(f"- `{filename}` ({date_str})")

        return "\n".join(output)

    def _read_files(self, paths: List[str]) -> List[str]:
        """Read the complete content of each path, prefixed with its file name.

        Unreadable files are skipped with a logged warning, so the result may
        be shorter than ``paths``.
        """
        contents = []
        for p in paths:
            try:
                with open(p, "r", encoding="utf-8") as f:
                    body = f.read()
            except (OSError, ValueError) as e:
                logger.warning(f"Failed to read file {p}: {e}")
                continue
            # Provide context about the source
            contents.append(f"--- Source: {os.path.basename(p)} ---\n{body}")
        return contents