使用自定义函数实现数据编解码、格式处理与业务告警

背景

在物联网平台的设备数据接入场景中,开发者总是希望平台接入的设备数据格式标准统一,以便对数据进行统一处理。在实际情况中,由于业务需要,平台常常会面对不同类型、不同厂商的设备接入。即使设备接入协议已经统一使用 MQTT ,由于 MQTT 协议中对 Payload 格式的宽松定义,应用开发者往往还需要针对不同设备上报格式进行加工处理。尤其在已经出厂的存量设备或是已经部署到现场的设备对接过程中,平台开发者往往无法要求设备侧按照平台的统一标准进行数据上报。因此,平台侧对于设备数据的统一化处理就成为开发过程中的一项重要工作。

设备数据处理常用方法对比

使用全托管 MQTT 消息云服务 EMQX Cloud 进行设备数据接入的过程中,解决数据格式统一化通常有以下几种方案。

数据透传

通过 EMQX Cloud 接入数据后将报文透传到应用侧,在后台业务应用中对各类型数据进行格式转换与处理。这种方式相对最为直接,但对应用开发者最不友好。尤其在数据吞吐量比较大的情况下,后台业务应用既要处理数据格式,又要完成业务处理,对业务端性能也会影响比较大。

通过规则引擎进行数据处理

EMQX Cloud 内置的数据集成规则引擎提供了基于 SQL 语言的规则定义与数据处理能力。开发者可以通过内置函数对数据格式和内容进行处理。这种低代码数据处理方式将数据格式与内容处理前置到消息中间件层,为后台业务应用开发提供了高性能、标准化的数据处理方式。但是受限于规则引擎的内置函数与方法,用户无法实现逻辑相对复杂的数据处理需求,且规则引擎也不支持比较复杂的逻辑判断。

优化方案:自定义函数

为了增强规则引擎数据处理能力,EMQX Cloud 推出了「自定义函数」增值服务。

用户可以通过编写基于 ECMAScript 5.1 及部分 ECMAScript 6 的语法的自定义脚本实现数据处理逻辑,并在 EMQX Cloud 规则引擎中调用,这样就可以完成更加复杂的数据预处理逻辑,并通过规则引擎将函数返回的数据按业务需求流转至消息消费方。

考虑到执行性能对整体消息吞吐性能的影响,EMQX Cloud 提供的自定义函数会对执行脚本进行性能检查,脚本执行时间不能超过 3 秒,否则将无法通过验证。因此我们不建议在脚本中编写高耗时的操作。

在实际场景中使用自定义函数

下面我们通过几个例子来看看自定义函数可以用于哪些业务场景。

数据编解码

在设备数据上报场景中,对于网络带宽受限以及功耗敏感的设备,在数据传输过程中往往会将设备数据通过更加紧凑的十六进制格式进行传输,以便节省带宽和功耗。在平台侧,如果业务应用需要理解上报数据的具体业务意义,则需根据规约将十六进制报文解析成 JSON 格式报文,方便应用开发。

示例

平台需要接入一个智能井盖设备,井盖设备需要定时上传 4 个传感器数据,设备通过十六进制数格式上报到平台,每两个字节代表一个传感器数据:

序号 名称 代号 数值说明
1 晃动传感器 HD 0 or 1
2 唤醒传感器 HX 0 or 1
3 温度传感器 WD 温度度数
4 震动传感器 ZD 0 or 1

我们通过自定义函数将十六进制数转换为有意义的 JSON 格式,方便后台业务应用对数据进行消费。我们在 EMQX Cloud 中创建一个自定义函数:

在 EMQX Cloud 中创建一个自定义函数

假如此时设备上报数据为 00011601,Payload 作为入参进入自定义函数输入为 { 0x00, 0x01, 0x16, 0x01};我们以此输入测试函数输出,可以看到结果如下:

自定义函数输出

这样我们就可以通过自定义函数,将设备上报的编码数据格式转化为更为易处理的 JSON 格式供后端业务应用消费。

数据格式处理

在设备数据上报过程中,数据上传通常会以比较简单的格式进行传输,比如会按默认规定省略一些默认的数据与格式。但对于平台侧应用来说,一般需要能够通过数据处理补齐相关的数据以达成数据格式的标准化。此时我们可以通过自定义函数来实现数据格式补齐。

示例

我们在办公楼的机房内部署了温湿度传感器以用来检测环境数据,传感器通过 MQTT 协议以 JSON 格式的Payload 上报报文到云端平台。数据报文格式如下:

{
  "hum" : 20.0,
  "temp" : 36.5
}

我们希望通过自定义函数对每个上报报文进行单位的补齐,这时可以通过定义如下函数来实现:

通过自定义函数对每个上报报文进行单位的补齐

通过函数测试我们可以看到,输出的数据被添加了相应的单位格式:

自定义函数输出

这样,我们就完成了对上报报文的格式转换工作。

数据告警

除了通过函数处理报文格式外,我们也可以通过逻辑判断实现业务告警。虽然 EMQX 提供的规则引擎也可以进行简单的数值判断,但是相对复杂的逻辑判断与数据处理更适合在自定义函数中进行。

示例

沿用上一个场景的设备数据,我们可以通过判断温度与湿度是否在正常工作状态来设置告警信息。比如机房的正常工作温度在 15 到 40 摄氏度之间,湿度在 30% 到 60% 之间。我们就可以通过在自定义函数中进行判断后,对超过工作温湿度的上报数据进行告警处理。

自定义函数设置数据告警

如果我们从传感器上报的实时湿度超过工作范围,我们将得到告警信息。

自定义函数输出告警信息

这样,后台应用就可以通过输出的 Warning 信息来进行业务告警处理。

另外在 EMQX Cloud 规则引擎中调用自定义函数可以支持多个入参,比如 Topic、ClinetID 等。用户可以结合各种设备与主题信息进行对报文的加工与处理,在此不一一赘述。

结语

物联网设备接入的业务场景纷繁复杂,EMQX Cloud 提供的自定义函数功能为广大物联网开发者提供了更加灵活的数据处理方式。用户可以在自定义函数中实现相对复杂的数据处理逻辑,通过 EMQX Cloud 服务快速部署函数与设备数据和应用对接,大大提升开发效率。

有关自定义函数的具体使用方法,请参考 EMQX Cloud 官方文档的相关章节:https://docs.emqx.com/zh/cloud/latest/vas/codec.html

免费试用 EMQX Cloud
无须绑定信用卡
开始试用 →

推荐阅读

EMQX Cloud 更新:更易用的规则引擎,三步轻松实现数据处理转存

全新改版的「数据集成」将使规则引擎的使用逻辑更加符合用户习惯,为用户利用 EMQX Cloud 搭建符合业务需求的物联网平台与应用提供便利。

2022-03-31
EMQX Cloud更新:数据集成新增 HStreamDB & Tablestore

在最近的版本更新中,EMQX Cloud 又新增了HStreamDB 和阿里云 Tablestore 两个外部数据库扩展支持。

2022-08-12
开箱即用的数据缓存服务|EMQX Cloud 影子服务应用场景解析

通过影子服务提供的平台数据缓存能力,用户可以更加便捷地实现各类场景应用,缩短研发周期。

2022-09-26