news 2026/3/29 18:57:31

Spring AI实战:实现流式对话中的会话终止功能

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Spring AI实战:实现流式对话中的会话终止功能

前言

在AI对话系统中,流式响应(Streaming)已成为提升用户体验的重要技术。然而,当用户面对长时间生成的回复时,往往希望能够在中途终止对话。本文将详细介绍如何在基于Spring AI的项目中实现流式对话的会话终止功能,完整保存对话上下文,确保系统资源得到合理释放。

一、需求背景

在我们的AI对话平台中,当用户发起一个复杂问题时,AI可能需要较长时间才能生成完整回复。如果用户在等待过程中发现提问不准确或已获得足够信息,希望能够随时终止当前会话。这个功能需要解决以下挑战:

  1. 安全中断正在生成的AI响应流
  2. 保存已生成的部分内容到对话历史
  3. 释放相关系统资源
  4. 保持对话上下文的完整性

二、整体架构设计

我们采用了分层架构来实现会话终止功能:

  • Controller层:提供/stop接口,接收会话终止请求
  • Service层:处理业务逻辑,包括会话状态检查、资源清理和记忆保存
  • Handler层ChatSessionHandler负责管理活动会话和流式响应
  • 存储层:Redis存储会话状态和消息内容,ChatMemory管理对话上下文

三、核心实现代码与解析

1. 会话管理器:ChatSessionHandler

ChatSessionHandler是整个功能的核心,它负责跟踪活动会话、收集部分响应并处理取消操作:

@Slf4j @Component public class ChatSessionHandler { // 存储活动会话 private final Map<String, Disposable> activeSessions = new ConcurrentHashMap<>(); // 存储SSE连接 private final Map<String, SseEmitter> emitterMap = new ConcurrentHashMap<>(); // 会话取消标志 private final Map<String, AtomicBoolean> cancellationFlags = new ConcurrentHashMap<>(); // 存储部分响应 private final Map<String, List<String>> partialResponses = new ConcurrentHashMap<>(); // 注册新会话 public void registerSession(String sessionId, Disposable disposable, SseEmitter emitter) { activeSessions.put(sessionId, disposable); emitterMap.put(sessionId, emitter); cancellationFlags.put(sessionId, new AtomicBoolean(false)); partialResponses.put(sessionId, new ArrayList<>()); emitter.onTimeout(() -> cleanupSession(sessionId)); emitter.onCompletion(() -> cleanupSession(sessionId)); } // 收集AI的部分响应 public void collectPartialResponse(String sessionId, String chunk) { if (partialResponses.containsKey(sessionId)) { partialResponses.get(sessionId).add(chunk); } } // 取消会话 public void cancelSession(String sessionId, String userId, UserMessage userMessage) { // 标记会话已取消 if (cancellationFlags.containsKey(sessionId)) { cancellationFlags.get(sessionId).set(true); } // 保存聊天记忆 saveChatMemoryOnCancel(sessionId, userId, userMessage); // 取消流处理 Disposable disposable = activeSessions.remove(sessionId); if (disposable != null && !disposable.isDisposed()) { disposable.dispose(); } // 关闭SSE连接并发送终止消息 SseEmitter emitter = emitterMap.remove(sessionId); if (emitter != null) { try { emitter.send(SseEmitter.event().data(SseResponse.interrupted(sessionId, "[对话已终止]"))); emitter.complete(); } catch (IOException e) { log.error("SSE连接关闭失败: {}", sessionId, e); try { emitter.completeWithError(e); } catch (Exception ex) { log.error("无法完成错误状态", ex); } } } // 清理会话 cleanupSession(sessionId); } // 保存取消时的聊天记忆 private void saveChatMemoryOnCancel(String sessionId, String userId, UserMessage userMessage) { try { // redis key String memoryIdWithUserId = userId + ":u:" + sessionId; // 1. 获取AI已生成的部分响应 String aiPartialResponse = getCompleteResponse(sessionId); log.info("终止会话时AI已生成的响应: {}", aiPartialResponse); // 2. 构造消息序列 List<Message> messagesToAdd = new ArrayList<>(); log.info("添加用户消息到记忆: {}", userMessage.getText()); // 添加AI的已生成内容 if (StrUtil.isNotBlank(aiPartialResponse)) { AssistantMessage aiMessage = new AssistantMessage(aiPartialResponse + "[对话被用户终止]"); messagesToAdd.add(aiMessage); log.info("添加AI部分响应到记忆: {}", aiPartialResponse); } else { // 即使没有AI响应,也要添加一个终止标记 AssistantMessage aiMessage = new AssistantMessage("[对话被用户终止,无AI响应]"); messagesToAdd.add(aiMessage); log.info("添加终止标记到记忆"); } // 调用ChatMemory保存记忆 chatMemory.add(memoryIdWithUserId, messagesToAdd); log.info("成功保存终止会话的记忆,会话ID: {}, 消息数量: {}", sessionId, messagesToAdd.size()); } catch (Exception e) { log.error("保存终止会话记忆失败,会话ID: {}", sessionId, e); } } }

关键点解析:

  • 使用ConcurrentHashMap保证线程安全,避免并发问题
  • 通过Disposable对象管理响应式流的生命周期
  • partialResponses集合实时收集AI生成的部分内容
  • 会话取消时,添加特殊标记[对话被用户终止],明确标识对话状态

2. 服务层:会话终止业务逻辑

Service层的stop方法处理会话终止的核心业务流程:

@Override public String stop(String requestId) { AssertUtil.notNull("会话id不能为空!", requestId); try { // 1. 从Redis获取会话信息 String sessionData = stringRedisTemplate.opsForValue().get("session:" + requestId); if (StrUtil.isBlank(sessionData)) { throw new ServiceException("会话不存在或已过期"); } // 2. 获取当前用户 LoginUser currentUser = UserUtil.getUser(); if (currentUser == null) { throw new ServiceException("用户未登录"); } Integer userId = currentUser.getId(); // 3. 获取用户消息 UserMessage userMessage = getUserMessageFromSession(userId, requestId); // 4. 检查并取消会话 if (chatSessionHandler.isSessionActive(requestId)) { chatSessionHandler.cancelSession(requestId, userId.toString(), userMessage); } else { log.info("会话 {} 不处于活动状态,直接清理资源", requestId); // 即使会话不活跃,也尝试保存记忆 try { String memoryId = userId + ":u:" + requestId; String aiPartialResponse = chatSessionHandler.getCompleteResponse(requestId); if (StrUtil.isNotBlank(aiPartialResponse)) { List<Message> messagesToAdd = new ArrayList<>(); messagesToAdd.add(userMessage); messagesToAdd.add(new AssistantMessage(aiPartialResponse)); chatMemory.add(memoryId, messagesToAdd); log.info("为非活动会话保存了部分响应,会话ID: {}", requestId); } } catch (Exception e) { log.warn("保存非活动会话记忆失败,会话ID: {}", requestId, e); } } // 5. 清理Redis中的会话数据 stringRedisTemplate.delete("session:" + requestId); stringRedisTemplate.delete("chat:message:" + requestId); log.info("会话已成功终止,ID: {}", requestId); return sessionData; } catch (Exception e) { log.error("终止会话失败,ID: {}", requestId, e); return null; } }

关键点解析:

  • 全面的参数校验和空值处理
  • 智能判断会话状态,处理已结束会话的边缘情况
  • 完整的资源清理策略,防止内存泄漏
  • 详细的日志记录,便于问题追踪

3. 获取用户消息:处理复杂上下文

在会话终止时,需要获取原始用户消息,这需要从多个可能的数据源检索:

private UserMessage getUserMessageFromSession(Integer userId, String sessionId) { try { // 1. 从聊天记忆中获取该会话的完整历史 String memoryId = userId + ":u:" + sessionId; List<Message> messages = chatMemory.get(memoryId, -1); // -1 表示获取所有历史 // 2. 从历史记录中找到最后一条用户消息 if (messages != null && !messages.isEmpty()) { // 逆序遍历,找到最后一条用户消息 for (int i = messages.size() - 1; i >= 0; i--) { Message message = messages.get(i); if (message instanceof UserMessage) { return (UserMessage) message; } } } // 3. 如果没有找到用户消息,尝试从Redis中获取原始请求 String messageKey = "chat:message:" + sessionId; String messageData = stringRedisTemplate.opsForValue().get(messageKey); if (StrUtil.isNotBlank(messageData)) { try { // 尝试解析存储的原始消息 return new ObjectMapper().readValue(messageData, UserMessage.class); } catch (Exception e) { log.warn("解析Redis中存储的用户消息失败,使用默认消息", e); } } // 4. 作为最后的备选,返回一个通用的用户消息 UserMessage defaultMessage = new UserMessage("[已终止的对话]"); return defaultMessage; } catch (Exception e) { log.error("获取用户消息失败,会话ID: {}", sessionId, e); // 出错时返回一个安全的默认消息 UserMessage fallbackMessage = new UserMessage("对话在" + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME) + "被终止"); return fallbackMessage; } }

关键点解析:

  • 多层回退机制确保消息可用性
  • 从对话历史中精准定位最后一条用户消息
  • 异常情况下的优雅降级策略
  • 详细的时间戳记录增强可追溯性

4. Controller端点:提供终止接口

最后,我们在Controller中暴露一个简单的REST端点:

@Operation(summary = "会话终止") @GetMapping(value = "/stop") public Result<?> stop(@RequestParam(value = "chatId") String requestId) { String emitter = chatService.stop(requestId); return R.successWithData("会话已成功终止", emitter); }

四、实现难点与解决方案

1. 并发安全问题

挑战:多个请求可能同时操作同一个会话。解决方案:使用ConcurrentHashMapAtomicBoolean确保线程安全,避免并发修改异常。

2. 资源泄漏风险

挑战:流式响应不正确关闭会导致资源泄漏。解决方案:在SSE emitter上注册onTimeoutonCompletion回调,确保会话清理;显式调用dispose()方法释放响应式流。

3. 对话上下文完整性

挑战:会话中断后,对话历史不完整,影响后续对话质量。解决方案:收集并保存已生成的部分响应,添加明确的终止标记,保证上下文连贯性。

4. 边界情况处理

挑战:会话可能已自然结束,或用户多次触发终止。解决方案:通过isSessionActive()方法检查会话状态,针对不同状态采用不同处理策略。

五、优化建议

  1. 超时机制增强:为会话设置合理的超时时间,自动清理长时间无活动的会话
  2. 前端集成:设计优雅的UI反馈,让用户明确知道会话已终止
  3. 监控指标:添加会话终止率等监控指标,分析用户行为
  4. 资源回收优化:定期扫描并清理陈旧的会话数据,减少内存占用

六、总结

本文详细介绍了基于Spring AI实现流式对话中会话终止功能的技术方案。通过合理设计会话生命周期管理、部分响应收集和资源清理机制,我们能够为用户提供流畅的对话体验,同时确保系统资源的高效利用。这个功能的实现不仅提升了用户体验,也为构建更智能、更响应式的AI对话系统奠定了基础。

在AI应用日益普及的今天,关注用户体验的每个细节,包括如何优雅地结束对话,都是构建成功产品的关键。希望本文的分享能为同样在AI应用开发道路上探索的开发者提供有价值的经验。


技术栈:Spring AI、Spring Boot 3.x、Redis、Reactor、SSE
适用场景:AI聊天应用、智能客服系统、对话式AI产品

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/3/27 5:59:30

告别查重 + AIGC 双重警报!宏智树 AI 教你给论文注入人工原创灵魂

作为深耕论文写作科普的教育博主&#xff0c;后台每天都被毕业生的求助刷屏&#xff1a;“查重率降到 15%&#xff0c;却因 AIGC 检测超标被打回”“降重后语句不通顺&#xff0c;导师吐槽像机器翻译”“AI 写的初稿怎么改才能躲过双重审查”。 随着高校学术审核标准升级&…

作者头像 李华
网站建设 2026/3/27 9:25:33

Hugging Face模型说明

第一类&#xff1a;原生模型格式这是模型训练完成后最初的格式&#xff0c;无任何压缩&#xff0c;推理精度最高&#xff0c;适合「模型微调、训练、科研」&#xff0c;也适合「有高端显卡的用户推理」&#xff0c;缺点是体积大、硬件要求高。.safetensors后缀&#xff1a;.saf…

作者头像 李华
网站建设 2026/3/26 17:37:05

全网最全9个AI论文软件,MBA高效写作必备!

全网最全9个AI论文软件&#xff0c;MBA高效写作必备&#xff01; AI 工具助力论文写作&#xff0c;高效与精准并存 在当今快节奏的学术环境中&#xff0c;MBA 学生和科研工作者面临着日益繁重的论文写作任务。无论是开题报告、研究设计&#xff0c;还是数据分析与结论撰写&…

作者头像 李华
网站建设 2026/3/27 6:33:04

Typora下载与激活

下载 下载这一步很关键&#xff0c;一定要下对版本&#xff0c;本教程只支持 1.10.x以前的版本 安装包和补丁下载 1、历史版本安装包 2、补丁提取码&#xff1a;7ih6 安装 根据需求下载如下版本&#xff1a; 根据顺序依次按照可自定义安装路径 如D:\Typora一直下一步&…

作者头像 李华
网站建设 2026/3/27 1:10:39

学长亲荐2026自考AI论文平台TOP10:选对工具轻松过关

学长亲荐2026自考AI论文平台TOP10&#xff1a;选对工具轻松过关 2026年自考AI论文平台测评&#xff1a;选对工具事半功倍 随着人工智能技术的不断进步&#xff0c;越来越多的自考生开始借助AI写作工具提升论文撰写效率。然而&#xff0c;面对市场上琳琅满目的平台&#xff0c;如…

作者头像 李华