Coverage for slack_bot / shell / session.py: 0%

65 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-02 17:44 +0800

1import os 

2import pty 

3import select 

4import time 

5import subprocess 

6import re 

7from typing import Optional 

8 

class ShellSession:
    """Manage a persistent shell (zsh by default) session over a pseudo-terminal.

    The shell is started with ``PS1`` set to a unique marker followed by a
    newline. Every read from the pty stops as soon as that marker shows up,
    which is how the end of a command's output is detected.

    The session can be used as a context manager; leaving the ``with`` block
    calls :meth:`close`.

    NOTE(review): ``PS1`` is only passed through the environment; a shell
    startup file that overrides the prompt would presumably defeat the marker
    detection, in which case reads end on timeout instead.
    """

    # Matches ANSI escape sequences (colors, cursor movement, ...). Compiled
    # once here instead of on every _clean_output() call.
    _ANSI_ESCAPE = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')

    # Seconds close() waits after SIGTERM before escalating to SIGKILL.
    _TERMINATE_GRACE = 2.0

    def __init__(self, shell_path: str = "/bin/zsh", timeout: float = 5.0):
        """Create the session and immediately start the shell.

        Args:
            shell_path: Executable to run as the shell.
            timeout: Default number of seconds execute() waits for the prompt.
        """
        self.shell_path = shell_path
        self.timeout = timeout
        self.master_fd = None   # parent side of the pty; None when closed
        self.slave_fd = None    # only open transiently inside _start()
        self.process = None
        self.buffer = b""       # not used by the methods of this class
        self.prompt_marker = "__BUTLER_SHELL_PROMPT__"
        self._start()

    def __enter__(self):
        """Return the session itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the session when the ``with`` block ends."""
        self.close()

    def _start(self):
        """Start (or restart) the shell process on a fresh pty.

        Raises:
            OSError: If the pty cannot be opened or the shell cannot be spawned.
        """
        # On a restart, release what the previous session left behind;
        # otherwise its master fd would be leaked. No-op on first start.
        self.close()

        master_fd, slave_fd = pty.openpty()
        try:
            process = subprocess.Popen(
                [self.shell_path],
                stdin=slave_fd,
                stdout=slave_fd,
                stderr=slave_fd,
                preexec_fn=os.setsid,
                close_fds=True,
                # PS1 is a unique marker plus a newline to make parsing easier.
                env={**os.environ, "PS1": f"{self.prompt_marker}\n", "TERM": "xterm"},
            )
        except Exception:
            # Spawn failed: nobody will ever read from the master end.
            os.close(master_fd)
            raise
        finally:
            # The parent never uses the slave end; a successfully spawned
            # child keeps its own copy.
            os.close(slave_fd)

        self.master_fd = master_fd
        self.slave_fd = None  # already closed; do not keep a stale fd number
        self.process = process

        # Initial read to clear the startup banner and the first prompt.
        self._read_until_prompt()

    def execute(self, cmd: str, timeout: Optional[float] = None) -> str:
        """Run ``cmd`` in the shell and return its cleaned-up output.

        The shell is restarted first if it is no longer running.

        Args:
            cmd: Command line to send; a newline is appended.
            timeout: Seconds to wait for the prompt. ``None`` means use the
                session default; an explicit ``0`` is honoured (one short
                poll) rather than being replaced by the default.

        Returns:
            The output with ANSI escapes, the prompt marker and the echoed
            command line removed. If the prompt did not appear in time, this
            is whatever had been read so far.
        """
        if not self.is_alive():
            self._start()

        os.write(self.master_fd, (cmd + "\n").encode('utf-8'))

        wait_for = self.timeout if timeout is None else timeout
        raw_output = self._read_until_prompt(wait_for)
        return self._clean_output(cmd, raw_output)

    def _read_until_prompt(self, timeout: float = 2.0) -> str:
        """Read from the pty until the prompt marker, EOF, an error or timeout.

        Args:
            timeout: Maximum number of seconds to keep reading.

        Returns:
            Everything read, decoded as UTF-8 with undecodable bytes replaced.
        """
        marker = self.prompt_marker.encode()
        collected = bytearray()
        # Monotonic clock: the deadline must not move when the wall clock is
        # adjusted.
        deadline = time.monotonic() + timeout

        while True:
            # Poll in short slices so the deadline is checked regularly.
            readable, _, _ = select.select([self.master_fd], [], [], 0.1)

            if readable:
                try:
                    chunk = os.read(self.master_fd, 1024)
                except OSError:
                    # Treated as end of stream.
                    break
                if not chunk:
                    break  # EOF
                collected += chunk
                # Search the accumulated data, not just the chunk: the marker
                # may be split across two reads.
                if marker in collected:
                    break

            if time.monotonic() > deadline:
                break

        return collected.decode('utf-8', errors='replace')

    def _clean_output(self, cmd: str, raw_output: str) -> str:
        """Strip ANSI escapes, the prompt marker and the echoed command.

        Args:
            cmd: The command that was sent; used to recognise its echo.
            raw_output: Text as returned by _read_until_prompt().

        Returns:
            The remaining output with surrounding whitespace removed.
        """
        text = self._ANSI_ESCAPE.sub('', raw_output)
        text = text.replace(self.prompt_marker, "").strip()

        lines = text.splitlines()
        echoed = cmd.strip()
        # Simplistic echo removal: drop the first line if it contains the
        # command. An empty command is skipped explicitly, because
        # `"" in line` is always true and would eat a real output line.
        if lines and echoed and echoed in lines[0]:
            lines = lines[1:]

        return "\n".join(lines).strip()

    def is_alive(self) -> bool:
        """Return True if a shell process exists and has not exited."""
        return self.process is not None and self.process.poll() is None

    def close(self):
        """Terminate the shell and close the pty master.

        Safe to call repeatedly, and on a session whose start-up failed.
        """
        process = getattr(self, "process", None)
        master_fd = getattr(self, "master_fd", None)
        # Forget the fd first so a second close() (for instance from __del__
        # after an explicit close()) cannot close the same fd number again.
        self.master_fd = None

        try:
            if process is not None and process.poll() is None:
                process.terminate()
                try:
                    process.wait(timeout=self._TERMINATE_GRACE)
                except subprocess.TimeoutExpired:
                    # An interactive shell may ignore SIGTERM; do not hang.
                    process.kill()
                    process.wait()
        finally:
            # `is not None` rather than truthiness: 0 is a valid descriptor.
            if master_fd is not None:
                os.close(master_fd)

    def __del__(self):
        """Release the shell and the pty when the object is collected."""
        self.close()