从一次线上故障说起
上周我们系统遇到了一次令人头疼的线上问题——内存泄漏。经过排查,发现问题出在一个看似简单的并发场景中。这让我意识到,虽然Go语言的goroutine和channel让并发编程变得简单,但其中仍有许多需要注意的细节。
goroutine泄漏:隐形的内存杀手
问题复现
先来看看我们当时出问题的代码:
func processTasks(tasks []Task) {
for _, task := range tasks {
go func(t Task) {
// 处理任务
if err := handleTask(t); err != nil {
log.Printf("task failed: %v", err)
}
}(task)
}
}
看起来没什么问题,对吧?但实际上,当tasks数量很大时,大量goroutine同时运行,如果handleTask函数中有阻塞操作,goroutine就会堆积,最终导致内存耗尽。
解决方案:使用worker池
经过重构,我们采用了worker池模式:
type WorkerPool struct {
workers int
taskChan chan Task
wg sync.WaitGroup
}
func NewWorkerPool(workers int) *WorkerPool {
return &WorkerPool{
workers: workers,
taskChan: make(chan Task, workers*2),
}
}
func (wp *WorkerPool) Start() {
for i := 0; i < wp.workers; i++ {
wp.wg.Add(1)
go wp.worker()
}
}
func (wp *WorkerPool) worker() {
defer wp.wg.Done()
for task := range wp.taskChan {
handleTask(task)
}
}
func (wp *WorkerPool) Submit(task Task) {
wp.taskChan <- task
}
func (wp *WorkerPool) Stop() {
close(wp.taskChan)
wp.wg.Wait()
}
这种方式的好处:
- 控制并发数量,避免资源耗尽
- 优雅关闭,确保所有任务完成
- 复用goroutine,减少创建销毁开销
channel使用的常见陷阱
1. 未关闭的channel导致goroutine阻塞
func main() {
ch := make(chan int)
go func() {
for i := 0; i < 10; i++ {
ch <- i
}
// 忘记 close(ch)
}()
// 这个循环永远不会结束
for v := range ch {
fmt.Println(v)
}
}
教训:作为发送方,在数据发送完毕后应该关闭channel。
2. 向已关闭的channel发送数据
func main() {
ch := make(chan int)
close(ch)
// 这会导致panic!
ch <- 1
}
最佳实践:
- 由发送方关闭channel
- 使用sync.Once确保只关闭一次
- 在文档中明确channel的生命周期管理
context的正确使用方式
超时控制
在处理网络请求时,超时控制至关重要:
func processWithTimeout(ctx context.Context, task Task) error {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
done := make(chan error, 1)
go func() {
done <- processTask(task)
}()
select {
case err := <-done:
return err
case <-ctx.Done():
return ctx.Err()
}
}
传递取消信号
func handler(ctx context.Context, req *Request) (*Response, error) {
// 创建子context,设置更短的超时
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
var wg sync.WaitGroup
results := make(chan Result, 2)
// 并发调用两个服务
services := []string{"serviceA", "serviceB"}
for _, service := range services {
wg.Add(1)
go func(svc string) {
defer wg.Done()
result, err := callService(ctx, svc, req)
if err != nil {
return
}
select {
case results <- result:
case <-ctx.Done():
}
}(service)
}
// 等待所有goroutine完成
go func() {
wg.Wait()
close(results)
}()
// 处理结果
return aggregateResults(ctx, results)
}
实用的并发模式
1. 扇出/扇入模式
// 扇出:一个输入channel,多个worker消费
func fanOut(input <-chan Task, workers int) []<-chan Result {
outputs := make([]<-chan Result, workers)
for i := 0; i < workers; i++ {
output := make(chan Result)
outputs[i] = output
go func() {
defer close(output)
for task := range input {
output <- processTask(task)
}
}()
}
return outputs
}
// 扇入:多个输入channel合并为一个
func fanIn(inputs ...<-chan Result) <-chan Result {
output := make(chan Result)
var wg sync.WaitGroup
for _, input := range inputs {
wg.Add(1)
go func(in <-chan Result) {
defer wg.Done()
for result := range in {
output <- result
}
}(input)
}
go func() {
wg.Wait()
close(output)
}()
return output
}
2. 错误处理策略
在并发场景中,错误处理需要特别小心:
type TaskResult struct {
Task Task
Result Result
Error error
}
func processConcurrently(tasks []Task) ([]TaskResult, error) {
var wg sync.WaitGroup
results := make(chan TaskResult, len(tasks))
for _, task := range tasks {
wg.Add(1)
go func(t Task) {
defer wg.Done()
result, err := processTask(t)
results <- TaskResult{
Task: t,
Result: result,
Error: err,
}
}(task)
}
go func() {
wg.Wait()
close(results)
}()
var allResults []TaskResult
for result := range results {
allResults = append(allResults, result)
}
return allResults, nil
}
性能优化经验
1. 减少锁竞争
使用sync.RWMutex替代sync.Mutex:
type Cache struct {
mu sync.RWMutex
items map[string]interface{}
}
func (c *Cache) Get(key string) interface{} {
c.mu.RLock()
defer c.mu.RUnlock()
return c.items[key]
}
func (c *Cache) Set(key string, value interface{}) {
c.mu.Lock()
defer c.mu.Unlock()
c.items[key] = value
}
2. 使用sync.Pool减少内存分配
var bufferPool = sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(make([]byte, 0, 1024))
},
}
func processRequest(data []byte) {
buf := bufferPool.Get().(*bytes.Buffer)
defer bufferPool.Put(buf)
buf.Reset()
buf.Write(data)
// 使用buf处理数据
}
总结
在Go语言并发编程的实践中,我总结出几个关键点:
- 资源管理:始终要管理好goroutine的生命周期,避免泄漏
- 错误处理:在并发场景中,错误处理需要更加谨慎
- 超时控制:使用context为所有可能阻塞的操作设置超时
- 性能考量:合理使用锁、池化技术来提升性能
Go语言的并发模型很强大,但也需要开发者对其有深入的理解。希望这些经验教训能帮助你在Go并发编程的道路上少走弯路。
暂无评论