前提・実現したいこと
XBee3 ZigBee 3.0とラズパイで通信をしています.
下の表のような感じです.
コーディネータ | ルータ | エンドデバイス |
---|---|---|
ラズパイ | Xbee3 | Xbee3 |
python | microPython | microPython |
今,自分が書いているプログラムでは中継先のアドレスを全て指定して書いています.
これを,エンドデバイスのプログラムではコーディネータを指定して情報を送るプログラムで統一して,逆にコーディネータからの送信も,あるエンドデバイスを指定して送るプログラムに修正したいです.
つまり,1対1で通信できない距離にあるコーディネータとエンドデバイスの通信でルータのアドレスを指定してリレーせず,ネットワークを取得して送信先のNIを指定して情報を送信できないのでしょうか.
また,可能な場合はルータのプログラミングは必要なのでしょうか.
例えば,microPythonではxbee.discover()が,pythonではxbee.network()でルータを介した先のXBeeのNIを拾えるのか.できないのなら,それに似た代替え案があるのでしょうか.
読んでも理解できませんでした.
現状では1機ごとに中継するルータのアドレスを入力しなければならない上,エンドデバイスの場合分けをしなければならないため複雑で不便です.
試していたタイミングではxbee間の距離が短く,手元に2台しかないため試すことができませんでした.
教えていただけるとありがたいです.よろしくお願いします.
あと完全に初歩的な話なのですが,microPythonのxbeeライブラリはdigiの解説ページのここにある分の関数しかは存在しないのでしょうか.
該当のソースコード
microPython
1#エンドデバイス側(現在の状態.アドレスで中継先を指定しているものです) 2import xbee 3from machine import Pin 4import time 5 6#TARGET_64BIT_ADDR = b'\x00\x13\xA2\x00\x41\xCC\x0D\x58' 7TARGET_64BIT_ADDR = b'\x00\x13\xA2\x00\x41\xCB\xF8\xAC' 8 9srl = '001' 10 11MESSAGE_MOTION = "mdt" #Motion detection 12MESSAGE_NO_MOTION = "ucd" #Unchanged 13 14INPUT_PIN_ID = "D1" 15 16SM = 0 17# 0で通常動作、1でスリープ 18 19before_state = 2 20 21print(" +-----------------------------+") 22print(" | send PIR Motion Sensor info |") 23print(" +-----------------------------+\n") 24 25xb = xbee.XBee() 26 27input_pin = Pin(INPUT_PIN_ID, Pin.IN, Pin.PULL_UP) 28 29while True: 30 if SM == 0: 31 if input_pin.value() == 0 and input_pin.value() != before_state: 32# print("Sending data to %s >> %s" % (''.join('{:02x}'.format(x).upper() for x in TARGET_64BIT_ADDR), 33# MESSAGE_NO_MOTION)) 34 try: 35 xbee.transmit(TARGET_64BIT_ADDR, srl + MESSAGE_NO_MOTION) 36 print("Data sent successfully") 37 except Exception as e: 38 print("Transmit failure:", str(e)) 39 40 elif input_pin.value() == 1 and input_pin.value() != before_state: 41# print("Sending data to %s >> %s" % (''.join('{:02x}'.format(x).upper() for x in TARGET_64BIT_ADDR), 42# MESSAGE_MOTION)) 43 try: 44 xbee.transmit(TARGET_64BIT_ADDR, srl + MESSAGE_MOTION) 45 print("Data sent successfully") 46 except Exception as e: 47 print("Transmit failure:", str(e)) 48 49 else: 50 pass 51 52 received_msg = xbee.receive() 53 if received_msg: 54 sender = received_msg['sender_eui64'] 55 payload = received_msg['payload'] 56 print("Data received from %s >> %s" % (''.join('{:02x}'.format(x).upper() for x in sender), 57 payload.decode())) 58 if str(payload.decode()) == 'sleep': 59 print("sleeping") 60 SM = 1 61 elif str(payload.decode()) == srl + MESSAGE_NO_MOTION: 62 print("Unchanged") 63 before_state = 0 64 elif str(payload.decode()) == srl + MESSAGE_MOTION: 65 print("Motion detection") 66 before_state = 1 67 elif str(payload.decode()) == "wakeUp": 68 print("wake up res") 69 try: 70 xbee.transmit(TARGET_64BIT_ADDR,srl + "wakeUp") 71 print("Data sent successfully") 72 except Exception as e: 73 print("Transmit failure:", str(e)) 74 75 else: # SM==1のとき 76 # i = 0 77 print("sleep mode") 78 sleep_ms = xb.sleep_now(30000, True) 79 80 t = time.time() 81 while True: 82 to = time.time() 83 received_msg = xbee.receive() 84 if received_msg: 85 payload = received_msg['payload'] 86 if str(payload.decode()) == 'wakeUp': 87 print("wakeUp") 88 try: 89 xbee.transmit(TARGET_64BIT_ADDR, srl + "wakeUp") 90 print("Data sent successfully") 91 except Exception as e: 92 print("Transmit failure:", str(e)) 93 SM = 0 94 break 95 if (to - t) >= 5: 96 break 97 98 time.sleep(1)
microPython
1#エンドデバイス側(例えば現在考えているもの修正かつ簡易的なものです.digi公式のgit hubのサンプルをまねて上のプログラムのトランスミット部分をNIで修正しようと考えています.これで動くのでしょうか...) 2import xbee 3 4TARGET_NODE_ID = "coordinator" 5 6srl = "001" #NIと同じものを.get_paraできたらしたい 7 8MESSAGE_MOTION = "mdt" #Motion detection 9 10INPUT_PIN_ID = "D1" 11 12def find_device(node_id): 13 for dev in xbee.discover(): 14 if dev['node_id'] == node_id: 15 return dev 16 return None 17 18device = find_device(TARGET_NODE_ID) 19if not device: 20 print("Could not find the device with node identifier '%s'" % TARGET_NODE_ID) 21else: 22 addr16 = device['sender_nwk'] 23 addr64 = device['sender_eui64'] 24 25 print("Sending data to %s >> %s" % (TARGET_NODE_ID, MESSAGE_MOTION)) 26 27 try: 28 xbee.transmit(addr16 if addr16 else addr64, MESSAGE) 29 print("Data sent successfully") 30 except Exception as e: 31 print("Transmit failure: %s" % str(e)) 32
Python
1#コーディネータ側 2from digi.xbee.devices import XBeeDevice 3import im_wireless as imw 4 5PORT = "COM3" 6 7BAUD_RATE = 115200 8 9device = XBeeDevice(PORT,BAUD_RATE) 10 11SLAVE_ADR = 0x30 12 13SM = 0 #1のときsleep,0のとき通常動作 14 15#中略 16 17def sleepBC(): 18# print("sleep now") 19 DATA_TO_SEND = 'sleep' 20 global SM 21 while True: 22 try: 23 device.send_data_broadcast(DATA_TO_SEND) 24 if SM == 0: 25 break 26 finally: 27 pass 28 29def wakeupBC(): 30 global SM 31 senL = ["0013A20041CC0D58"]#全エンドデバイスが要素 32 DATA_TO_SEND = 'wakeUp' 33 34 while len(senL) != 0: 35# try: 36 device.send_data_broadcast(DATA_TO_SEND) 37 38 xbee_message = device.read_data() 39 addr = xbee_message.remote_device.get_64bit_addr() 40 mes = xbee_message.data.decode() 41 if mes == 'wakeUp': 42 try: 43 senL.remove(str(addr)) 44 except ValueError: 45 pass 46# device.send_data(addr,'wakeUpRe') 47 SM = 0 48 49def main(): 50 print(" +---------------------+") 51 print(" | IM920sL and XBee R3 |") 52 print(" +---------------------+\n") 53 54 iwc = imw.IMWireClass(SLAVE_ADR) 55 56 global SM 57 58 try: 59 device.open() 60 61 device.flush_queues() 62 63 print("Waiting for data...\n") 64 65 while True: 66 67 recIM920() 68 xbee_message = device.read_data() 69 if xbee_message is not None: 70 71 xbee_network = device.get_network() 72 73 print("From %s >> %s" % (xbee_message.remote_device.get_64bit_addr(), 74 xbee_message.data.decode())) 75 addr = xbee_message.remote_device.get_64bit_addr() 76 mes = xbee_message.data.decode() 77 78 remote_device = xbee_network.discover_device(mes[:3]) 79 if remote_device is None: 80 print("Could not find the remote device") 81 else: 82 device.send_data(remote_device,mes) 83 print("Success") 84 85 sendIM(mes,str(addr)) 86 87 88 if SM == 1: 89 sleepBC() 90 elif SM == 2: 91 wakeupBC() 92 finally: 93 if device is not None and device.is_open(): 94 device.close() 95 96 97if __name__ == '__main__': 98 main()
試したこと
いろいろ書き換えて試しています.
現在,ルータが突然動かなくなったため困っています.
補足情報(FW/ツールのバージョンなど)
Python 3.9
XBee Python Library v1.4.1.
Digi XBee 2.0.1
回答2件
あなたの回答
tips
プレビュー
バッドをするには、ログインかつ
こちらの条件を満たす必要があります。