# memory_tool.py
"""
Production-ready memory tool handler for Claude's memory_20250818 tool.

This implementation provides secure, client-side execution of memory operations
with path validation, error handling, and comprehensive security measures.
"""
import shutil
from collections.abc import Callable
from pathlib import Path
from typing import Any
  9. class MemoryToolHandler:
  10. """
  11. Handles execution of Claude's memory tool commands.
  12. The memory tool enables Claude to read, write, and manage files in a memory
  13. system through a standardized tool interface. This handler provides client-side
  14. implementation with security controls.
  15. Attributes:
  16. base_path: Root directory for memory storage
  17. memory_root: The /memories directory within base_path
  18. """
  19. def __init__(self, base_path: str = "./memory_storage"):
  20. """
  21. Initialize the memory tool handler.
  22. Args:
  23. base_path: Root directory for all memory operations
  24. """
  25. self.base_path = Path(base_path).resolve()
  26. self.memory_root = self.base_path / "memories"
  27. self.memory_root.mkdir(parents=True, exist_ok=True)
  28. def _validate_path(self, path: str) -> Path:
  29. """
  30. Validate and resolve memory paths to prevent directory traversal attacks.
  31. Args:
  32. path: The path to validate (must start with /memories)
  33. Returns:
  34. Resolved absolute Path object within memory_root
  35. Raises:
  36. ValueError: If path is invalid or attempts to escape memory directory
  37. """
  38. if not path.startswith("/memories"):
  39. raise ValueError(
  40. f"Path must start with /memories, got: {path}. "
  41. "All memory operations must be confined to the /memories directory."
  42. )
  43. # Remove /memories prefix and any leading slashes
  44. relative_path = path[len("/memories") :].lstrip("/")
  45. # Resolve to absolute path within memory_root
  46. if relative_path:
  47. full_path = (self.memory_root / relative_path).resolve()
  48. else:
  49. full_path = self.memory_root.resolve()
  50. # Verify the resolved path is still within memory_root
  51. try:
  52. full_path.relative_to(self.memory_root.resolve())
  53. except ValueError as e:
  54. raise ValueError(
  55. f"Path '{path}' would escape /memories directory. "
  56. "Directory traversal attempts are not allowed."
  57. ) from e
  58. return full_path
  59. def execute(self, **params: Any) -> dict[str, str]:
  60. """
  61. Execute a memory tool command.
  62. Args:
  63. **params: Command parameters from Claude's tool use
  64. Returns:
  65. Dict with either 'success' or 'error' key
  66. Supported commands:
  67. - view: Show directory contents or file contents
  68. - create: Create or overwrite a file
  69. - str_replace: Replace text in a file
  70. - insert: Insert text at a specific line
  71. - delete: Delete a file or directory
  72. - rename: Rename or move a file/directory
  73. """
  74. command = params.get("command")
  75. try:
  76. if command == "view":
  77. return self._view(params)
  78. elif command == "create":
  79. return self._create(params)
  80. elif command == "str_replace":
  81. return self._str_replace(params)
  82. elif command == "insert":
  83. return self._insert(params)
  84. elif command == "delete":
  85. return self._delete(params)
  86. elif command == "rename":
  87. return self._rename(params)
  88. else:
  89. return {
  90. "error": f"Unknown command: '{command}'. "
  91. "Valid commands are: view, create, str_replace, insert, delete, rename"
  92. }
  93. except ValueError as e:
  94. return {"error": str(e)}
  95. except Exception as e:
  96. return {"error": f"Unexpected error executing {command}: {e}"}
  97. def _view(self, params: dict[str, Any]) -> dict[str, str]:
  98. """View directory contents or file contents."""
  99. path = params.get("path")
  100. view_range = params.get("view_range")
  101. if not path:
  102. return {"error": "Missing required parameter: path"}
  103. full_path = self._validate_path(path)
  104. # Handle directory listing
  105. if full_path.is_dir():
  106. try:
  107. items = []
  108. for item in sorted(full_path.iterdir()):
  109. if item.name.startswith("."):
  110. continue
  111. items.append(f"{item.name}/" if item.is_dir() else item.name)
  112. if not items:
  113. return {"success": f"Directory: {path}\n(empty)"}
  114. return {
  115. "success": f"Directory: {path}\n" + "\n".join([f"- {item}" for item in items])
  116. }
  117. except Exception as e:
  118. return {"error": f"Cannot read directory {path}: {e}"}
  119. # Handle file reading
  120. elif full_path.is_file():
  121. try:
  122. content = full_path.read_text(encoding="utf-8")
  123. lines = content.splitlines()
  124. # Apply view range if specified
  125. if view_range:
  126. start_line = max(1, view_range[0]) - 1 # Convert to 0-indexed
  127. end_line = len(lines) if view_range[1] == -1 else view_range[1]
  128. lines = lines[start_line:end_line]
  129. start_num = start_line + 1
  130. else:
  131. start_num = 1
  132. # Format with line numbers
  133. numbered_lines = [f"{i + start_num:4d}: {line}" for i, line in enumerate(lines)]
  134. return {"success": "\n".join(numbered_lines)}
  135. except UnicodeDecodeError:
  136. return {"error": f"Cannot read {path}: File is not valid UTF-8 text"}
  137. except Exception as e:
  138. return {"error": f"Cannot read file {path}: {e}"}
  139. else:
  140. return {"error": f"Path not found: {path}"}
  141. def _create(self, params: dict[str, Any]) -> dict[str, str]:
  142. """Create or overwrite a file."""
  143. path = params.get("path")
  144. file_text = params.get("file_text", "")
  145. if not path:
  146. return {"error": "Missing required parameter: path"}
  147. full_path = self._validate_path(path)
  148. # Don't allow creating directories directly
  149. if not path.endswith((".txt", ".md", ".json", ".py", ".yaml", ".yml")):
  150. return {
  151. "error": f"Cannot create {path}: Only text files are supported. "
  152. "Use file extensions: .txt, .md, .json, .py, .yaml, .yml"
  153. }
  154. try:
  155. # Create parent directories if needed
  156. full_path.parent.mkdir(parents=True, exist_ok=True)
  157. # Write the file
  158. full_path.write_text(file_text, encoding="utf-8")
  159. return {"success": f"File created successfully at {path}"}
  160. except Exception as e:
  161. return {"error": f"Cannot create file {path}: {e}"}
  162. def _str_replace(self, params: dict[str, Any]) -> dict[str, str]:
  163. """Replace text in a file."""
  164. path = params.get("path")
  165. old_str = params.get("old_str")
  166. new_str = params.get("new_str", "")
  167. if not path or old_str is None:
  168. return {"error": "Missing required parameters: path, old_str"}
  169. full_path = self._validate_path(path)
  170. if not full_path.is_file():
  171. return {"error": f"File not found: {path}"}
  172. try:
  173. content = full_path.read_text(encoding="utf-8")
  174. # Check if old_str exists
  175. count = content.count(old_str)
  176. if count == 0:
  177. return {
  178. "error": f"String not found in {path}. The exact text must exist in the file."
  179. }
  180. elif count > 1:
  181. return {
  182. "error": f"String appears {count} times in {path}. "
  183. "The string must be unique. Use more specific context."
  184. }
  185. # Perform replacement
  186. new_content = content.replace(old_str, new_str, 1)
  187. full_path.write_text(new_content, encoding="utf-8")
  188. return {"success": f"File {path} has been edited successfully"}
  189. except Exception as e:
  190. return {"error": f"Cannot edit file {path}: {e}"}
  191. def _insert(self, params: dict[str, Any]) -> dict[str, str]:
  192. """Insert text at a specific line."""
  193. path = params.get("path")
  194. insert_line = params.get("insert_line")
  195. insert_text = params.get("insert_text", "")
  196. if not path or insert_line is None:
  197. return {"error": "Missing required parameters: path, insert_line"}
  198. full_path = self._validate_path(path)
  199. if not full_path.is_file():
  200. return {"error": f"File not found: {path}"}
  201. try:
  202. lines = full_path.read_text(encoding="utf-8").splitlines()
  203. # Validate insert_line
  204. if insert_line < 0 or insert_line > len(lines):
  205. return {
  206. "error": f"Invalid insert_line {insert_line}. "
  207. f"Must be between 0 and {len(lines)}"
  208. }
  209. # Insert the text
  210. lines.insert(insert_line, insert_text.rstrip("\n"))
  211. # Write back
  212. full_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
  213. return {"success": f"Text inserted at line {insert_line} in {path}"}
  214. except Exception as e:
  215. return {"error": f"Cannot insert into {path}: {e}"}
  216. def _delete(self, params: dict[str, Any]) -> dict[str, str]:
  217. """Delete a file or directory."""
  218. path = params.get("path")
  219. if not path:
  220. return {"error": "Missing required parameter: path"}
  221. # Prevent deletion of root memories directory
  222. if path == "/memories":
  223. return {"error": "Cannot delete the /memories directory itself"}
  224. full_path = self._validate_path(path)
  225. # Verify the path is within /memories to prevent accidental deletion outside the memory directory
  226. # This provides an additional safety check beyond _validate_path
  227. try:
  228. full_path.relative_to(self.memory_root.resolve())
  229. except ValueError:
  230. return {
  231. "error": f"Invalid operation: Path '{path}' is not within /memories directory. "
  232. "Only paths within /memories can be deleted."
  233. }
  234. if not full_path.exists():
  235. return {"error": f"Path not found: {path}"}
  236. try:
  237. if full_path.is_file():
  238. full_path.unlink()
  239. return {"success": f"File deleted: {path}"}
  240. elif full_path.is_dir():
  241. shutil.rmtree(full_path)
  242. return {"success": f"Directory deleted: {path}"}
  243. except Exception as e:
  244. return {"error": f"Cannot delete {path}: {e}"}
  245. def _rename(self, params: dict[str, Any]) -> dict[str, str]:
  246. """Rename or move a file/directory."""
  247. old_path = params.get("old_path")
  248. new_path = params.get("new_path")
  249. if not old_path or not new_path:
  250. return {"error": "Missing required parameters: old_path, new_path"}
  251. old_full_path = self._validate_path(old_path)
  252. new_full_path = self._validate_path(new_path)
  253. if not old_full_path.exists():
  254. return {"error": f"Source path not found: {old_path}"}
  255. if new_full_path.exists():
  256. return {
  257. "error": f"Destination already exists: {new_path}. "
  258. "Cannot overwrite existing files/directories."
  259. }
  260. try:
  261. # Create parent directories if needed
  262. new_full_path.parent.mkdir(parents=True, exist_ok=True)
  263. # Perform rename/move
  264. old_full_path.rename(new_full_path)
  265. return {"success": f"Renamed {old_path} to {new_path}"}
  266. except Exception as e:
  267. return {"error": f"Cannot rename {old_path} to {new_path}: {e}"}
  268. def clear_all_memory(self) -> dict[str, str]:
  269. """
  270. Clear all memory files (useful for testing or starting fresh).
  271. ⚠️ WARNING: This method is for demonstration and testing purposes only.
  272. In production, you should carefully consider whether you need to delete
  273. all memory files, as this will permanently remove all learned patterns
  274. and stored knowledge. Consider using selective deletion instead.
  275. Returns:
  276. Dict with success message
  277. """
  278. try:
  279. if self.memory_root.exists():
  280. shutil.rmtree(self.memory_root)
  281. self.memory_root.mkdir(parents=True, exist_ok=True)
  282. return {"success": "All memory cleared successfully"}
  283. except Exception as e:
  284. return {"error": f"Cannot clear memory: {e}"}