encore.dev/pubsub
Classes
Subscription
Type Parameters
Msg
Msg extends object
Constructors
Constructor
new Subscription<Msg>(
topic,
name,
cfg): Subscription<Msg>;
Parameters
topic
Topic\<Msg>
name
string
cfg
SubscriptionConfig\<Msg>
Returns
Subscription\<Msg>
Topic
A topic is a resource to which you can publish messages to be delivered to subscribers of that topic.
Extends
Type Parameters
Msg
Msg extends object
Implements
Publisher\<Msg>
Constructors
Constructor
new Topic<Msg>(name, cfg): Topic<Msg>
Parameters
name
string
cfg
TopicConfig\<Msg>
Returns
Topic\<Msg>
Overrides
TopicPerms.constructor
Properties
cfg
readonly cfg: TopicConfig<Msg>
name
readonly name: string
Methods
publish()
publish(msg): Promise<string>
Parameters
msg
Msg
Returns
Promise\<string>
Implementation of
ref()
ref<P>(): P
Type Parameters
P
P extends TopicPerms
Returns
P
Interfaces
Publisher
Extends
Type Parameters
Msg
Msg extends object
Methods
publish()
abstract publish(msg): Promise<string>
Parameters
msg
Msg
Returns
Promise\<string>
RetryPolicy
RetryPolicy defines how a subscription should handle retries after errors either delivering the message or processing the message.
The values given to this structure are parsed at compile time, such that the correct Cloud resources can be provisioned to support the queue.
As such the values given here may be clamped to the supported values by the target cloud. (i.e. min/max values brought within the supported range by the target cloud).
Properties
maxBackoff?
optional maxBackoff?: DurationString
The maximum time to wait between retries. Defaults to 10 minutes.
maxRetries?
optional maxRetries?: number
MaxRetries is used to control deadletter queuing logic, when: n == 0: A default value of 100 retries will be used n > 0: Encore will forward a message to a dead letter queue after n retries n == pubsub.InfiniteRetries: Messages will not be forwarded to the dead letter queue by the Encore framework
minBackoff?
optional minBackoff?: DurationString
The minimum time to wait between retries. Defaults to 10 seconds.
SubscriptionConfig
SubscriptionConfig is used when creating a subscription
The values given here may be clamped to the supported values by the target cloud. (i.e. ack deadline may be brought within the supported range by the target cloud pubsub implementation).
Type Parameters
Msg
Msg
Properties
ackDeadline?
optional ackDeadline?: DurationString
AckDeadline is the time a consumer has to process a message before it's returned to the subscription
Default is 30 seconds, however the ack deadline must be at least 1 second.
handler
handler: (msg) => Promise<unknown>
Handler is the function which will be called to process a message sent on the topic.
When this function returns an error the message will be negatively acknowledged (nacked), which will cause a redelivery attempt to be made (unless the retry policy's MaxRetries has been reached).
Parameters
msg
Msg
Returns
Promise\<unknown>
maxConcurrency?
optional maxConcurrency?: number
MaxConcurrency is the maximum number of messages which will be processed simultaneously per instance of the service for this subscription.
Note that this is per instance of the service, so if your service has scaled to 10 instances and this is set to 10, then 100 messages could be processed simultaneously.
If the value is negative, then there will be no limit on the number of messages processed simultaneously.
Note: This is not supported by all cloud providers; specifically on GCP when using Cloud Run instances on an unordered topic the subscription will be configured as a Push Subscription and will have an adaptive concurrency See GCP Push Delivery Rate.
This setting also has no effect on Encore Cloud environments. If not set, it uses a reasonable default based on the cloud provider.
messageRetention?
optional messageRetention?: DurationString
MessageRetention is how long an undelivered message is kept on the topic before it's purged.
Default is 7 days.
retryPolicy?
optional retryPolicy?: RetryPolicy
RetryPolicy defines how a message should be retried when the subscriber returns an error
TopicConfig
TopicConfig is used when creating a Topic
Type Parameters
Msg
Msg extends object
Properties
deliveryGuarantee
deliveryGuarantee: DeliveryGuarantee
DeliveryGuarantee is used to configure the delivery guarantee of a Topic
orderingAttribute?
optional orderingAttribute?: keyof { [Key in string | number | symbol as Extract<Msg[Key], brandedAttribute<string> | brandedAttribute<number> | brandedAttribute<false> | brandedAttribute<true>> extends never ? never : Key]: never }
OrderingAttribute is the message attribute to use as a ordering key for messages and delivery will ensure that messages with the same value will be delivered in the order they where published.
If OrderingAttribute is not set, messages can be delivered in any order.
It is important to note, that in the case of an error being returned by a subscription handler, the message will be retried before any subsequent messages for that ordering key are delivered. This means depending on the retry configuration, a large backlog of messages for a given ordering key may build up. When using OrderingAttribute, it is recommended to use reason about your failure modes and set the retry configuration appropriately.
Once the maximum number of retries has been reached, the message will be forwarded to the dead letter queue, and the next message for that ordering key will be delivered.
To create attributes on a message, use the Attribute type:
type UserEvent = { user_id: Attribute<string>; action: string; }
const topic = new Topic<UserEvent>("user-events", { deliveryGuarantee: DeliveryGuarantee.AtLeastOnce, orderingAttribute: "user_id", // Messages with the same user-id will be delivered in the order they where published })
topic.publish(ctx, {user_id: "1", action: "login"}) // This message will be delivered before the logout topic.publish(ctx, {user_id: "2", action: "login"}) // This could be delivered at any time because it has a different user id topic.publish(ctx, {user_id: "1", action: "logout"}) // This message will be delivered after the first message
By using OrderingAttribute, the throughput will be limited depending on the cloud provider:
- AWS: 300 messages per second for the topic (see AWS SQS Quotas).
- GCP: 1MB/s for each ordering key (see GCP PubSub Quotas).
Note: OrderingAttribute currently has no effect during local development.
TopicPerms
Extended by
Type Aliases
Attribute
type Attribute<T> = T | brandedAttribute<T>
Attribute represents a field on a message that should be sent as an attribute in a PubSub message, rather than in the message body.
This is useful for ordering messages, or for filtering messages on a subscription - otherwise you should not use this.
To create attributes on a message, use the Attribute type:
type Message = {
user_id: Attribute<number>;
name: string;
};
const msg: Message = { user_id: 123, name: "John Doe", };
The union of brandedAttribute is simply used to help the TypeScript compiler understand that the type is an attribute and allow the AttributesOf type to extract the keys of said type.
Type Parameters
T
T extends string | number | boolean
AttributesOf
type AttributesOf<T> = keyof { [Key in keyof T as Extract<T[Key], allBrandedTypes> extends never ? never : Key]: never }
AttributesOf is a helper type to extract all keys from an object who's type is an Attribute type.
For example: type Message = { user_id: Attribute<number>; name: string; age: Attribute<number>; };
type MessageAttributes = AttributesOf<Message>; // "user_id" | "age"
Type Parameters
T
T extends object
DeliveryGuarantee
type DeliveryGuarantee = "at-least-once" | "exactly-once"
DeliveryGuarantee is used to configure the delivery contract for a topic.
References
DurationString
Re-exports DurationString