在处理多轮对话的上下文管理时,理论往往很美,但工程落地全是坑。
之前探索了多轮对话长上下文截断技巧示例
https://blog.csdn.net/liliang199/article/details/160157872
这里进一步探索增量摘要和结构化摘要,所用示例参考和修改自网络资料。
1 摘要技巧
摘要能极大节省 Token,但会丢失细节。这里的建议是分阶段实施。
具体为增量摘要、结构化摘要、反思式摘要。
1.1 增量摘要
增量摘要是指维护一个summary变量。
比如,每满 5 轮对话,调用轻量级模型,如 GPT-4o-mini 或本地 7B 模型,
将新产生的“5 轮对话”和“旧摘要” 合并生成一个新摘要。
如此,对话上下文不会随着总轮数的增加而线性增加,能控制在一个相对合理的范围内。
1.2 结构化摘要
在摘要过程中,LLM为节约字数,有可能仅仅是总结下历史对话。
对于编程类对话,总结历史对话可能丢失很多编程实现细节,而这些细节往往是更重要的。
在这种情况下,更有效的做法,就是强制输出 JSON 格式,也就是结构化摘要。
示例如下
{
"user_personal_info": {"name": null, "preference": "like coffee"},
"task_progress": "选定了酒店,正在对比机票",
"pending_questions": ["需要确认返程日期"],
"key_facts": ["预算 5000 元", "不去海边"]
}
这种填充式摘要比纯文本摘要的信噪比高得多,后续程序可以直接读取 JSON 做逻辑判断。
1.3 反思式摘要
在生成摘要时,增加一个 Prompt 指令:"指出对话中用户尚未得到解答的悬而未决的问题"。
这就是反思式摘要,反思式摘要解决长对话中经常被冗余文本淹没的重要的未回答的问题。
比如,LLM说了一半问题时,突然岔开话题,最后LLM忘记回答该问题的场景。
2 代码示例
2.1 场景说明
这是一个编程助手场景,示例如何通过增量摘要和结构化摘要,处理长达数十轮的技术讨论。
此场景演示编程助手对话中,如何压缩历史,同时保留最近几轮的精确代码上下文。
2.2 代码示例
编程助手场景中,增量摘要 + 结构化摘要的实现代码示例如下。
1)环境配置
由于涉及大模型调用,这里选用openai格式,需要配置api key和base url,示例代码如下所示。
import os os.environ['HF_ENDPOINT'] = "https://hf-mirror.com" model_name = gpt_model_name # LLM名称,比如deepseek-r1, qwen3.5-8b os.environ['OPENAI_API_KEY'] = gpt_api_key # LLM供应商提供的api key os.environ['OPENAI_BASE_URL'] = gpt_api_url # LLM供应商提供llm访问api的url2)增量&结构化摘要
增量摘要 + 结构化摘要的具体内容示例如下
"""
场景二:编程助手 - 增量摘要 + 结构化摘要演示:
- 每 4 轮对话触发一次摘要
- 摘要以结构化 JSON 格式存储(包含代码上下文、未解决问题等)
- 最近 4 轮完整对话保留原文
- 历史部分以摘要形式注入
"""
代码示例如下
import os import json from openai import OpenAI class ProgrammingAssistant: def __init__(self, model: str = "gpt-4o-mini"): self.client = OpenAI() self.model = model # 完整对话历史(用于触发摘要前的积累) self.full_history = [] # 结构化摘要 self.structured_summary = { "topic": None, "code_snippets": [], "decisions_made": [], "pending_questions": [], "key_facts": [] } # 最近保留的完整对话轮数 self.recent_window = [] self.summary_threshold = 4 # 每 4 轮触发一次摘要 def _generate_structured_summary(self, messages_to_summarize: list) -> dict: """调用 LLM 生成结构化摘要""" conversation_text = "" for msg in messages_to_summarize: conversation_text += f"{msg['role']}: {msg['content']}\n" prompt = f"""请将以下编程对话提炼为结构化 JSON 摘要: 对话内容: {conversation_text} 请输出如下格式的 JSON(只输出 JSON,不要其他文字): {{ "topic": "讨论的核心主题(一句话)", "code_snippets": ["用户或助手分享的重要代码片段"], "decisions_made": ["已确定的决策或方案"], "pending_questions": ["尚未解决的问题"], "key_facts": ["重要事实,如用户的环境、版本等"] }} """ response = self.client.chat.completions.create( model=self.model, messages=[ {"role": "system", "content": "你是一个专业的技术摘要助手,只输出合法的 JSON。"}, {"role": "user", "content": prompt} ], temperature=0.3, ) try: return json.loads(response.choices[0].message.content.strip()) except json.JSONDecodeError: # 容错:返回基础结构 return { "topic": "编程讨论", "code_snippets": [], "decisions_made": [], "pending_questions": [], "key_facts": [] } def _merge_summaries(self, old_summary: dict, new_summary: dict) -> dict: """合并新旧摘要,保留关键信息""" merged = { "topic": new_summary.get("topic") or old_summary.get("topic"), "code_snippets": list(set(old_summary.get("code_snippets", []) + new_summary.get("code_snippets", [])))[-5:], # 保留最多5个代码片段 "decisions_made": old_summary.get("decisions_made", []) + new_summary.get("decisions_made", []), "pending_questions": new_summary.get("pending_questions", []), # 新摘要覆盖未解决问题 "key_facts": list(set(old_summary.get("key_facts", []) + new_summary.get("key_facts", []))) } return merged def _build_context(self, current_user_input: str) -> list: """构建包含摘要和近期原文的上下文""" self.full_history.append({"role": "user", "content": current_user_input}) self.recent_window.append({"role": "user", "content": current_user_input}) # 检查是否需要生成摘要 if len(self.full_history) >= self.summary_threshold * 2: # user+assistant 成对计算 messages_to_summarize = self.full_history[:-self.summary_threshold * 2] if messages_to_summarize: new_summary = self._generate_structured_summary(messages_to_summarize) self.structured_summary = self._merge_summaries(self.structured_summary, new_summary) # 清空已摘要的历史,只保留最近窗口 self.full_history = self.full_history[-self.summary_threshold * 2:] # 构建上下文 system_prompt = { "role": "system", "content": "你是一位经验丰富的编程导师,擅长解答 Python、算法和系统设计问题。" } # 将摘要作为 system 消息的一部分注入 summary_text = "" if self.structured_summary.get("topic"): summary_text = f"[历史对话摘要]\n" summary_text += f"主题: {self.structured_summary['topic']}\n" if self.structured_summary["code_snippets"]: summary_text += f"重要代码片段: {self.structured_summary['code_snippets'][:3]}\n" if self.structured_summary["decisions_made"]: summary_text += f"已定方案: {self.structured_summary['decisions_made']}\n" if self.structured_summary["pending_questions"]: summary_text += f"待解决问题: {self.structured_summary['pending_questions']}\n" if self.structured_summary["key_facts"]: summary_text += f"关键事实: {self.structured_summary['key_facts']}\n" context = [system_prompt] if summary_text: context.append({"role": "system", "content": summary_text}) # 添加最近的完整对话 context.extend(self.recent_window[-self.summary_threshold * 2:]) return context def chat(self, user_input: str) -> str: context = self._build_context(user_input) print(f"\n[调试] 结构化摘要: {json.dumps(self.structured_summary, ensure_ascii=False, indent=2)[:200]}...") response = self.client.chat.completions.create( model=self.model, messages=context, temperature=0.5, ) ai_response = response.choices[0].message.content self.full_history.append({"role": "assistant", "content": ai_response}) self.recent_window.append({"role": "assistant", "content": ai_response}) # 保持 recent_window 不超过窗口大小 if len(self.recent_window) > self.summary_threshold * 4: self.recent_window = self.recent_window[-self.summary_threshold * 2:] return ai_response3)运行测试
以下是运行测试的示例
# ========== 测试运行 ========== if __name__ == "__main__": assistant = ProgrammingAssistant(model=model_name) print("=== 编程助手测试 (增量摘要) ===") test_queries = [ "我想用 Python 写一个并发下载器,有什么建议?", "asyncio 和 threading 哪个更适合?", "我决定用 asyncio,代码大概怎么写?", "import asyncio\nimport aiohttp\n\nasync def download(url):\n ...\n这样开始对吗?", "如果我要限制并发数,用 Semaphore 对吗?", "好的,那超时和重试怎么处理?", "如果下载的文件很大,内存会不会爆?", "我明白了,用流式写入文件。还有什么需要注意的吗?", ] for i, query in enumerate(test_queries): print(f"\n👤 用户: {query}") response = assistant.chat(query) print(f"🤖 助手: {response[:600]}..." if len(response) > 150 else f"🤖 助手: {response}") if (i + 1) % 4 == 0: print(f"\n📊 [第 {i+1} 轮后触发了摘要生成]")输出如下所示
=== 编程助手测试 (增量摘要) ===
👤 用户: 我想用 Python 写一个并发下载器,有什么建议?
[调试] 结构化摘要: {
"topic": null,
"code_snippets": [],
"decisions_made": [],
"pending_questions": [],
"key_facts": []
}...
🤖 助手: 要用 Python 编写一个高效的并发下载器,你可以从以下几个方面入手,构建一个稳定、可扩展且高性能的系统:---
## 一、并发方案选择
根据任务类型选择适合的并发模型:
| 方案 | 适用场景 | 优点 | 缺点 |
|--------------|--------------------|--------------------------|--------------------------|
| `threading` | I/O 密集型任务 | 实现简单,资源开销小 | 受 GIL 限制,不适合计算密集 |
| `multiprocessing` | CPU 密集型任务 | 绕过 GIL,真正并行 | 进程开销大,通信复杂 |
| `asyncio` + `aiohttp` | 高并发网络任务 | 性能最佳,资源利用率高 | 学习曲线较陡,需异步编程 |✅ 推荐:网络下载属于 I/O 密集型,优先使用 `asyncio + aiohttp`,其次可用 `ThreadPoolExecutor`。
---
## 二、核心库推荐
- `aiohttp`:异步 HTTP 客户端,适合高并发下载
- `r...👤 用户: asyncio 和 threading 哪个更适合?
[调试] 结构化摘要: {
"topic": null,
"code_snippets": [],
"decisions_made": [],
"pending_questions": [],
"key_facts": []
}...
🤖 助手: 对于 **并发下载器** 这个特定场景,结论如下:* **追求高性能、高并发(>100 个任务)**:选 **`asyncio`**。
* **追求开发简单、快速脚本(<50 个任务)**:选 **`threading`**。下面从 5 个核心维度进行深度对比,帮助你做出决定:
### 1. 核心对比表
| 维度 | `asyncio` (异步 IO) | `threading` (多线程) |
| :--- | :--- | :--- |
| **并发能力** | ⭐⭐⭐⭐⭐ (轻松支持 1000+ 连接) | ⭐⭐⭐ (通常建议 <200 线程,否则开销大) |
| **内存占用** | ⭐⭐⭐⭐⭐ (极低,协程开销小) | ⭐⭐ (每个线程需独立栈空间,开销大) |
| **CPU 利用** | 单核 (事件循环),无锁竞争 | 多核 (受 GIL 限制,但 I/O 等待时会释放) |
| **代码难度** | ⭐⭐⭐⭐ (需理解 async/await,所有库需异步) | ⭐⭐ (同步代码,思维直观,库丰富) |
| **依赖库** | `aiohttp`, `aiomysql` 等异步库 | `requests`, `urllib` 等同步库 |
| **调试难度** | 较高 (报错堆栈复杂,易阻塞事件循环) | 较低 (标准同步报错,但需注意线程安...👤 用户: 我决定用 asyncio,代码大概怎么写?
[调试] 结构化摘要: {
"topic": null,
"code_snippets": [],
"decisions_made": [],
"pending_questions": [],
"key_facts": []
}...
🤖 助手: 太好了!`asyncio` 是构建高性能并发下载器的最佳选择。下面我为你提供一个**生产级别的完整示例**,包含并发控制、进度条、重试机制、超时处理等核心功能,并附带使用说明和扩展建议。---
## 📦 完整代码示例
```python
import asyncio
import aiohttp
from pathlib import Path
from tqdm import tqdm
from typing import List, Optional
import timeclass AsyncDownloader:
def __init__(
self,
max_concurrent: int = 10,
timeout: int = 30,
retry_times: int = 3,
chunk_size: int = 8192
):
self.max_concurrent = max_concurrent
self.timeout = timeout
self.retry_times = retry_times
self.chunk_size = chunk_size
self.sema...👤 用户: import asyncio
import aiohttpasync def download(url):
...
这样开始对吗?[调试] 结构化摘要: {
"topic": null,
"code_snippets": [],
"decisions_made": [],
"pending_questions": [],
"key_facts": []
}...
🤖 助手: 你的开头**基本正确**,但还**不完整**。`async def` 只是定义了异步函数,要真正运行它,还需要正确的**入口点**和**Session 管理**。---
## ✅ 正确的最小可用结构
```python
import asyncio
import aiohttpasync def download(session, url):
"""下载单个文件"""
async with session.get(url) as resp:
content = await resp.read()
print(f"✓ 已下载:{url}")
return contentasync def main():
"""程序入口"""
urls = ['https://example.com/file1.zip', 'https://example.com/file2.zip']
# 必须创建 ClientSession 并正确关闭
async with aiohttp.ClientSession() as session:
tasks = [download(session, url) for url in urls]
await asyn...📊 [第 4 轮后触发了摘要生成]
👤 用户: 如果我要限制并发数,用 Semaphore 对吗?
[调试] 结构化摘要: {
"topic": "编程讨论",
"code_snippets": [],
"decisions_made": [],
"pending_questions": [],
"key_facts": []
}...
🤖 助手: **对的,完全正确!** 👍在 `asyncio` 中,`asyncio.Semaphore` 是控制并发数的**标准做法**。它的作用就像是一个“令牌”,只有拿到令牌的协程才能执行下载任务,从而防止同时发起过多请求导致服务器拒绝或本地资源耗尽。
---
### 1. 核心用法示例
```python
import asyncio
import aiohttpasync def download(session, url, semaphore):
# ✅ 关键:使用 async with 获取锁
async with semaphore:
print(f"开始下载:{url}")
async with session.get(url) as resp:
await resp.read()
print(f"完成下载:{url}")async def main():
urls = ['http://example.com/file1', 'http://example.com/file2', ...] * 100
# ✅ 关键:在外部创建 Semaphore,所有任务共享
semaphore = asyncio.Semaphore(10) # 限...👤 用户: 好的,那超时和重试怎么处理?
[调试] 结构化摘要: {
"topic": "Python 高效并发下载器的实现方案选择与代码示例",
"code_snippets": [
"async def download_file(session, url, save_path, semaphore):\n async with semaphore:\n async with session.get(url, timeou...
🤖 助手: 非常好的问题!**超时**和**重试**是生产级下载器的必备功能。下面我为你展示完整的实现方案。---
## 1. 超时控制(Timeout)
`aiohttp` 支持多种超时设置,推荐使用 `ClientTimeout`:
```python
import aiohttp# ✅ 推荐:使用 ClientTimeout 对象
timeout = aiohttp.ClientTimeout(
total=60, # 整个请求最大时间(秒)
connect=10, # 连接建立超时
sock_read=30 # 读取数据超时
)async with session.get(url, timeout=timeout) as resp:
...
```| 参数 | 说明 | 推荐值 |
|------|------|--------|
| `total` | 整个请求的最大时间 | 60~300 秒(大文件增加) |
| `connect` | 建立连接超时 | 5~10 秒 |
| `sock_read` | 读取数据超时 | 30~60 秒 |
| `sock_connect` | TCP 握手超时 | 5~10 秒 |---
## 2. 重试机制(Retry)
推荐使用 **指数退避** 策略,避免频繁...
👤 用户: 如果下载的文件很大,内存会不会爆?
[调试] 结构化摘要: {
"topic": "Python 并发下载器中 asyncio 与 threading 的技术选型及实现讨论",
"code_snippets": [
"Asyncio 异步思维:async def download(url): data = await session.get(url); save(data)",
"Threading 同步思维:def downloa...
🤖 助手: **非常好的问题!** 这正是很多人踩过的坑。🚨**答案:如果用错方法,内存会爆;如果用对方法,100GB 的文件也没问题。**
---
## ❌ 错误写法:一次性加载到内存
```python
# ⚠️ 危险!大文件会撑爆内存
async def download_wrong(session, url, save_path):
async with session.get(url) as resp:
content = await resp.read() # ❌ 整个文件加载到内存
with open(save_path, 'wb') as f:
f.write(content) # 再写入磁盘
```| 文件大小 | 内存占用 | 结果 |
|---------|---------|------|
| 10 MB | 10 MB | ✅ 正常 |
| 500 MB | 500 MB | ⚠️ 危险 |
| 2 GB | 2 GB | ❌ 可能 OOM |
| 10 GB | 10 GB | ❌ 必然崩溃 |---
## ✅ 正确写法:流式分块读取
```python
# ✅ 安全!内存占用恒定(与文件大小无关)
async def download_correct(session, url...👤 用户: 我明白了,用流式写入文件。还有什么需要注意的吗?
[调试] 结构化摘要: {
"topic": "编程讨论",
"code_snippets": [
"Threading 同步思维:def download(url): data = requests.get(url); save(data)",
"async def download_file(session, url, save_path, semaphore):\n async wit...
🤖 助手: 你理解得很到位!流式写入是核心,但**生产级下载器**还有很多细节需要注意。我整理了最关键的 6 个要点:---
## 1️⃣ 临时文件 + 原子操作(避免半成品)
**问题**:下载中断会留下损坏的文件,下次无法判断是否完整。
**方案**:先下载到 `.tmp` 文件,完成后再重命名。
```python
async def download(self, session, url, save_path):
# ✅ 使用临时文件
temp_path = save_path.with_suffix(f"{save_path.suffix}.tmp")
try:
async with session.get(url, timeout=self.timeout) as resp:
resp.raise_for_status()
with open(temp_path, 'wb') as f:
async for chunk in resp.content.iter_chunked(8192):
f.write(chunk)
# ✅ 下载完成再重命名...📊 [第 8 轮后触发了摘要生成]
增量摘要&结构化摘要 ,采用JSON格式存储代码片段、决策和待办,并对新旧摘要智能合并。
每 4 轮对话后,早期对话被压缩为结构化 JSON 摘要(包含代码片段、决策、待办事项)。
后续对话模型既能通过摘要了解历史背景,又能通过保留最近4轮精确引用刚讨论过的代码细节。
reference
---
多轮对话长上下文截断技巧示例
https://blog.csdn.net/liliang199/article/details/160157872
LLM上下文管理探索-滑动窗口+摘要压缩+优先级丢弃
https://blog.csdn.net/liliang199/article/details/159986000
DeepSeek 如果对话过长,如何处理上下文长度限制问题?
https://bbs.itying.com/topic/67a6875f55a429007d7d3b26