Webinar
Introducing EMQX 6.1: Durable MQTT Streams and Analytics-Ready Data | Register Now →

Raspberry PiでPaho Pythonクライアントを活用したMQTTの使用ガイド

Saiteng YouSaiteng You
Aug 11, 2023MQTT
Raspberry PiでPaho Pythonクライアントを活用したMQTTの使用ガイド

MQTTは、パブリッシュ/サブスクライブモデルのIoT向け軽量メッセージングプロトコルで、最小限のコードと帯域幅で信頼性の高いリアルタイム通信を実現します。特に、リソースが限られている機器や帯域幅の小さいネットワークに有効で、IoT、モバイルインターネット、IoV、電力業界などで広く採用されています。

ラズベリー・パイは、イギリスのラズベリー・パイ財団によって開発されたARMベースの小型シングルボードコンピュータである。このボードはUSBインターフェースとイーサネットインターフェースを備えており、キーボード、マウス、ネットワークケーブルを接続することができる。このボードはPCの基本機能を持ち、ラズベリーパイはWi-Fi、Bluetooth、多数のGPIOを統合しており、教育、ファミリー・エンターテインメント、IoTなどで広く使用されている。

このプロジェクトでは、Pythonを使ってRaspberry Pi上にシンプルなMQTTクライアントを書き、このクライアントとMQTTブローカー間の接続、サブスクリプション、メッセージング、その他の機能を実装する。

Raspberry Pi MQTTプロジェクトの準備

オペレーティング・システムのインストール

Raspberry Piは、Raspberry Pi OS、Ubuntu、Kodiなど、さまざまなOSを実行できる万能ミニコンピューターだ。それぞれのOSには、独自の機能、利点、推奨アプリケーションがある。

特にRaspberry Pi OSは、Raspberry Piハードウェアとの互換性が高く、最適化されたソフトウェアやツールがプリインストールされているため、初心者に非常におすすめです。Debian LinuxをベースにRaspberry Pi専用にカスタマイズされており、プログラミング、マルチメディア、電子工作プロジェクトに使いやすいプラットフォームを提供します。

Raspberry Pi OSをインストールするには、公式ドキュメントのインストールガイドに従うことをお勧めします。この記事では、Raspberry Pi OS with desktop (Debian version: 11)をインストールしたRaspberry Pi 4を使用します。

Python3のインストール

Raspberry Pi上のMQTTの開発言語としてPythonを使用します。Pythonは習得しやすい構文で、膨大な種類のライブラリやチュートリアルがオンラインで利用できるため、MQTTを扱うのに最適です。

このプロジェクトはPython 3.6を使って開発されている。通常、Raspberry PiにはPython 3がプリインストールされています。しかし、Python 3がインストールされているかどうかわからない場合は、以下のコマンドで確認することができます:

$ python3 --version             
Python 3.6.7

コマンドラインが "Python 3.x.x"("x "はバージョン番号を示す)を返した場合、Python3がRaspberry Piにインストール済みであることを意味する。インストールされていない場合は、"apt "コマンドを使ってインストールするか、Python3のインストールガイドラインに従ってください。

sudo apt install python3

Paho MQTTクライアントをインストールする

これは、Python 2.7および3.xでMQTT v5.0、v3.1.1、およびv3.1をサポートするクライアント・クラスを提供するPaho Pythonクライアント・ライブラリを使用します。さらに、Paho Pythonクライアント・ライブラリには、MQTTサーバーへの単発メッセージの発行を非常に簡単にする便利なヘルパー関数が含まれています。

ソースコードを使ってインストールする

git clone https://github.com/eclipse/paho.mqtt.python 
cd paho.mqtt.python 
python3 setup.py install

pip3を使ってインストールする

pip3 install paho-mqtt

MQTTブローカーを構築

先に進む前に、通信とテストを行うためのMQTTブローカーがあることを確認してください。MQTTブローカーを入手するには、いくつかのオプションがあります:

  • プライベート展開
    EMQXは、IoT、IIoT、コネクテッドカー向けの最もスケーラブルなオープンソースのMQTTブローカーです。以下のDockerコマンドを実行することでEMQXをインストールすることができます。
    docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx
    
  • フルマネージドクラウドサービス
    フルマネージドクラウドサービスは、MQTTサービスを開始するための最も簡単な方法です。EMQX Cloudを利用すれば、わずか数分でサービスを開始でき、AWS、Google Cloud、Microsoft Azureの20以上のリージョンでMQTTサービスを実行し、グローバルな可用性と高速接続を確保することが可能です。
    最新版のEMQX Cloud Serverlessは、開発者が数秒で簡単にMQTTの導入を開始できるように、永久無料の1Mセッション分/月の無償提供をしています。
  • 無料公開のMQTTブローカー
    無料公開MQTTブローカーは、MQTTプロトコルの学習とテストを希望する人だけが利用できます。セキュリティリスクやダウンタイムの懸念があるため、本番環境での使用は避けることが重要です。

このブログ記事では、 broker.emqx.io の無料公開MQTTブローカーを使用します。

MQTT Broker Info

Server: broker.emqx.io

TCP Port: 1883

WebSocket Port: 8083

SSL/TLS Port: 8883

Secure WebSocket Port: 8084

詳しくは、こちらをご確認ください:無料公開のMQTTブローカー。

MQTTのクイックスタート

MQTTコネクションの作成

コネクトのコード例

# test_connect.py 
import paho.mqtt.client as mqtt 

# The callback function. It will be triggered when trying to connect to the MQTT broker
# client is the client instance connected this time
# userdata is users' information, usually empty. If it is needed, you can set it through user_data_set function.
# flags save the dictionary of broker response flag.
# rc is the response code.
# Generally, we only need to pay attention to whether the response code is 0.
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("Connected success")
    else:
        print(f"Connected fail with code {rc}")

client = mqtt.Client() 
client.on_connect = on_connect 
client.connect("broker.emqx.io", 1883, 60) 
client.loop_forever()

上記のコードを "test_connect.py "というファイル名で保存します。ファイルを実行するには、Raspberry Piでターミナルを開き、ファイルのあるディレクトリに移動します。そして以下のコマンドを入力してスクリプトを実行します。これでMQTTクライアントが起動し、MQTTブローカーに接続します。

python3 test_connect.py

on_connect 関数では、MQTTブローカーが返すレスポンスコードをチェックする。レスポンスコードが 0 の場合、"Connected success" と表示し、接続に成功したことを示します。しかし、レスポンス・コードが not 0 の場合は、以下のレスポンス・コード表に基づき、その意味を確認する必要がある。

0: connection succeeded
1: connection failed - incorrect protocol version
2: connection failed - invalid client identifier
3: connection failed - the broker is not available
4: connection failed - wrong username or password
5: connection failed - unauthorized
6-255: undefined
If it is other issues, you can check the network situation, or check whether `paho-mqtt` has been installed.

サブスクライブ

MQTTプロトコルは、トピックに基づいてメッセージをルーティングする。サブスクライバは、ブローカに興味のあるトピックをサブスクライブすることができる。パブリッシャーが特定のトピックにメッセージを送信すると、ブローカーはそのトピックを購読している全ての購読者にそのメッセージを転送します。

テキストエディタを開き、以下のコードを入力してください。ファイルを "subscriber.py "として保存します。

# subscriber.py
import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, rc):
    print(f"Connected with result code {rc}")
    # Subscribe, which need to put into on_connect
    # If reconnect after losing the connection with the broker, it will continue to subscribe to the raspberry/topic topic
    client.subscribe("raspberry/topic")

# The callback function, it will be triggered when receiving messages
def on_message(client, userdata, msg):
    print(f"{msg.topic} {msg.payload}")

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

# Set the will message, when the Raspberry Pi is powered off, or the network is interrupted abnormally, it will send the will message to other clients
client.will_set('raspberry/status', b'{"status": "Off"}')

# Create connection, the three parameters are broker address, broker port number, and keep-alive time respectively
client.connect("broker.emqx.io", 1883, 60)

# Set the network loop blocking, it will not actively end the program before calling disconnect() or the program crash
client.loop_forever()

subscribe() 関数を使うと、Raspberry Piが特定のトピックを購読できるようになります。上のコードでは、この関数を使って raspberry/topic というトピックを購読し、メッセージの着信を監視しています。

さらに、 will_set() 機能を利用して、ウィル・メッセージを設定する。MQTTのこの機能により、デバイスが意図せず電源オフになってしまった場合に、指定したトピックにメッセージを送信することができます。この機能を使用することで、Raspberry Piの電源が切れたか、ネットワーク接続に問題があるかを判断することができます。

メッセージのパブリッシュ

テキストエディタを開き、以下のコードを入力してください。ファイルを "publisher.py "として保存します。

import paho.mqtt.client as mqtt
import time

def on_connect(client, userdata, flags, rc):
    print(f"Connected with result code {rc}")

    # Send a message to the raspberry/topic every 1 second, 5 times in a row
    for i in range(5):
        # The four parameters are topic, sending content, QoS and whether retaining the message respectively
        client.publish('raspberry/topic', payload=i, qos=0, retain=False)
        print(f"send {i} to raspberry/topic")

client = mqtt.Client()
client.on_connect = on_connect
client.connect("broker.emqx.io", 1883, 60)

client.loop_forever()

publish() 関数を使うと、特定のトピックにメッセージを送ることができます。上のコード例では、この関数を使用してトピック raspberry/topic にメッセージを送信しています。QoS パラメータは、メッセージ配信のサービス品質を定義する MQTT のもう 1 つの機能です。QoSレベルの詳細については、「Introduction to MQTT QoS 0, 1, 2」を参照してください。

Pahoクライアントの使い方については、ブログ「Pahoクライアントを使ってPythonでMQTTを使う方法」をご覧ください。

テスト

以下のテストでは、MQTTX を使用する。MQTTX は、macOS、Linux、Windows で動作するエレガントなクロスプラットフォーム MQTT 5.0 デスクトップ・クライアントである。そのユーザーフレンドリーなチャットスタイルのインターフェースにより、ユーザーは簡単に複数のMQTT/MQTTS接続を作成し、MQTTメッセージをサブスクライブ/パブリッシュすることができる。

サブスクライブ

  1. MQTT subscription スクリプト subscriber.py を実行すると、クライアントは正常に接続し、パブリッシャーがメッセージを発行するのを待ち始めます。
    python3 subscriber.py
    

  2. MQTTXをパブリッシャーとして "raspberry/topic "にメッセージをパブリッシュする。
  3. MQTTXによって発行されたメッセージが表示されます。

メッセージのパブリッシュ

  1. MQTTXクライアント内で raspberry/topic をサブスクライブする。
  2. ターミナルで publish.py を実行する。
  3. MQTTXクライアントでは、Raspberry Piからパブリッシュされたメッセージを見ることができます。

ウィル・メッセージのテスト

次に、ウィル・メッセージが正常にセットされたかどうかをテストする。

  1. MQTTXクライアントで raspberry/status をサブスクライブする。
  2. プログラムを中断したり、Raspberry Piのネットワークを切断する。
  3. MQTTXクライアントで、 raspberry/status が受信したメッセージを表示する。

Raspberry Pi MQTTアドバンスド

Raspberry Piのシリアルデータを読む

提供されているコードは、Raspberry Piとのシリアル接続を確立し、シリアルポートからデータを読み取り、MQTTブローカーにパブリッシュします。 serial ライブラリを使用してシリアルポートの設定とデータの読み込みを行い、 paho.mqtt.client ライブラリを使用して MQTT ブローカーに接続してデータをパブリッシュします。コードはループ内でシリアルデータを読み取り、指定されたMQTTトピックにパブリッシュし、定義された反復回数だけこのプロセスを繰り返します。最後に、MQTT ブローカーとの接続を切断し、シリアル接続を閉じます。

import time

import paho.mqtt.client as mqtt
import serial


# Establishing the connection with the serial port
ser = serial.Serial(
    # Serial Port to read the data from
    port='/dev/ttyAMA0', # Use `dmesg | grep tty` to find the port
    baudrate=9600,
    parity=serial.PARITY_NONE,
    stopbits=serial.STOPBITS_ONE,
    bytesize=serial.EIGHTBITS,
    timeout=1
)

broker_address = "broker.emqx.io"
broker_port = 1883
topic = "emqx/serial/read"

client = mqtt.Client()
client.connect(broker_address, broker_port)

# Read data from serial port and publish it to MQTT broker
for i in range(10):
    data = ser.readline().decode()
    client.publish(topic, data)
    print("Read data {data} from serial port and publish it to MQTT broker".format(data=data))
    time.sleep(1)


client.disconnect()

ser.close()

Raspberry Piのシリアルデータを書き込む

提供されたコードは、Raspberry Piとのシリアル接続を確立し、MQTTメッセージをリッスンする。メッセージを受信すると、シリアル・ポートに書き込まれる。このコードでは、 serial ライブラリを使用してシリアルポートの設定とデータの書き込みを行い、 paho.mqtt.client ライブラリを使用して MQTT ブローカーへの接続、MQTT メッセージの受信処理、シリアルポートへのデータのパブリッシュを行っています。MQTT クライアントは、特定の MQTT トピックをサブスクライブする on_connect や、受信メッセージを処理する on_message など、必要なコールバックでセットアップされます。接続されると、コードは無限ループに入り、継続的に MQTT メッセージをリッスンし、シリアル・ポートに書き込みます。

import paho.mqtt.client as mqtt
import serial


# Establishing the connection with the serial port
ser = serial.Serial(
    # Serial Port to read the data from
    port='/dev/ttyAMA0', # Use `dmesg | grep tty` to find the port
    baudrate=9600,
    parity=serial.PARITY_NONE,
    stopbits=serial.STOPBITS_ONE,
    bytesize=serial.EIGHTBITS,
    timeout=1
)

broker_address = "broker.emqx.io"
broker_port = 1883
topic = "emqx/serial/write"


def on_connect(client, userdata, flags, rc):
    print("Connected to MQTT broker")
    client.subscribe(topic)


def on_message(client, userdata, msg):
    payload = msg.payload.decode()
    print("Received message: {payload} on topic {topic}".format(payload=payload, topic=msg.topic))
    # Write data to serial port
    ser.write(payload.encode())


client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect(broker_address, broker_port)
client.loop_forever()

まとめ

このブログでは、Python MQTT クライアント・ライブラリ paho-mqtt を使って、Raspberry Pi 上でクライアントを書き、テストします。クライアントとMQTTブローカー間の接続、サブスクリプション、メッセージング、その他の機能を実装しました。

ここまでの進歩は素晴らしい!MQTTを使って多くのエキサイティングなアプリケーションを構築するための基本を学ぶことができました。例えば

  1. 携帯電話からMQTTメッセージを使ってRaspberry Piを遠隔操作できます。
  2. Raspberry PiからMQTTブローカーに定期的にデバイスデータを送信することで、携帯電話のメッセージを継続的に監視し、受信することができます。
  3. Raspberry PiがMQTTブローカーにアクセスし、様々なセンサーやESPモジュールを使用することで、数多くの興味深いIoTアプリケーションを作成することができる。

次に、MQTTガイドをチェックすることができます:EMQが提供する「Beginner to Advanced」シリーズで、MQTTプロトコルの機能を学び、MQTTのより高度なアプリケーションを探求し、MQTTアプリケーションとサービス開発を始めましょう。

リソース

無料トライアルEMQX Cloud
IoT向けフルマネージド型MQTTサービス
無料トライアル →

著者

Saiteng You
Saiteng You

EMQX Cloud engineer. Retired secondary LISP wizard, focus on helping everyone to use the most advanced technology. Private hobbies include writing and game design.

ブログの購読

おすすめ閲読

Mar 14, 2024Yaqi Luo
MicroPythonとRaspberry PiでMQTT

この記事では、Raspberry PiでMicroPythonを使用して簡単なMQTTクライアントを記述する方法と、クライアントとMQTTブローカー間の接続、サブスクライブ、パブリッシュの実装方法について説明します。

Mar 15, 2024Dekun Tao
ESP8266とMQTTを使用してLEDを遠隔制御

このチュートリアルでは、ESP8266 Wi-FiモジュールとMQTTプロトコルを活用してLEDライトの遠隔制御を実現する方法について説明します。