从一次线上故障说起

上周我们系统遇到了一次令人头疼的线上问题——内存泄漏。经过排查,发现问题出在一个看似简单的并发场景中。这让我意识到,虽然Go语言的goroutine和channel让并发编程变得简单,但其中仍有许多需要注意的细节。

goroutine泄漏:隐形的内存杀手

问题复现

先来看看我们当时出问题的代码:

func processTasks(tasks []Task) {
    for _, task := range tasks {
        go func(t Task) {
            // 处理任务
            if err := handleTask(t); err != nil {
                log.Printf("task failed: %v", err)
            }
        }(task)
    }
}

看起来没什么问题,对吧?但实际上,当tasks数量很大时,大量goroutine同时运行,如果handleTask函数中有阻塞操作,goroutine就会堆积,最终导致内存耗尽。

解决方案:使用worker池

经过重构,我们采用了worker池模式:

type WorkerPool struct {
    workers  int
    taskChan chan Task
    wg       sync.WaitGroup
}

func NewWorkerPool(workers int) *WorkerPool {
    return &WorkerPool{
        workers:  workers,
        taskChan: make(chan Task, workers*2),
    }
}

func (wp *WorkerPool) Start() {
    for i := 0; i < wp.workers; i++ {
        wp.wg.Add(1)
        go wp.worker()
    }
}

func (wp *WorkerPool) worker() {
    defer wp.wg.Done()
    for task := range wp.taskChan {
        handleTask(task)
    }
}

func (wp *WorkerPool) Submit(task Task) {
    wp.taskChan <- task
}

func (wp *WorkerPool) Stop() {
    close(wp.taskChan)
    wp.wg.Wait()
}

这种方式的好处:

  • 控制并发数量,避免资源耗尽
  • 优雅关闭,确保所有任务完成
  • 复用goroutine,减少创建销毁开销

channel使用的常见陷阱

1. 未关闭的channel导致goroutine阻塞

func main() {
    ch := make(chan int)
    
    go func() {
        for i := 0; i < 10; i++ {
            ch <- i
        }
        // 忘记 close(ch)
    }()
    
    // 这个循环永远不会结束
    for v := range ch {
        fmt.Println(v)
    }
}

教训:作为发送方,在数据发送完毕后应该关闭channel。

2. 向已关闭的channel发送数据

func main() {
    ch := make(chan int)
    close(ch)
    
    // 这会导致panic!
    ch <- 1
}

最佳实践

  • 由发送方关闭channel
  • 使用sync.Once确保只关闭一次
  • 在文档中明确channel的生命周期管理

context的正确使用方式

超时控制

在处理网络请求时,超时控制至关重要:

func processWithTimeout(ctx context.Context, task Task) error {
    ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
    defer cancel()
    
    done := make(chan error, 1)
    
    go func() {
        done <- processTask(task)
    }()
    
    select {
    case err := <-done:
        return err
    case <-ctx.Done():
        return ctx.Err()
    }
}

传递取消信号

func handler(ctx context.Context, req *Request) (*Response, error) {
    // 创建子context,设置更短的超时
    ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
    defer cancel()
    
    var wg sync.WaitGroup
    results := make(chan Result, 2)
    
    // 并发调用两个服务
    services := []string{"serviceA", "serviceB"}
    for _, service := range services {
        wg.Add(1)
        go func(svc string) {
            defer wg.Done()
            
            result, err := callService(ctx, svc, req)
            if err != nil {
                return
            }
            
            select {
            case results <- result:
            case <-ctx.Done():
            }
        }(service)
    }
    
    // 等待所有goroutine完成
    go func() {
        wg.Wait()
        close(results)
    }()
    
    // 处理结果
    return aggregateResults(ctx, results)
}

实用的并发模式

1. 扇出/扇入模式

// 扇出:一个输入channel,多个worker消费
func fanOut(input <-chan Task, workers int) []<-chan Result {
    outputs := make([]<-chan Result, workers)
    for i := 0; i < workers; i++ {
        output := make(chan Result)
        outputs[i] = output
        
        go func() {
            defer close(output)
            for task := range input {
                output <- processTask(task)
            }
        }()
    }
    return outputs
}

// 扇入:多个输入channel合并为一个
func fanIn(inputs ...<-chan Result) <-chan Result {
    output := make(chan Result)
    var wg sync.WaitGroup
    
    for _, input := range inputs {
        wg.Add(1)
        go func(in <-chan Result) {
            defer wg.Done()
            for result := range in {
                output <- result
            }
        }(input)
    }
    
    go func() {
        wg.Wait()
        close(output)
    }()
    
    return output
}

2. 错误处理策略

在并发场景中,错误处理需要特别小心:

type TaskResult struct {
    Task Task
    Result Result
    Error error
}

func processConcurrently(tasks []Task) ([]TaskResult, error) {
    var wg sync.WaitGroup
    results := make(chan TaskResult, len(tasks))
    
    for _, task := range tasks {
        wg.Add(1)
        go func(t Task) {
            defer wg.Done()
            
            result, err := processTask(t)
            results <- TaskResult{
                Task:   t,
                Result: result,
                Error:  err,
            }
        }(task)
    }
    
    go func() {
        wg.Wait()
        close(results)
    }()
    
    var allResults []TaskResult
    for result := range results {
        allResults = append(allResults, result)
    }
    
    return allResults, nil
}

性能优化经验

1. 减少锁竞争

使用sync.RWMutex替代sync.Mutex:

type Cache struct {
    mu    sync.RWMutex
    items map[string]interface{}
}

func (c *Cache) Get(key string) interface{} {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return c.items[key]
}

func (c *Cache) Set(key string, value interface{}) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.items[key] = value
}

2. 使用sync.Pool减少内存分配

var bufferPool = sync.Pool{
    New: func() interface{} {
        return bytes.NewBuffer(make([]byte, 0, 1024))
    },
}

func processRequest(data []byte) {
    buf := bufferPool.Get().(*bytes.Buffer)
    defer bufferPool.Put(buf)
    
    buf.Reset()
    buf.Write(data)
    
    // 使用buf处理数据
}

总结

在Go语言并发编程的实践中,我总结出几个关键点:

  1. 资源管理:始终要管理好goroutine的生命周期,避免泄漏
  2. 错误处理:在并发场景中,错误处理需要更加谨慎
  3. 超时控制:使用context为所有可能阻塞的操作设置超时
  4. 性能考量:合理使用锁、池化技术来提升性能

Go语言的并发模型很强大,但也需要开发者对其有深入的理解。希望这些经验教训能帮助你在Go并发编程的道路上少走弯路。