$shibayu36->blog;

クラスター株式会社のソフトウェアエンジニアです。エンジニアリングや読書などについて書いています。

MQTTのトピックフィルターによる購読をトライ木を使って実装する

最近趣味でMQTTブローカーの実装をしてみている。その中でトライ木を使うとトピックフィルターによる購読の実装を簡単に行えるというのが面白かったのでブログにしておく。

なおトライ木はHTTP Routerの実装にも使われることがあり、そういう点でも一度実装してみると参考になる。GitHub - bmf-san/goblin: A golang http router based on trie tree.

トピックフィルターとは

MQTTでSUBSCRIBE/PUBLISHする時は、トピックというものを使ってやり取りをする。トピックは階層的な構造を持っている。たとえば

  • a
  • a/b/c
  • hoge/fuga

この時SUBSCRIBEでは、トピック名完全一致で購読したい、ある階層以下のすべてを購読したい、特定の階層はなんでも良いので購読したいなどの要求がある。その要求を満たすためにあるのが、トピックフィルターという文字列だ。トピックフィルターは完全一致の他にワイルドカードを使うことができる。

  • プラス記号(+): トピック階層内の1つのレベルにマッチ。たとえば、a/+/ca/b/ca/x/cにマッチし、a/b/x/ca/cにはマッチしない
  • ハッシュ記号(#): トピック階層内の0個以上のレベルにマッチします。ハッシュ記号はトピックフィルターの最後にのみ使用可能。a/#a/ba/b/ca/b/c/dなど、aで始まるすべてのトピックにマッチ

これを使うことでいろんな要求を満たすような購読ができる。

  • sport/tennis/player1: player1のテニスの成績を購読したい
  • sport/tennis/#: すべての選手のテニスの成績を購読したい
  • sport/+/player1: player1のすべてのスポーツの成績を購読したい
  • #: とにかくなんでも良いので全情報を購読したい

トピックフィルターによる購読をトライ木を使って購読する

このようなワイルドカードを含んだマッチを実装するやり方だが、トライ木を使うとシンプルに実装できる。トライ木の構造は以下を参考にしてほしい。

MQTTのトピックフィルターにおいて、#a/b/ca/+/ca/#を登録した場合、トライ木は次のような構造になる。トピックがやってきたら、これを辿りながらマッチするトピックフィルターを見つけ、それを購読しているクライアントを取得することになる。

root
  |- #
  └- a
     |- b
     |  └- c
     |- +
     |  └- c
     └- #

たとえばMQTTのトピックフィルターの実装として、自分で実装してみた例だと次のようになった。

https://github.com/shibayu36/go-mqtt-playground/blob/ddcc3bf1c57d1b05920e95f5697ddbf470010a31/broker/topic_tree.go

package main

import (
    "fmt"
    "strings"
    "sync"
)

type TopicTree struct {
    root *topicTreeNode
    mu   sync.RWMutex
}

func NewTopicTree() *TopicTree {
    return &TopicTree{
        root: newTopicTreeNode(""),
    }
}

func (t *TopicTree) Add(topic string, client *Client) {
    t.mu.Lock()
    defer t.mu.Unlock()

    parts := strings.Split(topic, "/")

    current := t.root
    for _, part := range parts {
        if _, exists := current.subnodes[part]; !exists {
            current.subnodes[part] = newTopicTreeNode(part)
        }
        current = current.subnodes[part]
    }
    current.clients[client] = true
}

func (t *TopicTree) Get(topic string) []*Client {
    t.mu.RLock()
    defer t.mu.RUnlock()

    parts := strings.Split(topic, "/")

    matchingClients := make([]*Client, 0)

    var traverse func(*topicTreeNode, []string)
    traverse = func(node *topicTreeNode, parts []string) {
        if len(parts) == 0 || node.isWildcard() {
            for client := range node.clients {
                matchingClients = append(matchingClients, client)
            }
        }

        if len(parts) > 0 {
            part := parts[0]
            if nextNode, exists := node.subnodes[part]; exists {
                traverse(nextNode, parts[1:])
            }
            if nextNode, exists := node.subnodes["+"]; exists {
                traverse(nextNode, parts[1:])
            }
            if nextNode, exists := node.subnodes["#"]; exists {
                traverse(nextNode, parts)
            }
        }
    }
    traverse(t.root, parts)

    return matchingClients
}

type topicTreeNode struct {
    part     string
    clients  map[*Client]bool
    subnodes map[string]*topicTreeNode
}

func newTopicTreeNode(part string) *topicTreeNode {
    return &topicTreeNode{
        part:     part,
        clients:  make(map[*Client]bool),
        subnodes: make(map[string]*topicTreeNode),
    }
}

func (n *topicTreeNode) isWildcard() bool {
    return n.part == "#"
}

まとめ

今回はMQTTのトピックフィルターによる購読の実装をトライ木を使うと簡潔に実装できるのが面白かったためまとめた。トライ木の構造は他にもHTTP RouterやIP Routing Tableなどの実装にも使われていたりしておもしろい。