Reactive Service to Service Communication With RSocket – Abstraction Over RSocket


08/08/2019

If you are familiar with the previous articles of this series (Introduction, Load Balancing & Resumability), you have probably noticed that RSocket provides a low-level API. We can operate directly on the methods of the interaction model and send frames back and forth without any constraints. This gives us a lot of freedom and control, but it may introduce extra issues, especially around the contract between microservices. To solve these problems, we can use RSocket through a generic abstraction layer. Two solutions are available: the RSocket RPC module and the integration with Spring Framework. In the following sections, we discuss them briefly.

RPC over RSocket

Keeping the contract between microservices clean and well-defined is one of the crucial concerns of distributed systems. To ensure that applications can exchange data, we can leverage Remote Procedure Calls. Fortunately, RSocket has a dedicated RPC module which uses Protobuf as its serialization mechanism, so we can benefit from RSocket performance and keep the contract in check at the same time. By combining generated services and objects with RSocket acceptors, we can spin up a fully operational RPC server and just as easily consume it using an RPC client.

First, we need the definitions of the service and its messages. In the example below, we create a simple CustomerService with four endpoints; each of them represents a different method from the interaction model.

syntax = "proto3";
option java_multiple_files = true;
option java_outer_classname = "ServiceProto";

package com.rsocket.rpc;

import "google/protobuf/empty.proto";

message SingleCustomerRequest {
    string id = 1;
}

message MultipleCustomersRequest {
    repeated string ids = 1;
}

message CustomerResponse {
    string id = 1;
    string name = 2;
}

service CustomerService {
    rpc getCustomer(SingleCustomerRequest) returns (CustomerResponse) {} //request-response
    rpc getCustomers(MultipleCustomersRequest) returns (stream CustomerResponse) {} //request-stream
    rpc deleteCustomer(SingleCustomerRequest) returns (google.protobuf.Empty) {} //fire'n'forget
    rpc customerChannel(stream MultipleCustomersRequest) returns (stream CustomerResponse) {} //request-channel
}
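
The comments in the proto file pair each endpoint signature with an interaction model method. A minimal sketch of that pairing in plain Java could look as follows. The class, enum and method names are hypothetical, and the rule simply mirrors the inline comments above; how the RSocket RPC generator actually selects the interaction (for example, whether fire-and-forget needs an explicit option) is not shown in this article, so treat this as an illustration rather than the module's logic.

```java
// Hypothetical illustration; this is not code generated by RSocket RPC.
final class EndpointShapeSketch {

    enum InteractionModel { REQUEST_RESPONSE, FIRE_AND_FORGET, REQUEST_STREAM, REQUEST_CHANNEL }

    private EndpointShapeSketch() {
    }

    /**
     * Classifies an rpc signature the way the proto comments above do.
     *
     * @param streamedRequest  the request is declared with the "stream" keyword
     * @param streamedResponse the response is declared with the "stream" keyword
     * @param emptyResponse    the response type is google.protobuf.Empty
     */
    static InteractionModel classify(boolean streamedRequest,
                                     boolean streamedResponse,
                                     boolean emptyResponse) {
        if (streamedRequest) {
            // Request-channel is the only RSocket interaction in which
            // the requester itself sends a stream of payloads.
            return InteractionModel.REQUEST_CHANNEL;
        }
        if (streamedResponse) {
            return InteractionModel.REQUEST_STREAM;
        }
        // Assumption taken from the proto comments: an Empty response
        // is treated as fire-and-forget.
        return emptyResponse
                ? InteractionModel.FIRE_AND_FORGET
                : InteractionModel.REQUEST_RESPONSE;
    }
}
```

For instance, `classify(false, true, false)` describes getCustomers (a single request, a streamed response) and yields `REQUEST_STREAM`.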

In the second step, we have to generate classes from the proto file presented above. To do that, we can configure the protobuf block in the Gradle build as follows:

protobuf {
    protoc {
        artifact = 'com.google.protobuf:protoc:3.6.1'
    }
    generatedFilesBaseDir = "${projectDir}/build/generated-sources/"

    plugins {
        rsocketRpc {
            artifact = 'io.rsocket.rpc:rsocket-rpc-protobuf:0.2.17'
        }
    }
    generateProtoTasks {
        all()*.plugins {
            rsocketRpc {}
        }
    }
}

As a result of the generateProto task, we should obtain the service interface, the service client, and the service server classes; in this case CustomerService, CustomerServiceClient, and CustomerServiceServer respectively. In the next step, we have to implement the business logic of the generated service (CustomerService):

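import com.google.protobuf.Empty;
import io.netty.buffer.ByteBuf;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Slf4j
public class DefaultCustomerService implements CustomerService {

    private static final List<String> RANDOM_NAMES = Arrays.asList("Andrew", "Joe", "Matt", "Rachel", "Robin", "Jack");

    // request-response: returns a single customer with the requested id
    @Override
    public Mono<CustomerResponse> getCustomer(SingleCustomerRequest message, ByteBuf metadata) {
        log.info("Received 'getCustomer' request [{}]", message);
        return Mono.just(CustomerResponse.newBuilder()
                .setId(message.getId())
                .setName(getRandomName())
                .build());
    }

    // request-stream: emits a new random customer every second
    @Override
    public Flux<CustomerResponse> getCustomers(MultipleCustomersRequest message, ByteBuf metadata) {
        return Flux.interval(Duration.ofMillis(1000))
                .map(time -> CustomerResponse.newBuilder()
                        .setId(UUID.randomUUID().toString())
                        .setName(getRandomName())
                        .build());
    }

    @Override
    public Mono<Empty> deleteCustomer(SingleCustomerRequest message, ByteBuf metadata) {
        log.info("Received 'deleteCustomer' request [{}]", message);
        return Mono.just(Empty.newBuilder().build());
    }

    // request-channel: answers every incoming request with a random customer
    @Override
    public Flux<CustomerResponse> customerChannel(Publisher<MultipleCustomersRequest> messages, ByteBuf metadata) {
        return Flux.from(messages)
                .doOnNext(message -> log.info("Received 'customerChannel' request [{}]", message))
                .map(message -> CustomerResponse.newBuilder()
                        .setId(UUID.randomUUID().toString())
                        .setName(getRandomName())
                        .build());
    }

    private String getRandomName() {
        // nextInt(bound) excludes the bound, so size() covers every index
        return RANDOM_NAMES.get(new Random().nextInt(RANDOM_NAMES.size()));
    }
}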

Finally, we can expose the service via RSocket. To achieve that, we have to create an instance of the service server (CustomerServiceServer) and inject an implementation of our service (DefaultCustomerService). Then we are ready to create an RSocket acceptor instance. The API provides RequestHandlingRSocket, which wraps the service server instance and translates the endpoints defined in the contract into the methods available in the RSocket interaction model.

public class Server {

    public static void main(String[] args) throws InterruptedException {

        CustomerServiceServer serviceServer = new CustomerServiceServer(new DefaultCustomerService(), Optional.empty(), Optional.empty());

        RSocketFactory
                .receive()
                .acceptor((setup, sendingSocket) -> Mono.just(
                        new RequestHandlingRSocket(serviceServer)
                ))
                .transport(TcpServerTransport.create(7000))
                .start()
                .block();

        Thread.currentThread().join();
    }
}
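
To make the translation step more tangible, here is a conceptual sketch of routing by service and method name in plain Java. It is not the RequestHandlingRSocket implementation; the class name, the string payloads, and the "service.method" key format are assumptions made only for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Conceptual sketch only; the real RequestHandlingRSocket works on
// RSocket payloads and metadata, not on plain strings.
final class RouteDispatcherSketch {

    // Handlers keyed by "service.method" (hypothetical key format).
    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    // Registers the handler that serves a single endpoint of a service.
    void register(String service, String method, Function<String, String> handler) {
        handlers.put(key(service, method), handler);
    }

    // Looks up the handler for the incoming call and invokes it.
    String dispatch(String service, String method, String payload) {
        Function<String, String> handler = handlers.get(key(service, method));
        if (handler == null) {
            throw new IllegalArgumentException("No handler for " + key(service, method));
        }
        return handler.apply(payload);
    }

    private static String key(String service, String method) {
        return service + "." + method;
    }
}
```

After registering a handler for the service name "CustomerService" and the method name "getCustomer", a call to `dispatch("CustomerService", "getCustomer", "42")` invokes that handler with the payload "42", while an unknown method name results in an IllegalArgumentException.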

On the client side, the implementation is pretty straightforward. All we need to do is create the RSocket instance and inject it into the service client via the constructor; then we are ready to go.

@Slf4j
public class Client {

    public static void main(String[] args) {
        RSocket rSocket = RSocketFactory
                .connect()
                .transport(TcpClientTransport.create(7000))
                .start()
                .block();
        CustomerServiceClient customerServiceClient = new CustomerServiceClient(rSocket);

        customerServiceClient.deleteCustomer(SingleCustomerRequest.newBuilder()
                .setId(UUID.randomUUID().toString()).build())
                .block();

        customerServiceClient.getCustomer(SingleCustomerRequest.newBuilder()
                .setId(UUID.randomUUID().toString()).build())
                .doOnNext(response -> log.info("Received response for 'getCustomer': [{}]", response))
                .block();

        customerServiceClient.getCustomers(MultipleCustomersRequest.newBuilder()
                .addIds(UUID.randomUUID().toString()).build())
                .doOnNext(response -> log.info("Received response for 'getCustomers': [{}]", response))
                .subscribe();

        // Flux.just gives a well-behaved Publisher: it signals onSubscribe first and completes after one request
        customerServiceClient.customerChannel(Flux.just(MultipleCustomersRequest.newBuilder()
                .addIds(UUID.randomUUID().toString())
                .build()))
                .doOnNext(customerResponse -> log.info("Received response for 'customerChannel' [{}]", customerResponse))
                .blockLast();
    }

}

Combining RSocket with the RPC approach helps to maintain the contract between microservices and improves the day-to-day developer experience. It is suitable for typical scenarios where we do not need full control over the frames, yet it does not limit the flexibility of the protocol. We can still expose RPC endpoints as well as plain RSocket acceptors in the same application, so we can choose the best communication pattern for a given use case.

In the context of RPC over RSocket, one more fundamental question may arise: is it better than gRPC? There is no easy answer to that question. RSocket is a new technology, and it needs some time to reach the same maturity level as gRPC. On the other hand, it surpasses gRPC in two areas: performance (benchmarks available here) and flexibility, since it can be used as a transport layer for RPC or as a plain messaging solution. Before deciding which one to use in a production environment, you should determine whether RSocket aligns with your early adoption strategy and does not put your software at risk. Personally, I would recommend introducing RSocket in less critical areas first and then extending its usage to the rest of the system.

Spring Boot Integration

The second available solution that provides an abstraction over RSocket is the integration with Spring Boot. Here we use RSocket as a reactive messaging solution and leverage Spring annotations to link methods with routes. In the following example, we implement two Spring Boot applications: the requester and the responder. The responder exposes RSocket endpoints through CustomerController and maps three routes: customer, customer-stream, and customer-channel. Each of these mappings reflects a different method from the RSocket interaction model (request-response, request-stream, and request-channel respectively). The controller implements simple business logic and returns a CustomerResponse object with a random name, as shown in the example below:

@Slf4j
@SpringBootApplication
public class RSocketResponderApplication {

    public static void main(String[] args) {
        SpringApplication.run(RSocketResponderApplication.class);
    }

    @Controller
    public class CustomerController {

        private final List<String> RANDOM_NAMES = Arrays.asList("Andrew", "Joe", "Matt", "Rachel", "Robin", "Jack");

        @MessageMapping("customer")
        CustomerResponse getCustomer(CustomerRequest customerRequest) {
            return new CustomerResponse(customerRequest.getId(), getRandomName());
        }

        @MessageMapping("customer-stream")
        Flux<CustomerResponse> getCustomers(MultipleCustomersRequest multipleCustomersRequest) {
            return Flux.range(0, multipleCustomersRequest.getIds().size())
                    .delayElements(Duration.ofMillis(500))
                    .map(i -> new CustomerResponse(multipleCustomersRequest.getIds().get(i), getRandomName()));
        }

        @MessageMapping("customer-channel")
        Flux<CustomerResponse> getCustomersChannel(Flux<CustomerRequest> requests) {
            return Flux.from(requests)
                    .doOnNext(message -> log.info("Received 'customerChannel' request [{}]", message))
                    .map(message -> new CustomerResponse(message.getId(), getRandomName()));
        }

        private String getRandomName() {
            // nextInt(bound) excludes the bound, so size() covers every index
            return RANDOM_NAMES.get(new Random().nextInt(RANDOM_NAMES.size()));
        }
    }
}

Please note that the examples in this section are based on the Spring Boot RSocket starter 2.2.0.M4, which is a milestone rather than an official release, so the API may change.

It is worth noting that Spring Boot automatically detects the RSocket library on the classpath and spins up the server. All we need to do is specify the port:

spring:
  rsocket:
    server:
      port: 7000


These few lines of code and configuration set up a fully operational responder with message mapping (the code is available here).

Let's take a look at the requester side. Here we implement CustomerServiceAdapter, which is responsible for communication with the responder. It uses the RSocketRequester bean, which wraps the RSocket instance, the MIME type, and the encoding/decoding details encapsulated in the RSocketStrategies object. The RSocketRequester routes the messages and handles serialization and deserialization of the data in a reactive manner. All we need to do is provide the route, the data, and the way we would like to consume the messages from the responder: as a single object (Mono) or as a stream (Flux).

@Slf4j
@SpringBootApplication
public class RSocketRequesterApplication {


    public static void main(String[] args) {
        SpringApplication.run(RSocketRequesterApplication.class);
    }

    @Bean
    RSocket rSocket() {
        return RSocketFactory
                .connect()
                .frameDecoder(PayloadDecoder.ZERO_COPY)
                .dataMimeType(MimeTypeUtils.APPLICATION_JSON_VALUE)
                .transport(TcpClientTransport.create(7000))
                .start()
                .block();
    }

    @Bean
    RSocketRequester rSocketRequester(RSocket rSocket, RSocketStrategies rSocketStrategies) {
        return RSocketRequester.wrap(rSocket, MimeTypeUtils.APPLICATION_JSON,
                rSocketStrategies);
    }

    @Component
    class CustomerServiceAdapter {

        private final RSocketRequester rSocketRequester;

        CustomerServiceAdapter(RSocketRequester rSocketRequester) {
            this.rSocketRequester = rSocketRequester;
        }

        Mono<CustomerResponse> getCustomer(String id) {
            return rSocketRequester
                    .route("customer")
                    .data(new CustomerRequest(id))
                    .retrieveMono(CustomerResponse.class)
                    .doOnNext(customerResponse -> log.info("Received customer as mono [{}]", customerResponse));
        }

        Flux<CustomerResponse> getCustomers(List<String> ids) {
            return rSocketRequester
                    .route("customer-stream")
                    .data(new MultipleCustomersRequest(ids))
                    .retrieveFlux(CustomerResponse.class)
                    .doOnNext(customerResponse -> log.info("Received customer as flux [{}]", customerResponse));
        }

        Flux<CustomerResponse> getCustomerChannel(Flux<CustomerRequest> customerRequestFlux) {
            return rSocketRequester
                    .route("customer-channel")
                    .data(customerRequestFlux, CustomerRequest.class)
                    .retrieveFlux(CustomerResponse.class)
                    .doOnNext(customerResponse -> log.info("Received customer as flux [{}]", customerResponse));
        }
    }

}

Besides communicating with the responder, the requester exposes a RESTful API with three mappings: /customers/{id}, /customers, and /customers-channel. Here we use Spring WebFlux on top of the HTTP/2 protocol. Please note that the last two mappings produce a text event stream, which means that each value is streamed to the web browser as soon as it becomes available.

@RestController
class CustomerController {

    private final CustomerServiceAdapter customerServiceAdapter;

    CustomerController(CustomerServiceAdapter customerServiceAdapter) {
        this.customerServiceAdapter = customerServiceAdapter;
    }

    @GetMapping("/customers/{id}")
    Mono<CustomerResponse> getCustomer(@PathVariable String id) {
        return customerServiceAdapter.getCustomer(id);
    }

    @GetMapping(value = "/customers", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    Publisher<CustomerResponse> getCustomers() {
        return customerServiceAdapter.getCustomers(getRandomIds(10));
    }

    @GetMapping(value = "/customers-channel", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    Publisher<CustomerResponse> getCustomersChannel() {
        return customerServiceAdapter.getCustomerChannel(Flux.interval(Duration.ofMillis(1000))
                .map(id -> new CustomerRequest(UUID.randomUUID().toString())));
    }

    private List<String> getRandomIds(int amount) {
        return IntStream.range(0, amount)
                .mapToObj(n -> UUID.randomUUID().toString())
                .collect(toList());
    }

}


To play with the REST endpoints mentioned above, you can use the following curl commands:

curl http://localhost:8080/customers/1
curl http://localhost:8080/customers
curl http://localhost:8080/customers-channel
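
Besides curl, the two streaming endpoints can also be read from plain Java. The sketch below assumes Java 11 or newer (for java.net.http) and that the server emits standard server-sent event lines prefixed with "data:". The class and method names are hypothetical and not part of the article's repository.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Optional;
import java.util.function.Consumer;

// Hypothetical helper for reading a text event stream line by line.
final class CustomerStreamReaderSketch {

    private CustomerStreamReaderSketch() {
    }

    // Returns the payload of a "data:" line, or empty for any other line
    // (blank event separators, comments, "event:" or "id:" fields).
    static Optional<String> dataOf(String line) {
        if (!line.startsWith("data:")) {
            return Optional.empty();
        }
        String value = line.substring("data:".length());
        // The event stream format allows one optional space after the colon.
        return Optional.of(value.startsWith(" ") ? value.substring(1) : value);
    }

    // Blocks while the server keeps the stream open and passes every
    // received payload to the given consumer.
    static void readStream(String url, Consumer<String> onData)
            throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .header("Accept", "text/event-stream")
                .GET()
                .build();
        client.send(request, HttpResponse.BodyHandlers.ofLines())
                .body()
                .map(CustomerStreamReaderSketch::dataOf)
                .flatMap(Optional::stream)
                .forEach(onData);
    }
}
```

With the requester application running, `CustomerStreamReaderSketch.readStream("http://localhost:8080/customers", System.out::println)` should print each customer payload as it arrives.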

Please note that the requester application code is available here.

The integration with Spring Boot and the RPC module are complementary solutions on top of RSocket. The first one is messaging-oriented and provides a convenient message routing API, whereas the RPC module enables the developer to easily control the exposed endpoints and maintain the contract between microservices. Both of these solutions have their applications and can be easily combined with the RSocket low-level API to fulfill the most sophisticated requirements in a consistent manner using a single protocol.

Series summary

This article is the last one of the mini-series related to RSocket, the new binary protocol which may revolutionize service-to-service communication in the cloud. Its rich interaction model, performance, and extra features like client load balancing and resumability make it a strong candidate for a wide range of business cases. The usage of the protocol may be simplified by the available abstraction layers, Spring Boot integration and the RPC module, which address the most typical day-to-day scenarios.

Please note that the protocol is at the release candidate stage (1.0.0-RC2), so it is not recommended for production use yet. Still, you should keep an eye on it, as the growing community and the support of big tech companies (e.g. Netflix, Facebook, Alibaba, Netifi) may turn RSocket into a primary communication protocol in the cloud.


