物联网

Gmqtt 简介——基于Go语言的 MQTT borker 实现

3044 2022-01-07 02:26:43

Gmqtt 是用 Go 语言实现的一个具备灵活灵活扩展能力,高性能的 MQTT broker,其完整实现了 MQTT V3.x 和 V5 协议。

功能特性

  • 内置了许多实用的钩子方法,使用者可以方便的定制需要的 MQTT 服务器(鉴权、ACL等功能)
  • 支持 tls/ssl 以及 ws/wss
  • 提供扩展编程接口,可以通过函数调用直接往broker发消息,添加删除订阅等。详见 server.go 的 Server 接口定义,以及  admin 插件。
  • 丰富的钩子方法和扩展编程接口赋予了 Gmqtt 强大的插件定制化能力。详见 server/plugin.go 和 /plugin。
  • 提供监控指标,支持prometheus。 (plugin: prometheus)
  • GRPC 和 REST API 支持. (plugin:admin)
  • 支持 session 持久化,broker 重启消息不丢失,目前支持 redis 持久化。
  • 支持集群, 示例和详情请参考 federation plugin

开始

我们需要通过源码编译的方式启动,请确保您所在的机器上已经具备 Go 环境。

下列命令会使用默认配置启动 Gmqtt 服务,该服务使用 1883 端口[tcp]和 8883 端口[websocket]端口提供 MQTT broker 服务,并加载 admin 和 prometheu s插件。

$ git clone https://github.com/DrmagicE/gmqtt
$ cd gmqtt/cmd/gmqttd
$ go run . start -c default_config.yml

配置

Gmqtt 通过 -c 来指定配置文件路径,如果没有指定,Gmqtt 默认读取 $HOME/gmqtt.yml 为配置文件。

使用持久化存储

Gmqtt 默认使用内存存储,这也是 Gmqtt 推荐的存储方式,内存存储具备绝佳的性能优势,但缺点是 session 信息会在 broker 重启后丢失。 如果你希望重启后 session 不丢失,可以配置 redis 持久化存储:

persistence:
  type: redis  
  redis:
    # redis server address
    addr: "127.0.0.1:6379"
    # the maximum number of idle connections in the redis connection pool
    max_idle: 1000
    # the maximum number of connections allocated by the redis connection pool at a given time.
    # If zero, there is no limit on the number of connections in the pool.
    max_active: 0
    # the connection idle timeout, connection will be closed after remaining idle for this duration. If the value is zero, then idle connections are not closed
    idle_timeout: 240s
    password: ""
    # the number of the redis database
    database: 0

配置鉴权

Gmqtt 内置了基于 username/password 的简单鉴权机制。(由 auth 插件提供)。 Gmqtt 默认配置没有开启鉴权,可以通过修改配置文件来加载鉴权插件:

# plugin loading orders
plugin_order:
  - auth
  - prometheus
  - admin

加载后,需要添加账户才可以连接,可以通过 HTTP接口来添加账户:

# 创建: username = user1, password = user1pass
$ curl -X POST -d '{"password":"user1pass"}' 127.0.0.1:8083/v1/accounts/user1
{}
# 查询:
$ curl 127.0.0.1:8083/v1/accounts/user1
{"account":{"username":"user1","password":"20a0db53bc1881a7f739cd956b740039"}}

Docker

$ docker build -t gmqtt .
$ docker run -p 1883:1883 -p 8883:8883 -p 8082:8082 -p 8083:8083  -p 8084:8084  gmqtt

文档说明

钩子方法

Gmqtt 实现了下列钩子方法。

hook说明用途示例
OnAcceptTCP连接建立时调用TCP连接限速,黑白名单等.
OnStop当gmqtt退出时调用 
OnSubscribe收到订阅请求时调用校验订阅是否合法
OnSubscribed订阅成功后调用统计订阅报文数量
OnUnsubscribe取消订阅时调用校验是否允许取消订阅
OnUnsubscribed取消订阅成功后调用统计订阅报文数
OnMsgArrived收到消息发布报文时调用校验发布权限,改写发布消息
OnBasicAuth收到连接请求报文时调用客户端连接鉴权
OnEnhancedAuth收到带有AuthMetho的连接请求报文时调用(V5特性)客户端连接鉴权
OnReAuth收到Auth报文时调用(V5特性)客户端连接鉴权
OnConnected客户端连接成功后调用统计在线客户端数量
OnSessionCreated客户端创建新session后调用统计session数量
OnSessionResumed客户端从旧session恢复后调用统计session数量
OnSessionTerminatedsession删除后调用统计session数量
OnDelivered消息从broker投递到客户端后调用 
OnClosed客户端断开连接后调用统计在线客户端数量
OnMsgDropped消息被丢弃时调用 
OnWillPublish发布遗嘱消息前修改或丢弃遗嘱消息
OnWillPublished发布遗嘱消息后