Spring Message Channel Design

Ryan Zheng
4 min readJul 14, 2020

The Channel here we talked about is different from the Channel in Java NIO. NIO Channel is a wrapper around file descriptors. The Channel here is about message sending.

Spring Channel is the fundamental building block of Spring Messaging. Channel is a way to send messages. Here send means literally “send”. That’s why the very top interface of Spring Channel has only send method.

Since MessageChannel is a way to send messages, so who is invoking the send method?

The answer is some other consumer is using the channel to send messages.

But we care about how to connect channels together, pipe the messages down the stream. Since the send method of the channel is used to send a message. Then we could just call another channel’s send method inside the current channel to link the channels together. Dummy implementation looks like this

class FirstChannel implements MessageChannel {
send(Message message) {
SecondChannel downStream = new SecondChannel();
downStream.send(message)
}
}
class SecondChannel implements MessageChannel {
send(Message message) {
//dummy
}
}

But this is too simple. Maybe different channels are responsible for different message formats. So in the first channel, we might want to process or transform the message before sending it to the second channel. We could create a MessageHandler and call this handler inside the channel send method. So let’s create another interface for MessageHandler

interface MessageHandler {
handleMessage(Messsage message)
}

Different implementations of message handlers are dependent on business needs. It’s easy to see that MessageHandlers should exist inside the Channel.

class FirstChannel implements MessageChannel {
List<MessageHandler> handlers;
...
}

In object-oriented programming, we want to decouple message handlers with the channel and give the user the ability to dynamically configure what handlers the channel could use. So we could actually create a register method on the handler side, take the channel as an input parameter.

class DummyHandler implements MessageHandler {
registerChannel(MessageChannel channel) {
channel.addHandlers(this) --> add handler to channel
}

handleMessage(Message message) {
...
}
}

Now we enabled to register message handlers with channels. We could loop through each handler to process the message inside the channel.

class FirstChannel implements MessageChannel {
List<MessageHandler> handlers;

send(Message message) {
handlers.forEach(handler -> { --> foreach handler
handler.handleMessage(message)
})
}
}

But how to send the transformed messages down the stream? The answer is We could call another channel’s send method inside the MessageHandler’s handleMessage method.

class DummyHandler implements MessageHandler {
MessageChannel outputChannel;

DummyHandler(MessageChannel outputChannel) {this.outputChannel = outputChannel}
registerChannel(MessageChannel channel) {
channel.addHandlers(this)
}

handleMessage(Message message) {
/some message transformation might happen here*/ /*call output channel to pipe the changed message another channel */
outputChannel.send(message);
}
}

Now we have two problems to address

(1)What if the handler method of a certain MessageHandler is very computation heavy?

(2)We might have two different message handlers, HandlerB is not working on the result of HandlerA. They both work on the same message, but no relationship at all. They could be working in parallel.

The Solution

We need to use a separate thread to handle the message. How to design this? One way is to make each MessageHandler also extends Runnable.

interface MessageHandler extends Runnable { --> Runnable has run()
handleMessage(Message message);
}

Then we have to call the handleMessage inside the run() method. In order to start a separate thread, we could put one Executor inside the Channel implementation to start a separate thread for each handler.

class FirstChannel implements MessageChannel {
ExecutorService executor;
List<MessageHandler> handlers;

send(Message message) {
handlers.forEach(handler -> {
executor(handler.run()); --> here
})
}
}

Now let’s look at the original Spring implementation, how it corresponds to what we have described so far.

MessageHandlingRunnable.java

MessageHandlingRunnable.java    --> Handler extends Runnablepublic interface MessageHandlingRunnable extends Runnable {
Message<?> getMessage();

MessageHandler getMessageHandler();
}

ExecutorSubscribableChannel.java

/*we have simplified the file, but the original file looks pretty much like this. The subscribers here are message handlers. It's just different names*/public boolean sendInternal(Message<?> message, long timeout) {
subscribers.foreach(subscriber -> {
ExecutorSubscribableChannel.SendTask sendTask = new ExecutorSubscribableChannel.SendTask(message, handler);
this.executor.execute(sendTask); --> Executor run SendTask
}
})
}
/*SendTask is one inner class which implements run method*/class SendTask implements MessageHandlingRunnable {
@Override
public void run() {
this.messageHandler.handleMessage(message);
}
}

Now I want to make some corrections, so far all the above design corresponds to ExecutorSubscribableChannel. But it already gives us a general design of how Spring channels could be connected together by handlers. And also shows us how the channel could use different threading models to send the message. There are other types of channels like PollableChannel, QueueChannel. But the general design is the same, using handlers to process the message and pipe the messages down the stream.

If there is anything incorrect, please leave a comment. My next topic will be describing the threading models of Rabbitmq.

--

--

Ryan Zheng

I am a software developer who is keen to know how things work