news 2026/4/17 13:51:02

SecGPT-14B实战教程:Chainlit自定义UI添加威胁情报查询插件

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
SecGPT-14B实战教程:Chainlit自定义UI添加威胁情报查询插件

SecGPT-14B实战教程:Chainlit自定义UI添加威胁情报查询插件

1. 从模型到应用:为什么需要自定义插件?

你部署好了SecGPT-14B,也通过Chainlit界面问了几个安全基础问题,模型回答得不错。但你可能很快会发现一个问题:模型的知识库是静态的,它知道XSS、SQL注入这些经典攻击的原理,但如果我问“IP地址 192.168.1.100 最近有没有恶意活动记录?”或者“域名 evil.com 的威胁评分是多少?”,模型就无能为力了。

这就是我们今天要解决的问题——给SecGPT-14B加上“眼睛”和“耳朵”,让它不仅能回答理论问题,还能实时查询外部威胁情报,真正成为一个懂实战的网络安全助手。

想象一下这个场景:你在分析一份可疑的日志,发现一个陌生的IP地址。传统做法是打开浏览器,手动去威胁情报平台查询,复制粘贴,再回来分析。现在,你只需要在Chat界面问一句:“查一下IP 192.168.1.100的威胁情报”,SecGPT就能自动调用插件,从多个情报源获取数据,然后综合分析后告诉你:“这个IP在过去24小时内被标记为恶意扫描源,关联的恶意软件家族是Mirai,建议立即阻断。”

这就是自定义插件的价值——让AI模型从“知识库”变成“行动者”。

2. 环境准备与插件设计思路

2.1 你需要准备什么

在开始编码之前,我们先确认一下环境:

  • 已部署的SecGPT-14B服务:通过vLLM部署,API端点通常是http://localhost:8000/v1
  • 已运行的Chainlit前端:默认在http://localhost:8000或指定端口
  • Python环境:建议Python 3.8+,确保有requests、chainlit等基础库
  • 威胁情报API密钥(可选):如果你要接入真实的威胁情报平台,需要提前申请。本文会提供模拟接口和真实接口两种方案。

2.2 插件架构设计

我们的目标是在Chainlit界面中添加一个“威胁情报查询”功能,整体架构很简单:

用户提问 → Chainlit前端 → 插件判断是否需要查询 → 调用威胁情报API → 结果返回给SecGPT → 模型生成最终回答

关键点在于如何让Chainlit和SecGPT协同工作。Chainlit负责界面交互和插件触发,SecGPT负责理解用户意图和生成回答,插件负责执行具体的查询任务。

3. 实战开始:编写威胁情报查询插件

3.1 创建插件基础结构

首先,在你的Chainlit项目目录下创建一个新的Python文件,比如threat_intel_plugin.py

# threat_intel_plugin.py import json import requests from typing import Dict, Any, Optional import re class ThreatIntelligencePlugin: """威胁情报查询插件""" def __init__(self, api_keys: Optional[Dict[str, str]] = None): """ 初始化插件 Args: api_keys: 各威胁情报平台的API密钥字典 格式: {'virustotal': 'your_vt_key', 'abuseipdb': 'your_abuse_key'} """ self.api_keys = api_keys or {} self.enabled = bool(api_keys) # 如果有API密钥,启用真实查询 # 定义支持的查询类型 self.supported_queries = { 'ip': self.query_ip, 'domain': self.query_domain, 'hash': self.query_hash, 'url': self.query_url } # 模拟数据(用于演示,无API密钥时使用) self.mock_data = { 'ip': { '192.168.1.100': { 'malicious': True, 'score': 85, 'tags': ['scanner', 'brute-force'], 'last_seen': '2024-01-15', 'country': 'CN', 'isp': 'Example ISP' }, '8.8.8.8': { 'malicious': False, 'score': 0, 'tags': ['public-dns'], 'last_seen': '2024-01-15', 'country': 'US', 'isp': 'Google LLC' } }, 'domain': { 'evil.com': { 'malicious': True, 'score': 95, 'tags': ['phishing', 'malware-distribution'], 'created': '2023-05-10', 'registrar': 'Example Registrar' }, 'google.com': { 'malicious': False, 'score': 0, 'tags': ['legitimate'], 'created': '1997-09-15', 'registrar': 'MarkMonitor Inc.' } } } def detect_query_type(self, text: str) -> Optional[str]: """ 检测用户输入中是否包含威胁情报查询请求 Args: text: 用户输入的文本 Returns: 查询类型: 'ip', 'domain', 'hash', 'url' 或 None """ text_lower = text.lower() # 关键词匹配 query_keywords = ['查一下', '查询', '检查', '搜索', 'threat', 'intel', '情报'] if not any(keyword in text_lower for keyword in query_keywords): return None # 提取可能的IOC(威胁指标) # IP地址匹配 ip_pattern = r'\b(?:\d{1,3}\.){3}\d{1,3}\b' if re.search(ip_pattern, text): return 'ip' # 域名匹配(简单版本) domain_pattern = r'\b(?:[a-zA-Z0-9](?:[a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}\b' if re.search(domain_pattern, text): # 排除常见非威胁域名 safe_domains = ['google.com', 'baidu.com', 'github.com', 'csdn.net'] for domain in re.findall(domain_pattern, text): if domain not in safe_domains: return 'domain' # MD5/SHA256哈希匹配 hash_patterns = [ r'\b[a-fA-F0-9]{32}\b', # MD5 r'\b[a-fA-F0-9]{40}\b', # SHA-1 r'\b[a-fA-F0-9]{64}\b' # SHA-256 ] for pattern in hash_patterns: if re.search(pattern, text): return 'hash' # URL匹配 url_pattern = r'https?://[^\s]+' if re.search(url_pattern, text): return 'url' return None def extract_ioc(self, text: str, query_type: str) -> Optional[str]: """ 从文本中提取威胁指标 Args: text: 用户输入的文本 query_type: 查询类型 Returns: 提取到的威胁指标字符串 """ if query_type == 'ip': pattern = r'\b(?:\d{1,3}\.){3}\d{1,3}\b' matches = re.findall(pattern, text) return matches[0] if matches else None elif query_type == 'domain': pattern = r'\b(?:[a-zA-Z0-9](?:[a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}\b' matches = re.findall(pattern, text) # 返回第一个非常见域名 safe_domains = ['google.com', 'baidu.com', 'github.com', 'csdn.net'] for match in matches: if match not in safe_domains: return match return matches[0] if matches else None elif query_type == 'hash': patterns = [ r'\b[a-fA-F0-9]{32}\b', # MD5 r'\b[a-fA-F0-9]{40}\b', # SHA-1 r'\b[a-fA-F0-9]{64}\b' # SHA-256 ] for pattern in patterns: matches = re.findall(pattern, text) if matches: return matches[0] return None elif query_type == 'url': pattern = r'https?://[^\s]+' matches = re.findall(pattern, text) return matches[0] if matches else None return None

3.2 实现查询逻辑

接下来,我们实现具体的查询方法。这里提供两种方案:模拟查询(用于演示)和真实API查询。

# 在ThreatIntelligencePlugin类中继续添加方法 def query_ip(self, ip_address: str) -> Dict[str, Any]: """ 查询IP地址的威胁情报 Args: ip_address: 要查询的IP地址 Returns: 威胁情报结果字典 """ if not self.enabled: # 使用模拟数据 return self.mock_data['ip'].get(ip_address, { 'malicious': False, 'score': 0, 'tags': ['unknown'], 'last_seen': 'N/A', 'country': 'Unknown', 'isp': 'Unknown' }) # 真实API查询 - 这里以AbuseIPDB为例 try: if 'abuseipdb' in self.api_keys: return self._query_abuseipdb(ip_address) elif 'virustotal' in self.api_keys: return self._query_virustotal_ip(ip_address) else: # 如果没有配置API,回退到模拟数据 return self.mock_data['ip'].get(ip_address, { 'malicious': False, 'score': 0, 'tags': ['unknown'], 'message': 'API未配置,使用模拟数据' }) except Exception as e: return { 'error': True, 'message': f'查询失败: {str(e)}', 'ip': ip_address } def query_domain(self, domain: str) -> Dict[str, Any]: """ 查询域名的威胁情报 Args: domain: 要查询的域名 Returns: 威胁情报结果字典 """ if not self.enabled: # 使用模拟数据 return self.mock_data['domain'].get(domain, { 'malicious': False, 'score': 0, 'tags': ['unknown'], 'created': 'N/A', 'registrar': 'Unknown' }) # 真实API查询 - 这里以VirusTotal为例 try: if 'virustotal' in self.api_keys: return self._query_virustotal_domain(domain) else: return self.mock_data['domain'].get(domain, { 'malicious': False, 'score': 0, 'tags': ['unknown'], 'message': 'API未配置,使用模拟数据' }) except Exception as e: return { 'error': True, 'message': f'查询失败: {str(e)}', 'domain': domain } def query_hash(self, file_hash: str) -> Dict[str, Any]: """ 查询文件哈希的威胁情报 Args: file_hash: 文件哈希值(MD5/SHA1/SHA256) Returns: 威胁情报结果字典 """ # 这里实现哈希查询逻辑 # 由于篇幅限制,只展示框架 return { 'hash': file_hash, 'malicious': True if len(file_hash) == 64 else False, # 简单示例 'score': 75, 'tags': ['malware', 'trojan'], 'message': '哈希查询功能(需配置API密钥)' } def query_url(self, url: str) -> Dict[str, Any]: """ 查询URL的威胁情报 Args: url: 要查询的URL Returns: 威胁情报结果字典 """ # 这里实现URL查询逻辑 return { 'url': url, 'malicious': 'phishing' in url.lower(), 'score': 80, 'tags': ['phishing', 'suspicious'], 'message': 'URL查询功能(需配置API密钥)' } def _query_abuseipdb(self, ip_address: str) -> Dict[str, Any]: """ 调用AbuseIPDB API查询IP地址 注意:需要先在AbuseIPDB注册并获取API密钥 """ api_key = self.api_keys.get('abuseipdb') if not api_key: raise ValueError("未配置AbuseIPDB API密钥") headers = { 'Key': api_key, 'Accept': 'application/json' } params = { 'ipAddress': ip_address, 'maxAgeInDays': 90 } try: response = requests.get( 'https://api.abuseipdb.com/api/v2/check', headers=headers, params=params ) response.raise_for_status() data = response.json() result = data.get('data', {}) return { 'malicious': result.get('abuseConfidenceScore', 0) > 50, 'score': result.get('abuseConfidenceScore', 0), 'tags': result.get('usageType', 'Unknown').split(', '), 'last_seen': result.get('lastReportedAt', 'N/A'), 'country': result.get('countryCode', 'Unknown'), 'isp': result.get('isp', 'Unknown'), 'total_reports': result.get('totalReports', 0), 'data_source': 'AbuseIPDB' } except requests.RequestException as e: raise Exception(f"AbuseIPDB API调用失败: {str(e)}") def _query_virustotal_ip(self, ip_address: str) -> Dict[str, Any]: """ 调用VirusTotal API查询IP地址 注意:需要先在VirusTotal注册并获取API密钥 """ api_key = self.api_keys.get('virustotal') if not api_key: raise ValueError("未配置VirusTotal API密钥") headers = { 'x-apikey': api_key } try: response = requests.get( f'https://www.virustotal.com/api/v3/ip_addresses/{ip_address}', headers=headers ) response.raise_for_status() data = response.json() stats = data.get('data', {}).get('attributes', {}).get('last_analysis_stats', {}) malicious_count = stats.get('malicious', 0) total_engines = sum(stats.values()) return { 'malicious': malicious_count > 0, 'score': int((malicious_count / max(total_engines, 1)) * 100), 'tags': list(data.get('data', {}).get('attributes', {}).get('tags', [])), 'last_analysis_date': data.get('data', {}).get('attributes', {}).get('last_analysis_date', 'N/A'), 'country': data.get('data', {}).get('attributes', {}).get('country', 'Unknown'), 'as_owner': data.get('data', {}).get('attributes', {}).get('as_owner', 'Unknown'), 'data_source': 'VirusTotal' } except requests.RequestException as e: raise Exception(f"VirusTotal API调用失败: {str(e)}") def _query_virustotal_domain(self, domain: str) -> Dict[str, Any]: """ 调用VirusTotal API查询域名 """ api_key = self.api_keys.get('virustotal') if not api_key: raise ValueError("未配置VirusTotal API密钥") headers = { 'x-apikey': api_key } try: response = requests.get( f'https://www.virustotal.com/api/v3/domains/{domain}', headers=headers ) response.raise_for_status() data = response.json() stats = data.get('data', {}).get('attributes', {}).get('last_analysis_stats', {}) malicious_count = stats.get('malicious', 0) total_engines = sum(stats.values()) return { 'malicious': malicious_count > 0, 'score': int((malicious_count / max(total_engines, 1)) * 100), 'tags': list(data.get('data', {}).get('attributes', {}).get('tags', [])), 'last_analysis_date': data.get('data', {}).get('attributes', {}).get('last_analysis_date', 'N/A'), 'creation_date': data.get('data', {}).get('attributes', {}).get('creation_date', 'N/A'), 'registrar': data.get('data', {}).get('attributes', {}).get('registrar', 'Unknown'), 'data_source': 'VirusTotal' } except requests.RequestException as e: raise Exception(f"VirusTotal API调用失败: {str(e)}") def process_query(self, user_input: str) -> Optional[Dict[str, Any]]: """ 处理用户输入,执行威胁情报查询 Args: user_input: 用户输入的文本 Returns: 查询结果字典,如果不需要查询则返回None """ # 检测查询类型 query_type = self.detect_query_type(user_input) if not query_type: return None # 提取威胁指标 ioc = self.extract_ioc(user_input, query_type) if not ioc: return None # 执行查询 query_func = self.supported_queries.get(query_type) if not query_func: return None result = query_func(ioc) result['query_type'] = query_type result['ioc'] = ioc result['user_query'] = user_input return result def format_result(self, result: Dict[str, Any]) -> str: """ 将查询结果格式化为自然语言 Args: result: 查询结果字典 Returns: 格式化的自然语言描述 """ if result.get('error'): return f"查询失败: {result['message']}" query_type = result.get('query_type', 'unknown') ioc = result.get('ioc', 'N/A') malicious = result.get('malicious', False) score = result.get('score', 0) # 基础信息 if malicious: status = "⚠️ **恶意**" recommendation = "建议立即采取阻断措施" else: status = "✅ **安全**" recommendation = "无需特殊处理" # 构建响应 response_parts = [ f"## 威胁情报查询结果", f"", f"**查询对象**: {ioc} ({query_type.upper()})", f"**威胁状态**: {status}", f"**威胁评分**: {score}/100", f"", f"### 详细信息", ] # 添加具体字段 for key, value in result.items(): if key not in ['query_type', 'ioc', 'user_query', 'malicious', 'score', 'error', 'message']: if isinstance(value, list): value_str = ', '.join(str(v) for v in value) else: value_str = str(value) response_parts.append(f"- **{key.replace('_', ' ').title()}**: {value_str}") response_parts.extend([ f"", f"### 处理建议", f"{recommendation}", f"", f"*数据来源: {result.get('data_source', '模拟数据')}*" ]) return "\n".join(response_parts)

4. 集成到Chainlit应用

现在我们已经有了插件,接下来需要把它集成到Chainlit应用中。

4.1 修改Chainlit主应用

在你的Chainlit应用文件(通常是app.pychainlit.md所在目录的主文件)中,添加插件支持:

# app.py (Chainlit主应用) import chainlit as cl import requests import json from typing import Optional from threat_intel_plugin import ThreatIntelligencePlugin # 初始化威胁情报插件 # 如果有真实的API密钥,可以在这里配置 API_KEYS = { # 'virustotal': 'your_vt_api_key_here', # 'abuseipdb': 'your_abuseipdb_api_key_here' } threat_intel_plugin = ThreatIntelligencePlugin(api_keys=API_KEYS) # SecGPT API配置 SECGPT_API_URL = "http://localhost:8000/v1/chat/completions" SECGPT_MODEL = "SecGPT-14B" async def call_secgpt_api(messages: list, temperature: float = 0.7) -> str: """ 调用SecGPT API Args: messages: 消息列表,格式为 [{"role": "user", "content": "..."}] temperature: 温度参数,控制随机性 Returns: 模型生成的回复文本 """ headers = { "Content-Type": "application/json" } payload = { "model": SECGPT_MODEL, "messages": messages, "temperature": temperature, "max_tokens": 2048, "stream": False } try: response = requests.post(SECGPT_API_URL, json=payload, headers=headers, timeout=30) response.raise_for_status() result = response.json() return result["choices"][0]["message"]["content"] except Exception as e: return f"调用SecGPT API时出错: {str(e)}" @cl.on_chat_start async def on_chat_start(): """ 聊天开始时的初始化 """ # 设置系统提示词,告诉模型如何使用插件 system_prompt = """你是一个网络安全专家助手,专门帮助用户分析安全威胁和查询威胁情报。 你的能力包括: 1. 回答网络安全相关问题(漏洞分析、攻击技术、防御措施等) 2. 查询威胁情报(IP地址、域名、文件哈希、URL等) 3. 分析安全日志和事件 4. 提供安全建议和最佳实践 当用户询问威胁情报查询时(例如:"查一下IP 192.168.1.100"、"检查域名evil.com"),我会先调用威胁情报插件获取数据,然后将结果提供给你进行分析。 请基于我提供的情报数据,给出专业的分析和建议。""" # 发送欢迎消息 welcome_msg = f"""👋 欢迎使用SecGPT-14B网络安全助手! 我是一个专门为网络安全场景优化的AI助手,可以帮你: - 🔍 查询威胁情报(IP、域名、文件哈希、URL) - 🛡️ 分析安全漏洞和攻击技术 - 📊 解读安全日志和事件 - 💡 提供安全防护建议 **威胁情报查询示例**: - "查一下IP 192.168.1.100" - "检查域名 evil.com 的威胁情况" - "这个文件哈希有没有问题:a1b2c3d4e5f6..." - "分析这个URL:http://suspicious-site.com" {'⚠️ 当前使用模拟威胁情报数据,如需真实数据请配置API密钥' if not threat_intel_plugin.enabled else '✅ 威胁情报插件已启用(使用真实API数据)'} 开始提问吧!""" await cl.Message(content=welcome_msg).send() @cl.on_message async def on_message(message: cl.Message): """ 处理用户消息 """ user_input = message.content # 步骤1:检查是否需要威胁情报查询 intel_result = threat_intel_plugin.process_query(user_input) if intel_result: # 步骤2:如果有查询结果,先显示查询结果 intel_summary = threat_intel_plugin.format_result(intel_result) # 发送威胁情报结果 await cl.Message( content=f"🔍 **已执行威胁情报查询**\n\n{intel_summary}" ).send() # 步骤3:将查询结果和用户问题一起发送给SecGPT进行分析 enhanced_prompt = f"""用户提问:{user_input} 我已经查询了相关威胁情报,结果如下: {intel_summary} 请基于以上情报数据,给用户提供专业的网络安全分析和建议。""" messages = [ {"role": "user", "content": enhanced_prompt} ] else: # 步骤4:如果不是威胁情报查询,直接发送给SecGPT messages = [ {"role": "user", "content": user_input} ] # 步骤5:调用SecGPT API thinking_msg = cl.Message(content="") await thinking_msg.send() # 模拟思考过程(可选) await thinking_msg.stream_token("正在分析") try: # 调用SecGPT API response_text = await call_secgpt_api(messages) # 更新消息内容 thinking_msg.content = response_text await thinking_msg.update() except Exception as e: error_msg = f"处理请求时出错:{str(e)}" thinking_msg.content = error_msg await thinking_msg.update() async def call_secgpt_api(messages: list, temperature: float = 0.7) -> str: """ 调用SecGPT API(异步版本) """ import asyncio from concurrent.futures import ThreadPoolExecutor def sync_call(): headers = {"Content-Type": "application/json"} payload = { "model": SECGPT_MODEL, "messages": messages, "temperature": temperature, "max_tokens": 2048, "stream": False } try: response = requests.post(SECGPT_API_URL, json=payload, headers=headers, timeout=30) response.raise_for_status() result = response.json() return result["choices"][0]["message"]["content"] except requests.exceptions.Timeout: return "请求超时,请稍后重试。" except requests.exceptions.ConnectionError: return "无法连接到SecGPT服务,请检查服务是否运行。" except Exception as e: return f"调用SecGPT API时出错: {str(e)}" # 在线程池中执行同步调用 loop = asyncio.get_event_loop() with ThreadPoolExecutor() as pool: response = await loop.run_in_executor(pool, sync_call) return response @cl.on_chat_end def on_chat_end(): """ 聊天结束时的清理工作 """ print("聊天会话结束")

4.2 创建Chainlit配置文件

创建一个chainlit.md文件来配置UI:

# 欢迎使用SecGPT-14B网络安全助手 这是一个专为网络安全场景优化的AI助手,集成了威胁情报查询功能。 ## 功能特点 - 🔍 **实时威胁情报查询**:支持IP、域名、文件哈希、URL查询 - 🛡️ **漏洞分析**:理解漏洞原理,提供修复建议 - 📊 **安全事件分析**:帮助分析日志和攻击事件 - 💡 **专业建议**:基于最新安全知识提供建议 ## 使用示例 1. **威胁情报查询**: - "查一下IP 192.168.1.100" - "检查域名 evil.com" - "这个文件哈希有没有恶意:a1b2c3d4e5f6..." 2. **安全咨询**: - "什么是SQL注入攻击?" - "如何防御DDoS攻击?" - "分析这个漏洞:CVE-2023-12345" 3. **日志分析**: - "帮我分析这段Apache日志" - "这些登录失败记录可疑吗?" ## 配置说明 当前威胁情报查询使用模拟数据。如需真实数据,请: 1. 注册威胁情报平台(如VirusTotal、AbuseIPDB) 2. 获取API密钥 3. 在 `app.py` 中配置API密钥 ## 注意事项 - 本工具仅供学习和研究使用 - 请勿用于非法用途 - 威胁情报数据仅供参考,请结合其他信息综合判断

4.3 运行应用

现在可以运行你的Chainlit应用了:

# 确保SecGPT-14B服务正在运行 # 检查vLLM服务状态 curl http://localhost:8000/v1/models # 运行Chainlit应用 chainlit run app.py -w # 或者指定端口运行 chainlit run app.py -w --port 7860

打开浏览器访问http://localhost:7860(或你指定的端口),就能看到集成了威胁情报查询功能的SecGPT助手了。

5. 实际效果展示与使用技巧

5.1 基本查询演示

当你启动应用后,可以尝试以下查询:

  1. IP地址查询

    • 输入:"查一下IP 192.168.1.100的威胁情报"
    • 系统会先调用插件查询该IP,然后SecGPT会基于查询结果给出分析
  2. 域名查询

    • 输入:"检查域名 evil.com 是否安全"
    • 插件会查询域名信誉,SecGPT会解释结果并提供建议
  3. 混合查询

    • 输入:"我在日志里看到IP 8.8.8.8有很多连接,这个IP有问题吗?"
    • 插件查询IP情报,SecGPT结合上下文给出专业分析

5.2 高级功能扩展

如果你想让插件更强大,可以考虑以下扩展:

多源情报聚合

class MultiSourceIntelPlugin: """多源威胁情报聚合插件""" def query_ip_multi_source(self, ip_address: str): """从多个来源查询IP情报并聚合结果""" results = [] # 查询VirusTotal vt_result = self._query_virustotal_ip(ip_address) results.append(('VirusTotal', vt_result)) # 查询AbuseIPDB abuse_result = self._query_abuseipdb(ip_address) results.append(('AbuseIPDB', abuse_result)) # 查询其他来源... # alienvault_result = self._query_alienvault(ip_address) # shodan_result = self._query_shodan(ip_address) # 聚合和评分 aggregated = self._aggregate_results(results) return aggregated

历史查询缓存

import sqlite3 from datetime import datetime, timedelta class CachedIntelPlugin(ThreatIntelligencePlugin): """带缓存的威胁情报插件""" def __init__(self, api_keys=None, cache_ttl_hours=24): super().__init__(api_keys) self.cache_ttl = timedelta(hours=cache_ttl_hours) self.init_cache() def init_cache(self): """初始化缓存数据库""" self.conn = sqlite3.connect('threat_intel_cache.db') self.cursor = self.conn.cursor() self.cursor.execute(''' CREATE TABLE IF NOT EXISTS cache ( ioc TEXT PRIMARY KEY, query_type TEXT, result TEXT, timestamp DATETIME ) ''') self.conn.commit() def query_with_cache(self, query_type: str, ioc: str): """带缓存的查询""" # 检查缓存 cached = self.get_from_cache(ioc) if cached and self.is_cache_valid(cached['timestamp']): return cached['result'] # 执行查询 result = super().query_ip(ioc) if query_type == 'ip' else super().query_domain(ioc) # 保存到缓存 self.save_to_cache(ioc, query_type, result) return result

5.3 使用技巧和最佳实践

  1. 提示词优化

    • 在系统提示词中明确告诉模型你的角色和能力
    • 提供查询结果的格式示例,让模型知道如何解析和利用数据
  2. 错误处理

    • API调用可能失败,要有完善的错误处理和降级机制
    • 提供友好的错误信息,指导用户如何解决问题
  3. 性能优化

    • 对于频繁查询的IOC,使用缓存减少API调用
    • 异步处理长时间运行的查询,避免阻塞UI
  4. 安全性考虑

    • API密钥不要硬编码在代码中,使用环境变量或配置文件
    • 对用户输入进行严格的验证和清理
    • 限制查询频率,避免滥用

6. 总结

通过这个教程,我们成功为SecGPT-14B和Chainlit前端添加了威胁情报查询插件。现在你的网络安全助手不仅能够回答理论知识,还能实时查询外部威胁情报,真正实现了"理论+实战"的结合。

关键收获

  1. 插件架构设计:我们设计了一个清晰的插件架构,能够自动检测用户意图,调用相应的查询功能

  2. 灵活的数据源:支持模拟数据和真实API数据两种模式,方便开发和演示

  3. 自然的结果展示:查询结果以自然语言格式呈现,便于SecGPT进一步分析和解释

  4. 易于扩展:插件设计考虑了扩展性,可以轻松添加新的查询类型或数据源

下一步建议

  1. 接入真实数据源:申请VirusTotal、AbuseIPDB等平台的API密钥,替换模拟数据

  2. 添加更多功能:考虑添加漏洞数据库查询、安全新闻聚合、自动化报告生成等功能

  3. 优化用户体验:添加查询历史、收藏夹、批量查询等实用功能

  4. 部署到生产环境:考虑安全性、性能监控、日志记录等生产级需求

威胁情报是网络安全工作者的"眼睛",而AI模型是"大脑"。通过这个插件,我们让SecGPT-14B同时拥有了眼睛和大脑,能够更智能、更实时地帮助安全分析师工作。无论你是安全研究人员、运维工程师还是CTF选手,这个工具都能显著提升你的工作效率。


获取更多AI镜像

想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/17 13:49:34

终极简单:LogcatReader安卓日志查看器完整使用指南

终极简单:LogcatReader安卓日志查看器完整使用指南 【免费下载链接】LogcatReader A simple app for viewing logcat logs on an android device. 项目地址: https://gitcode.com/gh_mirrors/lo/LogcatReader LogcatReader是一款专为安卓设备设计的轻量级日志…

作者头像 李华
网站建设 2026/4/17 13:44:13

dfs深度查询

dfs深度优先搜索一条路走到头&#xff0c;在回溯走别的路经典的走迷宫问题代码如下#include<bits/stdc.h> using namespace std; int p,q; int mins1e9; int a[100][100]; int v[100][100]; int dx[4]{1,-1,0,0}; int dy[4]{0,0,1,-1}; void dfs(int x,int y,int step) {…

作者头像 李华
网站建设 2026/4/17 13:42:13

ant-design-vue表格进阶:手把手教你实现可拖拽列宽+自适应布局

Ant Design Vue 表格进阶&#xff1a;打造可拖拽列宽与自适应布局的完美结合 在构建现代企业级管理系统时&#xff0c;数据表格作为核心交互组件&#xff0c;其用户体验直接影响工作效率。传统固定列宽的表格往往无法满足不同用户对数据查看的个性化需求&#xff0c;特别是在处…

作者头像 李华