teratail header banner
teratail header banner
質問するログイン新規登録

質問編集履歴

5

run_in_executorのバージョンを作ってみました。

2020/12/29 14:21

投稿

taro_yamada
taro_yamada

スコア55

title CHANGED
File without changes
body CHANGED
@@ -73,4 +73,49 @@
73
73
  def finish(self, text):
74
74
  logging.info('%s', text.result())
75
75
 
76
+ ```
77
+
78
+ ```python
79
+
80
+ def handle(self, *args, **kwargs):
81
+
82
+ """ループの開始"""
83
+ loop = asyncio.get_event_loop()
84
+ loop.create_task(self.stream())
85
+ print('start receiving')
86
+ try:
87
+ loop.run_forever()
88
+ except KeyboardInterrupt:
89
+ exit()
90
+
91
+ async def stream(self):
92
+ uri = 'ws://localhost:1234/'
93
+
94
+ async with websockets.connect(uri, ping_interval=None) as ws:
95
+ while not ws.closed:
96
+ response = await ws.recv()
97
+ content = json.loads(response)
98
+
99
+ executor = concurrent.futures.ProcessPoolExecutor()
100
+ queue = asyncio.Queue()
101
+
102
+ for i in range(10):
103
+ queue.put_nowait(i)
104
+
105
+ async def proc(q):
106
+ while not q.empty():
107
+ i = await q.get()
108
+ future = loop.run_in_executor(executor, save_pushdata(content, sid), i)
109
+ await future
110
+
111
+ tasks = [proc(queue) for i in range(4)] # 4 = number of cpu core
112
+ return await asyncio.wait(tasks)
113
+
114
+ def save_pushdata(self, content, sid):
115
+ loop = asyncio.get_event_loop()
116
+ newData = list(
117
+ symbol=content['****'],
118
+ )
119
+ newData.save()
120
+ self.method2(content, sid)
76
121
  ```

4

動いたソースに直しました

2020/12/29 14:21

投稿

taro_yamada
taro_yamada

スコア55

title CHANGED
File without changes
body CHANGED
@@ -9,8 +9,6 @@
9
9
  実行したいことは、save_dataが終わった後に、method2と新たなメッセージの受信を同時に始めたいです。
10
10
 
11
11
 
12
-
13
-
14
12
  ```python
15
13
  class Command(BaseCommand):
16
14
 
@@ -32,27 +30,28 @@
32
30
  while not ws.closed:
33
31
  response = await ws.recv()
34
32
  content = json.loads(response)
35
- task = loop.create_task(self.save_pushdata(content))
33
+ task = loop.create_task(self.save_pushdata(content, sid))
36
- task.add_done_callback(self.finish('123'))
34
+ task.add_done_callback(self.finish)
37
35
 
38
- async def save_data(self, content):
36
+ async def save_pushdata(self, content, sid):
39
37
  loop = asyncio.get_event_loop()
40
38
  newData = list(
41
39
  symbol=content['****'],
42
40
  )
43
41
  newData.save()
44
- task = loop.create_task(self.method2(content))
42
+ task = loop.create_task(self.method2(content, sid))
45
- task.add_done_callback(self.finish('test'))
43
+ task.add_done_callback(self.finish)
44
+ save_pushdata_sid = str(sid) + 'save_pushdata'
46
- return
45
+ return save_pushdata_sid
47
46
 
48
47
  async def method2(self, content):
49
48
 
50
49
  args1 = self.method4(content)
51
50
  args2 = self.method5(content)
52
51
  self.method3(args1, args2)
52
+ method2_sid = str(sid) + 'method2'
53
+ return method2_sid
53
54
 
54
- return out_layer
55
-
56
55
  def method3(self, content):
57
56
  print('123')
58
57
  return content
@@ -71,8 +70,7 @@
71
70
  newData.save()
72
71
  return content['****']
73
72
 
74
-
75
73
  def finish(self, text):
76
- print(text)
74
+ logging.info('%s', text.result())
77
75
 
78
76
  ```

3

save_pushdataをtaskを返すようにしました。また、finish()を少し修正しました。

2020/12/29 06:30

投稿

taro_yamada
taro_yamada

スコア55

title CHANGED
File without changes
body CHANGED
@@ -32,7 +32,8 @@
32
32
  while not ws.closed:
33
33
  response = await ws.recv()
34
34
  content = json.loads(response)
35
- await self.save_pushdata(content)
35
+ task = loop.create_task(self.save_pushdata(content))
36
+ task.add_done_callback(self.finish('123'))
36
37
 
37
38
  async def save_data(self, content):
38
39
  loop = asyncio.get_event_loop()
@@ -70,11 +71,8 @@
70
71
  newData.save()
71
72
  return content['****']
72
73
 
73
- def print_t(self, text):
74
- print(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} {text}")
75
74
 
76
-
77
75
  def finish(self, text):
78
- self.print(text)
76
+ print(text)
79
77
 
80
78
  ```

2

method4とmethod5を追加しました。

2020/12/29 03:20

投稿

taro_yamada
taro_yamada

スコア55

title CHANGED
File without changes
body CHANGED
@@ -56,6 +56,20 @@
56
56
  print('123')
57
57
  return content
58
58
 
59
+ def method4(self, content):
60
+ newData = list(
61
+ symbol=content['****'],
62
+ )
63
+ newData.save()
64
+ return content['****']
65
+
66
+ def method5(self, content):
67
+ newData = list(
68
+ symbol=content['****'],
69
+ )
70
+ newData.save()
71
+ return content['****']
72
+
59
73
  def print_t(self, text):
60
74
  print(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} {text}")
61
75
 

1

実行したいことを追記しました

2020/12/29 01:43

投稿

taro_yamada
taro_yamada

スコア55

title CHANGED
File without changes
body CHANGED
@@ -6,9 +6,11 @@
6
6
 
7
7
  (asyncioを使っているとエラーメッセージが表示できず、なぜ止まっているか判断が難しいです。)
8
8
 
9
+ 実行したいことは、save_dataが終わった後に、method2と新たなメッセージの受信を同時に始めたいです。
9
10
 
10
11
 
11
12
 
13
+
12
14
  ```python
13
15
  class Command(BaseCommand):
14
16