memory_tool.py 13 KB


  1. TRANSLATED CONTENT:
  2. """
  3. Production-ready memory tool handler for Claude's memory_20250818 tool.
  4. This implementation provides secure, client-side execution of memory operations
  5. with path validation, error handling, and comprehensive security measures.
  6. """
  7. import shutil
  8. from pathlib import Path
  9. from typing import Any
  10. class MemoryToolHandler:
  11. """
  12. Handles execution of Claude's memory tool commands.
  13. The memory tool enables Claude to read, write, and manage files in a memory
  14. system through a standardized tool interface. This handler provides client-side
  15. implementation with security controls.
  16. Attributes:
  17. base_path: Root directory for memory storage
  18. memory_root: The /memories directory within base_path
  19. """
  20. def __init__(self, base_path: str = "./memory_storage"):
  21. """
  22. Initialize the memory tool handler.
  23. Args:
  24. base_path: Root directory for all memory operations
  25. """
  26. self.base_path = Path(base_path).resolve()
  27. self.memory_root = self.base_path / "memories"
  28. self.memory_root.mkdir(parents=True, exist_ok=True)
  29. def _validate_path(self, path: str) -> Path:
  30. """
  31. Validate and resolve memory paths to prevent directory traversal attacks.
  32. Args:
  33. path: The path to validate (must start with /memories)
  34. Returns:
  35. Resolved absolute Path object within memory_root
  36. Raises:
  37. ValueError: If path is invalid or attempts to escape memory directory
  38. """
  39. if not path.startswith("/memories"):
  40. raise ValueError(
  41. f"Path must start with /memories, got: {path}. "
  42. "All memory operations must be confined to the /memories directory."
  43. )
  44. # Remove /memories prefix and any leading slashes
  45. relative_path = path[len("/memories") :].lstrip("/")
  46. # Resolve to absolute path within memory_root
  47. if relative_path:
  48. full_path = (self.memory_root / relative_path).resolve()
  49. else:
  50. full_path = self.memory_root.resolve()
  51. # Verify the resolved path is still within memory_root
  52. try:
  53. full_path.relative_to(self.memory_root.resolve())
  54. except ValueError as e:
  55. raise ValueError(
  56. f"Path '{path}' would escape /memories directory. "
  57. "Directory traversal attempts are not allowed."
  58. ) from e
  59. return full_path
  60. def execute(self, **params: Any) -> dict[str, str]:
  61. """
  62. Execute a memory tool command.
  63. Args:
  64. **params: Command parameters from Claude's tool use
  65. Returns:
  66. Dict with either 'success' or 'error' key
  67. Supported commands:
  68. - view: Show directory contents or file contents
  69. - create: Create or overwrite a file
  70. - str_replace: Replace text in a file
  71. - insert: Insert text at a specific line
  72. - delete: Delete a file or directory
  73. - rename: Rename or move a file/directory
  74. """
  75. command = params.get("command")
  76. try:
  77. if command == "view":
  78. return self._view(params)
  79. elif command == "create":
  80. return self._create(params)
  81. elif command == "str_replace":
  82. return self._str_replace(params)
  83. elif command == "insert":
  84. return self._insert(params)
  85. elif command == "delete":
  86. return self._delete(params)
  87. elif command == "rename":
  88. return self._rename(params)
  89. else:
  90. return {
  91. "error": f"Unknown command: '{command}'. "
  92. "Valid commands are: view, create, str_replace, insert, delete, rename"
  93. }
  94. except ValueError as e:
  95. return {"error": str(e)}
  96. except Exception as e:
  97. return {"error": f"Unexpected error executing {command}: {e}"}
  98. def _view(self, params: dict[str, Any]) -> dict[str, str]:
  99. """View directory contents or file contents."""
  100. path = params.get("path")
  101. view_range = params.get("view_range")
  102. if not path:
  103. return {"error": "Missing required parameter: path"}
  104. full_path = self._validate_path(path)
  105. # Handle directory listing
  106. if full_path.is_dir():
  107. try:
  108. items = []
  109. for item in sorted(full_path.iterdir()):
  110. if item.name.startswith("."):
  111. continue
  112. items.append(f"{item.name}/" if item.is_dir() else item.name)
  113. if not items:
  114. return {"success": f"Directory: {path}\n(empty)"}
  115. return {
  116. "success": f"Directory: {path}\n" + "\n".join([f"- {item}" for item in items])
  117. }
  118. except Exception as e:
  119. return {"error": f"Cannot read directory {path}: {e}"}
  120. # Handle file reading
  121. elif full_path.is_file():
  122. try:
  123. content = full_path.read_text(encoding="utf-8")
  124. lines = content.splitlines()
  125. # Apply view range if specified
  126. if view_range:
  127. start_line = max(1, view_range[0]) - 1 # Convert to 0-indexed
  128. end_line = len(lines) if view_range[1] == -1 else view_range[1]
  129. lines = lines[start_line:end_line]
  130. start_num = start_line + 1
  131. else:
  132. start_num = 1
  133. # Format with line numbers
  134. numbered_lines = [f"{i + start_num:4d}: {line}" for i, line in enumerate(lines)]
  135. return {"success": "\n".join(numbered_lines)}
  136. except UnicodeDecodeError:
  137. return {"error": f"Cannot read {path}: File is not valid UTF-8 text"}
  138. except Exception as e:
  139. return {"error": f"Cannot read file {path}: {e}"}
  140. else:
  141. return {"error": f"Path not found: {path}"}
  142. def _create(self, params: dict[str, Any]) -> dict[str, str]:
  143. """Create or overwrite a file."""
  144. path = params.get("path")
  145. file_text = params.get("file_text", "")
  146. if not path:
  147. return {"error": "Missing required parameter: path"}
  148. full_path = self._validate_path(path)
  149. # Don't allow creating directories directly
  150. if not path.endswith((".txt", ".md", ".json", ".py", ".yaml", ".yml")):
  151. return {
  152. "error": f"Cannot create {path}: Only text files are supported. "
  153. "Use file extensions: .txt, .md, .json, .py, .yaml, .yml"
  154. }
  155. try:
  156. # Create parent directories if needed
  157. full_path.parent.mkdir(parents=True, exist_ok=True)
  158. # Write the file
  159. full_path.write_text(file_text, encoding="utf-8")
  160. return {"success": f"File created successfully at {path}"}
  161. except Exception as e:
  162. return {"error": f"Cannot create file {path}: {e}"}
  163. def _str_replace(self, params: dict[str, Any]) -> dict[str, str]:
  164. """Replace text in a file."""
  165. path = params.get("path")
  166. old_str = params.get("old_str")
  167. new_str = params.get("new_str", "")
  168. if not path or old_str is None:
  169. return {"error": "Missing required parameters: path, old_str"}
  170. full_path = self._validate_path(path)
  171. if not full_path.is_file():
  172. return {"error": f"File not found: {path}"}
  173. try:
  174. content = full_path.read_text(encoding="utf-8")
  175. # Check if old_str exists
  176. count = content.count(old_str)
  177. if count == 0:
  178. return {
  179. "error": f"String not found in {path}. The exact text must exist in the file."
  180. }
  181. elif count > 1:
  182. return {
  183. "error": f"String appears {count} times in {path}. "
  184. "The string must be unique. Use more specific context."
  185. }
  186. # Perform replacement
  187. new_content = content.replace(old_str, new_str, 1)
  188. full_path.write_text(new_content, encoding="utf-8")
  189. return {"success": f"File {path} has been edited successfully"}
  190. except Exception as e:
  191. return {"error": f"Cannot edit file {path}: {e}"}
  192. def _insert(self, params: dict[str, Any]) -> dict[str, str]:
  193. """Insert text at a specific line."""
  194. path = params.get("path")
  195. insert_line = params.get("insert_line")
  196. insert_text = params.get("insert_text", "")
  197. if not path or insert_line is None:
  198. return {"error": "Missing required parameters: path, insert_line"}
  199. full_path = self._validate_path(path)
  200. if not full_path.is_file():
  201. return {"error": f"File not found: {path}"}
  202. try:
  203. lines = full_path.read_text(encoding="utf-8").splitlines()
  204. # Validate insert_line
  205. if insert_line < 0 or insert_line > len(lines):
  206. return {
  207. "error": f"Invalid insert_line {insert_line}. "
  208. f"Must be between 0 and {len(lines)}"
  209. }
  210. # Insert the text
  211. lines.insert(insert_line, insert_text.rstrip("\n"))
  212. # Write back
  213. full_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
  214. return {"success": f"Text inserted at line {insert_line} in {path}"}
  215. except Exception as e:
  216. return {"error": f"Cannot insert into {path}: {e}"}
  217. def _delete(self, params: dict[str, Any]) -> dict[str, str]:
  218. """Delete a file or directory."""
  219. path = params.get("path")
  220. if not path:
  221. return {"error": "Missing required parameter: path"}
  222. # Prevent deletion of root memories directory
  223. if path == "/memories":
  224. return {"error": "Cannot delete the /memories directory itself"}
  225. full_path = self._validate_path(path)
  226. # Verify the path is within /memories to prevent accidental deletion outside the memory directory
  227. # This provides an additional safety check beyond _validate_path
  228. try:
  229. full_path.relative_to(self.memory_root.resolve())
  230. except ValueError:
  231. return {
  232. "error": f"Invalid operation: Path '{path}' is not within /memories directory. "
  233. "Only paths within /memories can be deleted."
  234. }
  235. if not full_path.exists():
  236. return {"error": f"Path not found: {path}"}
  237. try:
  238. if full_path.is_file():
  239. full_path.unlink()
  240. return {"success": f"File deleted: {path}"}
  241. elif full_path.is_dir():
  242. shutil.rmtree(full_path)
  243. return {"success": f"Directory deleted: {path}"}
  244. except Exception as e:
  245. return {"error": f"Cannot delete {path}: {e}"}
  246. def _rename(self, params: dict[str, Any]) -> dict[str, str]:
  247. """Rename or move a file/directory."""
  248. old_path = params.get("old_path")
  249. new_path = params.get("new_path")
  250. if not old_path or not new_path:
  251. return {"error": "Missing required parameters: old_path, new_path"}
  252. old_full_path = self._validate_path(old_path)
  253. new_full_path = self._validate_path(new_path)
  254. if not old_full_path.exists():
  255. return {"error": f"Source path not found: {old_path}"}
  256. if new_full_path.exists():
  257. return {
  258. "error": f"Destination already exists: {new_path}. "
  259. "Cannot overwrite existing files/directories."
  260. }
  261. try:
  262. # Create parent directories if needed
  263. new_full_path.parent.mkdir(parents=True, exist_ok=True)
  264. # Perform rename/move
  265. old_full_path.rename(new_full_path)
  266. return {"success": f"Renamed {old_path} to {new_path}"}
  267. except Exception as e:
  268. return {"error": f"Cannot rename {old_path} to {new_path}: {e}"}
  269. def clear_all_memory(self) -> dict[str, str]:
  270. """
  271. Clear all memory files (useful for testing or starting fresh).
  272. ⚠️ WARNING: This method is for demonstration and testing purposes only.
  273. In production, you should carefully consider whether you need to delete
  274. all memory files, as this will permanently remove all learned patterns
  275. and stored knowledge. Consider using selective deletion instead.
  276. Returns:
  277. Dict with success message
  278. """
  279. try:
  280. if self.memory_root.exists():
  281. shutil.rmtree(self.memory_root)
  282. self.memory_root.mkdir(parents=True, exist_ok=True)
  283. return {"success": "All memory cleared successfully"}
  284. except Exception as e:
  285. return {"error": f"Cannot clear memory: {e}"}