質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

ただいまの
回答率

89.13%

センサデバイスとラズパイをBLE通信することが出来ない。

受付中

回答 2

投稿

  • 評価
  • クリップ 0
  • VIEW 467

taka77

score 4

前提・実現したいこと

センサデータをBLE通信でラズパイで受け取り、クラウド環境にあげたいです。
このサイトを参考に
ラズパイのターミナルでサイトにあるようにpythonのコードを動かそうとするとエラーメッセージや受信出来ていないメッセージ?が発生して前へ進めません。

発生している問題・エラーメッセージ

1.sudoをつけてプログラム実行した場合

pi@raspberrypi:~ $ sudo python env2ambientCS.py

Traceback (most recent call last):
  File "env2ambientCS.py", line 6, in <module>
    from bluepy.btle import Peripheral, DefaultDelegate, Scanner, BTLEException, UUID
ImportError: No module named bluepy.btle


2.sudoをつけないでプログラムを実行した場合

pi@raspberrypi:~ $ env2ambientCS.py

BTLE Exception while scannning.
BTLE Exception while scannning.
BTLE Exception while scannning.
BTLE Exception while scannning.
BTLE Exception while scannning.
BTLE Exception while scannning.
BTLE Exception while scannning.
BTLE Exception while scannning.
BTLE Exception while scannning.
BTLE Exception while scannning.
BTLE Exception while scannning.
BTLE Exception while scannning.
BTLE Exception while scannning.
BTLE Exception while scannning.
BTLE Exception while scannning.
BTLE Exception while scannning.
BTLE Exception while scannning.
BTLE Exception while scannning.
BTLE Exception while scannning.
BTLE Exception while scannning.
BTLE Exception while scannning.
BTLE Exception while scannning.
BTLE Exception while scannning.
BTLE Exception while scannning.
BTLE Exception while scannning.

該当のソースコード

# -*- coding: utf-8 -*-
# 環境センサーをスキャンし、見つけたら300秒ごとにconnectして
# 最新データー(latest data)を取得し、Ambientに送信する
# 1台(Single)のセンサー端末に対応

from bluepy.btle import Peripheral, DefaultDelegate, Scanner, BTLEException, UUID
import bluepy.btle
from threading import Thread, Timer
import time
import sys
import struct
import argparse
import ambient

channelID = 100
writeKey = 'writekey'

def _OMRON_UUID(val):
    return UUID('%08X-7700-46F4-AA96-D5E974E32A54' % (0x0C4C0000 + val))

def _MICROBIT_UUID(val):
    return UUID('%08X-251D-470A-A062-FA1922DFA9A8' % (0xE95D0000 + val))


devs = {
    'omron':    {'desc': 'Short Local Name',    'value': 'Env',           'match': 'exact',   'uuid': _OMRON_UUID(0x3001)},
    'microbit': {'desc': 'Complete Local Name', 'value': 'BBC micro:bit', 'match': 'forward', 'uuid': _MICROBIT_UUID(0x9250)},
    'esp32':    {'desc': 'Complete Local Name', 'value': 'AmbientEnv-01', 'match': 'exact',   'uuid': 'b0c8c0fa-6d46-11e8-adc0-fa7ae01bbebc'}
}
target = 'esp32'

Debugging = False
def DBG(*args):
    if Debugging:
        msg = " ".join([str(a) for a in args])
        print(msg)
        sys.stdout.flush()

Verbose = True
def MSG(*args):
    if Verbose:
        msg = " ".join([str(a) for a in args])
        print(msg)
        sys.stdout.flush()

def timeoutRetry(addr):
    MSG('timer expired (%s)' % addr)
    devThread = scannedDevs[addr]
    devThread.forceDisconnect()
    MSG('Thread disconnected (%s)' % addr)

def send2ambient(am, dataRow):
    if target == 'esp32':
        (seq, temp, humid, press) = struct.unpack('<Bhhh', dataRow)
        MSG(seq, temp / 100, humid / 100, press / 10)
        ret = am.send({'d1': temp / 100, 'd2': humid / 100, 'd3': press / 10})
    elif target == 'microbit':
        temp = struct.unpack('<b', dataRow)
        print({'d1': temp[0]})
        ret = am.send({'d1': temp[0]})
    else:
        (seq, temp, humid, light, uv, press, noise, discom, heat, batt) = struct.unpack('<BhhhhhhhhH', dataRow)
        MSG(seq, temp / 100, humid / 100, light, uv / 100, press / 10, noise / 100, discom / 100, heat / 100, batt / 1000)
        ret = am.send({'d1': temp / 100, 'd2': humid / 100, 'd3': press / 10, 'd4': batt / 1000, 'd5': light, 'd6': noise / 100})
    MSG('sent to Ambient (ret = %d)' % ret.status_code)

class EnvSensor(Thread, Peripheral):
    def __init__(self, dev):
        Peripheral.__init__(self)
        Thread.__init__(self)
        self.setDaemon(True)
        self.dev = dev
        self.isConnected = False
        self.am = ambient.Ambient(channelID, writeKey)

    def run(self):
        while True:
            t = Timer(30, timeoutRetry, [self.dev.addr])
            t.start()
            while self.isConnected == False:  # つながるまでconnectする
                try:
                    self.connect(self.dev)
                    self.isConnected = True
                except BTLEException as e:
                    MSG('BTLE Exception while connect on ', self.dev.addr)
                    MSG('type:' + str(type(e)))
                    MSG('args:' + str(e.args))
                    # pass
            MSG('connected to ', self.dev.addr)
            try:
                latestDataRow = self.getCharacteristics(uuid=devs[target]['uuid'])[0]
                dataRow = latestDataRow.read()
                send2ambient(self.am, dataRow)
                t.cancel()
                time.sleep(300.0)
            except BTLEException as e:
                MSG('BTLE Exception while getCharacteristics on ', self.dev.addr)
                MSG('type:' + str(type(e)))
                MSG('args:' + str(e.args))
                self.disconnect()
                self.isConnected = False
                t.cancel()

    def forceDisconnect(self):
        if self.isConnected:
            self.disconnect()
        self.isConnected = False

scannedDevs = {}

class ScanDelegate(DefaultDelegate):
    def __init__(self):
        DefaultDelegate.__init__(self)

    def handleDiscovery(self, dev, isNewDev, isNewData):
        if isNewDev:
            for (adtype, desc, value) in dev.getScanData():  # スキャンデーターを調べる
                if devs[target]['match'] == 'exact' and desc == devs[target]['desc'] and value == devs[target]['value'] \
                or devs[target]['match'] == 'forward' and desc == devs[target]['desc'] and value.startswith(devs[target]['value']):  # 対象を見つけたら
                    if dev.addr in scannedDevs.keys():  # すでに見つけていたらスキップ
                        return
                    MSG('New %s %s' % (value, dev.addr))
                    devThread = EnvSensor(dev)  # EnvSensorクラスのインスタンスを生成
                    scannedDevs[dev.addr] = devThread
                    devThread.start()  # スレッドを起動

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-d',action='store_true', help='debug msg on')
    parser.add_argument('-o',action='store_true', help='device is omron env sensor')
    parser.add_argument('-b',action='store_true', help='device is BBC micro:bit')

    args = parser.parse_args(sys.argv[1:])

    global Debugging
    Debugging = args.d
    bluepy.btle.Debugging = args.d

    global target
    if args.o:
        target = 'omron'
    elif args.b:
        target = 'microbit'

    scanner = Scanner().withDelegate(ScanDelegate())
    while True:
        try:
            scanner.scan(5.0) # スキャンする。デバイスを見つけた後の処理はScanDelegateに任せる
        except BTLEException:
            MSG('BTLE Exception while scannning.')

if __name__ == "__main__":
    main()

試したこと

python3でプログラムを実行したが同様の現象が起きた。

補足情報(FW/ツールのバージョンなど)

raspberryPi 3b+
python2.7.16
python3.7.3

  • 気になる質問をクリップする

    クリップした質問は、後からいつでもマイページで確認できます。

    またクリップした質問に回答があった際、通知やメールを受け取ることができます。

    クリップを取り消します

  • 良い質問の評価を上げる

    以下のような質問は評価を上げましょう

    • 質問内容が明確
    • 自分も答えを知りたい
    • 質問者以外のユーザにも役立つ

    評価が高い質問は、TOPページの「注目」タブのフィードに表示されやすくなります。

    質問の評価を上げたことを取り消します

  • 評価を下げられる数の上限に達しました

    評価を下げることができません

    • 1日5回まで評価を下げられます
    • 1日に1ユーザに対して2回まで評価を下げられます

    質問の評価を下げる

    teratailでは下記のような質問を「具体的に困っていることがない質問」、「サイトポリシーに違反する質問」と定義し、推奨していません。

    • プログラミングに関係のない質問
    • やってほしいことだけを記載した丸投げの質問
    • 問題・課題が含まれていない質問
    • 意図的に内容が抹消された質問
    • 過去に投稿した質問と同じ内容の質問
    • 広告と受け取られるような投稿

    評価が下がると、TOPページの「アクティブ」「注目」タブのフィードに表示されにくくなります。

    質問の評価を下げたことを取り消します

    この機能は開放されていません

    評価を下げる条件を満たしてません

    評価を下げる理由を選択してください

    詳細な説明はこちら

    上記に当てはまらず、質問内容が明確になっていない質問には「情報の追加・修正依頼」機能からコメントをしてください。

    質問の評価を下げる機能の利用条件

    この機能を利用するためには、以下の事項を行う必要があります。

回答 2

0

ImportError: No module named bluepy.btle
というエラーが出るのは、Python用ライブラリ bluepy のインストールがうまく出来ていないからだと思います。

bluepyモジュールのインストールに関しては、参考にされたサイトからリンクが張られているRaspberryPiにpyenvを導入の記事や、Raspberry Pi で bluepy を使ってmicro:bitをコントロールの記事などが役立つのではないかと思います。

BTLEモジュールへのアクセスにはroot権限が必要です。
root権限なしに(sudoをつけないで)実行した場合に、BTLEの例外が発生するのは、アクセス権限の無いBTLEモジュールを使ってscanなどの操作を行おうとするからでしょう。

投稿

  • 回答の評価を上げる

    以下のような回答は評価を上げましょう

    • 正しい回答
    • わかりやすい回答
    • ためになる回答

    評価が高い回答ほどページの上位に表示されます。

  • 回答の評価を下げる

    下記のような回答は推奨されていません。

    • 間違っている回答
    • 質問の回答になっていない投稿
    • スパムや攻撃的な表現を用いた投稿

    評価を下げる際はその理由を明確に伝え、適切な回答に修正してもらいましょう。

  • 2020/01/31 14:21 編集

    回答ありがとうございます。
    上記サイトを参考にインストールを見直した後、プログラムを実行すると
    pi@raspberrypi:~ $ sudo python env2ambientCS.py

    File "env2ambientCS.py", line 13, in <module>
    import ambient
    ImportError: No module named ambient

    と表示されるようになってしまいました。サイトを参考にambientのモジュールはインストールされているはずなのになぜこのようなエラーが起こるのか分かりません。
    原因わかる方教えていただきたいです。

    キャンセル

0

Pythonを使う場合、最低限どこにライブラリをインストールしたのかは自分で把握していないと手がつけられなくなることがあります。
(これはどう考えてもPythonの欠点ではありません。そもそもPythonの外側の話だし、それすらUNIX系システムに関する基本的な知識があれば何も問題にならないのですから。しかしPythonでは複数の処理系が単一システム上で共存させられることが多いので、この問題がよく発生します)

たとえば(この段落の小文字は実行ファイルと思ってください)pythonもpipも複数あって、ユーザのPATHでどれが動くか変わるので一般ユーザとスーパーユーザで動くpythonやらpipやらが変わる、というような場合、何も把握していないとどうしようもないと思います。

pyenvのような仮想環境を導入すると更に状況が複雑になり、わけがわからなくなるでしょう。


たとえばpip showなどを使うとライブラリのインストールされたディレクトリを確認できます。またlinuxではwhichコマンドで実行ファイルの在り処を確認できます。まずはこれらを使用して、どのpythonにどれ何が入っているのかを把握するのが先決です。

特定のpythonで実行したい場合は、絶対パスで実行ファイルを指定するのが一番確実です。shebangに書いておけばいいのではないでしょうか。あるいは、環境次第では仮想環境をactivateすれば仮想環境のpythonが使えるとか、特定のコマンドで使い分けることが可能になっている(たとえば同一バージョンが重複して入っていない限りはpython3.6とかpython2.7で使い分けられる)ならそうするといった工夫もありえます。

投稿

編集

  • 回答の評価を上げる

    以下のような回答は評価を上げましょう

    • 正しい回答
    • わかりやすい回答
    • ためになる回答

    評価が高い回答ほどページの上位に表示されます。

  • 回答の評価を下げる

    下記のような回答は推奨されていません。

    • 間違っている回答
    • 質問の回答になっていない投稿
    • スパムや攻撃的な表現を用いた投稿

    評価を下げる際はその理由を明確に伝え、適切な回答に修正してもらいましょう。

  • 2020/02/14 15:43

    回答ありがとうございます。
    上記のことを理解するレベルに達していないプログラム素人なので具体的に何をやればいいかよく分かりません。まず何を確認すれば良いのでしょうか。
    そしてどういう場合はこうしなければということあれば教えて下さい。

    キャンセル

15分調べてもわからないことは、teratailで質問しよう!

  • ただいまの回答率 89.13%
  • 質問をまとめることで、思考を整理して素早く解決
  • テンプレート機能で、簡単に質問をまとめられる