# RPC streaming

## Streaming API

### Sink and Source

The basic elements of the streaming API are `rpc::sink` and `rpc::source`. The former
is used to send data and the latter to receive it. The client and the server
each have their own sink and source pair. `rpc::sink` and `rpc::source` are
class templates whose template parameters describe the types of the data
that is sent or received. For instance, a sink used to send messages
containing an `int` and a `long` has the type `rpc::sink<int, long>`. The
opposite end of the stream has a source of type `rpc::source<int, long>`,
which is used to receive those messages. A source delivers each message as a
`compat::optional` holding the message as a `std::tuple`. An unengaged
optional means EOS (end of stream): the stream was closed by the peer. If
an error happens before EOS is received, the receiver cannot be sure that it received all
the data.

To send data using an `rpc::sink<int, long>`, one can write (assuming a `seastar::async` context):

```c++
      while (has_data()) {
          int data1 = get_data1();
          long data2 = get_data2();
          sink(data1, data2).get(); // sends data
      }
      sink.close().get(); // closes stream
```

To receive:

```c++
      while (true) {
          compat::optional<std::tuple<int, long>> data = source().get();
          if (!data) {
             // unengaged optional means EOS
             break;
          } else {
             auto [data1, data2] = *data;
             // process data
          }
      }
```
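
The EOS convention used by the receive loop above can be tried out without Seastar. The following is a minimal sketch in plain C++17, assuming a synchronous in-memory queue in place of a real connection; `toy_stream` and `drain_sums` are hypothetical names introduced only for illustration and are not part of the Seastar API.

```cpp
#include <cassert>
#include <deque>
#include <optional>
#include <tuple>
#include <utility>
#include <vector>

// Hypothetical in-memory stand-in for one direction of a stream.
// It is NOT the Seastar API; it only mirrors the optional-based EOS convention.
template <typename... T>
class toy_stream {
    std::deque<std::tuple<T...>> _queue;
    bool _closed = false;
public:
    // Sender side: enqueue one message. Sending after close() is a bug.
    void send(T... values) {
        assert(!_closed);
        _queue.emplace_back(std::move(values)...);
    }
    // Sender side: mark the end of the stream.
    void close() { _closed = true; }
    // Receiver side: the next message, or an unengaged optional at EOS.
    // The toy requires the sender to have closed before the queue runs dry.
    std::optional<std::tuple<T...>> receive() {
        if (_queue.empty()) {
            assert(_closed);
            return std::nullopt;
        }
        auto msg = std::move(_queue.front());
        _queue.pop_front();
        return msg;
    }
};

// Drains the stream until EOS, summing the two fields of each message.
inline std::vector<long> drain_sums(toy_stream<int, long>& s) {
    std::vector<long> sums;
    while (auto msg = s.receive()) {   // loop ends on the unengaged optional
        auto [a, b] = *msg;
        sums.push_back(a + b);
    }
    return sums;
}
```

Unlike the real API, nothing here returns a future, so there is no `.get()`; the point is only that the receiver distinguishes "one more message" from "stream closed" through the optional.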

### Creating a stream

To open an RPC stream, an RPC client must already exist. The stream
is associated with the client and is aborted if the client is closed
before the stream is. Given an RPC client `rc`, an `rpc::sink` is created like this
(again assuming a `seastar::async` context):

```c++
    rpc::sink<int, long> sink = rc.make_stream_sink<int, long>().get0();
```

Now the client has a sink that can be used to stream data to
the server, but how does the server get the corresponding `rpc::source` to
read it? For that, the sink must be passed to the server in an RPC
call. The server registers an RPC handler that
receives the sink along with any auxiliary information deemed necessary.
To receive the sink above, one may register an RPC handler like this:

```c++
    rpc_proto.register_handler(1, [] (int aux_data, rpc::source<int, long> source) {
    });
```

Notice that the `rpc::sink` is received as an `rpc::source`, since on the server
side it is used for receiving. All that is left is for the client to
invoke this RPC handler with `aux_data` and the sink.

But what about communication in the other direction, from the server to the
client? For that, the server also needs a sink and the client needs
a source. Since messages in this direction may be of a different type
than those sent from client to server, this sink and source may have different
template parameters as well.

The server initiates creation of the communication channel in the other direction.
It does this by creating a sink from the source it receives and returning that sink
from the RPC handler, which causes it to be received as a source by the client. Let's look
at a full example where the server wants to send messages containing an `sstring` to the client.

The server handler will look like this:

```c++
    rpc_proto.register_handler(1, [] (int aux_data, rpc::source<int, long> source) {
        rpc::sink<sstring> sink = source.make_sink<sstring>();
        // use sink and source asynchronously
        return sink;
    });
```

The client code will be:

```c++
   auto rpc_call = rpc_proto.make_client<rpc::source<sstring> (int, rpc::sink<int, long>)>(1);
   rpc::sink<int, long> sink = rc.make_stream_sink<int, long>().get0();
   rpc::source<sstring> source = rpc_call(rc, aux_data, sink).get0();
   // use sink and source here
```

## Implementation notes

### RPC stream creation

An RPC stream is implemented as a separate TCP connection. The RPC server knows that a connection
will be used for streaming if the `Stream parent` feature is present during RPC negotiation.
The feature contains the ID of the RPC client that was used to create the stream.

So in the example from the previous chapter:

```c++
    rpc::sink<int, long> sink = rc.make_stream_sink<int, long>().get0();
```

the call initiates a new TCP connection to the same server that `rc` is connected to. During RPC
protocol negotiation, this connection carries the `Stream parent` feature with `rc`'s ID as its value.
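
The server-side decision can be pictured as a lookup in the set of negotiated features. The sketch below is a toy model in plain C++17, not Seastar's implementation: the `feature` enum, `negotiation_frame`, `stream_parent_of` and `self_check` are hypothetical names, the ID is assumed to fit in a 64-bit integer, and the real wire format is not shown.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <optional>

// Hypothetical feature identifiers; only the name "Stream parent" comes from the text.
enum class feature { stream_parent };

// Toy negotiation frame: the features a connecting peer announced, with their values.
struct negotiation_frame {
    std::map<feature, std::uint64_t> features;
};

// Returns the parent client ID if the peer announced "Stream parent",
// or an unengaged optional for an ordinary RPC connection.
inline std::optional<std::uint64_t> stream_parent_of(const negotiation_frame& f) {
    auto it = f.features.find(feature::stream_parent);
    if (it == f.features.end()) {
        return std::nullopt;
    }
    return it->second;
}

// Small self-check covering both outcomes.
inline void self_check() {
    negotiation_frame plain;                          // ordinary RPC connection
    assert(!stream_parent_of(plain).has_value());

    negotiation_frame stream;                         // stream opened by client 42
    stream.features[feature::stream_parent] = 42;
    assert(stream_parent_of(stream) == 42u);
}
```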

### Passing sink/source over RPC call

When an `rpc::sink` is sent in an RPC call, it is serialized as its connection ID. The server's RPC handler
then looks up the connection and creates an `rpc::source` from it. When an RPC handler returns an `rpc::sink`,
the same happens in the other direction.
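
The ID-based hand-off can be modelled in a few lines. This is a minimal sketch in plain C++17 under loud assumptions: both ends live in one process and share the connection object, IDs are plain integers, and every `toy_` name is hypothetical rather than part of Seastar.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <map>
#include <memory>
#include <optional>
#include <stdexcept>

using connection_id = std::uint64_t;  // assumption: IDs are plain integers here

// Toy stream connection carrying ints in one direction.
struct toy_connection {
    std::deque<int> in_flight;
    bool closed = false;
};

// Sending end: owns a reference to the connection and "serializes" as its ID.
struct toy_sink {
    connection_id id;
    std::shared_ptr<toy_connection> conn;
    void send(int v) { assert(!conn->closed); conn->in_flight.push_back(v); }
    void close() { conn->closed = true; }
    connection_id serialize() const { return id; }
};

// Receiving end, rebuilt on the server from a connection ID.
struct toy_source {
    std::shared_ptr<toy_connection> conn;
    std::optional<int> receive() {
        if (conn->in_flight.empty()) {
            return std::nullopt;  // toy: an empty queue stands for EOS
        }
        int v = conn->in_flight.front();
        conn->in_flight.pop_front();
        return v;
    }
};

// Server-side table of stream connections, keyed by connection ID.
class toy_server {
    std::map<connection_id, std::shared_ptr<toy_connection>> _streams;
    connection_id _next_id = 1;
public:
    // Models accepting a new stream connection and handing its sink to the client.
    toy_sink accept_stream() {
        auto conn = std::make_shared<toy_connection>();
        connection_id id = _next_id++;
        _streams.emplace(id, conn);
        return toy_sink{id, conn};
    }
    // Models deserializing a sink argument: look the ID up, wrap it as a source.
    toy_source source_from(connection_id id) const {
        auto it = _streams.find(id);
        if (it == _streams.end()) {
            throw std::out_of_range("unknown stream connection");
        }
        return toy_source{it->second};
    }
};
```

Only the small integer crosses the "RPC call" in this model; the data itself travels over the stream connection that the ID names.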