Instant Integration of EMQX and HStreamDB

In the IoT era, massive numbers of connected devices generate large-scale, real-time data streams. To meet this challenge, EMQ provides a modern data infrastructure from the edge to the cloud, supporting a unified "connect, move, process and analyse" workflow for cloud-edge IoT data.

EMQX, an all-in-one, cloud-native, distributed messaging broker with a SQL-based rule engine, addresses the challenge of massive connections. The streaming database HStreamDB tackles the other half: storing, processing and analysing this enormous volume of IoT data in real time.

HStreamDB, the first stream-native database designed specifically for streaming data, is dedicated to storing and managing large-scale data streams efficiently. With EMQX and HStreamDB combined, one-stop management of massive data access, storage, real-time processing and analysis is no longer complicated.

EMQX and HStreamDB

HStreamDB v0.6 introduced a REST API for appending data, which lets users write data to HStreamDB over HTTP from any language and makes it easier for the community to build on HStreamDB. We use this feature together with the EMQX WebHook to integrate EMQX and HStreamDB quickly.

This article explains how to bridge data from EMQX to HStreamDB for persistent storage.

Note: The steps in this article are based on EMQX 4.3 and the hstreamdb/hstream:v0.6.1 image.

Start EMQX and HStreamDB

First, we need a running EMQX instance. For installation, deployment and startup instructions, please refer to the EMQ Docs.

We also need a running HStreamDB instance. For detailed tutorials on installing, deploying and starting it, please refer to the HStreamDB Docs.

Users who are not familiar with HStreamDB can quickly start a single-node HStreamDB cluster with docker-compose as follows.

Start HStreamDB

First, download the HStreamDB docker-compose file; the commands below assume it is saved as quick-start.yaml.

Create a directory to store database data:

mkdir -p /data/store

Start HStreamDB in the background:

docker-compose -f quick-start.yaml up -d


Check the hstream-http-server logs to confirm that it has started:

docker-compose -f quick-start.yaml logs hstream-http-server

You will see the following log:

Server is configured with: 
     gRPCServerHost: hserver 
     gRPCServerPort: 6570 
     httpServerPort: 6580 
Setting gRPC connection 
Setting HTTP server 
Server started on port 6580
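Rather than reading the log by eye, a script can look for the startup line. The minimal sketch below scans hstream-http-server log text for the "Server started on port" message shown above and returns the port. It assumes the v0.6.1 log format; started_port is a hypothetical helper name, not part of HStreamDB.

```python
import re
from typing import Optional

# Matches the startup line printed by hstream-http-server. The format is
# taken from the v0.6.1 log above; other versions may log differently.
_STARTED = re.compile(r"Server started on port (\d+)")


def started_port(log_text: str) -> Optional[int]:
    """Return the HTTP port from the startup line, or None if it is absent."""
    match = _STARTED.search(log_text)
    return int(match.group(1)) if match else None
```

For example, the output of the docker-compose logs command above could be captured (e.g. with subprocess.run(..., capture_output=True, text=True).stdout) and passed to started_port, which returns 6580 for the log shown above and None if the server has not started yet.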

Create the required Stream through HStreamDB CLI

A stream is an object used to store streaming data in HStreamDB, which can be regarded as a collection of data.

Start HStreamDB CLI

Start an HStreamDB command-line interface with docker:

docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream:v0.6.1 hstream-client --port 6570 --client-id 1

You will enter the following interface:

      __  _________________  _________    __  ___
     / / / / ___/_  __/ __ \/ ____/   |  /  |/  /
    / /_/ /\__ \ / / / /_/ / __/ / /| | / /|_/ /
   / __  /___/ // / / _  _/ /___/ ___ |/ /  / /
  /_/ /_//____//_/ /_/ |_/_____/_/  |_/_/  /_/


Create an HStreamDB stream to store the bridged data:

> CREATE STREAM emqx_rule_engine_output ; 

We can also list the created stream with SHOW:

> SHOW STREAMS;

Configure EMQX

Next, open the EMQX Dashboard, click Rule Engine and go to the Resource section.

EMQX Dashboard Resource

We can first create a WebHook resource, as shown below:

EMQX Dashboard Create WebHook

In the Request URL field, enter the listening address of hstream-http-server, http://<host>:6580/streams/emqx_rule_engine_output:publish, where <host> is the address of the machine running HStreamDB. Then click the test connection button to test the connection.

EMQX Dashboard test connection
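If the connection test fails, a first thing to check from the EMQX host is whether anything is listening on port 6580 at all. The sketch below checks only TCP reachability; it does not send an HTTP request or verify the stream path, and port_open is a hypothetical helper name.

```python
import socket


def port_open(host: str, port: int = 6580, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection resolves the host and performs the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host could not be resolved.
        return False
```

If port_open(host) returns False, the problem most likely lies with networking or the HStreamDB containers rather than with the WebHook settings.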

Next, we will create the required rules for the integration:

create EMQX rules

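Enter the rule SQL below, replacing <topic> with the MQTT topic (or topic filter) whose messages should be bridged:

SELECT
  str(payload) as payload,
  0 as flag
FROM
  "<topic>"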
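To make the rule's output concrete, the sketch below mimics in Python what the rule SQL selects for each message: the payload as a string plus a constant flag of 0. It assumes payloads are UTF-8 text; rule_output is a hypothetical name, and the meaning HStreamDB assigns to the flag field is left as an assumption here.

```python
def rule_output(payload: bytes) -> dict:
    """Mimic the rule SQL: str(payload) as payload, 0 as flag."""
    return {
        # str(payload): the raw MQTT payload as a string (UTF-8 assumed;
        # non-UTF-8 bytes raise UnicodeDecodeError here).
        "payload": payload.decode("utf-8"),
        # Constant field selected by the rule.
        "flag": 0,
    }
```

As we understand EMQX 4.x, when the action's body template is left empty, the selected fields are sent as a JSON object, so a message hello would arrive at HStreamDB as {"payload": "hello", "flag": 0}. Treat this as an assumption and check it against the EMQX documentation for your version.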

Next, add an action under Action Handler and select Data to Web Server as the action:

add EMQX Action

Set Method to POST and add a content-type header with the value application/json.
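With these settings, the WebHook action sends an HTTP POST with a JSON content type to the Request URL. The sketch below builds an equivalent request with Python's standard library, which can help when testing the HStreamDB endpoint without EMQX. The record shape is assumed from the rule SQL rather than taken from an HStreamDB API reference, and build_publish_request is a hypothetical name.

```python
import json
import urllib.request


def build_publish_request(host: str, stream: str, record: dict,
                          port: int = 6580) -> urllib.request.Request:
    """Build (but do not send) a POST like the one the WebHook action makes."""
    # URL pattern taken from the Request URL configured for the resource.
    url = f"http://{host}:{port}/streams/{stream}:publish"
    body = json.dumps(record).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

Passing the returned object to urllib.request.urlopen would send it; the function itself only builds the request, so it can be inspected safely before anything touches the network.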

This completes the basic bridge configuration. Next, let's test it using the WebSocket client and the HStreamDB CLI.

Verify the integration with the HStreamDB CLI

First, we create a query in the HStreamDB CLI that we just started:

1> SELECT * FROM emqx_rule_engine_output EMIT CHANGES;

In HStreamDB, each stream represents a continuously changing series of streaming data. A query therefore does not simply read data once; it continuously reads the data written to the stream and outputs the processed results. In the CLI, reading and output start from the moment the query is successfully created, so at this point the CLI shows no output.
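To make the "start reading from the moment the query is created" behaviour concrete, here is a toy in-memory model. It only illustrates these read-position semantics; ToyStream and Cursor are hypothetical names, and this is not how HStreamDB stores or queries streams.

```python
from typing import List


class ToyStream:
    """In-memory append-only log; a teaching model, not HStreamDB itself."""

    def __init__(self) -> None:
        self._records: List[str] = []

    def append(self, record: str) -> None:
        self._records.append(record)

    def query(self) -> "Cursor":
        # The read position is fixed when the query is created, so records
        # appended before this call are never returned by the cursor.
        return Cursor(self, len(self._records))


class Cursor:
    def __init__(self, stream: ToyStream, position: int) -> None:
        self._stream = stream
        self._position = position

    def poll(self) -> List[str]:
        """Return records appended since the last poll and advance."""
        new = self._stream._records[self._position:]
        self._position += len(new)
        return new
```

A cursor created after "before" was appended never sees it, mirroring why the CLI query above stays silent until new data arrives.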

Now we can publish data to EMQX using the WebSocket client in the EMQX Dashboard or another MQTT client (such as MQTT X).

The following example uses the WebSocket client. First, connect to the EMQX cluster we started:

EMQX DashBoard WebSocket

Then, publish data to the topic specified in the rule:

EMQX DashBoard send data
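Whichever client is used, publishing a message ultimately sends an MQTT PUBLISH packet (carried inside WebSocket frames in the Dashboard's case). For readers curious about its contents, the sketch below encodes a QoS 0 PUBLISH packet following MQTT 3.1.1. It omits CONNECT and all networking, so it cannot publish on its own; encode_remaining_length and publish_packet are hypothetical names.

```python
def encode_remaining_length(n: int) -> bytes:
    """MQTT variable-length integer: 7 bits per byte, high bit = continuation."""
    if not 0 <= n <= 268_435_455:
        raise ValueError("remaining length out of range")
    out = bytearray()
    while True:
        byte, n = n % 128, n // 128
        if n > 0:
            byte |= 0x80  # more length bytes follow
        out.append(byte)
        if n == 0:
            return bytes(out)


def publish_packet(topic: str, payload: bytes) -> bytes:
    """Build an MQTT 3.1.1 PUBLISH packet with QoS 0 (no packet identifier)."""
    topic_bytes = topic.encode("utf-8")
    # Variable header: 2-byte big-endian topic length, then the topic name.
    variable_header = len(topic_bytes).to_bytes(2, "big") + topic_bytes
    body = variable_header + payload
    # 0x30 = packet type 3 (PUBLISH) with DUP=0, QoS=0, RETAIN=0.
    return bytes([0x30]) + encode_remaining_length(len(body)) + body
```

For instance, publishing hi to topic t yields the 7-byte packet 30 05 00 01 74 68 69.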

If everything works as expected, the data we sent to EMQX appears in the HStreamDB CLI in real time.

1> SELECT * FROM emqx_rule_engine_output EMIT CHANGES;

At this point, data sent to EMQX is persistently stored in HStreamDB.

By integrating EMQX with HStreamDB, we can achieve persistent storage of the data sent to EMQX and perform real-time processing and analysis to obtain further data insight.

As both products continue to develop, we believe the combination of EMQX and HStreamDB will play an essential role in data analysis and processing scenarios, especially in the IoT field, helping to catalyze data conversion and monetization and to create value from enterprise data assets.
