使用 EMQ X Cloud 物联网 MQTT 云服务

摘要

在数分钟内创建全托管高可用 MQTT 集群,快速接入物联网设备并立即开始产品原型设计与应用开发,将物联网数据存储到华为云上的 Kafka 与数据库中。

实验属性

  • 难易程度:初级
  • 实验时长:120分钟

实验目标与基本要求

使用户快速了解 EMQ X Cloud 物联网云服务的优势与特性,完成账号注册与试用部署创建,接入物联网设备进行消息收发,存储设备数据到华为云 Kafka 与数据库中

实验摘要

  1. 登录 EMQ X Cloud
  2. 创建 华为云鲲鹏 试用部署
  3. 初始化客户端信息
  4. 接入设备进行消息收发
  5. 打通华为云-EMQ X Cloud VPC 网络
  6. 将物联网数据存储到华为云 Kafka
  7. 将物联网数据存储数据到云数据库 GaussDB(for Mongo)

实验步骤

领取代金券,购买华为云 Kafka 与数据库

领券链接:点击前往领取100元代金券

领券失败?微信添加 EMQ 小助手处理

1. 登录 EMQ X Cloud

EMQ X Cloud MQTT 公有云服务来自于 EMQ 服务客户总结的一些最佳实践, 致力于提供快速部署、轻松管理、弹性扩展、跨多云部署的物联网 MQTT 5.0 服务。

点击右上角 登录 按钮,使用实验室提供的账号登录 EMQ X Cloud,点击 控制台 进入 EMQ X Cloud 控制台页面。

2. 创建华为云部署

在控制台 部署 页面点击 创建 部署,选择 华为云,选择试用部署,完成部署创建。等待 2~3 分钟后部署完全启动后即可正常使用。

img

完全启动后,可以看到当前部署的状态以及对应的规格、MQTT 接入信息。

img

3. 初始化客户端信息

3.1 客户端认证信息

EMQ X Cloud 采用 MQTT 用户名、密码认证方式,客户端需携带正确信息才能成功连接。

在 EMQ X Cloud 部署详情页面,点击 认证鉴权 选项卡,在 认证 部分输入用户名与密码,点击 添加 按钮完成客户端信息初始化。

  • 用户名为:emqx_u
  • 密码:123321

img

3.2 客户端 ACL 信息

对于安全级别较高的物联网应用,可以设置客户端的发布订阅 ACL,“所有客户端,禁止向 cmd/# 主题发布消息” 的 ACL 规则设置如下:

  • 用户名:全部用户($all)
  • 主题:cmd/#
  • 是否允许:不允许
  • 主题动作:pub

img

4. 接入设备进行消息收发

EMQ 提供一个在线 MQTT 测试工具,访问 http://tools.emqx.io/,使用部署提供的连接信息和 3 步骤中初始化的客户端信息进行连接。

img

连接成功后订阅相应的主题,进行发布、订阅测试:

img

5. 打通华为云-EMQ X Cloud VPC 网络

提示:EMQ X Cloud 试用部署不支持打通 VPC,可以直接使用公网地址连接。

什么是 VPC

VPC (Virtual Private Cloud),也叫专有网络、私有网络。在同一个 VPC 中的所有资源相互连通,不同 VPC 的资源之间默认相互隔离。

正常情况下,你拥有的云资源和 EMQ X Cloud 资源是在两个不同的 VPC 中,彼此无法连通。为了使用规则引擎,你需要使用对等连接,连通两个 VPC。

注意事项

  1. EMQ X Cloud 只支持同一区域创建对等连接
  2. EMQ X Cloud 不支持 10.10.0.0/24 ~ 10.32.255.0/24 范围内的网段,请合理规划您的 VPC 网段
  3. 对等连接与资源相互绑定,创建资源前请先创建对等连接

操作步骤

  1. 在部署 详情 选项卡,点击 +VPC 对等连接 按钮,记录 EMQ X Cloud 上的 VPC 信息:

    注意:暂时不要关闭该页面

    • 部署 VPC ID
    • EMQ X Cloud 账户 ID
    • 部署 VPC 网段

      img

  1. 使用实验室提供的华为云账号登录华为云,进入控制台 -> 虚拟私有云 VPC

    img

  1. 点击 对等连接 -> 创建对等连接,选择其它账户。填入刚才在 EMQ X Cloud 控制台 记录的信息,点击确定创建对等连接请求

    • 对端项目 ID == EMQ X Cloud 账户 ID
    • 对端VPC ID == 部署 VPC ID

      img

  1. 在对等连接信息界面,记录下以下 3 个值

    • 1 为 对等连接 ID
    • 2 为 VPC 网段
    • 3 为 VPC ID

      img

![img](https://static.emqx.net/images/3ba507d10fc8bc79c75a2e035d8a30cf.png)            
  1. 找到 我的凭证,记录下用户 ID

    img

  1. 回到 EMQ X Cloud 控制台。填写步骤 4 记录的对等连接 ID,VPC 网段,VPC ID 和步骤 5 记录的用户 ID。点击确定,完成对等连接

    img

  1. 在华为云控制台,打开 虚拟私有云 VPC -> 路由表,将步骤 1 中的部署 VPC 网段加入到对应 VPC 的路由表中

    注意:下一跳类型为 对等连接

    img

  1. 在华为云控制台里配置安全组,允许 EMQ X Cloud 网段访问您的 VPC

    img

至此 EMQ X Cloud 与华为云 VPC 网络已经打通,

6. 将物联网数据存储到华为云 Kafka

Kafka 是由 Apache 软件基金会开发的一个开源流处理平台。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台,是一个“按照分布式事务日志架构的大规模发布/订阅消息队列”,这使它作为企业级基础设施来处理流式数据非常有价值。

通过 EMQ X Cloud 规则引擎,你可以将数据桥接到 Kafka 服务,也可以设定消息模板,在 Kafka 服务中生产特定的消息。

6.1 初始化分布式消息服务 Kafka

使用实验室提供的华为云账号登录华为云,打开 https://www.huaweicloud.com/product/dmskafka.html 华为云分布式消息服务 Kafka 产品页面,选择 立即购买,注意以下几点信息,其他可自行设置:

  • 区域:选择 EMQ X Cloud 部署相同的区域,如 华南-广州
  • 虚拟私有云:选择上一步中创建的 VPC,选择对应的子网
  • 安全组:确保 Kafka 能够被访问,建议开放 TCP 9092 端口全部访问权限

点击 立即购买 完成创建,进入 https://console.huaweicloud.com/dms/?engine=kafka&region=cn-south-1#/queue/manager/newKafkaList 查看创建进程,等待创建完成后查看并记录连接地址

img

img

6.2 设置规则引擎的筛选条件

在部署页面,选择规则引擎,点击创建。

img

我们的目标是:当主题 greet 收到 msg 为 hello 字符时,就会触发引擎。这里需要对 SQL 进行一定的处理:

  • 针对 greet 主题,即 'greet/#'
  • 对 payload 中的 msg 进行匹配,当它为 'hello' 字符串再执行规则引擎

  • 根据上面的原则,我们最后得到的 SQL 应该如下:

SELECT
  payload.msg as msg
FROM
  "greet/#"
WHERE
  msg = 'hello'

可以点击 SQL 输入框下的 SQL 测试 ,填写数据:

  • topic: greet

payload:

{
"msg":"hello"
}

点击测试,查看得到的数据结果,如果设置无误,测试输出框应该得到完整的 JSON 数据,如下:

{
  "msg":"hello"
}

注意:如果无法通过测试,请检查 SQL 是否合规,测试中的 topic 是否与 SQL 填写的一致。

6.3 创建资源和动作

点击添加动作,在选择动作页,选择 桥接数据到 Kafka,点击下一步,在配置动作页面,点击创建资源。

img

img

在创建资源页面里,资源类型选择 Kafka,在 Kafka 服务器框里填写 6.1 步骤中保存的连接地址。点击测试,右上角返回 “测试资源创建成功” 表示测试成功。

img

注意:如果测试失败,请检查是否完成对等连接,详情请看 VPC 对等连接,并检查 URL 是否正确。

点击确定,返回到配置动作页面,Kafka 主题填写刚刚创建的 testTopic 主题,在消息内容模板里填写 "hello from emqx cloud",资源 ID 默认,点击确定。

img

创建好的动作会显示在响应动作一栏里,确认信息无误后,点击右下角的确认,完成规则引擎的配置。 img

6.4 测试

如果您是第一次使用 EMQ X Cloud 可以前往部署连接指南,查看 MQTT 客户端连接和测试指南

我们尝试向 home/sensor 主题发送下面的数据

{
  "msg":"hello"
}

在规则引擎页中,点击监控可以看到动作指标数的成功数变为 1。

img

至此,规则命中时在 Kafka 实例中消费者可以接收到 EMQ X Cloud 转发过来的消息。

添加小助手微信,进入 EMQ & 华为云技术交流群,与更多技术牛人深入交流、共同成长。 EMQ X 微信小助手

关注我们获取最新动态

推荐阅读

粘性会话负载均衡 - MQTT Broker 集群详解(二)

本文将通过在 EMQ X 4.3 集群前面配置 HAProxy 负载均衡器,带读者亲自体验如何充分利用粘性会话实现负载均衡。

EMQ 助力西安增材制造国家研究院打造增材智能车间平台

在本方案中,EMQ X 消息中间件提供了高并发的设备接入能力,以及在高频采集、数据高吞吐场景下的数据接入与实时处理能力。

Fan Wang 2020-11-30
EMQ X 规则引擎系列(六)存储消息到 DynamoDB 数据库

Amazon DynamoDB 是一个完全托管的 NoSQL 数据库服务,支持键值和文档数据结构。

2019-09-17