The Most Trusted MQTT Platform for loV and Connected Cars →

Efficient Persistence of IoT Streaming Data | EMQX and HStreamDB Data Integration Practice

Bin Wang
Mar 22, 2023
Efficient Persistence of IoT Streaming Data | EMQX and HStreamDB Data Integration Practice

In the context of the IoT, one commonly faces significant challenges, such as handling a vast number of devices, coping with high data generation rates, and dealing with large accumulated data volumes. Consequently, the problem of how to access, store, and process these massive amounts of device-generated data has become a critical issue.

EMQX is a powerful MQTT message broker designed for the IoT, capable of handling billions of device connections in a single cluster, while also offering a rich set of data aggregation capabilities. HStreamDB is a distributed streaming database that not only efficiently stores massive device data generated by EMQX but also provides real-time processing and analysis capabilities. Both EMQX and HStreamDB are highly scalable and reliable. By integrating them, one can not only meet the performance and stability requirements of large-scale IoT applications but also improve the real-time nature of the applications.

Streaming Database

With the release of EMQX Enterprise 4.4.15, which includes support for the latest version of HStreamDB, this article provides a comprehensive guide on how to leverage the EMQX rule engine to persist data to HStreamDB, thereby enabling the storage and real-time processing of MQTT data streams.

Note: For the integration steps presented in this article, it is assumed that the user is utilizing EMQX 4.4.15 and HStreamDB 0.14.0 or a later version.

Connecting to HStreamDB cluster

In the following tutorial, we assume that there is a running EMQX Enterprise cluster and running HStream cluster. To develop or obtain an EMQX Enterprise cluster, please refer to the EMQX Enterprise docs. To develop or obtain an HStreamDB cluster, please refer to the HStreamDB docs, which contains instructions on how to quick start with Docker or deploy locally or on the cloud.

We can connect to the HStreamDB cluster through the HStream client. Here we use Docker for this purpose:

# get some help
docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream:v0.14.0 hstream --help

We use command hstream stream here to create some streams, for demonstration purposes:

# create streams via `hstream stream`
docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream:v0.14.0 hstream stream create basic_condition_info_0 -r 3 -b $(( 7 * 24 * 60 * 60 ))

Then, connect to interactive HStream SQL Shell:

docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream:v0.14.0 hstream sql --service-url "<<YOUR-SERVICE-URL>>"
# and fill --tls-ca, --tls-key, --tls-cert if you are using TLS or HStreamDB cloud

If the connection to cluster is established, we will see:

      __  _________________  _________    __  ___
     / / / / ___/_  __/ __ \/ ____/   |  /  |/  /
    / /_/ /\__ \ / / / /_/ / __/ / /| | / /|_/ /
   / __  /___/ // / / _, _/ /___/ ___ |/ /  / /
  /_/ /_//____//_/ /_/ |_/_____/_/  |_/_/  /_/

  :h                           To show these help info
  :q                           To exit command line interface
  :help [sql_operation]        To show full usage of sql statement

  To create a simplest stream:
    CREATE STREAM stream_name;
  To create a query select all fields from a stream:
    SELECT * FROM stream_name EMIT CHANGES;
  To insert values to a stream:
    INSERT INTO stream_name (field1, field2) VALUES (1, 2);

After enter the HStream SQL Shell, we can use show streams; to view info about existed streams:

> show streams;
| Stream Name            | Replica | Retention Time | Shard Count |
| basic_condition_info_0 | 1       | 0 seconds      | 1           |

Creating an HStreamDB Resource

Before utilizing the EMQX Rule Engine to persist data to HStreamDB, it is necessary to create an HStreamDB resource.

To do this, access the EMQX Dashboard, click on Rules Engine -> Resources -> Create, select HStreamDB Resources, enter the resource address, and fill out the necessary options. The available options are:

Name Description Type Required Default Value
HStream Server HStream Server String Yes
Pool Size Size of HStream Connection Pool Number Yes 8
gRPC Timeout Timeout for gRPC calls to the HStreamDB server (ms) Number No 5000
Enable SSL Enable SSL Boolean No No

When SSL is enabled, there are some additional options, which user can paste or upload SSL configurations.

upload SSL configurations

Creating a Rule

Click on Rules Engine -> Rule -> Create.

Creating a Rule

Edit the SQL rule and add an action, where you can use an SQL variable in a string template.

Please note that the SQL rules presented in this document are for demonstration purposes only, and actual SQL should be written according to the business design.

Click Add Action and select Data Persistence to save the data to HStreamDB. Select the resource that was created in the previous step and enter the parameters. The available parameters are:

Name Description Type Required Default Value
Stream Stream. Cannot be placeholders String Yes
PartitionKey PartitionKey. Placeholders supported String No default PartitionKey
gRPC Flush Period gRPC Flush Period (ms) Number No 10000
Enable Batch Append Enable Batch Append Boolean No true
Max Batch Append Count Max Batch Append Count Number No 100
Max Batch Interval Maximum interval in milliseconds that is allowed between two successive (batch) request (ms) Number No 500

Add Action

Click Confirm to create the rule.

create the rule

Getting timely data updates from HStream SQL Shell

Data that has been persisted from the EMQX rules engine to HStreamDB can be read in real-time using the HStream SQL Shell to query new data written to the stream. With the data already written to HStream, any consumer can be used to consume messages. The documentation presents a simple consumption method using the HStream SQL Shell. In addition, readers are free to choose their preferred programming language and SDK to write their consumer application.

# docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream:v0.14.0 hstream sql
> select * from basic_condition_info_0 emit changes;

The current select query does not have any results to print because no data has been written to HStreamDB through the EMQX rules engine yet. Once data is written, you will be able to observe real-time updates to the data in the HStream SQL Shell. Currently, when using SQL to query streams in HStream, the results are printed after the query is executed. If a query is created after EMQX has stopped writing to HStreamDB, the results may not be observed. For more information and concepts about streams and SQL in HStreamDB, please refer to the HStreamDB SQL documentation.

Writing Data to EMQX

Now use the state-of-the-art MQTT desktop client - MQTTX to connect to EMQX and send a message.

MQTT desktop client

Obtaining the Running Status of the Rule Engine from EMQX Dashboard

We can check the metrics of rules via EMQX Dashboard:

MQTT Dashboard

If the metrics for the rules engine are operating normally, this means that EMQX is persisting data to HStreamDB. Once data is successfully written, real-time updates to the data can be obtained through the HStream SQL Shell.

> select * from basic_condition_info_0 emit changes;


Thus, we have completed the main process of persisting data from EMQX rules engine to HStreamDB.

Once the data collected by EMQX is stored in HStreamDB, real-time processing and analysis of this data can be performed to support upper-level AI, big data, and other applications, further uncovering and utilizing the value of data. As the first cloud-native streaming database designed for streaming data, HStreamDB, in combination with EMQX, can provide a one-stop solution for storing and processing massive IoT data in real-time, streamlining the IoT application data stack and accelerating enterprise IoT application development.

Try EMQX Enterprise for Free
Get Started →

Related Posts

Mar 26, 2021Shifan Yu
Verifying Kuiper stream processing with MQTTX

This article will use the script and timing function of MQTT X to simulate temperature and humidity data reporting, and combine with EMQX Edge to verify Kuiper's stream processing function.