This document describes the proposed changes to streams in Decentraland RPC (@dcl/rpc).
The current implementation of RPC streams works by having the Client confirm, with ack=true, every message sent by the Server. In effect, the Client requests each message from the Server.
Current implementation sequence:
sequenceDiagram
participant C as Client
participant S as Server
C->>S: Request {message_id}
S->>C: Response {message_id,streaming=true,seqId=0}
C->>C: Generate async iterator for {message_id}
C->>S: StreamMessage {ack=true,message_id,seqId=0}
note over C: Ask for a new item to be generated using ack=true
S-->>C: StreamMessage {message_id,payload,seqId=1}
C->>S: StreamMessage {ack=true,message_id,seqId=1}
note over C: Close the message by responding<br/>the last ACK with ack=true,closed=true
S-->>C: StreamMessage {message_id,payload,seqId=2}
C->>S: StreamMessage {ack=true,message_id,seqId=2,closed=true}
S->>S: Close async Generator
C->>C: Close async Iterator
S-->>C: StreamMessage {message_id,closed=true}
C->>C: Close async iterator
The main idea of this implementation is to sidestep possible congestion in the transport by generating each message only when it is requested.
But with this implementation, we can't send a continuous sequence of messages from the Server to the Client or vice versa, and the streams are very slow because every message waits for the previous ack. This document will refer to this implementation as the stream with ack from now on.
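To make that cost concrete, here is a minimal TypeScript sketch of the ack-driven flow. It is not the @dcl/rpc implementation: the `StreamMessage` shape, `serverGenerator`, and `consumeWithAck` are hypothetical names, the transport is replaced by an in-memory log, and closing is simplified to the generator running out of items.

```typescript
// Hypothetical wire message, modelled on the fields used in the diagram above.
type StreamMessage = {
  messageId: number
  seqId: number
  ack?: boolean
  closed?: boolean
  payload?: string
}

// Server side: a plain async generator. Nothing runs until next() is called.
async function* serverGenerator(): AsyncGenerator<string> {
  yield "a"
  yield "b"
  yield "c"
}

// Client side: each ack asks the server to generate exactly one more item.
// The log records every one-way message that would cross the transport.
async function consumeWithAck(source: AsyncGenerator<string>, messageId: number) {
  const log: StreamMessage[] = []
  const received: string[] = []
  let seqId = 0
  while (true) {
    // Client -> Server: acknowledge the last seqId to request the next item.
    log.push({ messageId, seqId, ack: true })
    const item = await source.next()
    if (item.done) {
      // Server -> Client: no more items (assumed simplification of closing).
      log.push({ messageId, seqId, closed: true })
      break
    }
    seqId++
    // Server -> Client: the item generated in response to the ack.
    log.push({ messageId, seqId, payload: item.value })
    received.push(item.value)
  }
  return { received, log }
}
```

In this sketch three payloads produce eight one-way messages, four of them acks, and no payload is generated until the previous ack has arrived.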
With this in mind, this proposal changes how streams work, while still supporting a use case like the stream with ack by building it on bidirectional streams.
The main idea is to have server streams, client streams, and bidirectional streams.
In a server stream, the Client makes a request, and the Server starts sending messages independently. The Server receives a CloseStream message when the Client decides to stop consuming the stream.
sequenceDiagram
participant C as Client
participant S as Server
C->>S: Request {message_id,payload}
S->>C: StreamMessage {message_id,streaming=true,seqId=0}
S-->>C: StreamMessage {message_id,payload,seqId=1}
S-->>C: StreamMessage {message_id,payload,seqId=2}
S-->>C: StreamMessage {message_id,payload,seqId=3}
S-->>C: StreamMessage {message_id,payload,seqId=n}
note over C: On closing stream
C->>S: CloseStream {message_id}
S->>C: CloseStreamAck {message_id}
Protobuf specification for Server Stream:
service SomeService {
  rpc ServerStream(Request) returns (stream StreamResponse);
}
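As a rough illustration of the server stream flow, the sketch below pushes items from an async generator without waiting for acks and stops once the client has asked to close. The `ServerFrame` type, `pumpServerStream`, `counter`, and `demoServerStream` are hypothetical names, the transport is reduced to a `send` callback, and the opening seqId=0 frame is omitted for brevity.

```typescript
// Hypothetical frames sent from Server to Client, named after the diagram.
type ServerFrame =
  | { kind: "StreamMessage"; messageId: number; seqId: number; payload: string }
  | { kind: "CloseStreamAck"; messageId: number }

// Push every generated item immediately; stop when the client closed the stream.
async function pumpServerStream(
  messageId: number,
  source: AsyncGenerator<string>,
  send: (frame: ServerFrame) => void,
  isClosedByClient: () => boolean
): Promise<void> {
  let seqId = 0
  let closedByClient = false
  for await (const payload of source) {
    if (isClosedByClient()) {
      closedByClient = true
      break // leaving the loop also closes the generator via return()
    }
    seqId++
    send({ kind: "StreamMessage", messageId, seqId, payload })
  }
  // Only confirm a close that the client actually requested.
  if (closedByClient) send({ kind: "CloseStreamAck", messageId })
}

// An endless source, to show that only CloseStream stops the server.
async function* counter(): AsyncGenerator<string> {
  let i = 0
  while (true) yield `tick-${i++}`
}

// Simulated client: stops consuming after three payloads.
async function demoServerStream(): Promise<ServerFrame[]> {
  const frames: ServerFrame[] = []
  let closed = false
  await pumpServerStream(
    1,
    counter(),
    (frame) => {
      frames.push(frame)
      if (frames.length === 3) closed = true // stands in for CloseStream {message_id}
    },
    () => closed
  )
  return frames
}
```

Unlike the stream with ack, the server here never waits for the client between messages. The only client-to-server traffic after the request is the close.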
A client stream is generated in the client and consumed by the server. It works exactly like a server stream, but the other way around.
Some considerations are needed for client streams:
The client stream MAY continue producing messages until the server sends the CloseClientStream message back to the client.
sequenceDiagram
participant C as Client
participant S as Server
C->>S: Request {message_id,payload,streaming=true}
alt Consume client stream
S->>C: OpenClientStream {message_id,streaming=true,seqId=0}
C-->>S: StreamMessage {message_id,payload,seqId=1}
C-->>S: StreamMessage {message_id,payload,seqId=2}
C-->>S: StreamMessage {message_id,payload,seqId=3}
C-->>S: StreamMessage {message_id,payload,seqId=n}
alt close from server
note over S: stream.close()
S-->>C: CloseClientStream {message_id}
else finishes client side
C-->>S: CloseClientStream {message_id}
note over S: stream.close()
end
end
S->>C: Response {message_id,streaming=true,seqId=0}
Protobuf specification for Client Stream:
service SomeService {
  rpc ClientStream(stream StreamRequest) returns (Response);
}
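Both branches of the diagram above can be sketched with plain async iteration. In this hedged example, `clientNumbers` and `sumUntil` are hypothetical names: the handler consumes the client stream and returns a single response, and the generator's `finally` block stands in for the moment the client side learns that the stream is closed.

```typescript
// Client side: produces numbers until it runs out or the consumer stops.
async function* clientNumbers(count: number, events: string[]): AsyncGenerator<number> {
  try {
    for (let n = 1; n <= count; n++) yield n
  } finally {
    // Runs when the stream finishes or when the server stops consuming it.
    events.push("client stream closed")
  }
}

// Server side: consume the client stream, then answer with one Response.
async function sumUntil(stream: AsyncIterable<number>, limit: number): Promise<number> {
  let total = 0
  for await (const value of stream) {
    total += value
    if (total >= limit) break // "close from server": stop consuming early
  }
  return total
}
```

Calling `sumUntil(clientNumbers(Infinity, events), 10)` closes from the server side once the total reaches 10, while `sumUntil(clientNumbers(2, events), 10)` ends because the client finishes first and resolves to 3.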
In the case of bidirectional streams, the Client and the Server can send messages independently. Synchronization between the two streams is possible but not required.
Some considerations for bidirectional streams:
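// The client stream is typed as AsyncIterable so that it can be consumed with for await.
async function* bidirectionalMethod(clientStream: AsyncIterable<Req>): AsyncGenerator<Res> {
  for await (const request of clientStream) {
    // Each yielded value becomes one message on the server stream.
    yield processRequest(request)
  }
}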
sequenceDiagram
participant C as Client
participant S as Server
C->>S: Request {message_id,payload}
S->>C: StreamMessage {message_id,streaming=true,seqId=0}
C-->>S: StreamMessage {message_id,payload,seqId=1}
C-->>S: StreamMessage {message_id,payload,seqId=2}
C-->>S: StreamMessage {message_id,payload,seqId=n}
S-->>C: StreamMessage {message_id,payload,seqId=1}
S-->>C: StreamMessage {message_id,payload,seqId=2}
S-->>C: StreamMessage {message_id,payload,seqId=n}
note over S: On closing stream
S->>C: CloseStream {message_id}
C->>S: CloseStreamAck {message_id}
Protobuf specification for Bidirectional Streams:
service SomeService {
  rpc BidirectionalStream(stream StreamRequest) returns (stream StreamResponse);
}
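A self-contained variant of the bidirectional handler may help show that the server side is free to emit messages that are not tied to any client message. The `Req` and `Res` shapes, `clientMessages`, `shout`, and `collect` are hypothetical and exist only for this sketch.

```typescript
type Req = { text: string }
type Res = { text: string }

// Client stream: two messages, then the client side finishes.
async function* clientMessages(): AsyncGenerator<Req> {
  yield { text: "hello" }
  yield { text: "world" }
}

// Server handler: emits one message before reading anything from the client,
// then answers every client message as it arrives.
async function* shout(clientStream: AsyncIterable<Req>): AsyncGenerator<Res> {
  yield { text: "READY" }
  for await (const request of clientStream) {
    yield { text: request.text.toUpperCase() }
  }
}

// Helper that drains a stream into an array.
async function collect<T>(stream: AsyncIterable<T>): Promise<T[]> {
  const items: T[] = []
  for await (const item of stream) items.push(item)
  return items
}
```

Here `collect(shout(clientMessages()))` resolves to three messages, the first of which is produced before any client message has been read.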
If the developer wants behaviour like the stream with ack, they can simply send a StreamMessage after receiving one, as this sequence diagram describes:
sequenceDiagram
participant C as Client
participant S as Server
C->>S: Request {message_id,payload}
S->>C: Response {message_id,streaming=true,seqId=0}
C-->>S: StreamMessage {message_id,payload,seqId=1}
S-->>C: StreamMessage {message_id,ack=true,seqId=1}
C-->>S: StreamMessage {message_id,payload,seqId=2}
S-->>C: StreamMessage {message_id,ack=true,seqId=2}
C-->>S: StreamMessage {message_id,payload,seqId=n}
S-->>C: StreamMessage {message_id,ack=true,seqId=n}
note over S: On closing stream
S->>C: CloseStream {message_id}
C->>S: CloseStreamAck {message_id}
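The same pattern can be sketched on top of two async streams. This is an illustration under assumptions, not the library's API: `Frame`, `ackEach`, and `sendWithAck` are hypothetical names, and both sides run in one process instead of over a transport.

```typescript
// Hypothetical frame carrying either a payload or an ack for a given seqId.
type Frame = { messageId: number; seqId: number; payload?: string; ack?: boolean }

// Server side: answer every incoming frame with an ack for the same seqId.
async function* ackEach(clientStream: AsyncIterable<Frame>): AsyncGenerator<Frame> {
  for await (const frame of clientStream) {
    yield { messageId: frame.messageId, seqId: frame.seqId, ack: true }
  }
}

// Client side: send one payload, then wait for its ack before sending the next.
async function sendWithAck(payloads: string[], messageId: number): Promise<Frame[]> {
  const trace: Frame[] = []
  // Holds the resolver of the ack the client is currently waiting for.
  const gate = { release: () => {} }

  async function* outgoing(): AsyncGenerator<Frame> {
    let seqId = 0
    for (const payload of payloads) {
      seqId++
      const frame: Frame = { messageId, seqId, payload }
      trace.push(frame)
      const acked = new Promise<void>((resolve) => {
        gate.release = () => resolve()
      })
      yield frame
      await acked // do not produce the next payload until the ack arrived
    }
  }

  for await (const ack of ackEach(outgoing())) {
    trace.push(ack)
    gate.release()
  }
  return trace
}
```

For `sendWithAck(["a", "b"], 1)` the trace alternates payload and ack frames with matching seqId values, which mirrors the ordering in the diagram.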