理解和应用 MQTT 协议是物联网领域中的一项重要技能。为了确保设备和服务器之间的稳定连接,我们需要深入了解并有效应用 MQTT 客户端的自动重连特性。下面,让我们像在探索一个神秘的冒险岛一样,深入探索 MQTT 协议和重连机制。
MQTT 是一种基于 TCP 协议的发布/订阅模型协议。它像一艘经过风雨洗礼的海船,在物联网、传感器%ignore_a_1%和其他低带宽、不稳定网络环境中航行。但是,就像海上的风浪,网络环境中也充满了各种挑战:网络故障、信号弱化、数据丢包等等,这些都可能使得 MQTT 客户端与服务器之间的连接中断。在物联网的大海中,常见的触发断线重连的风浪包括网络环境恶劣或断网、服务器升级、设备或客户端重启、以及其他网络因素等。
在这种情况下,我们如何确保我们的 “船”(MQTT 客户端)始终能与 “港口”(服务器)保持稳定的连接呢?答案就是我们需要给我们的 “船” 装备一套自动导航系统——MQTT 客户端的自动重连逻辑。
设计一个优秀的 MQTT 客户端重连逻辑,就像建造一艘坚固的海船。如果设计得不合理,那么我们的 “船” 可能会失去导航,静默不再接收来自 “港口” 的消息,甚至可能会因为频繁地尝试重连而无意识地攻击我们的 “港口”,这就如同在海上无头乱窜,不仅消耗了自身的能量,也给 “港口” 带来了不必要的压力。然而,如果我们的重连逻辑设计得合理,那么无论何时失去连接,我们的 “船” 都能稳定地自动导航,重新找到 “港口”,并保持与其的连接。
设计 MQTT 客户端重连逻辑时,我们需要考虑几个关键因素:
- 航行保活时间:在 MQTT 中,我们称其为 Keep Alive。这是一个定时器,它会定期检查我们的 “船” 是否与 “港口” 保持连接。我们需要根据实际的网络环境和应用需求,来设置一个合适的 Keep Alive。
- 重连策略和退避:当我们的 “船” 失去了与 “港口” 的连接,我们不应立刻尝试重新连接,而应该设置一个合理的等待时间,以免过度消耗资源。这就像是当我们的船在海上迷路时,我们需要暂时停下,观察风向、测量海流,然后再制定新的航行路线。我们可以使用指数退避算法或者阶梯式的延时策略来实现这个功能。
- 连接状态管理:我们的 “船” 需要一个航海日志,来记录与 “港口” 的连接状态、连接断开的原因、已经订阅的信息等重要信息。在连接断开时,我们的 “船” 应该查阅航海日志,分析连接断开的原因,然后尝试重新连接 “港口”。
- 异常处理:在航行过程中,我们的 “船” 可能会遇到各种各样的问题,例如 “港口” 不可用、认证失败、网络异常等。我们的 “船” 需要有一个应急计划,来应对这些问题。例如,当 “港口” 不可用时,我们的 “船” 可能需要寻找其他的 “港口”;当认证失败时,我们的 “船” 可能需要检查自身的认证信息是否正确;当网络异常时,我们的 “船” 可能需要暂停航行,等待网络恢复正常。
- 最大尝试次数限制:对于一些低功耗设备,我们可能需要考虑限制尝试重连的次数,以避免过度消耗设备的电力。就像在海上迷航的 “船”,当它已经尝试了很多次都无法找到 “港口” 时,可能就需要暂时停下,等待更好的航行条件。
在设计了这个自动导航系统(MQTT 客户端的自动重连逻辑)之后,我们的 “船” 就能更好地在物联网的海洋中航行,无论面临何种挑战,都能始终保持与 “港口” 的稳定连接,从而确保我们的应用能够顺利进行。
来看一个实际的案例。我们以 Paho MQTT C 库为例,它为我们提供了一套丰富的航海工具——回调函数,让我们可以根据实际情况设定自动导航系统的工作方式。Paho 提供了全局回调、API 回调和异步方法回调,让我们可以在各种情况下都能保持与 “港口” 的连接。
这就是我们如何在物联网的海洋中航行的故事。希望通过这个故事,能够帮助你更好地理解 MQTT 协议和重连机制,也希望你的 “船” 能在物联网的海洋中顺利航行。
/******************************************************************************* * Copyright (c) 2012, 2022 IBM Corp., Ian Craggs * * 保留所有权利。此程序和随附的资料 * 根据Eclipse公共许可证v2.0 * 和Eclipse发行许可证v1.0的条款提供。 * * Eclipse公共许可证可在以下网址查阅 * https://www.eclipse.org/legal/epl-2.0/ * Eclipse发行许可证可在以下网址查阅 * http://www.eclipse.org/org/documents/edl-v10.php。 * *******************************************************************************/ #include <stdio.h> #include <stdlib.h> #include <string.h> #include "MQTTAsync.h" #if !defined(_WIN32) #include <unistd.h> #else #include <windows.h> #endif #if defined(_WRS_KERNEL) #include <OsWrapper.h> #endif // 定义需要使用的MQTT连接参数,如broker地址和客户端ID等 #define ADDRESS "tcp://broker.emqx.io:1883" #define CLIENTID "PahoClientSub" #define TOPIC "nanomq/test" #define PAYLOAD "Hello World!" #define QOS 1 #define TIMEOUT 10000L // 定义在主线程中的逻辑Flag int disc_finished = 0; int subscribed = 0; int finished = 0; //首先声明 API 回调函数 void onConnect(void* context, MQTTAsync_successData* response); void onConnectFailure(void* context, MQTTAsync_failureData* response); void onSubscribe(void* context, MQTTAsync_successData* response); void onSubscribeFailure(void* context, MQTTAsync_failureData* response); // 下面2个是 Async 使用的回调函数 // 异步连接成功的回调函数,在连接成功的时候进行Subscribe操作。 void conn_established(void *context, char *cause) { printf("客户端已重新连接!\n"); MQTTAsync client = (MQTTAsync)context; MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; int rc; printf("连接成功\n"); printf("订阅主题 %s\n使用客户端 %s 并用QoS%d\n\n" "按Q<Enter>退出\n\n", TOPIC, CLIENTID, QOS); opts.onSuccess = onSubscribe; opts.onFailure = onSubscribeFailure; opts.context = client; if ((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &opts)) != MQTTASYNC_SUCCESS) { printf("开始订阅失败,返回码 %d\n", rc); finished = 1; } } // 异步连接收到 Disconnect消息时的回调,由于大部分断开的情况下不会收到 Disconnect消息,所以此方法很少被触发 void disconnect_lost(void* context, MQTTProperties* properties, enum MQTTReasonCodes reasonCode) { printf("客户端已断开连接!\n"); } // 下面是客户端全局回调函数,分别是连接断开和消息到达 void conn_lost(void *context, char *cause) { MQTTAsync client = (MQTTAsync)context; MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; int rc; printf("\n连接已断开\n"); if (cause) printf(" 原因: %s\n", cause); printf("正在重连\n"); conn_opts.keepAliveInterval = 20; conn_opts.cleansession = 1; conn_opts.maxRetryInterval = 16; conn_opts.minRetryInterval = 2; conn_opts.automaticReconnect = 1; //conn_opts.onSuccess = onConnect; conn_opts.onFailure = onConnectFailure; MQTTAsync_setConnected(client, client, conn_established); if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) { printf("开始连接失败,返回码 %d\n", rc); finished = 1; } } // 收到消息时的全局回调函数,此处简单的打印消息 int msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message *message) { printf("消息已到达\n"); printf(" 主题: %s\n", topicName); printf(" 消```C 息: "); /* 打印消息内容 */ char* payloadptr = message->payload; for(int i = 0; i < message->payloadlen; i++) { putchar(*payloadptr++); } putchar('\n'); /* 释放消息内存 */ MQTTAsync_freeMessage(&message); MQTTAsync_free(topicName); return 1; } /* 异步断开连接的回调函数 */ void onDisconnect(void* context, MQTTAsync_successData* response) { printf("成功断开连接\n"); disc_finished = 1; } /* 异步连接成功的回调函数,在连接成功的时候进行订阅操作。 */ void onConnect(void* context, MQTTAsync_successData* response) { MQTTAsync client = (MQTTAsync)context; MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; int rc; printf("成功连接\n"); printf("订阅主题 %s\n使用客户端 %s 并用QoS%d\n\n" "按Q<Enter>退出\n\n", TOPIC, CLIENTID, QOS); /* 开始订阅 */ opts.onSuccess = onSubscribe; opts.onFailure = onSubscribeFailure; opts.context = client; if ((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &opts)) != MQTTASYNC_SUCCESS) { printf("开始订阅失败,返回码 %d\n", rc); finished = 1; } } /* 异步连接失败的回调函数 */ void onConnectFailure(void* context, MQTTAsync_failureData* response) { printf("连接失败\n"); if (response && response->message) { printf("失败信息: %s\n", response->message); } finished = 1; } /* 异步订阅成功的回调函数 */ void onSubscribe(void* context, MQTTAsync_successData* response) { printf("成功订阅\n"); subscribed = 1; } /* 异步订阅失败的回调函数 */ void onSubscribeFailure(void* context, MQTTAsync_failureData* response) { printf("订阅失败\n"); if (response && response->message) { printf("失败信息: %s\n", response->message); } finished = 1; } /* 异步取消订阅的回调函数 */ void onUnsubscribe(void* context, MQTTAsync_successData* response) { printf("成功取消订阅\n"); finished = 1; } int main(int argc, char* argv[]) { MQTTAsync client; MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; int rc; MQTTAsync_message pubmsg = MQTTAsync_message_initializer; MQTTAsync_token token; /* 创建MQTT客户端 */ MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL); /* 设置全局回调函数 */ MQTTAsync_setCallbacks(client, client, conn_lost, msgarrvd, NULL); /* 设置连接选项 */ conn_opts.keepAliveInterval = 20; conn_opts.cleansession = 1; conn_opts.automaticReconnect = 1; //conn_opts.onSuccess = onConnect; conn_opts.onFailure = onConnectFailure; MQTTAsync_setConnected(client, client, conn_established); /* 开始连接 */ if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) { printf("开始连接失败,返回码 %d\n", rc); return EXIT_FAILURE; } while (!finished) { #if defined(_WIN32) Sleep(1000); #else sleep(1); #endif } if (subscribed) { if ((rc = MQTTAsync_unsubscribe(client, TOPIC, NULL)) != MQTTASYNC_SUCCESS) { printf("取消订阅失败,返回码 %d\n", rc); return EXIT_FAILURE; } } /* 断开连接 */ MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer; disc_opts.onSuccess = onDisconnect; if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS) { printf("开始断开连接失败,返回码 %d\n", rc); return EXIT_FAILURE; } while (!disc_finished) { #if defined(_WIN32) Sleep(1000); #else sleep(1); #endif } /* 销毁客户端 */ MQTTAsync_destroy(&client); return EXIT_SUCCESS; }