質問編集履歴
5
run_in_executorのバージョンを作ってみました。
title
CHANGED
File without changes
|
body
CHANGED
@@ -73,4 +73,49 @@
|
|
73
73
|
def finish(self, text):
|
74
74
|
logging.info('%s', text.result())
|
75
75
|
|
76
|
+
```
|
77
|
+
|
78
|
+
```python
|
79
|
+
|
80
|
+
def handle(self, *args, **kwargs):
|
81
|
+
|
82
|
+
"""ループの開始"""
|
83
|
+
loop = asyncio.get_event_loop()
|
84
|
+
loop.create_task(self.stream())
|
85
|
+
print('start receiving')
|
86
|
+
try:
|
87
|
+
loop.run_forever()
|
88
|
+
except KeyboardInterrupt:
|
89
|
+
exit()
|
90
|
+
|
91
|
+
async def stream(self):
|
92
|
+
uri = 'ws://localhost:1234/'
|
93
|
+
|
94
|
+
async with websockets.connect(uri, ping_interval=None) as ws:
|
95
|
+
while not ws.closed:
|
96
|
+
response = await ws.recv()
|
97
|
+
content = json.loads(response)
|
98
|
+
|
99
|
+
executor = concurrent.futures.ProcessPoolExecutor()
|
100
|
+
queue = asyncio.Queue()
|
101
|
+
|
102
|
+
for i in range(10):
|
103
|
+
queue.put_nowait(i)
|
104
|
+
|
105
|
+
async def proc(q):
|
106
|
+
while not q.empty():
|
107
|
+
i = await q.get()
|
108
|
+
future = loop.run_in_executor(executor, save_pushdata(content, sid), i)
|
109
|
+
await future
|
110
|
+
|
111
|
+
tasks = [proc(queue) for i in range(4)] # 4 = number of cpu core
|
112
|
+
return await asyncio.wait(tasks)
|
113
|
+
|
114
|
+
def save_pushdata(self, content, sid):
|
115
|
+
loop = asyncio.get_event_loop()
|
116
|
+
newData = list(
|
117
|
+
symbol=content['****'],
|
118
|
+
)
|
119
|
+
newData.save()
|
120
|
+
self.method2(content, sid)
|
76
121
|
```
|
4
動いたソースに直しました
title
CHANGED
File without changes
|
body
CHANGED
@@ -9,8 +9,6 @@
|
|
9
9
|
実行したいことは、save_dataが終わった後に、method2と新たなメッセージの受信を同時に始めたいです。
|
10
10
|
|
11
11
|
|
12
|
-
|
13
|
-
|
14
12
|
```python
|
15
13
|
class Command(BaseCommand):
|
16
14
|
|
@@ -32,27 +30,28 @@
|
|
32
30
|
while not ws.closed:
|
33
31
|
response = await ws.recv()
|
34
32
|
content = json.loads(response)
|
35
|
-
task = loop.create_task(self.save_pushdata(content))
|
33
|
+
task = loop.create_task(self.save_pushdata(content, sid))
|
36
|
-
task.add_done_callback(self.finish
|
34
|
+
task.add_done_callback(self.finish)
|
37
35
|
|
38
|
-
async def
|
36
|
+
async def save_pushdata(self, content, sid):
|
39
37
|
loop = asyncio.get_event_loop()
|
40
38
|
newData = list(
|
41
39
|
symbol=content['****'],
|
42
40
|
)
|
43
41
|
newData.save()
|
44
|
-
task = loop.create_task(self.method2(content))
|
42
|
+
task = loop.create_task(self.method2(content, sid))
|
45
|
-
task.add_done_callback(self.finish
|
43
|
+
task.add_done_callback(self.finish)
|
44
|
+
save_pushdata_sid = str(sid) + 'save_pushdata'
|
46
|
-
return
|
45
|
+
return save_pushdata_sid
|
47
46
|
|
48
47
|
async def method2(self, content):
|
49
48
|
|
50
49
|
args1 = self.method4(content)
|
51
50
|
args2 = self.method5(content)
|
52
51
|
self.method3(args1, args2)
|
52
|
+
method2_sid = str(sid) + 'method2'
|
53
|
+
return method2_sid
|
53
54
|
|
54
|
-
return out_layer
|
55
|
-
|
56
55
|
def method3(self, content):
|
57
56
|
print('123')
|
58
57
|
return content
|
@@ -71,8 +70,7 @@
|
|
71
70
|
newData.save()
|
72
71
|
return content['****']
|
73
72
|
|
74
|
-
|
75
73
|
def finish(self, text):
|
76
|
-
|
74
|
+
logging.info('%s', text.result())
|
77
75
|
|
78
76
|
```
|
3
save_pushdataをtaskを返すようにしました。また、finish()を少し修正しました。
title
CHANGED
File without changes
|
body
CHANGED
@@ -32,7 +32,8 @@
|
|
32
32
|
while not ws.closed:
|
33
33
|
response = await ws.recv()
|
34
34
|
content = json.loads(response)
|
35
|
-
|
35
|
+
task = loop.create_task(self.save_pushdata(content))
|
36
|
+
task.add_done_callback(self.finish('123'))
|
36
37
|
|
37
38
|
async def save_data(self, content):
|
38
39
|
loop = asyncio.get_event_loop()
|
@@ -70,11 +71,8 @@
|
|
70
71
|
newData.save()
|
71
72
|
return content['****']
|
72
73
|
|
73
|
-
def print_t(self, text):
|
74
|
-
print(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} {text}")
|
75
74
|
|
76
|
-
|
77
75
|
def finish(self, text):
|
78
|
-
|
76
|
+
print(text)
|
79
77
|
|
80
78
|
```
|
2
method4とmethod5を追加しました。
title
CHANGED
File without changes
|
body
CHANGED
@@ -56,6 +56,20 @@
|
|
56
56
|
print('123')
|
57
57
|
return content
|
58
58
|
|
59
|
+
def method4(self, content):
|
60
|
+
newData = list(
|
61
|
+
symbol=content['****'],
|
62
|
+
)
|
63
|
+
newData.save()
|
64
|
+
return content['****']
|
65
|
+
|
66
|
+
def method5(self, content):
|
67
|
+
newData = list(
|
68
|
+
symbol=content['****'],
|
69
|
+
)
|
70
|
+
newData.save()
|
71
|
+
return content['****']
|
72
|
+
|
59
73
|
def print_t(self, text):
|
60
74
|
print(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} {text}")
|
61
75
|
|
1
実行したいことを追記しました
title
CHANGED
File without changes
|
body
CHANGED
@@ -6,9 +6,11 @@
|
|
6
6
|
|
7
7
|
(asyncioを使っているとエラーメッセージが表示できず、なぜ止まっているか判断が難しいです。)
|
8
8
|
|
9
|
+
実行したいことは、save_dataが終わった後に、method2と新たなメッセージの受信を同時に始めたいです。
|
9
10
|
|
10
11
|
|
11
12
|
|
13
|
+
|
12
14
|
```python
|
13
15
|
class Command(BaseCommand):
|
14
16
|
|