質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

新規登録して質問してみよう
ただいま回答率
85.40%
Go

Go(golang)は、Googleで開発されたオープンソースのプログラミング言語です。

意見交換

クローズ

2回答

611閲覧

Go言語 gorutine 効率的な並列処理について

tex2345rr

総合スコア0

Go

Go(golang)は、Googleで開発されたオープンソースのプログラミング言語です。

0グッド

0クリップ

投稿2024/06/04 12:14

編集2024/06/07 11:31

0

0

Go言語のgorutineゴルーチン、並列処理において、go func()をただつかっただけの実装Aと、
チャンネルとワーカを作成し並列処理を書いた実装B とどちらが処理は効率的(速さ、リソースの点で)でしょうか?デメリット、メリット知りたいです。
※サーバのメモリは4Gでコア2つ、検索するデイレくトリ配下のファイルは300以上-1000の想定
すみません、消去ロジックのLinuxコマンド(実装A)とgoのパッケージを使う(実装B)差異は観点にいれない無視でお願いします。
よろしくお願いします。

実装A

// 削除を実行する go func() { cmd := exec.Command("find", mpddir, "-name", "test_*.txt", "-mtime", "+1", "-exec", "rm", "-f", "{}", "\\;") cmd.Stderr = &stderr err := cmd.Run() if err != nil { lg.LW(err, "failed to remove mpd file : [%s]", mpddir) lg.LEM("error: %s", stderr.String()) } lg.LTM("execute command-[%s]", cmd.String()) }()

実装B 

// ファイル削除を並列処理で実行するためのチャネル type fileTask struct { path string modTime time.Time } taskChan := make(chan fileTask, 50) // エラーチャネル errorChan := make(chan error, 1) // ワーカーを起動 var wg sync.WaitGroup const numWorkers = 2 for i := 0; i < numWorkers; i++ { wg.Add(1) go func() { defer wg.Done() for task := range taskChan { // ファイルが24時間より前のものかどうかをチェック if task.modTime.Before(time.Now().Add(-24 * time.Hour)) { if err := os.Remove(task.path); err != nil { lg.LEM("Failed to remove file. path: %s, err: %v", task.path, err) errorChan <- err } else { lg.LBM("Removed file. path: %s", task.path) } } } }() } // ファイルを探索し、タスクをチャネルに送る go func() { defer close(taskChan) err := filepath.Walk(mpddir, func(path string, info os.FileInfo, err error) error { if err != nil { return err } // ファイル名がパターンに一致するかどうかをチェック if matched, _ := filepath.Match(`test_*.text`, info.Name()); matched { taskChan <- fileTask{path: path, modTime: info.ModTime()} } return nil }) if err != nil { errorChan <- lg.LEM("Failed to walk the path. mpddir: %s, err: %v", mpddir, err) } }() // ワーカーが全て終了するのを待つ wg.Wait() close(errorChan) // エラーが発生していた場合最初のエラーを返す if err := <-errorChan; err != nil { return err }

気になる質問をクリップする

クリップした質問は、後からいつでもMYページで確認できます。

またクリップした質問に回答があった際、通知やメールを受け取ることができます。

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

回答2

#1

mtempa

総合スコア130

投稿2024/06/11 01:34

こういう何もしないjobworkerで試してみました。

  • 逐次処理してエラーをきっちり処理したい -> for
  • メモリを食い切ることなんてあり得ない、速度こそ全て -> go
  • ある程度メモリの使用量が予測できるので制限しておきたい -> errgroup (runtime.NumCPU()にする必要はない)
  • workerの事前処理や処理に利用するリソースの事前準備に時間がかかる -> threadworker

という感想でしたので、今回のメモリ4Gという状況を加味すると、

  1. 商用環境前提だとするならメモリを食い切ってしまうことは避けたい
  2. そこまでメモリを消費する処理でもない (実測はした方がいいと思います)
  3. ファイル数も際限なく増えるわけではなく一定?

なのでerrgroupのSetLimitに大きめの数字(1000とか)を割り当てておくといいかもしれません。
あくまで、何もしないjobで計測した結論なので実際の処理に置き換えてベンチマークをかけてみて下さい。

swapに入ろうが何しようが速度最重視ということであればgoでコードを少なく保っておくというのも開発初期の選択肢だと思っています。

go

1type ( 2 worker struct { 3 pool *sync.Pool 4 job func([]byte) error 5 } 6) 7 8func (x worker) Do() error { 9 b := x.pool.Get() 10 defer x.pool.Put(b) 11 bb, ok := b.([]byte) 12 if !ok { 13 return errors.New("pool returned invalid buffer") 14 } 15 return x.job(bb) 16} 17 18var ( 19 smallPool = &sync.Pool{ 20 New: func() any { 21 return make([]byte, 1024) // 1KB 22 }, 23 } 24 bigPool = &sync.Pool{ 25 New: func() any { 26 return make([]byte, 1024*1024) // 1MB 27 }, 28 } 29) 30 31func lightJob(b []byte) error { 32 _ = b 33 time.Sleep(10 * time.Millisecond) // 10ms 34 return nil 35} 36 37func heavyJob(b []byte) error { 38 _ = b 39 time.Sleep(100 * time.Millisecond) // 100ms 40 return nil 41}

go

1func benchmarkLoop(b *testing.B, x *worker) { 2 b.Run("for", func(b *testing.B) { 3 for i := 0; i < b.N; i++ { 4 if err := x.Do(); err != nil { 5 b.Error(err) 6 } 7 } 8 }) 9 b.Run("go", func(b *testing.B) { 10 wg := &sync.WaitGroup{} 11 for i := 0; i < b.N; i++ { 12 wg.Add(1) 13 go func() { 14 defer wg.Done() 15 if err := x.Do(); err != nil { 16 b.Error(err) 17 } 18 }() 19 } 20 wg.Wait() 21 }) 22 b.Run("errgroup-nolimit", func(b *testing.B) { 23 eg := &errgroup.Group{} 24 for i := 0; i < b.N; i++ { 25 eg.Go(func() error { 26 return x.Do() 27 }) 28 } 29 if err := eg.Wait(); err != nil { 30 b.Error(err) 31 } 32 }) 33 b.Run("errgroup", func(b *testing.B) { 34 eg := &errgroup.Group{} 35 eg.SetLimit(runtime.NumCPU()) 36 for i := 0; i < b.N; i++ { 37 eg.Go(func() error { 38 return x.Do() 39 }) 40 } 41 if err := eg.Wait(); err != nil { 42 b.Error(err) 43 } 44 }) 45 b.Run("threadworker", func(b *testing.B) { 46 workerNum := runtime.NumCPU() 47 wg := &sync.WaitGroup{} 48 wg.Add(workerNum) 49 q := make(chan struct{}, workerNum*100) 50 for w := 0; w < workerNum; w++ { 51 go func() { 52 defer wg.Done() 53 for range q { 54 if err := x.Do(); err != nil { 55 b.Error(err) 56 } 57 } 58 }() 59 } 60 for i := 0; i < b.N; i++ { 61 q <- struct{}{} 62 } 63 close(q) 64 wg.Wait() 65 }) 66} 67 68func BenchmarkJob(b *testing.B) { 69 b.Run("small-light", func(b *testing.B) { 70 benchmarkLoop(b, &worker{ 71 pool: smallPool, 72 job: lightJob, 73 }) 74 }) 75 b.Run("small-heavy", func(b *testing.B) { 76 benchmarkLoop(b, &worker{ 77 pool: smallPool, 78 job: heavyJob, 79 }) 80 }) 81 b.Run("big-light", func(b *testing.B) { 82 benchmarkLoop(b, &worker{ 83 pool: bigPool, 84 job: lightJob, 85 }) 86 }) 87 b.Run("big-heavy", func(b *testing.B) { 88 benchmarkLoop(b, &worker{ 89 pool: bigPool, 90 job: heavyJob, 91 }) 92 }) 93}

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

#2

hidori

総合スコア403

投稿2024/06/11 01:36

単純な実行効率やリソース消費量だけを比べたら、余分な仕組みを使わない方が有利と思います。

ですが、「どういう実装が必要なのか」を考える際、現実には実行効率やリソース消費量だけを元に判断することは少ないはずです。

例えば、メイン側でワーカー処理の完了やエラーの発生を知る必要があるなら、その分余分な実装が増えるのは不可避だと思います。

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

最新の回答から1ヶ月経過したため この意見交換はクローズされました

意見をやりとりしたい話題がある場合は質問してみましょう!

質問する

関連した質問