1 Mosquitto 简介与核心特性
Mosquitto 是 Eclipse 基金会下的开源 MQTT 代理服务器,采用 C 语言编写,轻量高效,支持 MQTT 3.1、3.1.1 和 5.0 协议。
核心特性
· 轻量级,资源占用少
· 支持 QoS 0/1/2 消息等级
· 持久化会话支持
· TLS/SSL 加密通信
· 灵活的认证授权机制
· 桥接模式支持分布式部署
2 安装与使用
项目连接:https://github.com/eclipse-mosquitto/mosquitto
下载一份代码 git clone https://github.com/eclipse-mosquitto/mosquitto,直接make编译,我的环境是Centos7,缺少依赖包就先安装依赖的包。
编译完成,直接运行程序:

mosquitto已经运行在监听1883端口,在另外的终端分别运行订阅与发布命令测试下,编译完成后在项目的client目录下已经有这两个命令。
订阅:

发布:

mosquitto已经可以正常运行,客户端正常订阅与发布消息,接下来就可以项目化使用。
3. 最佳实践:分布式实践方案
3.1 架构设计

3.2 多节点部署配置
Mosquitto作为MQTT的一个Broker,可以分布式部署,设备端通过一个API接口获取Broker列表,然后根据连接策略选择一个Broker进行连接与登陆。
主节点配置文件 (mosquitto_primary.conf)
1# 基础配置 2listener 1883 3protocol mqtt 4 5# 持久化 6persistence true 7persistence_location /var/lib/mosquitto/ 8 9# 日志配置 10log_dest file /var/log/mosquitto/mosquitto.log 11log_type all 12 13# 安全配置 14allow_anonymous false 15password_file /etc/mosquitto/passwd 16 17# 桥接配置 - 连接到其他节点 18connection bridge_secondary 19address 192.168.1.2:1883 20topic # both 2 "" "" 21 22# 集群发现 23listener 1884 24protocol websockets 25
从节点配置文件 (mosquitto_secondary.conf)
1listener 1883 2protocol mqtt 3persistence true 4persistence_location /var/lib/mosquitto_secondary/ 5 6# 连接到主节点 7connection bridge_primary 8address 192.168.1.1:1883 9topic # both 2 "" "" 10
3.3 使用 Docker 部署集群
docker-compose.yml
1version: '3.8' 2services: 3 mosquitto-primary: 4 image: eclipse-mosquitto:latest 5 ports: 6 - "1883:1883" 7 - "9001:9001" 8 volumes: 9 - ./primary.conf:/mosquitto/config/mosquitto.conf 10 - ./primary-data:/mosquitto/data 11 - ./primary-log:/mosquitto/log 12 13 mosquitto-secondary: 14 image: eclipse-mosquitto:latest 15 ports: 16 - "1884:1883" 17 volumes: 18 - ./secondary.conf:/mosquitto/config/mosquitto.conf 19 - ./secondary-data:/mosquitto/data 20 - ./secondary-log:/mosquitto/log 21 depends_on: 22 - mosquitto-primary 23
4. 自定义认证插件开发
mosquitto本身没有实现对接数据库进行访问,不过可以开发一个简单插件实现访问数据库对每个设备进行认证鉴权。
按照项目提供的插件例子,实现相应的接口,把插件编译为一个so文件,然后在配置文件中配置插件文件名、数据库用户名与密码,就可以完成一个可访问数据库进行认证的Broker。
4.1 插件基础结构
mosquitto_auth_db.h
1#ifndef MOSQUITTO_AUTH_DB_H 2#define MOSQUITTO_AUTH_DB_H 3 4#include "mosquitto_broker.h" 5#include "mosquitto_plugin.h" 6#include "mosquitto.h" 7 8// 插件配置结构 9struct auth_db_config { 10 char *db_host; 11 int db_port; 12 char *db_name; 13 char *db_user; 14 char *db_password; 15 int cache_timeout; 16}; 17 18// 数据库连接结构 19struct db_connection { 20 // 数据库连接句柄 21 void *conn; 22 // 连接状态 23 int connected; 24}; 25 26#endif 27
4.2 主要认证函数实现
mosquitto_auth_db.c
1#include "mosquitto_auth_db.h" 2#include <stdio.h> 3#include <string.h> 4#include <stdlib.h> 5 6// 插件版本 7int mosquitto_auth_plugin_version(void) { 8 return MOSQ_AUTH_PLUGIN_VERSION; 9} 10 11// 插件初始化 12int mosquitto_auth_plugin_init(void **user_data, struct mosquitto_opt *opts, int opt_count) { 13 struct auth_db_config *config = malloc(sizeof(struct auth_db_config)); 14 15 // 解析配置参数 16 for (int i = 0; i < opt_count; i++) { 17 if (strcmp(opts[i].key, "db_host") == 0) { 18 config->db_host = strdup(opts[i].value); 19 } else if (strcmp(opts[i].key, "db_port") == 0) { 20 config->db_port = atoi(opts[i].value); 21 } 22 // 其他参数解析... 23 } 24 25 *user_data = config; 26 27 // 初始化数据库连接 28 if (db_connect(config) != 0) { 29 mosquitto_log_printf(MOSQ_LOG_ERR, "Failed to connect to database"); 30 return MOSQ_ERR_UNKNOWN; 31 } 32 33 return MOSQ_ERR_SUCCESS; 34} 35 36// 用户名/密码认证 37int mosquitto_auth_plugin_authenticate_user( 38 void *user_data, 39 struct mosquitto *client, 40 const char *username, 41 const char *password 42) { 43 struct auth_db_config *config = (struct auth_db_config *)user_data; 44 45 if (!username || !password) { 46 return MOSQ_ERR_AUTH; 47 } 48 49 // 查询数据库验证用户 50 int auth_result = db_authenticate_user(config, username, password); 51 52 if (auth_result == 0) { 53 mosquitto_log_printf(MOSQ_LOG_INFO, "User %s authenticated successfully", username); 54 return MOSQ_ERR_SUCCESS; 55 } else { 56 mosquitto_log_printf(MOSQ_LOG_WARNING, "Authentication failed for user %s", username); 57 return MOSQ_ERR_AUTH; 58 } 59} 60 61// ACL 检查 62int mosquitto_auth_plugin_acl_check( 63 void *user_data, 64 int access, 65 struct mosquitto *client, 66 const struct mosquitto_acl_msg *msg 67) { 68 // 实现主题访问控制逻辑 69 const char *username = mosquitto_client_username(client); 70 71 if (username && check_topic_permission(username, msg->topic, access)) { 72 return MOSQ_ERR_SUCCESS; 73 } 74 75 return MOSQ_ERR_ACL_DENIED; 76} 77 78// 数据库认证函数 79int db_authenticate_user(struct auth_db_config *config, const char *username, const char *password) { 80 // 这里实现具体的数据库查询逻辑 81 // 示例使用 PostgreSQL 82 PGconn *conn = PQconnectdb( 83 fmt::format("host={} port={} dbname={} user={} password={}", 84 config->db_host, config->db_port, config->db_name, 85 config->db_user, config->db_password) 86 ); 87 88 if (PQstatus(conn) != CONNECTION_OK) { 89 mosquitto_log_printf(MOSQ_LOG_ERR, "Database connection failed: %s", PQerrorMessage(conn)); 90 return -1; 91 } 92 93 // 执行认证查询 94 const char *query = "SELECT password_hash FROM devices WHERE device_id = $1 AND status = 'active'"; 95 PGresult *res = PQexecParams(conn, query, 1, NULL, &username, NULL, NULL, 0); 96 97 if (PQresultStatus(res) != PGRES_TUPLES_OK) { 98 PQclear(res); 99 PQfinish(conn); 100 return -1; 101 } 102 103 if (PQntuples(res) == 1) { 104 char *stored_hash = PQgetvalue(res, 0, 0); 105 int result = verify_password(password, stored_hash); 106 107 PQclear(res); 108 PQfinish(conn); 109 110 return result; 111 } 112 113 PQclear(res); 114 PQfinish(conn); 115 return -1; 116} 117
4.3 插件编译与配置
CMakeLists.txt
1cmake_minimum_required(VERSION 3.10) 2project(mosquitto_auth_db) 3 4find_library(MOSQUITTO_LIB mosquitto) 5find_path(MOSQUITTO_INCLUDE_DIR mosquitto_broker.h) 6 7add_library(auth_db SHARED mosquitto_auth_db.c) 8target_include_directories(auth_db PRIVATE ${MOSQUITTO_INCLUDE_DIR}) 9target_link_libraries(auth_db ${MOSQUITTO_LIB}) 10
插件配置文件
1# mosquitto.conf 中添加 2allow_anonymous false 3auth_plugin /path/to/auth_db.so 4auth_opt_db_host localhost 5auth_opt_db_port 5432 6auth_opt_db_name iot_db 7auth_opt_db_user mosquitto 8auth_opt_db_password secret 9auth_opt_cache_timeout 300 10
5. 后端服务集成
5.1 go 后端服务示例
我们项目的后端使用go语言开发,go语言有很好的mqtt开发包,可以直接引入使用。下面是一个简单的范例,只展示MQTT的初始化连接部分。
MQTT的相关接口在mqtt.go实现
1package services 2 3import ( 4 "encoding/json" 5 "fmt" 6 "itcps/database" 7 "itcps/logger" 8 "itcps/models" 9 "math" 10 "math/rand" 11 "strings" 12 "sync" 13 "time" 14 15 MQTT "github.com/eclipse/paho.mqtt.golang" 16) 17 18// connectBroker 尝试连接单个broker 19func (m *MQTTService) connectBroker(broker *models.MQTTBroker) error { 20 opts := MQTT.NewClientOptions() 21 opts.AddBroker(fmt.Sprintf("tcp://%s", broker.IP)) 22 clientID := fmt.Sprintf("itcps-service-%s-%d", broker.IP, time.Now().UnixNano()) 23 opts.SetClientID(clientID) 24 opts.SetUsername(broker.Username) 25 opts.SetPassword(broker.Password) 26 opts.SetDefaultPublishHandler(m.messageHandler) 27 opts.SetAutoReconnect(true) 28 opts.SetConnectRetry(true) 29 opts.SetOnConnectHandler(m.onConnect) 30 opts.SetConnectionLostHandler(m.onConnectionLost) 31 opts.SetConnectTimeout(5 * time.Second) 32 33 client := MQTT.NewClient(opts) 34 35 // 创建一个带超时的通道 36 done := make(chan error, 1) 37 38 go func() { 39 token := client.Connect() 40 token.Wait() 41 done <- token.Error() 42 }() 43 44 // 设置5秒超时 45 select { 46 case err := <-done: 47 if err != nil { 48 return fmt.Errorf("连接broker失败: %v", err) 49 } 50 // 从IP:Port中提取IP地址 51 brokerIP := strings.Split(broker.IP, ":")[0] 52 53 m.mutex.Lock() 54 m.clients[brokerIP] = client 55 m.mutex.Unlock() 56 57 logger.Logs("info", 58 map[string]interface{}{ 59 "MQTT": "connectBroker", 60 "full_addr": broker.IP, 61 "ip": brokerIP, 62 }, 63 fmt.Sprintf("成功添加broker client, IP: %s", brokerIP)) 64 return nil 65 case <-time.After(5 * time.Second): 66 // 尝试断开连接 67 client.Disconnect(250) 68 return fmt.Errorf("连接broker超时: %s", broker.IP) 69 } 70} 71 72// Connect 连接到所有可用的broker 73func (m *MQTTService) Connect() error { 74 // 获取所有broker列表,使用较大的pageSize确保获取所有broker 75 brokers, _, err := models.GetAllBrokers(1, 100) 76 if err != nil { 77 logger.Logs("error", 78 map[string]interface{}{"MQTT": "Connect"}, 79 fmt.Sprintf("获取MQTT代理列表失败: %v", err)) 80 return err 81 } 82 83 logger.Logs("info", 84 map[string]interface{}{"MQTT": "Connect"}, 85 fmt.Sprintf("获取到broker列表,总数: %d", len(brokers))) 86 87 successCount := 0 88 var lastErr error 89 90 // 遍历所有状态为0的broker,尝试连接 91 for i := range brokers { 92 logger.Logs("info", 93 map[string]interface{}{ 94 "MQTT": "Connect", 95 "broker_ipport": brokers[i].IP, 96 "broker_status": brokers[i].Status, 97 }, 98 fmt.Sprintf("正在处理broker: %s, 状态: %s", brokers[i].IP, brokers[i].Status)) 99 100 if brokers[i].Status != "0" { 101 logger.Logs("info", 102 map[string]interface{}{"MQTT": "Connect"}, 103 fmt.Sprintf("跳过非活动broker: %s, 状态: %s", brokers[i].IP, brokers[i].Status)) 104 continue 105 } 106 107 err := m.connectBroker(&brokers[i]) 108 if err != nil { 109 lastErr = err 110 logger.Logs("warn", 111 map[string]interface{}{ 112 "MQTT": "Connect", 113 "error": err.Error(), 114 }, 115 fmt.Sprintf("连接broker %s 失败: %v", brokers[i].IP, err)) 116 continue 117 } 118 119 successCount++ 120 logger.Logs("info", 121 map[string]interface{}{"MQTT": "Connect"}, 122 fmt.Sprintf("成功连接到broker %s", brokers[i].IP)) 123 } 124 125 logger.Logs("info", 126 map[string]interface{}{ 127 "MQTT": "Connect", 128 "success_count": successCount, 129 "total_brokers": len(brokers), 130 }, 131 fmt.Sprintf("broker连接总结: 成功 %d 个, 总数 %d 个", successCount, len(brokers))) 132 133 if successCount == 0 { 134 m.resultChan <- lastErr 135 return fmt.Errorf("所有broker连接均失败,最后错误: %v", lastErr) 136 } 137 138 m.resultChan <- nil 139 return nil 140} 141 142// Publish 发布消息到指定的broker 143func (m *MQTTService) Publish(topic string, message interface{}, brokerHost string) error { 144 m.mutex.RLock() 145 client, exists := m.clients[brokerHost] 146 m.mutex.RUnlock() 147 148 if !exists { 149 logger.Logs("error", 150 map[string]interface{}{ 151 "MQTT": "Publish", 152 "broker_host": brokerHost, 153 }, 154 fmt.Sprintf("broker %s 的client不存在", brokerHost)) 155 return fmt.Errorf("broker %s 的client不存在", brokerHost) 156 } 157 158 payload, err := json.Marshal(message) 159 if err != nil { 160 logger.Logs("error", 161 map[string]interface{}{ 162 "MQTT": "Publish", 163 "broker_host": brokerHost, 164 "topic": topic, 165 "error": err.Error(), 166 }, 167 "消息序列化失败") 168 return err 169 } 170 171 logger.Logs("info", 172 map[string]interface{}{ 173 "MQTT": "Publish", 174 "broker_host": brokerHost, 175 "topic": topic, 176 "payload": string(payload), 177 }, 178 "准备发布MQTT消息") 179 180 token := client.Publish(topic, 1, false, payload) 181 if token.Wait() && token.Error() != nil { 182 logger.Logs("error", 183 map[string]interface{}{ 184 "MQTT": "Publish", 185 "broker_host": brokerHost, 186 "topic": topic, 187 "error": token.Error(), 188 }, 189 "MQTT消息发布失败") 190 return token.Error() 191 } 192 193 logger.Logs("info", 194 map[string]interface{}{ 195 "MQTT": "Publish", 196 "broker_host": brokerHost, 197 "topic": topic, 198 }, 199 "MQTT消息发布成功") 200 return nil 201} 202 203// GetRandomClient 随机获取一个可用的client 204func (m *MQTTService) GetRandomClient() (MQTT.Client, string, error) { 205 m.mutex.RLock() 206 defer m.mutex.RUnlock() 207 208 if len(m.clients) == 0 { 209 return nil, "", fmt.Errorf("没有可用的MQTT client") 210 } 211 212 // 将所有client转换为切片 213 hosts := make([]string, 0, len(m.clients)) 214 for host := range m.clients { 215 hosts = append(hosts, host) 216 } 217 218 // 随机选择一个host 219 rand.Seed(time.Now().UnixNano()) 220 selectedHost := hosts[rand.Intn(len(hosts))] 221 222 return m.clients[selectedHost], selectedHost, nil 223} 224 225// GetMQTTInstance 获取MQTT服务的单例 226func GetMQTTInstance() *MQTTService { 227 mqttOnce.Do(func() { 228 mqttInstance = &MQTTService{ 229 topics: []string{ 230 "REP_INFO", // 设备启动 231 "HEART_INFO", // 心跳 232 "REP_LOCATE", // 定位上报 233 "START_UPDATE_RESPONSE", // 版本升级 234 "REP_CALLEVENT", // 呼叫事件 235 "LASTWILL", // 最后遗言 236 "REP_CALLDETAIL", // 呼叫详情 237 }, 238 resultChan: make(chan error, 1), 239 clients: make(map[string]MQTT.Client), 240 isConnecting: false, 241 connected: false, 242 } 243 // 异步连接MQTT 244 go mqttInstance.connectInBackground() 245 }) 246 return mqttInstance 247} 248 249// 异步连接方法 250func (m *MQTTService) connectInBackground() { 251 m.mu.Lock() 252 if m.isConnecting { 253 m.mu.Unlock() 254 return 255 } 256 m.isConnecting = true 257 m.mu.Unlock() 258 259 logger.Logs("info", 260 map[string]interface{}{"MQTT": "Connect"}, 261 "开始后台连接MQTT服务") 262 263 // 获取所有broker并尝试连接 264 brokers, _, err := models.GetAllBrokers(1, 100) 265 if err != nil { 266 logger.Logs("error", 267 map[string]interface{}{"MQTT": "Connect"}, 268 fmt.Sprintf("获取MQTT代理列表失败: %v", err)) 269 return 270 } 271 272 for i := range brokers { 273 if brokers[i].Status != "0" { 274 continue 275 } 276 277 go func(broker models.MQTTBroker) { 278 err := m.connectBroker(&broker) 279 if err != nil { 280 logger.Logs("warn", 281 map[string]interface{}{"MQTT": "Connect"}, 282 fmt.Sprintf("连接broker %s 失败: %v", broker.IP, err)) 283 return 284 } 285 286 m.mu.Lock() 287 m.connected = true 288 m.mu.Unlock() 289 290 logger.Logs("info", 291 map[string]interface{}{"MQTT": "Connect"}, 292 fmt.Sprintf("成功连接到broker %s", broker.IP)) 293 }(brokers[i]) 294 } 295} 296 297// 新增检查连接状态的方法 298func (m *MQTTService) IsConnected() bool { 299 m.mu.RLock() 300 defer m.mu.RUnlock() 301 return m.connected 302} 303// 消息处理函数 304func (m *MQTTService) messageHandler(client MQTT.Client, msg MQTT.Message) { 305 ...... 306} 307 308# 使用示例 309# 在main.go中调用GetMQTTInstance即可 310 // 2. 然后初始化MQTT服务 311 mqttService := services.GetMQTTInstance() 312 defer mqttService.Disconnect() 313 logger.Logs("info", 314 map[string]interface{}{"Main": "main"}, 315 "开启MQTT异步连接") 316
5.2 设备端连接示例
device_client.py
1import paho.mqtt.client as mqtt 2import json 3import time 4import random 5 6class DeviceClient: 7 def __init__(self, device_id, username, password, broker_host="localhost", broker_port=1883): 8 self.device_id = device_id 9 self.client = mqtt.Client(client_id=device_id, protocol=mqtt.MQTTv311) 10 self.client.username_pw_set(username, password) 11 12 self.client.on_connect = self.on_connect 13 self.client.on_message = self.on_message 14 15 self.broker_host = broker_host 16 self.broker_port = broker_port 17 18 self.connected = False 19 20 def on_connect(self, client, userdata, flags, rc): 21 if rc == 0: 22 print(f"Device {self.device_id} connected successfully") 23 self.connected = True 24 # 订阅控制主题 25 control_topic = f"devices/{self.device_id}/control" 26 client.subscribe(control_topic) 27 else: 28 print(f"Connection failed with code {rc}") 29 30 def on_message(self, client, userdata, msg): 31 print(f"Received control message: {msg.payload.decode()}") 32 # 处理控制命令 33 self.handle_control_message(json.loads(msg.payload.decode())) 34 35 def handle_control_message(self, message): 36 command = message.get('command') 37 if command == 'reboot': 38 print("Rebooting device...") 39 # 执行重启逻辑 40 self.publish_status("rebooting") 41 elif command == 'update': 42 print("Updating device...") 43 # 执行更新逻辑 44 45 def connect(self): 46 self.client.connect(self.broker_host, self.broker_port, 60) 47 self.client.loop_start() 48 49 def disconnect(self): 50 self.client.loop_stop() 51 self.client.disconnect() 52 53 def publish_telemetry(self): 54 """发布遥测数据""" 55 telemetry = { 56 "timestamp": int(time.time()), 57 "temperature": random.uniform(20, 30), 58 "humidity": random.uniform(40, 60), 59 "battery": random.uniform(80, 100) 60 } 61 62 topic = f"devices/{self.device_id}/telemetry" 63 self.client.publish(topic, json.dumps(telemetry), qos=1) 64 65 def publish_status(self, status): 66 """发布设备状态""" 67 message = { 68 "status": status, 69 "timestamp": int(time.time()) 70 } 71 72 topic = f"devices/{self.device_id}/status" 73 self.client.publish(topic, json.dumps(message), qos=1) 74 75# 使用示例 76if __name__ == "__main__": 77 device = DeviceClient( 78 device_id="device_001", 79 username="device_001", 80 password="device_password" 81 ) 82 83 device.connect() 84 device.publish_status("online") 85 86 try: 87 while True: 88 if device.connected: 89 device.publish_telemetry() 90 time.sleep(30) # 每30秒发送一次遥测数据 91 except KeyboardInterrupt: 92 device.publish_status("offline") 93 device.disconnect() 94
6. 优化建议
6.1 安全最佳实践
- TLS/SSL 加密: 始终在生产环境使用 TLS 加密
- 认证强化: 使用强密码策略和定期轮换
- ACL 精细化: 基于最小权限原则配置主题访问控制
- 网络隔离: 将 MQTT Broker 部署在 DMZ 区域
6.2 性能优化
- 连接池管理: 后端服务使用连接池避免频繁连接
- QoS 选择: 根据业务需求选择合适的 QoS 等级
- 消息大小控制: 限制单个消息大小,避免大消息阻塞
- 持久化优化: 合理配置持久化策略平衡性能和数据安全
6.3 监控与运维
- 健康检查: 实现 Broker 健康检查机制
- 指标监控: 监控连接数、消息吞吐量等关键指标
- 日志管理: 集中管理日志并设置合理的日志级别
- 备份策略: 定期备份配置和持久化数据
7 总结
本文详细介绍了 Mosquitto MQTT Broker 的分布式部署方案、自定义认证插件开发以及前后端集成的最佳实践。通过这种架构,可以实现高可用、可扩展的物联网平台,满足企业级应用的需求。在我的一个物联网项目中,已经稳定运行一年多,没有出现一起Broker的故障事件,还是很稳定。对于一些物联网项目的简单应用,比如设备状态上报,指令下发,消息广播,Mosquitto完全满足生产需要。
《Mosquitto:MQTT Broker入门与分布式部署最佳实践》 是转载文章,点击查看原文。