$shibayu36->blog;

クラスター株式会社のソフトウェアエンジニアです。エンジニアリングや読書などについて書いています。

MQTTプロトコルをブローカーを実装しながら学ぶ

clusterのリアルタイム通信サーバーの漸進的な進化 - Cluster Tech Blogを見て、リアルタイム通信の仕組みに興味を持った。しかし、MQTTプロトコルは使ったことがなく、どのようなプロトコルなのか、何がそんなに効率的なのか、なぜIoTなどでよく使われるのかなどが分からない状態だった。

そこでChatGPTを使えば自作しながらMQTTプロトコルを学べるのではないかと思い、https://github.com/shibayu36/go-mqtt-playground にて作り始めた。ひとまず簡易的にCONNECT/SUBSCRIBE/PUBLISHをハンドリングできるようになったのでメモしておく。

MQTTプロトコルの特徴

http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html のAbstractがよくまとまっている。日本語訳した内容を置いておく。

MQTTは、クライアント・サーバー・パブリッシュ/サブスクライブ・メッセージング・トランスポート・プロトコルである。軽量、オープン、シンプルで、実装が容易なように設計されている。このような特性により、M2M(Machine to Machine)やIoT(Internet of Things)の通信など、コードフットプリントが小さく、ネットワーク帯域幅に制約がある環境など、さまざまな状況での使用に最適です。

このプロトコルは、TCP/IP上、または順序付けられたロスレス双方向接続を提供する他のネットワーク・プロトコル上で動作する。その特徴は以下の通り:

  • 1対多のメッセージ配信とアプリケーションのデカップリングを提供するパブリッシュ/サブスクライブ・メッセージ・パターンの使用。
  • ペイロードの内容にとらわれないメッセージング・トランスポート 。
  • メッセージ配信の3つのサービス品質
    • "最大1回"、メッセージは動作環境の最善の努力に従って配信される。メッセージの損失は起こりうる。このレベルは、例えば、個々の読み取り値が失われたとしても、次の読み取り値がすぐに公表されるため問題にはならないような、周囲のセンサーデータで使用することができる。
    • 少なくとも1回」。メッセージは確実に届くが、重複が発生する可能性がある。
    • Exactly once"(正確に一度):メッセージが正確に一度だけ届くことが保証される。このレベルは、例えば、メッセージの重複や紛失によって誤った料金が適用される可能性のある課金システムで使用できる。
  • 小さなトランスポート・オーバーヘッドとプロトコル交換を最小化し、ネットワーク・トラフィックを削減 。
  • 異常切断が発生したときに関係者に通知する仕組み。

何をベースに作るか

MQTT仕様のバージョンには3.1.1と5.0がある。それぞれ https://mqtt.org/mqtt-specification/ から確認できる。

バージョン5.0の方ができることは多いが複雑なので、今回は理解することを優先に3.1.1をベースに作ることにする。

できたもの

現時点でできたものは https://github.com/shibayu36/go-mqtt-playground/tree/ddcc3bf1c57d1b05920e95f5697ddbf470010a31

go run ./brokerで立ち上げた上で、mqtt-cliを使うと、動いている様子を確認できる。

まずmqtt-cliのshellモードでversion 3.1.1を使うように設定し

~/.mqtt-cli/config.properties

#Fri Jun 30 19:27:25 JST 2023
client.id.length=8
client.id.prefix=mqtt
logfile.level=debug
logfile.path=/Users/shibayu36/.mqtt-cli/logs
mqtt.host=localhost
mqtt.port=1883
mqtt.version=3
ws.path=/mqtt

サーバーを立ち上げ

$ go run ./broker
2023/07/31 09:48:32 Listening on port 1883

2つのクライアントを接続し、様子を確認する。

1つ目でsubscribe。

$ mqtt shell -l
mqtt> con
Restriction request problem information was set but is unused in MQTT Version MQTT_3_1_1
mqtt5h5LqvY7@localhost> sub --topic hoge/fuga -oc

2つ目でpublish。

$ mqtt shell -l
mqtt> con
Restriction request problem information was set but is unused in MQTT Version MQTT_3_1_1
mqtt8YrsztIc@localhost> pub --topic hoge/fuga -m piyo

すると1つ目のshellで、piyoがstdoutに流れてくる。

mqtt5h5LqvY7@localhost> piyo

学んだこと

まだ実装はかなり簡易的ではあるが、これだけでも色々なことを学べた。

まず以下のような特性により、IoTやリアルタイム通信などでよく使われることが多そうだ。

  • パケット構造が効率化されていて転送量をかなり低く抑えられる
  • コネクションを継続する仕様がいくつかあり、繋いだ状態であれば素早くやり取りができる
  • 仕様がある程度単純で、実装しやすい

またトピックフィルターの仕様によって、階層構造の下すべてをsubscribeしたりなども可能になっている。

  • MQTTのトピックは階層的な構造を持ち、スラッシュ(/)で区切られた複数のレベルから構成
  • ワイルドカードを利用できる
    • プラス記号(+): トピック階層内の1つのレベルにマッチ。たとえば、a/+/ca/b/ca/x/cにマッチし、a/b/x/ca/cにはマッチしない
    • ハッシュ記号(#): トピック階層内の0個以上のレベルにマッチします。ハッシュ記号はトピックフィルターの最後にのみ使用可能。a/#a/ba/b/ca/b/c/dなど、aで始まるすべてのトピックにマッチ

このトピックフィルターの実装はTrie木を使うとシンプルに実装できるのがおもしろい。このことについてはまた別のブログ記事で書いてみようと思う。

参考資料

ChatGPTに教えてもらったこと

ChatGPTに教えてもらったことをざっとコピペしておく。ちょっと不正確な話も混ざっているので、話半分くらいに。

実際のやり取りに使ったのはこの辺り。どう質問して作っていったのかも参考になると思う。

MQTT概要

MQTT(Message Queuing Telemetry Transport)は、軽量なメッセージングプロトコルであり、IoT(Internet of Things)デバイスと通信するために設計されています。このプロトコルは、低帯域幅での通信と低電力消費が必要な状況で非常に役立ちます。MQTTは、クライアントとサーバ(ブローカーとも呼ばれます)間の通信を行います。

MQTTの主な概念は以下の通りです。

  1. Publish/Subscribeモデル: MQTTはPublish/Subscribe(パブリッシュ/サブスクライブ)モデルを使用します。クライアントは、特定のトピック(テーマやカテゴリ)にメッセージを公開(publish)するか、特定のトピックのメッセージを購読(subscribe)します。ブローカーはこれらのメッセージを受け取り、購読しているクライアントに配信します。
  2. トピック: トピックはメッセージのフィルタリングに使用されるラベルです。クライアントは、特定のトピックに対してメッセージを公開したり、トピックを購読したりします。トピックは階層的な構造を持つことができ、ワイルドカードを使用して複数のトピックを一度に購読することもできます。
  3. QoS (Quality of Service) レベル: MQTTは3つのQoSレベルを提供しています。QoS 0はメッセージが最大1回配信されることを意味し、「少なくとも一度」の配信を保証します。QoS 1はメッセージが少なくとも1回配信されることを保証しますが、複数回配信される可能性があります。QoS 2はメッセージが正確に1回だけ配信されることを保証します。
    • -> ChatGPTのミスで、QoS 0はat most once
  4. ブローカー: MQTTブローカーは、メッセージの配信を仲介します。クライアントはブローカーに接続し、メッセージを公開または購読します。ブローカーは、購読されたトピックに関連するメッセージを購読者に配信します。
  5. 接続とセッション: クライアントがMQTTブローカーに接続するとき、接続を確立するためのハンドシェイクが行われます。クライアントは通常、接続時に一意のクライアントIDを提供します。また、クライアントは「クリーンセッション」を指定することができ、これは接続が切断されたときにセッション状態を保持するかどうかを制御します。
  6. Last Will and Testament (LWT): クライアントは接続時に「遺言」メッセージを設定することができます。これは、クライアントが何らかの理由で接続を失った場合にブローカーによって公開されるメッセージです。
  7. 保持メッセージ: クライアントはメッセージを公開する際に、そのメッセージを「保持」するように指示することができます。これにより、新しいサブスクライバーがトピックを購読したときに、最後に公開された保持メッセージが受信されます。

何から実装すると学びやすいか

メインの機能に絞ってMQTTプロトコルを学ぶと効率的です。以下は、実装に重点を置いて学ぶべき基本的なプロトコルの要素です。

  1. クライアントとブローカーの接続: MQTTプロトコルでは、クライアントはブローカーに接続する必要があります。接続を確立するためのプロセスと、クライアントが接続を切断する方法を理解することが重要です。CONNECTとDISCONNECTのコマンドを理解しましょう。
  2. Publish/Subscribeモデル: MQTTの中核となる概念です。クライアントがトピックにメッセージを公開し、他のクライアントがこれらのトピックを購読する方法を理解する必要があります。PUBLISH、SUBSCRIBE、およびUNSUBSCRIBEコマンドを注視しましょう。
  3. トピック: MQTTのトピックは、メッセージングのカテゴリを指定します。トピックの命名規則とワイルドカードの使用を理解しましょう。
  4. QoS (Quality of Service) レベル: メッセージの配信保証レベルを理解し、それぞれのレベルがどのように動作するかを学びましょう。QoS 0、1、および2の違いを把握することが重要です。

ネットワークの流れ

クライアントがオフラインの時の挙動

MQTTプロトコルでは、クライアントがオフラインのときに、そのクライアントがサブスクライブしているトピックにメッセージがパブリッシュされる場合の挙動は、Quality of Service(QoS)レベルと「クリーンセッション」フラグの設定に依存します。

  1. QoS 0 (At most once): このレベルでは、メッセージは最大1回配信されます。クライアントがオフラインの場合、メッセージは失われます。
  2. QoS 1 (At least once): このレベルでは、メッセージは少なくとも1回配信されます。クライアントがオフラインの場合で、クリーンセッションフラグがfalseに設定されている場合、ブローカーはメッセージを保存し、クライアントが再接続したときに配信します。
  3. QoS 2 (Exactly once): このレベルでは、メッセージは正確に1回配信されます。これもQoS 1と同様に、クリーンセッションフラグがfalseの場合、ブローカーはメッセージを保存し、クライアントが再接続したときに配信します。

また、「クリーンセッション」フラグは重要な役割を果たします。

  • クリーンセッション = true: クライアントとブローカーの間のセッションは、クライアントが切断されると終了します。これは、クライアントがオフラインの間にパブリッシュされたメッセージは保存されず、クライアントが再接続しても受信されないことを意味します。
  • クリーンセッション = false: クライアントが切断されても、ブローカーはセッションを維持します。クライアントがサブスクライブしたトピックに関連するメッセージは、クライアントが再接続するまでブローカーによって保存されます。

したがって、オフラインのクライアントに対してメッセージを保存しておきたい場合は、サブスクリプションをQoS 1または2で行い、クリーンセッションフラグをfalseに設定する必要があります。

トピックフィルターと構造

トピックフィルターは、MQTTプロトコルにおいて、クライアントが特定のトピックにサブスクライブする際に使用されるパターンです。トピックフィルターを使用することで、クライアントは自分が関心を持つメッセージだけを受け取ることができます。MQTTのトピックは階層的な構造を持ち、スラッシュ(/)で区切られた複数のレベルから構成されます。

トピックフィルターでは2つのワイルドカード文字が使用できます: 1. プラス記号(+): トピック階層内の1つのレベルにマッチします。たとえば、a/+/ca/b/ca/x/cにマッチしますが、a/b/x/ca/cにはマッチしません。 2. ハッシュ記号(#): トピック階層内の0個以上のレベルにマッチします。ハッシュ記号はトピックフィルターの最後にのみ使用できます。例えば、a/#a/ba/b/ca/b/c/dなど、aで始まるすべてのトピックにマッチします。

以下はトピックフィルターの具体的な例です: 1. sensor/temperature: これはワイルドカードを使用していない普通のトピックフィルターで、厳密にsensor/temperatureというトピックにのみマッチします。 2. sensor/+/temperature: このフィルターは、sensorで始まり、temperatureで終わる、3つの階層からなる任意のトピックにマッチします。たとえば、sensor/room/temperaturesensor/outdoor/temperatureなど。 3. sensor/#: このフィルターは、sensorで始まる任意のトピックにマッチします。例えばsensor/temperature, sensor/humidity/room1, sensor/temperature/living-roomなど。 4. #: このフィルターは全てのトピックにマッチします。

トピックフィルターの$について

MQTTプロトコルでは、トピック名の先頭に$記号がある場合、それはシステムトピックを表します。システムトピックは、通常のメッセージングとは異なり、ブローカーの内部情報や診断データなどを配信するために使用されます。

例えば、一般的なシステムトピックとして$SYS/で始まるトピックがよく使用されます。これはブローカーの内部状態に関する情報を提供します。たとえば、$SYS/broker/uptimeはブローカーの稼働時間を表し、$SYS/broker/clients/connectedは接続されているクライアントの数を表すかもしれません。

また、$記号で始まるトピックは、通常のトピックフィルターではマッチしないようになっています。例えば、#は通常のトピックをキャプチャするのに対して、$SYS/#のように指定しないと$で始まるトピックはキャプチャされません。

これにより、システム関連のメッセージと通常のメッセージングが分離され、クライアントが意図せずにシステムメッセージを受信することがなくなります。また、システムトピックを使用することで、ブローカーの監視や管理を効果的に行うことができます。