// startExpirationChecker 启动过期检查协程
func (msm *MemorySessionManager) startExpirationChecker() {
ticker := time.NewTicker(msm.expirationCheckInterval)
defer ticker.Stop()
for { select { case <-msm.stopChan: return case <-ticker.C: expiredSessions, err := msm.GetExpiredSessions(context.Background(), time.Now()) if err != nil { log.Printf("Failed to get expired sessions: %v", err) continue } for _, session := range expiredSessions { if err := msm.DeleteSession(context.Background(), session.ID); err != nil { log.Printf("Failed to delete expired session %s: %v", session.ID, err) } } } }}
// Close 关闭会话管理器
func (msm *MemorySessionManager) Close() error {
close(msm.stopChan)
return nil
}
## 4. 分布式会话管理实现 对于多节点部署场景,需要实现分布式会话管理以支持会话共享和故障恢复。 ### 4.1 Redis会话管理器 ```go // RedisSessionManager Redis会话管理器 type RedisSessionManager struct { client *redis.Client prefix string defaultSessionTTL time.Duration } // NewRedisSessionManager 创建Redis会话管理器 func NewRedisSessionManager(redisAddr, prefix string, defaultSessionTTL time.Duration) *RedisSessionManager { client := redis.NewClient(&redis.Options{ Addr: redisAddr, }) return &RedisSessionManager{ client: client, prefix: prefix, defaultSessionTTL: defaultSessionTTL, } } // CreateSession 创建会话 func (rsm *RedisSessionManager) CreateSession(ctx context.Context, session *Session) error { // 设置默认值 if session.ID == "" { session.ID = uuid.New().String() } if session.CreatedAt.IsZero() { session.CreatedAt = time.Now() } if session.LastActive.IsZero() { session.LastActive = time.Now() } if session.ExpiresAt.IsZero() { session.ExpiresAt = time.Now().Add(rsm.defaultSessionTTL) } if session.Status == "" { session.Status = SessionStatusActive } if session.Attributes == nil { session.Attributes = make(map[string]interface{}) } if session.Extensions == nil { session.Extensions = make(map[string]interface{}) } // 序列化会话数据 data, err := json.Marshal(session) if err != nil { return fmt.Errorf("failed to marshal session: %w", err) } // 存储会话 sessionKey := fmt.Sprintf("%s:session:%s", rsm.prefix, session.ID) err = rsm.client.Set(ctx, sessionKey, data, rsm.defaultSessionTTL).Err() if err != nil { return fmt.Errorf("failed to store session in redis: %w", err) } // 建立用户ID索引 if session.UserID != "" { userKey := fmt.Sprintf("%s:user:%s:sessions", rsm.prefix, session.UserID) err = rsm.client.SAdd(ctx, userKey, session.ID).Err() if err != nil { return fmt.Errorf("failed to add session to user index: %w", err) } // 设置过期时间 rsm.client.Expire(ctx, userKey, rsm.defaultSessionTTL) } // 建立连接ID索引 if session.ConnectionID != "" { connKey := fmt.Sprintf("%s:connection:%s:session", rsm.prefix, session.ConnectionID) err = rsm.client.Set(ctx, connKey, session.ID, rsm.defaultSessionTTL).Err() if err != nil { return fmt.Errorf("failed to store connection-session mapping: %w", err) } } return nil } // GetSession 获取会话 func (rsm *RedisSessionManager) GetSession(ctx context.Context, sessionID string) (*Session, error) { sessionKey := fmt.Sprintf("%s:session:%s", rsm.prefix, sessionID) data, err := rsm.client.Get(ctx, sessionKey).Bytes() if err != nil { if err == redis.Nil { return nil, fmt.Errorf("session not found: %s", sessionID) } return nil, fmt.Errorf("failed to get session from redis: %w", err) } var session Session if err := json.Unmarshal(data, &session); err != nil { return nil, fmt.Errorf("failed to unmarshal session: %w", err) } // 检查会话是否过期 if session.ExpiresAt.Before(time.Now()) { rsm.DeleteSession(ctx, sessionID) return nil, fmt.Errorf("session expired: %s", sessionID) } return &session, nil } // UpdateSession 更新会话 func (rsm *RedisSessionManager) UpdateSession(ctx context.Context, session *Session) error { // 检查会话是否存在 _, err := rsm.GetSession(ctx, session.ID) if err != nil { return err } // 序列化会话数据 data, err := json.Marshal(session) if err != nil { return fmt.Errorf("failed to marshal session: %w", err) } // 更新会话 sessionKey := fmt.Sprintf("%s:session:%s", rsm.prefix, session.ID) err = rsm.client.Set(ctx, sessionKey, data, rsm.defaultSessionTTL).Err() if err != nil { return fmt.Errorf("failed to update session in redis: %w", err) } return nil } // DeleteSession 删除会话 func (rsm *RedisSessionManager) DeleteSession(ctx context.Context, sessionID string) error { // 获取会话信息 session, err := rsm.GetSession(ctx, sessionID) if err != nil { return nil // 会话不存在,直接返回 } // 删除会话数据 sessionKey := fmt.Sprintf("%s:session:%s", rsm.prefix, sessionID) err = rsm.client.Del(ctx, sessionKey).Err() if err != nil { return fmt.Errorf("failed to delete session from redis: %w", err) } // 删除用户ID索引 if session.UserID != "" { userKey := fmt.Sprintf("%s:user:%s:sessions", rsm.prefix, session.UserID) rsm.client.SRem(ctx, userKey, sessionID) } // 删除连接ID索引 if session.ConnectionID != "" { connKey := fmt.Sprintf("%s:connection:%s:session", rsm.prefix, session.ConnectionID) rsm.client.Del(ctx, connKey) } return nil } // GetSessionsByUserID 根据用户ID获取会话 func (rsm *RedisSessionManager) GetSessionsByUserID(ctx context.Context, userID string) ([]*Session, error) { userKey := fmt.Sprintf("%s:user:%s:sessions", rsm.prefix, userID) sessionIDs, err := rsm.client.SMembers(ctx, userKey).Result() if err != nil { return nil, fmt.Errorf("failed to get user sessions from redis: %w", err) } var sessions []*Session for _, sessionID := range sessionIDs { session, err := rsm.GetSession(ctx, sessionID) if err != nil { log.Printf("Failed to get session %s: %v", sessionID, err) continue } sessions = append(sessions, session) } return sessions, nil } // GetSessionByConnectionID 根据连接ID获取会话 func (rsm *RedisSessionManager) GetSessionByConnectionID(ctx context.Context, connectionID string) (*Session, error) { connKey := fmt.Sprintf("%s:connection:%s:session", rsm.prefix, connectionID) sessionID, err := rsm.client.Get(ctx, connKey).Result() if err != nil { if err == redis.Nil { return nil, fmt.Errorf("session not found for connection: %s", connectionID) } return nil, fmt.Errorf("failed to get session ID from redis: %w", err) } return rsm.GetSession(ctx, sessionID) } // RefreshSession 刷新会话活跃时间 func (rsm *RedisSessionManager) RefreshSession(ctx context.Context, sessionID string) error { session, err := rsm.GetSession(ctx, sessionID) if err != nil { return err } session.LastActive = time.Now() if session.ExpiresAt.Before(time.Now().Add(rsm.defaultSessionTTL / 2)) { session.ExpiresAt = time.Now().Add(rsm.defaultSessionTTL) } return rsm.UpdateSession(ctx, session) } // GetExpiredSessions 获取过期会话 func (rsm *RedisSessionManager) GetExpiredSessions(ctx context.Context, before time.Time) ([]*Session, error) { // Redis会自动处理过期键的删除,这里可以通过扫描所有会话来查找过期的 pattern := fmt.Sprintf("%s:session:*", rsm.prefix) var expiredSessions []*Session iter := rsm.client.Scan(ctx, 0, pattern, 0).Iterator() for iter.Next(ctx) { key := iter.Val() data, err := rsm.client.Get(ctx, key).Bytes() if err != nil { continue } var session Session if err := json.Unmarshal(data, &session); err != nil { continue } if session.ExpiresAt.Before(before) { expiredSessions = append(expiredSessions, &session) } } if err := iter.Err(); err != nil { return nil, fmt.Errorf("failed to scan sessions: %w", err) } return expiredSessions, nil } // Close 关闭会话管理器 func (rsm *RedisSessionManager) Close() error { return rsm.client.Close() }5. 会话迁移与恢复
在分布式环境中,会话迁移和恢复是保证高可用性的关键。
5.1 会话迁移管理器
// SessionMigrationManager 会话迁移管理器typeSessionMigrationManagerstruct{localManager SessionManager remoteManager SessionManager migrationChanchan*SessionMigrationTask stopChanchanstruct{}}// SessionMigrationTask 会话迁移任务typeSessionMigrationTaskstruct{Session*Session TargetNodestringCallbackchanerror}// NewSessionMigrationManager 创建会话迁移管理器funcNewSessionMigrationManager(localManager,remoteManager SessionManager)*SessionMigrationManager{smm:=&SessionMigrationManager{localManager:localManager,remoteManager:remoteManager,migrationChan:make(chan*SessionMigrationTask,100)