Coverage for slack_bot / shell / session.py: 0%
65 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-02 17:44 +0800
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-02 17:44 +0800
1import os
2import pty
3import select
4import time
5import subprocess
6import re
7from typing import Optional
class ShellSession:
    """Manage a persistent shell (zsh by default) attached to a pseudo-terminal.

    The shell is started with ``PS1`` set to a unique marker string so that the
    end of each command's output can be detected by watching for that marker
    on the pty master. The session can be used as a context manager; leaving
    the ``with`` block calls :meth:`close`.
    """

    # ANSI/VT100 escape sequences (colors, cursor movement, ...). Compiled once
    # at class level instead of on every _clean_output() call.
    _ANSI_ESCAPE = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
    # Seconds each select() call waits before the deadline is re-checked.
    _POLL_INTERVAL = 0.1
    # Maximum number of bytes requested per os.read() call.
    _READ_CHUNK = 1024
    # Seconds to wait after SIGTERM before escalating to SIGKILL in close().
    _TERMINATE_GRACE = 2.0

    def __init__(self, shell_path: str = "/bin/zsh", timeout: float = 5.0):
        """Create the session and start the shell immediately.

        Args:
            shell_path: Executable to run as the shell.
            timeout: Default number of seconds execute() waits for the prompt.
        """
        self.shell_path = shell_path
        self.timeout = timeout
        self.master_fd: Optional[int] = None
        self.slave_fd: Optional[int] = None
        self.process: Optional[subprocess.Popen] = None
        self.buffer = b""
        self.prompt_marker = "__BUTLER_SHELL_PROMPT__"
        self._start()

    def __enter__(self):
        """Return the session itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the session when the ``with`` block ends."""
        self.close()

    def _start(self):
        """Spawn the shell on a fresh pty and consume its startup output."""
        # On a restart, release the previous session's master fd first so it
        # is not leaked when self.master_fd is overwritten below.
        self._close_master()

        master_fd, slave_fd = pty.openpty()
        self.slave_fd = slave_fd
        try:
            self.process = subprocess.Popen(
                [self.shell_path],
                stdin=slave_fd,
                stdout=slave_fd,
                stderr=slave_fd,
                # Equivalent to preexec_fn=os.setsid, but safe to use when the
                # parent process has multiple threads.
                start_new_session=True,
                close_fds=True,
                # PS1 is a unique marker followed by a newline to make the end
                # of each command's output easy to find.
                env={**os.environ, "PS1": f"{self.prompt_marker}\n", "TERM": "xterm"},
            )
        except BaseException:
            # Spawning failed: nobody will ever read from the master side.
            os.close(master_fd)
            raise
        finally:
            # The parent never uses the slave side; only the child needs it.
            os.close(slave_fd)
            self.slave_fd = None

        self.master_fd = master_fd

        # Initial read to clear the startup banner and the first prompt.
        self._read_until_prompt()

    def execute(self, cmd: str, timeout: Optional[float] = None) -> str:
        """Run *cmd* in the shell and return its cleaned-up output.

        The shell is restarted first if it is no longer running.

        Args:
            cmd: Command line to send; a trailing newline is appended.
            timeout: Seconds to wait for the prompt. ``None`` (or 0) falls
                back to the session default given to the constructor.

        Returns:
            The output with ANSI escapes, the prompt marker and the echoed
            command removed. If the prompt does not appear before the timeout,
            whatever was received so far is returned.
        """
        if not self.is_alive():
            self._start()

        # os.write() may accept only part of the buffer, so loop until the
        # whole command has been handed to the pty.
        payload = (cmd + "\n").encode('utf-8')
        while payload:
            written = os.write(self.master_fd, payload)
            payload = payload[written:]

        raw_output = self._read_until_prompt(timeout or self.timeout)
        return self._clean_output(cmd, raw_output)

    def _read_until_prompt(self, timeout: float = 2.0) -> str:
        """Read from the pty master until the prompt marker, EOF or timeout.

        Args:
            timeout: Maximum number of seconds to keep reading.

        Returns:
            Everything read, decoded as UTF-8 with undecodable bytes replaced.
        """
        marker = self.prompt_marker.encode()
        received = bytearray()
        # Monotonic clock: the deadline is unaffected by wall-clock changes.
        deadline = time.monotonic() + timeout

        while True:
            readable, _, _ = select.select(
                [self.master_fd], [], [], self._POLL_INTERVAL
            )

            if readable:
                try:
                    chunk = os.read(self.master_fd, self._READ_CHUNK)
                except OSError:
                    # Read failed (e.g. the other side of the pty is gone).
                    break
                if not chunk:
                    break  # EOF

                # Only the new bytes, plus enough of the old tail to catch a
                # marker split across chunks, need to be scanned.
                search_from = max(0, len(received) - len(marker) + 1)
                received += chunk
                if received.find(marker, search_from) != -1:
                    break

            if time.monotonic() > deadline:
                break

        return received.decode('utf-8', errors='replace')

    def _clean_output(self, cmd: str, raw_output: str) -> str:
        """Strip ANSI escapes, the prompt marker and the echoed command.

        Args:
            cmd: The command that was sent; used to recognise its echo.
            raw_output: Text as returned by _read_until_prompt().

        Returns:
            The remaining output, with surrounding whitespace removed.
        """
        # 1. Remove ANSI escape sequences (terminal colors/formatting).
        output = self._ANSI_ESCAPE.sub('', raw_output)

        # 2. Remove the prompt marker.
        output = output.replace(self.prompt_marker, "").strip()

        # 3. Remove the command echo (simplistic: drop the first line if it
        #    contains the command). An empty command is skipped, because ""
        #    is a substring of every line and would drop real output.
        lines = output.splitlines()
        echoed = cmd.strip()
        if lines and echoed and echoed in lines[0]:
            lines = lines[1:]

        return "\n".join(lines).strip()

    def is_alive(self) -> bool:
        """Return True if the shell process exists and has not exited."""
        return self.process is not None and self.process.poll() is None

    def _close_master(self):
        """Close the pty master fd, if any, and forget it."""
        # getattr: also safe on a partially-initialised instance (__del__).
        fd = getattr(self, "master_fd", None)
        # Clear the attribute before closing so a repeated call can never
        # close the same descriptor number twice.
        self.master_fd = None
        # Compare against None: descriptor 0 is a valid fd but is falsy.
        if fd is not None:
            os.close(fd)

    def close(self):
        """Terminate the shell and release the pty. Safe to call repeatedly."""
        process = getattr(self, "process", None)
        if process is not None and process.poll() is None:
            process.terminate()
            try:
                process.wait(timeout=self._TERMINATE_GRACE)
            except subprocess.TimeoutExpired:
                # The shell did not exit on SIGTERM; force it so close()
                # cannot block indefinitely.
                process.kill()
                process.wait()
        self._close_master()

    def __del__(self):
        """Best-effort cleanup when the session object is garbage-collected."""
        self.close()