2.3 资源控制与容量规划:避免系统被突发流量打垮
引言
在高并发的分布式系统中,资源控制和容量规划是保障系统稳定性的关键环节。特别是在面对突发流量时,如果没有合理的资源控制机制和充足的容量规划,系统很容易因为资源耗尽而崩溃,导致服务不可用。
本节我们将深入探讨通知平台的资源控制与容量规划策略,包括计算资源、存储资源、网络资源以及第三方依赖资源的管理,确保系统能够在各种流量场景下稳定运行。
资源控制的核心挑战
在设计资源控制系统时,我们面临以下几个核心挑战:
- 资源隔离:如何在多业务方共享的环境中实现资源的有效隔离
- 动态调整:如何根据实时负载动态调整资源分配
- 过载保护:如何在系统过载时进行有效的保护和降级
- 性能监控:如何实时监控资源使用情况并及时预警
- 容量评估:如何准确评估系统容量并进行合理规划
计算资源控制
计算资源是系统最核心的资源之一,主要包括CPU、内存和协程等。我们需要实现精细化的控制机制。
协程池管理
``go
// GoroutinePool 协程池
type GoroutinePool struct {
// 最大协程数
maxWorkers int
// 当前协程数 currentWorkers int // 工作队列 workQueue chan WorkItem // 工作者列表 workers []*Worker // 资源控制器 resourceController *ResourceController // 统计信息 stats *PoolStats // 互斥锁 mutex sync.RWMutex}
// WorkItem 工作项
type WorkItem struct {
Job func() error
Callback func(error)
Priority int
Created time.Time
}
// Worker 工作者
type Worker struct {
ID int
Pool *GoroutinePool
Quit chan bool
IsActive bool
}
// PoolStats 协程池统计信息
type PoolStats struct {
TotalJobs int64
CompletedJobs int64
FailedJobs int64
QueueLength int64
ActiveWorkers int64
}
// NewGoroutinePool 创建协程池
func NewGoroutinePool(maxWorkers int, queueSize int) *GoroutinePool {
pool := &GoroutinePool{
maxWorkers: maxWorkers,
workQueue: make(chan WorkItem, queueSize),
workers: make([]*Worker, 0, maxWorkers),
stats: &PoolStats{},
}
// 初始化工作者 for i := 0; i < maxWorkers; i++ { worker := &Worker{ ID: i, Pool: pool, Quit: make(chan bool), } pool.workers = append(pool.workers, worker) worker.Start() } return pool}
// Submit 提交工作项
func (gp *GoroutinePool) Submit(work WorkItem) error {
// 检查队列是否已满
if len(gp.workQueue) >= cap(gp.workQueue) {
return errors.New(“work queue is full”)
}
// 提交工作项 select { case gp.workQueue <- work: atomic.AddInt64(&gp.stats.TotalJobs, 1) atomic.AddInt64(&gp.stats.QueueLength, 1) return nil default: return errors.New("failed to submit work item") }}
// Start 启动工作者
func (w *Worker) Start() {
go func() {
w.IsActive = true
defer func() {
w.IsActive = false
}()
for { select { case work := <-w.Pool.workQueue: atomic.AddInt64(&w.Pool.stats.QueueLength, -1) atomic.AddInt64(&w.Pool.stats.ActiveWorkers, 1) // 执行工作 err := work.Job() atomic.AddInt64(&w.Pool.stats.ActiveWorkers, -1) if err != nil { atomic.AddInt64(&w.Pool.stats.FailedJobs, 1) } else { atomic.AddInt64(&w.Pool.stats.CompletedJobs, 1) } // 执行回调 if work.Callback != nil { work.Callback(err) } case <-w.Quit: return } } }()}
// Resize 调整协程池大小
func (gp *GoroutinePool) Resize(newSize int) error {
gp.mutex.Lock()
defer gp.mutex.Unlock()
if newSize <= 0 || newSize > gp.maxWorkers { return fmt.Errorf("invalid pool size: %d", newSize) } currentSize := len(gp.workers) if newSize > currentSize { // 增加工作者 for i := currentSize; i < newSize; i++ { worker := &Worker{ ID: i, Pool: gp, Quit: make(chan bool), } gp.workers = append(gp.workers, worker) worker.Start() } } else if newSize < currentSize { // 减少工作