# RPC streaming ## Streaming API ### Sink and Source Basic element of streaming API is `rpc::sink` and `rpc::source`. The former is used to send data and the later is to receive it. Client and server has their own pair of sink and source. `rpc::sink` and `rpc::source` are templated classes where template parameters describe a type of the data that is sent/received. For instance the sink that is used to send messages containing `int` and `long` will be of a type `rpc::sink`. The opposite end of the stream will have a source of the type `rpc::source` which will be used to receive those messages. Messages are received at a source as `compat::optional` containing an actual message as an `std::tuple`. Unengaged optional means EOS (end of stream) - the stream was closed by a peer. If error happen before EOS is received a receiver cannot be sure it received all the data. To send the data using `rpc::source` one can write (assuming `seastar::async` context): ```c++ while (has_data()) { int data1 = get_data1(); long data2 = get_data2(); sink(data1, data2).get(); // sends data } sink.close().get(); // closes stream ``` To receive: ```c++ while (true) { seastar:optional> data = source().get(); if (!data) { // unengaged optional means EOS break; } else { auto [data1, data2] = *data; // process data } } ``` ### Creating a stream To open an RPC stream one needs RPC client to be created already. The stream will be associated with the client and will be aborted if the client is closed before streaming is. Given RPC client `rc` one creates `rpc::sink` like that (again assuming `seastar::async` context): ```c++ rpc::sink sink = rc.make_stream_sink().get0(); ``` Now the client has the sink that can be used for streaming data to a server, but how the server will get a corresponding `rpc::source` to read it? For that the sink should be passed to the server by an RPC call. To receive a sink a server should register an RPC handler that will be used to receive it along with any auxiliary information deemed necessary. To receive the sink above one may register an RPC handler like that: ```c++ rpc_proto.register_handler(1, [] (int aux_data, rpc::source source) { }); ``` Notice that `rpc::sink` is received as an `rpc::source` since at the server side it will be used for receive. Now all is left to do is for the client to invoke this RPC handler with aux_data and the sink. But what about communicating in another direction: from a server to a client. For that a server also has to have a sink and a client has to have a source and since messages in this direction may be of a different type than from client to server the sink and the source may be of a different type as well. Server initiates creation of a communication channel in another direction. It does this by creating a sink from the source it receives and returning the sink from RPC handler which will cause it to be received as a source by a client. Lets look at the full example where server want to send message containing sstring to a client. Server handler will look like that: ```c++ rpc_proto.register_handler(1, [] (int aux_data, rpc::source source) { rpc::sink sink = source.make_sink(); // use sink and source asynchronously return sink; }); ``` Client code will be: ```c++ auto rpc_call = rpc_proto.make_client (int, rpc::sink)>(1); rpc::sink sink = rc.make_stream_sink().get0(); rpc::source source = rpc_call(rc, aux_data, sink).get0(); // use sink and source here ``` ## Implementation notes ### RPC stream creation RPC stream is implemented as a separate TCP connection. RPC server knows that a connection will be used for streaming if during RPC negotiation `Stream parent` feature is present. The feature will contain ID of an RPC client that was used to create the stream. So in the example from previous chapter: ```c++ rpc::sink sink = rc.make_stream_sink().get0(); ``` the call will initiate a new TCP connection to the same server `rc` is connected to. During RPC protocol negotiation this connection will have `Stream parent` feature with `rc`'s ID as a value. ### Passing sink/source over RPC call When `rpc::sink` is sent over RPC call it is serialized as its connection ID. Server's RPC handler then lookups the connection and creates an `rpc::source` from it. When RPC handler returns `rpc::sink` the same happens in other direction.