从一个错误案例开始

起因是在工作中看到一段代码,感觉是程序遇到异常而卡住的原因,起了两个协程,其中一个等待另一个完成后开始,二者通过channel进行消息传递。后一个协程在完成时使用了goto语句。程序最后为select{}永久卡死,等待前面启动的两个协程去跑完就完事了。

原来的代码(铁有问题,不能用在生产上):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
	ticker := time.NewTicker(5 * 60 * time.Second)
defer ticker.Stop()
loop:
for {
select {
case <-h.ctx.Done():
mlog.Info("asset infer handler stop")
return
case <-ticker.C:
if h.NeedRun() {
go h.AssetInfer()
go h.GenMiddleTable()
break loop
}
}
}
select {}
}

用ai帮忙修改了相应的程序,但是其写的代码在go包装协程中启动了工作协程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
	ticker := time.NewTicker(5 * 60 * time.Second)
defer ticker.Stop()
var wg sync.WaitGroup
for {
select {
case <-h.ctx.Done():
mlog.Info("asset infer handler stop")
return
case <-ticker.C:
if h.NeedRun() {
wg.Add(2)
go func(){
defer wg.Done()
go h.AssetInfer()
}()
go func(){
defer wg.Done()
go h.GenMiddleTable()
}()
wg.Wait()
}
}
}
}

这个程序还是会有问题,用 WaitGroup 启了两个 wrapper 协程,wrapper 里又 go 出实际工作协程,导致 WaitGroup 很快结束,真正的工作协程无人管理,后续 tick 还会继续再起新协程,既难以追踪生命周期也可能造成 goroutine 泄漏。

1.不允许在go 协程里面嵌套协程

2.必须要遵守一处并发、统一等待的原则

修改后的程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
ticker := time.NewTicker(5 * 60 * time.Second)
defer ticker.Stop()
var wg sync.WaitGroup
for {
select {
case <-h.ctx.Done():
mlog.Info("asset infer handler stop")
return
case <-ticker.C:
if h.NeedRun() {
wg.Add(2)
go func() {
defer wg.Done()
h.AssetInfer()
}()
go func() {
defer wg.Done()
h.GenMiddleTable()
}()
wg.Wait()
}
}
}

进一步,还可以使用 errgroup 进行并发任务管理:waitGroup–>errGroup

原来的:sync package - sync - Go Packages

简化后:errgroup package - golang.org/x/sync/errgroup - Go Packages

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
    ticker := time.NewTicker(5 * 60 * time.Second)
defer ticker.Stop()
for {
if h.NeedRun() {
g, _ := errgroup.WithContext(h.ctx)
g.Go(func() error {
h.AssetInfer()
return nil
})
g.Go(func() error {
h.GenMiddleTable()
return nil
})
if err := g.Wait(); err != nil {
mlog.Warnf("asset infer run exit: %v", err)
}
return
}
select {
case <-h.ctx.Done():
mlog.Info("asset infer handler stop")
return
case <-ticker.C:
}
}
}

不需要,原来的loop版本,就是等待NeedRun,为true,就去执行两个协程,然后

[mysql] 2025/11/22 23:04:45 packets.go:36: unexpected EOF
{“level”:”warn”,”time”:”2025-11-22T23:04:45.769+0800”,”position”:”handler/asset_infer_handler.go:848”,”function”:”sangfor.com/xdr/asm-cron/internal/assetinfer/handler.(AssetInferHandler).genMiddleResult”,”message”:”query:SELECT dstAssetId, regexp_extract(responseBody, ‘(.?)‘, 1) as title, CAST(count(*) AS BIGINT), max(recordTimestamp) as maxRecordTimestamp, any_value(if(instr(url, ‘?’) > 0, substring(url, 1, instr(url, ‘?’) - 1), url)) As url FROM business.http_log WHERE respStatus = 200 AND recordTimestamp >= 1763654400 AND recordTimestamp < 1763740800 AND respContentType = ‘text/html’ AND instr(responseBody, ‘‘) > 0 GROUP BY dstAssetId, title LIMIT 300000;, query failed,err:Error 1064: Failed to allocate resource to query: org.apache.thrift.transport.TTransportException: Socket is closed by peer., will retry 3 times”}