news 2026/2/27 11:58:15

10.2 会话管理机制竟然还能这样实现?

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
10.2 会话管理机制竟然还能这样实现?

// startExpirationChecker 启动过期检查协程
func (msm *MemorySessionManager) startExpirationChecker() {
ticker := time.NewTicker(msm.expirationCheckInterval)
defer ticker.Stop()

for { select { case <-msm.stopChan: return case <-ticker.C: expiredSessions, err := msm.GetExpiredSessions(context.Background(), time.Now()) if err != nil { log.Printf("Failed to get expired sessions: %v", err) continue } for _, session := range expiredSessions { if err := msm.DeleteSession(context.Background(), session.ID); err != nil { log.Printf("Failed to delete expired session %s: %v", session.ID, err) } } } }

}

// Close 关闭会话管理器
func (msm *MemorySessionManager) Close() error {
close(msm.stopChan)
return nil
}

## 4. 分布式会话管理实现 对于多节点部署场景,需要实现分布式会话管理以支持会话共享和故障恢复。 ### 4.1 Redis会话管理器 ```go // RedisSessionManager Redis会话管理器 type RedisSessionManager struct { client *redis.Client prefix string defaultSessionTTL time.Duration } // NewRedisSessionManager 创建Redis会话管理器 func NewRedisSessionManager(redisAddr, prefix string, defaultSessionTTL time.Duration) *RedisSessionManager { client := redis.NewClient(&redis.Options{ Addr: redisAddr, }) return &RedisSessionManager{ client: client, prefix: prefix, defaultSessionTTL: defaultSessionTTL, } } // CreateSession 创建会话 func (rsm *RedisSessionManager) CreateSession(ctx context.Context, session *Session) error { // 设置默认值 if session.ID == "" { session.ID = uuid.New().String() } if session.CreatedAt.IsZero() { session.CreatedAt = time.Now() } if session.LastActive.IsZero() { session.LastActive = time.Now() } if session.ExpiresAt.IsZero() { session.ExpiresAt = time.Now().Add(rsm.defaultSessionTTL) } if session.Status == "" { session.Status = SessionStatusActive } if session.Attributes == nil { session.Attributes = make(map[string]interface{}) } if session.Extensions == nil { session.Extensions = make(map[string]interface{}) } // 序列化会话数据 data, err := json.Marshal(session) if err != nil { return fmt.Errorf("failed to marshal session: %w", err) } // 存储会话 sessionKey := fmt.Sprintf("%s:session:%s", rsm.prefix, session.ID) err = rsm.client.Set(ctx, sessionKey, data, rsm.defaultSessionTTL).Err() if err != nil { return fmt.Errorf("failed to store session in redis: %w", err) } // 建立用户ID索引 if session.UserID != "" { userKey := fmt.Sprintf("%s:user:%s:sessions", rsm.prefix, session.UserID) err = rsm.client.SAdd(ctx, userKey, session.ID).Err() if err != nil { return fmt.Errorf("failed to add session to user index: %w", err) } // 设置过期时间 rsm.client.Expire(ctx, userKey, rsm.defaultSessionTTL) } // 建立连接ID索引 if session.ConnectionID != "" { connKey := fmt.Sprintf("%s:connection:%s:session", rsm.prefix, session.ConnectionID) err = rsm.client.Set(ctx, connKey, session.ID, rsm.defaultSessionTTL).Err() if err != nil { return fmt.Errorf("failed to store connection-session mapping: %w", err) } } return nil } // GetSession 获取会话 func (rsm *RedisSessionManager) GetSession(ctx context.Context, sessionID string) (*Session, error) { sessionKey := fmt.Sprintf("%s:session:%s", rsm.prefix, sessionID) data, err := rsm.client.Get(ctx, sessionKey).Bytes() if err != nil { if err == redis.Nil { return nil, fmt.Errorf("session not found: %s", sessionID) } return nil, fmt.Errorf("failed to get session from redis: %w", err) } var session Session if err := json.Unmarshal(data, &session); err != nil { return nil, fmt.Errorf("failed to unmarshal session: %w", err) } // 检查会话是否过期 if session.ExpiresAt.Before(time.Now()) { rsm.DeleteSession(ctx, sessionID) return nil, fmt.Errorf("session expired: %s", sessionID) } return &session, nil } // UpdateSession 更新会话 func (rsm *RedisSessionManager) UpdateSession(ctx context.Context, session *Session) error { // 检查会话是否存在 _, err := rsm.GetSession(ctx, session.ID) if err != nil { return err } // 序列化会话数据 data, err := json.Marshal(session) if err != nil { return fmt.Errorf("failed to marshal session: %w", err) } // 更新会话 sessionKey := fmt.Sprintf("%s:session:%s", rsm.prefix, session.ID) err = rsm.client.Set(ctx, sessionKey, data, rsm.defaultSessionTTL).Err() if err != nil { return fmt.Errorf("failed to update session in redis: %w", err) } return nil } // DeleteSession 删除会话 func (rsm *RedisSessionManager) DeleteSession(ctx context.Context, sessionID string) error { // 获取会话信息 session, err := rsm.GetSession(ctx, sessionID) if err != nil { return nil // 会话不存在,直接返回 } // 删除会话数据 sessionKey := fmt.Sprintf("%s:session:%s", rsm.prefix, sessionID) err = rsm.client.Del(ctx, sessionKey).Err() if err != nil { return fmt.Errorf("failed to delete session from redis: %w", err) } // 删除用户ID索引 if session.UserID != "" { userKey := fmt.Sprintf("%s:user:%s:sessions", rsm.prefix, session.UserID) rsm.client.SRem(ctx, userKey, sessionID) } // 删除连接ID索引 if session.ConnectionID != "" { connKey := fmt.Sprintf("%s:connection:%s:session", rsm.prefix, session.ConnectionID) rsm.client.Del(ctx, connKey) } return nil } // GetSessionsByUserID 根据用户ID获取会话 func (rsm *RedisSessionManager) GetSessionsByUserID(ctx context.Context, userID string) ([]*Session, error) { userKey := fmt.Sprintf("%s:user:%s:sessions", rsm.prefix, userID) sessionIDs, err := rsm.client.SMembers(ctx, userKey).Result() if err != nil { return nil, fmt.Errorf("failed to get user sessions from redis: %w", err) } var sessions []*Session for _, sessionID := range sessionIDs { session, err := rsm.GetSession(ctx, sessionID) if err != nil { log.Printf("Failed to get session %s: %v", sessionID, err) continue } sessions = append(sessions, session) } return sessions, nil } // GetSessionByConnectionID 根据连接ID获取会话 func (rsm *RedisSessionManager) GetSessionByConnectionID(ctx context.Context, connectionID string) (*Session, error) { connKey := fmt.Sprintf("%s:connection:%s:session", rsm.prefix, connectionID) sessionID, err := rsm.client.Get(ctx, connKey).Result() if err != nil { if err == redis.Nil { return nil, fmt.Errorf("session not found for connection: %s", connectionID) } return nil, fmt.Errorf("failed to get session ID from redis: %w", err) } return rsm.GetSession(ctx, sessionID) } // RefreshSession 刷新会话活跃时间 func (rsm *RedisSessionManager) RefreshSession(ctx context.Context, sessionID string) error { session, err := rsm.GetSession(ctx, sessionID) if err != nil { return err } session.LastActive = time.Now() if session.ExpiresAt.Before(time.Now().Add(rsm.defaultSessionTTL / 2)) { session.ExpiresAt = time.Now().Add(rsm.defaultSessionTTL) } return rsm.UpdateSession(ctx, session) } // GetExpiredSessions 获取过期会话 func (rsm *RedisSessionManager) GetExpiredSessions(ctx context.Context, before time.Time) ([]*Session, error) { // Redis会自动处理过期键的删除,这里可以通过扫描所有会话来查找过期的 pattern := fmt.Sprintf("%s:session:*", rsm.prefix) var expiredSessions []*Session iter := rsm.client.Scan(ctx, 0, pattern, 0).Iterator() for iter.Next(ctx) { key := iter.Val() data, err := rsm.client.Get(ctx, key).Bytes() if err != nil { continue } var session Session if err := json.Unmarshal(data, &session); err != nil { continue } if session.ExpiresAt.Before(before) { expiredSessions = append(expiredSessions, &session) } } if err := iter.Err(); err != nil { return nil, fmt.Errorf("failed to scan sessions: %w", err) } return expiredSessions, nil } // Close 关闭会话管理器 func (rsm *RedisSessionManager) Close() error { return rsm.client.Close() }

5. 会话迁移与恢复

在分布式环境中,会话迁移和恢复是保证高可用性的关键。

5.1 会话迁移管理器

// SessionMigrationManager 会话迁移管理器typeSessionMigrationManagerstruct{localManager SessionManager remoteManager SessionManager migrationChanchan*SessionMigrationTask stopChanchanstruct{}}// SessionMigrationTask 会话迁移任务typeSessionMigrationTaskstruct{Session*Session TargetNodestringCallbackchanerror}// NewSessionMigrationManager 创建会话迁移管理器funcNewSessionMigrationManager(localManager,remoteManager SessionManager)*SessionMigrationManager{smm:=&SessionMigrationManager{localManager:localManager,remoteManager:remoteManager,migrationChan:make(chan*SessionMigrationTask,100)
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/2/23 1:29:46

Spring Boot 3 步完成日志脱敏,简单实用~

在我们写代码的时候&#xff0c;会书写许多日志代码&#xff0c;但是有些敏感数据是需要进行安全脱敏处理的。对于日志脱敏的方式有很多&#xff0c;常见的有&#xff1a;①使用conversionRule标签&#xff0c;继承MessageConverter ②书写一个脱敏工具类&#xff0c;在打印日志…

作者头像 李华
网站建设 2026/2/24 19:16:29

12.2 太牛了!批量传输技术竟然还能这样用?

太牛了!批量传输技术竟然还能这样用? 在WebSocket网关中,批量传输技术是提升系统吞吐量和降低网络开销的重要手段。通过将多个小消息合并为一个大消息进行传输,可以显著减少网络交互次数,提高传输效率。本章将深入探讨批量传输技术的实现原理和应用场景。 1. 批量传输概…

作者头像 李华
网站建设 2026/2/24 12:59:01

Spring AI Embedding 实战:从语义搜索到商品推荐系统

Spring AI Embedding 实战:从语义搜索到商品推荐系统 关键词:Spring AI / Embedding / 向量数据库 / PGVector / 推荐系统 / RAG 一、什么是 Spring AI Embedding Spring AI 中的 Embedding 技术核心在于将文本、图像等非结构化数据转化为高维向量(即 Embedding)。这些向量…

作者头像 李华
网站建设 2026/2/27 12:54:15

16.1 批量任务调度和心跳优化竟然还能这样做?

16.1 太震撼了!批量任务调度和心跳优化竟然还能这样做? 在分布式任务调度系统中,性能优化是确保系统能够处理大规模任务的关键。今天我们将深入探讨批量任务调度和心跳优化技术,这些技术能够显著提升系统的吞吐量和响应速度。 批量任务调度机制 批量任务调度是提升系统性…

作者头像 李华
网站建设 2026/2/23 20:13:28

java juc 01 进程与线程

进程和线程的概念 并行和并发的概念 线程基本应用ps &#xff1a;随便写写&#xff0c;今天就是开个新章对比维度进程&#xff08;Process&#xff09;线程&#xff08;Thread&#xff09;基本概念程序运行时的一个实例&#xff0c;用来加载指令、管理内存、管理 IO进程内部的一…

作者头像 李华
网站建设 2026/2/26 4:28:07

Ella陈嘉桦「艾拉主意」巡演南宁站两晚连唱 现场嗦粉打造出圈名场面

Ella 陈嘉桦「It’s Me 艾拉主意」巡演继长沙、杭州、广州、上海站后&#xff0c;持续保持场场秒罄、口碑爆棚的火爆势头。2026年2月7日至8日&#xff0c;这场音乐盛宴落地绿城南宁&#xff0c;于广西体育中心体育馆一连两晚盛大开唱。其中2月8日场是整轮巡演中的第十场演出&am…

作者头像 李华