センサーからデータを取り出すための関数が7つあり、これらを直列に実行する場合と並列に処理する場合の実行時間を比較したプログラムを作成しております。各関数の実行時間を1秒としたとき、直列に実行すれば7秒ほどの実行時間がかかりますが、並列処理にすることで2秒ほどに短縮できます。一方で、各関数の実行時間を0.005秒とすると、並列化の恩恵を見込めず、直列処理の実行時間とほぼ同じになりました。
理想的には、並列化をすることで各関数の実行時間ほどに効率化ができればと思ったのですが、目論見は失敗しました。
並列化のために使用しているライブラリはconcurrent.futuresのThreadPoolExecutorを使用しています。
あくまで仮説ですが、PythonにはGILの制約があるため、同時に実行されるスレッドが一つに制約されるので、一つのスレッドを呼び出すための時間 > 各関数の実行時間の関係が成り立つ場合、いくら並列化をしても各関数の実行時間よりも早くなることは無いのではないかと考えています。
(GILロックを回避した並列処理の実装が必要?)
上記に関して改善策や知見などありましたらご教授いただきたいです。
下記は実験に使用したコードになります。よろしくお願いいたします。
python
1 2import time 3from time import perf_counter 4 5import numpy as np 6from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor 7 8 9def wait_process(wait_sec): 10 until = perf_counter() + wait_sec 11 while perf_counter() < until: 12 pass 13 return 14 15def get_euler(waittime): 16 wait_process(waittime) 17 x, y, z = 1, 1, 1 18 return x, y, z 19 20def get_gyro(waittime): 21 wait_process(waittime) 22 x,y,z = 2,2,2 23 return x,y,z 24 25def get_accel(waittime): 26 wait_process(waittime) 27 x, y, z = 10, 10, 10 28 return x, y, z 29 30def get_liner_acc(waittime): 31 wait_process(waittime) 32 x, y, z = 1, 1, 1 33 return x, y, z 34 35def get_gravity(waittime): 36 wait_process(waittime) 37 x, y, z = 1, 1, 1 38 return x, y, z 39 40def get_quaternion(waittime): 41 wait_process(waittime) 42 w, x, y, z = 1, 1, 1, 1 43 return w, x, y, z 44 45def get_calib_status(waittime): 46 wait_process(waittime) 47 w, x, y, z = 3, 3, 3, 3 48 return w, x, y, z 49 50 51 52 53def single_process(waittime): 54 start = time.time() 55 get_gyro(waittime) 56 end = time.time() 57 print('elepsed_time:{0}[s]'.format(end-start)) 58 59def stream_process(waittime = 0.005): 60 start = time.time() 61 ex, ey, ez = get_euler(waittime) 62 gx, gy, gz = get_gravity(waittime) 63 ax, ay, az = get_accel(waittime) 64 lax, lay, laz = get_liner_acc(waittime) 65 rx, ry, rz = get_gyro(waittime) 66 qw, qx, qy, qz = get_quaternion(waittime) 67 a, b ,c, d = get_calib_status(waittime) 68 end = time.time() 69 print('elepsed_time:{0}[s]'.format(end-start)) 70 return ex, ey, ez, gx, gy, gz, ax, ay, az, lax, lay, laz, rx, ry, rz, qw, qx, qy, qz, a, b, c, d 71 72def get_all_data(sensor, waittime=0.005): 73 74 75 if sensor == 'euler': 76 x, y, z = get_euler(waittime) 77 return x, y, z 78 79 elif sensor == 'gyro': 80 x, y, z = get_gyro(waittime) 81 return x,y,z 82 83 elif sensor == 'linear_acc': 84 x,y,z=get_liner_acc(waittime) 85 return x,y,z 86 87 elif sensor == 'gravity': 88 x, y, z = get_gravity(waittime) 89 return x,y,z 90 91 elif sensor == 'acceleration': 92 x, y, z = get_accel(waittime) 93 return x, y, z 94 95 elif sensor == 'quaternion': 96 w, x, y, z = get_quaternion(waittime) 97 return w, x, y, z 98 elif sensor == 'calib_status': 99 a, b, c, d = get_calib_status(waittime) 100 return a, b, c, d 101 else: 102 pass 103 104 105 106def pararel_process(n): 107 start = time.time() 108 sensors = ['euler', 'gyro', 'gravity', 'acceleration', 'linear_acc', 'quaternion', 'calib_status'] 109 result = [] 110 111 112 with ThreadPoolExecutor(max_workers=n) as e: 113 ret = e.map(get_all_data, sensors, chunksize=1) 114 sms_multi = [r for r in ret] 115 for res in sms_multi: 116 for i in range(len(res)): 117 result.append(res[i]) 118 119 end = time.time() 120 print('elepsed_time:{0}[s]'.format(end - start)) 121 122 123if __name__ == '__main__': 124 125 waittime = 0.005# 実行時間の仮定 126 worker = 7 127 single_process(waittime)# シングル 128 stream_process(waittime)# 直列処理 129 pararel_process(n=worker) # 並列処理 130 131