Java Reference

MessageBroker

Introduction

The KOS message broker component provides a simple and reliable way for software objects to:

  • Subscribe to desired message topics

  • Send messages based on those topics

Overview

The message broker provides for a one-to-many distribution of messages. It gives any number of publishers (producers) the ability to send messages to any number of subscribers (consumers). The following diagram illustrates this:

[Diagram: Message Broker]
Figure 1. Typical message broker system

MessageBroker

The MessageBroker is a KOS-provided system component that you inject into your code using the @Autowired annotation. You use it to subscribe to (receive) and publish (send) messages.

Definitions

This section defines the core terms associated with sending and receiving messages. It should give you the knowledge required to better understand the API Components and example sections that follow.

Events/Messages

The MessageBroker exists to allow events, with optional data, to be sent and received between various software objects. The KOS system notifies objects about these events through broker messages.

For example, a message could indicate that the user pressed the dispense button on the UI. There might not be any additional data associated with that message.

Alternatively, a message could indicate that the user pressed some button on the UI, and its associated data could indicate which button, the press duration, etc.

Subscribe/Unsubscribe

To subscribe is to say "I would like to be notified when events/messages of a certain type occur".

To unsubscribe is to say "I am no longer interested in receiving those events/messages".

Sessions

A message broker session is a way to group subscriptions. It is an arbitrary object that is required when you subscribe to or unsubscribe from a topic, and is optional when you send a message.

It’s not important what you use as your session. If your code is in a service, you can simply use the service object as the session, for example, or you can use some sort of string or numerical value. Subscriptions are tracked against the session, and you can use the session to unsubscribe from all subscriptions when you need to clean things up.

A session provides two primary benefits:

1) If a session is specified when you send, and it matches your subscription session, then you will not receive your own events. Consider the following code snippet:

messageBroker.subscribe("mySession", "/my/topic/here", callbackHandler);
messageBroker.send("mySession", "/my/topic/here");

In this example, the callbackHandler for the subscription won’t be called because it has the same session as the sender. Typically, the sending class has already handled the event, so there’s no need to receive a message saying the event occurred. This stops any potential race conditions.

2) You can unsubscribe from all topics in your session by using

messageBroker.unsubscribe("mySession");

This makes it very easy to unsubscribe from all related topics.
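
The two session behaviors described above can be sketched with a toy in-memory broker. This is not the KOS implementation: ToyBroker, its Consumer-based callback, the two-argument send(), and the exact-match topic lookup are all assumptions made only to illustrate how subscriptions keyed by session could suppress a sender's own events and support a bulk unsubscribe.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for illustration only; not the KOS MessageBroker.
class ToyBroker {

    // session -> (topic -> callback); subscriptions are tracked per session.
    private final Map<Object, Map<String, Consumer<String>>> bySession = new LinkedHashMap<>();

    void subscribe(Object session, String topic, Consumer<String> callback) {
        bySession.computeIfAbsent(session, s -> new LinkedHashMap<>()).put(topic, callback);
    }

    // Removes every subscription that belongs to the session.
    void unsubscribe(Object session) {
        bySession.remove(session);
    }

    // Delivers to matching subscriptions, skipping those owned by the sender's session.
    void send(Object senderSession, String topic) {
        for (Map.Entry<Object, Map<String, Consumer<String>>> entry : bySession.entrySet()) {
            if (entry.getKey().equals(senderSession)) {
                continue; // the sender does not receive its own event
            }
            Consumer<String> callback = entry.getValue().get(topic);
            if (callback != null) {
                callback.accept(topic);
            }
        }
    }
}

class SessionDemo {

    // Returns a log of which sessions were notified.
    static List<String> run() {
        List<String> received = new ArrayList<>();
        ToyBroker broker = new ToyBroker();
        broker.subscribe("mySession", "/my/topic/here", t -> received.add("mySession got " + t));
        broker.subscribe("otherSession", "/my/topic/here", t -> received.add("otherSession got " + t));

        broker.send("mySession", "/my/topic/here");  // only otherSession is notified
        broker.unsubscribe("otherSession");           // drops all of otherSession's topics
        broker.send("mySession", "/my/topic/here");  // nobody is notified now
        return received;
    }

    public static void main(String[] args) {
        System.out.println(SessionDemo.run());
    }
}
```

In this sketch only "otherSession" is ever notified, and only once: the first send skips the sender's own session, and the second send finds no remaining subscriptions from other sessions.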

Topics

A message broker topic is any text that identifies the subject of discussion or conversation.

For example, if you wanted to receive alerts when a user logs in or logs out, you could create topics named "user-login" and "user-logout".

messageBroker.subscribe("mySession", "user-login", this);
messageBroker.subscribe("mySession", "user-logout", this);

You can also achieve this using a feature called topic paths. This allows you to create hierarchical topics using the forward-slash character. For example, you could name your topics "/user/login" and "/user/logout":

messageBroker.subscribe("mySession", "/user/login", this);
messageBroker.subscribe("mySession", "/user/logout", this);

However, this is even easier using topic paths combined with a wildcard:

messageBroker.subscribe("mySession", "/user/*", this);

The above call subscribes to all "/user" actions: in this case, both login and logout. If another user action, such as "suspend", is defined in the future, this object will receive the new "/user/suspend" topic without any code changes.

To unsubscribe from a topic, call the following using the same session and topic you originally subscribed with:

messageBroker.unsubscribe("mySession", "/my/topic/here");
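
The wildcard behavior described in this section can be pictured with a small matching function. This page does not spell out the broker's exact matching rules, so the sketch below is only an assumption: a subscription ending in "/*" is taken to match any topic beneath that prefix, and anything else must match exactly. TopicMatcher is a hypothetical name, not a KOS class.

```java
// Hypothetical helper for illustration; the real KOS matching rules may differ.
class TopicMatcher {

    // Assumption: a subscription ending in "/*" matches every topic below that prefix;
    // any other subscription must equal the topic exactly.
    static boolean matches(String subscription, String topic) {
        if (subscription.endsWith("/*")) {
            // Drop only the "*" so the prefix keeps its trailing "/".
            String prefix = subscription.substring(0, subscription.length() - 1);
            return topic.startsWith(prefix) && topic.length() > prefix.length();
        }
        return subscription.equals(topic);
    }

    public static void main(String[] args) {
        System.out.println(matches("/user/*", "/user/login"));      // true
        System.out.println(matches("/user/*", "/user/suspend"));    // true
        System.out.println(matches("/user/*", "/users/login"));     // false
        System.out.println(matches("/user/login", "/user/logout")); // false
    }
}
```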

API Components

This section describes the three primary components of the KOS message broker system and the one custom exception:

  • MessageBroker (component)

  • MessageBrokerCallback (interface)

  • MessageData (class)

  • DuplicateSubscriptionException (exception)

MessageBroker (interface)

The following code shows the MessageBroker interface. The method parameters are:

  • session : all subscriptions are tracked by session, so if a session dies, then all related subscriptions are cleaned up

  • topic : the name of the topic to send or subscribe to

  • callback : the MessageBrokerCallback whose onMessage() method is called when a new message is ready

  • body : the object containing data to transmit

  • bodyClass : the class to deserialize the message’s data to

  • jsonView : the class to use when converting the body to a JSON payload (a type of filter)

  • jsonBody : the message body in parsed JSON

The bodyClass parameter, used in the second subscribe() method (line 7), is the class of the object that the receiver wants. No matter what data the sender provides, the receiver gets only the subset of that data that maps onto this class. In other words, the bodyClass acts as a filter at data reception time.

The jsonView parameter, used in the third send() method (line 22), selects the part of the body that the sender transmits to the receiver. No matter what is in the body parameter, the sender sends only the subset of that data indicated by the jsonView. In other words, the jsonView acts as a filter at data send time.

Both of these are demonstrated in the Advanced Example section.

API: MessageBroker component
public interface MessageBroker {

    // Subscribe to the given topic:
    void subscribe(Object session, String topic, MessageBrokerCallback callback);

    // Subscribe to the given topic, with class for deserialization:
    void subscribe(Object session, String topic, Class<?> bodyClass, MessageBrokerCallback callback);

    // Unsubscribe from the given topic:
    void unsubscribe(Object session, String topic);

    // Unsubscribe from all topics in the session:
    void unsubscribe(Object session);

    // Send a topic:
    void send(String topic);

    // Send a topic with a message:
    void send(String topic, Object body);

    // Send a topic with message data filtered by the view:
    void send(Object session, String topic, Object body, Class<?> jsonView);

    // Send a topic with message and JSON body (typically not used):
    void sendJson(Object session, String topic, JsonNode jsonBody);
}

In your code, you inject the KOS system’s MessageBroker by using the @Autowired annotation:

Use @Autowired to inject the MessageBroker
public class MyClass {

    @Autowired
    private MessageBroker messageBroker;

    // . . .
}

MessageBrokerCallback (interface)

The MessageBrokerCallback interface is implemented by classes that want to subscribe to broker messages. It has a single method that must be implemented:

API: MessageBrokerCallback interface
public interface MessageBrokerCallback {

    // Called when there's a new message for a subscribed topic:
    void onMessage(MessageData msgData);
}
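
Because the interface shown above declares a single abstract method, Java also allows a callback to be written as a lambda or a method reference rather than implemented on the class itself. The sketch below is self-contained, so it declares its own minimal stand-ins: the nested MessageData (reduced to topic and body) and MessageBrokerCallback are local copies for illustration, not the KOS types, and CallbackSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

class CallbackSketch {

    // Minimal local stand-ins so the sketch compiles on its own (not the KOS types).
    static final class MessageData {
        private final String topic;
        private final Object body;
        MessageData(String topic, Object body) { this.topic = topic; this.body = body; }
        String getTopic() { return topic; }
        Object getBody() { return body; }
    }

    interface MessageBrokerCallback {
        void onMessage(MessageData msgData);
    }

    private final List<String> seenTopics = new ArrayList<>();

    // A method whose shape matches onMessage(), usable as a method reference.
    private void handle(MessageData msgData) {
        seenTopics.add(msgData.getTopic());
    }

    List<String> run() {
        MessageBrokerCallback asLambda = msgData -> seenTopics.add("lambda:" + msgData.getTopic());
        MessageBrokerCallback asMethodRef = this::handle;

        // Stand-in for the broker invoking the callbacks when a message arrives.
        MessageData msg = new MessageData("/user/login", null);
        asLambda.onMessage(msg);
        asMethodRef.onMessage(msg);
        return seenTopics;
    }

    public static void main(String[] args) {
        System.out.println(new CallbackSketch().run());
    }
}
```

A callback written this way keeps per-topic handling in separate methods instead of one large onMessage() switch, which may be convenient when a class subscribes to several unrelated topics.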

MessageData (class)

The MessageData class holds the information passed by the message broker to the subscribing code:

API: MessageData class
@Data  // Lombok: provides all getters, setters, etc.
public class MessageData {
    private Object session;
    private String topic;
    private String subscription;
    private Object body;
    private JsonNode jsonBody;

    // For brevity, not showing:
    // Constructors
    // Getters & Setters
    // toString(), equals(), and hashCode() methods
}

DuplicateSubscriptionException

A DuplicateSubscriptionException is thrown when there’s an attempt to subscribe with a session and topic combination that already has a subscription:

DuplicateSubscriptionException example
public class MyClass {

    @Autowired
    private MessageBroker messageBroker;

    public void myMethod() {
        messageBroker.subscribe("sess1", "topic1", this);  // successfully subscribed
        // . . . CODE . . .
        messageBroker.subscribe("sess1", "topic1", this);  // throws DuplicateSubscriptionException
    }
}

Simple Example

Let’s take a look at a simple example. We’ll create two classes: a publisher and a subscriber.

Publisher

The publisher handles two topics for a hypothetical user: "/user/login" and "/user/logout".

When the loginTheUser() method is called, it publishes (to the message broker) the login topic with information about the user.

When the logoutTheUser() method is called, it publishes the logout topic, but with no data. A message sent without data is simply an "event".

Simple publisher
public class Publisher1 {

    public static final String LOGIN_TOPIC  = "/user/login";
    public static final String LOGOUT_TOPIC = "/user/logout";

    @Autowired
    private MessageBroker messageBroker;

    public void loginTheUser() {
        UserInfo userInfo = new UserInfo(157, "Sam", "Goody");
        messageBroker.send(LOGIN_TOPIC, userInfo);
    }

    public void logoutTheUser() {
        messageBroker.send(LOGOUT_TOPIC);
    }
}

Subscriber

The subscriber:

  • Asks the BeanContext to inject the MessageBroker (lines 5 & 6)

  • Implements the MessageBrokerCallback interface (lines 1 & 17)

  • Performs the necessary operations in the onMessage() callback method (lines 19 & 23)

When start() is called, the object subscribes to the "/user/*" topics (line 9).

When stop() is called, the object unsubscribes from all topics associated with the session (line 13).

The "session" can be anything; it’s just a mechanism to group topics together.

Simple Subscriber
public class Subscriber1 implements MessageBrokerCallback {

    private static final String USER_TOPICS = "/user/*";

    @Autowired
    private MessageBroker messageBroker;

    public void start() {
        messageBroker.subscribe(this, USER_TOPICS, this);
    }

    public void stop() {
        messageBroker.unsubscribe(this);
    }

    @Override
    public void onMessage(MessageData msgData) {
        switch (msgData.getTopic()) {
            case Publisher1.LOGIN_TOPIC:
                // We know the user logged in and who he/she is:
                UserInfo userInfo = (UserInfo)msgData.getBody();
                break;
            case Publisher1.LOGOUT_TOPIC:
                // We know the user logged out...
                break;
            default:
                // Unknown topic:
                break;
        }
    }
}

Advanced Example

Filtering what gets transmitted

In this example, we want the publisher to call the send() method that contains the jsonView parameter. This jsonView parameter allows the broker to send a subset of the body data. Use this to reduce the message payload’s size.

In our example, there’s a class of type UserData, which contains the ID, first name, last name, email address, phone number, and misc notes fields.

  1. In the sendPublicUserData() method, only the "public" fields are transmitted (firstName and lastName).

  2. In the sendPrivateUserData() method, both the "public" and "private" fields are transmitted (all fields except miscNotes).

  3. In the sendAllUserData() method, all fields are transmitted (including miscNotes).

To see how each of these three sets of fields is defined, examine the UserData class, as well as the Views class with its nested Public and Private classes.

Publisher that uses the JsonView parameter
public class Publisher2 {

    @Autowired
    private MessageBroker messageBroker;

    private UserData fetchUserData() {
        return new UserData(13579, "Fred", "Flintstone",
                "fred@oldschool.com", "BR-549", "Very good worker");
    }

    public void sendPublicUserData() {
        // Only sends fields marked with "@JsonView(Views.Public.class)":
        messageBroker.send("MySession", "MyTopic", fetchUserData(), Views.Public.class);
    }

    public void sendPrivateUserData() {
        // Only sends fields marked with "@JsonView(Views.Public.class)"
        //                            or "@JsonView(Views.Private.class)":
        messageBroker.send("MySession", "MyTopic", fetchUserData(), Views.Private.class);
    }

    public void sendAllUserData() {
        // Sends all fields in UserData class:
        messageBroker.send("MyTopic", fetchUserData());
    }

    @Data                // Lombok: provides all getters, setters, etc.
    @AllArgsConstructor  // Lombok: constructor using all fields
    public static final class UserData {
        //@formatter:off
        @JsonView(Views.Private.class) private Integer id;
        @JsonView(Views.Public.class)  private String  firstName;
        @JsonView(Views.Public.class)  private String  lastName;
        @JsonView(Views.Private.class) private String  emailAddr;
        @JsonView(Views.Private.class) private String  phoneNumber;
        /* no @JsonView here */        private String  miscNotes;
        //@formatter:on
    }

    private static class Views {
        //@formatter:off
        private static class Public {}
        private static class Private extends Public {}
        //@formatter:on
    }
}

Filtering what gets received

To go along with the above Publisher2, we now create a Subscriber2.

No matter what data is sent to this topic, Subscriber2 only wants five fields: firstName, lastName, phoneNumber, miscNotes, and age. These are defined in the RecvData class.

Take a look at the following subscriber code, then we’ll examine what data is received for each of the three publisher "send" methods described above.

Subscriber that uses the "bodyClass" parameter
@Slf4j  // Lombok: provides the "log" field used in onMessage() (assumed logging setup)
public class Subscriber2 implements MessageBrokerCallback {

    @Autowired
    private MessageBroker messageBroker;

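    public void subscribeToMessage() {
        // Subscribe to topic and only receive RecvData objects.
        // Use this object as the session rather than the publisher's "MySession":
        // a send is not delivered to subscriptions that share the sender's session.
        messageBroker.subscribe(this, "MyTopic", RecvData.class, this);
    }

    public void unsubscribeFromMessage() {
        messageBroker.unsubscribe(this, "MyTopic");
    }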

    @Data
    private static final class RecvData {
        private String firstName;
        private String lastName;
        private String phoneNumber;
        private String miscNotes;
        private Integer age;
    }

    @Override
    public void onMessage(MessageData msgData) {
        RecvData recvData = (RecvData)msgData.getBody();
        log.info("Message received: {}", msgData);
    }
}

Examination of messages

This section examines what happens when the messages from each of the Publisher2 send methods are received by the Subscriber2 object.

The following fields are transmitted on each of the three Publisher2 "send" calls:

#  method                 fields transmitted
----------------------------------------------------------------------------------------
1  sendPublicUserData()   firstName, lastName
2  sendPrivateUserData()  firstName, lastName, id, emailAddr, phoneNumber
3  sendAllUserData()      firstName, lastName, id, emailAddr, phoneNumber, miscNotes

The following fields are desired by the Subscriber2 "onMessage()" callback, per the RecvData class:

firstName, lastName, phoneNumber, miscNotes, age

In case #1, the resultant RecvData object contains the firstName and lastName fields.

In case #2, the resultant RecvData object contains the firstName, lastName, and phoneNumber fields.

In case #3, the resultant RecvData object contains the firstName, lastName, phoneNumber, and miscNotes fields.

The "age" field in every case is null, as that value is never transmitted.

This example demonstrated how you can:

  1. filter what data is transmitted, and

  2. filter what data is received.
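
The outcome of the three cases above can be reproduced as a set intersection between the fields transmitted and the fields desired. The sketch below models only field names with standard collections; it does not perform any JSON serialization, and FieldFilterSketch is a hypothetical name used for illustration.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class FieldFilterSketch {

    // Fields declared by RecvData, i.e. what the subscriber asks for.
    static final List<String> DESIRED =
            List.of("firstName", "lastName", "phoneNumber", "miscNotes", "age");

    // Returns the desired fields that were actually transmitted, in RecvData order.
    static Set<String> populated(List<String> transmitted) {
        Set<String> result = new LinkedHashSet<>(DESIRED);
        result.retainAll(transmitted);
        return result;
    }

    public static void main(String[] args) {
        List<String> publicView = List.of("firstName", "lastName");
        List<String> privateView =
                List.of("firstName", "lastName", "id", "emailAddr", "phoneNumber");
        List<String> allFields =
                List.of("firstName", "lastName", "id", "emailAddr", "phoneNumber", "miscNotes");

        System.out.println(populated(publicView));   // [firstName, lastName]
        System.out.println(populated(privateView));  // [firstName, lastName, phoneNumber]
        System.out.println(populated(allFields));    // [firstName, lastName, phoneNumber, miscNotes]
    }
}
```

The "age" entry never appears in any result because no publisher method transmits it, which corresponds to the null value noted above.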

Remote Messaging

All descriptions to this point have dealt with local messaging; that is, the sending and receiving objects exist in the same JVM.

This section deals with remote messaging, where the sending and receiving objects are on different JVMs.

Explained

Remote subscriptions connect through the KOS routers.

In every KOS system, there is a BrokerRouterBridge that manages remote subscriptions, and the session for all subscriptions is the source address of the remote connection. This has a few implications.

First, you will never receive your own events, since the bridge uses the source address as the session on all sends to the broker. This is usually what you want, so it’s not an issue.

Second, each remote client can only subscribe to a given topic once. If a client subscribes to the same topic more than once, the bridge simply logs the fact that there are multiple subscriptions, and nothing inherently bad happens. The concern is that if a client subscribes more than once and one of those code paths unsubscribes, the other is also unsubscribed. Clients should therefore manage internal subscriptions such that a single external subscription is created per topic of interest.
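
One way a client might follow this advice is to reference-count its internal subscribers per topic, touching the remote subscription only on the first subscribe and the last unsubscribe. The sketch below is an assumption about client-side design, not part of the KOS API: SubscriptionCounter and its RemoteLink interface are hypothetical names, and RemoteLink stands in for whatever actually sends the remote messages.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical client-side helper; not part of the KOS API.
class SubscriptionCounter {

    // Stand-in for whatever actually sends the remote subscribe/unsubscribe messages.
    interface RemoteLink {
        void subscribe(String topic);
        void unsubscribe(String topic);
    }

    private final Map<String, Integer> counts = new HashMap<>();
    private final RemoteLink link;

    SubscriptionCounter(RemoteLink link) {
        this.link = link;
    }

    // Only the first internal subscriber triggers a remote subscription.
    synchronized void acquire(String topic) {
        if (counts.merge(topic, 1, Integer::sum) == 1) {
            link.subscribe(topic);
        }
    }

    // Only the last internal subscriber triggers a remote unsubscribe.
    synchronized void release(String topic) {
        Integer current = counts.get(topic);
        if (current == null) {
            return; // nothing to release
        }
        if (current == 1) {
            counts.remove(topic);
            link.unsubscribe(topic);
        } else {
            counts.put(topic, current - 1);
        }
    }

    // Demonstration: records the remote calls made for two internal subscribers.
    static List<String> demo() {
        List<String> remoteCalls = new ArrayList<>();
        SubscriptionCounter counter = new SubscriptionCounter(new RemoteLink() {
            @Override public void subscribe(String topic) { remoteCalls.add("subscribe " + topic); }
            @Override public void unsubscribe(String topic) { remoteCalls.add("unsubscribe " + topic); }
        });
        counter.acquire("/user/*");  // remote subscribe
        counter.acquire("/user/*");  // counted only
        counter.release("/user/*");  // counted only
        counter.release("/user/*");  // remote unsubscribe
        return remoteCalls;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With this arrangement, one code path releasing its interest does not cut off another path that still needs the topic.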

To create a remote subscription, the client sends a message to the target that it wants to receive events from:

type:kos.broker.subscribe\n
\n
["/my/topic/here"]\n

This message is shown with explicit newlines, which delineate the "header" from the "body". The type of the message is always kos.broker.subscribe, and the body is a list of topics to subscribe to.

To unsubscribe, use a type of kos.broker.unsubscribe.

As with all router-related functionality, if there is no "dst-addr" in the message, then the message is processed by the router you’re connected to. This is typically what you want.

Next, consider loading the UI on one JVM, but wanting to subscribe to events on another. For example, the UI loads from the Studio JVM, but then you connect to and want to process events from a KOS JVM. In this case, you simply need to set the dst-addr header to the KOS JVM and your subscription will be forwarded to it. Similarly, when events are generated, they will be routed back to the UI automatically.
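
The message layout described above can be assembled with a few lines of string handling. The sketch below follows the explicit-newline example shown earlier for the type header and the topic list. Everything else is an assumption: writing dst-addr as a "name:value" header line in the same style as type, the placeholder address "kos-jvm", and the BrokerFrameSketch name are all hypothetical, and the quoting is deliberately naive.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; header syntax beyond "type:" is assumed.
class BrokerFrameSketch {

    // Builds the header lines, a blank line, then the topic list as the body.
    static String frame(String type, String dstAddr, List<String> topics) {
        StringBuilder sb = new StringBuilder();
        sb.append("type:").append(type).append('\n');
        if (dstAddr != null) {
            // Assumption: dst-addr uses the same "name:value" form as the type header.
            sb.append("dst-addr:").append(dstAddr).append('\n');
        }
        sb.append('\n'); // blank line separates the header from the body
        // Naive quoting: assumes topics contain no quotes or backslashes.
        String body = topics.stream()
                .map(t -> "\"" + t + "\"")
                .collect(Collectors.joining(",", "[", "]"));
        return sb.append(body).append('\n').toString();
    }

    public static void main(String[] args) {
        // Processed by the router you are connected to (no dst-addr).
        System.out.print(frame("kos.broker.subscribe", null, List.of("/my/topic/here")));
        // Forwarded to another node; "kos-jvm" is a placeholder address.
        System.out.print(frame("kos.broker.unsubscribe", "kos-jvm", List.of("/my/topic/here")));
    }
}
```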

Summary

This page showed how to use the KOS system to publish and subscribe to messages.

Copyright © 2024 TCCC. All rights reserved.