
Reactive service to service communication with RSocket – load balancing & resumability

Grape up Expert • November 24, 2025 • 5 min read


This article is the second in a mini-series that will help you get familiar with RSocket – a new binary protocol which may revolutionize machine-to-machine communication in distributed systems. In the following paragraphs, we discuss the load balancing problem in the cloud and present the resumability feature, which helps deal with network issues, especially in IoT systems.

  • If you are not familiar with RSocket basics, please see the previous article available here
  • Please note that the code examples presented in this article are available on GitHub

High availability & load balancing as a crucial part of enterprise-grade systems

Application availability and reliability are crucial in many business areas such as banking and insurance. In these demanding industries, services have to be operational 24/7, even during high traffic, periods of increased network latency, or natural disasters. To ensure that the software is always available to end users, it is usually deployed redundantly, across multiple availability zones.

In such a scenario, at least two instances of each microservice are deployed in at least two availability zones. This technique helps our system become resilient and increases its capacity – multiple instances of the microservices are able to handle a significantly higher load. So where is the catch? The redundancy introduces extra complexity. As engineers, we have to ensure that the incoming traffic is spread across all available instances. There are two major techniques which address this problem: server-side load balancing and client-side load balancing.

The first approach is based on the assumption that the requester does not know the IP addresses of the responders. Instead, the requester communicates with the load balancer, which is responsible for spreading requests across the microservices connected to it. This design is fairly easy to adopt in the cloud era. IaaS providers usually offer built-in, reliable solutions, like the Elastic Load Balancer available in Amazon Web Services. Moreover, such a design makes it possible to use routing strategies more sophisticated than plain round robin (e.g. adaptive load balancing or chained failover). The major drawback of this technique is that we have to configure and deploy extra resources, which may be painful if our system consists of hundreds of microservices. Furthermore, it may affect latency – each request makes an extra “network hop” through the load balancer.

The second technique inverts the relation. Instead of a central point used to connect to responders, the requester knows the IP addresses of each and every instance of the given microservice. With this knowledge, the client can choose the responder instance to which it sends a request or opens a connection. This strategy does not require any extra resources, but we have to ensure that the requester has the IP addresses of all instances of the responder (see how to deal with this using the service discovery pattern). The main benefit of the client-side load balancing pattern is its performance – by removing the extra “network hop”, we may significantly decrease latency. This is one of the key reasons why RSocket implements the client-side load balancing pattern.
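The idea behind client-side load balancing can be sketched in plain Java, independent of RSocket. This is a hypothetical illustration (the Instance class and the scoring formula below are assumptions, not RSocket's actual algorithm): the requester holds a list of known responder instances and picks the healthiest, least-loaded one.

```java
import java.util.List;

// Illustrative sketch of client-side load balancing: the requester knows every
// responder instance and picks the one with the best score. Names below
// (Instance, pickBest) are hypothetical.
public class ClientSideBalancer {

    // A known responder instance with a health score in [0, 1]
    // and a count of requests currently in flight.
    public static final class Instance {
        public final String address;
        public final double availability;   // 0 = dead, 1 = fully operational
        public final int pendingRequests;

        public Instance(String address, double availability, int pendingRequests) {
            this.address = address;
            this.availability = availability;
            this.pendingRequests = pendingRequests;
        }
    }

    // Pick the healthiest, least-loaded instance; skip dead ones entirely.
    public static String pickBest(List<Instance> instances) {
        Instance best = null;
        double bestScore = -1;
        for (Instance i : instances) {
            if (i.availability == 0) continue;               // dead connection
            double score = i.availability / (1 + i.pendingRequests);
            if (score > bestScore) {
                bestScore = score;
                best = i;
            }
        }
        return best == null ? null : best.address;
    }
}
```

RSocket's built-in balancer, discussed next, follows the same principle but feeds the score from live connection statistics rather than a static snapshot.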

Client load balancing in RSocket

On the code level, the implementation of client load balancing in RSocket is pretty straightforward. The mechanism relies on the LoadBalancedRSocketMono object, which works as a bag of available RSocket instances provided by RSocketSupplier objects. To access an RSocket, we subscribe to the LoadBalancedRSocketMono, whose onNext signal emits a fully-fledged RSocket instance. Moreover, it calculates statistics for each RSocket, so that it can estimate the load of each instance and, based on that, choose the one with the best performance at a given point in time.

The algorithm takes into account multiple parameters such as latency, the number of maintained connections, and the number of pending requests. The health of each RSocket is reflected by the availability parameter, which takes values from 0 to 1, where 0 indicates that the given instance cannot handle any requests and 1 is assigned to a fully operational socket. The code snippet below shows a very basic example of a load-balanced RSocket, which connects to three different instances of the responder and executes 100 requests. Each time it picks up an RSocket from the LoadBalancedRSocketMono object.

@Slf4j
public class LoadBalancedClient {

    static final String HOST = "localhost";
    static final int[] PORTS = new int[]{7000, 7001, 7002};

    public static void main(String[] args) {

        List<RSocketSupplier> rsocketSuppliers = Arrays.stream(PORTS)
                .mapToObj(port -> new RSocketSupplier(() -> RSocketFactory.connect()
                        .transport(TcpClientTransport.create(HOST, port))
                        .start()))
                .collect(Collectors.toList());

        LoadBalancedRSocketMono balancer =
                LoadBalancedRSocketMono.create((Publisher<List<RSocketSupplier>>) s -> {
                    s.onNext(rsocketSuppliers);
                    s.onComplete();
                });

        Flux.range(0, 100)
                .flatMap(i -> balancer)
                .doOnNext(rSocket -> rSocket.requestResponse(DefaultPayload.create("test-request")).block())
                .blockLast();
    }
}

It is worth noting that the client load balancer in RSocket deals with dead connections as well. If any of the RSocket instances registered in the LoadBalancedRSocketMono stops responding, the mechanism will automatically try to reconnect. By default, it will execute 5 attempts within 25 seconds. If it does not succeed, the given RSocket will be removed from the pool of available connections. Such a design combines the advantages of server-side load balancing with the low latency and reduced “network hops” of client-side load balancing.
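The reconnect behavior described above can be modeled as a simple retry schedule. The sketch below is a plain-Java illustration of the policy (5 attempts over a 25-second window); the class and method names are made up, and the real mechanism lives inside LoadBalancedRSocketMono.

```java
import java.util.function.BooleanSupplier;

// Illustrative model of the default reconnect policy: 5 attempts spread over
// a 25-second window, after which the socket is evicted from the pool.
public class ReconnectPolicy {

    static final int MAX_ATTEMPTS = 5;
    static final long WINDOW_MILLIS = 25_000;

    // Evenly spaced delays so MAX_ATTEMPTS fit into the 25-second window.
    public static long[] retryDelaysMillis() {
        long[] delays = new long[MAX_ATTEMPTS];
        for (int i = 0; i < MAX_ATTEMPTS; i++) {
            delays[i] = WINDOW_MILLIS / MAX_ATTEMPTS;  // 5 s between attempts
        }
        return delays;
    }

    // Try to reconnect; returns true if any attempt succeeds, false if the
    // socket should be removed from the pool. Sleeping between attempts is
    // omitted so the sketch stays testable.
    public static boolean tryReconnect(BooleanSupplier connectOnce) {
        for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
            if (connectOnce.getAsBoolean()) return true;
        }
        return false;  // evict from the pool of available connections
    }
}
```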

Dead connections & resumability mechanism

A question which may arise in the context of dead connections is: what will happen if I have only a single instance of the responder and the connection drops due to network issues? Is there anything we can do about it? Fortunately, RSocket has a built-in resumability mechanism.

To clarify the concept, let’s consider the following example. We are building an IoT platform which connects to multiple temperature sensors located in different places, most of them far from the nearest buildings and internet connection sources. Therefore, the devices connect to cloud services using GPRS. The business requirement for our system is that we need to collect temperature readings every second, in real time, and we cannot lose any data.

In the case of machine-to-machine communication within the cloud, streaming data in real time is not a big deal, but if we consider IoT devices located in areas without access to a stable, reliable internet connection, the problem becomes more complex. We can easily identify two major issues we may face in such a system: network latency and connection stability. From a software perspective, there is not much we can do about the first one, but we can try to deal with the latter. Let’s tackle the problem with RSocket, starting with picking the proper interaction model. The most suitable in this case is the request stream method, where the microservice deployed in the cloud is the requester and the temperature sensor is the responder. After choosing the interaction model, we apply the resumability mechanism. In RSocket, we do it with the resume() method invoked on the RSocketFactory, as shown in the examples below:

@Slf4j
public class ResumableRequester {

    private static final int CLIENT_PORT = 7001;
    private static final String HOST = "localhost";
    private static final Duration RESUME_SESSION_DURATION = Duration.ofSeconds(60);

    public static void main(String[] args) {
        RSocket socket = RSocketFactory.connect()
                .resume()
                .resumeSessionDuration(RESUME_SESSION_DURATION)
                .transport(TcpClientTransport.create(HOST, CLIENT_PORT))
                .start()
                .block();

        socket.requestStream(DefaultPayload.create("dummy"))
                .map(payload -> {
                    log.info("Received data: [{}]", payload.getDataUtf8());
                    return payload;
                })
                .blockLast();
    }
}

@Slf4j
public class ResumableResponder {

    private static final int SERVER_PORT = 7000;
    static final String HOST = "localhost";
    static final Duration RESUME_SESSION_DURATION = Duration.ofSeconds(60);

    public static void main(String[] args) throws InterruptedException {

        RSocketFactory.receive()
                .resume()
                .resumeSessionDuration(RESUME_SESSION_DURATION)
                .acceptor((setup, sendingSocket) -> Mono.just(new AbstractRSocket() {
                    @Override
                    public Flux<Payload> requestStream(Payload payload) {
                        log.info("Received 'requestStream' request with payload: [{}]", payload.getDataUtf8());
                        return Flux.interval(Duration.ofMillis(1000))
                                .map(t -> DefaultPayload.create(t.toString()));
                    }
                }))
                .transport(TcpServerTransport.create(HOST, SERVER_PORT))
                .start()
                .subscribe();

        log.info("Server running");
        Thread.currentThread().join();
    }
}

  • Please note that to run the provided examples you need ‘socat’ installed on your machine; see the README file for more details

The mechanism works similarly on the requester and responder sides and is based on a few components. First of all, there is the ResumableFramesStore, which works as a buffer for the frames. By default, it stores them in memory, but we can easily adjust it to our needs by implementing the ResumableFramesStore interface (e.g. to store the frames in a distributed cache such as Redis). The store saves the data emitted between keep-alive frames, which are sent back and forth periodically and indicate whether the connection between the peers is stable. Moreover, the keep-alive frame contains a token which determines the last received position for the requester and the responder. When a peer wants to resume the connection, it sends the resume frame with an implied position. The implied position is calculated from the last received position (the same value we have seen in the keep-alive frame) plus the length of the frames received since that moment. This algorithm is applied to both parties of the communication; in the resume frame it is reflected by the last received server position and first client available position tokens. The whole flow of the resume operation is shown in the diagram below:
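The implied-position arithmetic described above can be captured in a few lines of plain Java. The class below is an illustrative model, not RSocket's actual implementation: it tracks the position confirmed by the last keep-alive frame plus the lengths of frames received since.

```java
// Sketch of the implied-position bookkeeping used during resumption. Each peer
// remembers the last position confirmed by a keep-alive frame and adds the
// lengths of frames received after it; on resume, the peers compare these
// values to decide which frames must be replayed. Names are illustrative.
public class ResumePositionTracker {

    private long lastReceivedPosition;   // position confirmed by the last keep-alive
    private long bytesSinceKeepAlive;    // lengths of frames received after it

    public void onKeepAlive(long confirmedPosition) {
        lastReceivedPosition = confirmedPosition;
        bytesSinceKeepAlive = 0;
    }

    public void onFrameReceived(int frameLength) {
        bytesSinceKeepAlive += frameLength;
    }

    // Value sent in the resume frame: the last confirmed position plus
    // everything received since that keep-alive.
    public long impliedPosition() {
        return lastReceivedPosition + bytesSinceKeepAlive;
    }
}
```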

By adopting the resumability mechanism built into the RSocket protocol, we can reduce the impact of network issues with relatively low effort. As shown in the example above, resumability might be extremely useful in data streaming applications, especially in the case of device-to-cloud communication.

Summary

In this article, we discussed more advanced features of the RSocket protocol which help reduce the impact of the network on system operability. We covered the implementation of the client-side load balancing pattern and the resumability mechanism. These features, combined with the robust interaction model, constitute the core of the protocol.

In the last article of this mini-series, we will cover the available abstraction layers on top of RSocket.

  • Please note that fully working examples are provided here →


Reactive service to service communication with RSocket – introduction

This article is the first in a mini-series that will help you get familiar with RSocket – a new binary protocol which may revolutionize machine-to-machine communication. In the following paragraphs, we discuss the problems of distributed systems and explain how these issues may be solved with RSocket. We focus on the communication between microservices and the interaction model of RSocket.

  • Please note that the code examples presented in this article are available on GitHub →

Communication problem in distributed systems

Microservices are everywhere, literally everywhere. We went through the long journey from monolithic applications, which were terrible to deploy and maintain, to fully distributed, tiny, scalable microservices. Such an architecture has many benefits; however, it also has drawbacks worth mentioning. Firstly, to deliver value to end customers, services have to exchange tons of data. In a monolithic application that was not an issue, as all communication occurred within a single JVM. In a microservice architecture, where services are deployed in separate containers and communicate via an internal or external network, networking is a first-class citizen. Things get more complicated if you decide to run your applications in the cloud, where network issues and periods of increased latency are something you cannot fully avoid. Rather than trying to fix network issues, it is better to make your architecture resilient and fully operational even during turbulent times.

Let’s dive a bit deeper into the concept of microservices, data, communication, and the cloud. As an example, we will discuss an enterprise-grade system which is accessible through a website and a mobile app, and which also communicates with small external devices (e.g. a home heater controller). The system consists of multiple microservices, mostly written in Java, with a few Python and Node.js components. Obviously, all of them are replicated across multiple availability zones to ensure that the whole system is highly available.

To be IaaS-provider agnostic and improve the developer experience, the applications run on top of a PaaS. We have a wide range of possibilities here: Cloud Foundry, Kubernetes, or both combined in Cloudboostr are suitable. In terms of communication between services, the design is simple: each component exposes a plain REST API – as shown in the diagram below.

At first glance, such an architecture does not look bad. Components are separated and run in the cloud – what could go wrong? Actually, there are two major issues – both of them related to communication.

The first problem is the request/response interaction model of HTTP. While it has a lot of use cases, it was not designed for machine-to-machine communication. It is not uncommon for a microservice to send some data to another component without caring about the result of the operation (fire-and-forget), or to stream data automatically as it becomes available (data streaming). These communication patterns are hard to achieve in an elegant, efficient way using a request/response interaction model. Even performing a simple fire-and-forget operation has side effects – the server has to send a response back to the client, even if the client is not interested in processing it.

The second problem is performance. Let’s assume that our system is massively used by customers, the traffic increases, and we have noticed that we are struggling to handle more than a few hundred requests per second. Thanks to containers and the cloud, we are able to scale out our services with ease. However, if we track resource consumption a bit more closely, we will notice that while we are running out of memory, the CPUs of our VMs are almost idle. The issue comes from the thread-per-request model usually used with HTTP 1.x, where every single request has its own stack memory. In such a scenario, we can leverage the reactive programming model and non-blocking IO. It will significantly cut down memory usage; nevertheless, it will not reduce the latency. HTTP 1.x is a text-based protocol, thus the amount of data that needs to be transferred is significantly higher than in the case of binary protocols.
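A quick back-of-the-envelope calculation makes the memory argument concrete. The sketch below compares the stack memory reserved by thread-per-request versus a small event-loop pool; the 1 MB stack size (a common JVM default) and the request counts are assumed values for illustration only.

```java
// Back-of-the-envelope sketch of the thread-per-request memory issue.
// Stack size and request counts are assumptions, not measurements.
public class ThreadMemoryEstimate {

    // Total stack memory reserved when every in-flight request gets its own thread.
    public static long threadPerRequestBytes(int concurrentRequests, long stackBytesPerThread) {
        return concurrentRequests * stackBytesPerThread;
    }

    // With non-blocking IO, a small fixed pool of event-loop threads serves all requests.
    public static long eventLoopBytes(int eventLoopThreads, long stackBytesPerThread) {
        return eventLoopThreads * stackBytesPerThread;
    }
}
```

With 500 concurrent requests and 1 MB stacks, thread-per-request reserves roughly 500 MB for stacks alone, while an 8-thread event loop needs about 8 MB.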

In machine-to-machine communication we should not limit ourselves to HTTP (especially 1.x), its request/response interaction model, and its poor performance. There are many more suitable and robust solutions on the market. Messaging based on RabbitMQ, gRPC, or even HTTP/2 with its support for multiplexing and binary payloads will do way better in terms of performance and efficiency than plain HTTP 1.x.

Using multiple protocols allows us to link the microservices in the most efficient and suitable way for a given scenario. However, the adoption of multiple protocols forces us to reinvent the wheel again and again. We have to enrich our data with extra information related to security and create multiple adapters which handle translation between protocols. In some cases, transport requires external resources (brokers, services, etc.) which need to be highly available. Extra resources entail extra costs, even if all we need is a simple, message-based fire-and-forget operation. Besides, a multitude of different protocols may introduce serious problems related to application management, especially if our system consists of hundreds of microservices.

The issues mentioned above are the core reasons why RSocket was invented and why it may revolutionize communication in the cloud. With its reactiveness and built-in robust interaction model, RSocket can be applied in various business scenarios and may eventually unify the communication patterns we use in distributed systems.

RSocket to the rescue

RSocket is a new message-driven binary protocol which standardizes the approach to communication in the cloud. It helps resolve common application concerns in a consistent manner and has support for multiple languages (e.g. Java, JavaScript, Python) and transport layers (TCP, WebSocket, Aeron).
In the following sections, we will dive deeper into the protocol internals and discuss the interaction model.

Framed and message-driven

Interaction in RSocket is broken down into frames. Each frame consists of a frame header which contains the stream id, the frame type definition, and other data specific to the frame type. The frame header is followed by metadata and payload – these parts carry data specified by the user.

There are multiple types of frames which represent different actions and the available methods of the interaction model. We are not going to cover all of them, as they are extensively described in the official documentation (http://rsocket.io/docs/Protocol). Nevertheless, there are a few worth noting. One of them is the setup frame, which the client sends to the server at the very beginning of the communication. This frame can be customized, so you can add your own security rules or other information required during connection initialization. It should be noted that RSocket does not distinguish between the client and the server after the connection setup phase: each side can start sending data to the other one, which makes the protocol almost entirely symmetrical.
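To make the header layout concrete, here is a simplified encoder in plain Java. It mirrors the spirit of RSocket framing (a 4-byte stream id followed by a 6-bit frame type packed with flags), but it is an illustration only, not a wire-compatible implementation.

```java
import java.nio.ByteBuffer;

// Minimal sketch of a frame header: a stream id followed by a frame type and
// flags, after which metadata/payload would follow. Simplified illustration,
// not the exact RSocket wire format.
public class FrameHeaderSketch {

    public static ByteBuffer encode(int streamId, int frameType, int flags) {
        ByteBuffer buf = ByteBuffer.allocate(6);
        buf.putInt(streamId);                              // which logical stream
        buf.putShort((short) ((frameType << 10) | flags)); // 6-bit type + 10-bit flags
        buf.flip();
        return buf;
    }

    public static int streamId(ByteBuffer frame)  { return frame.getInt(0); }
    public static int frameType(ByteBuffer frame) { return (frame.getShort(4) & 0xFFFF) >> 10; }
    public static int flags(ByteBuffer frame)     { return frame.getShort(4) & 0x3FF; }
}
```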

Performance

The frames are sent as a stream of bytes, which makes RSocket far more efficient than typical text-based protocols. From a developer perspective, it is easier to debug a system while JSONs are flying back and forth through the network, but the impact on performance makes such convenience questionable. The protocol does not impose any specific serialization/deserialization mechanism; it considers the frame a bag of bits which can be converted to anything. That makes it possible to use JSON serialization or more efficient solutions like Protobuf or Avro.

The second factor which has a huge impact on RSocket performance is multiplexing. The protocol creates logical streams (channels) on top of a single physical connection. Each stream has a unique id which, to some extent, can be interpreted as a queue known from messaging systems. Such a design deals with major issues known from HTTP 1.x – the connection-per-request model and the weak performance of “pipelining”. Moreover, RSocket natively supports transferring large payloads. In such a scenario, the payload frame is split into several frames, each with an extra flag and the ordinal number of the given fragment.
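The fragmentation scheme for large payloads can be sketched as follows. This is an illustrative model (the Fragment class and split method are invented names): a payload is cut into ordered chunks, and every chunk except the last carries a "follows" marker telling the receiver that more fragments of the same payload are coming.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative sketch of payload fragmentation: split a payload larger than the
// frame limit into ordered fragments, marking all but the last with "follows".
public class FragmentationSketch {

    public static final class Fragment {
        public final byte[] data;
        public final boolean follows;   // true = more fragments of this payload follow

        Fragment(byte[] data, boolean follows) {
            this.data = data;
            this.follows = follows;
        }
    }

    public static List<Fragment> split(byte[] payload, int maxFragmentSize) {
        List<Fragment> fragments = new ArrayList<>();
        for (int offset = 0; offset < payload.length; offset += maxFragmentSize) {
            int end = Math.min(offset + maxFragmentSize, payload.length);
            boolean follows = end < payload.length;   // last fragment clears the flag
            fragments.add(new Fragment(Arrays.copyOfRange(payload, offset, end), follows));
        }
        return fragments;
    }
}
```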

Reactiveness & Flow Control

The RSocket protocol fully embraces the principles stated in the Reactive Manifesto. Its asynchronous character and thrift in terms of resources help decrease the latency experienced by end users and the costs of the infrastructure. Thanks to streaming, we do not need to pull data from one service to another; instead, the data is pushed when it becomes available. It is an extremely powerful mechanism, but it might be risky as well. Let’s consider a simple scenario: in our system, we are streaming events from service A to service B. The action performed on the receiver side is non-trivial and requires some computation time. If service A pushes events faster than B is able to process them, B will eventually run out of resources – the sender will kill the receiver. Since RSocket uses Reactor, it has built-in support for flow control, which helps to avoid such situations.

We can easily provide a backpressure mechanism implementation adjusted to our needs. The receiver can specify how much data it would like to consume and will not get more until it notifies the sender that it is ready to process more. On the other hand, to limit the number of incoming frames from the requester, RSocket implements a lease mechanism: the responder can specify how many requests the requester may send within a defined time frame.
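The request-n and lease ideas boil down to credit-based bookkeeping, which can be sketched in plain Java. The CreditController below is a hypothetical model, not part of the RSocket API: the receiver grants credits, and the sender may only emit while credits remain.

```java
// Sketch of credit-based flow control: the receiver grants permission for a
// number of items (request_n / lease), and the sender pauses when out of credits.
public class CreditController {

    private long credits;

    // Receiver grants permission for n more items.
    public void grant(long n) {
        credits += n;
    }

    // Sender calls this before emitting; returns false when it must pause.
    public boolean tryEmit() {
        if (credits <= 0) return false;
        credits--;
        return true;
    }

    public long remaining() {
        return credits;
    }
}
```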

The API

As mentioned in the previous section, RSocket uses Reactor, so on the API level we mainly operate on Mono and Flux objects. It has full support for reactive signals as well – we can easily implement “reactions” to different events: onNext, onError, onClose, etc.

The following paragraphs will cover the API and each interaction option available in RSocket. The discussion will be backed with code snippets and descriptions for all the examples. Before we jump into the interaction model, it is worth describing the API basics, as they will come up in multiple code examples.

Setting up the connection with RSocketFactory

Setting up an RSocket connection between the peers is fairly easy. The API provides a factory (RSocketFactory) with the factory methods receive and connect, used to create RSocket and CloseableChannel instances on the client and server side respectively. The second common property, present on both sides of the communication (the requester and the responder), is the transport. RSocket can use multiple solutions as a transport layer (TCP, WebSocket, Aeron). Whichever you choose, the API provides factory methods which allow you to tweak and tune the connection.

RSocketFactory.receive()
        .acceptor(new HelloWorldSocketAcceptor())
        .transport(TcpServerTransport.create(HOST, PORT))
        .start()
        .subscribe();

RSocketFactory.connect()
        .transport(TcpClientTransport.create(HOST, PORT))
        .start()
        .subscribe();

Moreover, in the case of the responder, we have to create a socket acceptor instance. SocketAcceptor is an interface which provides the contract between the peers. It has a single method, accept, which takes the RSocket used for sending requests and returns an instance of RSocket that will be used for handling requests from the peer. Besides providing the contract, the SocketAcceptor lets us access the setup frame content. On the API level, it is reflected by the ConnectionSetupPayload object.

public interface SocketAcceptor {

    Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket);
}

As shown above, setting up the connection between the peers is relatively easy, especially for those of you who worked with WebSockets previously – in terms of the API both solutions are quite similar.

Interaction model

After setting up the connection, we can move on to the interaction model. RSocket supports the following operations: fire-and-forget, metadata push, request-response, request stream, and request channel.

Fire-and-forget and metadata push were designed to push data from the sender to the receiver. In both scenarios the sender does not care about the result of the operation, which is reflected on the API level in the return type (Mono<Void>). The difference between these actions sits in the frame: in the case of fire-and-forget a fully-fledged frame is sent to the receiver, while the metadata push frame has no payload – it consists only of the header and the metadata. Such a lightweight message can be useful for sending notifications to mobile devices or for peer-to-peer communication between IoT devices.

RSocket is also able to mimic HTTP behavior. It supports request-response semantics, and this will probably be the main type of interaction you are going to use with RSocket. In a streaming context, such an operation can be represented as a stream consisting of a single object. In this scenario, the client waits for the response frame, but it does so in a fully non-blocking manner.

More interesting in cloud applications are the request stream and request channel interactions, which operate on streams of data, usually infinite ones. In the case of the request stream operation, the requester sends a single frame to the responder and gets back a stream of data. This interaction method enables services to switch from a data pull to a data push strategy: instead of sending periodic requests to the responder, the requester can subscribe to the stream and react to incoming data, which arrives automatically as it becomes available.

Thanks to multiplexing and bi-directional data transfer support, we can go a step further with the request channel method. RSocket is able to stream data from the requester to the responder and the other way around over a single physical connection. Such an interaction may be useful when the requester updates its subscription – for example, to change the subscription criteria. Without the bi-directional channel, the client would have to cancel the stream and re-request it with new parameters.

In the API, all operations of the interaction model are represented by methods of the RSocket interface, shown below.

public interface RSocket extends Availability, Closeable {

    Mono<Void> fireAndForget(Payload payload);

    Mono<Payload> requestResponse(Payload payload);

    Flux<Payload> requestStream(Payload payload);

    Flux<Payload> requestChannel(Publisher<Payload> payloads);

    Mono<Void> metadataPush(Payload payload);
}

To improve the developer experience and avoid the necessity of implementing every single method of the RSocket interface, the API provides the abstract class AbstractRSocket which we can extend. By putting the SocketAcceptor and the AbstractRSocket together, we get a server-side implementation, which in a basic scenario may look like this:

@Slf4j
public class HelloWorldSocketAcceptor implements SocketAcceptor {

    @Override
    public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
        log.info("Received connection with setup payload: [{}] and meta-data: [{}]", setup.getDataUtf8(), setup.getMetadataUtf8());
        return Mono.just(new AbstractRSocket() {
            @Override
            public Mono<Void> fireAndForget(Payload payload) {
                log.info("Received 'fire-and-forget' request with payload: [{}]", payload.getDataUtf8());
                return Mono.empty();
            }

            @Override
            public Mono<Payload> requestResponse(Payload payload) {
                log.info("Received 'request response' request with payload: [{}]", payload.getDataUtf8());
                return Mono.just(DefaultPayload.create("Hello " + payload.getDataUtf8()));
            }

            @Override
            public Flux<Payload> requestStream(Payload payload) {
                log.info("Received 'request stream' request with payload: [{}]", payload.getDataUtf8());
                return Flux.interval(Duration.ofMillis(1000))
                        .map(time -> DefaultPayload.create("Hello " + payload.getDataUtf8() + " @ " + Instant.now()));
            }

            @Override
            public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
                return Flux.from(payloads)
                        .doOnNext(payload -> log.info("Received payload: [{}]", payload.getDataUtf8()))
                        .map(payload -> DefaultPayload.create("Hello " + payload.getDataUtf8() + " @ " + Instant.now()))
                        .subscribeOn(Schedulers.parallel());
            }

            @Override
            public Mono<Void> metadataPush(Payload payload) {
                log.info("Received 'metadata push' request with metadata: [{}]", payload.getMetadataUtf8());
                return Mono.empty();
            }
        });
    }
}

On the sender side, using the interaction model is pretty simple: all we need to do is invoke a particular method on the RSocket instance we have created using RSocketFactory, e.g.

socket.fireAndForget(DefaultPayload.create("Hello world!"));

  • For more examples of the methods available in the RSocket interaction model, please visit GitHub →

More interesting on the sender side is the implementation of the backpressure mechanism. Let’s consider the following example of a requester-side implementation:

public class RequestStream {

    private static final String HOST = "localhost";
    private static final int PORT = 7000;

    public static void main(String[] args) {

        RSocket socket = RSocketFactory.connect()
                .transport(TcpClientTransport.create(HOST, PORT))
                .start()
                .block();

        socket.requestStream(DefaultPayload.create("Jenny", "example-metadata"))
                .subscribe(new BackPressureSubscriber());

        // Note: in a real application you would wait for the stream to finish
        // (e.g. with a latch) before disposing the socket.
        socket.dispose();
    }

    @Slf4j
    private static class BackPressureSubscriber implements Subscriber<Payload> {

        private static final Integer NUMBER_OF_REQUESTED_ITEMS = 5;
        private Subscription subscription;
        int receivedItems;

        @Override
        public void onSubscribe(Subscription s) {
            this.subscription = s;
            subscription.request(NUMBER_OF_REQUESTED_ITEMS);
        }

        @Override
        public void onNext(Payload payload) {
            receivedItems++;
            if (receivedItems % NUMBER_OF_REQUESTED_ITEMS == 0) {
                log.info("Requesting next [{}] elements", NUMBER_OF_REQUESTED_ITEMS);
                subscription.request(NUMBER_OF_REQUESTED_ITEMS);
            }
        }

        @Override
        public void onError(Throwable t) {
            log.error("Stream subscription error", t);
        }

        @Override
        public void onComplete() {
            log.info("Completing subscription");
        }
    }
}



In this example, we request a stream of data, but to ensure that the incoming frames will not kill the requester, we have a backpressure mechanism in place. To implement it, we use the request_n frame, which on the API level is reflected by the subscription.request(n) method. At the beginning of the subscription [onSubscribe(Subscription s)] we request 5 objects, then we count the received items in onNext(Payload payload). When all expected frames have arrived at the requester, we request the next 5 objects – again using the subscription.request(n) method. The flow of this subscriber is shown in the diagram below:

The implementation of the backpressure mechanism presented in this section is very basic. In production, we should provide a more sophisticated solution based on more accurate metrics, e.g. the predicted/average time of computation. After all, the backpressure mechanism does not make the problem of an overproducing responder disappear. It shifts the issue to the responder side, where it can be handled better. Further reading about backpressure is available here on Medium and here on GitHub.
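One possible direction for such a more sophisticated solution is to size each request_n batch from the measured processing time, so a slow consumer automatically asks for less. The sketch below is an illustration under assumed values (the 100 ms budget and the batch bounds are arbitrary), not a production recipe.

```java
// Sketch of an adaptive request_n strategy: size the next batch so it roughly
// fits a fixed processing-time budget. All constants are assumptions.
public class AdaptiveRequestSize {

    static final long BUDGET_MILLIS = 100;  // time we are willing to spend per batch
    static final int MIN_BATCH = 1;
    static final int MAX_BATCH = 64;

    // avgItemMillis: measured average time to process one item.
    public static int nextBatchSize(double avgItemMillis) {
        if (avgItemMillis <= 0) return MAX_BATCH;       // no data yet: be optimistic
        int batch = (int) (BUDGET_MILLIS / avgItemMillis);
        return Math.max(MIN_BATCH, Math.min(MAX_BATCH, batch));
    }
}
```

A subscriber like the BackPressureSubscriber above could call such a method in onNext instead of always requesting a fixed count of 5.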

Summary

In this article, we discussed the communication issues in the microservice architecture and how these problems can be solved using RSocket. We covered its API and the interaction model, backed with a simple “hello world” example and a basic backpressure mechanism implementation.

In the next articles of this series, we will cover more advanced features of RSocket, including load balancing and resumability, and we will discuss abstractions over RSocket – RPC and Spring Reactor.

  • Please notice that fully working examples are provided here

Reactive service to service communication with RSocket – abstraction over RSocket

If you are familiar with the previous articles of this series (Introduction, Load balancing & Resumability), you have probably noticed that RSocket provides a low-level API. We can operate directly on the methods from the interaction model and send frames back and forth without any constraints. This gives us a lot of freedom and control, but it may introduce extra issues, especially related to the contract between microservices. To solve these problems, we can use RSocket through a generic abstraction layer. There are two solutions available: the RSocket RPC module and the integration with the Spring Framework. In the following sections, we will discuss them briefly.

RPC over RSocket

Keeping the contract between microservices clean and well-defined is one of the crucial concerns of distributed systems. To ensure that applications can exchange data, we can leverage Remote Procedure Calls. Fortunately, RSocket has a dedicated RPC module which uses Protobuf as its serialization mechanism, so we can benefit from RSocket performance and keep the contract in check at the same time. By combining generated services and objects with RSocket acceptors, we can spin up a fully operational RPC server, and just as easily consume it using an RPC client.

First, we need the definition of the service and its messages. In the example below, we create a simple CustomerService with four endpoints – each of them represents a different method from the interaction model.

syntax = "proto3";

option java_multiple_files = true;
option java_outer_classname = "ServiceProto";

package com.rsocket.rpc;

import "google/protobuf/empty.proto";

message SingleCustomerRequest {
    string id = 1;
}

message MultipleCustomersRequest {
    repeated string ids = 1;
}

message CustomerResponse {
    string id = 1;
    string name = 2;
}

service CustomerService {
    rpc getCustomer(SingleCustomerRequest) returns (CustomerResponse) {} // request-response
    rpc getCustomers(MultipleCustomersRequest) returns (stream CustomerResponse) {} // request-stream
    rpc deleteCustomer(SingleCustomerRequest) returns (google.protobuf.Empty) {} // fire-and-forget
    rpc customerChannel(stream MultipleCustomersRequest) returns (stream CustomerResponse) {} // request-channel
}

In the second step, we have to generate classes from the proto file presented above. To do that, we can configure the Gradle protobuf plugin as follows:

protobuf {
    protoc {
        artifact = 'com.google.protobuf:protoc:3.6.1'
    }
    generatedFilesBaseDir = "${projectDir}/build/generated-sources/"

    plugins {
        rsocketRpc {
            artifact = 'io.rsocket.rpc:rsocket-rpc-protobuf:0.2.17'
        }
    }
    generateProtoTasks {
        all()*.plugins {
            rsocketRpc {}
        }
    }
}

As a result of the generateProto task, we should obtain the service interface, service client, and service server classes – in this case CustomerService, CustomerServiceClient, and CustomerServiceServer respectively. In the next step, we have to implement the business logic of the generated service (CustomerService):

public class DefaultCustomerService implements CustomerService {

    private static final List<String> RANDOM_NAMES = Arrays.asList("Andrew", "Joe", "Matt", "Rachel", "Robin", "Jack");

    @Override
    public Mono<CustomerResponse> getCustomer(SingleCustomerRequest message, ByteBuf metadata) {
        log.info("Received 'getCustomer' request [{}]", message);
        return Mono.just(CustomerResponse.newBuilder()
                .setId(message.getId())
                .setName(getRandomName())
                .build());
    }

    @Override
    public Flux<CustomerResponse> getCustomers(MultipleCustomersRequest message, ByteBuf metadata) {
        return Flux.interval(Duration.ofMillis(1000))
                .map(time -> CustomerResponse.newBuilder()
                        .setId(UUID.randomUUID().toString())
                        .setName(getRandomName())
                        .build());
    }

    @Override
    public Mono<Empty> deleteCustomer(SingleCustomerRequest message, ByteBuf metadata) {
        log.info("Received 'deleteCustomer' request [{}]", message);
        return Mono.just(Empty.newBuilder().build());
    }

    @Override
    public Flux<CustomerResponse> customerChannel(Publisher<MultipleCustomersRequest> messages, ByteBuf metadata) {
        return Flux.from(messages)
                .doOnNext(message -> log.info("Received 'customerChannel' request [{}]", message))
                .map(message -> CustomerResponse.newBuilder()
                        .setId(UUID.randomUUID().toString())
                        .setName(getRandomName())
                        .build());
    }

    private String getRandomName() {
        // nextInt(size) covers all indices; size() - 1 would never pick the last name
        return RANDOM_NAMES.get(new Random().nextInt(RANDOM_NAMES.size()));
    }
}

Finally, we can expose the service via RSocket. To achieve that, we have to create an instance of the service server (CustomerServiceServer) and inject an implementation of our service (DefaultCustomerService). Then we are ready to create an RSocket acceptor instance. The API provides RequestHandlingRSocket, which wraps the service server instance and translates the endpoints defined in the contract into methods of the RSocket interaction model.

public class Server {

    public static void main(String[] args) throws InterruptedException {
        CustomerServiceServer serviceServer = new CustomerServiceServer(new DefaultCustomerService(), Optional.empty(), Optional.empty());

        RSocketFactory
                .receive()
                .acceptor((setup, sendingSocket) -> Mono.just(
                        new RequestHandlingRSocket(serviceServer)
                ))
                .transport(TcpServerTransport.create(7000))
                .start()
                .block();

        Thread.currentThread().join();
    }
}

On the client side, the implementation is pretty straightforward. All we need to do is create an RSocket instance and inject it into the service client via the constructor; then we are ready to go.

@Slf4j
public class Client {

    public static void main(String[] args) {
        RSocket rSocket = RSocketFactory
                .connect()
                .transport(TcpClientTransport.create(7000))
                .start()
                .block();
        CustomerServiceClient customerServiceClient = new CustomerServiceClient(rSocket);

        customerServiceClient.deleteCustomer(SingleCustomerRequest.newBuilder()
                .setId(UUID.randomUUID().toString()).build())
                .block();

        customerServiceClient.getCustomer(SingleCustomerRequest.newBuilder()
                .setId(UUID.randomUUID().toString()).build())
                .doOnNext(response -> log.info("Received response for 'getCustomer': [{}]", response))
                .block();

        customerServiceClient.getCustomers(MultipleCustomersRequest.newBuilder()
                .addIds(UUID.randomUUID().toString()).build())
                .doOnNext(response -> log.info("Received response for 'getCustomers': [{}]", response))
                .subscribe();

        customerServiceClient.customerChannel(s -> s.onNext(MultipleCustomersRequest.newBuilder()
                .addIds(UUID.randomUUID().toString())
                .build()))
                .doOnNext(customerResponse -> log.info("Received response for 'customerChannel' [{}]", customerResponse))
                .blockLast();
    }
}

Combining RSocket with the RPC approach helps maintain the contract between microservices and improves the day-to-day developer experience. It is suitable for typical scenarios where we do not need full control over the frames, yet it does not limit the protocol's flexibility: we can still expose RPC endpoints as well as plain RSocket acceptors in the same application, so we can choose the best communication pattern for each use case.

In the context of RPC over RSocket, one more fundamental question may arise: is it better than gRPC? There is no easy answer. RSocket is a new technology, and it needs time to reach the maturity level gRPC has. On the other hand, it surpasses gRPC in two areas: performance (benchmarks available here) and flexibility – it can be used as a transport layer for RPC or as a plain messaging solution. Before deciding which one to use in a production environment, you should determine whether RSocket aligns with your early-adoption strategy and does not put your software at risk. Personally, I would recommend introducing RSocket in less critical areas first, and then extending its usage to the rest of the system.

  • Please notice that the examples of RPC over RSocket are provided here

Spring Boot integration

The second available solution providing an abstraction over RSocket is the integration with Spring Boot. Here we use RSocket as a reactive messaging solution and leverage Spring annotations to link methods with routes with ease. In the following example, we implement two Spring Boot applications – the requester and the responder. The responder exposes RSocket endpoints through CustomerController and maps three routes: customer, customer-stream, and customer-channel. Each of these mappings reflects a different method from the RSocket interaction model (request-response, request-stream, and channel respectively). The controller implements simple business logic and returns a CustomerResponse object with a random name, as shown in the example below:

@Slf4j
@SpringBootApplication
public class RSocketResponderApplication {

    public static void main(String[] args) {
        SpringApplication.run(RSocketResponderApplication.class);
    }

    @Controller
    public class CustomerController {

        private final List<String> RANDOM_NAMES = Arrays.asList("Andrew", "Joe", "Matt", "Rachel", "Robin", "Jack");

        @MessageMapping("customer")
        CustomerResponse getCustomer(CustomerRequest customerRequest) {
            return new CustomerResponse(customerRequest.getId(), getRandomName());
        }

        @MessageMapping("customer-stream")
        Flux<CustomerResponse> getCustomers(MultipleCustomersRequest multipleCustomersRequest) {
            return Flux.range(0, multipleCustomersRequest.getIds().size())
                    .delayElements(Duration.ofMillis(500))
                    .map(i -> new CustomerResponse(multipleCustomersRequest.getIds().get(i), getRandomName()));
        }

        @MessageMapping("customer-channel")
        Flux<CustomerResponse> getCustomersChannel(Flux<CustomerRequest> requests) {
            return Flux.from(requests)
                    .doOnNext(message -> log.info("Received 'customerChannel' request [{}]", message))
                    .map(message -> new CustomerResponse(message.getId(), getRandomName()));
        }

        private String getRandomName() {
            // nextInt(size) covers all indices; size() - 1 would never pick the last name
            return RANDOM_NAMES.get(new Random().nextInt(RANDOM_NAMES.size()));
        }
    }
}

Please notice that the examples presented in this section are based on the Spring Boot RSocket starter 2.2.0.M4, which is not an official release yet; the API may still change.

It is worth noting that Spring Boot automatically detects the RSocket library on the classpath and spins up the server. All we need to do is specify the port:

spring:
  rsocket:
    server:
      port: 7000



These few lines of code and configuration set up a fully operational responder with message mappings (the code is available here).

Let’s take a look at the requester side. Here we implement CustomerServiceAdapter, which is responsible for communication with the responder. It uses an RSocketRequester bean that wraps the RSocket instance, the mime type, and the encoding/decoding details encapsulated inside an RSocketStrategies object. The RSocketRequester routes the messages and deals with serialization/deserialization of the data in a reactive manner. All we need to do is provide the route, the data, and the way we would like to consume the messages from the responder – as a single object (Mono) or as a stream (Flux).

@Slf4j
@SpringBootApplication
public class RSocketRequesterApplication {

    public static void main(String[] args) {
        SpringApplication.run(RSocketRequesterApplication.class);
    }

    @Bean
    RSocket rSocket() {
        return RSocketFactory
                .connect()
                .frameDecoder(PayloadDecoder.ZERO_COPY)
                .dataMimeType(MimeTypeUtils.APPLICATION_JSON_VALUE)
                .transport(TcpClientTransport.create(7000))
                .start()
                .block();
    }

    @Bean
    RSocketRequester rSocketRequester(RSocket rSocket, RSocketStrategies rSocketStrategies) {
        return RSocketRequester.wrap(rSocket, MimeTypeUtils.APPLICATION_JSON, rSocketStrategies);
    }

    @Component
    class CustomerServiceAdapter {

        private final RSocketRequester rSocketRequester;

        CustomerServiceAdapter(RSocketRequester rSocketRequester) {
            this.rSocketRequester = rSocketRequester;
        }

        Mono<CustomerResponse> getCustomer(String id) {
            return rSocketRequester
                    .route("customer")
                    .data(new CustomerRequest(id))
                    .retrieveMono(CustomerResponse.class)
                    .doOnNext(customerResponse -> log.info("Received customer as mono [{}]", customerResponse));
        }

        Flux<CustomerResponse> getCustomers(List<String> ids) {
            return rSocketRequester
                    .route("customer-stream")
                    .data(new MultipleCustomersRequest(ids))
                    .retrieveFlux(CustomerResponse.class)
                    .doOnNext(customerResponse -> log.info("Received customer as flux [{}]", customerResponse));
        }

        Flux<CustomerResponse> getCustomerChannel(Flux<CustomerRequest> customerRequestFlux) {
            return rSocketRequester
                    .route("customer-channel")
                    .data(customerRequestFlux, CustomerRequest.class)
                    .retrieveFlux(CustomerResponse.class)
                    .doOnNext(customerResponse -> log.info("Received customer as flux [{}]", customerResponse));
        }
    }
}

Besides communicating with the responder, the requester exposes a RESTful API with three mappings: /customers/{id}, /customers, and /customers-channel, implemented with Spring WebFlux. Please notice that the last two mappings produce a text event stream, which means each value is streamed to the web browser as it becomes available.
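For reference, a text/event-stream response arrives incrementally as data: lines, one per emitted element; the JSON payloads below are illustrative, not actual output:

```
data: {"id":"7e3d...","name":"Rachel"}

data: {"id":"a91c...","name":"Joe"}
```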

@RestController
class CustomerController {

    private final CustomerServiceAdapter customerServiceAdapter;

    CustomerController(CustomerServiceAdapter customerServiceAdapter) {
        this.customerServiceAdapter = customerServiceAdapter;
    }

    @GetMapping("/customers/{id}")
    Mono<CustomerResponse> getCustomer(@PathVariable String id) {
        return customerServiceAdapter.getCustomer(id);
    }

    @GetMapping(value = "/customers", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    Publisher<CustomerResponse> getCustomers() {
        return customerServiceAdapter.getCustomers(getRandomIds(10));
    }

    @GetMapping(value = "/customers-channel", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    Publisher<CustomerResponse> getCustomersChannel() {
        return customerServiceAdapter.getCustomerChannel(Flux.interval(Duration.ofMillis(1000))
                .map(id -> new CustomerRequest(UUID.randomUUID().toString())));
    }

    private List<String> getRandomIds(int amount) {
        return IntStream.range(0, amount)
                .mapToObj(n -> UUID.randomUUID().toString())
                .collect(toList());
    }
}



To play with the REST endpoints mentioned above, you can use the following curl commands:

curl http://localhost:8080/customers/1

curl http://localhost:8080/customers

curl http://localhost:8080/customers-channel

Please notice that the requester application code is available here

The Spring Boot integration and the RPC module are complementary solutions on top of RSocket. The first is messaging-oriented and provides a convenient message-routing API, whereas the RPC module enables the developer to easily control the exposed endpoints and maintain the contract between microservices. Both solutions have their applications and can be combined with RSocket's low-level API to fulfill even sophisticated requirements in a consistent manner using a single protocol.

Series summary

This article is the last one of the mini-series related to RSocket – the new binary protocol which may revolutionize service-to-service communication in the cloud. Its rich interaction model, performance, and extra features like client load balancing and resumability make it a strong candidate for a wide range of business cases. The usage of the protocol can be simplified by the available abstraction layers: the Spring Boot integration and the RPC module, which address the most typical day-to-day scenarios.

Please notice that the protocol is in a release-candidate version (1.0.0-RC2), so it is not yet recommended for production use. Still, you should keep an eye on it, as the growing community and the support of big tech companies (e.g. Netflix, Facebook, Alibaba, Netifi) may turn RSocket into a primary communication protocol in the cloud.
