Golang怎么使用協(xié)程池 Golang協(xié)程池實(shí)現(xiàn)方案

golang協(xié)程池的大小應(yīng)根據(jù)cpu核心數(shù)、任務(wù)類型、系統(tǒng)資源和壓測(cè)結(jié)果確定。1. cpu核心數(shù):協(xié)程池大小不應(yīng)超過(guò)cpu核心數(shù)太多,一般為1-2倍;2. 任務(wù)類型:cpu密集型任務(wù)應(yīng)接近c(diǎn)pu核心數(shù),i/o密集型任務(wù)可適當(dāng)增加;3. 系統(tǒng)資源:需考慮內(nèi)存等限制,避免oom;4. 壓測(cè):通過(guò)測(cè)試調(diào)整大小,觀察吞吐量和響應(yīng)時(shí)間等指標(biāo)找到最佳平衡點(diǎn)。

Golang怎么使用協(xié)程池 Golang協(xié)程池實(shí)現(xiàn)方案

協(xié)程池,簡(jiǎn)單來(lái)說(shuō),就是預(yù)先創(chuàng)建好一批協(xié)程,需要執(zhí)行任務(wù)時(shí),直接從池子里取一個(gè)來(lái)用,用完放回去,避免頻繁創(chuàng)建和銷毀協(xié)程的開銷。golang標(biāo)準(zhǔn)庫(kù)本身并沒有提供協(xié)程池,但我們可以自己實(shí)現(xiàn),或者使用第三方庫(kù)。

Golang怎么使用協(xié)程池 Golang協(xié)程池實(shí)現(xiàn)方案

Golang協(xié)程池實(shí)現(xiàn)方案

Golang怎么使用協(xié)程池 Golang協(xié)程池實(shí)現(xiàn)方案

實(shí)現(xiàn)協(xié)程池的核心思路是:維護(hù)一個(gè)協(xié)程隊(duì)列和一個(gè)任務(wù)隊(duì)列。任務(wù)來(lái)了,就從協(xié)程隊(duì)列里取一個(gè)協(xié)程去執(zhí)行,執(zhí)行完再放回協(xié)程隊(duì)列。

立即學(xué)習(xí)go語(yǔ)言免費(fèi)學(xué)習(xí)筆記(深入)”;

package main  import (     "fmt"     "sync"     "time" )  type Job struct {     ID  int     Payload int }  type WorkerPool struct {     JobQueue chan Job     WorkerQueue chan chan Job     Workers []Worker     Quit chan bool     Wg sync.WaitGroup }  type Worker struct {     ID int     JobQueue chan Job     WorkerQueue chan chan Job     Quit chan bool     Wg *sync.WaitGroup }  func NewWorker(id int, workerQueue chan chan Job, wg *sync.WaitGroup) Worker {     return Worker{         ID: id,         JobQueue: make(chan Job),         WorkerQueue: workerQueue,         Quit: make(chan bool),         Wg: wg,     } }  func (w Worker) Start() {     w.Wg.Add(1)     go func() {         defer w.Wg.Done()         for {             // 將自己的JobChannel 注冊(cè)到 WorkerPool 的 WorkerQueue 中             w.WorkerQueue <- w.JobQueue              select {             case job := <-w.JobQueue:                 // 接收到任務(wù)                 fmt.Printf("worker%d: 處理 job %d, payload %dn", w.ID, job.ID, job.Payload)                 time.Sleep(time.Duration(job.Payload) * time.Millisecond) // 模擬耗時(shí)操作                 fmt.Printf("worker%d: 完成 job %dn", w.ID, job.ID)              case <-w.Quit:                 // 收到停止信號(hào)                 fmt.Printf("worker%d: 停止n", w.ID)                 return             }         }     }() }  func (w Worker) Stop() {     go func() {         w.Quit <- true     }() }  func NewWorkerPool(workerNum int, jobQueueSize int) WorkerPool {     jobQueue := make(chan Job, jobQueueSize)     workerQueue := make(chan chan Job, workerNum)     workers := make([]Worker, workerNum)      wp := WorkerPool{         JobQueue: jobQueue,         WorkerQueue: workerQueue,         Workers: workers,         Quit: make(chan bool),         Wg: sync.WaitGroup{},     }      // 創(chuàng)建 worker     for i := 0; i < workerNum; i++ {         worker := NewWorker(i+1, wp.WorkerQueue, &wp.Wg)         workers[i] = worker     }      return wp }  func (wp WorkerPool) Run() {     // 啟動(dòng)所有 worker     for i := 0; i < len(wp.Workers); i++ {         wp.Workers[i].Start()     }      go wp.dispatch() }  func (wp WorkerPool) dispatch() {     for {         select {         case job := <-wp.JobQueue:             // 從 JobQueue 中取出任務(wù)             workerJobQueue := <-wp.WorkerQueue             // 將任務(wù)發(fā)送給 Worker             workerJobQueue <- job         case <-wp.Quit:             // 收到停止信號(hào)             for i := 0; i < len(wp.Workers); i++ {                 wp.Workers[i].Stop()             }             return         }     } }  func (wp WorkerPool) Stop() {     close(wp.JobQueue)     go func() {         wp.Quit <- true     }()     wp.Wg.Wait() }  func main() {     workerNum := 5     jobQueueSize := 100     wp := NewWorkerPool(workerNum, jobQueueSize)      wp.Run()      // 生產(chǎn) job     for i := 0; i < 20; i++ {         job := Job{             ID: i + 1,             Payload: i*100, // 模擬不同任務(wù)的耗時(shí)         }         wp.JobQueue <- job     }      // 等待所有 job 完成     time.Sleep(3 * time.Second)      wp.Stop()     fmt.Println("所有 job 完成") }

Golang 協(xié)程池的大小如何確定?

Golang怎么使用協(xié)程池 Golang協(xié)程池實(shí)現(xiàn)方案

協(xié)程池的大小直接影響到程序的并發(fā)能力和資源利用率。太小了,并發(fā)度不夠,浪費(fèi)資源;太大了,可能導(dǎo)致上下文切換開銷過(guò)大,甚至OOM。

確定協(xié)程池大小需要考慮以下幾個(gè)因素:

  • CPU 核心數(shù): 協(xié)程池的大小不應(yīng)超過(guò) CPU 核心數(shù)太多,否則會(huì)增加上下文切換的開銷。一般來(lái)說(shuō),可以設(shè)置為 CPU 核心數(shù)的 1-2 倍。
  • 任務(wù)類型: 如果任務(wù)是 CPU 密集型的,協(xié)程池的大小應(yīng)該接近 CPU 核心數(shù)。如果任務(wù)是 I/O 密集型的,協(xié)程池的大小可以適當(dāng)增加,因?yàn)閰f(xié)程在等待 I/O 時(shí)可以切換到其他協(xié)程執(zhí)行。
  • 系統(tǒng)資源: 協(xié)程池的大小還會(huì)受到系統(tǒng)資源的限制,例如內(nèi)存。如果協(xié)程池太大,可能會(huì)導(dǎo)致內(nèi)存不足。
  • 壓測(cè): 最終,需要通過(guò)壓測(cè)來(lái)確定最佳的協(xié)程池大小。通過(guò)不斷調(diào)整協(xié)程池的大小,并觀察程序的性能指標(biāo),例如吞吐量、響應(yīng)時(shí)間等,找到一個(gè)最佳的平衡點(diǎn)。

Golang 協(xié)程池如何處理 panic?

協(xié)程中如果發(fā)生panic,如果沒有recover,會(huì)導(dǎo)致程序崩潰。因此,在協(xié)程池中處理panic非常重要。

有幾種常見的處理方式:

  1. 在 Worker 中 recover: 這是最常見的方式,在每個(gè) Worker 的執(zhí)行函數(shù)中,使用 recover() 來(lái)捕獲 panic。這樣可以防止 panic 擴(kuò)散到整個(gè)程序,保證協(xié)程池的穩(wěn)定性。
func (w Worker) Start() {     w.Wg.Add(1)     go func() {         defer w.Wg.Done()         defer func() {             if r := recover(); r != nil {                 fmt.Printf("worker%d: panic recover: %vn", w.ID, r)                 // 可以選擇將 panic 重新拋出,或者記錄日志             }         }()         for {             w.WorkerQueue <- w.JobQueue              select {             case job := <-w.JobQueue:                 fmt.Printf("worker%d: 處理 job %d, payload %dn", w.ID, job.ID, job.Payload)                 // 模擬可能發(fā)生 panic 的操作                 if job.Payload == 0 {                     panic("payload is zero")                 }                 time.Sleep(time.Duration(job.Payload) * time.Millisecond)                 fmt.Printf("worker%d: 完成 job %dn", w.ID, job.ID)              case <-w.Quit:                 fmt.Printf("worker%d: 停止n", w.ID)                 return             }         }     }() }
  1. 使用第三方庫(kù): 一些第三方協(xié)程池庫(kù),例如 ants,已經(jīng)內(nèi)置了 panic 處理機(jī)制。使用這些庫(kù)可以簡(jiǎn)化 panic 處理的流程。

  2. 記錄日志: 無(wú)論使用哪種方式處理 panic,都應(yīng)該記錄詳細(xì)的日志,包括 panic 的類型、信息等。這樣可以方便后續(xù)的排查和修復(fù)。

Golang 協(xié)程池有哪些常用的第三方庫(kù)?

雖然可以自己實(shí)現(xiàn)協(xié)程池,但使用成熟的第三方庫(kù)可以省去很多麻煩,并獲得更好的性能和穩(wěn)定性。

以下是一些常用的 Golang 協(xié)程池第三方庫(kù):

  • ants: ants 是一個(gè)高性能的 Golang 協(xié)程池庫(kù),它具有以下特點(diǎn):
    • 高性能:基于無(wú)鎖隊(duì)列實(shí)現(xiàn),性能優(yōu)秀。
    • 自動(dòng)調(diào)整:可以根據(jù)任務(wù)負(fù)載自動(dòng)調(diào)整協(xié)程池的大小。
    • panic 處理:內(nèi)置了 panic 處理機(jī)制。
    • 資源回收:可以自動(dòng)回收空閑的協(xié)程。
    • 使用簡(jiǎn)單:API 簡(jiǎn)潔易用。
package main  import (     "fmt"     "sync"     "time"      "github.com/panjf2000/ants/v2" )  func main() {     defer ants.Release()      var wg sync.WaitGroup     syncCalculateSum := func(i interface{}) {         n := i.(int)         fmt.Printf("處理 job %dn", n)         time.Sleep(time.Duration(n) * time.Millisecond)         fmt.Printf("完成 job %dn", n)         wg.Done()     }      pool, _ := ants.NewPoolWithFunc(10, syncCalculateSum)     defer pool.Release()      for i := 0; i < 100; i++ {         wg.Add(1)         _ = pool.Invoke(i)     }      wg.Wait()     fmt.Printf("運(yùn)行的 goroutine: %dn", ants.Running())     fmt.Printf("完成所有任務(wù).n") }
  • tunny: tunny 是另一個(gè)流行的 Golang 協(xié)程池庫(kù),它支持多種任務(wù)類型,例如函數(shù)、命令等。tunny 的特點(diǎn)是:
    • 支持多種任務(wù)類型:可以執(zhí)行函數(shù)、命令等。
    • 靈活的配置:可以配置協(xié)程池的大小、超時(shí)時(shí)間等。
    • 易于擴(kuò)展:可以自定義 Worker 的行為。

選擇哪個(gè)第三方庫(kù)取決于具體的應(yīng)用場(chǎng)景。如果需要高性能和自動(dòng)調(diào)整,ants 是一個(gè)不錯(cuò)的選擇。如果需要支持多種任務(wù)類型和靈活的配置,tunny 也是一個(gè)不錯(cuò)的選擇。

? 版權(quán)聲明
THE END
喜歡就支持一下吧
點(diǎn)贊15 分享