news 2026/6/15 18:30:28

ai自己制作mod2 ocr vlm识别 模型页面点击打开模型页面

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
ai自己制作mod2 ocr vlm识别 模型页面点击打开模型页面

ai自己制作mod2 ocr vlm识别 模型页面点击打开模型页面_哔哩哔哩_bilibili

ocr识别不了点赞图标,不然点赞收藏一气呵成就能下载模型了

# e:\code\my_python_server\llm_server\memory_llm.py
"""Tkinter front end for a screenshot-driven VLM task loop.

Each iteration grabs a screenshot, sends it with the task description to a
VLM, runs any ``[TOOL:name,arg1,...]`` directives found in the reply as
external Python scripts, and repeats until the reply signals completion.
"""
import tkinter as tk
from tkinter import scrolledtext, messagebox, ttk
import os
import subprocess
import re
import json
import sys
import threading
from pathlib import Path
from llm_class import VLMService  # NOTE(review): presumably mirrors the LLMService interface - confirm
import pyautogui
from PIL import Image
import base64
from io import BytesIO

# File name of the tool registry that sits next to this module.
TOOLS_CONFIG_NAME = "tools_config.json"

# Matches "[TOOL:name]" and "[TOOL:name,arg1,arg2]"; group 1 is the tool name,
# group 2 the raw (possibly empty) comma-separated argument string.
TOOL_CALL_PATTERN = re.compile(r'\[TOOL:([^\],]+)(?:,([^\]]*))?\]')

# Substrings that mark a finished task in "AI reply + tool output".
COMPLETION_INDICATORS = (
    "任务完成", "完成任务", "任务已结束", "已完成",
    "task completed", "finished", "done", "success", "成功",
)

# Narrower list that is applied to the AI reply alone.
RESPONSE_COMPLETION_INDICATORS = (
    "任务完成", "完成任务", "已完成", "task completed", "finished", "done",
)


def execute_python_script(script_path, *args):
    """Run a Python script located under the project root.

    Args:
        script_path: Path of the script relative to the project root (the
            parent of this module's directory).
        *args: Command-line arguments passed to the script.

    Returns:
        The script's stripped stdout on success, otherwise an error message
        string. This function reports failures as text and does not raise.
    """
    project_root = Path(__file__).parent.parent
    script_full_path = project_root / script_path

    if not script_full_path.exists():
        return f"错误: 脚本 '{script_path}' 不存在"
    if script_full_path.suffix != '.py':
        return "错误: 文件必须是Python脚本 (.py文件)"

    try:
        # str() guards against non-string arguments, which subprocess rejects.
        cmd = [sys.executable, str(script_full_path)] + [str(arg) for arg in args]
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=30,
            cwd=str(project_root),
            encoding='utf-8',   # decode child output as UTF-8 regardless of locale
            errors='replace',   # never fail on undecodable bytes
        )
        print("", result.stdout.strip())
        if result.returncode == 0:
            return result.stdout.strip()
        return f"脚本执行失败: {result.stderr.strip()}"
    except subprocess.TimeoutExpired:
        return f"脚本执行超时: {script_path}"
    except Exception as e:
        return f"执行脚本时出错: {str(e)}"


def _tools_config_path():
    """Return the path of the tool registry next to this module."""
    return Path(__file__).parent / TOOLS_CONFIG_NAME


def _read_tools(config_path):
    """Return the 'tools' entry of the registry, or [] when it is unreadable."""
    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            config = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers both malformed JSON and undecodable bytes.
        print(f"读取工具配置失败: {e}")
        return []
    if not isinstance(config, dict):
        return []
    return config.get('tools', [])


def list_available_tools():
    """Return the tools from the registry, or [] when it is missing or unreadable."""
    config_path = _tools_config_path()
    if not config_path.exists():
        return []
    return _read_tools(config_path)


def get_tool_by_name(tool_name):
    """Return the registry entry whose 'name' equals *tool_name*, or None."""
    for tool in list_available_tools():
        if tool.get('name') == tool_name:
            return tool
    return None


def execute_tool(tool_name, *args):
    """Run the script registered for *tool_name*.

    The tool name is passed to the script as its first argument, followed by
    *args*.

    Returns:
        The script output, or an error message string when the tool is
        unknown or has no script path configured.
    """
    tool_info = get_tool_by_name(tool_name)
    if not tool_info:
        return f"错误: 未找到工具 '{tool_name}'"

    script_path = tool_info.get('path')
    if not script_path:
        return f"错误: 工具 '{tool_name}' 未配置脚本路径"
    return execute_python_script(script_path, tool_name, *args)


def get_available_tools_info():
    """Return the tools from the registry.

    Returns:
        A list of tool entries, [] when the registry cannot be parsed, or an
        error message string when the registry file does not exist. Callers
        must check for the string case.
    """
    config_path = _tools_config_path()
    if not config_path.exists():
        return "错误: 工具配置文件不存在"
    return _read_tools(config_path)


def get_tools_description():
    """Build the tool listing that is placed in the VLM system prompt."""
    tools = get_available_tools_info()
    if not tools or isinstance(tools, str):  # a string means an error message
        return "当前没有可用工具"

    parts = ["可用工具列表:\n"]
    for tool in tools:
        name = tool.get('name', '未知工具')
        desc = tool.get('description', '无描述')
        params = tool.get('parameters', [])
        if params:
            param_desc = ", ".join(
                f"{p.get('name', '未知参数')}({p.get('type', '未知类型')})" for p in params
            )
            parts.append(f"- {name}: {desc} (参数: {param_desc})\n")
        else:
            parts.append(f"- {name}: {desc} (无参数)\n")
    parts.append("\n使用格式: [TOOL:工具名称,参数1,参数2,...]\n")
    return "".join(parts)


def image_to_base64(image_path):
    """Return the contents of *image_path* as a base64 string."""
    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode('utf-8')


def is_task_completed(ai_response, tool_result):
    """Return True when the AI reply or the tool output contains a completion marker."""
    combined_text = f"{ai_response} {tool_result}".lower()
    return any(indicator in combined_text for indicator in COMPLETION_INDICATORS)


def vision_task_loop(task_description, knowledge_file=None, memory_file=None,
                     reset_first_iteration=True, max_iterations=50):
    """Drive a task by repeatedly showing the screen to the VLM.

    This is a generator; progress and status messages are yielded as strings.

    Args:
        task_description: Natural-language description of the task.
        knowledge_file: Text file whose content is added to the system prompt.
            Defaults to ``knowledge.txt`` next to this module.
        memory_file: File that tool results are appended to. Defaults to
            ``memory.txt`` next to this module.
        reset_first_iteration: When True the first round uses the "start the
            task" prompt and skips the combined completion check.
        max_iterations: Upper bound on the number of rounds.

    Yields:
        "AI分析: ..." for every VLM reply, followed by a completion, error or
        iteration-limit message when the loop ends.
    """
    current_dir = os.path.dirname(os.path.abspath(__file__))
    if knowledge_file is None:
        knowledge_file = os.path.join(current_dir, "knowledge.txt")
    if memory_file is None:
        memory_file = os.path.join(current_dir, "memory.txt")

    vlm_service = VLMService()

    # System prompt = tool listing + optional fixed knowledge.
    system_prompt_parts = [f"可用工具信息:\n{get_tools_description()}"]
    if os.path.exists(knowledge_file):
        with open(knowledge_file, 'r', encoding='utf-8') as f:
            knowledge_content = f.read()
        if knowledge_content.strip():
            system_prompt_parts.append(f"重要知识:\n{knowledge_content}")
    system_prompt = "\n".join(system_prompt_parts)

    first_iteration = reset_first_iteration
    # The previous round is fed back so the model can avoid repeating itself.
    previous_ai_response = ""
    previous_tool_result = ""
    screenshot_path = os.path.join(current_dir, "current_screen.png")

    for _ in range(max_iterations):
        pyautogui.screenshot().save(screenshot_path)

        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})

        if first_iteration:
            user_message = f"当前任务: {task_description}\n请分析当前屏幕截图,并开始执行任务。"
        else:
            user_message = (
                f"当前任务: {task_description}\n"
                f"上一轮AI分析: {previous_ai_response}\n"
                f"上一轮工具执行结果: {previous_tool_result}\n"
                f"请分析当前屏幕截图,判断任务完成情况,避免重复执行相同操作,并按需执行相应操作。"
                f"如果任务已经完成,请明确说明任务已完成。"
            )

        # The screenshot is saved as PNG, so the data URL must declare PNG.
        image_content = {
            "type": "image_url",
            "image_url": {
                "url": f"data:image/png;base64,{image_to_base64(screenshot_path)}"
            },
        }
        text_content = {"type": "text", "text": user_message}
        messages.append({"role": "user", "content": [text_content, image_content]})

        print(f"用户消息: {user_message}, 截图保存在: {screenshot_path}")

        try:
            # The image is already embedded in the message, so no path is passed.
            result = vlm_service.create_with_image(messages)
            # content may be null in the API reply; normalise it to "".
            ai_response = result['choices'][0]['message']['content'] or ""
            print(f"VLM响应: {ai_response}")

            tool_execution_result = process_tool_calls(ai_response, memory_file)

            previous_ai_response = ai_response
            previous_tool_result = tool_execution_result or ""

            yield f"AI分析: {ai_response}"

            # The combined (reply + tool output) check is skipped in the first round.
            if not first_iteration:
                if is_task_completed(ai_response, tool_execution_result or ""):
                    yield "任务已完成,退出循环"
                    break

            first_iteration = False

            # Reply-only check; this one also applies to the first round.
            lowered = ai_response.lower()
            if any(indicator in lowered for indicator in RESPONSE_COMPLETION_INDICATORS):
                yield "任务已完成,退出循环"
                break
        except Exception as e:
            yield f"执行任务时出错: {str(e)}"
            break
    else:
        # Reached only when the loop ran out of iterations without a break.
        yield "达到最大迭代次数,停止任务执行"


def _split_tool_args(args_text):
    """Split a raw argument string on commas, keeping quoted commas intact.

    Quote characters are removed from the result. A trailing empty argument
    is dropped; empty arguments in the middle are kept as "".
    """
    parsed = []
    current = ""
    quote_char = None  # the quote currently open, or None outside quotes
    for char in args_text:
        if quote_char is None and char in ('"', "'"):
            quote_char = char
        elif quote_char is not None and char == quote_char:
            quote_char = None
        elif char == ',' and quote_char is None:
            parsed.append(current.strip())
            current = ""
        else:
            current += char
    if current:
        parsed.append(current.strip())
    return parsed


def process_tool_calls(response_text, memory_file_path=None):
    """Execute every ``[TOOL:name,arg1,arg2,...]`` directive in *response_text*.

    Args:
        response_text: The AI reply to scan.
        memory_file_path: When given, each tool result is appended to this file.

    Returns:
        The per-tool result lines joined by newlines, or None when the reply
        contains no directive.
    """
    matches = TOOL_CALL_PATTERN.findall(response_text or "")
    if not matches:
        print("未找到工具调用指令")
        return None

    # Read the registry once per reply instead of once per directive.
    tools = get_available_tools_info()
    tools_available = bool(tools) and not isinstance(tools, str)
    known_names = {tool.get('name') for tool in tools} if tools_available else set()

    all_results = []
    for raw_name, args_text in matches:
        tool_name = raw_name.strip()  # tolerate "[TOOL: name, ...]"

        if not tools_available:
            all_results.append(f"工具 '{tool_name}' 执行失败: 无法获取工具列表")
            continue
        if tool_name not in known_names:
            all_results.append(f"工具 '{tool_name}' 执行失败: 工具不存在")
            continue

        tool_args = _split_tool_args(args_text) if args_text else []

        print(f"正在执行工具:{tool_name}")
        print(f"参数列表:{tool_args}")
        result = execute_tool(tool_name, *tool_args)
        print("结果:", result)

        if result is None:
            result = "工具执行结果为空"

        # Persist the result so it shows up in the memory file / history view.
        if memory_file_path:
            try:
                with open(memory_file_path, 'a', encoding='utf-8') as f:
                    f.write(f"工具 '{tool_name}' 执行结果:\n{result}\n\n")
            except Exception as e:
                print(f"写入记忆文件失败: {e}")

        all_results.append(f"工具 '{tool_name}' 执行结果: {result}")

    return "\n".join(all_results) if all_results else None


def parse_history_content(content):
    """Convert a saved transcript into a chat ``messages`` list.

    Lines starting with '用户:' open a user message and lines starting with
    'AI:' open an assistant message; following lines are appended to the open
    message, and a blank line closes it.
    """
    messages = []
    current_role = None
    current_content = []

    def flush():
        # Emit the message collected so far, if there is one.
        if current_role and current_content:
            messages.append({
                "role": current_role,
                "content": '\n'.join(current_content).strip(),
            })

    for line in content.strip().split('\n'):
        line = line.strip()
        if line.startswith('用户:'):
            flush()
            current_role = "user"
            current_content = [line[3:].strip()]
        elif line.startswith('AI:'):
            flush()
            current_role = "assistant"
            current_content = [line[3:].strip()]
        elif line == "" and current_content:
            flush()
            current_role = None
            current_content = []
        elif current_role:
            current_content.append(line)

    flush()
    return messages


class VLMTaskApp:
    """Small always-on-top window that starts, stops and logs a VLM task."""

    def __init__(self, root):
        """Build the UI on *root* and show the existing memory file, if any."""
        self.root = root
        self.root.title("VLM任务执行器")
        self.root.geometry("400x600")
        self.root.attributes('-topmost', True)  # keep the window above other windows

        # True while a task thread is running; cleared to request a stop.
        self.is_executing = False

        self.setup_ui()

        current_dir = os.path.dirname(os.path.abspath(__file__))
        self.knowledge_file = os.path.join(current_dir, "knowledge.txt")
        self.memory_file = os.path.join(current_dir, "memory.txt")

        self.load_memory_content()

    def setup_ui(self):
        """Create the task input, control buttons, history view and status bar."""
        task_frame = tk.Frame(self.root)
        task_frame.pack(fill=tk.X, padx=10, pady=5)
        tk.Label(task_frame, text="任务描述:").pack(anchor=tk.W)
        self.task_input = tk.Text(task_frame, height=3)
        self.task_input.pack(fill=tk.X, pady=5)

        control_frame = tk.Frame(self.root)
        control_frame.pack(fill=tk.X, padx=10, pady=5)

        self.start_button = tk.Button(
            control_frame, text="开始执行任务", command=self.start_task
        )
        self.start_button.pack(side=tk.LEFT, padx=(0, 10))

        self.stop_button = tk.Button(
            control_frame, text="停止任务", command=self.stop_task, state=tk.DISABLED
        )
        self.stop_button.pack(side=tk.LEFT, padx=(0, 10))

        self.clear_memory_button = tk.Button(
            control_frame, text="清除短期记忆", command=self.clear_short_term_memory
        )
        self.clear_memory_button.pack(side=tk.LEFT)

        self.chat_history = scrolledtext.ScrolledText(
            self.root, wrap=tk.WORD, state='disabled', height=30
        )
        self.chat_history.pack(padx=10, pady=5, fill=tk.BOTH, expand=True)

        self.status_label = tk.Label(
            self.root, text="状态: 等待任务开始", bd=1, relief=tk.SUNKEN, anchor=tk.W
        )
        self.status_label.pack(side=tk.BOTTOM, fill=tk.X)

    def load_memory_content(self):
        """Show the memory file in the history view, or clear the view if it is absent."""
        if os.path.exists(self.memory_file):
            try:
                with open(self.memory_file, 'r', encoding='utf-8') as f:
                    content = f.read()
                if content.strip():
                    # The widget is read-only, so enable it just for the update.
                    self.chat_history.config(state='normal')
                    self.chat_history.delete("1.0", tk.END)
                    self.chat_history.insert(tk.END, content)
                    self.chat_history.config(state='disabled')
                    self.chat_history.see(tk.END)
            except Exception as e:
                print(f"加载记忆文件失败: {str(e)}")
        else:
            self.chat_history.config(state='normal')
            self.chat_history.delete("1.0", tk.END)
            self.chat_history.config(state='disabled')

    def start_task(self):
        """Validate the input and start the task on a background thread."""
        task_description = self.task_input.get("1.0", tk.END).strip()
        if not task_description:
            messagebox.showwarning("警告", "请输入任务描述")
            return

        self.is_executing = True
        self.start_button.config(state=tk.DISABLED)
        self.stop_button.config(state=tk.NORMAL)
        self.update_status("状态: 任务执行中...")

        # Run off the UI thread so the window stays responsive.
        task_thread = threading.Thread(target=self.run_task, args=(task_description,))
        task_thread.daemon = True
        task_thread.start()

    def stop_task(self):
        """Request a stop; the worker checks the flag after its next output."""
        self.is_executing = False
        self.start_button.config(state=tk.NORMAL)
        self.stop_button.config(state=tk.DISABLED)
        self.update_status("状态: 任务已停止")

    def run_task(self, task_description):
        """Worker-thread body: run the task loop and log every output."""
        try:
            self.display_message(f"用户: {task_description}")

            # Append so earlier history is kept.
            with open(self.memory_file, 'a', encoding='utf-8') as f:
                f.write(f"用户: {task_description}\n\n")

            for output in vision_task_loop(task_description, self.knowledge_file,
                                           self.memory_file, reset_first_iteration=True):
                if not self.is_executing:
                    self.display_message("系统: 任务已手动停止")
                    break

                self.display_message(f"AI: {output}")
                with open(self.memory_file, 'a', encoding='utf-8') as f:
                    f.write(f"AI: {output}\n\n")
        except Exception as e:
            self.display_message(f"系统: 执行任务时出错: {str(e)}")
        finally:
            # The flag is only cleared early by stop_task, so a cleared flag
            # here means the user stopped the task; keep that status visible.
            stopped_by_user = not self.is_executing
            self.is_executing = False
            final_status = "状态: 任务已停止" if stopped_by_user else "状态: 任务执行完成"
            self.root.after(0, self._finish_task, final_status)

    def _finish_task(self, status_text):
        """Reset the buttons and status bar; scheduled onto the Tk event loop."""
        self.start_button.config(state=tk.NORMAL)
        self.stop_button.config(state=tk.DISABLED)
        self.update_status(status_text)

    def display_message(self, message):
        """Schedule *message* to be appended to the history view."""
        self.root.after(0, self._display_message, message)

    def _display_message(self, message):
        """Append *message* to the history view and scroll to the end."""
        self.chat_history.config(state='normal')
        self.chat_history.insert(tk.END, f"{message}\n\n")
        self.chat_history.config(state='disabled')
        self.chat_history.see(tk.END)

    def update_status(self, status_text):
        """Set the status bar text."""
        self.status_label.config(text=status_text)

    def clear_short_term_memory(self):
        """Delete the memory file and clear the history view."""
        if os.path.exists(self.memory_file):
            try:
                os.remove(self.memory_file)
                self.chat_history.config(state='normal')
                self.chat_history.delete("1.0", tk.END)
                self.chat_history.config(state='disabled')
                self.display_message("短期记忆已手动清除")
            except Exception as e:
                messagebox.showerror("错误", f"清除短期记忆失败: {str(e)}")
        else:
            self.display_message("短期记忆文件不存在")


def main():
    """Create the Tk root window and run the application."""
    root = tk.Tk()
    app = VLMTaskApp(root)
    root.mainloop()


if __name__ == "__main__":
    main()
"""OCR command-line tools: pick-a-file OCR, text location lookup, full-text dump."""
import tkinter as tk
from tkinter import filedialog
import pytesseract
from PIL import ImageGrab
import cv2
import numpy as np
from typing import List, Tuple, Dict
import json
import io
from PIL import Image
import pyautogui
import easyocr
import sys

# Languages loaded into the OCR reader; extend as needed.
OCR_LANGUAGES = ['ch_sim', 'en']


def _to_json(payload):
    """Serialise *payload* as indented JSON, keeping non-ASCII text readable."""
    return json.dumps(payload, ensure_ascii=False, indent=2)


def _load_image(image_path=None):
    """Return an RGB image from *image_path*, or a full-screen screenshot if falsy."""
    if not image_path:
        return pyautogui.screenshot().convert("RGB")
    # convert() returns a new, fully loaded image, so the file handle can be
    # closed right away. It also normalises palette/CMYK/alpha images before
    # they are turned into an array.
    with Image.open(image_path) as img:
        return img.convert("RGB")


def _run_ocr(image):
    """Run OCR on *image* and return the (bbox, text, confidence) tuples."""
    reader = easyocr.Reader(OCR_LANGUAGES)
    return reader.readtext(np.array(image))


def _int_bbox(bbox):
    """Convert bounding-box points from NumPy scalars to plain ints."""
    return [[int(point[0]), int(point[1])] for point in bbox]


def ocr_screen_tool():
    """Ask the user for an image file and OCR it.

    Returns:
        A JSON string: a list of {"text", "confidence", "bbox"} objects on
        success, or {"error": ...} when the dialog is cancelled or OCR fails.
    """
    try:
        root = tk.Tk()
        root.withdraw()  # only the file dialog should be visible
        try:
            file_path = filedialog.askopenfilename(
                title="选择要识别的图片",
                filetypes=[
                    ("图片文件", "*.png *.jpg *.jpeg *.bmp *.tiff *.tif"),
                    ("PNG文件", "*.png"),
                    ("JPG文件", "*.jpg"),
                    ("JPEG文件", "*.jpeg"),
                    ("BMP文件", "*.bmp"),
                    ("TIFF文件", "*.tiff"),
                    ("所有文件", "*.*"),
                ],
            )
        finally:
            # Destroy the hidden root even if the dialog raises.
            root.destroy()

        if not file_path:
            return _to_json({"error": "用户取消了图片选择"})

        formatted_results = [
            {
                "text": text,
                "confidence": float(confidence),
                "bbox": _int_bbox(bbox),
            }
            for (bbox, text, confidence) in _run_ocr(_load_image(file_path))
        ]

        # Debug output goes to stderr: main() prints the returned JSON on
        # stdout, and a second copy there would corrupt it for the caller.
        print("ocr:", formatted_results, file=sys.stderr)
        return _to_json(formatted_results)
    except Exception as e:
        return _to_json({"error": f"OCR识别失败: {str(e)}"})


def find_text_coordinates(text_to_find: str, image_path: str = None):
    """Locate *text_to_find* in an image or on the screen.

    Matching is a case-insensitive substring test, and the first matching OCR
    result is used.

    Args:
        text_to_find: Text to search for.
        image_path: Image to search; a full-screen screenshot is taken when omitted.

    Returns:
        A JSON string with the matched text, its centre point, bounding box
        and confidence; or {"error", "found_texts"} when nothing matches; or
        {"error": ...} on failure.
    """
    try:
        results = _run_ocr(_load_image(image_path))
        needle = text_to_find.lower()

        for (bbox, text, confidence) in results:
            if needle in text.lower():
                x_coords = [point[0] for point in bbox]
                y_coords = [point[1] for point in bbox]
                return _to_json({
                    "text": text,
                    "target_text": text_to_find,
                    "center_x": int((min(x_coords) + max(x_coords)) / 2),
                    "center_y": int((min(y_coords) + max(y_coords)) / 2),
                    "bbox": _int_bbox(bbox),
                    "confidence": float(confidence),
                })

        # Nothing matched: report everything that was recognised for reference.
        return _to_json({
            "error": f"未找到文字: {text_to_find}",
            "found_texts": [
                {"text": text, "confidence": float(confidence)}
                for (_bbox, text, confidence) in results
            ],
        })
    except Exception as e:
        return _to_json({"error": f"查找文字坐标失败: {str(e)}"})


def get_all_text(image_path: str = None):
    """Return all recognised text, one OCR result per line.

    Results are kept in the order the OCR engine returns them.

    Args:
        image_path: Image to read; a full-screen screenshot is taken when omitted.

    Returns:
        The joined text, or an error message string on failure.
    """
    try:
        results = _run_ocr(_load_image(image_path))
        return '\n'.join(text for (_bbox, text, _confidence) in results)
    except Exception as e:
        return f"获取所有文本失败: {str(e)}"


def main():
    """Dispatch on ``sys.argv[1]`` (the tool name) and print the result.

    With no arguments the file-dialog OCR tool is run directly.
    """
    if len(sys.argv) < 2:
        print(ocr_screen_tool())
        return

    tool_name = sys.argv[1]
    try:
        if tool_name == "ocr_screen":
            result = ocr_screen_tool()
        elif tool_name == "find_text_coordinates":
            if len(sys.argv) < 3:
                result = "错误: find_text_coordinates 需要指定要查找的文字"
            else:
                text_to_find = sys.argv[2]
                image_path = sys.argv[3] if len(sys.argv) > 3 else None  # optional
                result = find_text_coordinates(text_to_find, image_path)
        elif tool_name == "get_all_text":
            image_path = sys.argv[2] if len(sys.argv) > 2 else None  # optional
            result = get_all_text(image_path)
        else:
            result = f"错误: 未找到工具 '{tool_name}'"
    except Exception as e:
        result = f"执行工具时出错: {str(e)}"

    print(result)


if __name__ == "__main__":
    main()
"""Minimal clients for OpenAI-compatible chat-completion endpoints (text and vision)."""
import os
from dotenv import load_dotenv
import json
import http.client
import mimetypes
from urllib.parse import urlparse
import base64
import requests
from io import BytesIO
from PIL import Image

dotenv_path = r'E:\code\my_python_server_private\.env'
load_dotenv(dotenv_path)

import ssl
import urllib3

# Silences urllib3's InsecureRequestWarning; intended for development only.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


def _require_env(name):
    """Return the value of environment variable *name*, or raise ValueError if unset/empty."""
    value = os.getenv(name)
    if not value:
        raise ValueError(f"环境变量 '{name}' 未设置或为空")
    return value


def _build_payload(model_name, messages, tools, temperature):
    """Build the request body; 'tools' is only included when tools are given."""
    payload = {
        "model": model_name,
        "messages": messages,
        "temperature": temperature,
    }
    if tools is not None:
        payload["tools"] = tools
    return payload


def _post_chat_completion(api_url, api_key, payload):
    """POST *payload* to ``<api_url>/chat/completions``.

    Returns:
        A ``(status, body_text)`` tuple.

    Raises:
        ValueError: If no host can be parsed from the URL.
    """
    # rstrip avoids "//chat/completions" when the base URL ends with a slash.
    full_url = f"{api_url.rstrip('/')}/chat/completions"
    parsed = urlparse(full_url)
    if not parsed.netloc:
        print(f"解析URL失败: {full_url}")
        raise ValueError("API URL 无效,无法解析主机名")

    # netloc keeps an explicit port; plain-HTTP endpoints get a plain connection.
    if parsed.scheme == "http":
        conn = http.client.HTTPConnection(parsed.netloc)
    else:
        conn = http.client.HTTPSConnection(parsed.netloc)

    path = parsed.path or "/"
    if parsed.query:
        path = f"{path}?{parsed.query}"

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}",
    }
    try:
        conn.request("POST", path, body=json.dumps(payload), headers=headers)
        response = conn.getresponse()
        return response.status, response.read().decode('utf-8')
    finally:
        # Close on every path, including errors and non-200 replies.
        conn.close()


def _save_response(data, file_name):
    """Write *data* as JSON to ``output/<file_name>`` under the current working directory."""
    os.makedirs('output', exist_ok=True)
    output_file_path = os.path.join('output', file_name)
    with open(output_file_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=4, ensure_ascii=False)


class LLMService:
    """Text chat client configured through the ``LLM_*`` environment variables."""

    def __init__(self):
        """Read the endpoint, model and key from the environment.

        Raises:
            ValueError: If a required variable is unset or empty.
        """
        self.api_url = _require_env('LLM_OPENAI_API_URL')
        self.model_name = _require_env('LLM_MODEL_NAME')
        self.api_key = _require_env('LLM_OPENAI_API_KEY')
        print(f"LLM服务初始化完成,模型: {self.model_name}")

    def create(self, messages, tools=None):
        """Send *messages* to the chat endpoint and return the parsed reply.

        The reply is also written to ``output/formatted_data.json``.

        Args:
            messages: List of chat messages.
            tools: Optional tool definitions.

        Returns:
            The decoded JSON response.

        Raises:
            ValueError: If the API URL has no host.
            Exception: If the server answers with a status other than 200.
        """
        print("开始调用LLM服务")
        payload = _build_payload(self.model_name, messages, tools, 0.9)
        status, body = _post_chat_completion(self.api_url, self.api_key, payload)
        print(f"LLM服务响应状态码: {status}")
        if status != 200:
            raise Exception(f"LLM服务器错误: {status} - {body}")

        data = json.loads(body)
        _save_response(data, 'formatted_data.json')
        print("LLM服务调用完成")
        return data


class VLMService:
    """Vision chat client configured through the ``VLM_*`` environment variables."""

    def __init__(self):
        """Read the endpoint, model and key from the environment.

        Raises:
            ValueError: If a required variable is unset or empty.
        """
        self.api_url = _require_env('VLM_OPENAI_API_URL')
        self.model_name = _require_env('VLM_MODEL_NAME')
        self.api_key = _require_env('VLM_OPENAI_API_KEY')

    def encode_image(self, image_source):
        """Return the image at *image_source* (URL or local path) as base64 text.

        Raises:
            Exception: If the image cannot be fetched or read.
        """
        try:
            if image_source.startswith(('http://', 'https://')):
                response = requests.get(image_source, timeout=30)
                # Without this an HTTP error page would be encoded as the image.
                response.raise_for_status()
                image_data = response.content
            else:
                with open(image_source, "rb") as image_file:
                    image_data = image_file.read()
            return base64.b64encode(image_data).decode('utf-8')
        except Exception as e:
            raise Exception(f"图像编码失败: {str(e)}") from e

    def create_with_image(self, messages, image_source=None, tools=None):
        """Send *messages*, optionally attaching an image, and return the parsed reply.

        When *image_source* is given, the image is added to the first user
        message of *messages*; that message is modified in place. The reply
        is also written to ``output/vlm_formatted_data.json``.

        Args:
            messages: List of chat messages.
            image_source: Optional image URL or local path. Omit it when the
                messages already contain the image.
            tools: Optional tool definitions.

        Returns:
            The decoded JSON response.

        Raises:
            ValueError: If the API URL has no host.
            Exception: If image encoding fails or the server answers with a
                status other than 200.
        """
        if image_source and messages:
            # Use the first user message so a leading system prompt does not
            # cause the image to be dropped.
            target = next((m for m in messages if m.get("role") == "user"), None)
            if target is not None:
                self._attach_image(target, image_source)

        payload = _build_payload(self.model_name, messages, tools, 0.7)
        status, body = _post_chat_completion(self.api_url, self.api_key, payload)
        if status != 200:
            print(f"VLM服务器错误响应: {body}")
            raise Exception(f"VLM服务器错误: {status} - {body}")

        data = json.loads(body)
        _save_response(data, 'vlm_formatted_data.json')
        return data

    def _attach_image(self, message, image_source):
        """Append the image at *image_source* to *message* as an image_url part."""
        base64_image = self.encode_image(image_source)

        # Declare the real image type when the name reveals it; otherwise
        # fall back to JPEG.
        mime_type = mimetypes.guess_type(image_source)[0]
        if not mime_type or not mime_type.startswith("image/"):
            mime_type = "image/jpeg"

        image_content = {
            "type": "image_url",
            "image_url": {"url": f"data:{mime_type};base64,{base64_image}"},
        }

        current_content = message["content"]
        if isinstance(current_content, str):
            # Plain text becomes a two-part (text + image) content list.
            message["content"] = [
                {"type": "text", "text": current_content},
                image_content,
            ]
        elif isinstance(current_content, list):
            current_content.append(image_content)
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/9 22:17:55

​​解锁AI Agent开发新姿势:Coze Studio,让创意秒变现实

引言 在AI技术飞速发展的今天，AI Agent（智能体）已成为推动行业变革的核心力量。无论是智能客服、自动化流程，还是个性化助手，AI Agent的应用场景正不断拓展。然而，传统开发方式往往需要深厚的编程基础和复…

作者头像 李华
网站建设 2026/6/10 19:06:10

YOLOv8图文匹配潜力评估

YOLOv8图文匹配潜力评估 在智能视觉应用日益普及的今天，如何快速构建一个稳定、高效的目标检测系统，已经成为开发者面临的核心挑战之一。从自动标注到图文内容理解，再到多模态检索，目标检测不仅是基础能力，更是连接图像…

作者头像 李华
网站建设 2026/6/14 5:27:14

R语言可视化进阶指南:5个你必须掌握的数据探索技巧

第一章：R语言数据探索可视化的核心价值在数据分析流程中，数据探索可视化是理解数据结构、发现潜在模式和识别异常值的关键步骤。R语言凭借其强大的图形系统和丰富的可视化包（如ggplot2、lattice、plotly等），成为数据科…

作者头像 李华
网站建设 2026/6/14 4:56:47

【R语言GPT代码调试终极指南】:9大高效技巧让你秒杀Bug

第一章：R语言GPT代码调试的核心挑战在将GPT模型集成到R语言环境中进行开发时，代码调试面临一系列独特挑战。这些挑战不仅源于R语言本身的动态特性和非标准求值机制，还涉及与外部API通信、数据类型转换以及上下文管理等复杂问题。动态作用域与…

作者头像 李华
网站建设 2026/5/31 6:36:42

YOLOv8日志收集:ELK栈集成方案

YOLOv8日志收集：ELK栈集成方案 在AI模型训练日益复杂的今天，开发者早已不再满足于“模型能跑就行”的初级阶段。尤其是在使用YOLOv8这类高效目标检测框架进行工业级项目开发时，一个常见的痛点浮现出来：当训练突然中断、损失函数异…

作者头像 李华
网站建设 2026/6/5 8:04:07

探秘智能水质检测公示屏

炎炎夏日，泳池是消暑健身的好去处。然而，畅游背后，池水是否真正洁净安全，曾经是管理者与游泳者共同的隐忧。传统的水质管理方式，正面临着多重挑战。传统水质管理的痛点过去，泳池水质监测多依赖人工定时取样…

作者头像 李华