EMQX 规则引擎系列(三)存储消息到 InfluxDB 时序数据库

前言

InfluxDB 是一个用于存储和分析时间序列数据的开源数据库,内置 HTTP API,类 SQL 语句的支持和无结构的特性对使用者而言都非常友好。它强大的数据吞吐能力以及稳定的性能表现使其非常适合 IoT 领域。

通过 EMQX 消息引擎,我们可以自定义 Template 文件,然后将 Json 格式的 MQTT 消息转换为 Measurement 写入 InfluxDB: Artboard.jpg

场景介绍

该场景需要将 EMQX 指定主题下且满足条件的消息存储到 InfluxDB 时序数据库。为了便于后续分析检索,消息内容需要进行拆分存储。

该场景下客户端上报数据如下:

  • Topic:data/sensor

  • Payload:

    {
      "location": "bedroom",
      "data": {
        "temperature": 25,
        "humidity": 46.4,
        "pm2_5": 0.5
      }
    }
    

准备工作

数据库安装及初始化

创建 db 数据库并开放 8089 UDP 端口。

$ docker pull influxdb

$ git clone -b v1.0.0 https://github.com/palkan/influx_udp.git

$ cd influx_udp

$ docker run --name=influxdb --rm -d -p 8086:8086 -p 8089:8089/udp -v ${PWD}/files/influxdb.conf:/etc/influxdb/influxdb.conf:ro -e INFLUXDB_DB=db influxdb:latest

配置说明

创建资源

打开 EMQX Dashboard,进入左侧菜单的 资源 页面,点击 新建 按钮,选择 InfluxDB 资源类型并完成相关配置进行资源创建。

image20190719110910530.png

创建规则

进入左侧菜单的 规则 页面,点击 新建 按钮,进行规则创建。触发事件 选择 message.publish,即在 EMQX 收到 PUBLISH 消息时触发该规则进行数据处理。

选定触发事件后,我们可在界面上看到可选字段及示例 SQL:

image20190719112141128.png

筛选所需字段

规则引擎使用 SQL 语句过滤和处理数据。例如前文提到的场景中我们需要将 payload 中的字段提取出来使用,则可以通过 payload.<fieldName> 实现。同时我们仅仅期望处理 data/sensor 主题,那么可以在 WHERE 子句中使用主题通配符 =~topic 进行筛选:topic =~ 'data/sensor', 最终我们得到 SQL 如下:

SELECT
  payload.location as location,
  payload.data.temperature as temperature,
  payload.data.humidity as humidity,
  payload.data.pm2_5 as pm2_5
FROM
  "message.publish"
WHERE
    topic =~ 'data/sensor'

SQL 测试

借助 SQL 测试功能,我们可以快速确认刚刚填写的 SQL 语句是否能达到我们的目的。首先填写用于测试的 payload 等数据如下:

image20190719113731130.png

然后点击 测试 按钮,得到以下输出结果,与预期相符。

{
  "humidity": 46.4,
  "location": "bedroom",
  "pm2_5": 0.5,
  "temperature": 25
}

添加响应动作,存储消息到 InfluxDB

SQL 条件输入输出无误后,我们继续添加响应动作,配置写入 SQL 语句,将筛选结果存储到 InfluxDB。

点击响应动作中的 添加 按钮,选择动作 保存数据到 InfluxDB,选取刚刚创建的 InfluxDB 资源,再按照实际需求将 ${fieldName} 填写到 Field Keys, Tag KeysTimestamp Key 中,Measurement 表示将数据写入 InfluxDB 时使用的 Measurement,最后点击 新建 按钮完成规则创建。

image20190719115340429.png

测试

预期结果

我们成功创建了一条规则,包含一个处理动作,动作期望效果如下:

  1. 客户端向 data/sensor 主题上报消息时,该消息将命中规则,规则列表中 已命中 数字将会增加 1;
  2. InfluxDB 的 db 数据库中将会增加一条数据,数据内容与处理后的消息内容一致。

使用 Dashboard 中的 Websocket 工具测试

切换到 工具 --> Websocket 页面,使用任意 Client ID 连接到 EMQX,连接成功后在 消息 卡片中发送如下消息:

  • Topic:data/sensor

  • Payload:

    {
      "location": "bedroom",
      "data": {
        "temperature": 25,
        "humidity": 46.4,
        "pm2_5": 0.5
      }
    }
    

image20190719133414535.png

点击 发送 按钮,发送成功后可以看到当前规则已命中次数已经变为 1。

然后检查 InfluxDB,新的 data point 是否添加成功:

$ docker exec -it influxdb influx

> use db
Using database db
> select * from "sensor_data"
name: sensor_data
time                humidity location pm2_5 temperature
----                -------- -------- ----- -----------
1561535778444457348 46.4     bedroom  0.5   25

至此,我们通过规则引擎实现了存储消息到 InfluxDB 数据库的业务开发。

在阅读该教程之前,假定你已经了解 MQTTEMQX 的简单知识。

免费试用 EMQX Cloud
全托管的云原生 MQTT 消息服务
开始试用 →

推荐阅读

「一次连接、无限集成」:EMQX 企业版 4.3.0 正式发布

EMQX 企业版 v4.3.0 继承了开源版 4.3.0 中的诸多性能提升和功能改进,并在此基础上新增了 Kafka 分区动态扩容的支持,以及更灵活的通过 Kafka 下发 MQTT 消息的方式。

2021-05-25
EMQX 规则引擎系列(十三)- 消息写入到 TimescaleDB

TimescaleDB 是一款针对快速获取和复杂查询而优化的开源时间序列数据库。 它使用标准的 SQL 语句,并且像传统的关系数据库那样容易使用,像 NoSQL 那样可扩展。

2019-12-17
EMQX 规则引擎系列(九)- 消息写入到 TDEngine

TDengine 是涛思数据(北京涛思数据科技有限公司)推出的一款开源的专为物联网、车联网、工业互联网、IT 运维等设计和优化的大数据平台。除核心的快 10 倍以上的时序数据库功能外,还提供缓存、数据订阅、流式计算等功能,最大程度减少研发和运维的复杂度。

2019-10-28