news 2026/3/20 18:54:45

使用 Go 实现 SSE 流式推送 + 打字机效果(模拟 Coze Chat)

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
使用 Go 实现 SSE 流式推送 + 打字机效果(模拟 Coze Chat)

使用 Go 实现 SSE 流式推送 + 打字机效果(模拟 Coze Chat)

在开发实时聊天、AI 助手或者协作应用时,我们经常需要SSE(Server-Sent Events)实现服务端向前端持续推送数据。本文将分享一个Go SSE 打字机式输出实现,并附上上游模拟示例、curl 测试和前端实时渲染示例。


功能特点

  1. 使用SSE推送消息流,前端无需轮询。
  2. 对消息进行逐字符打字机式输出,模拟 AI 打字效果。
  3. 支持上游 SSE 模拟,方便本地测试。
  4. 可轻松扩展为真实 AI 聊天接口的代理服务。

技术栈

  • Go 1.21+
  • CloudWeGo Hertz 作为 HTTP 框架
  • resty 用于上游 SSE 请求
  • SSE 流式推送(使用hertz-contrib/sse

完整示例代码

packagemainimport("bufio""context""encoding/json""fmt""math/rand""strings""time""github.com/cloudwego/hertz/pkg/app""github.com/cloudwego/hertz/pkg/app/server""github.com/cloudwego/hertz/pkg/common/hlog""github.com/hertz-contrib/sse""resty.dev/v3")funcmain(){h:=server.Default(server.WithHostPorts(":8380"))h.POST("/v3/chat",CozeParseThenTypeWriter)h.GET("/upstream",MockUpstreamSSE)hlog.Info("🚀 Coze SSE parse + typewriter proxy running at :8380")h.Spin()}// 核心逻辑:解析 conversation.message.delta → 打字机式输出funcCozeParseThenTypeWriter(ctx context.Context,c*app.RequestContext){// SSE Headerc.SetStatusCode(200)h:=c.Response.Header h.Set("Content-Type","text/event-stream; charset=utf-8")h.Set("Cache-Control","no-cache, no-store, must-revalidate")h.Set("Pragma","no-cache")h.Set("Connection","keep-alive")h.Set("X-Accel-Buffering","no")stream:=sse.NewStream(c)// Resty 上游请求client:=resty.New().SetTimeout(0)resp,err:=client.R().SetContext(ctx).SetDoNotParseResponse(true).SetHeader("Accept","text/event-stream").Get("http://localhost:8380/upstream")iferr!=nil||resp.RawResponse==nil||resp.RawResponse.Body==nil{hlog.Error("upstream connect failed")return}deferresp.RawResponse.Body.Close()// 打字机准备r:=rand.New(rand.NewSource(time.Now().UnixNano()))scanner:=bufio.NewScanner(resp.RawResponse.Body)varcurrentEventstring// SSE 解析循环forscanner.Scan(){select{case<-ctx.Done():hlog.Warn("client disconnected")returndefault:}line:=scanner.Text()ifline==""{currentEvent=""continue}ifstrings.HasPrefix(line,"event:"){currentEvent=strings.TrimSpace(strings.TrimPrefix(line,"event:"))continue}ifstrings.HasPrefix(line,"data:"){payload:=strings.TrimSpace(strings.TrimPrefix(line,"data:"))ifpayload=="[DONE]"{stream.Publish(&sse.Event{Data:[]byte("[DONE]")})return}ifcurrentEvent=="conversation.message.delta"{vardstruct{Contentstring`json:"content"`}iferr:=json.Unmarshal([]byte(payload),&d);err==nil{typeWriter(stream,r,d.Content)}}ifcurrentEvent=="conversation.message.completed"{stream.Publish(&sse.Event{Event:"conversation.message.completed",Data:[]byte(`{"status":"completed"}`),})}}}}// 打字机逐字符输出functypeWriter(stream*sse.Stream,r*rand.Rand,textstring){fori,ch:=rangetext{time.Sleep(getSleepDuration(r,ch))data:=map[string]any{"id":fmt.Sprintf("char_%d_%d",i,time.Now().UnixNano()%100000),"role":"assistant","type":"answer","content":string(ch),"created_at":time.Now().UnixMilli(),}b,_:=json.Marshal(data)_=stream.Publish(&sse.Event{Event:"conversation.message.delta",ID:fmt.Sprintf("char_%d",i),Data:b,})}}// 延迟策略funcgetSleepDuration(r*rand.Rand,chrune)time.Duration{switch{casech=='\n'||ch=='。'||ch=='!'||ch=='?':returntime.Duration(300+r.Intn(200))*time.Millisecondcasech=='、'||ch==' '||ch=='-'||ch==':'||ch==',':returntime.Duration(150+r.Intn(100))*time.Millisecondcasech=='#'||ch=='*'||ch=='>':returntime.Duration(200+r.Intn(150))*time.Milliseconddefault:returntime.Duration(60+r.Intn(60))*time.Millisecond}}// 上游 Mock(模拟 Coze SSE)funcMockUpstreamSSE(ctx context.Context,c*app.RequestContext){c.SetStatusCode(200)c.Header("Content-Type","text/event-stream")c.Header("Cache-Control","no-cache")c.Header("Connection","keep-alive")c.Header("X-Accel-Buffering","no")c.Flush()send:=func(event,datastring){c.Write([]byte("event: "+event+"\n"+"data: "+data+"\n\n",))c.Flush()}deltas:=[]string{"你","好",",","这","是"," Coze"," SSE"}for_,ch:=rangedeltas{send("conversation.message.delta",`{"content":"`+ch+`"}`)time.Sleep(80*time.Millisecond)}send("conversation.message.completed",`{}`)c.Write([]byte("data: [DONE]\n\n"))c.Flush()}

使用方法

  1. 启动服务
go run main.go
  1. 访问接口
  • 上游 SSE 测试(浏览器可直接访问):
    http://localhost:8380/upstream

  • 打字机代理接口(POST):
    http://localhost:8380/v3/chat


使用curl测试 SSE

# 测试上游 SSEcurl-N http://localhost:8380/upstream# 测试打字机代理 SSEcurl-N -X POST http://localhost:8380/v3/chat

参数说明:

  • -N/--no-buffer:禁用输出缓存,实时显示流式数据。
  • -X POST:因为代理接口是 POST。

运行后,你会在终端看到类似打字机逐字符输出:

event: conversation.message.delta data: {"id":"char_0_12345","role":"assistant","type":"answer","content":"你","created_at":1700000000000} event: conversation.message.delta data: {"id":"char_1_67890","role":"assistant","type":"answer","content":"好","created_at":1700000000050} ... event: conversation.message.completed data: {"status":"completed"} data: [DONE]

前端实时渲染打字机效果示例

在前端,可以使用EventSource监听 SSE 并动态显示内容:

<divid="chat"></div><script>constchatDiv=document.getElementById("chat");constevtSource=newEventSource("http://localhost:8380/v3/chat");evtSource.addEventListener("conversation.message.delta",e=>{constdata=JSON.parse(e.data);chatDiv.innerHTML+=data.content;});evtSource.addEventListener("conversation.message.completed",e=>{console.log("消息完成");});evtSource.onopen=()=>console.log("连接已打开");evtSource.onerror=()=>console.log("连接错误或关闭");</script>

效果:消息逐字符显示,模拟 AI 打字机输出。


核心解析

  1. SSE Header 设置
h.Set("Content-Type","text/event-stream; charset=utf-8")h.Set("Cache-Control","no-cache, no-store, must-revalidate")h.Set("Connection","keep-alive")h.Set("X-Accel-Buffering","no")

保证浏览器或代理实时接收流式数据。

  1. 打字机效果

根据字符类型不同设置不同延迟:

casech=='\n'||ch=='。'||ch=='!'||ch=='?':returntime.Duration(300+r.Intn(200))*time.Millisecond
  1. 上游 SSE 模拟

方便本地测试,无需真实 AI 接口即可验证前端打字机效果。


总结

通过本文示例,你可以快速实现:

  • Go SSE 服务端代理
  • AI 聊天消息打字机式输出
  • 上游 SSE 模拟
  • curl 测试和前端实时渲染
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/3/18 9:55:19

LeetDown:macOS平台A6/A7设备降级完整解决方案

LeetDown&#xff1a;macOS平台A6/A7设备降级完整解决方案 【免费下载链接】LeetDown a GUI macOS Downgrade Tool for A6 and A7 iDevices 项目地址: https://gitcode.com/gh_mirrors/le/LeetDown 对于拥有iPhone 5s、iPad 4等A6/A7芯片设备的用户来说&#xff0c;系统…

作者头像 李华
网站建设 2026/3/19 18:33:00

【AI+教育实战指南】:7种高效果实场景下的智能推荐策略

第一章&#xff1a;教育 AI Agent 的学习推荐在现代教育技术中&#xff0c;AI Agent 正逐步成为个性化学习的核心驱动力。通过分析学生的学习行为、知识掌握程度和兴趣偏好&#xff0c;AI Agent 能够动态生成定制化的学习路径&#xff0c;提升学习效率与参与度。个性化推荐机制…

作者头像 李华
网站建设 2026/3/15 23:02:16

2000-2024各省铁路里程、公路里程、交通网密度数据

铁路里程是指铁路线从起点到终点的公里数&#xff0c;通常用于表示铁路线路的长度。 公路里程是指一定时期内实际达到《公路工程技术标准》规定的等级公路&#xff0c;并经公路主管部门正式验收交付使用的公路里程数。 交通网密度是指某一区域内交通线路的密集程度&#xff0…

作者头像 李华
网站建设 2026/3/15 23:02:17

【MCP Azure量子扩展配置终极指南】:从入门到精通的一站式解决方案

第一章&#xff1a;MCP Azure 量子扩展配置概述Azure 量子扩展是 Microsoft Quantum Development Kit 的核心组件之一&#xff0c;旨在为开发者提供在 Azure 平台上构建、测试和运行量子算法的能力。该扩展支持多种后端量子处理器和模拟器&#xff0c;使用户能够灵活选择执行环…

作者头像 李华
网站建设 2026/3/18 14:17:11

Kotaemon支持Grafana告警吗?异常情况及时通知

Kotaemon 支持 Grafana 告警吗&#xff1f;异常情况及时通知 在构建现代智能对话系统时&#xff0c;稳定性与可观测性早已不再是“锦上添花”的附加功能&#xff0c;而是决定服务能否真正落地生产环境的核心要素。想象这样一个场景&#xff1a;你的企业客服机器人正在全天候响…

作者头像 李华