1+ from .base import Tool
2+ import asyncio
3+ import os
4+ from typing import List , Optional
5+ from dataclasses import dataclass , field
6+
7+ @dataclass
8+ class BashTool (Tool ):
9+ name : str = "bash"
10+ description : str = field (default = "" , init = False )
11+ input_schema : dict = field (default_factory = lambda : {
12+ "type" : "object" ,
13+ "properties" : {
14+ "command" : {
15+ "type" : "string" ,
16+ "description" : "The bash command to execute"
17+ },
18+ "working_directory" : {
19+ "type" : "string" ,
20+ "description" : "Working directory for command execution (optional, defaults to current directory)"
21+ }
22+ },
23+ "required" : ["command" ]
24+ }, init = False )
25+
26+ # Permission settings
27+ allowed_commands : List [str ] = field (default = None )
28+ denied_commands : List [str ] = field (default_factory = list )
29+ working_directories : List [str ] = field (default = None )
30+ allow_pipes : bool = field (default = True )
31+ allow_redirects : bool = field (default = True )
32+ timeout : int = field (default = 30 )
33+
34+ def __post_init__ (self ):
35+ # Build dynamic description based on permissions
36+ self .description = self ._build_description ()
37+
38+ def _build_description (self ) -> str :
39+ desc = "Execute bash commands in the system shell."
40+
41+ # Add permission details
42+ if self .allowed_commands :
43+ desc += f"\n \n You can ONLY use these commands: { ', ' .join (self .allowed_commands )} "
44+ desc += "\n Examples:"
45+ if 'git' in self .allowed_commands :
46+ desc += "\n - git status, git log, git diff"
47+ if 'npm' in self .allowed_commands :
48+ desc += "\n - npm install, npm run build, npm test"
49+ if 'ls' in self .allowed_commands :
50+ desc += "\n - ls -la, ls src/"
51+ else :
52+ desc += "\n \n You can use any bash command except those explicitly denied."
53+
54+ if self .denied_commands :
55+ desc += f"\n \n NEVER use these commands: { ', ' .join (self .denied_commands )} "
56+
57+ if self .working_directories :
58+ desc += f"\n \n You can only work in these directories: { ', ' .join (self .working_directories )} "
59+
60+ # Feature restrictions
61+ features = []
62+ if not self .allow_pipes :
63+ features .append ("pipes (|)" )
64+ if not self .allow_redirects :
65+ features .append ("redirects (>, <, >>)" )
66+
67+ if features :
68+ desc += f"\n \n The following features are DISABLED: { ', ' .join (features )} "
69+
70+ desc += f"\n \n Commands will timeout after { self .timeout } seconds."
71+ desc += "\n \n Output includes stdout, stderr, and exit codes. Long outputs may be truncated."
72+
73+ return desc
74+
75+ async def execute (self , command : str , working_directory : Optional [str ] = None ) -> str :
76+ # Basic permission checks
77+ if not self ._is_command_allowed (command ):
78+ return f"Error: Command not permitted by current permissions"
79+
80+ if working_directory and not self ._is_directory_allowed (working_directory ):
81+ return f"Error: Working directory '{ working_directory } ' not permitted"
82+
83+ if not self .allow_pipes and '|' in command :
84+ return "Error: Pipe operators not permitted"
85+
86+ if not self .allow_redirects and any (op in command for op in ['>' , '<' , '>>' ]):
87+ return "Error: Redirect operators not permitted"
88+
89+ # Execute command
90+ cwd = working_directory or os .getcwd ()
91+
92+ try :
93+ process = await asyncio .create_subprocess_shell (
94+ command ,
95+ stdout = asyncio .subprocess .PIPE ,
96+ stderr = asyncio .subprocess .PIPE ,
97+ cwd = cwd
98+ )
99+
100+ try :
101+ stdout , stderr = await asyncio .wait_for (
102+ process .communicate (),
103+ timeout = self .timeout
104+ )
105+ except asyncio .TimeoutError :
106+ process .kill ()
107+ await process .wait ()
108+ return f"Error: Command timed out after { self .timeout } seconds"
109+
110+ # Format output
111+ output = ""
112+ if stdout :
113+ output += stdout .decode ('utf-8' , errors = 'replace' )
114+ if stderr :
115+ if output :
116+ output += "\n --- stderr ---\n "
117+ output += stderr .decode ('utf-8' , errors = 'replace' )
118+
119+ if process .returncode != 0 :
120+ output += f"\n [Exit code: { process .returncode } ]"
121+
122+ return output .strip () or "[No output]"
123+
124+ except Exception as e :
125+ return f"Error executing command: { str (e )} "
126+
127+ def _is_command_allowed (self , command : str ) -> bool :
128+ # Get the base command (first word)
129+ base_command = command .split ()[0 ] if command else ""
130+
131+ # Check denied list first
132+ for denied in self .denied_commands :
133+ if base_command .startswith (denied ):
134+ return False
135+
136+ # If allowed list exists, command must be in it
137+ if self .allowed_commands :
138+ return any (base_command .startswith (allowed ) for allowed in self .allowed_commands )
139+
140+ # No allowed list means all non-denied commands are permitted
141+ return True
142+
143+ def _is_directory_allowed (self , directory : str ) -> bool :
144+ if not self .working_directories :
145+ return True
146+
147+ # Normalize paths for comparison
148+ abs_dir = os .path .abspath (directory )
149+ return any (abs_dir .startswith (os .path .abspath (allowed ))
150+ for allowed in self .working_directories )
0 commit comments