I am working on a real-time, event-based application using Spring WebSockets, Messaging and RabbitMQ. In this application, messages need to be delivered to clients in the exact order they were inserted into RabbitMQ.
Our goal is to receive a message from a browser, process it on the server in order (against a unique object determined by the route parameter), enrich the message, and broadcast it to all subscribing browsers via an external STOMP MQ (RabbitMQ).
Our MessageMapping method is as follows:
@MessageMapping("/commands.{route}.{data}")
public CommandMessage receiveCommand(CommandMessage message, Principal principal) {
    try {
        // Get object to synch on using route
        Object o = ...
        synchronized (o) {
            // Perform command on object
            // Set message server sequence
            message.setServerSequence(o.getAutoIncrementSequence());
            // Log server sequence
            log.debug("Message server sequence:" + message.getServerSequence());
            // Send to external MQ for broadcasting to all subscribers
            return message;
        }
    } catch (Exception e) {
        ...
    }
    return null;
}
If we configure the clientInboundChannel and clientOutboundChannel with a corePoolSize and maxPoolSize of 1 each, all messages arrive in order.
If we increase corePoolSize and maxPoolSize on the clientInboundChannel, messages reach the MQ out of order; if we make the same increase on the clientOutboundChannel, messages reach the browser out of order.
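For reference, a minimal sketch of the kind of single-threaded channel configuration described above. It assumes Spring 5 or later, where WebSocketMessageBrokerConfigurer has default methods (on Spring 4.x one would extend AbstractWebSocketMessageBrokerConfigurer instead). The class name OrderedChannelConfig is a placeholder, and endpoint and broker relay setup are left out.

```java
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

// Hypothetical class name; only the channel thread pools are configured here.
@Configuration
@EnableWebSocketMessageBroker
public class OrderedChannelConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        // A single thread handles every message coming from clients.
        registration.taskExecutor().corePoolSize(1).maxPoolSize(1);
    }

    @Override
    public void configureClientOutboundChannel(ChannelRegistration registration) {
        // A single thread handles every message going to clients.
        registration.taskExecutor().corePoolSize(1).maxPoolSize(1);
    }
}
```

Raising either pool size above 1 is what produces the reordering described above.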
Tests were done using a single browser client.
We turned on tracing for StompBrokerRelayMessageHandler and received log entries like:
2014-05-06 14:26:39 TRACE [clientInboundChannel-6] o.s.m.s.s.StompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:412] Processing message=[Payload byte[303]][Headers={stompCommand=SEND, nativeHeaders={content-type=[application/json;charset=UTF-8], destination=[/topic/commands.BKN01.20140318]}, simpMessageType=MESSAGE, simpDestination=/topic/commands.BKN01.20140318, contentType=application/json;charset=UTF-8, simpSessionId=ehjcoxb3, id=a31d0e3d-12cc-f562-1ec2-e2d7ba0899eb, timestamp=1399400799940}]
2014-05-06 14:26:39 DEBUG [clientInboundChannel-6] o.s.m.s.s.StompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:658] Forwarding message to broker
2014-05-06 14:26:39 TRACE [clientInboundChannel-3] o.s.m.s.s.StompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:406] Ignoring message to destination=/app/commands.BKN01.20140318
2014-05-06 14:26:39 TRACE [clientInboundChannel-7] o.s.m.s.s.StompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:406] Ignoring message to destination=/app/commands.BKN01.20140318
2014-05-06 14:26:39 TRACE [clientInboundChannel-1] o.s.m.s.s.StompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:412] Processing message=[Payload byte[314]][Headers={stompCommand=SEND, nativeHeaders={content-type=[application/json;charset=UTF-8], destination=[/topic/commands.BKN01.20140318]}, simpMessageType=MESSAGE, simpDestination=/topic/commands.BKN01.20140318, contentType=application/json;charset=UTF-8, simpSessionId=ehjcoxb3, id=3cc7b4ae-8ea4-ef8a-6c4d-c3bc1ed23bcd, timestamp=1399400799947}]
2014-05-06 14:26:39 DEBUG [clientInboundChannel-1] o.s.m.s.s.StompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:658] Forwarding message to broker
We also turned tracing on for StompSubProtocolHandler in the org.springframework.web.socket.messaging package and received messages like:
2014-05-07 10:58:58 TRACE [http-nio-8080-exec-5] o.s.w.s.m.StompSubProtocolHandler [StompSubProtocolHandler.java:180] Received message from client session=u8wrnsr6
None of this information provides an easy way to map our message.serverSequence property (which is set prior to sending to MQ) to the various ids shown in the logs.
Is there any way to increase the number of inbound/outbound channel threads while keeping ordering intact? For example, could the channels be tied to a "route" or could a thread be pegged to a "route"?
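To illustrate what pegging a thread to a "route" could mean, here is a rough plain-JDK sketch. It is not Spring API and not our current code: RouteKeyedExecutor, its methods, and the lane count are hypothetical names made up for this example. Each route is hashed to one of several single-threaded executors, so tasks for the same route run one at a time in submission order while different routes can still run in parallel.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: all tasks for a given route run on the same single thread. */
final class RouteKeyedExecutor {
    private final ExecutorService[] lanes;

    RouteKeyedExecutor(int laneCount) {
        lanes = new ExecutorService[laneCount];
        for (int i = 0; i < laneCount; i++) {
            // One thread per lane, so tasks within a lane run in submission order.
            lanes[i] = Executors.newSingleThreadExecutor();
        }
    }

    /** Queues the task on the lane selected by the route's hash code. */
    void submit(String route, Runnable task) {
        int lane = Math.floorMod(route.hashCode(), lanes.length);
        lanes[lane].execute(task);
    }

    /** Stops accepting tasks and waits up to timeoutSeconds per lane for queued work. */
    boolean shutdownAndWait(long timeoutSeconds) {
        for (ExecutorService lane : lanes) {
            lane.shutdown();
        }
        boolean allDone = true;
        for (ExecutorService lane : lanes) {
            try {
                allDone &= lane.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and report that we did not finish cleanly.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return allDone;
    }
}
```

This only preserves order if the tasks for a route are handed to submit in arrival order, so the hand-off would have to happen before messages fan out across the channel's thread pool. Whether the Spring channels can be wired this way is exactly what I am asking.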
Please help.
Thank you,
Dan