以下のとおり、djangoのカスタムコマンドでasyncioを使ってwebsocketのデータを加工しています。
なお、元のソースコードはものすごく長いので、省略しています。
デバッグで追ったところ、method3のreturnまではいけたのですが、method2のreturn out_layerで止まってしまいます。
どなたかアドバイスをいただけないでしょうか?
(asyncioを使っているとエラーメッセージが表示できず、なぜ止まっているか判断が難しいです。)
実行したいことは、save_dataが終わった後に、method2と新たなメッセージの受信を同時に始めたいです。
python
1class Command(BaseCommand): 2 3 def handle(self, *args, **kwargs): 4 5 """ループの開始""" 6 loop = asyncio.get_event_loop() 7 loop.create_task(self.stream()) 8 print('start receiving') 9 try: 10 loop.run_forever() 11 except KeyboardInterrupt: 12 exit() 13 14 async def stream(self): 15 uri = 'ws://localhost:1234/' 16 17 async with websockets.connect(uri, ping_interval=None) as ws: 18 while not ws.closed: 19 response = await ws.recv() 20 content = json.loads(response) 21 task = loop.create_task(self.save_pushdata(content, sid)) 22 task.add_done_callback(self.finish) 23 24 async def save_pushdata(self, content, sid): 25 loop = asyncio.get_event_loop() 26 newData = list( 27 symbol=content['****'], 28 ) 29 newData.save() 30 task = loop.create_task(self.method2(content, sid)) 31 task.add_done_callback(self.finish) 32 save_pushdata_sid = str(sid) + 'save_pushdata' 33 return save_pushdata_sid 34 35 async def method2(self, content): 36 37 args1 = self.method4(content) 38 args2 = self.method5(content) 39 self.method3(args1, args2) 40 method2_sid = str(sid) + 'method2' 41 return method2_sid 42 43 def method3(self, content): 44 print('123') 45 return content 46 47 def method4(self, content): 48 newData = list( 49 symbol=content['****'], 50 ) 51 newData.save() 52 return content['****'] 53 54 def method5(self, content): 55 newData = list( 56 symbol=content['****'], 57 ) 58 newData.save() 59 return content['****'] 60 61 def finish(self, text): 62 logging.info('%s', text.result()) 63
python
1 2 def handle(self, *args, **kwargs): 3 4 """ループの開始""" 5 loop = asyncio.get_event_loop() 6 loop.create_task(self.stream()) 7 print('start receiving') 8 try: 9 loop.run_forever() 10 except KeyboardInterrupt: 11 exit() 12 13 async def stream(self): 14 uri = 'ws://localhost:1234/' 15 16 async with websockets.connect(uri, ping_interval=None) as ws: 17 while not ws.closed: 18 response = await ws.recv() 19 content = json.loads(response) 20 21 executor = concurrent.futures.ProcessPoolExecutor() 22 queue = asyncio.Queue() 23 24 for i in range(10): 25 queue.put_nowait(i) 26 27 async def proc(q): 28 while not q.empty(): 29 i = await q.get() 30 future = loop.run_in_executor(executor, save_pushdata(content, sid), i) 31 await future 32 33 tasks = [proc(queue) for i in range(4)] # 4 = number of cpu core 34 return await asyncio.wait(tasks) 35 36def save_pushdata(self, content, sid): 37 loop = asyncio.get_event_loop() 38 newData = list( 39 symbol=content['****'], 40 ) 41 newData.save() 42 self.method2(content, sid)
回答1件
あなたの回答
tips
プレビュー
バッドをするには、ログインかつ
こちらの条件を満たす必要があります。
2020/12/29 01:40
2020/12/29 03:02
2020/12/29 03:24
2020/12/29 03:37
2020/12/29 04:24
2020/12/29 04:28
2020/12/29 04:50
2020/12/29 04:58 編集
2020/12/29 06:40
2020/12/29 06:42
2020/12/29 10:35
2020/12/29 11:04 編集
2020/12/29 12:58
2020/12/29 13:35
2020/12/29 14:22
2020/12/29 18:38 編集
2020/12/29 17:56 編集
2020/12/30 01:42