Spring Boot Reactive with RSocket

Ihor Kosandiak
Aug 19, 2021

Spring 5 introduced many new features. In addition to reactive programming, we get a great feature called RSocket. In this post we will look at how to add RSocket communication to a reactive Spring Boot application and what it gives you.

RSocket is a new messaging protocol that’s designed to solve some common communication challenges. It gives you modern controls like multiplexing, back-pressure, resumption, and routing, and you get multiple messaging modes including fire-and-forget, request-response, and streaming. RSocket is fully reactive, so it’s ideal for your high-throughput applications.
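To make the back-pressure idea more concrete, here is a minimal sketch using only the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams contract that RSocket and Reactor build on. The class names (BackPressureSketch, RangePublisher, CollectingSubscriber) are made up for this illustration, and the publisher is deliberately simplified (synchronous, no validation of the requested amount). It only shows the core rule: the producer emits no more items than the consumer has requested.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class BackPressureSketch {

    /** Publishes 0, 1, 2, ... below 'limit', but never more than the subscriber has requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int limit;

        RangePublisher(int limit) {
            this.limit = limit;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 0;
                private boolean done = false;

                @Override
                public void request(long n) {
                    // Emit at most n items; anything beyond that waits for the next request.
                    for (long i = 0; i < n && next < limit && !done; i++) {
                        subscriber.onNext(next++);
                    }
                    if (next == limit && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Collects items and exposes the subscription so demand can be signalled step by step. */
    static final class CollectingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        boolean completed = false;
        Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            this.subscription = s;
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
        }

        @Override
        public void onError(Throwable t) {
            throw new IllegalStateException(t);
        }

        @Override
        public void onComplete() {
            completed = true;
        }
    }

    public static void main(String[] args) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        new RangePublisher(5).subscribe(subscriber);

        subscriber.subscription.request(2);   // demand for two items only
        System.out.println(subscriber.received);
        subscriber.subscription.request(10);  // more demand than items left
        System.out.println(subscriber.received + " completed=" + subscriber.completed);
    }
}
```

In RSocket the same request(n) signal travels over the network, so a slow client can limit how fast the server sends.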

To start off, we need to create a simple Spring Boot application and add just two dependencies to the pom.xml:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

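Next, we enable RSocket support by adding a small piece of configuration to the application.yml file and creating a configuration class. With the websocket transport and a mapping path, the RSocket server is plugged into the WebFlux web server, so it is reachable on the regular server port (8080 by default) under /rsocket: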

spring:
  rsocket:
    server:
      mapping-path: /rsocket
      transport: websocket

Also, let us create an RSocketConfiguration class that will hold the configuration for RSocket communication.

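import java.net.URI;

import io.rsocket.metadata.WellKnownMimeType;
import org.springframework.boot.autoconfigure.rsocket.RSocketProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.MimeTypeUtils;
import reactor.core.publisher.Mono;

@Configuration
public class RSocketConfiguration {

    @Bean
    public Mono<RSocketRequester> rSocketRequester(
            RSocketStrategies rSocketStrategies,
            RSocketProperties rSocketProps) {
        return RSocketRequester.builder()
                .rsocketStrategies(rSocketStrategies)
                .dataMimeType(MediaType.APPLICATION_JSON)
                // Routing metadata, matching the metadataMimeType used by the browser client
                .metadataMimeType(MimeTypeUtils.parseMimeType(
                        WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.getString()))
                // Note: deprecated since Spring Framework 5.3 in favor of websocket(URI)
                .connectWebSocket(getURI(rSocketProps));
    }

    // Host and port are hardcoded here for the local demo
    private URI getURI(RSocketProperties rSocketProps) {
        return URI.create(String.format("ws://localhost:%d%s", 8080, rSocketProps.getServer().getMappingPath()));
    }
}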
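The configuration class builds its WebSocket URI with localhost and 8080 hardcoded, which is fine for a local demo. As a small sketch of how this could be made reusable, the helper below builds the same URI from a host, port, and mapping path. The class and method names (RSocketUriSketch, webSocketUri) are hypothetical and not part of Spring; where the host and port come from (for example, application properties) is left open.

```java
import java.net.URI;

public class RSocketUriSketch {

    /** Builds a ws:// URI; tolerates a mapping path with or without a leading slash. */
    static URI webSocketUri(String host, int port, String mappingPath) {
        String path = mappingPath.startsWith("/") ? mappingPath : "/" + mappingPath;
        return URI.create("ws://" + host + ":" + port + path);
    }

    public static void main(String[] args) {
        // Same values the article uses: default server port and the /rsocket mapping path
        System.out.println(webSocketUri("localhost", 8080, "/rsocket"));
    }
}
```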

We also need a kind of publisher that takes care of streaming data to our connected clients. We can do this by creating Spring beans that can then be used anywhere in the app to access the data stream.

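import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

@Configuration
public class RSocketStreamSender {

    // Entry point for pushing values into the stream;
    // replay().latest() keeps the most recent value for late subscribers
    @Bean
    public Sinks.Many<String> sink() {
        return Sinks.many().replay().latest();
    }

    // Read-only view of the sink that subscribers consume
    @Bean
    public Flux<String> streamFlux(Sinks.Many<String> sink) {
        return sink.asFlux();
    }

}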

Here we defined a sink() bean; by injecting it we can push any String value into our stream.

We also created a streamFlux() bean, so the stream is available anywhere in the application too. When you subscribe to this stream, you will receive the data via RSocket.
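The sink above is created with replay().latest(), which means a client that subscribes late first receives the most recent value and then every new one. The sketch below imitates that behavior with plain JDK types so it can be run without Reactor. LatestReplaySketch is a made-up, heavily simplified stand-in and not how Reactor implements its sinks (there is no back-pressure, completion, or error handling here).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class LatestReplaySketch<T> {

    private final List<Consumer<T>> subscribers = new ArrayList<>();
    private T latest;
    private boolean hasValue;

    /** Remembers the value as the latest one and pushes it to all current subscribers. */
    public synchronized void emit(T value) {
        latest = value;
        hasValue = true;
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(value);
        }
    }

    /** A new subscriber immediately gets the latest value (if any), then all later values. */
    public synchronized void subscribe(Consumer<T> subscriber) {
        if (hasValue) {
            subscriber.accept(latest);
        }
        subscribers.add(subscriber);
    }

    public static void main(String[] args) {
        LatestReplaySketch<String> channel = new LatestReplaySketch<>();
        List<String> early = new ArrayList<>();
        List<String> late = new ArrayList<>();

        channel.subscribe(early::add);
        channel.emit("a");
        channel.emit("b");
        channel.subscribe(late::add); // joins late: starts with "b"
        channel.emit("c");

        System.out.println("early=" + early + " late=" + late);
    }
}
```

For a shared "current state" channel like the time updates in this post, this choice means a newly connected client does not have to wait for the next tick before seeing a message.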

Let’s imagine a case where we need to push an update every 2 seconds, and all users subscribed to our RSocket server need to receive the same messages at the same time. So it should work like a common channel that pushes updates to clients.

Let’s do this by creating an OnStartup class that implements the CommandLineRunner interface and starts streaming data at application startup.

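import java.time.Duration;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

@Component
public class OnStartup implements CommandLineRunner {

    private final Sinks.Many<String> sink;

    @Autowired
    public OnStartup(Sinks.Many<String> sink) {
        this.sink = sink;
    }

    @Override
    public void run(String... args) throws Exception {
        // Emit a message into the shared sink every 2 seconds
        Flux.interval(Duration.ofSeconds(2))
                .subscribe(e -> {
                    // tryEmitNext returns a Sinks.EmitResult, which is ignored in this demo
                    this.sink.tryEmitNext(
                            "Hello from server. Current time is: " + System.currentTimeMillis()
                    );
                });
    }
}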

Here we send a message every 2 seconds, with text that shows the current system time in milliseconds.

The final step of the server setup is to add a controller that exposes the available RSocket routes and lets users subscribe to them and receive data streams.

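import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;

@Controller
public class RSocketController {

    private final Flux<String> stream;

    @Autowired
    public RSocketController(final Flux<String> stream) {
        this.stream = stream;
    }

    // Clients that request this route receive the shared stream
    @MessageMapping("my.time-updates.stream")
    public Flux<String> getTimeUpdatesStream() {
        return stream;
    }
}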

So we exposed the my.time-updates.stream route, and by subscribing to it users will receive text messages from the server every 2 seconds via RSocket.

For this example, let us add a small piece of TypeScript code (an Angular component) that connects to the server, subscribes to the exposed route, and receives messages.

import { Component, OnInit } from '@angular/core';
import { IdentitySerializer, JsonSerializer, RSocket, RSocketClient } from 'rsocket-core';
import RSocketWebSocketClient from 'rsocket-websocket-client';
import { Subject } from 'rxjs';

@Component({
  selector: 'app-auth-container',
  templateUrl: './auth-container.component.html',
  styleUrls: ['./auth-container.component.scss']
})
export class DemoComponent implements OnInit {

  client: RSocketClient;
  channel = 'my.time-updates.stream';

  constructor() {}

  ngOnInit(): void {

    this.client = new RSocketClient({
      serializers: {
        data: JsonSerializer,
        metadata: IdentitySerializer
      },
      setup: {
        keepAlive: 60000,
        lifetime: 180000,
        dataMimeType: 'application/json',
        metadataMimeType: 'message/x.rsocket.routing.v0',
      },
      transport: new RSocketWebSocketClient({
        url: 'ws://localhost:8080/rsocket'
      }),
    });

    this.client.connect().subscribe({
      onComplete: (socket: RSocket) => {
        socket
          .requestStream({
            data: null,
            // Routing metadata: one length character followed by the route name
            metadata: String.fromCharCode(this.channel.length) + this.channel
          })
          .subscribe({
            onComplete: () => console.log('complete'),
            onError: (error: string) => {
              console.log('Connection has been closed due to:: ' + error);
            },
            onNext: (payload: { data: any; }) => {
              console.log(payload);
            },
            onSubscribe: (subscription: { request: (arg0: number) => void; }) => {
              // Signal demand for up to 1,000,000 messages
              subscription.request(1000000);
            },
          });
      },
      onError: (error: string) => {
        console.log('Connection has been refused due to:: ' + error);
      },
      onSubscribe: () => {}
    });
  }
}
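The metadata line in the client, String.fromCharCode(this.channel.length) + this.channel, follows the layout of the message/x.rsocket.routing.v0 MIME type: each route tag is a single length byte followed by the tag's UTF-8 bytes. The JDK-only sketch below spells that layout out. RoutingMetadataSketch and its methods are hypothetical helper names for illustration, and it handles a single route tag only.

```java
import java.nio.charset.StandardCharsets;

public class RoutingMetadataSketch {

    /** Encodes one route as [length byte][UTF-8 bytes]; the length must fit in one unsigned byte. */
    static byte[] encodeRoute(String route) {
        byte[] utf8 = route.getBytes(StandardCharsets.UTF_8);
        if (utf8.length == 0 || utf8.length > 255) {
            throw new IllegalArgumentException("Route must be 1 to 255 bytes long in UTF-8");
        }
        byte[] metadata = new byte[utf8.length + 1];
        metadata[0] = (byte) utf8.length;
        System.arraycopy(utf8, 0, metadata, 1, utf8.length);
        return metadata;
    }

    /** Reads the first route tag back out of the metadata. */
    static String decodeFirstRoute(byte[] metadata) {
        int length = metadata[0] & 0xFF; // treat the length byte as unsigned
        return new String(metadata, 1, length, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] metadata = encodeRoute("my.time-updates.stream");
        System.out.println("length byte = " + (metadata[0] & 0xFF));
        System.out.println("route       = " + decodeFirstRoute(metadata));
    }
}
```

The string-based version in the Angular code produces the same bytes as long as the route contains only ASCII characters, since then the character count and the UTF-8 byte count are equal.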

Here we used an Angular example. When we run both the Spring Boot and the Angular applications and open the browser console, we see the received messages being logged.

As you can see, the browser receives a message every 2 seconds, sent from our reactive Spring Boot server via the RSocket protocol. And that’s great!

To sum up, in this guide we learned how to add RSocket support to a reactive Spring Boot WebFlux application and how to subscribe to a common data channel that delivers data to every connected client at the same time.

Thank you for your time and stay tuned, because in the following guides we’ll talk about adding security to RSocket communication with JWT and extending this functionality.
