Upcoming Webinar: MQTT over QUIC: A New Standard for Connected Vehicles
Upcoming Webinar: MQTT over QUIC: A New Standard for Connected Vehicles
Register Now →

Storing Messages to InfluxDB Time Series Database | EMQX Rule Engine Series

EMQX Team EMQX Team Nov 27, 2019


InfluxDB is an open source database for storing and analyzing time series data, with built-in HTTP API, and the support for SQL-like statements and unstructured features are very friendly for users. Its powerful data throughput and stable performance make it ideal for the IoT area.

With the EMQX messaging engine, we can customize the Template file and then convert the Json-formatted MQTT message into Measurement to write to InfluxDB:


Scenario Introduction

In this scenario, it is required to store the messages that meet the criteria under EMQX in the InfluxDB time series database. In order to facilitate subsequent analysis and retrieval, the message content needs to be split for storage.

The data reported by the device in this scenario is as follows:

  • Topic:data/sensor

  • Payload:

      "location": "bedroom",
      "data": {
        "temperature": 25,
        "humidity": 46.4,
        "pm2_5": 0.5


Database Installation and Initialization

Create a db database and open the 8089 UDP port.

$ docker pull influxdb

$ git clone -b v1.0.0 https://github.com/palkan/influx_udp.git

$ cd influx_udp

$ docker run --name=influxdb --rm -d -p 8086:8086 -p 8089:8089/udp -v ${PWD}/files/influxdb.conf:/etc/influxdb/influxdb.conf:ro -e INFLUXDB_DB=db influxdb:latest

Configuration Instructions

Create a Resource

Open EMQX Dashboard, go to the Resources page on the left menu, click the New button, type MySQL server information for resource creation, select the InfluxDB resource type and complete the relevant configuration for resource creation.


Create a Rule

Go to the Rules page on the left menu and click the New button to create the rule. Select the trigger event message.publish , which is triggered when the message is published for data processing.

After selecting the trigger event, we can see the optional fields and sample SQL on the interface:


Filter the Required Fields

The rules engine uses SQL statements to filter and process data. For example, in the scenario mentioned above, we need to extract the fields in payload, which can be implemented by payload.<fieldName>. At the same time we only expect to handle the data/sensor topic, then we can use the topic wildcard =~ to filter the topic in the WHERE clause: topic =~ 'data/sensor', and finally we get the SQL as follows:

  payload.location as location,
  payload.data.temperature as temperature,
  payload.data.humidity as humidity,
  payload.data.pm2_5 as pm2_5
    topic =~ 'data/sensor'

SQL Test

With the SQL test function, we can quickly confirm whether the SQL statement just filled in can achieve our goal. We firstly fill in the payload and other data for testing as follows:


Then click the Test button and get the following output, which is as expected.

  "humidity": 46.4,
  "location": "bedroom",
  "pm2_5": 0.5,
  "temperature": 25

Add a Response Action and Store the Message to InfluxDB

After the input and output of SQL condition is correct, we continue to add the corresponding action, configure to write SQL statement, and store the filtered result in MySQL.

Click the Add button in the response action, select action of Save Data to InfluxDB, select the InfluxDB resource just created, and then fill the ${fieldName} into Field Keys according to actual needs. In Tag Keys and Timestamp Key, Measurement represents the Measurement used when writing data to InfluxDB. Finally, click the New button to complete the rule creation.



Expected Result

We successfully created a rule that contains a processing action, and expected result of the action is as follows:

  1. When the client reports a message to the data/sensor topic, it will hit the rule, and the number of hit in the rule list is increased by 1;
  2. A piece of data will be added to the db database in InfluxDB, and the data content is consistent with the processed message content

Test With the Websocket Tool in Dashboard

Switch to the Tools --> Websocket page, connect to EMQX with any Client ID, and send the following message in the Message card after the connection is successful:

  • Topic:data/sensor

  • Payload:

      "location": "bedroom",
      "data": {
        "temperature": 25,
        "humidity": 46.4,
        "pm2_5": 0.5


Click the Send button. After the transmission succeeds, you can see that number of hits for current rule has changed to 1.

Then check InfluxDB and see if the new data point is added successfully:

$ docker exec -it influxdb influx

> use db
Using database db
> select * from "sensor_data"
name: sensor_data
time                humidity location pm2_5 temperature
----                -------- -------- ----- -----------
1561535778444457348 46.4     bedroom  0.5   25

So far, we have implemented the business development of using the rules engine to store messages to InfluxDB .

Before reading this tutorial, assume that you already know simple knowledge about MQTT,EMQX .

Try EMQX Cloud for Free
A fully managed, cloud-native MQTT service
Get Started →

Related Posts

Nov 26, 2019 EMQX Team
Bridging MQTT Data to Kafka | EMQX Rule Engine Series

The EMQX 3.2 version introduces the Rules Engine feature that supports screening data reported by the EMQX Broker terminal, which is processed and streamed to the back-end database or other message queues. This article uses a specific scenario to explain "How to use the rules engine to forward messages to Kafka"