encore.dev/pubsub

Classes

Subscription

Type Parameters

Msg

Msg extends object

Constructors

Constructor
new Subscription<Msg>( topic, name, cfg): Subscription<Msg>;
Parameters
topic

Topic\<Msg>

name

string

cfg

SubscriptionConfig\<Msg>

Returns

Subscription\<Msg>


Topic

A topic is a resource to which you can publish messages to be delivered to subscribers of that topic.

Extends

Type Parameters

Msg

Msg extends object

Implements

Constructors

Constructor

new Topic<Msg>(name, cfg): Topic<Msg>

Parameters
name

string

cfg

TopicConfig\<Msg>

Returns

Topic\<Msg>

Overrides

TopicPerms.constructor

Properties

cfg

readonly cfg: TopicConfig<Msg>

name

readonly name: string

Methods

publish()

publish(msg): Promise<string>

Parameters
msg

Msg

Returns

Promise\<string>

Implementation of

Publisher.publish

ref()

ref<P>(): P

Type Parameters
P

P extends TopicPerms

Returns

P

Interfaces

Publisher

Extends

Type Parameters

Msg

Msg extends object

Methods

publish()

abstract publish(msg): Promise<string>

Parameters
msg

Msg

Returns

Promise\<string>


RetryPolicy

RetryPolicy defines how a subscription should handle retries after errors either delivering the message or processing the message.

The values given to this structure are parsed at compile time, such that the correct Cloud resources can be provisioned to support the queue.

As such the values given here may be clamped to the supported values by the target cloud. (i.e. min/max values brought within the supported range by the target cloud).

Properties

maxBackoff?

optional maxBackoff?: DurationString

The maximum time to wait between retries. Defaults to 10 minutes.

maxRetries?

optional maxRetries?: number

MaxRetries is used to control deadletter queuing logic, when: n == 0: A default value of 100 retries will be used n > 0: Encore will forward a message to a dead letter queue after n retries n == pubsub.InfiniteRetries: Messages will not be forwarded to the dead letter queue by the Encore framework

minBackoff?

optional minBackoff?: DurationString

The minimum time to wait between retries. Defaults to 10 seconds.


SubscriptionConfig

SubscriptionConfig is used when creating a subscription

The values given here may be clamped to the supported values by the target cloud. (i.e. ack deadline may be brought within the supported range by the target cloud pubsub implementation).

Type Parameters

Msg

Msg

Properties

ackDeadline?

optional ackDeadline?: DurationString

AckDeadline is the time a consumer has to process a message before it's returned to the subscription

Default is 30 seconds, however the ack deadline must be at least 1 second.

handler

handler: (msg) => Promise<unknown>

Handler is the function which will be called to process a message sent on the topic.

When this function returns an error the message will be negatively acknowledged (nacked), which will cause a redelivery attempt to be made (unless the retry policy's MaxRetries has been reached).

Parameters
msg

Msg

Returns

Promise\<unknown>

maxConcurrency?

optional maxConcurrency?: number

MaxConcurrency is the maximum number of messages which will be processed simultaneously per instance of the service for this subscription.

Note that this is per instance of the service, so if your service has scaled to 10 instances and this is set to 10, then 100 messages could be processed simultaneously.

If the value is negative, then there will be no limit on the number of messages processed simultaneously.

Note: This is not supported by all cloud providers; specifically on GCP when using Cloud Run instances on an unordered topic the subscription will be configured as a Push Subscription and will have an adaptive concurrency See GCP Push Delivery Rate.

This setting also has no effect on Encore Cloud environments. If not set, it uses a reasonable default based on the cloud provider.

messageRetention?

optional messageRetention?: DurationString

MessageRetention is how long an undelivered message is kept on the topic before it's purged.

Default is 7 days.

retryPolicy?

optional retryPolicy?: RetryPolicy

RetryPolicy defines how a message should be retried when the subscriber returns an error


TopicConfig

TopicConfig is used when creating a Topic

Type Parameters

Msg

Msg extends object

Properties

deliveryGuarantee

deliveryGuarantee: DeliveryGuarantee

DeliveryGuarantee is used to configure the delivery guarantee of a Topic

orderingAttribute?

optional orderingAttribute?: keyof { [Key in string | number | symbol as Extract<Msg[Key], brandedAttribute<string> | brandedAttribute<number> | brandedAttribute<false> | brandedAttribute<true>> extends never ? never : Key]: never }

OrderingAttribute is the message attribute to use as a ordering key for messages and delivery will ensure that messages with the same value will be delivered in the order they where published.

If OrderingAttribute is not set, messages can be delivered in any order.

It is important to note, that in the case of an error being returned by a subscription handler, the message will be retried before any subsequent messages for that ordering key are delivered. This means depending on the retry configuration, a large backlog of messages for a given ordering key may build up. When using OrderingAttribute, it is recommended to use reason about your failure modes and set the retry configuration appropriately.

Once the maximum number of retries has been reached, the message will be forwarded to the dead letter queue, and the next message for that ordering key will be delivered.

To create attributes on a message, use the Attribute type:

type UserEvent = { user_id: Attribute<string>; action: string; }

const topic = new Topic<UserEvent>("user-events", { deliveryGuarantee: DeliveryGuarantee.AtLeastOnce, orderingAttribute: "user_id", // Messages with the same user-id will be delivered in the order they where published })

topic.publish(ctx, {user_id: "1", action: "login"}) // This message will be delivered before the logout topic.publish(ctx, {user_id: "2", action: "login"}) // This could be delivered at any time because it has a different user id topic.publish(ctx, {user_id: "1", action: "logout"}) // This message will be delivered after the first message

By using OrderingAttribute, the throughput will be limited depending on the cloud provider:

Note: OrderingAttribute currently has no effect during local development.


TopicPerms

Extended by

Type Aliases

Attribute

type Attribute<T> = T | brandedAttribute<T>

Attribute represents a field on a message that should be sent as an attribute in a PubSub message, rather than in the message body.

This is useful for ordering messages, or for filtering messages on a subscription - otherwise you should not use this.

To create attributes on a message, use the Attribute type: type Message = { user_id: Attribute<number>; name: string; };

const msg: Message = { user_id: 123, name: "John Doe", };

The union of brandedAttribute is simply used to help the TypeScript compiler understand that the type is an attribute and allow the AttributesOf type to extract the keys of said type.

Type Parameters

T

T extends string | number | boolean


AttributesOf

type AttributesOf<T> = keyof { [Key in keyof T as Extract<T[Key], allBrandedTypes> extends never ? never : Key]: never }

AttributesOf is a helper type to extract all keys from an object who's type is an Attribute type.

For example: type Message = { user_id: Attribute<number>; name: string; age: Attribute<number>; };

type MessageAttributes = AttributesOf<Message>; // "user_id" | "age"

Type Parameters

T

T extends object


DeliveryGuarantee

type DeliveryGuarantee = "at-least-once" | "exactly-once"

DeliveryGuarantee is used to configure the delivery contract for a topic.

References

DurationString

Re-exports DurationString