質問編集履歴

2

質問に必要ない部分をカットしました。GitHubのソースコードも多少アップデートしています。

2020/07/26 02:27

投稿

退会済みユーザー
test CHANGED
File without changes
test CHANGED
@@ -6,57 +6,75 @@
6
6
 
7
7
 
8
8
 
9
- ### 前提
10
-
11
-
12
-
13
- ファイル構成は以下のようです
14
-
15
-
16
-
17
- ```
18
-
19
- – go.mod
20
-
21
- twitter
22
-
23
- - twitter.go
24
-
25
- - tasks.json
26
-
27
- – feed
28
-
29
- - feed.go
30
-
31
- - queue
32
-
33
- - queue.go
34
-
35
- - lock
36
-
37
- - rwlock.go
38
-
39
- ```
40
-
41
-
42
-
43
- まず、feed.goにはLinkedListが実装してありますが、これは、あるTwitterユーザーの投稿一覧のようなものだと思ってください。
44
-
45
-
46
-
47
- queue.goには、非有界のロックフリーなキューが実装してあります。このキューは、実行したいタスクを格納しているものです。tasks.jsonファイルをみてもらいたいのですが、一行目の
48
-
49
- ```JSON
50
-
51
- {"command": "ADD", "id": 1, "body": "just setting up my twttr", "timestamp": 43242423}
52
-
53
- ```
54
-
55
- は、"just setting up my twttr"という投稿を、ユーザーの投稿一覧に"ADD"する(追加する)タスクを意味しています。
56
-
57
-
58
-
59
- メインの質問にはあまり関係ないので、説明はこれぐらいにしておきます。
9
+ ### 発生している問題・エラーメッセージ
10
+
11
+
12
+
13
+ 期待する結果は以下の通りですが、
14
+
15
+
16
+
17
+ ```
18
+
19
+ {"success":true,"id":1}
20
+
21
+ {"success":true,"id":2}
22
+
23
+ {"success":true,"id":3}
24
+
25
+ {"success":false,"id":4}
26
+
27
+ {"success":true,"id":5}
28
+
29
+ {"success":false,"id":6}
30
+
31
+ {"id":7,"feed":[{"body":"Another post to add","timestamp":43242421}]}
32
+
33
+ ```
34
+
35
+
36
+
37
+ 以下のコマンドで2つのサブスレッドを発生させると(一つ目のコマンド引数がスレッドの数です。)、デッドロックを起こしてしまいます。
38
+
39
+
40
+
41
+ ```
42
+
43
+ $ twitter kazukiegusa$ go run twitter.go 2 4 < tasks.json
44
+
45
+ {"success":true,"id":1}
46
+
47
+ {"success":true,"id":2}
48
+
49
+ {"success":true,"id":3}
50
+
51
+ {"success":false,"id":4}
52
+
53
+ {"success":false,"id":6}
54
+
55
+ {"id":7,"feed":[{"body":"Another post to add","timestamp":43242421}]}
56
+
57
+ fatal error: all goroutines are asleep - deadlock!
58
+
59
+
60
+
61
+ goroutine 1 [semacquire]:
62
+
63
+ sync.runtime_Semacquire(0xc000016088)
64
+
65
+ /usr/local/go/src/runtime/sema.go:56 +0x42
66
+
67
+ sync.(*WaitGroup).Wait(0xc000016080)
68
+
69
+ /usr/local/go/src/sync/waitgroup.go:130 +0x64
70
+
71
+ main.main()
72
+
73
+ /Users/kazukiegusa/mpcs/Parallel Programming/egusa/proj1/twitter/twitter.go:66 +0x422
74
+
75
+ exit status 2
76
+
77
+ ```
60
78
 
61
79
 
62
80
 
@@ -64,65 +82,111 @@
64
82
 
65
83
 
66
84
 
67
- まず、twitter.go内メインのgoroutineがサブgoroutineを発生させて、それぞれのサブgoroutineがconsumer関数を実行します。全てサブgoroutineを発生させたらメイのgoroutineがproducer関数を実行します。
68
-
69
-
70
-
71
- cosumer関数の中では、各サブgoroutineが、キューから一定の数のタスクを取ってきて、それを実行していくのですが、もしキューが空の場合は、そのサブgoroutineを待機させたいです。(最初に発生させたサブgoroutine以外は発生させません。)ここで、待機させるのは、sync.CondのWait()を使えばいいのでしょうか?
72
-
73
-
74
-
75
- producer関数の中では、メインのgoroutineがタスクをキューにどんどん格納していくのですが、タスクをキューに入れるたびに、Wait()しているサブのgoroutineがないか確認して、ある場合は一つ起こします。一つだけ起こすので、sync.CondのSignal()を使うのであっていますでしょうか?
76
-
77
-
78
-
79
- ### 発生している問題・エラーメッセージ
80
-
81
-
82
-
83
- スレッドを使わず、メインのgoroutineのみに処理をさせる場合は以下の形です。
84
-
85
-
86
-
87
- ```
88
-
89
- $ go run twitter.go < tasks.json
90
-
91
- ```
92
-
93
-
94
-
95
- 実行結果は、
96
-
97
-
98
-
99
- ```
100
-
101
- {"success":true,"id":1}
102
-
103
- {"success":true,"id":2}
104
-
105
- {"success":true,"id":3}
106
-
107
- {"success":false,"id":4}
108
-
109
- {"success":true,"id":5}
110
-
111
- {"success":false,"id":6}
112
-
113
- {"id":7,"feed":[{"body":"Another post to add","timestamp":43242421}]}
114
-
115
- ```
116
-
117
-
118
-
119
- 本題です。以下のコマンドで2つのサブスレッドを発生させることが出来るのですが、何も返さず、上記のような結果が出ません。
120
-
121
-
122
-
123
- ```
124
-
125
- $ go run twitter.go 2 10 < tasks.json
85
+ 質問の箇所は`twitter.go`にあります。まず`main`関数の中でディション変数を定義ています。
86
+
87
+
88
+
89
+ ```Go
90
+
91
+ ・・・中略・・・
92
+
93
+
94
+
95
+ var wg sync.WaitGroup
96
+
97
+
98
+
99
+ cond := sync.NewCond(new(sync.Mutex)) // Conditional variable
100
+
101
+
102
+
103
+ for i := 0; i < numGoroutines; i++ {
104
+
105
+ wg.Add(1)
106
+
107
+ go consumer(q, f, blockSize, cond, &wg)
108
+
109
+ }
110
+
111
+
112
+
113
+ producer(q, cond)
114
+
115
+
116
+
117
+ // 全てのサブスレッドがconsumer関数の実行を終えるまで待つ
118
+
119
+ wg.Wait()
120
+
121
+ ```
122
+
123
+
124
+
125
+ `producer`関数の中では、エンキュー(Enqueue)するごとに、`sync.Cond`の`Signal`関数を使って、`consumer`関数内で待機しているスレッドを一つ呼び起こしています。
126
+
127
+
128
+
129
+ ```
130
+
131
+ func producer(q *queue.Queue, cond *sync.Cond) {
132
+
133
+
134
+
135
+ ・・・中略・・・
136
+
137
+
138
+
139
+ for {
140
+
141
+
142
+
143
+ ・・・中略(ここにエンキュー(Enqueue)を実行するコードが入ります。)・・・
144
+
145
+
146
+
147
+ // 待機中のサブスレッドを一つ呼び起こす
148
+
149
+ cond.Signal()
150
+
151
+ }
152
+
153
+ }
154
+
155
+ ```
156
+
157
+
158
+
159
+ `consumer`関数内では、初期化するときに`sync.Cond`に紐付けたMutexを使ってロックしています。
160
+
161
+
162
+
163
+ ```
164
+
165
+ func consumer(q *queue.Queue, f feed.Feed, blockSize int64, cond *sync.Cond, wg *sync.WaitGroup) {
166
+
167
+
168
+
169
+ cond.L.Lock()
170
+
171
+
172
+
173
+ // キューが空なら待機する
174
+
175
+ cond.Wait()
176
+
177
+
178
+
179
+ ・・・中略(ここにデキュー(Dequeue)を実行するコードが入ります。)・・・
180
+
181
+
182
+
183
+ cond.L.Unlock()
184
+
185
+
186
+
187
+ wg.Done()
188
+
189
+ }
126
190
 
127
191
  ```
128
192
 

1

多少説明を追加しました。

2020/07/26 02:27

投稿

退会済みユーザー
test CHANGED
File without changes
test CHANGED
@@ -68,13 +68,11 @@
68
68
 
69
69
 
70
70
 
71
- cosumer関数の中では、各サブgoroutineが、キューから一定の数のタスクを取ってきて、それを実行していくのですが、もしキューが空の場合は、待機させたいです。(最初に発生させたgoroutine以外は、サブgoroutineをこれ以上発生させません。)ここで、待機させるのは、sync.CondのWait()を使えばいいのでしょうか?
71
+ cosumer関数の中では、各サブgoroutineが、キューから一定の数のタスクを取ってきて、それを実行していくのですが、もしキューが空の場合は、そのサブgoroutineを待機させたいです。(最初に発生させたサブgoroutine以外は発生させません。)ここで、待機させるのは、sync.CondのWait()を使えばいいのでしょうか?
72
72
 
73
73
 
74
74
 
75
75
  producer関数の中では、メインのgoroutineがタスクをキューにどんどん格納していくのですが、タスクをキューに入れるたびに、Wait()しているサブのgoroutineがないか確認して、ある場合は一つ起こします。一つだけ起こすので、sync.CondのSignal()を使うのであっていますでしょうか?
76
-
77
-
78
76
 
79
77
 
80
78
 
@@ -118,7 +116,7 @@
118
116
 
119
117
 
120
118
 
121
- 本題です。以下のコマンドでスレッドを発生させることが出来るのですが、何も返さず、上記のような結果が出ません。
119
+ 本題です。以下のコマンドで2つのサブスレッドを発生させることが出来るのですが、何も返さず、上記のような結果が出ません。
122
120
 
123
121
 
124
122