MQTTの基本動作

MQTTの基本動作をEclipse Foundationによるオープンソースのリファレンス実装であるMosquittoと、Python用のMQTTクライアントであるpaho-mqttを使って確認する。

Broker

下記のようにBrokerを起動する。-vを指定すると詳細なログが表示される(verboseモード)。
デフォルトでは、1883番ポートを使用する。

mosquitto -v

PublishとSubscribe

sequenceDiagram

Subscriber ->> Broker : subscribe topic 'test'
Publisher ->> Broker : message 'abc' to topic 'test'
Broker ->> Subscriber : message 'abc'

Subscriberでtestトピックを購読(subscribe)する。

mosquitto_sub -t test

Publisherでtestトピックにabcと発行(publish)する。

mosquitto_pub -t test -m abc

Subscriberにabcと表示される。

QoS

Subscriberは購読する際に、Publisherはメッセージを発行する際に、QoSを指定する。

  • QoS 0(デフォルト):メッセージを最高1回届ける(届かないこともある)
  • QoS 1:メッセージを最低1回届ける(2回届くこともある)
  • QoS 2:メッセージを必ず1回届ける

SubscriberとPublisherで指定したQoSが異なる場合は、低い方に設定される。

Durable Subscribe

sequenceDiagram
Subscriber ->> Broker : subscribe topic 'test'
Note over Subscriber, Broker : ここで切断
Publisher ->> Broker : message 'abc' to topic 'test'
Subscriber ->> Broker : subscribe topic 'test'
Broker ->> Subscriber : message 'abc'

Subscriberが下記の条件で購読したとする。

  • QoSを1または2
  • クライアントIDをセットする
  • Clean Sessionなし
mosquitto_sub -t test -q 1 -i client01 -c

その後、SubscriberがBrokerとの接続を切断する。

Publisherが、QoSを1または2としてメッセージを発行する。

mosquitto_pub -t test -m abc -i client02 -q 1

その時点ではメッセージはSubscriberに届かないが、Subscriberが再度同じ条件でBrokerに接続し、購読を始めた際に、Publisherが発行したメッセージが届く。
Subscriberの切断時にメッセージが複数発行されていれば、そのメッセージはすべて再接続時に届く。

Pythonでの実装

Subscriberの実装は下記のとおり。

import paho.mqtt.client as mqtt

def on_message(client, userdata, message):
    print('topic:', message.topic)
    print('payload:', message.payload)
    print('qos:', message.qos)

client03 = mqtt.Client(client_id='client03', clean_session=False)
client03.on_message = on_message

client03.connect('localhost', 1883)
client03.subscribe(topic='test', qos=1)
client03.loop_forever()

Publisherの実装は下記のとおり。

import paho.mqtt.client as mqtt

client04 = mqtt.Client(client_id='client04')

client04.connect('localhost', 1883)
info = client04.publish(topic='test', payload='abc', qos=1)
print('rc:', info.rc)
print('mid:', info.mid)
client04.disconnect()

Retain

sequenceDiagram
Publisher ->> Broker : retain 'retain_message' to topic 'test'
Subscriber ->> Broker : subscribe topic 'test'
Broker ->> Subscriber : message 'retain_message'

PublisherはあらかじめRetainメッセージをBrokerに登録しておくことによって、新たに購読を始めたSubscriberに対してそのメッセージを届けることができる。

既に購読しているSubscriberに対しては、Retainメッセージを登録した時点でそのメッセージが届く。

mosquitto_pub -t test -m retain -r

Pythonでの実装

Publisherを下記のように実装する。

import paho.mqtt.client as mqtt

client04 = mqtt.Client(client_id='client04')

client04.connect('localhost', 1883)
info = client04.publish(topic='test', payload='retain message', retain=True)
print('rc:', info.rc)
print('mid:', info.mid)
client04.disconnect()

Subscriberでは特殊な実装は必要ない。クライアントIDやClean Sessionの設定も不要。
Subscriberが受信済みのRetainメッセージは、同一IDで接続している限り、さらに届くことはない(QoS次第かも?)。

Will

sequenceDiagram
Subscriber ->> Broker : subscribe topic 'test'
Publisher ->> Broker : set will message to topic 'test'
Publisher ->> Broker : message 'abc' to topic 'test'
Broker ->> Subscriber : message 'abc'
Note over Publisher, Broker : ここで切断
Broker ->> Subscriber : will message

PublisherがあらかじめWillメッセージと、送信先のトピックを設定しておくと、PublisherとBrokerの接続が不意に切れた際に、トピックのSubscriberにWillメッセージが届く。

ここで「不意に切れた」とは、正常にdisconnectが行われなかったという意味であり、下記のようにdisconnectが行われていればWillメッセージは送信されない。

import paho.mqtt.client as mqtt

client04 = mqtt.Client(client_id='client04')
client04.will_set(topic='test', payload='will message')

client04.connect('localhost', 1883)
info = client04.publish(topic='test', payload='abc')
print('rc:', info.rc)
print('mid:', info.mid)
client04.disconnect()

一方、下記のようにdisconnectを行うことなくPublisherのプロセスを終了させた場合、keepaliveのデフォルト値である60秒が経過した時点でWillメッセージが送信される。

import paho.mqtt.client as mqtt

client04 = mqtt.Client(client_id='client04')
client04.will_set(topic='test', payload='will')

client04.connect('localhost', 1883)
info = client04.publish(topic='test', payload='abc')
print('rc:', info.rc)
print('mid:', info.mid)
# diconnectしない
#client04.disconnect()

さらに、下記の例のように定期的にメッセージを発行しており(ここでは60秒)、Brokerへの接続時にkeepaliveを5分と設定していたとする。
その場合でも、Publisherのプロセスを強制終了するなどして、Brokerが不意の切断を認識することができれば、keepaliveの時間を待つことなくWillメッセージをSubscriberに送信する。

import paho.mqtt.client as mqtt
from time import sleep

client04 = mqtt.Client(client_id='client04')
client04.will_set(topic='test', payload='will message')

client04.connect('localhost', 1883, keepalive=300)

i = 1
while True:
    info = client04.publish(topic='test', payload='message %i' % i)
    print('rc:', info.rc)
    print('mid:', info.mid)
    i+=1
    sleep(60)

ところで、Publisherでメッセージを発行していなくても、loop_foreverを実行していればkeepaliveの間隔で行われたBrokerからPublisherに対するPINGREQに、PINGRESPを返すことができる。接続は継続しているものとしてWillメッセージは送信されない。

import paho.mqtt.client as mqtt

client04 = mqtt.Client(client_id='client04')
client04.will_set(topic='test', payload='will')

client04.connect('localhost', 1883)
info = client04.publish(topic='test', payload='abc')
print('rc:', info.rc)
print('mid:', info.mid)
client04.loop_forever()

SSLとログイン

SSL証明書をLet’s Encryptで取得した場合、/etc/mosquitto/mosquitto.confは下記のように設定する。

listener 8883
cafile /etc/letsencrypt/live/{domain}/cert.pem
certfile /etc/letsencrypt/live/{domain}/fullchain.pem
keyfile /etc/letsencrypt/live/{domain}/privkey.pem
tls_version tlsv1.2

MQTT接続時にログインをさせたい場合は、mosquitto_passwdコマンドでパスワードファイル(ここでは/etc/mosquitto/pwfile)を作成し、下記の設定を追加する。

allow_anonymous false
password_file /etc/mosquitto/pwfile

mosquittoクライアントを使用したSubscriberの接続。Publisherも概ね同様のコマンドになる。cafileはLet’s Encryptの場合はDST_ROOT_CA_X3.pemを使用できる。但し、このファイルの場所はOSにより異なる。

mosquitto_sub -h {host} -p 8883 -u mqtt -P test -t test --cafile /etc/ssl/certs/DST_Root_CA_X3.pem

Pythonでは下記のようなコードになる。

import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))

def on_message(client, userdata, message):
    print('topic:', message.topic)
    print('payload:', message.payload)
    print('qos:', message.qos)

client03 = mqtt.Client(client_id='client03', clean_session=False)
client03.tls_set(ca_certs='/etc/ssl/certs/DST_Root_CA_X3.pem')
client03.username_pw_set('mqtt', 'test')
client03.on_connect = on_connect
client03.on_message = on_message

client03.connect('{host}', 8883)
client03.subscribe(topic='test', qos=1)
client03.loop_forever()