Verifying Kuiper stream processing with MQTTX
This article will use the MQTTX scripts and timing function to simulate reporting temperature and humidity data. EMQX Edge acts as the messaging middleware to forward messages, and EMQX Kuiper performs receiving messages and processing rules. Finally, the processed data will be sent to MQTTX via EMQX Edge.
Introduction and installation
All the runtime environments demonstrated in this article are all built through Docker. If you have other installation needs, you can also refer to the download link and installation document provided below to build them.
Kuiper
Kuiper is a lightweight IoT edge analysis and stream processing open source software implemented by Golang, and it can run on various resource-limited edge devices. One of the main design goals of Kuiper is to migrate the real-time stream computing frameworks (such as Apache Spark, Apache Storm and Apache Flink, etc.) running in the cloud-side to the edge-side. Kuiper refers to the architecture and implementation of the above-mentioned cloud-side stream processing projects above, use the features of edge streaming data processing and adopt the rule engine which is written based on Source
, SQL(business logic processing)
and target (Sink)
to implement edge streaming data processing. Project address: https://github.com/emqx/kuiper
Version: v1.0.2
Download link | Installation document
# Get a Docker mirroring
$ docker pull emqx/kuiper:1.0.2
# Enabling a Docker container
$ docker run -p 9081:9081 -d --name kuiper emqx/kuiper:1.0.2
Kuiper-manager
This article will use Kuiper-manager to visually manage and use EMQX Kuiper. Kuiper-manager is a Web management console that can be used to manage Kuiper nodes, streams, rules, plugins, etc.
Version: v1.0.2
Currently only supported using Docker mirroring
# Get a Docker mirroring
$ docker pull emqx/kuiper-manager:1.0.2
# Enabling a Docker container
$ docker run -p 9082:9082 -d emqx/kuiper-manager:1.0.2
EMQX Edge
EMQX Edge is a lightweight multi-protocol IoT edge message middleware that supports being deployed on resource-limited IoT edge hardware.
Version: v4.2.4
Download link | Installation document
# Get a Docker mirroring
$ docker pull emqx/emqx-edge:4.2.4
# Enabling a Docker container
$ docker run -d --name emqx -p 1883:1883 emqx/emqx-edge:4.2.4
MQTTX
MQTTX is a cross-platform MQTT 5.0 desktop test client that supports macOS, Linux, Windows. Users can quickly create multiple simultaneous online MQTT client for convenient testing the connect/publish/subscribe functions of MQTT/TCP, MQTT/TLS, MQTT/WebSocket and other MQTT protocol features.
Version: v1.4.2
Users can download the installation package according to their operating system from the MQTTX website or GitHub download page.
Linux users can download in Snapcraft: https://snapcraft.io/mqttx
Tutorials
After the environment has been built, we can collocate with the features between the modules to use it, perform feature testing and verification.
Use of Kuiper-manager
First, we create and configure the streams and rules for Kuiper. After installing and successfully running Kuiper-manager, we open a browser and enter http://localhost:9082
. If you access kuiper-manager from other computers, please change localhost
to the IP address where you are running kuiper-manager. The password and username you will need to enter when you first open it is: admin
/ public
. It is recommended that you change your password after logging in for the first time.
Nodes
After a successful login, you will be taken to a node management interface. Click on the Add Node
button, there will be a pop-up box, and you need to add an instance node of Kuiper. In here, because we are using the general node, we select the first item Directly Connected Node
. In addition to the directly connected node, adding the Huawei IEF node is also supported now, which will not be covered in this article. Then, you need to enter the endpoint address
of the Kuiper instance to be manipulated and enter the node name
to identify the node.
Note: If you use Docker to start it, the endpoint address needs to be entered as the IP address within the Docker container.
After successfully added, we can access the node instance through clicking on the node name in the node list. Once inside, we will then create and configure the flows and rules of this Kuiper instance.
Streams
Once you are on the Kuiper instance page, you will go to the Tab page of streams, where we click on the Create Streams
button on the right to go to the Create Streams page, where you can follow these steps:
Enter a
stream name
to identify it, here we enter the stream name asdemo
;Enter the structure definition, for example, we can define in advance which field types are in the stream data that this stream need to receive. To add, simply enter the name of the field and select the type to add, which includes
bigint
,float
,string
,boolean
,array
,struct
etc. Structure definitions are optional and can be cancelled or switched on by checkingwhether stream with structure
above the structure list; when the structure definition is cancelled, data of any structure type will be received. In this article we have specified the data structures to be processed, so we add two separate fields:temperature
andhumidity
, both of typebigint
.Enter
Data source
, we will use MQTT as the message source in this article, so this configuration allows you to enter theTopic
for receiving messages, here we enter/kuiper/stream
.Select
Stream Type
, which will be chosen here as MQTT.Select
Configuration Group
, and the configuration group is the configuration information defined under the stream type, for example, the default MQTT configuration groupservers
information is['tcp://127.0.0.1:1883']
. You can customize this configuration information by clicking on theSource Configuration
button above to go to the page to configure, or you can go to theetc
directory to modify the configuration file. Here we select the reconfigureddemo_conf
configuration group.Note: If the MQTT Broker used is the EMQX Edge initiated by Docker, the address of Servers needs to be changed to the IP address within the Docker container
Select
Stream Format
, which will be chosen asjson
finally.
In addition to the above visual creation methods, we can also switch to text mode by clicking on the toggle button in the top right corner of the page. A stream can be created by entering the SQL statement used to create the stream directly. SQL example:
CREATE STREAM demo (
temperature bigint,
humidity bigint,
) WITH (DATASOURCE="/kuiper/stream", FORMAT="json", CONF_KEY="demo_conf", TYPE="mqtt");
After clicking the Submit
button, we have successfully created a stream. The next step is to set up the rules for the created stream.
Rules
Click on the Tab item of the rule to go to the list of rules page. We click on the Create Rule
button on the right to go to the create rule page, and at this point, you can follow the steps below.
Enter the
rule ID
to mark the rule, here we enterdemoRule
.Enter the SQL statement for the rule runtime query. Here you will define a SQL statement that queries the temperature and humidity data in the data stream and sets the filter condition to that the temperature is greater than 30. SQL example:
SELECT temperature, humidity FROM demo WHERE temperature > 30
Select the
Action
of the added rule, which is the Sink action group, the data can be multi-selected, and Sink is the target of the output when the rule is executed. Here we are still using MQTT and forwarding the data that has executed by the rule via MQTT. After the selection is complete, you can enter the configuration information for the MQTT Sink. In this article, we will only configure the address of the MQTT Broker and theTopic
information, andTopic
is the topic of the received message.Note: If the MQTT Broker used is the EMQX Edge initiated by Docker, the address of Broker needs to be filled as the IP address within the Docker container
Set
Options
, and part of options are optional and all options have default values. If you wish to change them, you can do so by referring to the Kuiper documentation.
In addition to the above visual creation methods, we can also switch to text mode by clicking on the toggle button in the top right corner of the page. Rules can be created by entering the JSON data of creation rule directly, JSON example:
{
"id": "demoRule",
"sql": "SELECT temperature, humidity FROM demo WHERE temperature > 30",
"actions": [
{
"mqtt": {
"server": "tcp://172.17.0.2:1883",
"topic": "/kuiper/rule"
}
}
]
}
After clicking the Submit
button, we have successfully created a rule. So far, we have completed the Kuiper data stream and rule configuration. Next we will use MQTTX to test and verify Kuiper's stream processing capabilities.
The use of MQTTX
Once the download and installation is complete, we open MQTTX and create a new connection called edge1
to an EMQX Edge with the same configuration as the Kuiper Source. After testing the connection successfully, we go to the Scripts
page and use the example script provided below to generate the simulation data.
/**
* Simulated temperature and humidity reporting
* @return Return a simulated temperature and humidity JSON data - { "temperature": 23, "humidity": 40 }
* @param value, MQTT Payload - {}
*/
function random(min, max) {
return Math.round(Math.random() * (max - min)) + min
}
function handlePayload(value) {
let _value = value
if (typeof value === 'string') {
_value = JSON.parse(value)
}
_value.temperature = random(10, 40)
_value.humidity = random(20, 40)
return JSON.stringify(_value, null, 2)
}
execute(handlePayload)
Testing found that the simulated data was successful, and we went to the connection page, opened the script to use the function (using the script function is not described in detail in this article, you can refer to the MQTTX documentation). Enter the Payload
data template to be sent as {}
, enter Topic
as the Data Source
in the stream definition, in this case /kuiper/stream
, then set the timing message, set the sending frequency to 1 second, then click Send. After the message has been successfully sent, MQTTX will automatically send one simulated test data per second.
At this point, we create a new connection again called edge2
to the EMQX Edge with the same configuration as the Kuiper Sink and subscribe to the Topic
configured in the MQTT Sink. In this case, we subscribe to the /kuiper/rule
topic, to receive data processed by Kuiper.
Verify Results
Once we have sent the simulated data, we can see if any messages are coming in or going out by clicking on the Status
button in the rules list. We can see from the screenshot below that Kuiper received a total of 40 messages and filtered out 14 messages.
Then continue to look at the messages within MQTTX. edge1
has sent a total of 40 simulated messages at regular intervals, switching to edge2
we see that a total of 14 messages have been received. The sent and received data is consistent with the Kuiper inflow and outflow data, and the temperature
in the received messages is exactly above 30, which satisfies the filtering conditions we set in Kuiper. This means that our Kuiper stream processing function has successfully completed the data processing requirements we set, and the test and verification was successful.
In addition to viewing the information of data processed by Kuiper rules via the Status button, you can also click on the `Topology' button to go to the topology diagram of the rule, which shows the complete flow of data and the status of the rule, and allows you to view real-time dynamic information about the specific data processing modules.
Summary
This article completes an easy tutorial on using the MQTTX client to verify the function of Kuiper stream processing. Kuiper can be used in various IoT edge scenarios. The system response speed can be improved, network bandwidth costs and storage costs can be saved, and system security can be improved via the processing of Kuiper at the edge.
In addition to the MQTT Source and MQTT Sink exemplified in the article, Kuiper has many diverse Source and Sink configurations built in and includes the ability to integrate with EdgeX Foundry, KubeEdge, EMQX Edge, etc. Rule SQL also support for 60+ common functions, provide extension points available to extend custom functions. A powerful plugin system is provided that is highly extensible.
The three projects used in this article are all fully open source. You can go to GitHub (EMQX Kuiper, EMQX Edge, MQTTX) to submit problems you encountered during use, or to Fork our projects and submit revised PRs to us, which we will review and address promptly. We would also like to thank all the users in the community for their contributions and feedback.