gRPC Streaming


23/02/2023

near 5 min of reading

Previous articles presented what Protobuf is and how it can be combined with gRPC to implement simple synchronous API. However, it didn’t present the true power of gRPC, which is streaming, fully utilizing the capabilities of HTTP/2.0.

Contract definition

We must define the method with input and output parameters like the previous service. To follow the separation of concerns, let’s create a dedicated service for GPS tracking purposes. Our existing proto should be extended with the following snippet.

message SubscribeRequest {
  string vin = 1;
}

service GpsTracker {
  rpc Subscribe(SubscribeRequest) returns (stream Geolocation);
}

The most crucial part here of enabling streaming is specifying it in input or output type. To do that, a keyword stream is used. It indicates that the server will keep the connection open, and we can expect Geolocation messages to be sent by it.

Implementation

@Override
public void subscribe(SubscribeRequest request, StreamObserver<Geolocation> responseObserver) {
    responseObserver.onNext(
        Geolocation.newBuilder()
            .setVin(request.getVin())
            .setOccurredOn(TimestampMapper.convertInstantToTimestamp(Instant.now()))
            .setCoordinates(LatLng.newBuilder()
                .setLatitude(78.2303792628867)
                .setLongitude(15.479358124673292)
                .build())
            .build());
}

The simple implementation of the method doesn’t differ from the implementation of a unary call. The only difference is in how onNext the method behaves; in regular synchronous implementation, the method can’t be invoked more than once. However, for method operating on stream, onNext may be invoked as many times as you want.

As you may notice on the attached screenshot, the geolocation position was returned but the connection is still established and the client awaits more data to be sent in the stream. If the server wants to inform the client that there is no more data, it should invoke: the onCompleted method; however, sending single messages is not why we want to use stream.

Use cases for streaming capabilities are mainly transferring significant responses as streams of data chunks or real-time events. I’ll try to demonstrate the second use case with this service. Implementation will be based on the reactor (https://projectreactor.io/ ) as it works well for the presented use case.

Let’s prepare a simple implementation of the service. To make it work, web flux dependency will be required.

implementation 'org.springframework.boot:spring-boot-starter-webflux'

We must prepare a service for publishing geolocation events for a specific vehicle.

InMemoryGeolocationService.java

import com.grapeup.grpc.example.model.GeolocationEvent;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

@Service
public class InMemoryGeolocationService implements GeolocationService {

    private final Sinks.Many<GeolocationEvent> sink = Sinks.many().multicast().directAllOrNothing();

    @Override
    public void publish(GeolocationEvent event) {
        sink.tryEmitNext(event);
    }

    @Override
    public Flux<GeolocationEvent> getRealTimeEvents(String vin) {
        return sink.asFlux().filter(event -> event.vin().equals(vin));
    }

}

Let’s modify the GRPC service prepared in the previous article to insert the method and use our new service to publish events.

@Override
public void insert(Geolocation request, StreamObserver<Empty> responseObserver) {
    GeolocationEvent geolocationEvent = convertToGeolocationEvent(request);
    geolocationRepository.save(geolocationEvent);
    geolocationService.publish(geolocationEvent);

    responseObserver.onNext(Empty.newBuilder().build());
    responseObserver.onCompleted();
}

Finally, let’s move to our GPS tracker implementation; we can replace the previous dummy implementation with the following one:

@Override
public void subscribe(SubscribeRequest request, StreamObserver<Geolocation> responseObserver) {
    geolocationService.getRealTimeEvents(request.getVin())
        .subscribe(event -> responseObserver.onNext(toProto(event)),
            responseObserver::onError,
            responseObserver::onCompleted);
}

Here we take advantage of using Reactor, as we not only can subscribe for incoming events but also handle errors and completion of stream in the same way.

To map our internal model to response, the following helper method is used:

private static Geolocation toProto(GeolocationEvent event) {
    return Geolocation.newBuilder()
        .setVin(event.vin())
        .setOccurredOn(TimestampMapper.convertInstantToTimestamp(event.occurredOn()))
        .setSpeed(Int32Value.of(event.speed()))
        .setCoordinates(LatLng.newBuilder()
            .setLatitude(event.coordinates().latitude())
            .setLongitude(event.coordinates().longitude())
            .build())
        .build();
}

Action!

As you may be noticed, we sent the following requests with GPS position and received them in real-time from our open stream connection. Streaming data using gRPC or another tool like Kafka is widely used in many IoT systems, including Automotive.

Bidirectional stream

What if our client would like to receive data for multiple vehicles but without initial knowledge about all vehicles they are interested in? Creating new connections for each vehicle isn’t the best approach. But worry no more! While using gRPC, the client may reuse the same connection as it supports bidirectional streaming, which means that both client and server may send messages using open channels.

rpc SubscribeMany(stream SubscribeRequest) returns (stream Geolocation);

Unfortunately, IntelliJ doesn’t allow us to test this functionality with their built-in client, so we have to develop one ourselves.

localhost:9090/com. grapeup.geolocation.GpsTracker/SubscribeMany

com.intellij.grpc.requests.RejectedRPCException: Unsupported method is called

Our dummy client could look something like that, based on generated classes from the protobuf contract:

var channel = ManagedChannelBuilder.forTarget("localhost:9090")
            .usePlaintext()
            .build();
var observer = GpsTrackerGrpc.newStub(channel)
    .subscribeMany(new StreamObserver<>() {
        @Override
        public void onNext(Geolocation value) {
            System.out.println(value);
        }

        @Override
        public void onError(Throwable t) {
            System.err.println("Error " + t.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Completed.");
        }
    });
observer.onNext(SubscribeRequest.newBuilder().setVin("JF2SJAAC1EH511148").build());
observer.onNext(SubscribeRequest.newBuilder().setVin("1YVGF22C3Y5152251").build());
while (true) {} // to keep client subscribing for demo purposes :)

If you send the updates for the following random VINs: JF2SJAAC1EH511148, 1YVGF22C3Y5152251, you should be able to see the output in the console. Check it out!

Tip of the iceberg

Presented examples are just gRPC basics; there is much more to it, like disconnecting from the channel from both ends and reconnecting to the server in case of network failure. The following articles were intended to share with YOU that gRPC architecture has so much to offer, and there are plenty of possibilities for how it can be used in systems. Especially in systems requiring low latency or the ability to provide client code with strict contract validation.

 



Is it insightful?
Share the article!



Check related articles


Read our blog and stay informed about the industry's latest trends and solutions.


see all articles



gRPC Remote Procedure Call (with Protobuf)


Read the article

Protobuf: How To Serialize Data Effectively with Protocol Buffers


Read the article