EMQX Rule Engine Series - Store messages to PostgreSQL database
Introduction to PostgreSQL Database
As an important open source relational databases, PostgreSQL claims to be the most advanced open source database in the world. Compared to other open source relational databases such as MySQL, PostgreSQL is a completely community-driven open source project, maintained by more than 1,000 contributors worldwide. PostgreSQL provides a single full-featured version, that is unlike MySQL which offers multiple different versions for community, business, and enterprise. PostgreSQL is based on a free BSD / MIT license, and organizations can use, copy, modify, and redistribute its code with a copyright notice.
PostgreSQL has many features, and has more support in the field of GIS. Its "lock-free" feature is very prominent, supports function and condition indexes, and has a mature clustering solution. PostgreSQL also has powerful SQL programming capabilities such as statistical functions and statistical syntax support. With the Timescaledb plugin, PostgreSQL can be transformed into a full-featured time-series database Timescaledb.
Scenario introduction
Under this scenario, it is required to store the messages that meet the conditions under the topic specified by EMQX to the PostgreSQL database. In order to facilitate subsequent analysis and retrieval, the message content needs to be split and stored.
The data reported by the client in this scenario is as follows:
Topic:testtopic
Payload:
{"msg":"Hello, World!"}
Preparation
Create a database
Create a tutorial database with a username of postgres and a password of password:
$ docker pull postgres
$ docker run --rm --name postgres -p 5432:5432 -e POSTGRES_PASSWORD=password -d postgres:latest
$ docker exec -it postgres psql -U postgres
> CREATE database tutorial;
> \c tutorial
Create a data table
Create t_mqtt_msg
table:
CREATE TABLE t_mqtt_msg (
id SERIAL primary key,
msgid character varying(64),
sender character varying(64),
topic character varying(255),
qos integer,
retain integer,
payload text,
arrived timestamp without time zone
);
Configuration instructions
Create a resource
Open EMQX Dashboard, enter the Resources page of the left menu, click the New button, select the PostgreSQL resource type and complete the related configuration for resource creation.
Create a rule
Enter the Rules page on the left menu and click the New button to create a rule. Here we choose to trigger event of message.publish, which means when EMQX receives the message of PUBLISH , the rule is triggered for data processing.
After the trigger event is selected, we can see optional fields and sample SQL on the interface:
Filter required fields
The rules engine uses SQL statements to filter and process data. Here we need data such as msgid, topic, payload, and we want to match messages from all topics. Therefore, we only need to delete the WHERE clause based on the default SQL. In the end, we get the following SQL:
SELECT
*
FROM
"message.publish"
SQL Test
With the SQL test feature, we can quickly confirm whether the SQL statement we just filled out can achieve our purpose. First we fill in the payload and other data for test as follows:
After clicking the Test button, we get the following data output:
{
"client_id": "c_emqx",
"event": "message.publish",
"id": "589A429E9572FB44B0000057C0001",
"node": "emqx@127.0.0.1",
"payload": "{\"msg\":\"Hello, World!\"}",
"peername": "127.0.0.1:50891",
"qos": 1,
"retain": 0,
"timestamp": 1564037750692,
"topic": "testtopic",
"username": "u_emqx"
}
The test output contains all the required data and we can proceed to the next steps.
Add response action, and store message to PostgreSQL
After the SQL condition input and output are correct, we continue to add corresponding actions, configure to write SQL statements, and store the filtered results in PostgreSQL.
Click the Add button in the response action, select the action of Save Data to PostgreSQL , select the PostgreSQL
resource you just created and fill in the SQL template as follows:
insert into t_mqtt_msg(msgid, topic, qos, retain, payload, arrived) values (${id}, ${topic}, ${qos}, ${retain}, ${payload}, to_timestamp(${timestamp}::double precision /1000)) returning id
Finally, click the New button to complete the rule creation.
Test
Expected outcome
We have successfully created a rule that contains a processing action. The expected outcome of the action is as follows:
- When the client reports a message, the message will hit SQL and the number of hits in the rule list will increase by 1;
- A piece of data will be added to the
t_mqtt_msg
table of the PostgreSQLtutorial
database. The data content is consistent with the message content.
Test with Websocket tools in Dashboard
Switch to the Tools -> Websocket page, use any information client to connect to EMQX. After the connection is successful, send the following message in the message card:
Topic:testtopic
Payload:
{"msg":"Hello, World!"}
Click the Send button. After sending successfully, we can see that the number of hits of the current rule has changed to 1.
Then check PostgreSQL to see if the new data point was added successfully:
So far, we have used the rule engine to implement business development to store messages to a PostgreSQL database.
Share