news 2025/12/22 14:41:21

使用 SSE 单向推送实现 系统通知功能

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
使用 SSE 单向推送实现 系统通知功能

使用 SSE 单向推送实现 系统通知功能

说明:本说明基于自己毕设项目中“系统通知模块 (Notification / SSE)”的实现,重点讲清楚在前端从 **初始化环境 → 建立 SSE 连接 → 解析服务端事件 → 打印日志 ** 的完整技术链路,至于收到信息如何处理和具体项目有关。


在我的毕设中,又系统通知这个功能,单向的,告诉用户,你的内容发布审核成功,或者失败等等一系列的单方面的通知,所有选型sse而不选择websorket
接下来先给出后端实现,在是前端实现

后端实现 系统通知模块后端实现说明

涉及的核心类:

  • NotificationController:通知相关 HTTP 接口(SSE 流、已读、未读数、分页列表)。
  • NotificationService:通知的持久化、SSE 连接管理与推送实现。
  • Notification/notification表:通知实体与数据库表结构。
  • NotificationType/NotificationStatus:通知类型与状态枚举(如PRIVATE_MESSAGE)。

一、建立 SSE 通知连接

1. Controller:GET /api/notify/stream

  • 接口类NotificationController
  • 方法stream(@RequestHeader(value = "Last-Event-ID", required = false) String lastEventId)
  • 路径/api/notify/stream
  • 作用
    • 校验当前用户是否已登录(从UserContext.getUserId()读取 userId)。
    • 未登录时抛出401 UNAUTHORIZED
    • 登录状态下,调用notificationService.connect(userId, lastEventId)建立 SSE 连接。

简要流程:

  1. UserContext获取当前用户 ID。
  2. 若未登录:抛出ResponseStatusException(HttpStatus.UNAUTHORIZED)
  3. 若已登录:将userIdLast-Event-ID交给NotificationService.connect
  4. 返回 Spring 的SseEmitter对象,由框架维持 SSE 链接。

2. Service:NotificationService.connect

  • 方法签名public SseEmitter connect(Long userId, String lastEventId)
  • 内部逻辑
    1. 创建一个SseEmitter实例,超时时间为SSE_TIMEOUT = 30 * 60 * 1000L(30 分钟)。
    2. 将该SseEmitter加入到emitterPool中:
      • emitterPool类型为Map<Long, CopyOnWriteArrayList<SseEmitter>>,按userId维护用户当前所有 SSE 连接。
    3. emitter注册回调:
      • onCompletion/onTimeout/onError时,从emitterPool中移除当前连接。
    4. 发送一次心跳事件heartbeat),名称为"heartbeat",数据为"ping"
    5. 解析Last-Event-ID
      • 如果有合法的lastEventId,解析为lastIdLong)。
      • 若解析失败或为空,则为null
    6. 调用resendPending(userId, lastId, emitter)补发历史通知(最多 100 条)。
    7. 返回SseEmitter,等待后续通知推送。

3. SSE 连接断开/超时

  • connect方法中,对SseEmitter注册了:
    • onCompletion:连接正常完成时调用removeEmitter(userId, emitter)
    • onTimeout:连接超时时调用removeEmitter(userId, emitter)
    • onError:发送异常等错误时调用removeEmitter(userId, emitter)
  • removeEmitter会从emitterPool对应用户的列表中移除该SseEmitter,防止内存泄漏与后续重复推送。

二、通知的创建与推送

1. 通用创建+推送:NotificationService.createAndDispatch

  • 方法签名

    publicNotificationcreateAndDispatch(LonguserId,NotificationTypetype,Stringtitle,Stringcontent,Stringpayload)
  • 调用方示例

    • 私信业务中的离线提醒(ChatServiceImpl)。
    • 管理员公告等其他业务模块。
  • 执行流程

    1. 调用buildNotification(userId, type, title, content, payload)构造一个Notification实体:
      • userId:接收通知的用户 ID。
      • type:通知类型(如PRIVATE_MESSAGESYSTEM_ANNOUNCEMENT等)。
      • title:通知标题/摘要。
      • content:通知正文或简述。
      • payload:扩展 JSON 字符串,用于前端跳转或展示更多信息。
      • status:初始为UNREAD
      • createdAt/updatedAt:为当前时间。
    2. 通过notificationMapper.insert(notification)将通知写入notification表。
    3. 调用dispatch(notification)将该通知推送给所有当前在线的 SSE 连接。
    4. 返回持久化后的Notification对象。

2. 批量发送:createAndDispatchBatch

  • 方法签名

    publicvoidcreateAndDispatchBatch(List<Long>userIds,NotificationTypetype,Stringtitle,Stringcontent,Stringpayload)
  • 逻辑:对userIds逐个调用createAndDispatch,用于对多用户广播同一类型通知(如系统公告或活动推送)。

3. 通知推送:NotificationService.dispatch

  • 方法签名public void dispatch(Notification notification)
  • 执行流程
    1. emitterPool中取出该通知对应userId的所有SseEmitter列表。
    2. 若列表为空,说明用户此时没有打开 SSE 通道,方法直接返回(仅数据库中保留通知记录,后续建立连接时再补发)。
    3. 遍历每个SseEmitter,调用:
      • emitter.send(SseEmitter.event().id(String.valueOf(notification.getId())).name(notification.getType()).data(notification))
        • id:使用通知 ID(字符串形式),用于客户端的Last-Event-ID与断点续传。
        • name:使用notification.getType(),即NotificationType的枚举名(如PRIVATE_MESSAGE)。
        • data:整个Notification对象(前端接收后可按NotificationVO解析)。
    4. 如果发送过程中抛出IOException,记录日志并调用removeEmitter移除当前失效连接。

4. 历史通知补发:NotificationService.resendPending

  • 方法签名private void resendPending(Long userId, Long lastId, SseEmitter emitter)
  • 补发策略
    • lastId != null:补发ID 大于lastId的所有通知。
    • lastId == null:补发所有未读(status = UNREAD)通知
    • 均按 ID 升序排序,LIMIT 1条,只是补发过最新一条数据,列表通过数据库查询。
  • 通知通过SseEmitter.event().id(...).name(...).data(...)发送,与实时推送一致。

三、通知的已读、未读与统计

1. 标记已读:POST /api/notify/read

Controller:NotificationController.markRead
  • 路径/api/notify/read
  • 请求体NotificationReadRequest,包含:
    • notificationIds(可选):要标记为已读的通知 ID 列表。
    • upToId(可选):将id <= upToId的通知全部标记为已读。
  • 控制器逻辑
    1. UserContext获取当前用户 ID,未登录返回Result.unauthorized("未登录")
    2. 调用notificationService.markAsRead(userId, notificationIds, upToId)执行更新。
    3. 再调用notificationService.countUnread(userId)获取最新未读数量。
    4. 返回Result.success(unread)
Service:NotificationService.markAsRead
  • 方法签名public void markAsRead(Long userId, List<Long> notificationIds, Long upToId)
  • 执行逻辑
    1. notificationIds为空且upToId == null,直接返回,不做更新。
    2. 创建LambdaUpdateWrapper<Notification>
      • eq(Notification::getUserId, userId):只更新当前用户的通知。
      • eq(Notification::getStatus, NotificationStatus.UNREAD.name()):只处理未读通知。
      • notificationIds不为空:in(Notification::getId, notificationIds)
      • upToId不为nullle(Notification::getId, upToId)
    3. set(Notification::getStatus, NotificationStatus.READ.name())
    4. 执行notificationMapper.update(null, wrapper)完成批量更新。

2. 未读数量统计:GET /api/notify/unread-count

Controller:NotificationController.unreadCount
  • 路径/api/notify/unread-count
  • 逻辑
    1. UserContext获取当前用户 ID,未登录返回Result.unauthorized("未登录")
    2. 调用notificationService.countUnread(userId)获取未读数量。
    3. 返回Result.success(unread)
Service:NotificationService.countUnread
  • 方法签名public long countUnread(Long userId)
  • 逻辑
    • 使用LambdaQueryWrapper<Notification>
      • eq(Notification::getUserId, userId)
      • eq(Notification::getStatus, NotificationStatus.UNREAD.name())
    • 调用notificationMapper.selectCount(wrapper)返回未读总数。

3. 最近未读列表:GET /api/notify/recent

  • 路径/api/notify/recent
  • 说明:用于调试或前端恢复,获取当前用户最近未读通知(最多 100 条)。
  • Service 方法public List<Notification> recentUnread(Long userId)
    • 条件:
      • userId匹配当前用户;
      • status = UNREAD
    • 排序:createdAt倒序;
    • last("LIMIT 100")限制数量。

四、通知分页列表接口

1. 接口:GET /api/notify/list

Controller:NotificationController.list
  • 路径/api/notify/list
  • 入参(Query)
    • page:页码,默认1
    • pageSize:每页数量,默认10
    • status:可选,UNREAD/READ/ALL,默认ALL(不传时也视为ALL)。
  • 流程
    1. UserContext获取当前用户 ID,未登录返回Result.unauthorized("未登录")
    2. 调用notificationService.listNotifications(userId, page, pageSize, status)
    3. 返回Result.success(Page<NotificationVO>),分页结构与项目统一(total/size/current/pages/records等)。
Service:NotificationService.listNotifications
  • 方法签名public Page<NotificationVO> listNotifications(Long userId, int page, int pageSize, String status)
  • 实现思路
    1. 构造Page<Notification> pageParam = new Page<>(page, pageSize)
    2. 构造LambdaQueryWrapper<Notification> wrapper
      • eq(Notification::getUserId, userId)
      • status非空且不是"ALL"eq(Notification::getStatus, status.toUpperCase())
      • orderByDesc(Notification::getCreatedAt)
    3. notificationMapper.selectPage(pageParam, wrapper)得到notificationPage
    4. 手动构造Page<NotificationVO> voPage = new Page<>(page, pageSize, notificationPage.getTotal())
      • 遍历notificationPage.getRecords(),将每条记录映射为NotificationVO
        • 复制id/userId/type/title/content/payload/status/createdAt等字段。
      • voPage.setRecords(records)
    5. 返回voPage

五、与私信模块的集成(离线私信提醒)

1. 场景描述

当用户 A 给用户 B 发送一对一私信时:

  • 如果 B 当前通过 WebSocket 在线(有chat:online:{userId}标记):
    • 只依赖聊天模块的实时消息推送(STOMP/topic/chat/{sessionId})。
  • 如果 B 不在线(没有在线标记):
    • 在聊天消息正常写入chat_message表的基础上,额外为 B 创建一条PRIVATE_MESSAGE类型的系统通知,写入notification表并通过 SSE 推送/补发。

2. 关键实现:ChatServiceImpl.sendMessage

  • 在创建ChatMessage并更新ChatSession.updatedAt后:
    1. 判断当前会话是否为单聊:"single".equalsIgnoreCase(session.getSessionType())
    2. 查询该会话中“对方成员” (ChatSessionMemberuserId != 发送者) 获取targetUserId
    3. 根据RedisConstants.CHAT_ONLINE_PREFIX + targetUserId从 Redis 中读取在线标记:
      • 若存在值:认为对方在线,不额外生成通知。
      • 若不存在值:认为对方离线,进入通知创建逻辑。
    4. 离线场景下:
      • 查询发送者用户信息,取昵称或用户名作为senderName
      • 构造标题title = "收到一条新的私信"
      • 构造预览preview = content的前若干字符(超长截断)。
      • 构造payloadJSON 字符串:包含sessionIdmessageIdsenderIdsenderName,供前端跳转使用。
      • 调用notificationService.createAndDispatch(targetUserId, NotificationType.PRIVATE_MESSAGE, title, preview, payload)
        • 将通知写入notification表。
        • 若 B 此时已经建立 SSE 连接,会立刻收到一条PRIVATE_MESSAGE事件。
        • 若 B 之后才建立 SSE 连接,会通过补发逻辑resendPending收到历史通知。

前端实现 前端 SSE 通知实现说明(从 0 到看到控制台日志)

一、相关文件总览

  • vite.config.js
    • 开发环境代理配置:/api→ 后端VITE_API_BASE_URL
  • src/main.js
    • 应用启动时初始化用户状态并根据登录态初始化通知 SSE。
  • src/utils/notificationStream.js
    • SSE 连接和消息解析的核心逻辑(基于 fetch + ReadableStream,自实现 EventSource)。
  • src/stores/notification.js
    • 通知相关 Pinia Store,管理未读数量、通知列表和 SSE 状态。
  • src/components/Header/index.vue
    • 顶部 Header,展示通知铃铛和未读红点。
  • src/pages/UserCenter/pages/Notifications/index.vue
    • 用户中心「消息通知」页面,展示通知列表。

二、环境与代理(为什么开发环境不直接跨域)

2.1 Vite 代理配置

vite.config.js中开发环境的核心代理配置:

server:{port:3003,host:true,cors:true,...(isDev&&{proxy:{'/api':{target:env.VITE_API_BASE_URL||'http://localhost:8080',changeOrigin:true,rewrite:(path)=>path.replace(/^\/api/,'')}}})}

关键点:

  • 浏览器访问的是http://localhost:3003(前端 dev server)。
  • 任何以/api开头的请求都会被 Vite 转发到VITE_API_BASE_URL对应的后端,例如:
    • 浏览器:/api/api/notify/stream
    • Vite rewrite:去掉第一个/api,转成/api/notify/stream
    • 后端实际收到:http://127.0.0.1:8081/api/notify/stream(和后端文档一致)。
  • 对浏览器来说始终是同源 (3003),不会触发 CORS 校验,跨域问题由 dev server 帮我们挡在背后。

三、应用入口:在什么时机建立 SSE 连接?

3.1src/main.js

main.js中:

  1. 创建应用实例并挂载 Pinia / Router / UI 库。
  2. 初始化用户状态(从 localStorage 恢复登录态):
constapp=createApp(App)app.use(pinia)app.use(router)// ...constuserStore=useUserStore()constnotificationStore=useNotificationStore()userStore.initUserState()
  1. 当恢复登录状态后,如果用户已登录,立即初始化通知 SSE,并获取一次未读数量:
if(userStore.isLogin){initNotificationStream()notificationStore.fetchUnreadCount()}
  1. 同时监听登录状态变化,自动处理连接的建立和关闭:
watch(()=>userStore.isLogin,(isLogin)=>{if(isLogin){initNotificationStream()notificationStore.fetchUnreadCount()}else{closeNotificationStream()notificationStore.reset()}})

结论:

  • 应用启动时,如果用户已登录 → 自动建立 SSE 连接。
  • 用户登录成功 → 自动建立连接。
  • 用户登出或 token 失效 → 自动关闭连接并清空通知状态。

四、SSE 核心:notificationStream.js中的连接与解析

4.1 连接地址

/api/api/notify/stream

说明:

  • 浏览器视角:http://localhost:3003/api/api/notify/stream(同源)。
  • Vite 代理:去掉第一个/api/api/notify/stream
  • 后端视角:http://127.0.0.1:8081/api/notify/stream(与接口文档一致)。

4.2 建立 SSE 连接

initNotificationStream负责发起连接:

exportfunctioninitNotificationStream(){if(typeofwindow==='undefined')returnif(reading)return// 已在读流则不重复建立constuserStore=useUserStore()constnotificationStore=useNotificationStore()try{constsseUrl='/api/api/notify/stream'console.log('[SSE] 准备建立连接:',sseUrl)abortController=newAbortController()reading=truenotificationStore.setSseConnected(true)fetch(sseUrl,{method:'GET',headers:{Accept:'text/event-stream','Cache-Control':'no-cache',...(userStore.token?{Authorization:`Bearer${userStore.token}`}:{})},credentials:'include',signal:abortController.signal}).then(/* 处理响应与读流 */).catch(/* 错误处理 */)}catch(error){// ...}}

关键点:

  • 使用原生fetch而不是EventSource,是因为需要自定义Authorization头。
  • 通过Authorization: Bearer <token>携带登录状态,兼容后端鉴权逻辑。
  • credentials: 'include'保留 Cookie 信息(如果后端需要)。

4.3 处理 HTTP 响应并输出基础日志

.then(async(response)=>{console.log('[SSE] 响应状态:',response.status,response.statusText)console.log('[SSE] 响应头 content-type:',response.headers.get('content-type'))if(!response.ok||!response.body){thrownewError(`SSE 连接失败:${response.status}`)}constreader=response.body.getReader()constdecoder=newTextDecoder('utf-8')letbuffer=''// 持续读流...})

此时在浏览器控制台可以看到:

  • [SSE] 响应状态: 200 OK
  • [SSE] 响应头 content-type: text/event-stream;charset=UTF-8

4.4 持续读取 SSE 流并解析事件

核心循环逻辑:

while(true){const{done,value}=awaitreader.read()if(done){console.log('[SSE] 流已结束')break}buffer+=decoder.decode(value,{stream:true})console.log('[SSE] 收到原始 chunk:',buffer)// 根据 \n\n 切分事件块constevents=buffer.split('\n\n')buffer=events.pop()||''for(constrawEventofevents){constlines=rawEvent.split('\n')leteventType='message'letdata=''letlastEventId=nullfor(constlineoflines){if(line.startsWith('event:')){eventType=line.slice(6).trim()}elseif(line.startsWith('data:')){data+=line.slice(5).trim()}elseif(line.startsWith('id:')){lastEventId=line.slice(3).trim()}}console.log('[SSE] 解析到事件:',{eventType,lastEventId,data,rawEvent})if(lastEventId){notificationStore.setLastEventId(lastEventId)}if(eventType==='heartbeat')continueif(!data)continuetry{constparsed=JSON.parse(data)notificationStore.handleIncomingNotification(parsed)}catch(error){console.error('解析通知 SSE 消息失败:',error,data)}}}

控制台能看到的典型日志(以 PRIVATE_MESSAGE 为例):

  • [SSE] 收到原始 chunk: id:21\nevent:PRIVATE_MESSAGE\ndata:{"id":21,"userId":3,"type":"PRIVATE_MESSAGE",...}\n\n
  • [SSE] 解析到事件: { eventType: 'PRIVATE_MESSAGE', lastEventId: '21', data: '{"id":21,"userId":3,"type":"PRIVATE_MESSAGE",...}', rawEvent: 'id:21\nevent:PRIVATE_MESSAGE\ndata:{"id":21,...}' }

五、通知 Store:如何消费和存储 SSE 消息

文件:src/stores/notification.js

5.1 状态结构

state:()=>({unreadCount:0,notifications:[],pagination:{total:0,size:10,current:1,pages:0},loadingList:false,loadingUnread:false,sseConnected:false,lastEventId:null})

5.2 处理 SSE 推送的新通知

handleIncomingNotification(notification){if(!notification||!notification.id){// 当前约定:必须有 id 才认为是有效通知return}constexists=this.notifications.some(item=>item.id===notification.id)if(!exists){this.notifications=[notification,...this.notifications]}// 未显式标记为 READ 的,都算未读if(!notification.status||notification.status==='UNREAD'){this.unreadCount+=1}}

效果:

  • 每一条从 SSE 收到的 JSON 通知(包含id字段)会被添加到notifications列表顶部。
  • 未读数量unreadCount会自增,用于 Header 红点等 UI 展示。

六、UI 展示:从 Store 到页面/控制台

6.1 Header 铃铛红点

文件:src/components/Header/index.vue

constnotificationStore=useNotificationStore()consthasUnread=computed(()=>notificationStore.unreadCount>0)

在模板中:

<button class="gridButton" @click="handleNotificationClick"> <span style="position: relative; display: inline-block;"> <i class="fas fa-bell text-lg"></i> <span v-if="hasUnread" class="unreadBadge"></span> </span> </button>

当 SSE 收到任意一条未读通知:

  • handleIncomingNotificationunreadCount++
  • hasUnread变为true
  • unreadBadge渲染,小红点点亮。

6.2 用户中心「消息通知」页面

文件:src/pages/UserCenter/pages/Notifications/index.vue

constnotificationStore=useNotificationStore()constnotifications=computed(()=>notificationStore.notifications)constloading=computed(()=>notificationStore.loadingList)

首次进入页面时,会从后端拉一次历史通知列表:

onMounted(()=>{notificationStore.fetchNotifications({page:1,pageSize:10,status:'ALL'})})

当 SSE 推送新通知时:

  • Store 的notifications首元素会是最新一条;
  • 此页面会自动响应式更新,用户可以点击查看并标记为已读。

最后、小结

  1. 建立连接/api/notify/streamNotificationService.connect→ 维护emitterPool+ 心跳 + 历史补发。
  2. 创建通知:各业务(如私信模块)调用NotificationService.createAndDispatch,将通知写入notification表并尝试通过 SSE 推送。
  3. 接收与补发:在线用户通过 SSE 立即收到通知;离线用户下次连接时通过Last-Event-ID或未读筛选补发(最多 100 条)。
  4. 已读/未读管理/api/notify/read/api/notify/unread-count负责已读标记与未读统计。
  5. 列表与分页/api/notify/recent提供最近未读调试接口;/api/notify/list提供按状态过滤的分页通知列表。
  6. 私信集成ChatServiceImpl在单聊离线场景下,为接收方生成PRIVATE_MESSAGE通知,实现“有人给你发私信”类型的系统提示。
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2025/12/15 23:11:39

解锁Windows隐藏功能:RDP Wrapper多用户远程桌面终极指南

解锁Windows隐藏功能&#xff1a;RDP Wrapper多用户远程桌面终极指南 【免费下载链接】rdpwrap RDP Wrapper Library 项目地址: https://gitcode.com/gh_mirrors/rd/rdpwrap 还在为Windows家庭版无法实现多用户远程桌面而烦恼吗&#xff1f;&#x1f914; 本文将为你揭示…

作者头像 李华
网站建设 2025/12/15 23:11:24

如何轻松获取网盘真实下载链接?2025终极网盘直链解决方案

还在为网盘下载速度慢而烦恼吗&#xff1f;网盘直链下载助手项目为您提供完美解决方案&#xff01;这是一款基于JavaScript开发的免费工具&#xff0c;能够帮助您快速获取百度网盘、阿里云盘、天翼云盘等八大主流网盘的真实下载地址&#xff0c;让下载体验瞬间升级。 【免费下载…

作者头像 李华
网站建设 2025/12/15 23:11:06

MTKClient三分钟精通:联发科设备调试终极指南

MTKClient作为专为联发科芯片设计的开源调试利器&#xff0c;让设备维修和系统管理变得前所未有地简单。无论是救活变砖手机还是深度定制系统&#xff0c;这款工具都能为你提供专业级解决方案。 【免费下载链接】mtkclient MTK reverse engineering and flash tool 项目地址:…

作者头像 李华
网站建设 2025/12/15 23:11:04

百度网盘提取码智能解析器:告别繁琐搜索的数字助手

百度网盘提取码智能解析器&#xff1a;告别繁琐搜索的数字助手 【免费下载链接】baidupankey 项目地址: https://gitcode.com/gh_mirrors/ba/baidupankey 在这个信息爆炸的时代&#xff0c;我们每天都在与各种网盘资源打交道。特别是百度网盘&#xff0c;作为国内最主流…

作者头像 李华
网站建设 2025/12/15 23:11:02

Qwen3-14B模型token计费模式详解与优化建议

Qwen3-14B模型Token计费模式详解与优化建议 在AI能力逐步渗透企业核心业务的今天&#xff0c;如何在保障智能服务性能的同时控制推理成本&#xff0c;已成为技术团队不可回避的关键命题。尤其是随着大语言模型&#xff08;LLM&#xff09;进入私有化部署和常态化调用阶段&#…

作者头像 李华
网站建设 2025/12/15 23:10:21

DeepSeek爆火背后:AI竞争格局重塑与企业机遇,程序员必学收藏指南

DeepSeek爆火引发AI竞争格局变革&#xff0c;降低AI应用门槛&#xff0c;推动金融、医疗、教育等行业应用爆发。企业需通过GPU算力优化、场景化小模型训练、引入私域知识、智能体协同框架等技术叠加来抓住机遇。MoE架构可能成为主流&#xff0c;软硬协同能力与安全合规是企业面…

作者头像 李华