質問編集履歴
5
run_in_executorのバージョンを作ってみました。
test
CHANGED
File without changes
|
test
CHANGED
@@ -149,3 +149,93 @@
|
|
149
149
|
|
150
150
|
|
151
151
|
```
|
152
|
+
|
153
|
+
|
154
|
+
|
155
|
+
```python
|
156
|
+
|
157
|
+
|
158
|
+
|
159
|
+
def handle(self, *args, **kwargs):
|
160
|
+
|
161
|
+
|
162
|
+
|
163
|
+
"""ループの開始"""
|
164
|
+
|
165
|
+
loop = asyncio.get_event_loop()
|
166
|
+
|
167
|
+
loop.create_task(self.stream())
|
168
|
+
|
169
|
+
print('start receiving')
|
170
|
+
|
171
|
+
try:
|
172
|
+
|
173
|
+
loop.run_forever()
|
174
|
+
|
175
|
+
except KeyboardInterrupt:
|
176
|
+
|
177
|
+
exit()
|
178
|
+
|
179
|
+
|
180
|
+
|
181
|
+
async def stream(self):
|
182
|
+
|
183
|
+
uri = 'ws://localhost:1234/'
|
184
|
+
|
185
|
+
|
186
|
+
|
187
|
+
async with websockets.connect(uri, ping_interval=None) as ws:
|
188
|
+
|
189
|
+
while not ws.closed:
|
190
|
+
|
191
|
+
response = await ws.recv()
|
192
|
+
|
193
|
+
content = json.loads(response)
|
194
|
+
|
195
|
+
|
196
|
+
|
197
|
+
executor = concurrent.futures.ProcessPoolExecutor()
|
198
|
+
|
199
|
+
queue = asyncio.Queue()
|
200
|
+
|
201
|
+
|
202
|
+
|
203
|
+
for i in range(10):
|
204
|
+
|
205
|
+
queue.put_nowait(i)
|
206
|
+
|
207
|
+
|
208
|
+
|
209
|
+
async def proc(q):
|
210
|
+
|
211
|
+
while not q.empty():
|
212
|
+
|
213
|
+
i = await q.get()
|
214
|
+
|
215
|
+
future = loop.run_in_executor(executor, save_pushdata(content, sid), i)
|
216
|
+
|
217
|
+
await future
|
218
|
+
|
219
|
+
|
220
|
+
|
221
|
+
tasks = [proc(queue) for i in range(4)] # 4 = number of cpu core
|
222
|
+
|
223
|
+
return await asyncio.wait(tasks)
|
224
|
+
|
225
|
+
|
226
|
+
|
227
|
+
def save_pushdata(self, content, sid):
|
228
|
+
|
229
|
+
loop = asyncio.get_event_loop()
|
230
|
+
|
231
|
+
newData = list(
|
232
|
+
|
233
|
+
symbol=content['****'],
|
234
|
+
|
235
|
+
)
|
236
|
+
|
237
|
+
newData.save()
|
238
|
+
|
239
|
+
self.method2(content, sid)
|
240
|
+
|
241
|
+
```
|
4
動いたソースに直しました
test
CHANGED
File without changes
|
test
CHANGED
@@ -15,10 +15,6 @@
|
|
15
15
|
|
16
16
|
|
17
17
|
実行したいことは、save_dataが終わった後に、method2と新たなメッセージの受信を同時に始めることです。
|
18
|
-
|
19
|
-
|
20
|
-
|
21
|
-
|
22
18
|
|
23
19
|
|
24
20
|
|
@@ -66,13 +62,13 @@
|
|
66
62
|
|
67
63
|
content = json.loads(response)
|
68
64
|
|
69
|
-
task = loop.create_task(self.save_pushdata(content))
|
65
|
+
task = loop.create_task(self.save_pushdata(content, sid))
|
70
66
|
|
71
|
-
task.add_done_callback(self.finish
|
67
|
+
task.add_done_callback(self.finish)
|
72
68
|
|
73
69
|
|
74
70
|
|
75
|
-
async def save_data(self, content):
|
71
|
+
async def save_pushdata(self, content, sid):
|
76
72
|
|
77
73
|
loop = asyncio.get_event_loop()
|
78
74
|
|
@@ -84,11 +80,13 @@
|
|
84
80
|
|
85
81
|
newData.save()
|
86
82
|
|
87
|
-
task = loop.create_task(self.method2(content))
|
83
|
+
task = loop.create_task(self.method2(content, sid))
|
88
84
|
|
89
|
-
task.add_done_callback(self.finish
|
85
|
+
task.add_done_callback(self.finish)
|
90
86
|
|
87
|
+
save_pushdata_sid = str(sid) + 'save_pushdata'
|
88
|
+
|
91
|
-
return
|
89
|
+
return save_pushdata_sid
|
92
90
|
|
93
91
|
|
94
92
|
|
@@ -102,9 +100,9 @@
|
|
102
100
|
|
103
101
|
self.method3(args1, args2)
|
104
102
|
|
103
|
+
method2_sid = str(sid) + 'method2'
|
105
104
|
|
106
|
-
|
107
|
-
return o
|
105
|
+
return method2_sid
|
108
106
|
|
109
107
|
|
110
108
|
|
@@ -144,11 +142,9 @@
|
|
144
142
|
|
145
143
|
|
146
144
|
|
147
|
-
|
148
|
-
|
149
145
|
def finish(self, text):
|
150
146
|
|
151
|
-
|
147
|
+
logging.info('%s', text.result())
|
152
148
|
|
153
149
|
|
154
150
|
|
3
save_pushdataがtaskを返すようにしました。また、finish()を少し修正しました。
test
CHANGED
File without changes
|
test
CHANGED
@@ -66,7 +66,9 @@
|
|
66
66
|
|
67
67
|
content = json.loads(response)
|
68
68
|
|
69
|
-
a
|
69
|
+
task = loop.create_task(self.save_pushdata(content))
|
70
|
+
|
71
|
+
task.add_done_callback(self.finish('123'))
|
70
72
|
|
71
73
|
|
72
74
|
|
@@ -142,17 +144,11 @@
|
|
142
144
|
|
143
145
|
|
144
146
|
|
145
|
-
def print_t(self, text):
|
146
|
-
|
147
|
-
print(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} {text}")
|
148
|
-
|
149
|
-
|
150
|
-
|
151
147
|
|
152
148
|
|
153
149
|
def finish(self, text):
|
154
150
|
|
155
|
-
|
151
|
+
print(text)
|
156
152
|
|
157
153
|
|
158
154
|
|
2
method4とmethod5を追加しました。
test
CHANGED
File without changes
|
test
CHANGED
@@ -114,6 +114,34 @@
|
|
114
114
|
|
115
115
|
|
116
116
|
|
117
|
+
def method4(self, content):
|
118
|
+
|
119
|
+
newData = list(
|
120
|
+
|
121
|
+
symbol=content['****'],
|
122
|
+
|
123
|
+
)
|
124
|
+
|
125
|
+
newData.save()
|
126
|
+
|
127
|
+
return content['****']
|
128
|
+
|
129
|
+
|
130
|
+
|
131
|
+
def method5(self, content):
|
132
|
+
|
133
|
+
newData = list(
|
134
|
+
|
135
|
+
symbol=content['****'],
|
136
|
+
|
137
|
+
)
|
138
|
+
|
139
|
+
newData.save()
|
140
|
+
|
141
|
+
return content['****']
|
142
|
+
|
143
|
+
|
144
|
+
|
117
145
|
def print_t(self, text):
|
118
146
|
|
119
147
|
print(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} {text}")
|
1
実行したいことを追記しました
test
CHANGED
File without changes
|
test
CHANGED
@@ -11,6 +11,10 @@
|
|
11
11
|
|
12
12
|
|
13
13
|
(asyncioを使っているとエラーメッセージが表示できず、なぜ止まっているか判断が難しいです。)
|
14
|
+
|
15
|
+
|
16
|
+
|
17
|
+
実行したいことは、save_dataが終わった後に、method2と新たなメッセージの受信を同時に始めることです。
|
14
18
|
|
15
19
|
|
16
20
|
|