前提・実現したいこと
センサデータをble通信でラズパイで受け取ろうと考えています。
このサイトを参考に
ラズベリーパイのターミナルでsudoをつけてpythonのコードを動かそうとするとエラーになる。
sudoをつけずに実行するとうまくいきます。
何がいけないのでしょうか?
とりあえずはラズパイでBLEデータを受け取れることが確認したいのですが、、、
発生している問題・エラーメッセージ
pi@raspberrypi:~ $ sudo python 123.py Traceback (most recent call last): File "123.py", line 7, in <module> from bluepy.btle import Peripheral, DefaultDelegate, Scanner, BTLEException, UUID ImportError: No module named bluepy.btle
pi@raspberrypi:~ $ python 123.py esp32 BTLE Exception while scannning. BTLE Exception while scannning. BTLE Exception while scannning. BTLE Exception while scannning. BTLE Exception while scannning. BTLE Exception while scannning. BTLE Exception while scannning. BTLE Exception while scannning. BTLE Exception while scannning. BTLE Exception while scannning. BTLE Exception while scannning. BTLE Exception while scannning. BTLE Exception while scannning. BTLE Exception while scannning. BTLE Exception while scannning. BTLE Exception while scannning. BTLE Exception while scannning. BTLE Exception while scannning. BTLE Exception while scannning. BTLE Exception while scannning. BTLE Exception while scannning. BTLE Exception while scannning. BTLE Exception while scannning. BTLE Exception while scannning. BTLE Exception while scannning.
該当のソースコード
python
1# -*- coding: shift-jis -*- 2# 環境センサーをLimited Broadcasterモードにして 3# 10秒アドバタイズ、290秒休み(開発中は50秒休み)に設定 4# 常時スキャンし、データーを取得したらAmbientに送信する 5# 1台(Single)のセンサー端末に対応 6 7from bluepy.btle import Peripheral, DefaultDelegate, Scanner, BTLEException, UUID 8import bluepy.btle 9import sys 10import struct 11from datetime import datetime 12import argparse 13import ambient 14import requests 15import time 16 17channelID = 100 18writeKey = 'writeKey' 19am = ambient.Ambient(channelID, writeKey) 20 21devs = { 22 'omron': {'companyID': 'd502'}, 23 'esp32': {'companyID': 'ffff'} 24} 25target = 'esp32' 26 27Debugging = False 28def DBG(*args): 29 if Debugging: 30 msg = " ".join([str(a) for a in args]) 31 print(msg) 32 sys.stdout.flush() 33 34Verbose = True 35def MSG(*args): 36 if Verbose: 37 msg = " ".join([str(a) for a in args]) 38 print(msg) 39 sys.stdout.flush() 40 41def sendWithRetry(data): 42 for retry in range(6): # 10秒間隔で6回リトライして、ダメならこの回は送信しない 43 try: 44 ret = am.send(data) 45 MSG('sent to Ambient (ret = %d)' % ret.status_code) 46 break 47 except requests.exceptions.RequestException as e: 48 MSG('request failed.') 49 time.sleep(10) 50 51def send2ambient(dataRow): 52 if companyID == 'ffff': 53 (temp, humid, press) = struct.unpack('<hhh', bytes.fromhex(dataRow)) 54 MSG(temp / 100, humid / 100, press / 10) 55 sendWithRetry({'d1': temp / 100, 'd2': humid / 100, 'd3': press / 10}) 56 else: 57 (temp, humid, light, uv, press, noise, accelX, accelY, accelZ, batt) = struct.unpack('<hhhhhhhhhB', bytes.fromhex(dataRow)) 58 MSG(temp / 100, humid / 100, light, uv / 100, press / 10, noise / 100, (batt + 100) / 100) 59 sendWithRetry({'d1': temp / 100, 'd2': humid / 100, 'd3': press / 10, 'd4': (batt + 100) / 100, 'd5': light, 'd6': noise / 100}) 60 61class ScanDelegate(DefaultDelegate): 62 def __init__(self): 63 DefaultDelegate.__init__(self) 64 self.lastseq = None 65 self.lasttime = datetime.fromtimestamp(0) 66 67 def handleDiscovery(self, dev, isNewDev, isNewData): 68 if isNewDev or isNewData: 69 for (adtype, desc, value) in dev.getScanData(): 70 # print(adtype, desc, value) 71 if target in ('omron', 'esp32'): 72 if desc == 'Manufacturer' and value[0:4] == devs[target]['companyID']: 73 delta = datetime.now() - self.lasttime 74 if value[4:6] != self.lastseq and delta.total_seconds() > 11: # アドバタイズする10秒の間に測定が実行されseqが加算されたものは捨てる 75 self.lastseq = value[4:6] 76 self.lasttime = datetime.now() 77 send2ambient(value[6:]) 78 elif target in ('microbit', 'microbit+BME280'): 79 if desc == '16b Service Data' and value[0:4] == 'aafe' and value[8:28] == '000000000000616d6269': 80 print(value) 81 seq = (int(value[32:33], 16) & 0xC) >> 2 82 print(seq) 83 if seq != self.lastseq: 84 self.lastseq = seq 85 if target == 'microbit': 86 temp = int(value[38:40], 16) 87 print('Micro:bit %d' % temp) 88 sendWithRetry({'d1': temp}) 89 elif target == 'microbit+BME280': 90 temp = (int(value[37:40], 16) & 0x3FF) / 10 91 humid = ((int(value[35:38], 16) & 0xFFC) >> 2) / 10 92 press = (int(value[32:35], 16) & 0x3FF) + 400 93 print('Micro:bit t: %f, h: %f, p: %f' % (temp, humid, press)) 94 sendWithRetry({'d1': temp, 'd2': humid, 'd3': press}) 95 96def main(): 97 parser = argparse.ArgumentParser() 98 parser.add_argument('-d',action='store_true', help='debug msg on') 99 parser.add_argument('-o',action='store_true', help='device is omron env sensor') 100 parser.add_argument('-b',action='store_true', help='device is BBC micro:bit') 101 parser.add_argument('-bb',action='store_true', help='device is BBC micro:bit + BME280') 102 103 args = parser.parse_args(sys.argv[1:]) 104 105 global Debugging 106 Debugging = args.d 107 bluepy.btle.Debugging = args.d 108 109 global target 110 if args.o: 111 target = 'omron' 112 elif args.b: 113 target = 'microbit' 114 elif args.bb: 115 target = 'microbit+BME280' 116 print(target) 117 118 scanner = Scanner().withDelegate(ScanDelegate()) 119 while True: 120 try: 121 scanner.scan(5.0) # スキャンする。デバイスを見つけた後の処理はScanDelegateに任せる 122 except BTLEException: 123 MSG('BTLE Exception while scannning.') 124 125if __name__ == "__main__": 126main()
試したこと
sudo をつけることでBLEデバイスにアクセスできるようになるらしいです。
補足
回答者から聞かれたことを試してみました。
回答1件
あなたの回答
tips
プレビュー