
The KOS message broker component provides a simple and reliable way for software objects to:
Subscribe to desired message topics
Send messages based on those topics
The message broker provides for a one-to-many distribution of messages. It gives any number of publishers (producers) the ability to send messages to any number of subscribers (consumers). The following diagram illustrates this:
MessageBroker
The MessageBroker is a KOS-provided system component that you inject into your code using the @Autowired annotation. You use it to subscribe to (receive) and publish (send) messages.
This section defines the core terms associated with sending and receiving messages. It should give you the knowledge required to better understand the Code and Examples sections that follow.
The MessageBroker exists to allow events, with optional data, to be sent and received between various software objects. The KOS system notifies objects about these events through broker messages.
For example, a message could indicate that the user pressed the dispense button on the UI. There might not be any additional data associated with that message.
Alternatively, a message could indicate that the user pressed some button on the UI, and its associated data could indicate which button, the press duration, etc.
To subscribe is to say "I would like to be notified when events/messages of a certain type occur".
To unsubscribe is to say "I am no longer interested in receiving those events/messages".
A message broker session is a way to group topics. It is an arbitrary object that is required when you subscribe to or unsubscribe from a topic, and is optional when you send a message.
It’s not important what you use as your session. If your code is in a service, you can simply use the service object as the session, for example, or you can use some sort of string or numerical value. Subscriptions are tracked against the session, and you can use the session to unsubscribe from all subscriptions when you need to clean things up.
A session provides two primary benefits:
1) If a session is specified when you send, and it matches your subscription session, then you will not receive your own events. Consider the following code snippet:
messageBroker.subscribe("mySession", "/my/topic/here", callbackHandler);
messageBroker.send("mySession", "/my/topic/here");
In this example, the callbackHandler for the subscription won’t be called because it has the same session as the sender. Typically, the sending class has already handled the event, so there’s no need to receive a message saying the event occurred. This stops any potential race conditions.
2) You can unsubscribe from all topics in your session by using:
messageBroker.unsubscribe("mySession");
This makes it very easy to unsubscribe from all related topics.
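To make the two session benefits concrete, here is a minimal in-memory sketch. ToyBroker is a hypothetical stand-in written for this illustration, not the KOS MessageBroker; it assumes sessions are compared with equals() and that topics match exactly (no wildcards).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy stand-in for a broker (hypothetical; NOT the KOS implementation). */
class ToyBroker {
    // One entry per (session, topic, callback) subscription.
    private static final class Sub {
        final Object session;
        final String topic;
        final Consumer<String> callback;

        Sub(Object session, String topic, Consumer<String> callback) {
            this.session = session;
            this.topic = topic;
            this.callback = callback;
        }
    }

    private final List<Sub> subs = new ArrayList<>();

    void subscribe(Object session, String topic, Consumer<String> callback) {
        subs.add(new Sub(session, topic, callback));
    }

    // Benefit 2: remove every subscription that belongs to the session.
    void unsubscribe(Object session) {
        subs.removeIf(s -> s.session.equals(session));
    }

    // Benefit 1: subscribers that share the sender's session are skipped.
    // Returns the number of callbacks invoked.
    int send(Object senderSession, String topic) {
        int delivered = 0;
        for (Sub s : new ArrayList<>(subs)) {
            boolean sameSession = senderSession != null && senderSession.equals(s.session);
            if (s.topic.equals(topic) && !sameSession) {
                s.callback.accept(topic);
                delivered++;
            }
        }
        return delivered;
    }
}
```

With two subscribers on the same topic, a send that carries "mySession" reaches only the subscriber in the other session, and a single unsubscribe("mySession") call removes everything that session registered.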
A message broker topic is any text that identifies the subject of discussion or conversation.
For example, if you wanted to receive alerts when a user logs in or logs out, you could create topics named "user-login" and "user-logout".
messageBroker.subscribe("mySession", "user-login", this);
messageBroker.subscribe("mySession", "user-logout", this);
You can also achieve this using a feature called topic paths. This allows you to create hierarchical topics using the forward-slash character. For example, you could name your topics "/user/login" and "/user/logout":
messageBroker.subscribe("mySession", "/user/login", this);
messageBroker.subscribe("mySession", "/user/logout", this);
However, this is even easier using topic paths combined with a wildcard:
messageBroker.subscribe("mySession", "/user/*", this);
The above call subscribes to all "/user" actions; in this case, both login and logout. If another user action, such as "suspend", is defined in the future, this object will receive the new "/user/suspend" topic without any code changes.
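The following sketch shows one way a wildcard subscription could be matched against a concrete topic. TopicMatcher is a hypothetical helper, and its rule (a trailing "/*" matches any non-empty remainder under the prefix, including deeper paths) is an assumption made for illustration; the actual KOS matching rules may differ.

```java
/** Hypothetical helper; the real KOS matching rules may differ. */
final class TopicMatcher {
    private TopicMatcher() {}

    // Returns true when 'topic' is covered by 'subscription'.
    // Assumption: a trailing "/*" matches any non-empty remainder under the prefix.
    static boolean matches(String subscription, String topic) {
        if (subscription.endsWith("/*")) {
            // Drop only the '*', keeping the trailing '/' so "/user/*" does not match "/users/...".
            String prefix = subscription.substring(0, subscription.length() - 1);
            return topic.length() > prefix.length() && topic.startsWith(prefix);
        }
        // No wildcard: the topic must match exactly.
        return subscription.equals(topic);
    }
}
```

Under this rule, "/user/*" covers "/user/login", "/user/logout", and a later "/user/suspend", but not "/users/login".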
To unsubscribe from a topic, call the following using the same session and topic you originally subscribed with:
messageBroker.unsubscribe("mySession", "/my/topic/here");
This section describes the three primary components of the KOS message broker system and the one custom exception:
MessageBroker (component)
MessageBrokerCallback (interface)
MessageData (class)
DuplicateSubscriptionException (exception)
The following code shows the MessageBroker interface. The method parameters are:
session : all subscriptions are tracked by session, so if a session dies, then all related subscriptions are cleaned up
topic : the name of the topic to send or subscribe to
callback : a method to call when a new message is ready
body : the object containing data to transmit
bodyClass : the class to serialize the message’s data to
jsonView : the class to use when converting the body to a JSON payload (a type of filter)
jsonBody : the message body in parsed JSON
The bodyClass parameter, used in the second subscribe() method, is the class of the object that the receiver wants. No matter what data the sender provides, the receiver gets only the subset of that data that maps onto this class. In other words, the bodyClass acts as a filter at data reception time.
The jsonView parameter, used in the third send() method, selects which part of the body the sender transports to the receiver. No matter what is in the body parameter, the sender sends only the subset of that data indicated by the jsonView. In other words, the jsonView acts as a filter at data send time.
Both of these are demonstrated in the examples later in this document.
public interface MessageBroker {
// Subscribe to the given topic:
void subscribe(Object session, String topic, MessageBrokerCallback callback);
// Subscribe to the given topic, with class for deserialization:
void subscribe(Object session, String topic, Class<?> bodyClass, MessageBrokerCallback callback);
// Unsubscribe from the given topic:
void unsubscribe(Object session, String topic);
// Unsubscribe from all topics in the session:
void unsubscribe(Object session);
// Send a topic:
void send(String topic);
// Send a topic with a message:
void send(String topic, Object body);
// Send a topic with message data filtered by the view:
void send(Object session, String topic, Object body, Class<?> jsonView);
// Send a topic with message and JSON body (typically not used):
void sendJson(Object session, String topic, JsonNode jsonBody);
}
In your code, you inject the KOS system’s MessageBroker by using the @Autowired annotation:
public class MyClass {
@Autowired
private MessageBroker messageBroker;
// . . .
}
The MessageBrokerCallback interface is implemented by classes that want to subscribe to broker messages. It has a single method that must be overridden:
public interface MessageBrokerCallback {
// Called when there's a new message for a subscribed topic:
void onMessage(MessageData msgData);
}
The MessageData class holds the information passed by the message broker to the subscribing code:
@Data // Lombok: provides all getters, setters, etc.
public class MessageData {
private Object session;
private String topic;
private String subscription;
private Object body;
private JsonNode jsonBody;
// For brevity, not showing:
// Constructors
// Getters & Setters
// toString(), equals(), and hashCode() methods
}
A DuplicateSubscriptionException is thrown when there’s an attempt to subscribe with a session and topic combination that is already subscribed:
public class MyClass {
@Autowired
private MessageBroker messageBroker;
public void myMethod() {
messageBroker.subscribe("sess1", "topic1", this); // successfully subscribed
// . . . CODE . . .
messageBroker.subscribe("sess1", "topic1", this); // throws DuplicateSubscriptionException
}
}
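If your code may reach the same subscribe call twice, one option is to guard against the duplicate. The sketch below is self-contained and uses hypothetical stand-ins: its DuplicateSubscriptionException is a local class assumed to be unchecked (the interface shown earlier declares no throws clause), and SubscriptionRegistry only models the "one subscription per session and topic" rule.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in; the real exception's package and superclass are not shown in this document.
class DuplicateSubscriptionException extends RuntimeException {
    DuplicateSubscriptionException(String message) {
        super(message);
    }
}

/** Models the rule that a (session, topic) pair can be subscribed only once. */
class SubscriptionRegistry {
    private final Set<List<Object>> active = new HashSet<>();

    // Throws when the same session and topic are subscribed a second time.
    void subscribe(Object session, String topic) {
        if (!active.add(Arrays.asList(session, topic))) {
            throw new DuplicateSubscriptionException("already subscribed: " + topic);
        }
    }

    // Idempotent variant: returns false instead of propagating the exception.
    boolean subscribeIfAbsent(Object session, String topic) {
        try {
            subscribe(session, topic);
            return true;
        } catch (DuplicateSubscriptionException e) {
            return false;
        }
    }
}
```

The same try/catch shape could wrap a real messageBroker.subscribe() call if a repeated subscription is expected and harmless in your code.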
Let’s take a look at a simple example. We’ll create two classes: a publisher and a subscriber.
The publisher handles two topics, "/user/login" and "/user/logout", for a theoretical user.
When the loginTheUser() method is called, it publishes (to the message broker) the login topic with information about the user.
When the logoutTheUser() method is called, it publishes the logout topic, but with no data. A message sent without data is simply an "event".
public class Publisher1 {
public static final String LOGIN_TOPIC = "/user/login";
public static final String LOGOUT_TOPIC = "/user/logout";
@Autowired
private MessageBroker messageBroker;
public void loginTheUser() {
UserInfo userInfo = new UserInfo(157, "Sam", "Goody");
messageBroker.send(LOGIN_TOPIC, userInfo);
}
public void logoutTheUser() {
messageBroker.send(LOGOUT_TOPIC);
}
}
The subscriber:
Asks the BeanContext to inject the MessageBroker (the @Autowired field)
Implements the MessageBrokerCallback interface (the class declaration and the onMessage() override)
Performs the necessary operations in the onMessage() callback method (the LOGIN_TOPIC and LOGOUT_TOPIC cases)
When start() is called, the object subscribes to the "/user/*" topics.
When stop() is called, the object unsubscribes from all topics associated with the session.
The "session" can be anything; it’s just a mechanism to group topics together.
public class Subscriber1 implements MessageBrokerCallback {
private static final String USER_TOPICS = "/user/*";
@Autowired
private MessageBroker messageBroker;
public void start() {
messageBroker.subscribe(this, USER_TOPICS, this);
}
public void stop() {
messageBroker.unsubscribe(this);
}
@Override
public void onMessage(MessageData msgData) {
switch (msgData.getTopic()) {
case Publisher1.LOGIN_TOPIC:
// We know the user logged in and who he/she is:
UserInfo userInfo = (UserInfo)msgData.getBody();
break;
case Publisher1.LOGOUT_TOPIC:
// We know the user logged out...
break;
default:
// Unknown topic:
break;
}
}
}
In this example, we want the publisher to call the send() method that takes the jsonView parameter. This parameter lets the broker send only a subset of the body data. Use it to reduce the size of the message payload.
In our example, there’s a class named UserData, which contains the ID, first name, last name, email address, phone number, and misc notes fields.
In the sendPublicUserData() method, only the "public" fields are transmitted (firstName and lastName).
In the sendPrivateUserData() method, both the "public" and "private" fields are transmitted (all fields except miscNotes).
In the sendAllUserData() method, all fields are transmitted (including miscNotes).
To see how each of these three sets of fields is defined, examine the UserData class, as well as the Views class with its nested Public and Private classes.
public class Publisher2 {
@Autowired
private MessageBroker messageBroker;
private UserData fetchUserData() {
return new UserData(13579, "Fred", "Flintstone",
"fred@oldschool.com", "BR-549", "Very good worker");
}
public void sendPublicUserData() {
// Only sends fields marked with "@JsonView(Views.Public.class)":
messageBroker.send("MySession", "MyTopic", fetchUserData(), Views.Public.class);
}
public void sendPrivateUserData() {
// Only sends fields marked with "@JsonView(Views.Public.class)"
// or "@JsonView(Views.Private.class)":
messageBroker.send("MySession", "MyTopic", fetchUserData(), Views.Private.class);
}
public void sendAllUserData() {
// Sends all fields in UserData class:
messageBroker.send("MyTopic", fetchUserData());
}
@Data // Lombok: provides all getters, setters, etc.
@AllArgsConstructor // Lombok: constructor using all fields
public static final class UserData {
//@formatter:off
@JsonView(Views.Private.class) private Integer id;
@JsonView(Views.Public.class) private String firstName;
@JsonView(Views.Public.class) private String lastName;
@JsonView(Views.Private.class) private String emailAddr;
@JsonView(Views.Private.class) private String phoneNumber;
/* no @JsonView here */ private String miscNotes;
//@formatter:on
}
private static class Views {
//@formatter:off
private static class Public {}
private static class Private extends Public {}
//@formatter:on
}
}
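To see why Views.Private also selects the Public fields, it can help to model the view check without any JSON library. The sketch below is an illustration only: @View is a hypothetical stand-in for @JsonView, ViewFilter is not part of KOS, and it assumes (as this document describes) that unannotated fields are left out whenever a view is active.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for @JsonView, so the sketch needs no external library.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface View {
    Class<?> value();
}

class Views {
    static class Public {}
    static class Private extends Public {}
}

class UserData {
    @View(Views.Private.class) public Integer id = 13579;
    @View(Views.Public.class) public String firstName = "Fred";
    @View(Views.Public.class) public String lastName = "Flintstone";
    @View(Views.Private.class) public String emailAddr = "fred@oldschool.com";
    @View(Views.Private.class) public String phoneNumber = "BR-549";
    public String miscNotes = "Very good worker"; // no view annotation
}

final class ViewFilter {
    private ViewFilter() {}

    // Builds the set of fields that would be transmitted for the given view.
    // activeView == null means "no filtering": every field is included.
    static Map<String, Object> toPayload(Object body, Class<?> activeView) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Field f : body.getClass().getDeclaredFields()) {
            if (f.isSynthetic()) {
                continue;
            }
            View v = f.getAnnotation(View.class);
            // A field is included when its view is the active view or a superclass of it,
            // which is why Private (extends Public) also picks up the Public fields.
            boolean include = activeView == null
                    || (v != null && v.value().isAssignableFrom(activeView));
            if (include) {
                try {
                    out.put(f.getName(), f.get(body));
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        return out;
    }
}
```

Calling toPayload with Views.Public.class, Views.Private.class, and null mirrors sendPublicUserData(), sendPrivateUserData(), and sendAllUserData() respectively.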
To go along with the above Publisher2, we now create a Subscriber2.
No matter what data is sent to this topic, Subscriber2 only wants five fields: firstName, lastName, phoneNumber, miscNotes, and age. These are defined in the RecvData class.
Take a look at the following subscriber code, then we’ll examine what data is received for each of the three publisher "send" methods described above.
@Slf4j // Lombok: provides the "log" field used in onMessage()
public class Subscriber2 implements MessageBrokerCallback {
@Autowired
private MessageBroker messageBroker;
public void subscribeToMessage() {
// Subscribe to topic and only receive RecvData objects:
messageBroker.subscribe("MySession", "MyTopic", RecvData.class, this);
}
public void unsubscribeFromMessage() {
messageBroker.unsubscribe("MySession", "MyTopic");
}
@Data
private static final class RecvData {
private String firstName;
private String lastName;
private String phoneNumber;
private String miscNotes;
private Integer age;
}
@Override
public void onMessage(MessageData msgData) {
// The body arrives as a RecvData because that class was passed to subscribe():
RecvData recvData = (RecvData)msgData.getBody();
log.info("Message received: {}", recvData);
}
}
This section examines what happens when the message sent by each of the Publisher2 send methods is received by the Subscriber2 object.
The following fields are transmitted on each of the three Publisher2 "send" calls:
#   method                  fields transmitted
--  ----------------------  -------------------------------------------------------------
1   sendPublicUserData()    firstName, lastName
2   sendPrivateUserData()   firstName, lastName, id, emailAddr, phoneNumber
3   sendAllUserData()       firstName, lastName, id, emailAddr, phoneNumber, miscNotes
The following fields are desired by the Subscriber2 "onMessage()" callback, per the RecvData class:
firstName, lastName, phoneNumber, miscNotes, age
In case #1, the resultant RecvData object contains the firstName and lastName fields.
In case #2, the resultant RecvData object contains the firstName, lastName, and phoneNumber fields.
In case #3, the resultant RecvData object contains the firstName, lastName, phoneNumber, and miscNotes fields.
The "age" field in every case is null, as that value is never transmitted.
This example demonstrated how you can:
filter what data is transmitted, and
filter what data is received.
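The receive-side half of this can be modeled with a few lines of reflection. This is a sketch under stated assumptions, not how KOS deserializes messages: the transmitted payload is represented as a plain Map, RecvData uses public fields instead of Lombok accessors, and ReceiveFilter is a hypothetical helper.

```java
import java.lang.reflect.Field;
import java.util.Map;

// Same field names as the RecvData class in Subscriber2, with public fields for brevity.
class RecvData {
    public String firstName;
    public String lastName;
    public String phoneNumber;
    public String miscNotes;
    public Integer age;
}

final class ReceiveFilter {
    private ReceiveFilter() {}

    // Copies only the keys that RecvData declares. Extra payload keys (id, emailAddr)
    // are ignored, and declared fields missing from the payload stay null.
    static RecvData fromPayload(Map<String, Object> payload) {
        RecvData out = new RecvData();
        for (Field f : RecvData.class.getDeclaredFields()) {
            if (f.isSynthetic()) {
                continue;
            }
            Object value = payload.get(f.getName());
            if (value != null) {
                try {
                    f.set(out, value);
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        return out;
    }
}
```

Feeding it the fields from case #2 yields firstName, lastName, and phoneNumber, with miscNotes and age left null, which matches the results listed above.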
All descriptions to this point have dealt with local messaging; that is, the sending and receiving objects exist in the same JVM.
This section deals with remote messaging, where the sending and receiving objects are on different JVMs.
Remote subscriptions connect through the KOS routers.
In every KOS system, there is a BrokerRouterBridge that manages remote subscriptions, and the session for all subscriptions is the source address of the remote connection. This has a few implications.
First, you will never receive your own events, since the bridge uses the source address as the session on all sends to the broker. This is usually what you want, so that’s not an issue.
Second, each remote client can only subscribe to a given topic once. If a client subscribes to the same topic again, the bridge simply logs the fact that there are multiple subscriptions, and nothing inherently bad happens. The concern is that if a client subscribes more than once and one of those code paths unsubscribes, the other is also unsubscribed. Clients should therefore manage internal subscriptions such that a single external subscription is created per topic of interest.
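One way a client could keep a single external subscription per topic is to reference-count local interest. The sketch below is a hypothetical client-side helper, not part of KOS; the two Consumer arguments stand in for whatever code actually sends the remote subscribe and unsubscribe messages.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical client-side helper; not part of KOS. */
class RemoteSubscriptionCounter {
    private final Map<String, Integer> counts = new HashMap<>();
    private final Consumer<String> remoteSubscribe;   // sends the remote subscribe for a topic
    private final Consumer<String> remoteUnsubscribe; // sends the remote unsubscribe for a topic

    RemoteSubscriptionCounter(Consumer<String> remoteSubscribe,
                              Consumer<String> remoteUnsubscribe) {
        this.remoteSubscribe = remoteSubscribe;
        this.remoteUnsubscribe = remoteUnsubscribe;
    }

    // The first local interest in a topic triggers the single remote subscription.
    synchronized void acquire(String topic) {
        if (counts.merge(topic, 1, Integer::sum) == 1) {
            remoteSubscribe.accept(topic);
        }
    }

    // Only the last local release triggers the remote unsubscribe.
    synchronized void release(String topic) {
        Integer n = counts.get(topic);
        if (n == null) {
            return; // nothing was acquired for this topic
        }
        if (n == 1) {
            counts.remove(topic);
            remoteUnsubscribe.accept(topic);
        } else {
            counts.put(topic, n - 1);
        }
    }
}
```

With this in place, two code paths can each acquire and release "/user/*" independently, and the remote side sees exactly one subscribe and one unsubscribe.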
To create a remote subscription, the client sends a message to the target that it wants to receive events from:
type:kos.broker.subscribe\n
\n
["/my/topic/here"]\n
This message is shown with explicit newlines, which delineate the "header" from the "body". The type of the message is always kos.broker.subscribe, and the body is a list of topics to subscribe to.
To unsubscribe, use a type of kos.broker.unsubscribe.
As with all router-related functionality, if there is no "dst-addr" in the message, then the message is processed by the router you’re connected to. This is typically what you want.
Next, consider loading the UI on one JVM, but wanting to subscribe to events on another. For example, the UI loads from the Studio JVM, but then you connect to and want to process events from a KOS JVM. In this case, you simply need to set the dst-addr header to the KOS JVM and your subscription will be forwarded to it. Similarly, when events are generated, they will be routed back to the UI automatically.
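The message layout described above can be assembled with a small helper. This is a sketch only: BrokerWireMessage is a hypothetical class, and the "dst-addr:value" header line is an assumption modeled on the "type:" header shown earlier; the document does not show the exact syntax or the format of the address value.

```java
/** Hypothetical helper that builds the text of a remote subscribe/unsubscribe message. */
final class BrokerWireMessage {
    private BrokerWireMessage() {}

    static String subscribe(String dstAddr, String... topics) {
        return build("kos.broker.subscribe", dstAddr, topics);
    }

    static String unsubscribe(String dstAddr, String... topics) {
        return build("kos.broker.unsubscribe", dstAddr, topics);
    }

    private static String build(String type, String dstAddr, String... topics) {
        StringBuilder sb = new StringBuilder();
        sb.append("type:").append(type).append('\n');
        if (dstAddr != null) {
            // Assumed header syntax; omit it to let the connected router process the message.
            sb.append("dst-addr:").append(dstAddr).append('\n');
        }
        sb.append('\n'); // blank line separates the header from the body
        sb.append('[');
        for (int i = 0; i < topics.length; i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append('"').append(escape(topics[i])).append('"');
        }
        sb.append("]\n");
        return sb.toString();
    }

    // Minimal JSON string escaping for backslashes and double quotes.
    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }
}
```

Passing null for dstAddr reproduces the subscribe message shown earlier; passing an address adds the extra header line for the forwarding case.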