Reactive Service to Service Communication With RSocket – Load Balancing & Resumability


25/07/2019


This article is the second in a mini-series that will help you get familiar with RSocket, a new binary protocol which may revolutionize machine-to-machine communication in distributed systems. In the following paragraphs, we discuss the load balancing problem in the cloud and present the resumability feature, which helps to deal with network issues, especially in IoT systems.

High availability & load balancing as a crucial part of enterprise-grade systems

Application availability and reliability are crucial in many business areas, such as banking and insurance. In these demanding industries, services have to be operational 24/7, even during high traffic, periods of increased network latency, or natural disasters. To ensure that the software is always available to end users, it is usually deployed redundantly, across multiple availability zones.

In such a scenario, at least two instances of each microservice are deployed in at least two availability zones. This technique makes our system more resilient and increases its capacity: multiple instances of a microservice are able to handle a significantly higher load. So where is the catch? Redundancy introduces extra complexity. As engineers, we have to ensure that incoming traffic is spread across all available instances. Two major techniques address this problem: server-side load balancing and client-side load balancing.

The first approach is based on the assumption that the requester does not know the IP addresses of the responders. Instead, the requester communicates with a load balancer, which is responsible for spreading the requests across the microservices connected to it. This design is fairly easy to adopt in the cloud era. IaaS providers usually have built-in, reliable solutions, like Elastic Load Balancer available in Amazon Web Services. Moreover, such a design makes it easier to develop routing strategies more sophisticated than plain round robin (e.g. adaptive load balancing or chained failover). The major drawback of this technique is that we have to configure and deploy extra resources, which may be painful if our system consists of hundreds of microservices. Furthermore, it may affect latency: each request makes an extra “network hop” through the load balancer.

The second technique inverts the relation. Instead of connecting to responders through a central point, the requester knows the IP address of each and every instance of the given microservice. With that knowledge, the client can choose the responder instance to which it sends the request or opens the connection. This strategy does not require any extra resources, but we have to ensure that the requester has the IP addresses of all instances of the responder (see how to deal with it using the service discovery pattern). The main benefit of the client load balancing pattern is its performance: by removing one extra “network hop”, we may significantly decrease the latency. This is one of the key reasons why RSocket implements the client load balancing pattern.
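To make the client-side pattern more tangible, here is a minimal sketch of a requester that already knows all responder addresses and rotates over them with plain round robin. The class name RoundRobinBalancer and the addresses are hypothetical and used only for illustration; this is not how RSocket selects instances, which, as described in the next section, relies on per-connection statistics.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical client-side balancer: rotates over a fixed list of known responder addresses. */
final class RoundRobinBalancer {
    private final List<String> addresses;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinBalancer(List<String> addresses) {
        if (addresses.isEmpty()) {
            throw new IllegalArgumentException("at least one address is required");
        }
        // Defensive copy so later changes to the caller's list do not affect the rotation.
        this.addresses = Collections.unmodifiableList(new ArrayList<>(addresses));
    }

    /** Returns the next address in rotation; safe to call from multiple threads. */
    String next() {
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(counter.getAndIncrement(), addresses.size());
        return addresses.get(index);
    }

    public static void main(String[] args) {
        // Made-up addresses; in practice they would come from service discovery.
        RoundRobinBalancer balancer = new RoundRobinBalancer(
                Arrays.asList("10.0.0.1:7000", "10.0.0.2:7000", "10.0.0.3:7000"));
        for (int i = 0; i < 4; i++) {
            System.out.println(balancer.next());
        }
    }
}
```

The requester talks to the chosen instance directly, so no intermediate load balancer sits on the request path.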

Client load balancing in RSocket

At the code level, the implementation of client load balancing in RSocket is pretty straightforward. The mechanism relies on the LoadBalancedRSocketMono object, which works as a bag of available RSocket instances provided by RSocket suppliers. To access an RSocket, we subscribe to the LoadBalancedRSocketMono, whose onNext signal emits a fully-fledged RSocket instance. Moreover, it calculates statistics for each RSocket, so it can estimate the load of each instance and, based on that, choose the one with the best performance at a given point in time.

The algorithm takes into account multiple parameters, such as latency, the number of maintained connections, and the number of pending requests. The health of each RSocket is reflected by the availability parameter, which takes values from 0 to 1, where 0 indicates that the given instance cannot handle any requests and 1 is assigned to a fully operational socket. The code snippet below shows a very basic example of a load-balanced RSocket, which connects to three different instances of the responder and executes 100 requests. Each time, it picks an RSocket from the LoadBalancedRSocketMono object.

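import io.rsocket.RSocketFactory;
import io.rsocket.client.LoadBalancedRSocketMono;
import io.rsocket.client.filter.RSocketSupplier;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.util.DefaultPayload;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

@Slf4j
public class LoadBalancedClient {

    // Assumption: all responder instances run on the local machine.
    static final String HOST = "localhost";
    static final int[] PORTS = new int[]{7000, 7001, 7002};

    public static void main(String[] args) {

        // One supplier per responder instance; each opens a TCP connection on demand.
        List<RSocketSupplier> rsocketSuppliers = Arrays.stream(PORTS)
                .mapToObj(port -> new RSocketSupplier(() -> RSocketFactory.connect()
                        .transport(TcpClientTransport.create(HOST, port))
                        .start()))
                .collect(Collectors.toList());

        // Hand the list of suppliers to the balancer.
        LoadBalancedRSocketMono balancer = LoadBalancedRSocketMono.create((Publisher<List<RSocketSupplier>>) s -> {
            s.onNext(rsocketSuppliers);
            s.onComplete();
        });

        // Every subscription to the balancer picks an RSocket for a single request.
        Flux.range(0, 100)
                .flatMap(i -> balancer)
                // Send the request without blocking inside the reactive pipeline.
                .flatMap(rSocket -> rSocket.requestResponse(DefaultPayload.create("test-request")))
                .blockLast();
    }

}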
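The idea of choosing an instance based on its statistics, described before the listing, can be illustrated with a small stand-alone sketch. It is only an illustration under stated assumptions: the classes InstanceStats and WeightedSelector, the scoring formula, and the sample numbers are all hypothetical and do not reproduce the algorithm used by LoadBalancedRSocketMono.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Hypothetical snapshot of the statistics kept for one responder connection. */
final class InstanceStats {
    final String name;
    final double availability;  // 0.0 = cannot handle requests, 1.0 = fully operational
    final double latencyMillis; // recently observed latency
    final int pendingRequests;  // requests sent but not yet answered

    InstanceStats(String name, double availability, double latencyMillis, int pendingRequests) {
        this.name = name;
        this.availability = availability;
        this.latencyMillis = latencyMillis;
        this.pendingRequests = pendingRequests;
    }

    /** Assumed formula: higher availability, lower latency and fewer pending requests score better. */
    double score() {
        return availability / (Math.max(latencyMillis, 1.0) * (pendingRequests + 1));
    }
}

final class WeightedSelector {
    /** Picks the best-scoring instance, skipping instances whose availability is 0. */
    static Optional<InstanceStats> pick(List<InstanceStats> candidates) {
        return candidates.stream()
                .filter(c -> c.availability > 0.0)
                .max(Comparator.comparingDouble(InstanceStats::score));
    }

    public static void main(String[] args) {
        List<InstanceStats> candidates = Arrays.asList(
                new InstanceStats("responder-7000", 1.0, 10.0, 4),
                new InstanceStats("responder-7001", 1.0, 20.0, 0),
                new InstanceStats("responder-7002", 0.0, 1.0, 0));
        pick(candidates).ifPresent(best -> System.out.println("Selected: " + best.name));
    }
}
```

In this sample, the instance with the lowest latency is not selected, because it is either busy with pending requests or reports an availability of 0.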

It is worth noting that the client load balancer in RSocket deals with dead connections as well. If any of the RSocket instances registered in the LoadBalancedRSocketMono stops responding, the mechanism automatically tries to reconnect. By default, it makes 5 attempts within 25 seconds. If they do not succeed, the given RSocket is removed from the pool of available connections. Such a design combines the advantages of server-side load balancing with the low latency and fewer “network hops” of client-side load balancing.
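The bounded reconnect behaviour can be pictured with the following sketch. It is a simplified illustration, not RSocket code: the class ReconnectPolicy is hypothetical, and spacing the attempts with a fixed delay is an assumption, since only the number of attempts and the overall time window are stated above.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper: retries a connection attempt a limited number of times. */
final class ReconnectPolicy {

    /**
     * Calls connect up to maxAttempts times, waiting delayMillis after each failure.
     * Returns the 1-based number of the attempt that succeeded, or -1 if all attempts
     * failed (or the thread was interrupted), in which case the caller would drop the
     * connection from its pool.
     */
    static int reconnect(BooleanSupplier connect, int maxAttempts, long delayMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (connect.getAsBoolean()) {
                return attempt;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    return -1;
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated connection that becomes reachable on the third attempt.
        // Assumed spacing: 5 attempts with a fixed 5-second delay between them.
        int result = reconnect(() -> ++calls[0] == 3, 5, 5_000);
        System.out.println("Connected on attempt " + result);
    }
}
```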

Dead connections & resumability mechanism

A question that may arise in the context of dead connections is: what happens if I have only a single instance of the responder and the connection drops due to network issues? Is there anything we can do about it? Fortunately, RSocket has a built-in resumability mechanism.

To clarify the concept, let’s consider the following example. We are building an IoT platform which connects to multiple temperature sensors located in different places. Most of them are far from the nearest buildings and internet connection sources. Therefore, the devices connect to cloud services using GPRS. The business requirement for our system is to collect temperature readings every second, in real time, without losing any data.

In the case of machine-to-machine communication within the cloud, streaming data in real time is not a big deal, but if we consider IoT devices located in areas without access to a stable, reliable internet connection, the problem becomes more complex. We can easily identify two major issues we may face in such a system: network latency and connection stability. From a software perspective, there is not much we can do about the first one, but we can try to deal with the latter. Let’s tackle the problem with RSocket, starting with picking the proper interaction model. The most suitable one in this case is the request stream method, where the microservice deployed in the cloud is the requester and the temperature sensor is the responder. After choosing the interaction model, we apply the resumability mechanism. In RSocket, we do it by invoking the resume() method on the RSocketFactory, as shown in the examples below:

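import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.util.DefaultPayload;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ResumableRequester {

    // The responder below listens on port 7000. Connecting to 7001 only works if something
    // forwards 7001 to 7000 (for example a local TCP proxy used to simulate connection drops);
    // otherwise use 7000 here.
    private static final int CLIENT_PORT = 7001;

    public static void main(String[] args) {
        // HOST and RESUME_SESSION_DURATION are defined in ResumableResponder (same package).
        RSocket socket = RSocketFactory.connect()
                .resume() // enable resumability on the requester side
                .resumeSessionDuration(ResumableResponder.RESUME_SESSION_DURATION)
                .transport(TcpClientTransport.create(ResumableResponder.HOST, CLIENT_PORT))
                .start()
                .block();
        // Open the stream and log every reading until the stream terminates.
        socket.requestStream(DefaultPayload.create("dummy"))
                .doOnNext(payload -> log.info("Received data: [{}]", payload.getDataUtf8()))
                .blockLast();

    }
}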

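import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import java.time.Duration;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Slf4j
public class ResumableResponder {

    private static final int SERVER_PORT = 7000;
    static final String HOST = "localhost";
    static final Duration RESUME_SESSION_DURATION = Duration.ofSeconds(60);


    public static void main(String[] args) throws InterruptedException {

        RSocketFactory.receive()
                .resume() // enable resumability on the responder side
                .resumeSessionDuration(RESUME_SESSION_DURATION)
                .acceptor((setup, sendingSocket) -> Mono.just(new AbstractRSocket() {
                    @Override
                    public Flux<Payload> requestStream(Payload payload) {
                        log.info("Received 'requestStream' request with payload: [{}]", payload.getDataUtf8());
                        // Emit an increasing counter once per second as a stand-in for a sensor reading.
                        return Flux.interval(Duration.ofMillis(1000))
                                .map(t -> DefaultPayload.create(t.toString()));
                    }
                }))
                .transport(TcpServerTransport.create(HOST, SERVER_PORT))
                .start()
                .subscribe();
        log.info("Server running");

        // Keep the main thread alive so the server keeps running.
        Thread.currentThread().join();
    }

}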

The mechanism works similarly on the requester and the responder side and is based on a few components. First of all, there is a ResumableFramesStore, which works as a buffer for frames. By default, it stores them in memory, but we can adjust it to our needs by implementing the ResumableFramesStore interface (e.g. to store the frames in a distributed cache, like Redis). The store saves the data emitted between keep-alive frames, which are sent back and forth periodically and indicate whether the connection between the peers is stable. Moreover, the keep-alive frame contains a token that determines the last received position for the requester and the responder. When a peer wants to resume the connection, it sends the resume frame with an implied position. The implied position is calculated from the last received position (the same value we have seen in the keep-alive frame) plus the length of the frames received since that moment. This algorithm applies to both parties of the communication; in the resume frame, it is reflected by the last received server position and first client available position tokens. The whole flow of the resume operation is shown in the diagram below:

[Diagram: flow of the resume operation]

By adopting the resumability mechanism built into the RSocket protocol, we can reduce the impact of network issues with relatively low effort. As shown in the example above, resumability can be extremely useful in data streaming applications, especially for device-to-cloud communication.
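The position bookkeeping described in this section can be sketched in a few lines of plain Java. This is a simplified model under stated assumptions, not the ResumableFramesStore API: the class ResumeBuffer and its methods are hypothetical, positions are counted as the total length of frames in bytes, and the peer's position is assumed to fall on a frame boundary.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical in-memory buffer that tracks sent and received positions for one peer. */
final class ResumeBuffer {
    private final Deque<byte[]> frames = new ArrayDeque<>();
    private long firstAvailablePosition = 0; // position of the oldest frame still buffered
    private long sentPosition = 0;           // total length of all frames sent so far
    private long impliedPosition = 0;        // total length of all frames received so far

    void onFrameSent(byte[] frame) {
        frames.addLast(frame);
        sentPosition += frame.length;
    }

    void onFrameReceived(byte[] frame) {
        impliedPosition += frame.length;
    }

    long impliedPosition() { return impliedPosition; }

    long firstAvailablePosition() { return firstAvailablePosition; }

    /** The peer reported (e.g. in a keep-alive) what it has received; drop frames it already has. */
    void release(long peerLastReceivedPosition) {
        while (!frames.isEmpty()
                && firstAvailablePosition + frames.peekFirst().length <= peerLastReceivedPosition) {
            firstAvailablePosition += frames.removeFirst().length;
        }
    }

    /** Frames to retransmit on resume, given the position implied by the peer. */
    List<byte[]> framesFrom(long peerImpliedPosition) {
        if (peerImpliedPosition < firstAvailablePosition || peerImpliedPosition > sentPosition) {
            throw new IllegalStateException("cannot resume from position " + peerImpliedPosition);
        }
        List<byte[]> toResend = new ArrayList<>();
        long position = firstAvailablePosition;
        for (byte[] frame : frames) {
            if (position >= peerImpliedPosition) {
                toResend.add(frame);
            }
            position += frame.length;
        }
        return toResend;
    }
}
```

If the peer asks for a position that has already been released from the buffer, the session cannot be resumed in this model, which is why the size and lifetime of the buffer matter.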

Summary

In this article, we discussed more advanced features of the RSocket protocol, which help reduce the impact of the network on the operation of the system. We covered the implementation of the client load balancing pattern and the resumability mechanism. These features, combined with the robust interaction model, constitute the core of the protocol.

In the last article of this mini-series, we will cover the abstraction layers available on top of RSocket.

 


