質問編集履歴

5

run_in_executorのバージョンを作ってみました。

2020/12/29 14:21

投稿

taro_yamada
taro_yamada

スコア55

test CHANGED
File without changes
test CHANGED
@@ -149,3 +149,93 @@
149
149
 
150
150
 
151
151
  ```
152
+
153
+
154
+
155
+ ```python
156
+
157
+
158
+
159
+ def handle(self, *args, **kwargs):
160
+
161
+
162
+
163
+ """ループの開始"""
164
+
165
+ loop = asyncio.get_event_loop()
166
+
167
+ loop.create_task(self.stream())
168
+
169
+ print('start receiving')
170
+
171
+ try:
172
+
173
+ loop.run_forever()
174
+
175
+ except KeyboardInterrupt:
176
+
177
+ exit()
178
+
179
+
180
+
181
+ async def stream(self):
182
+
183
+ uri = 'ws://localhost:1234/'
184
+
185
+
186
+
187
+ async with websockets.connect(uri, ping_interval=None) as ws:
188
+
189
+ while not ws.closed:
190
+
191
+ response = await ws.recv()
192
+
193
+ content = json.loads(response)
194
+
195
+
196
+
197
+ executor = concurrent.futures.ProcessPoolExecutor()
198
+
199
+ queue = asyncio.Queue()
200
+
201
+
202
+
203
+ for i in range(10):
204
+
205
+ queue.put_nowait(i)
206
+
207
+
208
+
209
+ async def proc(q):
210
+
211
+ while not q.empty():
212
+
213
+ i = await q.get()
214
+
215
+ future = loop.run_in_executor(executor, save_pushdata(content, sid), i)
216
+
217
+ await future
218
+
219
+
220
+
221
+ tasks = [proc(queue) for i in range(4)] # 4 = number of cpu core
222
+
223
+ return await asyncio.wait(tasks)
224
+
225
+
226
+
227
+ def save_pushdata(self, content, sid):
228
+
229
+ loop = asyncio.get_event_loop()
230
+
231
+ newData = list(
232
+
233
+ symbol=content['****'],
234
+
235
+ )
236
+
237
+ newData.save()
238
+
239
+ self.method2(content, sid)
240
+
241
+ ```

4

動いたソースに直しました

2020/12/29 14:21

投稿

taro_yamada
taro_yamada

スコア55

test CHANGED
File without changes
test CHANGED
@@ -15,10 +15,6 @@
15
15
 
16
16
 
17
17
  実行したいことは、save_dataが終わった後に、method2と新たなメッセージの受信を同時に始めたいです。
18
-
19
-
20
-
21
-
22
18
 
23
19
 
24
20
 
@@ -66,13 +62,13 @@
66
62
 
67
63
  content = json.loads(response)
68
64
 
69
- task = loop.create_task(self.save_pushdata(content))
65
+ task = loop.create_task(self.save_pushdata(content, sid))
70
66
 
71
- task.add_done_callback(self.finish('123'))
67
+ task.add_done_callback(self.finish)
72
68
 
73
69
 
74
70
 
75
- async def save_data(self, content):
71
+ async def save_pushdata(self, content, sid):
76
72
 
77
73
  loop = asyncio.get_event_loop()
78
74
 
@@ -84,11 +80,13 @@
84
80
 
85
81
  newData.save()
86
82
 
87
- task = loop.create_task(self.method2(content))
83
+ task = loop.create_task(self.method2(content, sid))
88
84
 
89
- task.add_done_callback(self.finish('test'))
85
+ task.add_done_callback(self.finish)
90
86
 
87
+ save_pushdata_sid = str(sid) + 'save_pushdata'
88
+
91
- return
89
+ return save_pushdata_sid
92
90
 
93
91
 
94
92
 
@@ -102,9 +100,9 @@
102
100
 
103
101
  self.method3(args1, args2)
104
102
 
103
+ method2_sid = str(sid) + 'method2'
105
104
 
106
-
107
- return out_layer
105
+ return method2_sid
108
106
 
109
107
 
110
108
 
@@ -144,11 +142,9 @@
144
142
 
145
143
 
146
144
 
147
-
148
-
149
145
  def finish(self, text):
150
146
 
151
- print(text)
147
+ logging.info('%s', text.result())
152
148
 
153
149
 
154
150
 

3

save_pushdataをtaskを返すようにしました。また、finish()を少し修正しました。

2020/12/29 06:30

投稿

taro_yamada
taro_yamada

スコア55

test CHANGED
File without changes
test CHANGED
@@ -66,7 +66,9 @@
66
66
 
67
67
  content = json.loads(response)
68
68
 
69
- await self.save_pushdata(content)
69
+ task = loop.create_task(self.save_pushdata(content))
70
+
71
+ task.add_done_callback(self.finish('123'))
70
72
 
71
73
 
72
74
 
@@ -142,17 +144,11 @@
142
144
 
143
145
 
144
146
 
145
- def print_t(self, text):
146
-
147
- print(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} {text}")
148
-
149
-
150
-
151
147
 
152
148
 
153
149
  def finish(self, text):
154
150
 
155
- self.print(text)
151
+ print(text)
156
152
 
157
153
 
158
154
 

2

method4とmethod5を追加しました。

2020/12/29 03:20

投稿

taro_yamada
taro_yamada

スコア55

test CHANGED
File without changes
test CHANGED
@@ -114,6 +114,34 @@
114
114
 
115
115
 
116
116
 
117
+ def method4(self, content):
118
+
119
+ newData = list(
120
+
121
+ symbol=content['****'],
122
+
123
+ )
124
+
125
+ newData.save()
126
+
127
+ return content['****']
128
+
129
+
130
+
131
+ def method5(self, content):
132
+
133
+ newData = list(
134
+
135
+ symbol=content['****'],
136
+
137
+ )
138
+
139
+ newData.save()
140
+
141
+ return content['****']
142
+
143
+
144
+
117
145
  def print_t(self, text):
118
146
 
119
147
  print(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} {text}")

1

実行したいことを追記しました

2020/12/29 01:43

投稿

taro_yamada
taro_yamada

スコア55

test CHANGED
File without changes
test CHANGED
@@ -11,6 +11,10 @@
11
11
 
12
12
 
13
13
  (asyncioを使っているとエラーメッセージが表示できず、なぜ止まっているか判断が難しいです。)
14
+
15
+
16
+
17
+ 実行したいことは、save_dataが終わった後に、method2と新たなメッセージの受信を同時に始めたいです。
14
18
 
15
19
 
16
20