Streaming APIs
How to create APIs that stream data
Encore makes it easy to create API endpoints that can stream data to and from your applications.
Related example
$ encore app create --example=ts/streaming-chat
Different kinds of stream
Encore supports three types of streams, each designed for a specific data flow direction:
- StreamIn: When you need to stream data into your service.
- StreamOut: When you need to stream data out from your service.
- StreamInOut: When you need to stream data into and out of your service.
How it works
When you connect to a streaming API endpoint, the client and server will do a handshake in the form of a HTTP request. If the server accepts the handshake request, a stream is returned to the client and to the API handler. Under the hood the stream is a WebSocket that can be used to send and receive messages over.
Path parameters, query parameters and headers can be passed via the handshake request. The stream returned to the client and to the API handler are typed with the incoming and outgoing message types that you specify in your API.
Defining streaming APIs
Similar to how you can define RESTful API endpoints with Encore, you can also easily define type-safe streaming API endpoints. They accept a handshake type, an incoming and an outgoing message type (depending on your choice of stream direction). The type parameters are required for Encore to understand your API.
If you don't need any data from the handshake, you can ignore that type, and only specify the incoming and outgoing message types.
Related example
$ encore app create --example=ts/streaming
StreamIn
Use api.streamIn
when you want to have a stream from client to server, for example if you are uploading something from the client to the server:
import { api } from "encore.dev/api";
import log from "encore.dev/log";
// Used to pass initial data, optional.
interface Handshake {
user: string;
}
// What the clients sends over the stream.
interface Message {
data: string;
done: boolean;
}
// Returned when the stream is done, optional.
interface Response {
success: boolean;
}
export const uploadStream = api.streamIn<Handshake, Message, Response>(
{ path: "/upload", expose: true },
async (handshake, stream) => {
const chunks: string[] = [];
try {
// The stream object is an AsyncIterator that yields incoming messages.
for await (const data of stream) {
chunks.push(data.data);
// Stop the stream if the client sends a "done" message
if (data.done) break;
}
} catch (err) {
log.error(`Upload error by ${handshake.user}:`, err);
return { success: false };
}
log.info(`Upload complete by ${handshake.user}`);
return { success: true };
},
);
For api.streamIn
you need to specify the incoming message type, the handshake type is optional. You can also specify a optional outgoing type if your API handler responds with some data when it is done with the incoming stream.
api.streamIn<Handshake, Incoming, Outgoing>(
{...}, async (handshake, stream): Promise<Outgoing> => {...})
api.streamIn<Handshake, Incoming>(
{...}, async (handshake, stream) => {...})
api.streamIn<Incoming, Outgoing>(
{...}, async (stream): Promise<Outgoing> => {...})
api.streamIn<Incoming>(
{...}, async (stream) => {...})
StreamOut
Use api.streamOut
if you want to have a stream of messages from the server to client, for example if you are streaming logs from the server:
import { api, StreamOut } from "encore.dev/api";
import log from "encore.dev/log";
// Used to pass initial data, optional.
interface Handshake {
rows: number;
}
// What the server sends over the stream.
interface Message {
row: string;
}
export const logStream = api.streamOut<Handshake, Message>(
{ path: "/logs", expose: true },
async (handshake, stream) => {
try {
for await (const row of mockedLogs(handshake.rows, stream)) {
// Send the message to the client
await stream.send({ row });
}
} catch (err) {
log.error("Upload error:", err);
}
},
);
// This function generates an async iterator that yields mocked log rows
async function* mockedLogs(rows: number, stream: StreamOut<Message>) {
for (let i = 0; i < rows; i++) {
yield new Promise<string>((resolve) => {
setTimeout(() => {
resolve(`Log row ${i + 1}`);
}, 500);
});
}
// Close the stream when all logs have been sent
await stream.close();
}
For api.streamOut
you need to specify the outgoing message type, the handshake type is optional.
api.streamOut<Handshake, Outgoing>(
{...}, async (handshake, stream) => {...})
api.streamOut<Outgoing>(
{...}, async (stream) => {...})
StreamInOut
Use api.streamInOut
when you want to stream messages in both directions, for example if you are building a chat application:
import { api } from "encore.dev/api";
interface InMessage {
// ...
}
interface OutMessage {
// ...
}
export const ChatStream = api.streamInOut<InMessage, OutMessage>(
{ path: "/chat", expose: true },
async (stream) => {
for await (const chatMessage of stream) {
// Respond to the message by sending something back
await stream.send({ /* ... */ })
}
}
);
For api.streamInOut
you need to specify both the incoming and outgoing message types, the handshake type is optional.
api.streamInOut<Handshake, Incoming, Outgoing>(
{...}, async (handshake, stream) => {...})
api.streamInOut<Incoming, Outgoing>(
{...}, async (stream) => {...})
Handshake
When you connect to a streaming API endpoint, the client and server will do a handshake in the form of a HTTP request. For all stream types the handshake type is optional, and only needs to be used whenever you need data from the initial request, such as path parameters, query parameters or headers.
Note that if you add a handshake data type you also get two arguments to your handler, one for the handshake data and one for the stream, and if you omit the handshake type you only get the stream.
Requiring authentication
You can use your authHandler
in the same way as for regular endpoints, just specify auth: true
in your endpoint options. The auth data will be passed from the client to the server via query parameters or headers in the initial handshake request.
After a request has been successfully authenticated, you can access authentication data passed from the authHandler
by calling getAuthData()
. See more details in the auth handler docs.
Broadcasting messages
To broadcast messages to all connected clients, you can store the streams in a map and iterate over them when a new message is received. If a client disconnects, you can remove the stream from the map.
import { api, StreamInOut } from "encore.dev/api";
// Map to hold all connected streams
const connectedStreams: Map<
string,
StreamInOut<ChatMessage, ChatMessage>
> = new Map();
// Object sent from the client to the server when establishing a connection
interface HandshakeRequest {
id: string;
}
// Object by both server and client
interface ChatMessage {
username: string;
msg: string;
}
export const chat = api.streamInOut<HandshakeRequest, ChatMessage, ChatMessage>(
{ expose: true, auth: false, path: "/chat" },
async (handshake, stream) => {
connectedStreams.set(handshake.id, stream);
try {
// The stream object is an AsyncIterator that yields incoming messages.
// The loop will continue as long as the client keeps the connection open.
for await (const chatMessage of stream) {
for (const [key, val] of connectedStreams) {
try {
// Send the users message to all connected clients.
await val.send(chatMessage);
} catch (err) {
// If there is an error sending the message, remove the client from the map.
connectedStreams.delete(key);
}
}
}
} catch (err) {
// If there is an error reading from the stream, remove the client from the map.
connectedStreams.delete(handshake.id);
}
// When the client disconnects, remove them from the map.
connectedStreams.delete(handshake.id);
},
);
Connecting with the client
Using the generated client, you can connect to a streaming API endpoint that have expose
set to true
. The client stream acts as an async iterator, allowing you to retrieve messages by simply iterating over it:
const stream = client.serviceName.endpointName();
for await (const msg of stream) {
// Do something with each message
}
To send messages to the service, use the async send
method:
const stream = client.serviceName.endpointName();
await stream.send({ ... });
To handle network errors or do some cleanup after the connection is closed, you can attach event listeners on the underlying socket:
const stream = client.serviceName.endpointName();
stream.socket.on("error", (event) => {
// An error occurred
});
stream.socket.on("close", (event) => {
// Connection was closed
});
Service to service streaming
Like with other endpoint types you can easily use streaming between services by importing ~encore/clients
.
If you want the stream to only be reachable by other services (and not from the public internet), set the expose
option to false.
Example of using a stream endpoint from a regular api endpoint:
import { chat } from "~encore/clients"; // import 'chat' service
export const myOtherAPI = api({}, async (): Promise<void> => {
const stream = await chat.myStreamingEndpoint();
// send a message to the chat service over the stream
await stream.send({ msg: "data" });
for await (const msg of stream) {
// handle incoming message
}
});