Monitor your WebSockets and ThreadPools for Performance Tuning via JMX

Last week, after releasing Simple RealTime and getting a traffic spike from Hacker News, I needed to set up some basic monitoring for performance tuning.

There’s no silver bullet in performance tuning, but there are some clear things to look at: concurrent connections, buffers, and the thread pool executors for the inbound and outbound message channels (see the configuration and performance section for further information).

The following class (based on this) exposes through JMX the number of concurrent connections and the stats of the ThreadPoolTaskExecutor used for outbound messages (modify as needed, for instance to include the inbound executor):

@ManagedResource
public class WebSocketMessageBrokerStatsMonitor {

	private final Map<String, WebSocketSession> webSocketSessions;
	private final ThreadPoolExecutor outboundExecutor;

	@SuppressWarnings("unchecked")
	public WebSocketMessageBrokerStatsMonitor(SubProtocolWebSocketHandler webSocketHandler, ThreadPoolTaskExecutor outboundTaskExecutor) {
		this.webSocketSessions = (Map<String, WebSocketSession>) new DirectFieldAccessor(webSocketHandler).getPropertyValue("sessions");
		this.outboundExecutor  = outboundTaskExecutor.getThreadPoolExecutor();
	}

	@ManagedAttribute
	public int getCurrentSessions() {
		return webSocketSessions.size();
	}

	@ManagedAttribute
	public String getSendBufferSize() {
		long sendBufferSize = 0; // long: the total across many sessions could overflow an int
		for (WebSocketSession session : this.webSocketSessions.values()) {
			ConcurrentWebSocketSessionDecorator concurrentSession = (ConcurrentWebSocketSessionDecorator) session;
			sendBufferSize += concurrentSession.getBufferSize();
		}
		return formatByteCount(sendBufferSize);
	}

	@ManagedAttribute
	public int getOutboundPoolSize() {
		return outboundExecutor.getPoolSize();
	}

	@ManagedAttribute
	public int getOutboundLargestPoolSize() {
		return outboundExecutor.getLargestPoolSize();
	}

	@ManagedAttribute
	public int getOutboundActiveThreads() {
		return outboundExecutor.getActiveCount();
	}

	@ManagedAttribute
	public int getOutboundQueuedTaskCount() {
		return outboundExecutor.getQueue().size();
	}

	@ManagedAttribute
	public long getOutboundCompletedTaskCount() {
		return outboundExecutor.getCompletedTaskCount();
	}

	private static String formatByteCount(long bytes) {
		int unit = 1024;
		if (bytes < unit) return bytes + " B";
		int exp = (int) (Math.log(bytes) / Math.log(unit));
		return String.format("%.1f %sB", bytes / Math.pow(unit, exp), "KMGTPE".charAt(exp - 1));
	}

}

Define the WebSocketMessageBrokerStatsMonitor as a Spring managed bean:

@Bean
public WebSocketMessageBrokerStatsMonitor statsMonitor() {
	return new WebSocketMessageBrokerStatsMonitor((SubProtocolWebSocketHandler) subProtocolWebSocketHandler(), clientOutboundChannelExecutor());
}

Enable JMX annotations to expose our Spring managed beans by registering the AnnotationMBeanExporter:

@Bean
public AnnotationMBeanExporter annotationMBeanExporter() {
    return new AnnotationMBeanExporter();
}

And you are ready to go: open JConsole or VisualVM to see your live stats (remember to add -Dcom.sun.management.jmxremote as a VM argument):

jConsole WebSockets
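
If you want to see roughly what the annotations and the exporter do for you, here is a minimal sketch that uses only the JDK's javax.management API, with no Spring involved. The names PoolStatsJmx, PoolStatsMXBean and the com.example object name are made up for this illustration. It is not how AnnotationMBeanExporter is implemented, just the plain-JMX equivalent of exposing a few ThreadPoolExecutor getters as read-only attributes:

```java
import java.lang.management.ManagementFactory;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.management.JMException;
import javax.management.ObjectName;

// Hypothetical holder class for this sketch; none of these names come from Spring.
final class PoolStatsJmx {

	// JMX requires the management interface to be public.
	// The "MXBean" suffix is what marks it as an MXBean; each getter becomes an attribute.
	public interface PoolStatsMXBean {
		int getPoolSize();
		int getActiveThreads();
		int getQueuedTaskCount();
		long getCompletedTaskCount();
	}

	// Thin adapter that delegates every attribute to the executor.
	static final class PoolStats implements PoolStatsMXBean {
		private final ThreadPoolExecutor executor;

		PoolStats(ThreadPoolExecutor executor) {
			this.executor = executor;
		}

		public int getPoolSize() { return executor.getPoolSize(); }
		public int getActiveThreads() { return executor.getActiveCount(); }
		public int getQueuedTaskCount() { return executor.getQueue().size(); }
		public long getCompletedTaskCount() { return executor.getCompletedTaskCount(); }
	}

	// Registers the stats bean with the platform MBean server under the given name.
	static ObjectName register(ThreadPoolExecutor executor, String name) {
		try {
			ObjectName objectName = new ObjectName(name);
			ManagementFactory.getPlatformMBeanServer().registerMBean(new PoolStats(executor), objectName);
			return objectName;
		} catch (JMException e) {
			throw new IllegalStateException("Could not register " + name, e);
		}
	}

	// Reads an attribute the same way a JMX client such as JConsole would.
	static Object read(ObjectName name, String attribute) {
		try {
			return ManagementFactory.getPlatformMBeanServer().getAttribute(name, attribute);
		} catch (JMException e) {
			throw new IllegalStateException("Could not read " + attribute, e);
		}
	}

	public static void main(String[] args) {
		ThreadPoolExecutor pool = new ThreadPoolExecutor(
				2, 2, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
		// The object name below is an assumption chosen for the example.
		ObjectName name = register(pool, "com.example:type=PoolStats,name=outbound");
		pool.execute(() -> System.out.println("task ran"));
		System.out.println("PoolSize = " + read(name, "PoolSize"));
		pool.shutdown();
	}
}
```

In a long-running application the bean would show up in JConsole under the com.example domain, next to the one exported by Spring.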

Detecting WebSocket Connects and Disconnects in Spring 4

Presence detection is an essential feature in real-time applications like games, collaborative apps, chats… To implement it, we need to detect when a client connects and disconnects.

STOMP frames

We’ll start by understanding how STOMP works. To open a session over the underlying TCP connection, the client sends a CONNECT frame similar to the following one:

CONNECT
company:1
accept-version:1.1,1.0
heart-beat:10000,10000

^@

If the server accepts the connection request, it will respond with a CONNECTED frame:

CONNECTED
heart-beat:10000,10000
session:session-l9cWyaqq8AwEXldiFE4Zdw
server:RabbitMQ/3.2.4
version:1.1

^@

In the same fashion, the client can terminate the connection by sending a DISCONNECT frame:

DISCONNECT

^@
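
All three frames share the same layout: a command line, zero or more header:value lines, a blank line, and an optional body terminated by a NUL byte (printed as ^@ above). As a rough illustration, here is a minimal plain-Java sketch that parses that layout. The StompFrame class is hypothetical, not part of Spring or of any STOMP library, and it ignores header escaping and the content-length header, so treat it as a reading aid rather than a complete parser:

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not a Spring or STOMP library class.
final class StompFrame {
	final String command;
	final Map<String, String> headers;
	final String body;

	private StompFrame(String command, Map<String, String> headers, String body) {
		this.command = command;
		this.headers = Collections.unmodifiableMap(headers);
		this.body = body;
	}

	static StompFrame parse(String raw) {
		// A frame ends at the first NUL byte (this sketch ignores content-length).
		int end = raw.indexOf('\0');
		if (end < 0) {
			throw new IllegalArgumentException("Frame is not NUL-terminated");
		}
		String frame = raw.substring(0, end);

		// The first blank line separates the command and headers from the body.
		int blank = frame.indexOf("\n\n");
		String head = blank < 0 ? frame : frame.substring(0, blank);
		String body = blank < 0 ? "" : frame.substring(blank + 2);

		String[] lines = head.split("\n");
		Map<String, String> headers = new LinkedHashMap<>();
		for (int i = 1; i < lines.length; i++) {
			int colon = lines[i].indexOf(':');
			if (colon < 0) {
				throw new IllegalArgumentException("Malformed header: " + lines[i]);
			}
			// If a header is repeated, the first occurrence wins.
			headers.putIfAbsent(lines[i].substring(0, colon), lines[i].substring(colon + 1));
		}
		return new StompFrame(lines[0], headers, body);
	}

	public static void main(String[] args) {
		StompFrame frame = parse(
				"CONNECT\ncompany:1\naccept-version:1.1,1.0\nheart-beat:10000,10000\n\n\0");
		System.out.println(frame.command + " " + frame.headers);
	}
}
```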

Now that we know how STOMP works, let’s see how to detect these frames in Spring 4.

First approach: ChannelInterceptors

All STOMP messages are passed to message channels and can be intercepted with a channel interceptor. The support configuration classes provide two hooks in order to customize the inbound channel (for incoming STOMP messages like CONNECT / DISCONNECT) and the outbound channel (for outgoing STOMP messages like CONNECTED). Here’s our presence channel interceptor:


public class PresenceChannelInterceptor extends ChannelInterceptorAdapter {

	private final Log logger = LogFactory.getLog(PresenceChannelInterceptor.class);

	@Override
	public void postSend(Message<?> message, MessageChannel channel, boolean sent) {

		StompHeaderAccessor sha = StompHeaderAccessor.wrap(message);

		// ignore non-STOMP messages like heartbeat messages
		if(sha.getCommand() == null) {
			return;
		}

		String sessionId = sha.getSessionId();

		switch(sha.getCommand()) {
			case CONNECT:
				logger.debug("STOMP Connect [sessionId: " + sessionId + "]");
				break;
			case CONNECTED:
				logger.debug("STOMP Connected [sessionId: " + sessionId + "]");
				break;
			case DISCONNECT:
				logger.debug("STOMP Disconnect [sessionId: " + sessionId + "]");
				break;
			default:
				break;

		}
	}
}

By wrapping the message with the StompHeaderAccessor, we can access the message type (among many other properties like the headers of the STOMP message) and decide what to do accordingly. But you may wonder at this point how we correlate connection and disconnection messages. How do we know these belong to the same user? This is quite straightforward as every message has a sessionId which can be used to correlate them.
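
To make the correlation concrete, here is a minimal sketch of a presence registry keyed by sessionId. PresenceRegistry is a hypothetical class written for this post, not something Spring provides, and where the user name comes from (a custom header, the authenticated principal, ...) is left open as an assumption:

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry for illustration; call it from the CONNECT / DISCONNECT branches.
final class PresenceRegistry {

	// sessionId -> user; one user may have several sessions (tabs, devices).
	private final Map<String, String> userBySession = new ConcurrentHashMap<>();

	void connected(String sessionId, String user) {
		userBySession.put(sessionId, user);
	}

	// Returns the user that just went offline, or null if the session is unknown
	// (for example a repeated disconnect) or the user still has another session open.
	// The remove and the check are not atomic together, which is fine for a sketch.
	String disconnected(String sessionId) {
		String user = userBySession.remove(sessionId);
		if (user == null || userBySession.containsValue(user)) {
			return null;
		}
		return user;
	}

	Set<String> onlineUsers() {
		return new TreeSet<>(userBySession.values());
	}

	public static void main(String[] args) {
		PresenceRegistry registry = new PresenceRegistry();
		registry.connected("s1", "alice");
		registry.connected("s2", "bob");
		registry.disconnected("s1");
		System.out.println(registry.onlineUsers());
	}
}
```

Because the lookup goes through the sessionId, the disconnect handler does not need any user information from the DISCONNECT frame itself.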

We just need to add the interceptor to the client inbound and outbound channel using the hooks we have available:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

	@Override
	public void registerStompEndpoints(StompEndpointRegistry registry) {
		registry.addEndpoint("/ws").withSockJS();
	}

	@Bean
	public PresenceChannelInterceptor presenceChannelInterceptor() {
		return new PresenceChannelInterceptor();
	}

	@Override
	public void configureClientInboundChannel(ChannelRegistration registration) {
		registration.setInterceptors(presenceChannelInterceptor());
	}

	@Override
	public void configureClientOutboundChannel(ChannelRegistration registration) {
		registration.taskExecutor().corePoolSize(8);
		registration.setInterceptors(presenceChannelInterceptor());
	}

	@Override
	public void configureMessageBroker(MessageBrokerRegistry registry) {
		registry.enableSimpleBroker("/queue/", "/topic/");
		registry.setApplicationDestinationPrefixes("/app");
	}
}

A better approach: ApplicationEvents

Although the previous approach works, there’s a better way to do it. By using ApplicationEvents, we get a more decoupled solution with no extra configuration. We can implement listeners for the following events: SessionConnectEvent, SessionConnectedEvent, SessionDisconnectEvent (these events were introduced in Spring 4.0.3, released on 26/03/2014).

Here’s an example of a listener for the SessionConnectEvent:

public class StompConnectEvent implements ApplicationListener<SessionConnectEvent> {

	private final Log logger = LogFactory.getLog(StompConnectEvent.class);

	public void onApplicationEvent(SessionConnectEvent event) {
		StompHeaderAccessor sha = StompHeaderAccessor.wrap(event.getMessage());

		// getFirstNativeHeader returns null instead of throwing when the header is missing
		String company = sha.getFirstNativeHeader("company");
		logger.debug("Connect event [sessionId: " + sha.getSessionId() +"; company: "+ company + " ]");
	}
}

In the above example, we also show how to get access to custom STOMP headers that can be passed to the connect method in stomp.js:

var socket = new SockJS('/ws');
stompClient = Stomp.over(socket);

stompClient.connect({company: "1"}, function(frame) {

});

As you can see, using ApplicationEvents is a clean and decoupled way to detect connects and disconnects, with access to all the required properties and without leaving the Spring programming model.

Track your WebSocket messages with Spring 4 and RabbitMQ

Spring 4 introduces WebSocket support with optional support for STOMP as a subprotocol and transparent fallback options with SockJS.

When using STOMP over WebSockets, a handy built-in message broker takes care of subscriptions and messages, keeping everything in memory. But when it comes to scaling and clustering, going for a full-featured STOMP broker like RabbitMQ is a much better option.

The question now is: how can we track all these WebSocket messages?

During development it is always useful to be able to see your messages. Luckily, RabbitMQ has a firehose tracer and a useful tracing UI plugin, which is an extension of the management plugin. Let’s start by enabling the plugin and tracing:

rabbitmq-plugins enable rabbitmq_tracing
rabbitmqctl trace_on [-n node] [-p vhost]

After restarting RabbitMQ, you should see a new tab called Tracing under the Admin menu.

RabbitMQ Tracing Plugin

Add a new trace, and you’ll start tracing your messages to a log file. The message format can be either text or JSON; this is an example of a text message in our log file:

==================================================================
2014-03-17 22:37:16: Message published

Node:         rabbit@localhost
Exchange:     amq.topic
Routing keys: [<<"company-1-status">>]
Properties:   [{<<"headers">>,table,
                [{<<"content-length">>,longstr,<<"37">>}]}]
Payload: 
{"username":"salmar","status":"busy"}

==================================================================
2014-03-17 22:37:16: Message received

Node:         rabbit@localhost
Exchange:     amq.topic
Queue:        amq.gen-LP763HzfjQWpRiVpcD2Tqg
Routing keys: [<<"company-1-status">>]
Properties:   [{<<"headers">>,table,
                [{<<"content-length">>,longstr,<<"37">>}]}]
Payload: 
{"username":"salmar","status":"busy"}
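
Once the log grows, reading it by eye gets tedious. As a small sketch, assuming the text format looks exactly like the two entries above, the following plain-Java helper counts published and received messages per routing key. TraceLogSummary is a hypothetical class written for this post; it is not part of RabbitMQ, and the line patterns are inferred from the sample only:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper; the patterns are derived from the sample entries only.
final class TraceLogSummary {

	// Entry header, e.g. "2014-03-17 22:37:16: Message published"
	private static final Pattern EVENT = Pattern.compile(
			"\\d{4}-\\d{2}-\\d{2} \\d{1,2}:\\d{2}:\\d{2}: Message (published|received)");

	// One routing key inside the Erlang binary notation, e.g. <<"company-1-status">>
	private static final Pattern ROUTING_KEY = Pattern.compile("<<\"([^\"]*)\">>");

	// Returns counts keyed by "<event> <routing key>", sorted by key.
	static Map<String, Integer> countByEventAndKey(List<String> lines) {
		Map<String, Integer> counts = new TreeMap<>();
		String event = null;
		for (String line : lines) {
			Matcher header = EVENT.matcher(line.trim());
			if (header.matches()) {
				event = header.group(1);
			} else if (event != null && line.startsWith("Routing keys:")) {
				Matcher key = ROUTING_KEY.matcher(line);
				while (key.find()) {
					counts.merge(event + " " + key.group(1), 1, Integer::sum);
				}
				event = null; // ignore everything else until the next entry header
			}
		}
		return counts;
	}

	public static void main(String[] args) {
		List<String> lines = Arrays.asList(
				"2014-03-17 22:37:16: Message published",
				"Exchange:     amq.topic",
				"Routing keys: [<<\"company-1-status\">>]");
		System.out.println(countByEventAndKey(lines));
	}
}
```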