理解和应用 MQTT 协议是物联网领域中的一项重要技能。为了确保设备和服务器之间的稳定连接,我们需要深入了解并有效应用 MQTT 客户端的自动重连特性。下面,让我们像在探索一个神秘的冒险岛一样,深入探索 MQTT 协议和重连机制。

MQTT 是一种基于 TCP 协议的发布/订阅模型协议。它像一艘经过风雨洗礼的海船,在物联网、传感器%ignore_a_1%和其他低带宽、不稳定网络环境中航行。但是,就像海上的风浪,网络环境中也充满了各种挑战:网络故障、信号弱化、数据丢包等等,这些都可能使得 MQTT 客户端与服务器之间的连接中断。在物联网的大海中,常见的触发断线重连的风浪包括网络环境恶劣或断网、服务器升级、设备或客户端重启、以及其他网络因素等。

在这种情况下,我们如何确保我们的 “船”(MQTT 客户端)始终能与 “港口”(服务器)保持稳定的连接呢?答案就是我们需要给我们的 “船” 装备一套自动导航系统——MQTT 客户端的自动重连逻辑。

设计一个优秀的 MQTT 客户端重连逻辑,就像建造一艘坚固的海船。如果设计得不合理,那么我们的 “船” 可能会失去导航,静默不再接收来自 “港口” 的消息,甚至可能会因为频繁地尝试重连而无意识地攻击我们的 “港口”,这就如同在海上无头乱窜,不仅消耗了自身的能量,也给 “港口” 带来了不必要的压力。然而,如果我们的重连逻辑设计得合理,那么无论何时失去连接,我们的 “船” 都能稳定地自动导航,重新找到 “港口”,并保持与其的连接。

设计 MQTT 客户端重连逻辑时,我们需要考虑几个关键因素:

  1. 航行保活时间:在 MQTT 中,我们称其为 Keep Alive。这是一个定时器,它会定期检查我们的 “船” 是否与 “港口” 保持连接。我们需要根据实际的网络环境和应用需求,来设置一个合适的 Keep Alive。
  2. 重连策略和退避:当我们的 “船” 失去了与 “港口” 的连接,我们不应立刻尝试重新连接,而应该设置一个合理的等待时间,以免过度消耗资源。这就像是当我们的船在海上迷路时,我们需要暂时停下,观察风向、测量海流,然后再制定新的航行路线。我们可以使用指数退避算法或者阶梯式的延时策略来实现这个功能。
  3. 连接状态管理:我们的 “船” 需要一个航海日志,来记录与 “港口” 的连接状态、连接断开的原因、已经订阅的信息等重要信息。在连接断开时,我们的 “船” 应该查阅航海日志,分析连接断开的原因,然后尝试重新连接 “港口”。
  4. 异常处理:在航行过程中,我们的 “船” 可能会遇到各种各样的问题,例如 “港口” 不可用、认证失败、网络异常等。我们的 “船” 需要有一个应急计划,来应对这些问题。例如,当 “港口” 不可用时,我们的 “船” 可能需要寻找其他的 “港口”;当认证失败时,我们的 “船” 可能需要检查自身的认证信息是否正确;当网络异常时,我们的 “船” 可能需要暂停航行,等待网络恢复正常。
  5. 最大尝试次数限制:对于一些低功耗设备,我们可能需要考虑限制尝试重连的次数,以避免过度消耗设备的电力。就像在海上迷航的 “船”,当它已经尝试了很多次都无法找到 “港口” 时,可能就需要暂时停下,等待更好的航行条件。

在设计了这个自动导航系统(MQTT 客户端的自动重连逻辑)之后,我们的 “船” 就能更好地在物联网的海洋中航行,无论面临何种挑战,都能始终保持与 “港口” 的稳定连接,从而确保我们的应用能够顺利进行。

来看一个实际的案例。我们以 Paho MQTT C 库为例,它为我们提供了一套丰富的航海工具——回调函数,让我们可以根据实际情况设定自动导航系统的工作方式。Paho 提供了全局回调、API 回调和异步方法回调,让我们可以在各种情况下都能保持与 “港口” 的连接。

这就是我们如何在物联网的海洋中航行的故事。希望通过这个故事,能够帮助你更好地理解 MQTT 协议和重连机制,也希望你的 “船” 能在物联网的海洋中顺利航行。

 

/*******************************************************************************
 * Copyright (c) 2012, 2022 IBM Corp., Ian Craggs
 *
 * 保留所有权利。此程序和随附的资料
 * 根据Eclipse公共许可证v2.0
 * 和Eclipse发行许可证v1.0的条款提供。 
 *
 * Eclipse公共许可证可在以下网址查阅 
 *   https://www.eclipse.org/legal/epl-2.0/
 * Eclipse发行许可证可在以下网址查阅 
 *   http://www.eclipse.org/org/documents/edl-v10.php。
 *
 *******************************************************************************/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "MQTTAsync.h"

#if !defined(_WIN32)
#include <unistd.h>
#else
#include <windows.h>
#endif

#if defined(_WRS_KERNEL)
#include <OsWrapper.h>
#endif

// 定义需要使用的MQTT连接参数,如broker地址和客户端ID等
#define ADDRESS     "tcp://broker.emqx.io:1883"
#define CLIENTID    "PahoClientSub"
#define TOPIC       "nanomq/test"
#define PAYLOAD     "Hello World!"
#define QOS         1
#define TIMEOUT     10000L

// 定义在主线程中的逻辑Flag
int disc_finished = 0;
int subscribed = 0;
int finished = 0;

//首先声明 API 回调函数
void onConnect(void* context, MQTTAsync_successData* response);
void onConnectFailure(void* context, MQTTAsync_failureData* response);
void onSubscribe(void* context, MQTTAsync_successData* response);
void onSubscribeFailure(void* context, MQTTAsync_failureData* response);

// 下面2个是 Async 使用的回调函数
// 异步连接成功的回调函数,在连接成功的时候进行Subscribe操作。
void conn_established(void *context, char *cause)
{
	printf("客户端已重新连接!\n");
	MQTTAsync client = (MQTTAsync)context;
	MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
	int rc;

	printf("连接成功\n");

	printf("订阅主题 %s\n使用客户端 %s 并用QoS%d\n\n"
           "按Q<Enter>退出\n\n", TOPIC, CLIENTID, QOS);
	opts.onSuccess = onSubscribe;
	opts.onFailure = onSubscribeFailure;
	opts.context = client;
	if ((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &opts)) != MQTTASYNC_SUCCESS)
	{
		printf("开始订阅失败,返回码 %d\n", rc);
		finished = 1;
	}
}

// 异步连接收到 Disconnect消息时的回调,由于大部分断开的情况下不会收到 Disconnect消息,所以此方法很少被触发
void disconnect_lost(void* context, MQTTProperties* properties,
		enum MQTTReasonCodes reasonCode)
{
	printf("客户端已断开连接!\n");
}

// 下面是客户端全局回调函数,分别是连接断开和消息到达
void conn_lost(void *context, char *cause)
{
	MQTTAsync client = (MQTTAsync)context;
	MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
	int rc;

	printf("\n连接已断开\n");
	if (cause)
		printf("     原因: %s\n", cause);

	printf("正在重连\n");
	conn_opts.keepAliveInterval = 20;
	conn_opts.cleansession = 1;
	conn_opts.maxRetryInterval = 16;
	conn_opts.minRetryInterval = 2;
	conn_opts.automaticReconnect = 1;
	
	//conn_opts.onSuccess = onConnect;
	conn_opts.onFailure = onConnectFailure;
	MQTTAsync_setConnected(client, client, conn_established);
	if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
	{
		printf("开始连接失败,返回码 %d\n", rc);
		finished = 1;
	}
}

// 收到消息时的全局回调函数,此处简单的打印消息
int msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
{
    printf("消息已到达\n");
    printf("     主题: %s\n", topicName);
    printf("   消```C
息: ");

    /* 打印消息内容 */
    char* payloadptr = message->payload;
    for(int i = 0; i < message->payloadlen; i++)
    {
        putchar(*payloadptr++);
    }
    putchar('\n');

    /* 释放消息内存 */
    MQTTAsync_freeMessage(&message);
    MQTTAsync_free(topicName);

    return 1;
}

/* 异步断开连接的回调函数 */
void onDisconnect(void* context, MQTTAsync_successData* response)
{
    printf("成功断开连接\n");
    disc_finished = 1;
}

/* 异步连接成功的回调函数,在连接成功的时候进行订阅操作。 */
void onConnect(void* context, MQTTAsync_successData* response)
{
    MQTTAsync client = (MQTTAsync)context;
    MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
    int rc;

    printf("成功连接\n");

    printf("订阅主题 %s\n使用客户端 %s 并用QoS%d\n\n"
           "按Q<Enter>退出\n\n", TOPIC, CLIENTID, QOS);

    /* 开始订阅 */
    opts.onSuccess = onSubscribe;
    opts.onFailure = onSubscribeFailure;
    opts.context = client;
    if ((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &opts)) != MQTTASYNC_SUCCESS)
    {
        printf("开始订阅失败,返回码 %d\n", rc);
        finished = 1;
    }
}

/* 异步连接失败的回调函数 */
void onConnectFailure(void* context, MQTTAsync_failureData* response)
{
    printf("连接失败\n");
    if (response && response->message)
    {
        printf("失败信息: %s\n", response->message);
    }
    finished = 1;
}

/* 异步订阅成功的回调函数 */
void onSubscribe(void* context, MQTTAsync_successData* response)
{
    printf("成功订阅\n");
    subscribed = 1;
}

/* 异步订阅失败的回调函数 */
void onSubscribeFailure(void* context, MQTTAsync_failureData* response)
{
    printf("订阅失败\n");
    if (response && response->message)
    {
        printf("失败信息: %s\n", response->message);
    }
    finished = 1;
}

/* 异步取消订阅的回调函数 */
void onUnsubscribe(void* context, MQTTAsync_successData* response)
{
    printf("成功取消订阅\n");
    finished = 1;
}

int main(int argc, char* argv[])
{
    MQTTAsync client;
    MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
    int rc;
    MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
    MQTTAsync_token token;

    /* 创建MQTT客户端 */
    MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);

    /* 设置全局回调函数 */
    MQTTAsync_setCallbacks(client, client, conn_lost, msgarrvd, NULL);

    /* 设置连接选项 */
    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;
    conn_opts.automaticReconnect = 1;
    //conn_opts.onSuccess = onConnect;
    conn_opts.onFailure = onConnectFailure;
    MQTTAsync_setConnected(client, client, conn_established);

    /* 开始连接 */
    if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
    {
        printf("开始连接失败,返回码 %d\n", rc);
        return EXIT_FAILURE;
    }

    while (!finished)
    {
        #if defined(_WIN32)
            Sleep(1000);
        #else
            sleep(1);
        #endif
    }

    if (subscribed)
    {
        if ((rc = MQTTAsync_unsubscribe(client, TOPIC, NULL)) != MQTTASYNC_SUCCESS)
        {
            printf("取消订阅失败,返回码 %d\n", rc);
            return EXIT_FAILURE;
        }
    }
    
    /* 断开连接 */
    MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
    disc_opts.onSuccess = onDisconnect;
    if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS)
    {
        printf("开始断开连接失败,返回码 %d\n", rc);
        return EXIT_FAILURE;
    }
    while (!disc_finished)
    {
        #if defined(_WIN32)
            Sleep(1000);
        #else
            sleep(1);
        #endif
    }

    /* 销毁客户端 */
    MQTTAsync_destroy(&client);

    return EXIT_SUCCESS;
}

 

相关新闻

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

邮箱

cloud@modbus.cn

QQ
QQ
微信
微信
SHARE
TOP