Streaming APIs

How to create APIs that stream data

Encore makes it easy to create API endpoints that can stream data to and from your applications.

Related example
Simple chat app using the Streaming API create a WebSocket stream from a web frontend.
$ encore app create --example=ts/streaming-chat

Different kinds of stream

Encore supports three types of streams, each designed for a specific data flow direction:

  • StreamIn: When you need to stream data into your service.
  • StreamOut: When you need to stream data out from your service.
  • StreamInOut: When you need to stream data into and out of your service.

How it works

When you connect to a streaming API endpoint, the client and server will do a handshake in the form of a HTTP request. If the server accepts the handshake request, a stream is returned to the client and to the API handler. Under the hood the stream is a WebSocket that can be used to send and receive messages over.

Path parameters, query parameters and headers can be passed via the handshake request. The stream returned to the client and to the API handler are typed with the incoming and outgoing message types that you specify in your API.

Defining streaming APIs

Similar to how you can define RESTful API endpoints with Encore, you can also easily define type-safe streaming API endpoints. They accept a handshake type, an incoming and an outgoing message type (depending on your choice of stream direction). The type parameters are required for Encore to understand your API.

If you don't need any data from the handshake, you can ignore that type, and only specify the incoming and outgoing message types.

Related example
Example showcases the all different streaming APIs: api.streamIn, api.streamOut, and api.streamInOut
$ encore app create --example=ts/streaming

StreamIn

Use api.streamIn when you want to have a stream from client to server, for example if you are uploading something from the client to the server:

import { api } from "encore.dev/api"; import log from "encore.dev/log"; // Used to pass initial data, optional. interface Handshake { user: string; } // What the clients sends over the stream. interface Message { data: string; done: boolean; } // Returned when the stream is done, optional. interface Response { success: boolean; } export const uploadStream = api.streamIn<Handshake, Message, Response>( { path: "/upload", expose: true }, async (handshake, stream) => { const chunks: string[] = []; try { // The stream object is an AsyncIterator that yields incoming messages. for await (const data of stream) { chunks.push(data.data); // Stop the stream if the client sends a "done" message if (data.done) break; } } catch (err) { log.error(`Upload error by ${handshake.user}:`, err); return { success: false }; } log.info(`Upload complete by ${handshake.user}`); return { success: true }; }, );

For api.streamIn you need to specify the incoming message type, the handshake type is optional. You can also specify a optional outgoing type if your API handler responds with some data when it is done with the incoming stream.

api.streamIn<Handshake, Incoming, Outgoing>( {...}, async (handshake, stream): Promise<Outgoing> => {...})
api.streamIn<Handshake, Incoming>( {...}, async (handshake, stream) => {...})
api.streamIn<Incoming, Outgoing>( {...}, async (stream): Promise<Outgoing> => {...})
api.streamIn<Incoming>( {...}, async (stream) => {...})

StreamOut

Use api.streamOut if you want to have a stream of messages from the server to client, for example if you are streaming logs from the server:

import { api, StreamOut } from "encore.dev/api"; import log from "encore.dev/log"; // Used to pass initial data, optional. interface Handshake { rows: number; } // What the server sends over the stream. interface Message { row: string; } export const logStream = api.streamOut<Handshake, Message>( { path: "/logs", expose: true }, async (handshake, stream) => { try { for await (const row of mockedLogs(handshake.rows, stream)) { // Send the message to the client await stream.send({ row }); } } catch (err) { log.error("Upload error:", err); } }, ); // This function generates an async iterator that yields mocked log rows async function* mockedLogs(rows: number, stream: StreamOut<Message>) { for (let i = 0; i < rows; i++) { yield new Promise<string>((resolve) => { setTimeout(() => { resolve(`Log row ${i + 1}`); }, 500); }); } // Close the stream when all logs have been sent await stream.close(); }

For api.streamOut you need to specify the outgoing message type, the handshake type is optional.

api.streamOut<Handshake, Outgoing>( {...}, async (handshake, stream) => {...})
api.streamOut<Outgoing>( {...}, async (stream) => {...})

StreamInOut

Use api.streamInOut when you want to stream messages in both directions, for example if you are building a chat application:

import { api } from "encore.dev/api"; interface InMessage { // ... } interface OutMessage { // ... } export const ChatStream = api.streamInOut<InMessage, OutMessage>( { path: "/chat", expose: true }, async (stream) => { for await (const chatMessage of stream) { // Respond to the message by sending something back await stream.send({ /* ... */ }) } } );

For api.streamInOut you need to specify both the incoming and outgoing message types, the handshake type is optional.

api.streamInOut<Handshake, Incoming, Outgoing>( {...}, async (handshake, stream) => {...})
api.streamInOut<Incoming, Outgoing>( {...}, async (stream) => {...})

Handshake

When you connect to a streaming API endpoint, the client and server will do a handshake in the form of a HTTP request. For all stream types the handshake type is optional, and only needs to be used whenever you need data from the initial request, such as path parameters, query parameters or headers.

Note that if you add a handshake data type you also get two arguments to your handler, one for the handshake data and one for the stream, and if you omit the handshake type you only get the stream.

Requiring authentication

You can use your authHandler in the same way as for regular endpoints, just specify auth: true in your endpoint options. The auth data will be passed from the client to the server via query parameters or headers in the initial handshake request.

After a request has been successfully authenticated, you can access authentication data passed from the authHandler by calling getAuthData(). See more details in the auth handler docs.

Broadcasting messages

To broadcast messages to all connected clients, you can store the streams in a map and iterate over them when a new message is received. If a client disconnects, you can remove the stream from the map.

import { api, StreamInOut } from "encore.dev/api"; // Map to hold all connected streams const connectedStreams: Map< string, StreamInOut<ChatMessage, ChatMessage> > = new Map(); // Object sent from the client to the server when establishing a connection interface HandshakeRequest { id: string; } // Object by both server and client interface ChatMessage { username: string; msg: string; } export const chat = api.streamInOut<HandshakeRequest, ChatMessage, ChatMessage>( { expose: true, auth: false, path: "/chat" }, async (handshake, stream) => { connectedStreams.set(handshake.id, stream); try { // The stream object is an AsyncIterator that yields incoming messages. // The loop will continue as long as the client keeps the connection open. for await (const chatMessage of stream) { for (const [key, val] of connectedStreams) { try { // Send the users message to all connected clients. await val.send(chatMessage); } catch (err) { // If there is an error sending the message, remove the client from the map. connectedStreams.delete(key); } } } } catch (err) { // If there is an error reading from the stream, remove the client from the map. connectedStreams.delete(handshake.id); } // When the client disconnects, remove them from the map. connectedStreams.delete(handshake.id); }, );

Connecting with the client

Using the generated client, you can connect to a streaming API endpoint that have expose set to true. The client stream acts as an async iterator, allowing you to retrieve messages by simply iterating over it:

const stream = client.serviceName.endpointName(); for await (const msg of stream) { // Do something with each message }

To send messages to the service, use the async send method:

const stream = client.serviceName.endpointName(); await stream.send({ ... });

To handle network errors or do some cleanup after the connection is closed, you can attach event listeners on the underlying socket:

const stream = client.serviceName.endpointName(); stream.socket.on("error", (event) => { // An error occurred }); stream.socket.on("close", (event) => { // Connection was closed });