轻量级边缘计算 EMQ X Kuiper 与 AWS IoT 集成方案

背景

本文以一个常见的物联网使用场景为案例,介绍了如何利用边缘计算来实现对业务的快速、低成本和有效地处理。

在各类物联网项目中,比如智能楼宇项目,需要将楼宇的数据(比如电梯、燃气、水电等)进行采集和分析。一种解决方案是将所有的设备直接接入在云端的物联网平台,类似于像 AWS IoT 或者 Azure IoT Hub。这种解决方案的问题在于,

  • 数据处理时延较长:通过 Internet 传输和云端的处理后返回给设备,所需时间较长
  • 数据传输和存储成本:通过 Internet 传输需要带宽,对于大规模连接的物联网项目来说,耗费的带宽会相当可观
  • 数据的安全性:有些物联网的数据会相当敏感,全部通过物联网传输的话会有风险

为了解决以上的问题,业界提出了边缘计算的方案,边缘计算的核心就在于把数据进行就近处理,避免不必要的时延、成本和安全问题。

业务场景

假设现有一组设备,组中的每个设备有一个 id,通过 MQTT 协议往 MQTT 消息服务器上相应的主题发送数据。主题的设计如下,其中 {device_id} 为设备的 id。

devices/{device_id}/messages

每个设备发送的数据格式为 JSON,发送的通过该传感器采集的温度与湿度数据。

{
    "temperature": 30, 
    "humidity" : 20
}

现在需要实时分析数据,并提出以下的需求:对每个设备的温度数据按照每 10 秒钟计算平均值(t_av),并且记下 10 秒钟内的最大值 (t_max)、最小值(t_min) 和数据条数(t_count),计算完毕后将这 4 个结果进行保存,以下为样例结果数据:

[
    {
        "device_id" : "1", "t_av" : 25,  "t_max" : 45, "t_min" : 5, "t_count" : 2
    },
    {
        "device_id" : "2", "t_av" : 25,  "t_max" : 45, "t_min" : 5, "t_count" : 2
    },
    ...
]

方案介绍

如下图所示,采用边缘分析/流式数据处理的方式,在边缘端我们采用了 EMQ X 的方案,最后将计算结果输出到 AWS IoT 中。

emqx_aws.png

  • EMQ X Edge 可以接入各种协议类型的设备,比如 MQTT、CoAP、LwM2M 等,这样用户可以不需要关心协议适配方面的问题;另外它本身也比较轻量级,适合部署在边缘设备上
  • EMQ X Kuiper 是 EMQ 发布的基于 SQL 的轻量级边缘流式数据分析引擎,安装包只有约 7MB,非常适合于运行在边缘设备端
  • AWS IoT 提供了比较全的设备接入和数据分析的方案,此处用于云端的结果数据接入,以及应用所需的结果数据分析

实现步骤

安装 EMQ X Edge & Kuiper

  • 写本文的时候,EMQ X Edge 的最新版本是4.0,用户可以通过 Docker 来安装和启动 EMQ X Edge

    # docker pull emqx/emqx-edge
    # docker run -d --name emqx -p 1883:1883  emqx/emqx-edge:latest
    # docker ps
    CONTAINER ID        IMAGE                   COMMAND                  CREATED             STATUS              PORTS                                                                                                           NAMES
    a348e3ac150c        emqx/emqx-edge:latest   "/usr/bin/docker-entr"   3 seconds ago       Up 2 seconds        4369/tcp, 5369/tcp, 6369/tcp, 8080/tcp, 8083-8084/tcp, 8883/tcp, 11883/tcp, 0.0.0.0:1883->1883/tcp, 18083/tcp   emqx
    

    用户可以通过 telnet 命令来判断是否启动成功,如下所示。

    # telnet localhost 1883
    Trying 127.0.0.1...
    Connected to localhost.
    Escape character is '^]'.
    
  • 安装、启动 Kuiper

    点击这里下载最新版 Kuiper,并解压。在写本文的时候,Kuiper 最新版本为 0.0.3。

    # unzip kuiper-linux-amd64-0.0.3.zip
    # cd kuiper
    # bin/server
    Serving Kuiper server on port 20498
    

    如果无法启动,请查看日志文件 log/stream.log

创建流

Kuiper 提供了一个命令用于管理流和规则,用户可以通过在命令行窗口中敲入 bin/cli 查看有哪些子命令及其帮助。cli 命令缺省连接的是本地的 Kuiper 服务器,cli 命令也可以连接到别的 Kuiper 服务器,用户可以在 etc/client.yaml配置文件中修改连接的 Kuiper 服务器。用户如果想了解更多关于命令行的信息,可以参考这里

创建流定义:创建流的目的是为了定义发送到该流上的数据格式,类似于在关系数据库中定义表的结构。 Kuiper 中所有支持的数据类型,可以参考这里

# cd kuiper
# bin/cli create stream demo '(temperature float, humidity bigint) WITH (FORMAT="JSON", DATASOURCE="devices/+/messages")'

上述语句在 Kuiper 中创建了一个名为 demo 的流定义,包含了两个字段,分别为 temperature 和 humidity,数据源为订阅 MQTT 的主题 devices/+/messages,这里请注意采用了通配符 +,用于订阅不同设备的消息。该数据源所对应的 MQTT 服务器地址在配置文件 etc/mqtt_source.yaml中,可以根据所在的服务器地址进行配置。如下图所示,配置 servers 项目。

#Global MQTT configurations
default:
  qos: 1
  sharedsubscription: true
  servers: [tcp://127.0.0.1:1883]

用户可以在命令行中敲入 describe 子命令来查看刚创建好的流定义。

# bin/cli describe stream demo
Connecting to 127.0.0.1:20498
Fields
--------------------------------------------------------------------------------
temperature float
humidity    bigint

FORMAT: JSON
DATASOURCE: devices/+/messages

数据业务逻辑处理

Kuiper 采用 SQL 实现业务逻辑,每10秒钟统计温度的平均值、最大值、最小值和次数,并根据设备 ID 进行分组,实现的 SQL 如下所示。

SELECT avg(temperature) AS t_av, max(temperature) AS t_max, min(temperature) AS t_min, COUNT(*) As t_count, split_value(mqtt(topic), "/", 1) AS device_id FROM demo GROUP BY device_id, TUMBLINGWINDOW(ss, 10)

这里的 SQL 用了四个聚合函数,用于统计在10秒钟窗口期内的相关值。

  • avg:平均值
  • max:最大值
  • min:最小值
  • count:计数

另外还使用了两个基本的函数

  • mqtt:消息中取出 MQTT 协议的信息,mqtt(topic) 就是取得当前取得消息的主题名称
  • split_value:该函数将第一个参数使用第二个参数进行分割,然后第三个参数指定下标,取得分割后的值。所以函数 split_value("devices/001/messages", "/", 1)调用就返回001

GROUP BY 跟的是分组的字段,分别为计算字段 device_id;时间窗口 TUMBLINGWINDOW(ss, 10),该时间窗口的含义为每10秒钟生成一批统计数据。

调试 SQL

在正式写规则之前,我们需要对规则进行调试,Kuiper 提供了 SQL 的调试工具,可以让用户非常方便地对 SQL 进行调试。

  • 进入 kuiper 安装目录,并运行 bin/cli query

  • 在出现的命令行提示符中输入前面准备好的 SQL 语句。

    # bin/cli query
    Connecting to 127.0.0.1:20498
    kuiper > SELECT avg(temperature) AS t_av, max(temperature) AS t_max, min(temperature) AS t_min, COUNT(*) As t_count, split_value(mqtt(topic), "/", 1) AS device_id FROM demo GROUP BY device_id, TUMBLINGWINDOW(ss, 10)
    query is submit successfully.
    kuiper >
    

    在日志文件 log/stream.log 中,可以看到创建了一个名为 internal-kuiper_query_rule 的临时规则。

    ...
    time="2019-11-12T11:56:10+08:00" level=info msg="The connection to server tcp://10.211.55.6:1883 was established successfully" rule=internal-kuiper_query_rule
    time="2019-11-12T11:56:10+08:00" level=info msg="Successfully subscribe to topic devices/+/messages" rule=internal-kuiper_query_rule
    

    值得注意的是,这个名为 internal-kuiper_query_rule 的规则是通过 query 创建的,服务器端每5秒钟会检测一下 query 客户端是否在线,如果query 客户端发现有超过10秒钟没有反应(比如被关闭),那么这个内部创建的 internal-kuiper_query_rule 规则会被自动删除,被删除的时候在日志文件中会打印如下的信息。

    ...
    time="2019-11-12T12:04:08+08:00" level=info msg="The client seems no longer fetch the query result, stop the query now."
    time="2019-11-12T12:04:08+08:00" level=info msg="stop the query."
    time="2019-11-12T12:04:08+08:00" level=info msg="unary operator project cancelling...." rule=internal-kuiper_query_rule
    ...
    
  • 发送测试数据

    通过任何的测试工具,向 EMQ X Edge 发送以下的测试数据。笔者在测试过程中用的是 JMeter 的 MQTT 插件,因为基于 JMeter 可以做一些比较灵活的自动数据生成,业务逻辑控制,以及大量设备的模拟等。用户也可以直接使用 mosquitto 等其它客户端进行模拟。

    • 主题:devices/$device_id/messages,其中$device_id 为下面数据中的第一列
    • 消息:{"temperature": $temperature, "humidity" : $humidity}, 其中$temperature$humidity 分别为下面数据中的第二列和第三列
    #device_id, temperature, humidity
    1,20,30
    2,31,40
    1,35,50
    2,20,30
    1,80,90
    2,45,20
    1,10,90
    2,12,30
    1,65,35
    2,55,32
    

    我们可以发现发送了模拟数据后,在 query 客户端命令行里在两个10秒的时间窗口里打印了两组数据。这里输出的结果条数跟用户发送数据的频率有关系,如果 Kuiper 在一个时间窗口内接受到所有的数据,那么只打印一条结果。

    kuiper > [{"device_id":"1","t_av":45,"t_count":3,"t_max":80,"t_min":20},{"device_id":"2","t_av":25.5,"t_count":2,"t_max":31,"t_min":20}]
    
    [{"device_id":"2","t_av":37.333333333333336,"t_count":3,"t_max":55,"t_min":12},{"device_id":"1","t_av":37.5,"t_count":2,"t_max":65,"t_min":10}]
    

创建、提交规则

完成了 SQL 的调试之后,开始配置规则文件,将结果数据通过 Kuiper 的 MQTT Sink 发送到远程的 AWS IoT 中。 在 AWS IoT 中,用户需要先创建好以下内容,

  • 设备:代表处理设备数据的网关,该网关安装了 Kuiper,网关在把相关相关数据处理完毕后,将结果发送到 AWS 云端。此处创建的名称为 demo,如下图所示。

aws_device.png

  • 设备连接证书与密钥:AWS 的物联网设备通过证书来连接,保证其安全性。在创建设备的过程中,AWS会生成的以下三个文件。这里会用到的是证书与私钥。

    • 证书:类似于 d3807d9fa5-certificate.pem
    • 私钥:类似于 d3807d9fa5-private.pem.key
    • 公钥:类似于 d3807d9fa5-public.pem.key

    关于这方面更多的信息,请参考 AWS 创建设备的文档

编写 Kuiper 规则文件

规则文件是一个文本文件,描述了业务处理的逻辑(前面已经调试好的 SQL 语句),以及 sink 的配置(消息处理结果的发送目的地)。连接 AWS IoT 的大部分信息都已经在前文中描述。 Kuiper 的测试结果将被发送到 AWS IoT设备的 devices/result 主题下。

{
  "sql": "SELECT avg(temperature) AS t_av, max(temperature) AS t_max, min(temperature) AS t_min, COUNT(*) As t_count, split_value(mqtt(topic), \"/\", 1) AS device_id FROM demo GROUP BY device_id, TUMBLINGWINDOW(ss, 10)",
  "actions": [
    {
      "log": {}
    },
    {
      "mqtt": {
        "server": "ssl://xyz-ats.iot.us-east-1.amazonaws.com:8883",
        "topic": "devices/result",
        "qos": 1,
        "clientId": "demo_001",
        "certificationPath": "/var/aws/d3807d9fa5-certificate.pem",
        "privateKeyPath": "/var/aws/d3807d9fa5-private.pem.key"
      }
    }
  ]
}

通过 Kuiper 命令行创建规则

# bin/cli create rule rule1 -f rule1.txt
Connecting to 127.0.0.1:20498
Creating a new rule from file rule1.txt. 
Rule rule1 was created.

在日志文件中可以查看规则的运行连接情况,如果配置项都正确的话,应该可以看到到 AWS IoT 的连接建立成功。

......
time="2019-11-13T17:41:19+08:00" level=info msg="The connection to server tcp://10.211.55.6:1883 was established successfully" rule=rule1
time="2019-11-13T17:41:19+08:00" level=info msg="Successfully subscribe to topic devices/+/messages" rule=rule1
time="2019-11-13T17:41:20+08:00" level=info msg="The connection to server ssl://xyz-ats.iot.us-east-1.amazonaws.com:8883 was established successfully" rule=rule1
......
  • 通过 AWS IoT 提供的 MQTT Client 工具,订阅设备的devices/result主题。并往本地的 EMQ X Edge 上发送模拟数据。经过 Kuiper 处理后,相应的处理结果被发送到了 AWS IoT 中。如下图所示,收到了两次测试结果(第一次结果被折叠)。

aws_iot_result.png

用户可以通过 AWS IoT Rule 将分析结果存储到 Amazon DynamoDB 数据库或者其它服务中,前端的应用程序可以通过读取 DynamoDB 中的数据来呈现给终端用户,具体请参考 Amazon DynamoDB 文档

总结

通过本文,读者可以了解到利用 EMQ X 在边缘端的解决方案可以非常快速、灵活地开发出基于边缘数据分析的系统,实现数据低时延、低成本和安全的处理。

AWS IoT 也提供了 Greengrass 的边缘解决方案,与 AWS Greengrass 相比,Kuiper 方案更加轻量级,业务逻辑的实现方式(基于 SQL)在编程上也相对更加简单。AWS Greengrass 提供了基于 Lambada 的编程模型,通过提供不同编程语言的 SDK 来实现边缘端的数据分析,以及往 AWS IoT 的分析结果上传,因此在业务逻辑实现的灵活性上会更好。最后,与 Greengrass 只能连接 AWS IoT 不同,Kuiper 在与不同的第三方 IoT 平台的集成的灵活性上也更好。

如果有兴趣了解更多关于边缘流式数据分析的内容,请参考 Kuiper 开源项目

关注我们获取最新动态

推荐阅读

EMQ X Kuiper 构建云-边协同物联网应用

在华为云智能边缘平台 IEF 上部署 EMQ X Kuiper,将边缘数据处理、传输至云端服务器。

2020-09-08
发布说明 - Kuiper 0.0.3

Kuiper团队将宣布发布Kuiper 0.0.3

发布说明 - Kuiper 0.0.4

EMQ X Kuiper 是 Golang 实现的轻量级物联网边缘分析、流式处理开源软件,可以运行在各类资源受限的边缘设备上。Kuiper 设计的一个主要目标就是将在云端运行的实时流式计算框架(比如 Apache Spark Apache Storm 和 Apache Flink 等)迁移到边缘端