Streaming APIs
How to create APIs that stream data
Encore makes it easy to create API endpoints that can stream data to and from your applications.
Different kinds of stream
Encore supports three types of streams, each designed for a specific data flow direction:
- StreamIn: When you need to stream data into your service.
- StreamOut: When you need to stream data out from your service.
- StreamInOut: When you need to stream data into and out of service.
How it works
When you connect to a streaming API endpoint, the client and server will do a handshake in the form of a http request. If the server accepts the handshake request, a stream is returned to the client and to the API handler. Under the hood the stream is a websocket that can be used to send and receive messages over.
Path parameters, query parameters and headers can be passed via the handshake request. The stream returned to the client and to the API handler are typed with the incoming and outgoing message types that you specify in your api.
Defining streaming APIs
Similar to how you can define RESTful API endpoints with Encore, you can also easily define type-safe streaming API endpoints. They accept a handshake type, an incoming and an outgoing message type (depending on your choice of stream direction).
If you don't need any data from the handshake, you can ignore that type, and only specify the incoming and outgoing message types.
For example, if you want to have a stream of messages from server to client for a specific chat room, you could setup an api.streamOut
:
interface Handshake {
roomId: string;
}
interface Message {
text: string;
sender: string;
}
// messageStream streams out all chat messages for a certain id
export const messageStream = api.streamOut<Handshake, Message>(
{ path: "/chat/:roomId", expose: true },
async (handshake, stream) => {
// Use `handshake.roomId` to fetch messages
for await (const msg of messages) {
await stream.send(msg);
}
}
);
Or if you want to have a stream from client to server, you can set it up with api.streamIn
:
interface DataChunk {
// ...
}
export const uploadStream = api.streamIn<DataChunk>(
{ path: "/upload", expose: true },
async (stream) => {
for await (const data of stream) {
// do something with data
}
}
);
Finally if you want to stream messages in both directions, you should use api.streamInOut
:
interface PingMessage {
// ...
}
interface PongMessage {
// ...
}
export const pingpongStream = api.streamInOut<PingMessage, PongMessage>(
{ path: "/pingpong", expose: true },
async (stream) => {
for await (const ping of stream) {
// do something with the ping payload
await stream.send({ /* ... */ })
}
}
);
The type parameters you need to specify depends on the kind of stream you are using. For an InOut stream you'll need to specify an incoming and outgoing type, while for the other types you need to specify either the incoming or outgoing type depending on the direction of the stream. For incoming stream endpoints you can also specify a optional outgoing type if your api handler responds with some data when it is done with the incoming stream.
For all stream types the handshake type is optional, and only needs to be used whenever you need data from the initial request, such as path parameters, query parameters or headers.
These are the different variations that you can use when defining your streaming APIs:
api.streamInOut<Handshake, Incoming, Outgoing>({ ... }, async (handshake, stream) => {})
api.streamInOut<Incoming, Outgoing>({ ... }, async (stream) => {})
api.streamIn<Handshake, Incoming, Outgoing>({ ... }, async (handshake, stream): Promise<Outgoing> => {})
api.streamIn<Handshake, Incoming>({ ... }, async (handshake, stream) => {})
api.streamIn<Incoming, Outgoing>({ ... }, async (stream): Promise<Outgoing> => {})
api.streamIn<Incoming>({ ... }, async (stream) => {})
api.streamOut<Handshake, Outgoing>({ ... }, async (handshake, stream) => {})
api.streamOut<Outgoing>({ ... }, async (stream) => {})
Note that if you add a handshake data type you also get two arguments to your handler, one for the handshake data and one for the stream, and if you omit the handshake type you only get the stream.
The type parameters are required for Encore to understand your api. Encore currently does not support inferring the types from the handler functions types.
In your handler you get a stream class, it is an async iterator for incoming messages, and have an async send
method for sending data out.
The same goes for the generated client, when you call the stream endpoint you will get a stream class returned that implements an async iterator for messages sent to the client, and an async send
method for sending data to the server.
Requiring authentication
You can use authHandler
in the same way as for regular endpoints, just specify auth: true
in your endpoint options. You can get the auth data in your handler by calling getAuthData()
. See more details in the auth handler docs.
Connecting with the client
Using the generated client, you can connect to a streaming API endpoint that have expose
set to true
. The client stream acts as an async iterator, allowing you to retrieve messages by simply iterating over it:
const stream = client.serviceName.endpointName();
for await (const msg of stream) {
// Do something with each message
}
To send messages to the service, use the send
method:
const stream = client.serviceName.endpointName();
await stream.send({ ... });
To handle network errors or do some cleanup after the connection is closed, you can attach event listeners on the underlying socket:
const stream = client.serviceName.endpointName();
stream.socket.on("error", (event) => {
// An error occurred
});
stream.socket.on("close", (event) => {
// Connection was closed
});
Learn more in the Client Library Generation docs.