質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

ただいまの
回答率

90.00%

milkcocoaにて[Errno 32] Broken pipeエラーが発生する。

解決済

回答 1

投稿 編集

  • 評価
  • クリップ 1
  • VIEW 7,196

usuallyi

score 36

raspberry pi3にて、
温度、湿度、気圧を計測し、
milkcocoaにpushするプログラムを作成し、
crontabにて5ヶ月ほど運用してきましたが、
ここ数日、安定して計測できなくなり、
直接コマンドを入力した所、
数回に一度、[Errno 32] Broken pipeと表示され、
milkcocoaにpushできていません。
原因はなんでしょうか。

どうやら、以下のプログラムでエラーになっているようです。
def on_push(e):
print(e)

エラー場所1
push_start
on_push_end
temp_end
on_push_end
humid_end
[Errno 32] Broken pipe
on_push_end
pressure_end

エラー場所2
push_start
on_push_end
temp_end
[Errno 32] Broken pipe
on_push_end
humid_end
on_push_end
pressure_end

#coding: utf-8

import sys
import smbus
import time
import datetime
import milkcocoa.milkcocoa as milkcocoa

milkcocoaClient = milkcocoa.Milkcocoa.connectWithApiKey("****", "****", "***", useSSL=False,blocking=True)

temp = milkcocoaClient.datastore("temp")
humid = milkcocoaClient.datastore("humid")
pressure = milkcocoaClient.datastore("pressure")

bus_number  = 1
i2c_address = 0x76

bus = smbus.SMBus(bus_number)

digT = []
digP = []
digH = []

t_fine = 0.0

sensor_data = {'temp':'0.0', 'pressure':'0.0','humidity':'0.0'}

def on_push(e):
    print(e)

def writeReg(reg_address, data):
    bus.write_byte_data(i2c_address,reg_address,data)

def get_calib_param():
    calib = []

    for i in range (0x88,0x88+24):
        calib.append(bus.read_byte_data(i2c_address,i))
    calib.append(bus.read_byte_data(i2c_address,0xA1))
    for i in range (0xE1,0xE1+7):
        calib.append(bus.read_byte_data(i2c_address,i))

    digT.append((calib[1] << 8) | calib[0])
    digT.append((calib[3] << 8) | calib[2])
    digT.append((calib[5] << 8) | calib[4])
    digP.append((calib[7] << 8) | calib[6])
    digP.append((calib[9] << 8) | calib[8])
    digP.append((calib[11]<< 8) | calib[10])
    digP.append((calib[13]<< 8) | calib[12])
    digP.append((calib[15]<< 8) | calib[14])
    digP.append((calib[17]<< 8) | calib[16])
    digP.append((calib[19]<< 8) | calib[18])
    digP.append((calib[21]<< 8) | calib[20])
    digP.append((calib[23]<< 8) | calib[22])
    digH.append( calib[24] )
    digH.append((calib[26]<< 8) | calib[25])
    digH.append( calib[27] )
    digH.append((calib[28]<< 4) | (0x0F & calib[29]))
    digH.append((calib[30]<< 4) | ((calib[29] >> 4) & 0x0F))
    digH.append( calib[31] )

    for i in range(1,2):
        if digT[i] & 0x8000:
            digT[i] = (-digT[i] ^ 0xFFFF) + 1

    for i in range(1,8):
        if digP[i] & 0x8000:
            digP[i] = (-digP[i] ^ 0xFFFF) + 1

    for i in range(0,6):
        if digH[i] & 0x8000:
            digH[i] = (-digH[i] ^ 0xFFFF) + 1  

def readData():
    data = []
    for i in range (0xF7, 0xF7+8):
        data.append(bus.read_byte_data(i2c_address,i))
    pres_raw = (data[0] << 12) | (data[1] << 4) | (data[2] >> 4)
    temp_raw = (data[3] << 12) | (data[4] << 4) | (data[5] >> 4)
    hum_raw  = (data[6] << 8)  |  data[7]

    compensate_T(temp_raw)
    compensate_P(pres_raw)
    compensate_H(hum_raw)

def compensate_P(adc_P):
    global  t_fine
    pressure = 0.0

    v1 = (t_fine / 2.0) - 64000.0
    v2 = (((v1 / 4.0) * (v1 / 4.0)) / 2048) * digP[5]
    v2 = v2 + ((v1 * digP[4]) * 2.0)
    v2 = (v2 / 4.0) + (digP[3] * 65536.0)
    v1 = (((digP[2] * (((v1 / 4.0) * (v1 / 4.0)) / 8192)) / 8)  + ((digP[1] * v1) / 2.0)) / 262144
    v1 = ((32768 + v1) * digP[0]) / 32768

    if v1 == 0:
        return 0
    pressure = ((1048576 - adc_P) - (v2 / 4096)) * 3125
    if pressure < 0x80000000:
        pressure = (pressure * 2.0) / v1
    else:
        pressure = (pressure / v1) * 2
    v1 = (digP[8] * (((pressure / 8.0) * (pressure / 8.0)) / 8192.0)) / 4096
    v2 = ((pressure / 4.0) * digP[7]) / 8192.0
    pressure = pressure + ((v1 + v2 + digP[6]) / 16.0)  

    # print "pressure : %7.2f hPa" % (pressure/100)
        sensor_data['pressure'] = pressure/100

def compensate_T(adc_T):
    global t_fine
    v1 = (adc_T / 16384.0 - digT[0] / 1024.0) * digT[1]
    v2 = (adc_T / 131072.0 - digT[0] / 8192.0) * (adc_T / 131072.0 - digT[0] / 8192.0) * digT[2]
    t_fine = v1 + v2
    temperature = t_fine / 5120.0
    # print "temp : %-6.2f 邃・ % (temperature)
        sensor_data['temp'] = temperature

def compensate_H(adc_H):
    global t_fine
    var_h = t_fine - 76800.0
    if var_h != 0:
        var_h = (adc_H - (digH[3] * 64.0 + digH[4]/16384.0 * var_h)) * (digH[1] / 65536.0 * (1.0 + digH[5] / 67108864.0 * var_h * (1.0 + digH[2] / 67108864.0 * var_h)))
    else:
        return 0
    var_h = var_h * (1.0 - digH[0] * var_h / 524288.0)
    if var_h > 100.0:
        var_h = 100.0
    elif var_h < 0.0:
        var_h = 0.0
    # print "hum : %6.2f ・・ % (var_h)
        sensor_data['humidity'] = var_h

def setup():
    osrs_t = 1            #Temperature oversampling x 1
    osrs_p = 1            #Pressure oversampling x 1
    osrs_h = 1            #Humidity oversampling x 1
    mode   = 3            #Normal mode
    t_sb   = 5            #Tstandby 1000ms
    filter = 0            #Filter off
    spi3w_en = 0            #3-wire SPI Disable

    ctrl_meas_reg = (osrs_t << 5) | (osrs_p << 2) | mode
    config_reg    = (t_sb << 5) | (filter << 2) | spi3w_en
    ctrl_hum_reg  = osrs_h

    writeReg(0xF2,ctrl_hum_reg)
    writeReg(0xF4,ctrl_meas_reg)
    writeReg(0xF5,config_reg)


setup()
get_calib_param()

if __name__ == '__main__':
    try:
        readData()
                time_str = datetime.datetime.today().strftime("%Y/%m/%d %H:%M:%S")
                temp_str = str(sensor_data['temp'])
                humid_str = str(sensor_data['humidity'])
                pressure_str = str(sensor_data['pressure'])
                #milkcocoa縺ォ繝・・繧ソ繧恥ush縺吶k縲・
                print("push_start");
                temp.on("push",on_push)
                print("on_push_end");
                temp.push({"temp":temp_str})
                print("temp_end");
                time.sleep(3)
                humid.on("push",on_push)
                print("on_push_end");
                humid.push({"humid":humid_str})
                print("humid_end")
                time.sleep(3)
                pressure.on("push",on_push)
                print("on_push_end");
                pressure.push({"pressure":pressure_str})
                print("pressure_end")
        #print(sensor_data)
                print(time_str+","+temp_str+","+humid_str+","+pressure_str)
    except KeyboardInterrupt:
        pass
  • 気になる質問をクリップする

    クリップした質問は、後からいつでもマイページで確認できます。

    またクリップした質問に回答があった際、通知やメールを受け取ることができます。

    クリップを取り消します

  • 良い質問の評価を上げる

    以下のような質問は評価を上げましょう

    • 質問内容が明確
    • 自分も答えを知りたい
    • 質問者以外のユーザにも役立つ

    評価が高い質問は、TOPページの「注目」タブのフィードに表示されやすくなります。

    質問の評価を上げたことを取り消します

  • 評価を下げられる数の上限に達しました

    評価を下げることができません

    • 1日5回まで評価を下げられます
    • 1日に1ユーザに対して2回まで評価を下げられます

    質問の評価を下げる

    teratailでは下記のような質問を「具体的に困っていることがない質問」、「サイトポリシーに違反する質問」と定義し、推奨していません。

    • プログラミングに関係のない質問
    • やってほしいことだけを記載した丸投げの質問
    • 問題・課題が含まれていない質問
    • 意図的に内容が抹消された質問
    • 広告と受け取られるような投稿

    評価が下がると、TOPページの「アクティブ」「注目」タブのフィードに表示されにくくなります。

    質問の評価を下げたことを取り消します

    この機能は開放されていません

    評価を下げる条件を満たしてません

    評価を下げる理由を選択してください

    詳細な説明はこちら

    上記に当てはまらず、質問内容が明確になっていない質問には「情報の追加・修正依頼」機能からコメントをしてください。

    質問の評価を下げる機能の利用条件

    この機能を利用するためには、以下の事項を行う必要があります。

回答 1

checkベストアンサー

0

一般的にsocket通信でBad Pipeが発生するのは、接続が切れているのに送受信を実行した場合(相手先の受信終了を待たずに送信終了した場合など)のようです。
How to prevent errno 32 broken pipe?

以下の公式のサンプルのように、通信待ち(loop_forever())が必要なのではないでしょうか?
blocking.py

詳細はmilkcocoaが利用しているMQTTライブラリの説明「Network loop」や以下を参照ください。
paho-mqttで回線を切るとどうなるか

投稿

編集

  • 回答の評価を上げる

    以下のような回答は評価を上げましょう

    • 正しい回答
    • わかりやすい回答
    • ためになる回答

    評価が高い回答ほどページの上位に表示されます。

  • 回答の評価を下げる

    下記のような回答は推奨されていません。

    • 間違っている回答
    • 質問の回答になっていない投稿
    • スパムや攻撃的な表現を用いた投稿

    評価を下げる際はその理由を明確に伝え、適切な回答に修正してもらいましょう。

  • 2017/01/26 15:42

    sleepにて「うまくpushできない」の詳細を質問に追記ください。
    どうも通信の同期がとれていないのが原因のような気がします。

    キャンセル

  • 2017/01/29 00:14

    以下のメールが先日来ましたどうやら、障害だったようです。
    おかげさまで順調にアプリ数が増えておりまして、クライアントの制限を超えたアクセスなどにより、
    負荷が上昇してしまったことで、12,1月と夜間Milkcocoaのサーバにアクセスしづらくなっておりました。

    現在負荷調整のための改修作業を断続的に行っております。
    2017年1月26日 17:25現在負荷が安定していることを確認していますが、今後も継続的に改修を行っていきます。継続的に皆様にも状況をご連絡いたします。ご迷惑をおかけして、大変申し訳ございません。

    キャンセル

  • 2017/01/29 09:56

    原因について了解しました。
    障害が発生した場合のエラー処理も加えておいたほうがよいのかもしれませんね。

    キャンセル

15分調べてもわからないことは、teratailで質問しよう!

  • ただいまの回答率 90.00%
  • 質問をまとめることで、思考を整理して素早く解決
  • テンプレート機能で、簡単に質問をまとめられる