Storing Messages to MySQL Database | EMQX Rule Engine Series
Scenario Introduction
In this scenario, it is required to store the messages that meet the criteria under EMQX in the MySQL database. In order to facilitate subsequent analysis and retrieval, the message content needs to be split for storage.
The information reported by the device in this scenario is as follows:
Reported topic:cmd/state/:id,Topic id represents the vehicle client ID
Message body:
{ "id": "NXP-058659730253-963945118132721-22", // Client identification code "speed": 32.12, // vehicle speed "direction": 198.33212, // driving direction "tachometer": 3211, // Engine speed, which is required for storage when the value is greater than 8000 "dynamical": 8.93, // Instantaneous fuel consumption "location": { // GPS Latitude and longitude data "lng": 116.296011, "lat": 40.005091 }, "ts": 1563268202 // reporting time }
When the reported data of engine speed value is greater than `8000', the current information is stored for subsequent analysis of the user's vehicle usage.
Preparation
Create a Database
Create the iot_data
database to store the message data, specifying the database encoding as utf8mb4
to avoid coding problems:
CREATE DATABASE `emqx_rule_engine_output` CHARACTER SET utf8mb4;
Create a Data Table
According to the scenario requirements, create a data table use_statistics
with structure and field comments as follows:
CREATE TABLE `use_statistics` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`client_id` varchar(100) DEFAULT NULL COMMENT 'Client identification code',
`speed` float unsigned DEFAULT '0.00' COMMENT 'current vehicle speed',
`tachometer` int(11) unsigned DEFAULT '0' COMMENT 'engine speed',
`ts` int(11) unsigned DEFAULT '0' COMMENT 'Reported timestamp',
`msg_id` varchar(50) DEFAULT NULL COMMENT 'MQTT message ID',
PRIMARY KEY (`id`),
KEY `client_id_index` (`client_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
After the creation is successful, confirm the existence of the data table with the following MySQL command:
Database changed
mysql> desc use_statistics;
+------------+------------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+------------+------------------+------+-----+---------+----------------+
| id | int(11) | NO | PRI | NULL | auto_increment |
| client_id | varchar(100) | YES | MUL | NULL | |
| speed | float unsigned | YES | | 0 | |
| tachometer | int(11) unsigned | YES | | 0 | |
| ts | int(11) unsigned | YES | | 0 | |
| msg_id | varchar(50) | YES | | NULL | |
+------------+------------------+------+-----+---------+----------------+
6 rows in set (0.01 sec)
Configuration Instructions
Create a Resource
Open EMQX Dashboard, go to the Resources page on the left menu, click the New button, type MySQL server information for resource creation.
The network environment of the nodes in the EMQX cluster may be different. After the resources are created successfully, click the Status button in the list to check the connection status of each node. If the resources on the node are unavailable, check whether the configuration is correct and the network connectivity is correct, and click the Reconnect button to manually reconnect.
Create a Rule
Go to the Rules page on the left menu and click the New button to create the rule. Select the trigger event Publishing message here, which is triggered when the message is published for data processing.
After selecting the trigger event, we can see the optional fields and sample SQL on the interface:
Filter the Required Fields
The rule engine uses SQL statements to process rule conditions. In this business, we need to select all the fields in payload
separately, use the payload.fieldName
format to select, and also need the topic context information of topic
, qos
, id
, the current SQL is as follows:
SELECT
payload.id as client_id, payload.speed as speed,
payload.tachometer as tachometer,
payload.ts as ts, id
FROM
"message.publish"
WHERE
topic =~ 't/#'
Establish Filtering Criteria
Conditional filtering is done by using the SQL statement WHERE clause, in which we need to define two conditions:
- Only handle
cmd/state/:id
topic, use the topic wildcard=~
to filtertopic
: `topic =~ 'cmd/state/+' - Only handle
tachometer > 8000
messages, use the comparator to filtertachometer
:payload.tachometer > 8000
Combine the previous step to get the SQL as follows:
SELECT
payload.id as client_id, payload.speed as speed,
payload.tachometer as tachometer,
payload.ts as ts,
id
FROM
"message.publish"
WHERE
topic =~ 'cmd/state/+'
AND payload.tachometer > 8000
Output Testing via SQL Test Function
With the SQL test function, we can view the current SQL processed data output in real time. This function requires us to specify the simulated raw data such as payload.
The payload data is as follows, note to change the tachometer
value to satisfy the SQL condition:
{
"id": "NXP-058659730253-963945118132721-22",
"speed": 32.12,
"direction": 198.33212,
"tachometer": 9001,
"dynamical": 8.93,
"location": {
"lng": 116.296011,
"lat": 40.005091
},
"ts": 1563268202
}
Click the SQL Test toggle button, change topic
and payload
into the information in the scenario, and click the Test button to view the data output:
The test output data is as follows:
{
"client_id": "NXP-058659730253-963945118132721-22",
"id": "589A429E9572FB44B0000057C0001",
"speed": 32.12,
"tachometer": 9001,
"ts": 1563268202
}
The test output is as expected and we can proceed to the next step.
Add a Response Action and Store the Message to MySQL
After the input and output of SQL condition is correct, we continue to add the corresponding action, configure to write SQL statement, and store the filtered result in MySQL.
We populate the SQL statement with the ${fieldName}
syntax, insert the data into the database, and finally click the New button to complete the rule creation.
The SQL configuration of the action is as follows:
INSERT INTO
`use_statistics` (`client_id`, `speed`, `tachometer`, `ts`, `msg_id`)
VALUES
(${client_id}, ${speed}, ${tachometer}, ${ts}, ${id});
Test
Expected Result
We successfully created a rule that contains a processing action, and expected result of the action is as follows:
- When the device reports a message to the
cmd/state/:id
topic, it will hit SQL when the value oftachometer
in the message exceeds 8000, and the number of hit in the rule list is increased by 1; - A piece of data will be added to the 'use_statistics' table in MySQL
iot_data
database with the same value as the current message.
Test With the Websocket Tool in Dashboard
Switch to tools -> Websocket page, connect to EMQX with any client, and send the following message to message card after successful connection:
Topic: cmd/state/NXP-058659730253-963945118132721-22
Message body:
{ "id": "NXP-058659730253-963945118132721-22", "speed": 32.12, "direction": 198.33212, "tachometer": 9002, "dynamical": 8.93, "location": { "lng": 116.296011, "lat": 40.005091 }, "ts": 1563268202 }
Click the Send button to view the rule hit statistics after the successful transmission. The data statistic value of hit is 1 to indicate that the rule has been successfully hit. View the data table record with the MySQL command line to get the following data:
So far, we have implemented the business development of using the rules engine to store messages to MySQL .