Gmqtt 简介——基于Go语言的 MQTT borker 实现
Gmqtt 是用 Go 语言实现的一个具备灵活灵活扩展能力,高性能的 MQTT broker,其完整实现了 MQTT V3.x 和 V5 协议。
功能特性
- 内置了许多实用的钩子方法,使用者可以方便的定制需要的 MQTT 服务器(鉴权、ACL等功能)
- 支持 tls/ssl 以及 ws/wss
- 提供扩展编程接口,可以通过函数调用直接往broker发消息,添加删除订阅等。详见 server.go 的 Server 接口定义,以及 admin 插件。
- 丰富的钩子方法和扩展编程接口赋予了 Gmqtt 强大的插件定制化能力。详见 server/plugin.go 和 /plugin。
- 提供监控指标,支持prometheus。 (plugin: prometheus)
- GRPC 和 REST API 支持. (plugin:admin)
- 支持 session 持久化,broker 重启消息不丢失,目前支持 redis 持久化。
- 支持集群, 示例和详情请参考 federation plugin。
开始
我们需要通过源码编译的方式启动,请确保您所在的机器上已经具备 Go 环境。
下列命令会使用默认配置启动 Gmqtt 服务,该服务使用 1883 端口[tcp]和 8883 端口[websocket]端口提供 MQTT broker 服务,并加载 admin 和 prometheu s插件。
$ git clone https://github.com/DrmagicE/gmqtt
$ cd gmqtt/cmd/gmqttd
$ go run . start -c default_config.yml
配置
Gmqtt 通过 -c 来指定配置文件路径,如果没有指定,Gmqtt 默认读取 $HOME/gmqtt.yml 为配置文件。
使用持久化存储
Gmqtt 默认使用内存存储,这也是 Gmqtt 推荐的存储方式,内存存储具备绝佳的性能优势,但缺点是 session 信息会在 broker 重启后丢失。 如果你希望重启后 session 不丢失,可以配置 redis 持久化存储:
persistence:
type: redis
redis:
# redis server address
addr: "127.0.0.1:6379"
# the maximum number of idle connections in the redis connection pool
max_idle: 1000
# the maximum number of connections allocated by the redis connection pool at a given time.
# If zero, there is no limit on the number of connections in the pool.
max_active: 0
# the connection idle timeout, connection will be closed after remaining idle for this duration. If the value is zero, then idle connections are not closed
idle_timeout: 240s
password: ""
# the number of the redis database
database: 0
配置鉴权
Gmqtt 内置了基于 username/password 的简单鉴权机制。(由 auth 插件提供)。 Gmqtt 默认配置没有开启鉴权,可以通过修改配置文件来加载鉴权插件:
# plugin loading orders
plugin_order:
- auth
- prometheus
- admin
加载后,需要添加账户才可以连接,可以通过 HTTP接口来添加账户:
# 创建: username = user1, password = user1pass
$ curl -X POST -d '{"password":"user1pass"}' 127.0.0.1:8083/v1/accounts/user1
{}
# 查询:
$ curl 127.0.0.1:8083/v1/accounts/user1
{"account":{"username":"user1","password":"20a0db53bc1881a7f739cd956b740039"}}
Docker
$ docker build -t gmqtt .
$ docker run -p 1883:1883 -p 8883:8883 -p 8082:8082 -p 8083:8083 -p 8084:8084 gmqtt
文档说明
钩子方法
Gmqtt 实现了下列钩子方法。
hook | 说明 | 用途示例 |
---|---|---|
OnAccept | TCP连接建立时调用 | TCP连接限速,黑白名单等. |
OnStop | 当gmqtt退出时调用 | |
OnSubscribe | 收到订阅请求时调用 | 校验订阅是否合法 |
OnSubscribed | 订阅成功后调用 | 统计订阅报文数量 |
OnUnsubscribe | 取消订阅时调用 | 校验是否允许取消订阅 |
OnUnsubscribed | 取消订阅成功后调用 | 统计订阅报文数 |
OnMsgArrived | 收到消息发布报文时调用 | 校验发布权限,改写发布消息 |
OnBasicAuth | 收到连接请求报文时调用 | 客户端连接鉴权 |
OnEnhancedAuth | 收到带有AuthMetho的连接请求报文时调用(V5特性) | 客户端连接鉴权 |
OnReAuth | 收到Auth报文时调用(V5特性) | 客户端连接鉴权 |
OnConnected | 客户端连接成功后调用 | 统计在线客户端数量 |
OnSessionCreated | 客户端创建新session后调用 | 统计session数量 |
OnSessionResumed | 客户端从旧session恢复后调用 | 统计session数量 |
OnSessionTerminated | session删除后调用 | 统计session数量 |
OnDelivered | 消息从broker投递到客户端后调用 | |
OnClosed | 客户端断开连接后调用 | 统计在线客户端数量 |
OnMsgDropped | 消息被丢弃时调用 | |
OnWillPublish | 发布遗嘱消息前 | 修改或丢弃遗嘱消息 |
OnWillPublished | 发布遗嘱消息后 |