Coverage for slack_bot / obsidian / indexer.py: 0%

112 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-02 17:44 +0800

import os
import random
import re
import time
from typing import Dict, List, Optional, Set

from health.utils.logging_config import setup_logger

6 

# Module-level logger, configured through the project's shared logging setup.
logger = setup_logger(__name__)

8 

class ObsidianIndexer:
    """
    Scans an Obsidian Vault to index files and extract samples based on tags.

    Walks the vault directory tree once (`scan_vault`), recording for every
    markdown file its lowercased content (naive in-memory keyword index) and
    its mtime, and collecting the paths of notes tagged as writing or reply
    samples.
    """

    # Tag markers searched for verbatim anywhere in a note's text.
    WRITING_SAMPLE_TAG = "#writing_sample"
    REPLY_SAMPLE_TAG = "#reply_sample"

    def __init__(self, vault_path: str):
        """
        Args:
            vault_path: Path to the vault root; "~" is expanded. An invalid
                or empty path is logged but does not raise — `scan_vault`
                will simply refuse to run.
        """
        self.vault_path = os.path.expanduser(vault_path) if vault_path else None
        if not self.vault_path or not os.path.exists(self.vault_path):
            logger.warning(f"Obsidian Vault path invalid: {self.vault_path}")

        self.writing_samples: List[str] = []
        self.reply_samples: List[str] = []
        self.file_index: Dict[str, str] = {}  # filepath -> lowercase content (for simple search)
        self.file_metadata: Dict[str, float] = {}  # filepath -> mtime
        self.files_scanned = 0

    def scan_vault(self):
        """Scans the vault for markdown files and populates indexes."""
        if not self.vault_path or not os.path.exists(self.vault_path):
            logger.error(f"Cannot scan: Invalid vault path {self.vault_path}")
            return

        logger.info(f"Scanning Obsidian Vault at: {self.vault_path}")
        # Reset all state so repeated scans don't accumulate stale entries.
        self.writing_samples = []
        self.reply_samples = []
        self.file_index = {}
        self.file_metadata = {}
        self.files_scanned = 0

        for root, dirs, files in os.walk(self.vault_path):
            # Skip hidden folders (like .obsidian, .git); mutating dirs
            # in place prunes os.walk's descent.
            dirs[:] = [d for d in dirs if not d.startswith('.')]

            for file in files:
                if file.endswith(".md"):
                    full_path = os.path.join(root, file)
                    self._process_file(full_path)
                    self.files_scanned += 1

        logger.info(f"Scan complete. Scanned {self.files_scanned} files.")
        logger.info(f"Found {len(self.writing_samples)} writing samples.")
        logger.info(f"Found {len(self.reply_samples)} reply samples.")

    def _process_file(self, file_path: str):
        """Reads a file, records its mtime, and checks for sample tags."""
        try:
            # Capture metadata
            mtime = os.path.getmtime(file_path)
            self.file_metadata[file_path] = mtime

            with open(file_path, "r", encoding="utf-8") as f:
                content = f.read()

            # Basic tag detection (handles #tag inside text).
            # We don't need a full-blown frontmatter parser for this,
            # as Obsidian tags can be anywhere.
            if self.WRITING_SAMPLE_TAG in content:
                self.writing_samples.append(file_path)

            if self.REPLY_SAMPLE_TAG in content:
                self.reply_samples.append(file_path)

            # Store for search (naive in-memory index).
            # Truncating huge files to avoid memory boom if necessary,
            # but for text notes 100k char limit is generous.
            self.file_index[file_path] = content[:100000].lower()

        except Exception as e:
            # Best-effort: a single unreadable file should not abort the scan.
            logger.warning(f"Failed to read file {file_path}: {e}")

    def get_writing_samples(self, count: int = 3) -> List[str]:
        """Returns content of `count` randomly chosen writing samples."""
        if not self.writing_samples:
            return []

        # min() guards random.sample against count > population size.
        selected_paths = random.sample(self.writing_samples, min(len(self.writing_samples), count))
        return self._read_files(selected_paths)

    def get_reply_samples(self, count: int = 3) -> List[str]:
        """Returns content of `count` randomly chosen reply samples."""
        if not self.reply_samples:
            return []

        selected_paths = random.sample(self.reply_samples, min(len(self.reply_samples), count))
        return self._read_files(selected_paths)

    def search(self, query: str, limit: int = 5) -> List[str]:
        """
        Simple keyword search. Returns content of matching files.
        RAG Strategy: Find notes containing the query keywords.

        Args:
            query: Free-text query; tokenized into ASCII word runs and
                CJK unigrams.
            limit: Maximum number of file contents to return.
        """
        query_lower = query.lower()

        # Split query into tokens.
        # Improved regex to handle mixed English/Chinese (e.g. "XLSmart遇到了")
        # Matches:
        # 1. English/Number/Underscore sequences: [a-zA-Z0-9_]+
        # 2. Chinese characters (unigrams): [\u4e00-\u9fff]
        tokens = re.findall(r'[a-zA-Z0-9_]+|[\u4e00-\u9fff]', query_lower)
        if not tokens:
            return []

        logger.debug(f"Search tokens: {tokens}")

        # Naive scoring: count token occurrences.
        scored_files = []
        for path, content in self.file_index.items():
            score = 0
            unique_matches = 0

            # Combine filename and content for search context.
            filename = os.path.basename(path).lower()
            # BUG FIX: the filename was never interpolated; repeating it
            # twice weights filename hits higher than body hits.
            search_text = f"{filename} {filename} {content}"

            for token in tokens:
                if token in search_text:
                    # Cubic weighting by length to heavily favor longer
                    # keywords (like "XLSmart") over common single characters.
                    count = search_text.count(token)
                    score += count * (len(token) ** 3)
                    unique_matches += 1

            # Boost score based on how many unique tokens matched.
            if unique_matches > 0:
                score = score * (unique_matches ** 2)
                scored_files.append((score, path))

        # Sort by score desc.
        scored_files.sort(key=lambda x: x[0], reverse=True)

        top_paths = [path for _, path in scored_files[:limit]]
        return self._read_files(top_paths)

    def get_recent_files(self, days: int = 5, limit: int = 10) -> str:
        """
        Returns a formatted list of files modified in the last N days.

        Args:
            days: Look-back window in days (86400-second days).
            limit: Maximum number of entries in the listing.
        """
        current_time = time.time()
        cutoff_time = current_time - (days * 86400)

        recent_files = []
        for path, mtime in self.file_metadata.items():
            if mtime >= cutoff_time:
                recent_files.append((mtime, path))

        # Sort by mtime DESC (newest first).
        recent_files.sort(key=lambda x: x[0], reverse=True)

        # Limit
        recent_files = recent_files[:limit]

        if not recent_files:
            return f"No files modified in the last {days} days."

        output = [f"📂 **Updated in last {days} days:**"]
        for mtime, path in recent_files:
            filename = os.path.basename(path)
            date_str = time.strftime('%Y-%m-%d %H:%M', time.localtime(mtime))
            # BUG FIX: interpolate the filename instead of a placeholder.
            output.append(f"- `{filename}` ({date_str})")

        return "\n".join(output)

    def _read_files(self, paths: List[str]) -> List[str]:
        """Helper to read complete content of list of paths.

        Unreadable files are silently skipped (best-effort retrieval).
        """
        contents = []
        for p in paths:
            try:
                with open(p, "r", encoding="utf-8") as f:
                    # Provide context about the source.
                    filename = os.path.basename(p)
                    # BUG FIX: interpolate the filename in the source header.
                    contents.append(f"--- Source: {filename} ---\n{f.read()}")
            except Exception:
                pass
        return contents