Transactional Pub/Sub outbox
Guarantee consistency between your database and Pub/Sub subscribers
One of the hardest parts of building an event-driven application is ensuring consistency between services. A common pattern is for each service to have its own database and use Pub/Sub to notify other systems of business events. Inevitably this leads to inconsistencies since the Pub/Sub publishing is not transactional with the database writes.
While there are several approaches to solving this, it's important the solution doesn't add too much complexity to what is often an already complex architecture. Perhaps the best solution in this regard is the transactional outbox pattern.
Encore provides support for the transactional outbox pattern in the x.encore.dev/infra/pubsub/outbox package.
The transactional outbox works by binding a Pub/Sub topic to a database transaction, translating all calls to topic.Publish
into inserting a database row in an outbox
table. If/when the transaction later commits, the messages are picked up by
a Relay that polls the outbox
table and publishes the
messages to the actual Pub/Sub topic.
Publishing messages to the outbox
To publish messages to the outbox, a topic must first be bound to the outbox. This is done using Pub/Sub topic references which allows you to retain complete type safety and the same interface as regular Pub/Sub topics, allowing existing code to continue to work without changes.
Please note
In regular (non-outbox) usage the message id returned by topic.Publish
is the same as the message id the subscriber
receives when processing the message. With the outbox, this message id is not available until the transaction commits,
so topic.Publish
returns an id referencing the outbox row instead.
The topic binding supports pluggable storage backends, enabling use of the outbox pattern with any
transactional storage backend. Implementation are provided out-of-the-box for use with Encore's
encore.dev/storage/sqldb
package, as well as the standard library database/sql
and github.com/jackc/pgx/v5
drivers,
but it's easy to write your own for other use cases.
See the Go package reference for more information.
For example, to use a transactional outbox to notify subscribers when a user is created:
outbox.godb_migration.sql// Create a SignupsTopic somehow.
var SignupsTopic = pubsub.NewTopic[*SignupEvent](/* ... */)
// Create a topic ref with publisher permissions.
ref := pubsub.TopicRef[pubsub.Publisher[*SignupEvent]](SignupsTopic)
// Bind it to the transactional outbox
import "x.encore.dev/infra/pubsub/outbox"
var tx *sqldb.Tx // somehow get a transaction
ref = outbox.Bind(ref, outbox.TxPersister(tx))
// Calls to ref.Publish() will now insert a row in the outbox table.
Once the transaction commits any published messages via ref
above will be stored in the outbox
table.
Consuming messages from the outbox
Once committed, the messages are ready to be picked up and published to the actual Pub/Sub topic.
That is done via the Relay.
The relay continuously polls the outbox
table and publishes any new messages to the actual Pub/Sub topic.
The relay supports pluggable storage backends, enabling use of the outbox pattern with any transactional storage backend. An implementation is provided out-of-the-box that uses Encore's built-in SQL database support, but it's easy to write your own for other databases.
The topics to poll must be registered with the relay, typically during service initialization. For example:
user/service.gopackage user
import (
"context"
"encore.dev/pubsub"
"encore.dev/storage/sqldb"
"x.encore.dev/infra/pubsub/outbox"
)
type Service struct {
signupsRef pubsub.Publisher[*SignupEvent]
}
// db is the database the outbox table is stored in
var db = sqldb.NewDatabase(...)
// Create the SignupsTopic somehow.
var SignupsTopic = pubsub.NewTopic[*SignupEvent](/* ... */)
func initService() (*Service, error) {
// Initialize the relay to poll from our database.
relay := outbox.NewRelay(outbox.SQLDBStore(db))
// Register the SignupsTopic to be polled.
signupsRef := pubsub.TopicRef[pubsub.Publisher[*SignupEvent]](SignupsTopic)
outbox.RegisterTopic(relay, signupsRef)
// Start polling.
go relay.PollForMessage(context.Background(), -1)
return &Service{signupsRef: signupsRef}, nil
}