Using Java to develop EMQX plugins

Since v4.1, the EMQX MQTT broker has shipped a dedicated multi-language plugin, emqx_extension_hook. It lets you handle EMQX hook events in other programming languages: developers can use Python or Java to quickly build their own plugins, or extend the official features to fit their business scenarios. For example:

  • Verify a client's login permission: when a client connects, the corresponding function is triggered and receives the client information as parameters. The function can then read a database, compare the credentials, and decide whether the client may log in.
  • Record a client's online status and its online/offline history: when the client's status changes, the corresponding function is triggered, receives the client information as parameters, and updates the client's online status in the database.
  • Verify a client's PUB/SUB permission: when a client publishes or subscribes, the corresponding function is triggered and receives the client information and the current topic as parameters, so it can decide whether the operation is permitted.
  • Handle session and message events to implement subscription tracking and message processing or storage: when a message is published or a status changes, the corresponding function is triggered, and the current client information, session status, and message content can be forwarded to Kafka or a database for storage.
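
The login and PUB/SUB checks described above can be sketched without the SDK. The following is a minimal, self-contained illustration, not the SDK's API: the class PermissionStore, its methods, and the in-memory rules are all hypothetical stand-ins for a real database lookup. The boolean results are the kind of decision an authentication or ACL callback could return. The topic matching is a simplified version of MQTT filter rules ("+" for one level, "#" for all remaining levels).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory permission store; a real plugin would query a database.
class PermissionStore {
    private final Set<String> allowedUsers = new HashSet<>();
    // Each rule is {username, action, topicFilter}; action is "publish" or "subscribe".
    private final List<String[]> aclRules = new ArrayList<>();

    void allowLogin(String username) {
        allowedUsers.add(username);
    }

    void allow(String username, String action, String topicFilter) {
        aclRules.add(new String[] {username, action, topicFilter});
    }

    // Login check: the kind of decision an authentication callback could return.
    boolean canLogin(String username) {
        return username != null && allowedUsers.contains(username);
    }

    // PUB/SUB check: the kind of decision an ACL callback could return.
    boolean canAccess(String username, String action, String topic) {
        for (String[] rule : aclRules) {
            if (rule[0].equals(username) && rule[1].equals(action) && matches(rule[2], topic)) {
                return true;
            }
        }
        return false;
    }

    // Simplified MQTT filter matching: "+" matches one level, "#" matches all remaining levels.
    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                return true;
            }
            if (i >= t.length) {
                return false;
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false;
            }
        }
        return f.length == t.length;
    }

    public static void main(String[] args) {
        PermissionStore store = new PermissionStore();
        store.allowLogin("alice");
        store.allow("alice", "publish", "sensors/+/temp");
        System.out.println(store.canLogin("alice"));
        System.out.println(store.canAccess("alice", "publish", "sensors/room1/temp"));
        System.out.println(store.canAccess("alice", "subscribe", "sensors/room1/temp"));
    }
}
```

In a real extension, the username and topic would come from the callback parameters rather than from string literals.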

Note: the message hooks are only supported in the enterprise edition.

The Python and Java drivers communicate with EMQX through Erlang/OTP ports and offer very high throughput. This article uses the Java extension as an example to show how to use EMQX's cross-language extensions.


Example: using the Java extension


  • The machine running the EMQX broker must have JDK 1.8 or higher installed


  1. Create a Java project.
  2. Download io.emqx.extension.jar and erlport.jar.
  3. Add the SDK (io.emqx.extension.jar) and erlport.jar to the project's dependencies.
  4. Copy examples/ into your project.
  5. Write your business code based on the SDK example and make sure it compiles successfully.


After compiling all the source code, deploy the SDK and the compiled class files to EMQX:

  1. Copy io.emqx.extension.jar into the emqx/data/extension directory.
  2. Copy the compiled .class files, such as SampleHandler.class, into the emqx/data/extension directory.
  3. Modify the configuration file emqx/etc/plugins/emqx_extension_hook.conf:
exhook.drivers = java
## Search path for scripts or library = data/extension/ = SampleHandler

Enable the emqx_extension_hook plugin. If the configuration is wrong or the Java code contains errors, the plugin will fail to start. Once it is enabled, establish an MQTT connection and observe how your business logic behaves.


The example program is as follows. It extends the SDK class DefaultCommunicationHandler and demonstrates how to mount all hooks of the EMQX system.


public class SampleHandler extends DefaultCommunicationHandler {

    // Configure the message hooks and the topics they apply to ("#" matches all topics)
    public ActionOptionConfig getActionOption() {
        ActionOptionConfig option = new ActionOptionConfig();
        option.set(Keys.MESSAGE_PUBLISH_TOPICS, "#");
        option.set(Keys.MESSAGE_DELIVERED_TOPICS, "#");
        option.set(Keys.MESSAGE_ACKED_TOPICS, "#");
        option.set(Keys.MESSAGE_DROPPED_TOPICS, "#");

        return option;
    }

    // Clients
    public void onClientConnect(ConnInfo connInfo, Property[] props) {
        System.err.printf("[Java] onClientConnect: connInfo: %s, props: %s\n", connInfo, props);
    }

    public void onClientConnack(ConnInfo connInfo, ReturnCode rc, Property[] props) {
        System.err.printf("[Java] onClientConnack: connInfo: %s, rc: %s, props: %s\n", connInfo, rc, props);
    }

    public void onClientConnected(ClientInfo clientInfo) {
        System.err.printf("[Java] onClientConnected: clientinfo: %s\n", clientInfo);
    }

    public void onClientDisconnected(ClientInfo clientInfo, Reason reason) {
        System.err.printf("[Java] onClientDisconnected: clientinfo: %s, reason: %s\n", clientInfo, reason);
    }

    // Determine the authentication result, return true or false
    public boolean onClientAuthenticate(ClientInfo clientInfo, boolean authresult) {
        System.err.printf("[Java] onClientAuthenticate: clientinfo: %s, authresult: %s\n", clientInfo, authresult);

        return true;
    }

    // Determine the ACL check result, return true or false
    public boolean onClientCheckAcl(ClientInfo clientInfo, PubSub pubsub, Topic topic, boolean result) {
        System.err.printf("[Java] onClientCheckAcl: clientinfo: %s, pubsub: %s, topic: %s, result: %s\n", clientInfo, pubsub, topic, result);

        return true;
    }

    public void onClientSubscribe(ClientInfo clientInfo, Property[] props, TopicFilter[] topic) {
        System.err.printf("[Java] onClientSubscribe: clientinfo: %s, topic: %s, props: %s\n", clientInfo, topic, props);
    }

    public void onClientUnsubscribe(ClientInfo clientInfo, Property[] props, TopicFilter[] topic) {
        System.err.printf("[Java] onClientUnsubscribe: clientinfo: %s, topic: %s, props: %s\n", clientInfo, topic, props);
    }

    // Sessions
    public void onSessionCreated(ClientInfo clientInfo) {
        System.err.printf("[Java] onSessionCreated: clientinfo: %s\n", clientInfo);
    }

    public void onSessionSubscribed(ClientInfo clientInfo, Topic topic, SubscribeOption opts) {
        System.err.printf("[Java] onSessionSubscribed: clientinfo: %s, topic: %s\n", clientInfo, topic);
    }

    public void onSessionUnsubscribed(ClientInfo clientInfo, Topic topic) {
        System.err.printf("[Java] onSessionUnsubscribed: clientinfo: %s, topic: %s\n", clientInfo, topic);
    }

    public void onSessionResumed(ClientInfo clientInfo) {
        System.err.printf("[Java] onSessionResumed: clientinfo: %s\n", clientInfo);
    }

    public void onSessionDiscarded(ClientInfo clientInfo) {
        System.err.printf("[Java] onSessionDiscarded: clientinfo: %s\n", clientInfo);
    }

    public void onSessionTakeovered(ClientInfo clientInfo) {
        System.err.printf("[Java] onSessionTakeovered: clientinfo: %s\n", clientInfo);
    }

    public void onSessionTerminated(ClientInfo clientInfo, Reason reason) {
        System.err.printf("[Java] onSessionTerminated: clientinfo: %s, reason: %s\n", clientInfo, reason);
    }

    // Messages
    public Message onMessagePublish(Message message) {
        System.err.printf("[Java] onMessagePublish: message: %s\n", message);

        return message;
    }

    public void onMessageDropped(Message message, Reason reason) {
        System.err.printf("[Java] onMessageDropped: message: %s, reason: %s\n", message, reason);
    }

    public void onMessageDelivered(ClientInfo clientInfo, Message message) {
        System.err.printf("[Java] onMessageDelivered: clientinfo: %s, message: %s\n", clientInfo, message);
    }

    public void onMessageAcked(ClientInfo clientInfo, Message message) {
        System.err.printf("[Java] onMessageAcked: clientinfo: %s, message: %s\n", clientInfo, message);
    }
}

SampleHandler mainly consists of two parts:

  1. It overrides the method getActionOption. This method configures the message-related hooks and specifies the list of topics for which each hook takes effect.

    | Configuration item       | Corresponding hook |
    | ------------------------ | ------------------ |
    | MESSAGE_PUBLISH_TOPICS   | message_publish    |
    | MESSAGE_DELIVERED_TOPICS | message_delivered  |
    | MESSAGE_ACKED_TOPICS     | message_acked      |
    | MESSAGE_DROPPED_TOPICS   | message_dropped    |

  2. It overrides the on<hookName> methods. These methods are the callback functions that handle hook events. A callback is named by removing the underscores from the hook name, converting it to CamelCase, and adding the prefix on; for example, the hook client_connect corresponds to the function onClientConnect. Events generated by EMQX, such as connect, publish, and subscribe, are eventually dispatched to these callback functions, which can then work with each attribute and status they receive. The example program only prints each parameter. If you only care about some of the hook events, override only the callbacks for those events; you do not need to override all of them.
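
The naming rule can be expressed as a few lines of code. This is only an illustrative sketch of the rule as described above; the class HookNames and the method toCallbackName are hypothetical helpers and are not part of the SDK.

```java
// Hypothetical helper that applies the naming rule: hook name -> callback method name.
class HookNames {

    // Converts a hook name such as "client_connect" into "onClientConnect".
    static String toCallbackName(String hookName) {
        StringBuilder sb = new StringBuilder("on");
        for (String part : hookName.split("_")) {
            if (part.isEmpty()) {
                continue; // tolerate doubled or leading underscores
            }
            sb.append(Character.toUpperCase(part.charAt(0)));
            sb.append(part.substring(1));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(toCallbackName("client_connect"));  // prints "onClientConnect"
        System.out.println(toCallbackName("message_publish")); // prints "onMessagePublish"
    }
}
```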

Each callback is executed at the same point as the corresponding built-in EMQX hook, and the list of supported hooks is the same as well; please refer to Hooks - EMQX.

When implementing your own extension, the simplest approach is to extend the superclass DefaultCommunicationHandler. It wraps the binding between each hook and its callback function, and also wraps the parameter data structures used by the callbacks, so you can get started quickly.
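
The "override only what you need" pattern can be illustrated without the SDK. In the sketch below, NoOpHandler is a hypothetical stand-in for a superclass whose callbacks all do nothing by default, and the callbacks take plain String client IDs as a simplifying assumption instead of the SDK's parameter types. OnlineStatusHandler overrides just two callbacks to track which clients are online.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Stand-in for an SDK-style superclass: every callback has a do-nothing default.
class NoOpHandler {
    public void onClientConnected(String clientId) { }

    public void onClientDisconnected(String clientId, String reason) { }

    public void onSessionCreated(String clientId) { }
}

// Overrides only the two callbacks it cares about; all others remain no-ops.
class OnlineStatusHandler extends NoOpHandler {
    // In-memory stand-in for the "online status" table a real plugin might keep in a database.
    final Set<String> online = new LinkedHashSet<>();

    @Override
    public void onClientConnected(String clientId) {
        online.add(clientId);
    }

    @Override
    public void onClientDisconnected(String clientId, String reason) {
        online.remove(clientId);
    }

    public static void main(String[] args) {
        OnlineStatusHandler handler = new OnlineStatusHandler();
        handler.onClientConnected("c1");
        handler.onClientConnected("c2");
        handler.onSessionCreated("c1"); // inherited no-op, nothing happens
        handler.onClientDisconnected("c1", "normal");
        System.out.println(handler.online); // prints "[c2]"
    }
}
```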

Advanced development

If you need finer control over the Java extension than the class DefaultCommunicationHandler offers, you can control the logic at a lower level by implementing the interface CommunicationHandler.


public interface CommunicationHandler {

    public Object init();

    public void deinit();
}

  • Method init(): called for initialization; it declares which hooks the extension requires and how they are mounted
  • Method deinit(): called for deregistration, when the extension is unloaded

For a detailed description of the data format, please refer to the design documentation.
