BrowserUse10-源码-FileSystem模块-整理
FileSystem模块-整理 1-源代码部分 import asyncioimport base64import osimport reimport shutilfrom abcimport ABC, abstractmethodfrom concurrent. futuresimport ThreadPoolExecutorfrom pathlibimport Pathfrom typingimport Anyfrom pydanticimport BaseModel, Fieldfrom reportlab. lib. pagesizesimport letterfrom reportlab. lib. stylesimport getSampleStyleSheetfrom reportlab. platypusimport Paragraph, SimpleDocTemplate, Spacer# 无效文件名错误信息:文件名格式无效,必须为字母数字字符并带有支持的扩展名。 INVALID_FILENAME_ERROR_MESSAGE= 'Error: Invalid filename format. Must be alphanumeric with supported extension.' # 默认文件系统路径 DEFAULT_FILE_SYSTEM_PATH= 'browseruse_agent_data' class FileSystemError ( Exception) : """用于文件系统操作的自定义异常,应显示给LLM""" pass class BaseFile ( BaseModel, ABC) : """所有文件类型的基类""" # 文件名 name: str # 文件内容,默认为空字符串 content: str = '' # --- 子类必须定义此属性 --- @property @abstractmethod def extension ( self) - > str : """文件扩展名(例如 'txt', 'md')""" pass def write_file_content ( self, content: str ) - > None : """更新内部内容(已格式化)""" self. update_content( content) def append_file_content ( self, content: str ) - > None : """将内容追加到内部内容中""" self. update_content( self. content+ content) # --- 这些是共享的且在此处实现 --- def update_content ( self, content: str ) - > None : self. content= contentdef sync_to_disk_sync ( self, path: Path) - > None : # 构建完整文件路径 file_path= path/ self. full_name# 同步写入文件内容 file_path. write_text( self. content) async def sync_to_disk ( self, path: Path) - > None : # 构建完整文件路径 file_path= path/ self. full_name# 使用线程池执行同步写入操作 with ThreadPoolExecutor( ) as executor: await asyncio. get_event_loop( ) . run_in_executor( executor, lambda : file_path. write_text( self. content) ) async def write ( self, content: str , path: Path) - > None : # 更新文件内容 self. write_file_content( content) # 异步同步到磁盘 await self. sync_to_disk( path) async def append ( self, content: str , path: Path) - > None : # 追加文件内容 self. append_file_content( content) # 异步同步到磁盘 await self. sync_to_disk( path) def read ( self) - > str : # 返回文件内容 return self. content@property def full_name ( self) - > str : # 返回完整的文件名(包含扩展名) return f' { self. name} . { self. extension} ' @property def get_size ( self) - > int : # 返回文件内容大小(字节数) return len ( self. content) @property def get_line_count ( self) - > int : # 返回文件行数 return len ( self. content. splitlines( ) ) class MarkdownFile ( BaseFile) : """Markdown文件实现""" @property def extension ( self) - > str : # 扩展名为 md return 'md' class TxtFile ( BaseFile) : """纯文本文件实现""" @property def extension ( self) - > str : # 扩展名为 txt return 'txt' class JsonFile ( BaseFile) : """JSON文件实现""" @property def extension ( self) - > str : # 扩展名为 json return 'json' class CsvFile ( BaseFile) : """CSV文件实现""" @property def extension ( self) - > str : # 扩展名为 csv return 'csv' class JsonlFile ( BaseFile) : """JSONL(JSON行)文件实现""" @property def extension ( self) - > str : # 扩展名为 jsonl return 'jsonl' class PdfFile ( BaseFile) : """PDF文件实现""" @property def extension ( self) - > str : # 扩展名为 pdf return 'pdf' def sync_to_disk_sync ( self, path: Path) - > None : # 构建完整文件路径 file_path= path/ self. full_nametry : # 创建PDF文档 doc= SimpleDocTemplate( str ( file_path) , pagesize= letter) styles= getSampleStyleSheet( ) story= [ ] # 将markdown内容转换为简单文本并添加到PDF中 # 对于基本实现,我们将内容视为纯文本 # 这避免了AGPL许可证问题,同时保持功能 content_lines= self. content. split( '\n' ) for linein content_lines: if line. strip( ) : # 处理基本的markdown标题 if line. startswith( '# ' ) : para= Paragraph( line[ 2 : ] , styles[ 'Title' ] ) elif line. startswith( '## ' ) : para= Paragraph( line[ 3 : ] , styles[ 'Heading1' ] ) elif line. startswith( '### ' ) : para= Paragraph( line[ 4 : ] , styles[ 'Heading2' ] ) else : para= Paragraph( line, styles[ 'Normal' ] ) story. append( para) else : story. append( Spacer( 1 , 6 ) ) # 构建PDF文档 doc. build( story) except Exceptionas e: # 抛出自定义文件系统错误 raise FileSystemError( f"Error: Could not write to file ' { self. full_name} '. { str ( e) } " ) async def sync_to_disk ( self, path: Path) - > None : # 使用线程池执行同步写入操作 with ThreadPoolExecutor( ) as executor: await asyncio. get_event_loop( ) . run_in_executor( executor, lambda : self. sync_to_disk_sync( path) ) class DocxFile ( BaseFile) : """DOCX文件实现""" @property def extension ( self) - > str : # 扩展名为 docx return 'docx' def sync_to_disk_sync ( self, path: Path) - > None : # 构建完整文件路径 file_path= path/ self. full_nametry : from docximport Document doc= Document( ) # 将内容转换为DOCX段落 content_lines= self. content. split( '\n' ) for linein content_lines: if line. strip( ) : # 处理基本的markdown标题 if line. startswith( '# ' ) : doc. add_heading( line[ 2 : ] , level= 1 ) elif line. startswith( '## ' ) : doc. add_heading( line[ 3 : ] , level= 2 ) elif line. startswith( '### ' ) : doc. add_heading( line[ 4 : ] , level= 3 ) else : doc. add_paragraph( line) else : doc. add_paragraph( ) # 空段落用于间距 # 保存文档 doc. save( str ( file_path) ) except Exceptionas e: # 抛出自定义文件系统错误 raise FileSystemError( f"Error: Could not write to file ' { self. full_name} '. { str ( e) } " ) async def sync_to_disk ( self, path: Path) - > None : # 使用线程池执行同步写入操作 with ThreadPoolExecutor( ) as executor: await asyncio. get_event_loop( ) . run_in_executor( executor, lambda : self. sync_to_disk_sync( path) ) class FileSystemState ( BaseModel) : """文件系统的可序列化状态""" # 文件列表,键为完整文件名,值为文件数据 files: dict [ str , dict [ str , Any] ] = Field( default_factory= dict ) # 基础目录路径 base_dir: str # 提取内容计数 extracted_content_count: int = 0 class FileSystem : """增强型文件系统,支持内存存储和多种文件类型""" def __init__ ( self, base_dir: str | Path, create_default_files: bool = True ) : # 在调用 super().__init__ 之前处理 Path 转换 # 设置基础目录 self. base_dir= Path( base_dir) if isinstance ( base_dir, str ) else base_dir# 创建基础目录(如果不存在) self. base_dir. mkdir( parents= True , exist_ok= True ) # 创建专用子文件夹用于所有操作 # 设置数据目录 self. data_dir= self. base_dir/ DEFAULT_FILE_SYSTEM_PATHif self. data_dir. exists( ) : # 清理数据目录 shutil. rmtree( self. data_dir) self. data_dir. mkdir( exist_ok= True ) # 定义支持的文件类型映射 self. _file_types: dict [ str , type [ BaseFile] ] = { 'md' : MarkdownFile, 'txt' : TxtFile, 'json' : JsonFile, 'jsonl' : JsonlFile, 'csv' : CsvFile, 'pdf' : PdfFile, 'docx' : DocxFile, } # 存储文件对象 self. files= { } if create_default_files: # 默认文件列表 self. default_files= [ 'todo.md' ] # 创建默认文件 self. _create_default_files( ) # 提取内容计数 self. extracted_content_count= 0 def get_allowed_extensions ( self) - > list [ str ] : """获取允许的扩展名列表""" return list ( self. _file_types. keys( ) ) def _get_file_type_class ( self, extension: str ) - > type [ BaseFile] | None : """根据扩展名获取相应的文件类""" return self. _file_types. get( extension. lower( ) , None ) def _create_default_files ( self) - > None : """创建默认结果和待办文件""" for full_filenamein self. default_files: # 解析文件名 name_without_ext, extension= self. _parse_filename( full_filename) # 获取文件类 file_class= self. _get_file_type_class( extension) if not file_class: raise ValueError( f"Error: Invalid file extension ' { extension} ' for file ' { full_filename} '." ) # 创建文件对象 file_obj= file_class( name= name_without_ext) # 使用完整文件名作为键存储 self. files[ full_filename] = file_obj# 同步写入磁盘 file_obj. sync_to_disk_sync( self. data_dir) def _is_valid_filename ( self, file_name: str ) - > bool : """检查文件名是否符合要求的模式:name.extension""" # 从 _file_types 构建扩展名模式 extensions= '|' . join( self. _file_types. keys( ) ) # 正则表达式模式 pattern= rf'^[a-zA-Z0-9_\-\u4e00-\u9fff]+\.( { extensions} )$' # 获取文件名基础部分 file_name_base= os. path. basename( file_name) # 匹配模式 return bool ( re. match( pattern, file_name_base) ) def _parse_filename ( self, filename: str ) - > tuple [ str , str ] : """将文件名解析为名称和扩展名。始终在检查 _is_valid_filename 之后使用""" # 分割文件名和扩展名 name, extension= filename. rsplit( '.' , 1 ) # 返回小写的扩展名 return name, extension. lower( ) def get_dir ( self) - > Path: """获取文件系统目录""" return self. data_dirdef get_file ( self, full_filename: str ) - > BaseFile| None : """通过完整文件名获取文件对象""" # 检查文件名是否有效 if not self. _is_valid_filename( full_filename) : return None # 使用完整文件名作为键 return self. files. get( full_filename) def list_files ( self) - > list [ str ] : """列出系统中的所有文件""" # 返回所有文件的完整文件名列表 return [ file_obj. full_namefor file_objin self. files. values( ) ] def display_file ( self, full_filename: str ) - > str | None : """使用特定于文件的显示方法显示文件内容""" # 检查文件名是否有效 if not self. _is_valid_filename( full_filename) : return None # 获取文件对象 file_obj= self. get_file( full_filename) if not file_obj: return None # 返回文件内容 return file_obj. read( ) async def read_file_structured ( self, full_filename: str , external_file: bool = False ) - > dict [ str , Any] : """读取文件并返回结构化数据,包括图片(如适用)。 返回: 包含以下键的字典: - 'message': str - 要显示的消息 - 'images': list[dict] | None - 如果文件是图片,则包含图片数据:[{"name": str, "data": base64_str}] """ result: dict [ str , Any] = { 'message' : '' , 'images' : None } if external_file: try : try : _, extension= self. _parse_filename( full_filename) except Exception: # 错误消息 result[ 'message' ] = ( f'Error: Invalid filename format { full_filename} . Must be alphanumeric with a supported extension.' ) return result# 支持的文件类型 if extensionin [ 'md' , 'txt' , 'json' , 'jsonl' , 'csv' ] : import anyio# 异步打开文件并读取内容 async with await anyio. open_file( full_filename, 'r' ) as f: content= await f. read( ) # 构建消息 result[ 'message' ] = f'Read from file { full_filename} .\n<content>\n { content} \n</content>' return resultelif extension== 'docx' : from docximport Document# 加载DOCX文档 doc= Document( full_filename) # 提取所有段落文本 content= '\n' . join( [ para. textfor parain doc. paragraphs] ) # 构建消息 result[ 'message' ] = f'Read from file { full_filename} .\n<content>\n { content} \n</content>' return resultelif extension== 'pdf' : import pypdf# 加载PDF文档 reader= pypdf. PdfReader( full_filename) num_pages= len ( reader. pages) MAX_PDF_PAGES= 20 extra_pages= num_pages- MAX_PDF_PAGES extracted_text= '' # 提取前20页的内容 for pagein reader. pages[ : MAX_PDF_PAGES] : extracted_text+= page. extract_text( ) # 添加额外页数信息 extra_pages_text= f' { extra_pages} more pages...' if extra_pages> 0 else '' # 构建消息 result[ 'message' ] = ( f'Read from file { full_filename} .\n<content>\n { extracted_text} \n { extra_pages_text} </content>' ) return resultelif extensionin [ 'jpg' , 'jpeg' , 'png' ] : import anyio# 异步读取图像文件并转换为base64 async with await anyio. open_file( full_filename, 'rb' ) as f: img_data= await f. read( ) base64_str= base64. b64encode( img_data) . decode( 'utf-8' ) # 构建消息 result[ 'message' ] = f'Read image file { full_filename} .' # 包含图像数据 result[ 'images' ] = [ { 'name' : os. path. basename( full_filename) , 'data' : base64_str} ] return resultelse : # 不支持的扩展名 result[ 'message' ] = f'Error: Cannot read file { full_filename} as { extension} extension is not supported.' return resultexcept FileNotFoundError: # 文件未找到 result[ 'message' ] = f"Error: File ' { full_filename} ' not found." return resultexcept PermissionError: # 权限被拒绝 result[ 'message' ] = f"Error: Permission denied to read file ' { full_filename} '." return resultexcept Exceptionas e: # 其他错误 result[ 'message' ] = f"Error: Could not read file ' { full_filename} '. { str ( e) } " return result# 对于内部文件,仅支持非图片类型 # 检查文件名是否有效 if not self. _is_valid_filename( full_filename) : result[ 'message' ] = INVALID_FILENAME_ERROR_MESSAGEreturn result# 获取文件对象 file_obj= self. get_file( full_filename) if not file_obj: result[ 'message' ] = f"File ' { full_filename} ' not found." return resulttry : # 读取文件内容 content= file_obj. read( ) # 构建消息 result[ 'message' ] = f'Read from file { full_filename} .\n<content>\n { content} \n</content>' return resultexcept FileSystemErroras e: # 文件系统错误 result[ 'message' ] = str ( e) return resultexcept Exceptionas e: # 其他错误 result[ 'message' ] = f"Error: Could not read file ' { full_filename} '. { str ( e) } " return resultasync def read_file ( self, full_filename: str , external_file: bool = False ) - > str : """使用特定于文件的读取方法读取文件内容,并返回适当的LLM消息。 注意:对于图像文件,请使用 read_file_structured() 以获取图像数据。 """ # 获取结构化读取结果 result= await self. read_file_structured( full_filename, external_file) # 返回消息 return result[ 'message' ] async def write_file ( self, full_filename: str , content: str ) - > str : """使用特定于文件的写入方法将内容写入文件""" # 检查文件名是否有效 if not self. _is_valid_filename( full_filename) : return INVALID_FILENAME_ERROR_MESSAGEtry : # 解析文件名 name_without_ext, extension= self. _parse_filename( full_filename) # 获取文件类 file_class= self. _get_file_type_class( extension) if not file_class: raise ValueError( f"Error: Invalid file extension ' { extension} ' for file ' { full_filename} '." ) # 创建或获取现有文件对象 # 使用完整文件名作为键 if full_filenamein self. files: file_obj= self. files[ full_filename] else : file_obj= file_class( name= name_without_ext) self. files[ full_filename] = file_obj# 使用特定于文件的写入方法 await file_obj. write( content, self. data_dir) # 成功消息 return f'Data written to file { full_filename} successfully.' except FileSystemErroras e: # 文件系统错误 return str ( e) except Exceptionas e: # 其他错误 return f"Error: Could not write to file ' { full_filename} '. { str ( e) } " async def append_file ( self, full_filename: str , content: str ) - > str : """使用特定于文件的追加方法向文件追加内容""" # 检查文件名是否有效 if not self. _is_valid_filename( full_filename) : return INVALID_FILENAME_ERROR_MESSAGE# 获取文件对象 file_obj= self. get_file( full_filename) if not file_obj: return f"File ' { full_filename} ' not found." try : # 使用特定于文件的追加方法 await file_obj. append( content, self. data_dir) # 成功消息 return f'Data appended to file { full_filename} successfully.' except FileSystemErroras e: # 文件系统错误 return str ( e) except Exceptionas e: # 其他错误 return f"Error: Could not append to file ' { full_filename} '. { str ( e) } " async def replace_file_str ( self, full_filename: str , old_str: str , new_str: str ) - > str : """在文件中将 old_str 替换为 new_str""" # 检查文件名是否有效 if not self. _is_valid_filename( full_filename) : return INVALID_FILENAME_ERROR_MESSAGE# 不能替换空字符串 if not old_str: return 'Error: Cannot replace empty string. Please provide a non-empty string to replace.' # 获取文件对象 file_obj= self. get_file( full_filename) if not file_obj: return f"File ' { full_filename} ' not found." try : # 读取文件内容 content= file_obj. read( ) # 替换字符串 content= content. replace( old_str, new_str) # 写回文件 await file_obj. write( content, self. data_dir) # 成功消息 return f'Successfully replaced all occurrences of " { old_str} " with " { new_str} " in file { full_filename} ' except FileSystemErroras e: # 文件系统错误 return str ( e) except Exceptionas e: # 其他错误 return f"Error: Could not replace string in file ' { full_filename} '. { str ( e) } " async def save_extracted_content ( self, content: str ) - > str : """将提取的内容保存到编号文件中""" # 初始文件名 initial_filename= f'extracted_content_ { self. extracted_content_count} ' # 完整文件名 extracted_filename= f' { initial_filename} .md' # 创建Markdown文件对象 file_obj= MarkdownFile( name= initial_filename) # 写入内容 await file_obj. write( content, self. data_dir) # 存储文件对象 self. files[ extracted_filename] = file_obj# 增加计数器 self. extracted_content_count+= 1 # 返回文件名 return extracted_filenamedef describe ( self) - > str : """使用特定于文件的显示方法列出所有文件及其内容信息""" # 显示字符数限制 DISPLAY_CHARS= 400 description= '' for file_objin self. files. values( ) : # 跳过 todo.md 的描述 if file_obj. full_name== 'todo.md' : continue # 读取文件内容 content= file_obj. read( ) # 处理空文件 if not content: description+= f'<file>\n { file_obj. full_name} - [empty file]\n</file>\n' continue # 按行分割 lines= content. splitlines( ) line_count= len ( lines) # 对于小文件,显示全部内容 whole_file_description= ( f'<file>\n { file_obj. full_name} - { line_count} lines\n<content>\n { content} \n</content>\n</file>\n' ) if len ( content) < int ( 1.5 * DISPLAY_CHARS) : description+= whole_file_descriptioncontinue # 对于大文件,显示开头和结尾预览 half_display_chars= DISPLAY_CHARS// 2 # 获取开头预览 start_preview= '' start_line_count= 0 chars_count= 0 for linein lines: if chars_count+ len ( line) + 1 > half_display_chars: break start_preview+= line+ '\n' chars_count+= len ( line) + 1 start_line_count+= 1 # 获取结尾预览 end_preview= '' end_line_count= 0 chars_count= 0 for linein reversed ( lines) : if chars_count+ len ( line) + 1 > half_display_chars: break end_preview= line+ '\n' + end_preview chars_count+= len ( line) + 1 end_line_count+= 1 # 计算中间的行数 middle_line_count= line_count- start_line_count- end_line_countif middle_line_count<= 0 : description+= whole_file_descriptioncontinue # 清理预览 start_preview= start_preview. strip( '\n' ) . rstrip( ) end_preview= end_preview. strip( '\n' ) . rstrip( ) # 格式化输出 if not ( start_previewor end_preview) : description+= f'<file>\n { file_obj. full_name} - { line_count} lines\n<content>\n { middle_line_count} lines...\n</content>\n</file>\n' else : description+= f'<file>\n { file_obj. full_name} - { line_count} lines\n<content>\n { start_preview} \n' description+= f'... { middle_line_count} more lines ...\n' description+= f' { end_preview} \n' description+= '</content>\n</file>\n' # 去除末尾换行符 return description. strip( '\n' ) def get_todo_contents ( self) - > str : """获取待办文件内容""" # 获取 todo.md 文件 todo_file= self. get_file( 'todo.md' ) # 返回内容,如果文件不存在则返回空字符串 return todo_file. read( ) if todo_fileelse '' def get_state ( self) - > FileSystemState: """获取文件系统的可序列化状态""" # 存储文件数据 files_data= { } for full_filename, file_objin self. files. items( ) : files_data[ full_filename] = { 'type' : file_obj. __class__. __name__, 'data' : file_obj. model_dump( ) } # 构建状态对象 return FileSystemState( files= files_data, base_dir= str ( self. base_dir) , extracted_content_count= self. extracted_content_count) def nuke ( self) - > None : """删除文件系统目录""" # 删除数据目录 shutil. rmtree( self. data_dir) @classmethod def from_state ( cls, state: FileSystemState) - > 'FileSystem' : """从可序列化状态恢复文件系统,位置完全相同""" # 创建文件系统但不创建默认文件 fs= cls( base_dir= Path( state. base_dir) , create_default_files= False ) # 恢复提取内容计数 fs. extracted_content_count= state. extracted_content_count# 恢复所有文件 for full_filename, file_datain state. files. items( ) : file_type= file_data[ 'type' ] file_info= file_data[ 'data' ] # 根据类型创建适当的文件对象 if file_type== 'MarkdownFile' : file_obj= MarkdownFile( ** file_info) elif file_type== 'TxtFile' : file_obj= TxtFile( ** file_info) elif file_type== 'JsonFile' : file_obj= JsonFile( ** file_info) elif file_type== 'JsonlFile' : file_obj= JsonlFile( ** file_info) elif file_type== 'CsvFile' : file_obj= CsvFile( ** file_info) elif file_type== 'PdfFile' : file_obj= PdfFile( ** file_info) elif file_type== 'DocxFile' : file_obj= DocxFile( ** file_info) else : # 跳过未知文件类型 continue # 添加到文件字典并同步到磁盘 fs. files[ full_filename] = file_obj file_obj. sync_to_disk_sync( fs. data_dir) return fs2-自动测试 #!/usr/bin/env python3 """ FileSystem 文件系统读取测试用例 测试读取本地文件并打印具体内容,覆盖所有支持的文件类型 """ # 添加项目根目录到 Python 路径以支持导入 import sysfrom pathlibimport Path sys. path. insert( 0 , str ( Path( __file__) . parent. parent. parent) ) import asyncioimport jsonimport loggingfrom reportlab. lib. pagesizesimport letterfrom reportlab. platypusimport SimpleDocTemplate, Paragraphfrom browser_use_manual. filesystem. file_systemimport ( FileSystem) # 配置日志 logging. basicConfig( level= logging. INFO, format = '%(levelname)s - %(name)s - %(message)s' ) logger= logging. getLogger( __name__) class TestFileSystemReadAllTypes : """测试文件系统读取所有文件类型""" def __init__ ( self) : """每个测试方法执行前的准备工作""" # 创建临时目录用于测试 self. file_dir= Path( "/Users/rong/Documents/EnzoApplication/WorkSpace/Python/20251209_1_Python_playwright_manual/browser-use-manual-file" ) logger. info( f"测试文件目录地址: { self. file_dir} " ) # 创建文件系统实例 self. fs= FileSystem( self. file_dir, create_default_files= False ) def create_test_file ( self, filename: str , content: str ) : """创建测试文件""" # 创建完整的文件系统路径 fs_data_dir= self. file_dir fs_data_dir. mkdir( parents= True , exist_ok= True ) full_path= fs_data_dir/ filename full_path. write_text( content, encoding= 'utf-8' ) logger. info( f"创建测试文件: { full_path} " ) return str ( full_path) def test_read_markdown_file ( self) : """测试读取 Markdown 文件""" filename= "textContent.md" """测试外部文件读取""" file_full_path= self. file_dir/ filenametry : result= asyncio. run( self. fs. read_file( str ( file_full_path) , external_file= True ) ) logger. info( f"读取 MARKDOWN 文件结果-第1次: { result} " ) except Exceptionas e: logger. info( f"读取 MARKDOWN 文件失败-第1次: { str ( e) } " ) """测试外部文件写入""" content= "# 测试标题\n\n这是一个测试的 Markdown 文件。\n\n- 列表项1\n- 列表项2\n" file_path= self. create_test_file( filename, content) result= asyncio. run( self. fs. read_file( str ( file_path) , external_file= True ) ) logger. info( f"读取 MARKDOWN 文件结果-第2次: { result} " ) def test_read_text_file ( self) : """测试读取 TEXT 文件""" filename= "textContent.txt" """测试外部文件读取""" file_full_path= self. file_dir/ filenametry : result= asyncio. run( self. fs. read_file( str ( file_full_path) , external_file= True ) ) logger. info( f"读取 TEXT 文件结果-第1次: { result} " ) except Exceptionas e: logger. info( f"读取 TEXT 文件失败-第1次: { str ( e) } " ) """测试外部文件写入""" content= "这是测试文本文件的内容。\n第二行内容。\n第三行内容。" file_path= self. create_test_file( filename, content) result= asyncio. run( self. fs. read_file( str ( file_path) , external_file= True ) ) logger. info( f"读取 TEXT 文件结果-第2次: { result} " ) def test_read_json_file ( self) : """测试读取 JSON 文件""" filename= "textContent.json" """测试外部文件读取""" file_full_path= self. file_dir/ filenametry : result= asyncio. run( self. fs. read_file( str ( file_full_path) , external_file= True ) ) logger. info( f"读取 JSON 文件结果-第1次: { result} " ) except Exceptionas e: logger. info( f"读取 JSON 文件失败-第1次: { str ( e) } " ) """测试外部文件写入""" content= json. dumps( { "name" : "测试用户" , "age" : 30 , "skills" : [ "Python" , "JavaScript" , "Java" ] } , ensure_ascii= False , indent= 2 ) file_path= self. create_test_file( filename, content) result= asyncio. run( self. fs. read_file( str ( file_path) , external_file= True ) ) logger. info( f"读取 JSON 文件结果-第2次: { result} " ) def test_read_csv_file ( self) : """测试读取 CSV 文件""" filename= "textContent.csv" """测试外部文件读取""" file_full_path= self. file_dir/ filenametry : result= asyncio. run( self. fs. read_file( str ( file_full_path) , external_file= True ) ) logger. info( f"读取 CSV 文件结果-第1次: { result} " ) except Exceptionas e: logger. info( f"读取 CSV 文件失败-第1次: { str ( e) } " ) """测试外部文件写入""" content= "姓名,年龄,城市\n张三,25,北京\n李四,30,上海\n王五,28,广州" file_path= self. create_test_file( filename, content) result= asyncio. run( self. fs. read_file( str ( file_path) , external_file= True ) ) logger. info( f"读取 CSV 文件结果-第2次: { result} " ) def test_read_jsonl_file ( self) : """测试读取 JSONL 文件""" filename= "textContent.jsonl" """测试外部文件读取""" file_full_path= self. file_dir/ filenametry : result= asyncio. run( self. fs. read_file( str ( file_full_path) , external_file= True ) ) logger. info( f"读取 JSONL 文件结果-第1次: { result} " ) except Exceptionas e: logger. info( f"读取 JSONL 文件失败-第1次: { str ( e) } " ) """测试外部文件写入""" lines= [ json. dumps( { "id" : 1 , "name" : "项目1" } , ensure_ascii= False ) , json. dumps( { "id" : 2 , "name" : "项目2" } , ensure_ascii= False ) , json. dumps( { "id" : 3 , "name" : "项目3" } , ensure_ascii= False ) ] content= "\n" . join( lines) file_path= self. create_test_file( filename, content) result= asyncio. run( self. fs. read_file( str ( file_path) , external_file= True ) ) logger. info( f"读取 JSONL 文件结果-第2次: { result} " ) def test_read_pdf_file ( self) : """测试读取 PDF 文件""" filename= "textContent.pdf" """测试外部文件读取""" file_full_path= self. file_dir/ filenametry : result= asyncio. run( self. fs. read_file( str ( file_full_path) , external_file= True ) ) logger. info( f"读取 PDF 文件结果-第1次: { result} " ) except Exceptionas e: logger. info( f"读取 PDF 文件失败-第1次: { str ( e) } " ) """测试外部文件写入""" # 使用 reportlab 创建 PDF content= "这是测试 PDF 文件的内容。" doc= SimpleDocTemplate( str ( self. file_dir/ filename) , pagesize= letter) story= [ Paragraph( content) ] doc. build( story) file_path= self. create_test_file( filename, content) result= asyncio. run( self. fs. read_file( str ( file_path) , external_file= True ) ) logger. info( f"读取 PDF 文件结果-第2次: { result} " ) def test_read_docx_file ( self) : """测试读取 DOCX 文件""" try : from docximport Document has_docx= True except ImportError: has_docx= False if not has_docx: logger. info( "未安装 python-docx,跳过 DOCX 文件测试" ) return filename= "textContent.docx" """测试外部文件读取""" file_full_path= self. file_dir/ filenametry : result= asyncio. run( self. fs. read_file( str ( file_full_path) , external_file= True ) ) logger. info( f"读取 DOCX 文件结果-第1次: { result} " ) except Exceptionas e: logger. info( f"读取 DOCX 文件失败-第1次: { str ( e) } " ) """测试外部文件写入""" # 创建 DOCX 文件 content= "这是测试 DOCX 文件的内容。\n第二段内容。" doc= Document( ) doc. add_heading( '测试标题' , level= 1 ) doc. add_paragraph( '这是测试 DOCX 文件的内容。' ) doc. add_paragraph( '第二段内容。' ) doc. save( str ( self. file_dir/ filename) ) file_path= self. create_test_file( filename, content) result= asyncio. run( self. fs. read_file( str ( file_path) , external_file= True ) ) logger. info( f"读取 DOCX 文件结果-第2次: { result} " ) def test_read_image_file ( self) : """测试读取图像文件""" # 创建一个简单的测试图像文件 filename= "test_image.png" """测试外部文件读取""" file_full_path= self. file_dir/ filenametry : result= asyncio. run( self. fs. read_file( str ( file_full_path) , external_file= True ) ) logger. info( f"读取 IMAGE 文件结果-第1次: { result} " ) except Exceptionas e: logger. info( f"读取 IMAGE 文件失败-第1次: { str ( e) } " ) """测试外部文件写入""" # 创建 PNG 文件 content= "fake png content for testing" file_path= self. create_test_file( filename, content) result= asyncio. run( self. fs. read_file( str ( file_path) , external_file= True ) ) logger. info( f"读取 IMAGE 文件结果-第2次: { result} " ) def test_read_all_supported_types ( self) : """测试读取所有支持的文件类型""" logger. info( "\n=== 测试读取所有支持的文件类型 ===" ) # 测试的文件类型和内容 test_files= [ ( "test1.md" , "# Markdown 测试\n\n这是 Markdown 内容。" ) , ( "test2.txt" , "纯文本测试内容。\n第二行。" ) , ( "test3.json" , '{"key": "value", "number": 42}' ) , ( "test4.csv" , "列1,列2,列3\n值1,值2,值3\n值4,值5,值6" ) , ( "test5.jsonl" , '{"line": 1}\n{"line": 2}\n{"line": 3}' ) , ] # 创建所有测试文件 created_files= [ ] for filename, contentin test_files: file_path= self. create_test_file( filename, content) created_files. append( ( filename, content) ) logger. info( f"已创建测试文件: { file_path} " ) # 逐个读取并验证文件 for filename, expected_contentin created_files: logger. info( f"\n--- 读取文件: { filename} ---" ) file_full_path= self. file_dir/ filenametry : result= asyncio. run( self. fs. read_file( str ( file_full_path) , external_file= True ) ) logger. info( f"读取结果: { result} " ) except Exceptionas e: logger. info( f"读取文件 { filename} 失败: { str ( e) } " ) logger. info( "\n=== 所有文件类型测试完成 ===" ) def test_invalid_file_types ( self) : """测试不支持的文件类型""" filename= "textContent.xyz" # 不支持的扩展名 content= "无效文件类型测试" file_path= self. create_test_file( filename, content) logger. info( f"无效文件类型测试路径: { file_path} " ) # 测试外部文件读取 file_full_path= self. file_dir/ filename result= asyncio. run( self. fs. read_file( str ( file_full_path) , external_file= True ) ) logger. info( f"读取不支持的文件类型结果: { result} " ) # 验证返回错误信息 assert "extension is not supported" in resultif __name__== "__main__" : # 运行所有测试 test_instance= TestFileSystemReadAllTypes( ) # 运行各个测试方法 test_instance. test_read_markdown_file( ) test_instance. test_read_text_file( ) test_instance. test_read_json_file( ) test_instance. test_read_csv_file( ) test_instance. test_read_jsonl_file( ) test_instance. test_read_pdf_file( ) test_instance. test_read_docx_file( ) test_instance. test_read_image_file( ) test_instance. test_read_all_supported_types( ) test_instance. test_invalid_file_types( ) 3-代码使用 #!/usr/bin/env python3 """ FileSystem 文件系统读取测试用例 测试读取本地文件并打印具体内容,覆盖所有支持的文件类型 """ # 添加项目根目录到 Python 路径以支持导入 import sysfrom pathlibimport Path sys. path. insert( 0 , str ( Path( __file__) . parent. parent. parent) ) import loggingfrom browser_use_manual. filesystem. file_systemimport ( MarkdownFile) # 配置日志 logging. basicConfig( level= logging. INFO, format = '%(levelname)s - %(name)s - %(message)s' ) logger= logging. getLogger( __name__) from datetimeimport dateprint ( ) if __name__== "__main__" : # 指定文件路径 file_dir= Path( "/Users/rong/Documents/EnzoApplication/WorkSpace/Python/20251209_1_Python_playwright_manual/browser-use-manual-file/" ) file_name= "popyu_" + str ( date. today( ) ) logger. info( f"测试文件的创建地址: { file_dir} / { file_name} " ) # 创建一个指定的文件类型 markdown_file= MarkdownFile( name= file_name) markdown_file. append_file_content( "This is a test markdown file." ) markdown_file. sync_to_disk_sync( file_dir)