gRPC Streaming
Previous articles presented what Protobuf is and how it can be combined with gRPC to implement a simple synchronous API. However, they didn’t present the true power of gRPC, which is streaming that fully utilizes the capabilities of HTTP/2.
Contract definition
As with the previous service, we must define the method together with its input and output parameters. To follow the separation of concerns, let’s create a dedicated service for GPS tracking purposes. Our existing proto should be extended with the following snippet.
message SubscribeRequest {
  string vin = 1;
}
service GpsTracker {
  rpc Subscribe(SubscribeRequest) returns (stream Geolocation);
}
The crucial part of enabling streaming is marking the input or output type with the stream keyword. Here it is applied to the return type, which indicates that the server will keep the connection open and that we can expect it to send Geolocation messages.
Implementation
@Override
public void subscribe(SubscribeRequest request, StreamObserver<Geolocation> responseObserver) {
    responseObserver.onNext(
        Geolocation.newBuilder()
            .setVin(request.getVin())
            .setOccurredOn(TimestampMapper.convertInstantToTimestamp(Instant.now()))
            .setCoordinates(LatLng.newBuilder()
                .setLatitude(78.2303792628867)
                .setLongitude(15.479358124673292)
                .build())
            .build());
}
This simple implementation doesn’t differ much from the implementation of a unary call. The only difference is in how the onNext method behaves: in a regular unary implementation, it can’t be invoked more than once, whereas for a method operating on a stream, onNext may be invoked as many times as you want.
As you can see in the attached screenshot, the geolocation position was returned, but the connection remains open and the client awaits more data on the stream. If the server wants to inform the client that there is no more data, it should invoke the onCompleted method. However, sending a single message is not why we want to use a stream.
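To make the difference concrete, here is a minimal sketch that runs without gRPC on the classpath. The StreamObserver interface below is only a local stand-in that mirrors the three callbacks of io.grpc.stub.StreamObserver, and streamPositions and RecordingObserver are hypothetical names used for illustration. It shows a handler calling onNext several times and then closing the stream with onCompleted.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in with the same three callbacks as io.grpc.stub.StreamObserver,
// so the sketch compiles without gRPC on the classpath.
interface StreamObserver<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onCompleted();
}

// Hypothetical observer that records what the "server" sends.
class RecordingObserver implements StreamObserver<String> {
    final List<String> received = new ArrayList<>();
    boolean completed = false;

    @Override public void onNext(String value) { received.add(value); }
    @Override public void onError(Throwable t) { received.add("error: " + t.getMessage()); }
    @Override public void onCompleted() { completed = true; }
}

class ServerStreamingSketch {
    // Hypothetical handler: emits `count` messages for one VIN, then closes the stream.
    static void streamPositions(String vin, int count, StreamObserver<String> responseObserver) {
        for (int i = 0; i < count; i++) {
            responseObserver.onNext(vin + " position #" + i); // allowed many times on a stream
        }
        responseObserver.onCompleted(); // tells the client that no more data will follow
    }

    public static void main(String[] args) {
        RecordingObserver observer = new RecordingObserver();
        streamPositions("JF2SJAAC1EH511148", 3, observer);
        System.out.println(observer.received + " completed=" + observer.completed);
    }
}
```

In the real service the messages would be Geolocation objects rather than strings, but the call pattern on the observer is the same.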
Streaming is mainly used for two things: transferring large responses as a stream of data chunks, and delivering real-time events. I’ll try to demonstrate the second use case with this service. The implementation will be based on Project Reactor (https://projectreactor.io/), as it works well for the presented use case.
Let’s prepare a simple implementation of the service. To make it work, the WebFlux dependency is required.
implementation 'org.springframework.boot:spring-boot-starter-webflux'
We must prepare a service for publishing geolocation events for a specific vehicle.
InMemoryGeolocationService.java
import com.grapeup.grpc.example.model.GeolocationEvent;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
@Service
public class InMemoryGeolocationService implements GeolocationService {
    // One multicast sink shared by all subscribers.
    private final Sinks.Many<GeolocationEvent> sink = Sinks.many().multicast().directAllOrNothing();
    @Override
    public void publish(GeolocationEvent event) {
        // tryEmitNext returns an EmitResult, which is ignored here.
        sink.tryEmitNext(event);
    }
    @Override
    public Flux<GeolocationEvent> getRealTimeEvents(String vin) {
        // Each subscriber only sees the events of the vehicle it asked for.
        return sink.asFlux().filter(event -> event.vin().equals(vin));
    }
}
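The behaviour this service relies on, one shared stream fanned out to every subscriber with per-VIN filtering, can be illustrated without Reactor. The sketch below is plain Java and not the Reactor implementation; GeolocationHub, its methods, and the reduced GeolocationEvent class are hypothetical stand-ins. It also shows a property worth keeping in mind: in this stand-in, events published before anyone subscribes are simply lost, and as far as I know a direct multicast sink in Reactor does not replay past events to late subscribers either.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical, reduced stand-in for the GeolocationEvent model.
final class GeolocationEvent {
    private final String vin;
    private final int speed;

    GeolocationEvent(String vin, int speed) {
        this.vin = vin;
        this.speed = speed;
    }

    String vin() { return vin; }
    int speed() { return speed; }
}

// Plain-Java illustration of "multicast + filter"; this is NOT Reactor code.
class GeolocationHub {
    private final List<Consumer<GeolocationEvent>> listeners = new CopyOnWriteArrayList<>();

    // Every event is offered to every listener registered at that moment.
    void publish(GeolocationEvent event) {
        listeners.forEach(listener -> listener.accept(event));
    }

    // Registers a listener that only sees events for the given VIN.
    void subscribe(String vin, Consumer<GeolocationEvent> onEvent) {
        listeners.add(event -> {
            if (event.vin().equals(vin)) {
                onEvent.accept(event);
            }
        });
    }
}

class GeolocationHubDemo {
    // Returns the events a subscriber for VIN "A" observes in a fixed scenario.
    static List<GeolocationEvent> run() {
        GeolocationHub hub = new GeolocationHub();
        List<GeolocationEvent> seen = new ArrayList<>();
        hub.publish(new GeolocationEvent("A", 10)); // nobody subscribed yet: lost
        hub.subscribe("A", seen::add);
        hub.publish(new GeolocationEvent("A", 20)); // delivered
        hub.publish(new GeolocationEvent("B", 30)); // filtered out
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run().size() + " event(s) seen");
    }
}
```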
Let’s modify the insert method of the gRPC service prepared in the previous article so that it uses our new service to publish events.
@Override
public void insert(Geolocation request, StreamObserver<Empty> responseObserver) {
    GeolocationEvent geolocationEvent = convertToGeolocationEvent(request);
    geolocationRepository.save(geolocationEvent);
    geolocationService.publish(geolocationEvent);
    responseObserver.onNext(Empty.newBuilder().build());
    responseObserver.onCompleted();
}
Finally, let’s move to our GPS tracker implementation; we can replace the previous dummy implementation with the following one:
@Override
public void subscribe(SubscribeRequest request, StreamObserver<Geolocation> responseObserver) {
    geolocationService.getRealTimeEvents(request.getVin())
        .subscribe(event -> responseObserver.onNext(toProto(event)),
            responseObserver::onError,
            responseObserver::onCompleted);
}
Here we take advantage of Reactor: we can not only subscribe to incoming events but also handle errors and stream completion in the same way.
To map our internal model to the response, the following helper method is used:
private static Geolocation toProto(GeolocationEvent event) {
    return Geolocation.newBuilder()
        .setVin(event.vin())
        .setOccurredOn(TimestampMapper.convertInstantToTimestamp(event.occurredOn()))
        .setSpeed(Int32Value.of(event.speed()))
        .setCoordinates(LatLng.newBuilder()
            .setLatitude(event.coordinates().latitude())
            .setLongitude(event.coordinates().longitude())
            .build())
        .build();
}
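The TimestampMapper used above comes from the earlier part of the series and is not shown here. As a rough idea of what such a conversion involves, the sketch below maps a java.time.Instant to the two fields a Protobuf Timestamp carries, seconds and nanos, and back. TimestampValue and TimestampMappingSketch are hypothetical stand-ins so that the example runs on the JDK alone; real code would build a com.google.protobuf.Timestamp with setSeconds and setNanos instead.

```java
import java.time.Instant;

// Hypothetical stand-in for com.google.protobuf.Timestamp, which carries the same two fields.
final class TimestampValue {
    final long seconds; // whole seconds since the Unix epoch
    final int nanos;    // nanosecond fraction within that second

    TimestampValue(long seconds, int nanos) {
        this.seconds = seconds;
        this.nanos = nanos;
    }
}

class TimestampMappingSketch {
    // Splits an Instant into epoch seconds and the nanosecond remainder.
    static TimestampValue fromInstant(Instant instant) {
        return new TimestampValue(instant.getEpochSecond(), instant.getNano());
    }

    // Rebuilds the Instant from the two fields.
    static Instant toInstant(TimestampValue timestamp) {
        return Instant.ofEpochSecond(timestamp.seconds, timestamp.nanos);
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        System.out.println(toInstant(fromInstant(now)).equals(now));
    }
}
```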
Action!
As you may have noticed, we sent the following requests with GPS positions and received them in real time over our open stream connection. Streaming data with gRPC or another tool such as Kafka is widely used in many IoT systems, including automotive.
Bidirectional stream
What if our client would like to receive data for multiple vehicles, but without knowing up front all the vehicles they are interested in? Creating a new connection for each vehicle isn’t the best approach. But worry no more! With gRPC, the client may reuse the same connection, as it supports bidirectional streaming, which means that both client and server may send messages over the same open channel.
rpc SubscribeMany(stream SubscribeRequest) returns (stream Geolocation);
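On the server, a bidirectional method receives the response observer and hands back a request observer for the client's messages. The sketch below illustrates one way to approach this under simplifying assumptions: StreamObserver is a local stand-in mirroring io.grpc.stub.StreamObserver, SubscribeManyCall and onPosition are hypothetical names, and VINs and positions are plain strings. Writes to the response observer are synchronized, because gRPC's StreamObserver is not thread-safe and events for different VINs may arrive from different threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Local stand-in with the same three callbacks as io.grpc.stub.StreamObserver.
interface StreamObserver<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onCompleted();
}

// Hypothetical handler for a single SubscribeMany call.
class SubscribeManyCall {
    private final Set<String> subscribedVins = ConcurrentHashMap.newKeySet();
    private final StreamObserver<String> responseObserver;

    SubscribeManyCall(StreamObserver<String> responseObserver) {
        this.responseObserver = responseObserver;
    }

    // Inbound half: receives each request the client sends on the open stream.
    StreamObserver<String> requestObserver() {
        return new StreamObserver<String>() {
            @Override public void onNext(String vin) { subscribedVins.add(vin); }
            @Override public void onError(Throwable t) { subscribedVins.clear(); }
            @Override public void onCompleted() {
                subscribedVins.clear();
                responseObserver.onCompleted();
            }
        };
    }

    // Outbound half: forwards a position only if the client has asked for that VIN.
    synchronized void onPosition(String vin, String position) {
        if (subscribedVins.contains(vin)) {
            responseObserver.onNext(vin + " @ " + position);
        }
    }
}

class SubscribeManyDemo {
    static List<String> run() {
        List<String> received = new ArrayList<>();
        SubscribeManyCall call = new SubscribeManyCall(new StreamObserver<String>() {
            @Override public void onNext(String value) { received.add(value); }
            @Override public void onError(Throwable t) { }
            @Override public void onCompleted() { received.add("completed"); }
        });
        StreamObserver<String> requests = call.requestObserver();

        call.onPosition("JF2SJAAC1EH511148", "p0"); // not subscribed yet: ignored
        requests.onNext("JF2SJAAC1EH511148");
        call.onPosition("JF2SJAAC1EH511148", "p1"); // forwarded
        call.onPosition("1YVGF22C3Y5152251", "p2"); // not subscribed yet: ignored
        requests.onNext("1YVGF22C3Y5152251");       // second VIN on the same stream
        call.onPosition("1YVGF22C3Y5152251", "p3"); // forwarded
        requests.onCompleted();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```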
Unfortunately, IntelliJ doesn’t allow us to test this functionality with their built-in client, so we have to develop one ourselves.
localhost:9090/com.grapeup.geolocation.GpsTracker/SubscribeMany
com.intellij.grpc.requests.RejectedRPCException: Unsupported method is called
Our dummy client, based on the classes generated from the protobuf contract, could look something like this:
var channel = ManagedChannelBuilder.forTarget("localhost:9090")
    .usePlaintext()
    .build();
var observer = GpsTrackerGrpc.newStub(channel)
    .subscribeMany(new StreamObserver<>() {
        @Override
        public void onNext(Geolocation value) {
            System.out.println(value);
        }
        @Override
        public void onError(Throwable t) {
            System.err.println("Error " + t.getMessage());
        }
        @Override
        public void onCompleted() {
            System.out.println("Completed.");
        }
    });
observer.onNext(SubscribeRequest.newBuilder().setVin("JF2SJAAC1EH511148").build());
observer.onNext(SubscribeRequest.newBuilder().setVin("1YVGF22C3Y5152251").build());
while (true) {} // to keep client subscribing for demo purposes :)
If you send updates for the following random VINs: JF2SJAAC1EH511148, 1YVGF22C3Y5152251, you should be able to see the output in the console. Check it out!
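The empty while (true) {} loop keeps the demo client alive, but it also keeps a CPU core spinning. One common alternative, sketched below with the JDK only, is to block on a CountDownLatch that the response observer releases in onCompleted or onError. The class and method names are hypothetical, and a plain thread stands in for the server closing the stream.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class AwaitCompletionSketch {
    // Returns true if the simulated stream finished before the timeout.
    static boolean runDemo() {
        CountDownLatch finished = new CountDownLatch(1);

        // Stand-in for what onCompleted() and onError() of the response observer would do.
        Runnable onStreamClosed = finished::countDown;

        // Simulates the server closing the stream from another thread.
        Thread server = new Thread(onStreamClosed);
        server.start();

        try {
            // Blocks without spinning until the stream is closed or the timeout elapses.
            return finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("stream finished: " + runDemo());
    }
}
```

For a client that should stay subscribed indefinitely, finished.await() without a timeout gives the same effect as the loop while letting the thread sleep.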
Tip of the iceberg
The presented examples are just gRPC basics; there is much more to it, like disconnecting from the channel from both ends and reconnecting to the server in case of network failure. These articles were intended to show that the gRPC architecture has a lot to offer and that there are plenty of ways to use it, especially in systems that require low latency or need to provide client code with strict contract validation.