$shibayu36->blog;

クラスター株式会社のソフトウェアエンジニアです。エンジニアリングや読書などについて書いています。

MQTTのトピックフィルターによる購読をトライ木を使って実装する

最近趣味でMQTTブローカーの実装をしてみている。その中でトライ木を使うとトピックフィルターによる購読の実装を簡単に行えるというのが面白かったのでブログにしておく。

なおトライ木はHTTP Routerの実装にも使われることがあり、そういう点でも一度実装してみると参考になる。GitHub - bmf-san/goblin: A golang http router based on trie tree.

トピックフィルターとは

MQTTでSUBSCRIBE/PUBLISHする時は、トピックというものを使ってやり取りをする。トピックは階層的な構造を持っている。たとえば

  • a
  • a/b/c
  • hoge/fuga

この時SUBSCRIBEでは、トピック名完全一致で購読したい、ある階層以下のすべてを購読したい、特定の階層はなんでも良いので購読したいなどの要求がある。その要求を満たすためにあるのが、トピックフィルターという文字列だ。トピックフィルターは完全一致の他にワイルドカードを使うことができる。

  • プラス記号(+): トピック階層内の1つのレベルにマッチ。たとえば、a/+/ca/b/ca/x/cにマッチし、a/b/x/ca/cにはマッチしない
  • ハッシュ記号(#): トピック階層内の0個以上のレベルにマッチします。ハッシュ記号はトピックフィルターの最後にのみ使用可能。a/#a/ba/b/ca/b/c/dなど、aで始まるすべてのトピックにマッチ

これを使うことでいろんな要求を満たすような購読ができる。

  • sport/tennis/player1: player1のテニスの成績を購読したい
  • sport/tennis/#: すべての選手のテニスの成績を購読したい
  • sport/+/player1: player1のすべてのスポーツの成績を購読したい
  • #: とにかくなんでも良いので全情報を購読したい

トピックフィルターによる購読をトライ木を使って購読する

このようなワイルドカードを含んだマッチを実装するやり方だが、トライ木を使うとシンプルに実装できる。トライ木の構造は以下を参考にしてほしい。

MQTTのトピックフィルターにおいて、#a/b/ca/+/ca/#を登録した場合、トライ木は次のような構造になる。トピックがやってきたら、これを辿りながらマッチするトピックフィルターを見つけ、それを購読しているクライアントを取得することになる。

root
  |- #
  └- a
     |- b
     |  └- c
     |- +
     |  └- c
     └- #

たとえばMQTTのトピックフィルターの実装として、自分で実装してみた例だと次のようになった。

https://github.com/shibayu36/go-mqtt-playground/blob/ddcc3bf1c57d1b05920e95f5697ddbf470010a31/broker/topic_tree.go

package main

import (
    "fmt"
    "strings"
    "sync"
)

type TopicTree struct {
    root *topicTreeNode
    mu   sync.RWMutex
}

func NewTopicTree() *TopicTree {
    return &TopicTree{
        root: newTopicTreeNode(""),
    }
}

func (t *TopicTree) Add(topic string, client *Client) {
    t.mu.Lock()
    defer t.mu.Unlock()

    parts := strings.Split(topic, "/")

    current := t.root
    for _, part := range parts {
        if _, exists := current.subnodes[part]; !exists {
            current.subnodes[part] = newTopicTreeNode(part)
        }
        current = current.subnodes[part]
    }
    current.clients[client] = true
}

func (t *TopicTree) Get(topic string) []*Client {
    t.mu.RLock()
    defer t.mu.RUnlock()

    parts := strings.Split(topic, "/")

    matchingClients := make([]*Client, 0)

    var traverse func(*topicTreeNode, []string)
    traverse = func(node *topicTreeNode, parts []string) {
        if len(parts) == 0 || node.isWildcard() {
            for client := range node.clients {
                matchingClients = append(matchingClients, client)
            }
        }

        if len(parts) > 0 {
            part := parts[0]
            if nextNode, exists := node.subnodes[part]; exists {
                traverse(nextNode, parts[1:])
            }
            if nextNode, exists := node.subnodes["+"]; exists {
                traverse(nextNode, parts[1:])
            }
            if nextNode, exists := node.subnodes["#"]; exists {
                traverse(nextNode, parts)
            }
        }
    }
    traverse(t.root, parts)

    return matchingClients
}

type topicTreeNode struct {
    part     string
    clients  map[*Client]bool
    subnodes map[string]*topicTreeNode
}

func newTopicTreeNode(part string) *topicTreeNode {
    return &topicTreeNode{
        part:     part,
        clients:  make(map[*Client]bool),
        subnodes: make(map[string]*topicTreeNode),
    }
}

func (n *topicTreeNode) isWildcard() bool {
    return n.part == "#"
}

まとめ

今回はMQTTのトピックフィルターによる購読の実装をトライ木を使うと簡潔に実装できるのが面白かったためまとめた。トライ木の構造は他にもHTTP RouterやIP Routing Tableなどの実装にも使われていたりしておもしろい。

MQTTプロトコルをブローカーを実装しながら学ぶ

clusterのリアルタイム通信サーバーの漸進的な進化 - Cluster Tech Blogを見て、リアルタイム通信の仕組みに興味を持った。しかし、MQTTプロトコルは使ったことがなく、どのようなプロトコルなのか、何がそんなに効率的なのか、なぜIoTなどでよく使われるのかなどが分からない状態だった。

そこでChatGPTを使えば自作しながらMQTTプロトコルを学べるのではないかと思い、https://github.com/shibayu36/go-mqtt-playground にて作り始めた。ひとまず簡易的にCONNECT/SUBSCRIBE/PUBLISHをハンドリングできるようになったのでメモしておく。

MQTTプロトコルの特徴

http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html のAbstractがよくまとまっている。日本語訳した内容を置いておく。

MQTTは、クライアント・サーバー・パブリッシュ/サブスクライブ・メッセージング・トランスポート・プロトコルである。軽量、オープン、シンプルで、実装が容易なように設計されている。このような特性により、M2M(Machine to Machine)やIoT(Internet of Things)の通信など、コードフットプリントが小さく、ネットワーク帯域幅に制約がある環境など、さまざまな状況での使用に最適です。

このプロトコルは、TCP/IP上、または順序付けられたロスレス双方向接続を提供する他のネットワーク・プロトコル上で動作する。その特徴は以下の通り:

  • 1対多のメッセージ配信とアプリケーションのデカップリングを提供するパブリッシュ/サブスクライブ・メッセージ・パターンの使用。
  • ペイロードの内容にとらわれないメッセージング・トランスポート 。
  • メッセージ配信の3つのサービス品質
    • "最大1回"、メッセージは動作環境の最善の努力に従って配信される。メッセージの損失は起こりうる。このレベルは、例えば、個々の読み取り値が失われたとしても、次の読み取り値がすぐに公表されるため問題にはならないような、周囲のセンサーデータで使用することができる。
    • 少なくとも1回」。メッセージは確実に届くが、重複が発生する可能性がある。
    • Exactly once"(正確に一度):メッセージが正確に一度だけ届くことが保証される。このレベルは、例えば、メッセージの重複や紛失によって誤った料金が適用される可能性のある課金システムで使用できる。
  • 小さなトランスポート・オーバーヘッドとプロトコル交換を最小化し、ネットワーク・トラフィックを削減 。
  • 異常切断が発生したときに関係者に通知する仕組み。

何をベースに作るか

MQTT仕様のバージョンには3.1.1と5.0がある。それぞれ https://mqtt.org/mqtt-specification/ から確認できる。

バージョン5.0の方ができることは多いが複雑なので、今回は理解することを優先に3.1.1をベースに作ることにする。

できたもの

現時点でできたものは https://github.com/shibayu36/go-mqtt-playground/tree/ddcc3bf1c57d1b05920e95f5697ddbf470010a31

go run ./brokerで立ち上げた上で、mqtt-cliを使うと、動いている様子を確認できる。

まずmqtt-cliのshellモードでversion 3.1.1を使うように設定し

~/.mqtt-cli/config.properties

#Fri Jun 30 19:27:25 JST 2023
client.id.length=8
client.id.prefix=mqtt
logfile.level=debug
logfile.path=/Users/shibayu36/.mqtt-cli/logs
mqtt.host=localhost
mqtt.port=1883
mqtt.version=3
ws.path=/mqtt

サーバーを立ち上げ

$ go run ./broker
2023/07/31 09:48:32 Listening on port 1883

2つのクライアントを接続し、様子を確認する。

1つ目でsubscribe。

$ mqtt shell -l
mqtt> con
Restriction request problem information was set but is unused in MQTT Version MQTT_3_1_1
mqtt5h5LqvY7@localhost> sub --topic hoge/fuga -oc

2つ目でpublish。

$ mqtt shell -l
mqtt> con
Restriction request problem information was set but is unused in MQTT Version MQTT_3_1_1
mqtt8YrsztIc@localhost> pub --topic hoge/fuga -m piyo

すると1つ目のshellで、piyoがstdoutに流れてくる。

mqtt5h5LqvY7@localhost> piyo

学んだこと

まだ実装はかなり簡易的ではあるが、これだけでも色々なことを学べた。

まず以下のような特性により、IoTやリアルタイム通信などでよく使われることが多そうだ。

  • パケット構造が効率化されていて転送量をかなり低く抑えられる
  • コネクションを継続する仕様がいくつかあり、繋いだ状態であれば素早くやり取りができる
  • 仕様がある程度単純で、実装しやすい

またトピックフィルターの仕様によって、階層構造の下すべてをsubscribeしたりなども可能になっている。

  • MQTTのトピックは階層的な構造を持ち、スラッシュ(/)で区切られた複数のレベルから構成
  • ワイルドカードを利用できる
    • プラス記号(+): トピック階層内の1つのレベルにマッチ。たとえば、a/+/ca/b/ca/x/cにマッチし、a/b/x/ca/cにはマッチしない
    • ハッシュ記号(#): トピック階層内の0個以上のレベルにマッチします。ハッシュ記号はトピックフィルターの最後にのみ使用可能。a/#a/ba/b/ca/b/c/dなど、aで始まるすべてのトピックにマッチ

このトピックフィルターの実装はTrie木を使うとシンプルに実装できるのがおもしろい。このことについてはまた別のブログ記事で書いてみようと思う。

参考資料

ChatGPTに教えてもらったこと

ChatGPTに教えてもらったことをざっとコピペしておく。ちょっと不正確な話も混ざっているので、話半分くらいに。

実際のやり取りに使ったのはこの辺り。どう質問して作っていったのかも参考になると思う。

MQTT概要

MQTT(Message Queuing Telemetry Transport)は、軽量なメッセージングプロトコルであり、IoT(Internet of Things)デバイスと通信するために設計されています。このプロトコルは、低帯域幅での通信と低電力消費が必要な状況で非常に役立ちます。MQTTは、クライアントとサーバ(ブローカーとも呼ばれます)間の通信を行います。

MQTTの主な概念は以下の通りです。

  1. Publish/Subscribeモデル: MQTTはPublish/Subscribe(パブリッシュ/サブスクライブ)モデルを使用します。クライアントは、特定のトピック(テーマやカテゴリ)にメッセージを公開(publish)するか、特定のトピックのメッセージを購読(subscribe)します。ブローカーはこれらのメッセージを受け取り、購読しているクライアントに配信します。
  2. トピック: トピックはメッセージのフィルタリングに使用されるラベルです。クライアントは、特定のトピックに対してメッセージを公開したり、トピックを購読したりします。トピックは階層的な構造を持つことができ、ワイルドカードを使用して複数のトピックを一度に購読することもできます。
  3. QoS (Quality of Service) レベル: MQTTは3つのQoSレベルを提供しています。QoS 0はメッセージが最大1回配信されることを意味し、「少なくとも一度」の配信を保証します。QoS 1はメッセージが少なくとも1回配信されることを保証しますが、複数回配信される可能性があります。QoS 2はメッセージが正確に1回だけ配信されることを保証します。
    • -> ChatGPTのミスで、QoS 0はat most once
  4. ブローカー: MQTTブローカーは、メッセージの配信を仲介します。クライアントはブローカーに接続し、メッセージを公開または購読します。ブローカーは、購読されたトピックに関連するメッセージを購読者に配信します。
  5. 接続とセッション: クライアントがMQTTブローカーに接続するとき、接続を確立するためのハンドシェイクが行われます。クライアントは通常、接続時に一意のクライアントIDを提供します。また、クライアントは「クリーンセッション」を指定することができ、これは接続が切断されたときにセッション状態を保持するかどうかを制御します。
  6. Last Will and Testament (LWT): クライアントは接続時に「遺言」メッセージを設定することができます。これは、クライアントが何らかの理由で接続を失った場合にブローカーによって公開されるメッセージです。
  7. 保持メッセージ: クライアントはメッセージを公開する際に、そのメッセージを「保持」するように指示することができます。これにより、新しいサブスクライバーがトピックを購読したときに、最後に公開された保持メッセージが受信されます。

何から実装すると学びやすいか

メインの機能に絞ってMQTTプロトコルを学ぶと効率的です。以下は、実装に重点を置いて学ぶべき基本的なプロトコルの要素です。

  1. クライアントとブローカーの接続: MQTTプロトコルでは、クライアントはブローカーに接続する必要があります。接続を確立するためのプロセスと、クライアントが接続を切断する方法を理解することが重要です。CONNECTとDISCONNECTのコマンドを理解しましょう。
  2. Publish/Subscribeモデル: MQTTの中核となる概念です。クライアントがトピックにメッセージを公開し、他のクライアントがこれらのトピックを購読する方法を理解する必要があります。PUBLISH、SUBSCRIBE、およびUNSUBSCRIBEコマンドを注視しましょう。
  3. トピック: MQTTのトピックは、メッセージングのカテゴリを指定します。トピックの命名規則とワイルドカードの使用を理解しましょう。
  4. QoS (Quality of Service) レベル: メッセージの配信保証レベルを理解し、それぞれのレベルがどのように動作するかを学びましょう。QoS 0、1、および2の違いを把握することが重要です。

ネットワークの流れ

クライアントがオフラインの時の挙動

MQTTプロトコルでは、クライアントがオフラインのときに、そのクライアントがサブスクライブしているトピックにメッセージがパブリッシュされる場合の挙動は、Quality of Service(QoS)レベルと「クリーンセッション」フラグの設定に依存します。

  1. QoS 0 (At most once): このレベルでは、メッセージは最大1回配信されます。クライアントがオフラインの場合、メッセージは失われます。
  2. QoS 1 (At least once): このレベルでは、メッセージは少なくとも1回配信されます。クライアントがオフラインの場合で、クリーンセッションフラグがfalseに設定されている場合、ブローカーはメッセージを保存し、クライアントが再接続したときに配信します。
  3. QoS 2 (Exactly once): このレベルでは、メッセージは正確に1回配信されます。これもQoS 1と同様に、クリーンセッションフラグがfalseの場合、ブローカーはメッセージを保存し、クライアントが再接続したときに配信します。

また、「クリーンセッション」フラグは重要な役割を果たします。

  • クリーンセッション = true: クライアントとブローカーの間のセッションは、クライアントが切断されると終了します。これは、クライアントがオフラインの間にパブリッシュされたメッセージは保存されず、クライアントが再接続しても受信されないことを意味します。
  • クリーンセッション = false: クライアントが切断されても、ブローカーはセッションを維持します。クライアントがサブスクライブしたトピックに関連するメッセージは、クライアントが再接続するまでブローカーによって保存されます。

したがって、オフラインのクライアントに対してメッセージを保存しておきたい場合は、サブスクリプションをQoS 1または2で行い、クリーンセッションフラグをfalseに設定する必要があります。

トピックフィルターと構造

トピックフィルターは、MQTTプロトコルにおいて、クライアントが特定のトピックにサブスクライブする際に使用されるパターンです。トピックフィルターを使用することで、クライアントは自分が関心を持つメッセージだけを受け取ることができます。MQTTのトピックは階層的な構造を持ち、スラッシュ(/)で区切られた複数のレベルから構成されます。

トピックフィルターでは2つのワイルドカード文字が使用できます: 1. プラス記号(+): トピック階層内の1つのレベルにマッチします。たとえば、a/+/ca/b/ca/x/cにマッチしますが、a/b/x/ca/cにはマッチしません。 2. ハッシュ記号(#): トピック階層内の0個以上のレベルにマッチします。ハッシュ記号はトピックフィルターの最後にのみ使用できます。例えば、a/#a/ba/b/ca/b/c/dなど、aで始まるすべてのトピックにマッチします。

以下はトピックフィルターの具体的な例です: 1. sensor/temperature: これはワイルドカードを使用していない普通のトピックフィルターで、厳密にsensor/temperatureというトピックにのみマッチします。 2. sensor/+/temperature: このフィルターは、sensorで始まり、temperatureで終わる、3つの階層からなる任意のトピックにマッチします。たとえば、sensor/room/temperaturesensor/outdoor/temperatureなど。 3. sensor/#: このフィルターは、sensorで始まる任意のトピックにマッチします。例えばsensor/temperature, sensor/humidity/room1, sensor/temperature/living-roomなど。 4. #: このフィルターは全てのトピックにマッチします。

トピックフィルターの$について

MQTTプロトコルでは、トピック名の先頭に$記号がある場合、それはシステムトピックを表します。システムトピックは、通常のメッセージングとは異なり、ブローカーの内部情報や診断データなどを配信するために使用されます。

例えば、一般的なシステムトピックとして$SYS/で始まるトピックがよく使用されます。これはブローカーの内部状態に関する情報を提供します。たとえば、$SYS/broker/uptimeはブローカーの稼働時間を表し、$SYS/broker/clients/connectedは接続されているクライアントの数を表すかもしれません。

また、$記号で始まるトピックは、通常のトピックフィルターではマッチしないようになっています。例えば、#は通常のトピックをキャプチャするのに対して、$SYS/#のように指定しないと$で始まるトピックはキャプチャされません。

これにより、システム関連のメッセージと通常のメッセージングが分離され、クライアントが意図せずにシステムメッセージを受信することがなくなります。また、システムトピックを使用することで、ブローカーの監視や管理を効果的に行うことができます。

SPACEフレームワークを参考に開発生産性の可視化の改善方法を考える

GitHub Copilotについて調べていたときに見た Research: quantifying GitHub Copilot’s impact on developer productivity and happiness - The GitHub Blog の記事の中で、開発生産性を測るSPACEフレームワークを使って計測しているのを見かけた。SPACEフレームワークという文字は見たことあったが調べたことはなかったので、この機会に調査した。その調査の中で開発生産性の可視化の改善方法についても考えたため、考えたことをまとめておく。

まずSPACEフレームワークがあったとしても、これをそのまま使うことはないと感じた。個人的に5カテゴリの分類の納得感があまりなく、計測する指標の例の計測難易度が非常に高いと感じたためだ。また使う難易度も高く見えて、使う人にも専門性が必要と感じるため、フレームワークという意味で使うのも難しいと感じた。

しかしこのフレームワークの中で、いくつかのポイントは非常に参考にできると感じた。たとえばカテゴリの中に顧客の観点が含まれていることや、必ずアンケートデータなどの定性的な指標をセットに捉えるべきと書かれていることだ。

このような参考になるポイントを取り入れると、組織の開発生産性を単純にFourKeysのみで可視化するより良くなると感じる。たとえば自分だったら以下のように3つの手段で可視化するだろう。

  • FourKeysの計測。変更のリードタイム・デプロイの頻度・MTTR・変更時の障害率・信頼性
    • 最新のレポートの5指標測定する方式を採用する
    • この5指標は、SPACEフレームワークの色々な部分をバランスよく集計できていて、かつ集計が難しすぎない
  • プロダクトKPI
    • FourKeysの弱点として生産性の文脈に顧客視点が抜けやすいということがあるので、生産性の計測という部分でもプロダクトKPIを参考にする
  • 定期的に開発者へのアンケートを行う。SPACEフレームワークの観点をアンケート項目に採用しても良い
    • 指標だけからは見えない原因の探索や、指標にまだ現れていない傾向を見るため

FourKeys自体も意外と測定が難しいという問題はあるが、この形を意識しながら取れるところから取っていくと良いと考えている。

参考資料

自分の考えの関連記事