New
EMQX 6.1.0 新特性:可回放的 MQTT 消息流、增强的多租户能力与更多数据集成 →

RT-Thread 使用 TLS/SSL 连接到 EMQX Cloud

Alvin WongAlvin Wong
2021-9-3产品
RT-Thread 使用 TLS/SSL 连接到 EMQX Cloud

本文将使用 RT-Thread 配合 ART-Pi 搭建 MQTT 客户端,快速接入 EMQX Cloud

EMQX Cloud 简介

EMQX Cloud 是由 EMQ 公司推出的可连接海量物联网设备,集成各类数据库及业务系统的全托管云原生 MQTT 服务。作为全球首个全托管的 MQTT 5.0 公有云服务,EMQX Cloud 提供了一站式运维代管、独有隔离环境的 MQTT 消息服务。

在万物互联的时代,EMQX Cloud 可以帮助用户快速构建面向物联网领域的行业应用,轻松实现物联网数据的采集、传输、计算和持久化。

借助云服务商提供的基础计算设施,EMQX Cloud 面向全球数十个国家与地区提供服务,为 5G 与万物互联应用提供低成本、安全可靠的云服务。

创建和部署 EMQX Cloud

通过快速入门创建部署 EMQX Cloud,以下是创建完成的实例:

  • 基础版

  • 专业版

创建项目工程

本文使用 RT-Thread 官方 IDE:RT-Thread-Studio 来创建工程;

本次 Demo 使用的是 RT-Thread 官方的开发板 ART-Pi,通过板载 Wifi 模块进行联网,可以直接创建一个 art_pi_wifi 样例工程进行 MQTT 客户端的开发;

工程配置和引入依赖包

  1. 进入配置页面

选择“More”

  1. 启用 RTC 驱动

  1. 引入 MQTT 依赖包

启动 TLS 需设置 MQTT 线程栈大小 ≥ 6144!

  1. 配置 mbedtls
    1. 选择 用户 CA 证书(单/双向认证)

    1. 选择无证书 SSL 连接(单向认证)

  2. 保存当前配置,IDE 会将配置更新到工程

  1. 修改宏 MEMP_NUM_NETDB2

位于项目路径"rt-thread\components\net\lwip-2.0.2\src\include\lwip\opt.h:488"

/**
 * MEMP_NUM_NETDB: the number of concurrently running lwip_addrinfo() calls
 * (before freeing the corresponding memory using lwip_freeaddrinfo()).
 */
#if !defined MEMP_NUM_NETDB || defined __DOXYGEN__
#define MEMP_NUM_NETDB                  2
#endif

代码编写

  1. 打开 applications/main.c,可见 RT-Thread-Studio 已经帮我们生成好了连接 WiFi 和 LED 操作的代码
    #include <rtthread.h>
    #include <rtdevice.h>
    #include "drv_common.h"
    
    #define LED_PIN GET_PIN(I, 8)
    
    extern void wlan_autoconnect_init(void);
    
    int main(void)
    {
        rt_uint32_t count = 1;
        rt_pin_mode(LED_PIN, PIN_MODE_OUTPUT);
    
        /* init Wi-Fi auto connect feature */
        wlan_autoconnect_init();
        /* enable auto reconnect on WLAN device */
        rt_wlan_config_autoreconnect(RT_TRUE);
    
        while(count++)
        {
            rt_thread_mdelay(500);
            rt_pin_write(LED_PIN, PIN_HIGH);
            rt_thread_mdelay(500);
            rt_pin_write(LED_PIN, PIN_LOW);
        }
        return RT_EOK;
    }
    
    #include "stm32h7xx.h"
    static int vtor_config(void)
    {
        /* Vector Table Relocation in Internal QSPI_FLASH */
        SCB->VTOR = QSPI_BASE;
        return 0;
    }
    INIT_BOARD_EXPORT(vtor_config);
    
  2. 为了实现第一次启动也能自动连接WiFi,我们可以在 main() 加入连接函数;
     rt_wlan_connect(WIFI_SSID, WIFI_PASSWORD);
    
  3. 分别新建 mqtt-client.c 和 mqtt-client.h;
    static void mqtt_create(void)
    {
        /* init condata param by using MQTTPacket_connectData_initializer */
        MQTTPacket_connectData condata = MQTTPacket_connectData_initializer;
    
        static char client_id[50] = { 0 };
    
        rt_memset(&client, 0, sizeof(MQTTClient));
    
        /* config MQTT context param */
        {
            client.uri = MQTT_BROKER_URI;
    
            /* config connect param */
            memcpy(&client.condata, &condata, sizeof(condata));
            rt_snprintf(client_id, sizeof(client_id), "%s%d",MQTT_CLIENTID, rt_tick_get());
            client.condata.clientID.cstring = client_id;
            client.condata.keepAliveInterval = 60;
            client.condata.cleansession = 1;
            client.condata.username.cstring = MQTT_USERNAME;
            client.condata.password.cstring = MQTT_PASSWORD;
    
            /* config MQTT will param. */
            client.condata.willFlag = 1;
            client.condata.will.qos = MQTT_QOS;
            client.condata.will.retained = 0;
            client.condata.will.topicName.cstring = MQTT_PUBTOPIC;
            client.condata.will.message.cstring = MQTT_WILLMSG;
    
            /* malloc buffer. */
            client.buf_size = client.readbuf_size = MQTT_PUB_SUB_BUF_SIZE;
            client.buf = rt_malloc(client.buf_size);
            client.readbuf = rt_malloc(client.readbuf_size);
            if (!(client.buf && client.readbuf))
            {
                rt_kprintf("no memory for MQTT client buffer!\n");
                goto _exit;
            }
    
            /* set event callback function */
            client.connect_callback = mqtt_connect_callback;
            client.online_callback = mqtt_online_callback;
            client.offline_callback = mqtt_offline_callback;
    
            /* set subscribe table and event callback */
            client.messageHandlers[0].topicFilter = rt_strdup(MQTT_SUBTOPIC);
            client.messageHandlers[0].callback = mqtt_sub_callback;
            client.messageHandlers[0].qos = MQTT_QOS;
    
            /* set default subscribe event callback */
            client.defaultMessageHandler = mqtt_sub_callback;
        }
    
        /* run mqtt client */
        paho_mqtt_start(&client);
    
        return;
    
    _exit:
        if (client.buf)
        {
            rt_free(client.buf);
            client.buf = RT_NULL;
        }
        if (client.readbuf)
        {
            rt_free(client.readbuf);
            client.readbuf = RT_NULL;
        }
        return;
    }
    

    对应 连接/订阅/上线/离线 回调函数
    static void mqtt_connect_callback(MQTTClient *c)
    {
        LOG_D("mqtt_connect_callback!");
    }
    
    static void mqtt_sub_callback(MQTTClient *c, MessageData *msg_data)
    {
        sub_count++;
        *((char *)msg_data->message->payload + msg_data->message->payloadlen) = '\0';
        rt_kprintf("mqtt sub callback[%u]: \n topic: %.*s \n message %.*s",
                   sub_count,
                   msg_data->topicName->lenstring.len,
                   msg_data->topicName->lenstring.data,
                   msg_data->message->payloadlen,
                   (char *)msg_data->message->payload);
    }
    
    static void mqtt_online_callback(MQTTClient *c)
    {
        LOG_D("mqtt_online_callback!");
    }
    
    static void mqtt_offline_callback(MQTTClient *c)
    {
        LOG_D("mqtt_offline_callback!");
    }
    

    创建 pub 线程回调函数和 mqtt 客户端启动函数
    static void thread_pub(void *parameter)
    {
        pub_data = rt_malloc(TEST_DATA_SIZE * sizeof(char));
        if (!pub_data)
        {
            rt_kprintf("no memory for pub_data\n");
            return;
        }
    
        start_tm = time((time_t *) RT_NULL);
        rt_kprintf("test start at '%d'\r\n", start_tm);
    
        while (1)
        {
            rt_snprintf(pub_data, TEST_DATA_SIZE, "Pub EMQX message-%d", pub_count);
    
            if (!paho_mqtt_publish(&client, QOS1, MQTT_PUBTOPIC, pub_data))
            {
                ++pub_count;
            }
    
            rt_thread_delay(PUB_CYCLE_TM);
        }
    }
    
    void mqtt_client_start(void)
    {
        if (is_started)
        {
            return;
        }
    
        mqtt_create();
    
        while (!client.isconnected)
        {
            rt_kprintf("Waiting for mqtt connection...\n");
            rt_thread_delay(1000);
        }
    
        pub_thread_tid = rt_thread_create("pub_thread", thread_pub, RT_NULL, 1024, 8, 100);
        if (pub_thread_tid != RT_NULL)
        {
            rt_thread_startup(pub_thread_tid);
        }
    
        is_started = 1;
    
        return;
    }
    

    设置 MQTT 连接参数和用户名
    #define EMQX_Cloud_Professional_Version 1
    #define EMQX_Cloud_TLS_SSL 1
    
    #if !EMQX_Cloud_Professional_Version
    
    #if EMQX_Cloud_TLS_SSL
    #define MQTT_BROKER_URI         "ssl://ge06f1e1.cn-shenzhen.emqx.cloud:15455"
    #else
    #define MQTT_BROKER_URI         "tcp://ge06f1e1.cn-shenzhen.emqx.cloud:15915"
    #endif
    #else
    
    #if EMQX_Cloud_TLS_SSL
    #define MQTT_BROKER_URI         "ssl://oba9d641.emqx.cloud:8883"
    #else
    #define MQTT_BROKER_URI         "tcp://oba9d641.emqx.cloud:1883"
    #endif
    
    #endif
    
    #define MQTT_CLIENTID_PREFIX    "rtthread-mqtt"
    #define MQTT_USERNAME           "EMQX_RTT"
    #define MQTT_PASSWORD           "emqx_rtt_0813"
    #define MQTT_SUBTOPIC           "/emqx/mqtt/sub"
    #define MQTT_PUBTOPIC           "/emqx/mqtt/pub"
    #define MQTT_WILLMSG            "Goodbye!"
    #define MQTT_QOS                1
    

    main() 函数启动 MQTT client
    mqtt_client_start()
    

    也可在终端手动启动/停止
    mqtt_ctrl start
    mqtt_ctrl stop
    

    若选择用户 CA 证书验证,则将 CA 证书(双向认证还需 client.crt 和 client.key)放置到 packages/mbedtls-latest/certs 文件夹中

    重新更新工程,会自动将证书内容复制到源文件中

    构建项目并下载到目标板上

    打开终端 Terminal 可以看到运行日志

使用 MQTTX 测试数据收发

客户端连接配置

单向无证书认证

单向自签名证书认证

双向认证

订阅和发布

查看 RT-Thread-Stdio 终端

数据收发正常!

完整代码

请见 MQTT-Client-Examples

免费试用 EMQX Cloud
无须绑定信用卡
开始试用 →

文章作者

Alvin Wong
Alvin Wong

EMQ 边缘计算平台开发工程师, 负责NanoMQ, MQTT SDK 等边缘计算开发工作

订阅我们的博客

推荐阅读

2020-9-21Dekun Tao
EMQX + CNN 在 AIoT 中的融合应用

我们将创建一个简单的 AIoT 应用:使用 EMQX 收集温度传感器数据并输入到一维卷积神经网络 (1D CNN) ,以实现液压系统冷却器状态的预测。

2022-9-7Jiyong Huang
使用 Prometheus 监控 eKuiper 规则运行状态

本文介绍了 eKuiper 中的规则状态指标以及如何使用 Prometheus 监控这些状态指标,用户可以基于此进一步探索 Prometheus 的更多高级功能,更好地实现 eKuiper 的运维。