Skip to content

Event Subscriptions 7.7

TIP

The new subscription model is leaner, and more efficient for background work than using the IProjection model that does a lot of preprocessing and grouping that is not necessary or always desirable for subscriptions anyway.

The existing projections model in Marten has a world of recipes for "projecting" Marten event storage into read-only views of the event data, but what if you need to carry out some kind of background processing on these events as they are captured? For example, maybe you need to:

  • Publish events to an external system as some kind of integration?
  • Carry out background processing based on a captured event
  • Build a view representation of the events in something outside of the current PostgreSQL database, like maybe an Elastic Search view for better searching

In previous versions of Marten, you had to utilize the IProjection interface as a mechanism for integrating Marten events to other systems or just for conducting background processing on published events as shown in the blog post Integrating Marten with other systems. Now though, you can also utilize Marten's ISubscription model that runs within Marten's async daemon subsystem to "push" events into your subscriptions as events flow into your system. Note that this is a background process within your application, and happen in a completely different thread than the initial work of appending and saving events to the Marten event storage.

Marten's Subscription Model

Subscriptions will always be an implementation of the ISubscription interface shown below:

cs
/// <summary>
/// Basic abstraction for custom subscriptions to Marten events through the async daemon. Use this in
/// order to do custom processing against an ordered stream of the events
/// </summary>
public interface ISubscription : IAsyncDisposable
{
    /// <summary>
    /// Processes a page of events at a time
    /// </summary>
    /// <param name="page"></param>
    /// <param name="controller">Use to log dead letter events that are skipped or to stop the subscription from processing based on an exception</param>
    /// <param name="operations">Access to Marten queries and writes that will be committed with the progress update for this subscription</param>
    /// <param name="cancellationToken"></param>
    /// <returns></returns>
    Task<IChangeListener> ProcessEventsAsync(EventRange page, ISubscriptionController controller,
        IDocumentOperations operations,
        CancellationToken cancellationToken);
}

snippet source | anchor

So far, the subscription model gives you these abilities:

  • Access to the Marten IDocumentOperations service that is scoped to the processing of a single page and can be used to either query additional data or to make database writes within the context of the same transaction that Marten will use to record the current progress of the subscription to the database
  • Error handling abilities via the ISubscriptionController interface argument that can be used to record events that were skipped by the subscription or to completely stop all further processing
  • By returning an IChangeListener, the subscription can be notified right before and right after Marten commits the database transaction for any changes including recording the current progress of the subscription for the current page. This was done purposely to enable transactional outbox approaches like the one in Wolverine. See the async daemon diagnostics for more information.
  • The ability to filter the event types or stream types that the subscription is interested in as a way to greatly optimize the runtime performance by preventing Marten from having to fetch events that the subscription will not process
  • The ability to create the actual subscription objects from the application's IoC container when that is necessary
  • Flexible control over where or when the subscription starts when it is first applied to an existing event store
  • Some facility to "rewind and replay" subscriptions

To make this concrete, here's the simplest possible subscription you can make to simply write out a console message for every event:

cs
public class ConsoleSubscription: ISubscription
{
    public Task<IChangeListener> ProcessEventsAsync(EventRange page, ISubscriptionController controller, IDocumentOperations operations,
        CancellationToken cancellationToken)
    {
        Console.WriteLine($"Starting to process events from {page.SequenceFloor} to {page.SequenceCeiling}");
        foreach (var e in page.Events)
        {
            Console.WriteLine($"Got event of type {e.Data.GetType().NameInCode()} from stream {e.StreamId}");
        }

        // If you don't care about being signaled for
        return Task.FromResult(NullChangeListener.Instance);
    }

    public ValueTask DisposeAsync()
    {
        return new ValueTask();
    }
}

snippet source | anchor

And to register that with our Marten store:

cs
var builder = Host.CreateApplicationBuilder();
builder.Services.AddMarten(opts =>
    {
        opts.Connection(builder.Configuration.GetConnectionString("marten"));

        // Because this subscription has no service dependencies, we
        // can use this simple mechanism
        opts.Events.Subscribe(new ConsoleSubscription());

        // Or with additional configuration like:
        opts.Events.Subscribe(new ConsoleSubscription(), s =>
        {
            s.SubscriptionName = "Console"; // Override Marten's naming
            s.SubscriptionVersion = 2; // Potentially version as an all new subscription

            // Optionally create an allow list of
            // event types to subscribe to
            s.IncludeType<InvoiceApproved>();
            s.IncludeType<InvoiceCreated>();

            // Only subscribe to new events, and don't try
            // to apply this subscription to existing events
            s.Options.SubscribeFromPresent();
        });
    })
    .AddAsyncDaemon(DaemonMode.HotCold);

using var host = builder.Build();
await host.StartAsync();

snippet source | anchor

Here's a slightly more complicated sample that publishes events to a configured Kafka topic:

cs
public class KafkaSubscription: SubscriptionBase
{
    private readonly KafkaProducerConfig _config;

    public KafkaSubscription(KafkaProducerConfig config)
    {
        _config = config;

        SubscriptionName = "Kafka";

        // Access to any or all filtering rules
        IncludeType<InvoiceApproved>();

        // Fine grained control over how the subscription runs
        // in the async daemon
        Options.BatchSize = 1000;
        Options.MaximumHopperSize = 10000;

        // Effectively run as a hot observable
        Options.SubscribeFromPresent();
    }

    // The daemon will "push" a page of events at a time to this subscription
    public override async Task<IChangeListener> ProcessEventsAsync(
        EventRange page,
        ISubscriptionController controller,
        IDocumentOperations operations,
        CancellationToken cancellationToken)
    {
        using var kafkaProducer =
            new ProducerBuilder<string, string>(_config.ProducerConfig).Build();

        foreach (var @event in page.Events)
        {
            await kafkaProducer.ProduceAsync(_config.Topic,
                new Message<string, string>
                {
                    // store event type name in message Key
                    Key = @event.Data.GetType().Name,
                    // serialize event to message Value
                    Value = JsonConvert.SerializeObject(@event.Data)
                }, cancellationToken);

        }

        // We don't need any kind of callback, so the nullo is fine
        return NullChangeListener.Instance;
    }

}

// Just assume this is registered in your IoC container
public class KafkaProducerConfig
{
    public ProducerConfig? ProducerConfig { get; set; }
    public string? Topic { get; set; }
}

snippet source | anchor

This time, it's requiring IoC services injected through its constructor, so we're going to use this mechanism to add it to Marten:

cs
var builder = Host.CreateApplicationBuilder();
builder.Services.AddMarten(opts =>
    {
        opts.Connection(builder.Configuration.GetConnectionString("marten"));
    })
    // Marten also supports a Scoped lifecycle, and quietly forward Transient
    // to Scoped
    .AddSubscriptionWithServices<KafkaSubscription>(ServiceLifetime.Singleton, o =>
    {
        // This is a default, but just showing what's possible
        o.IncludeArchivedEvents = false;

        o.FilterIncomingEventsOnStreamType(typeof(Invoice));

        // Process no more than 10 events at a time
        o.Options.BatchSize = 10;
    })
    .AddAsyncDaemon(DaemonMode.HotCold);

using var host = builder.Build();
await host.StartAsync();

snippet source | anchor

Registering Subscriptions

INFO

Marten can support both the Singleton and Scoped lifetimes when using the IoC container to build out your subscription. If you specify Transient, Marten will still use the Scoped lifetime.

WARNING

Do not try to pull Marten's IDocumentSession or IQuerySession as IoC dependencies of your subscription as that can easily cause a bi-directional dependency issue that prevents application startup. Instead, push the IDocumentOperations from the subscription signature into your code that needs to use Marten during the execution of subscriptions. Failing that, the Marten team's suggestion is to have the subscription merely publish a message to a local service bus. Wolverine is a great tool for this.

Stateless subscriptions can simply be registered like this:

cs
var builder = Host.CreateApplicationBuilder();
builder.Services.AddMarten(opts =>
    {
        opts.Connection(builder.Configuration.GetConnectionString("marten"));

        // Because this subscription has no service dependencies, we
        // can use this simple mechanism
        opts.Events.Subscribe(new ConsoleSubscription());

        // Or with additional configuration like:
        opts.Events.Subscribe(new ConsoleSubscription(), s =>
        {
            s.SubscriptionName = "Console"; // Override Marten's naming
            s.SubscriptionVersion = 2; // Potentially version as an all new subscription

            // Optionally create an allow list of
            // event types to subscribe to
            s.IncludeType<InvoiceApproved>();
            s.IncludeType<InvoiceCreated>();

            // Only subscribe to new events, and don't try
            // to apply this subscription to existing events
            s.Options.SubscribeFromPresent();
        });
    })
    .AddAsyncDaemon(DaemonMode.HotCold);

using var host = builder.Build();
await host.StartAsync();

snippet source | anchor

But, if you need to utilize services from your IoC container within your subscription -- and you very likely do -- you can utilize the AddSubscriptionWithServices() mechanisms:

cs
var builder = Host.CreateApplicationBuilder();
builder.Services.AddMarten(opts =>
    {
        opts.Connection(builder.Configuration.GetConnectionString("marten"));
    })
    // Marten also supports a Scoped lifecycle, and quietly forward Transient
    // to Scoped
    .AddSubscriptionWithServices<KafkaSubscription>(ServiceLifetime.Singleton, o =>
    {
        // This is a default, but just showing what's possible
        o.IncludeArchivedEvents = false;

        o.FilterIncomingEventsOnStreamType(typeof(Invoice));

        // Process no more than 10 events at a time
        o.Options.BatchSize = 10;
    })
    .AddAsyncDaemon(DaemonMode.HotCold);

using var host = builder.Build();
await host.StartAsync();

snippet source | anchor

Starting Position of Subscriptions

By default, a registered subscription will be started at the very beginning of the event store (but does ignore archived events by default). That's not always going to be a good default, so Marten gives you the ability to specify the starting point of a subscription when a brand new subscription with no existing progress is started for the first time:

cs
var builder = Host.CreateApplicationBuilder();
builder.Services.AddMarten(opts =>
    {
        opts.Connection(builder.Configuration.GetConnectionString("marten"));
    })
    // Marten also supports a Scoped lifecycle, and quietly forward Transient
    // to Scoped
    .AddSubscriptionWithServices<KafkaSubscription>(ServiceLifetime.Singleton, o =>
    {
        // Start the subscription at the most current "high water mark" of the
        // event store. This effectively makes the subscription a "hot"
        // observable that only sees events when the subscription is active
        o.Options.SubscribeFromPresent();

        // Only process events in the store from a specified event sequence number
        o.Options.SubscribeFromSequence(1000);

        // Only process events in the store by determining the floor by the event
        // timestamp information
        o.Options.SubscribeFromTime(new DateTimeOffset(2024, 4, 1, 0, 0, 0, 0.Seconds()));

        // All of these options can be explicitly applied to only a single
        // named database when using multi-tenancy through separate databases
        o.Options.SubscribeFromPresent("Database1");
        o.Options.SubscribeFromSequence(2000, "Database2");
    })
    .AddAsyncDaemon(DaemonMode.HotCold);

using var host = builder.Build();
await host.StartAsync();

snippet source | anchor

If you specify starting rules for a certain database, that rule will only apply to that database. Other databases will fall through to global rules.

Event Filtering

Without any explicit configuration, all subscriptions will receive all possible event types, but Marten will filter out events marked as archived. If your subscription only cares about a subset of the possible event types in your application, there's a big performance advantage to filtering the event types for your subscription by effectively creating an allow list of allowable event types or stream types. The following is an example:

cs
var builder = Host.CreateApplicationBuilder();
builder.Services.AddMarten(opts =>
    {
        opts.Connection(builder.Configuration.GetConnectionString("marten"));
    })
    // Marten also supports a Scoped lifecycle, and quietly forward Transient
    // to Scoped
    .AddSubscriptionWithServices<KafkaSubscription>(ServiceLifetime.Singleton, o =>
    {
        // Archived events are ignored by default, but you can override that here
        o.IncludeArchivedEvents = true;

        // If you use more than one type of stream type marker, it might
        // be quick step to just include any events from a stream marked
        // as the "Invoice" type
        o.FilterIncomingEventsOnStreamType(typeof(Invoice));

        // Or be explicit about the specific event types
        // NOTE: you need to use concrete types here
        o.IncludeType<InvoiceCreated>();
        o.IncludeType<InvoiceApproved>();
    })
    .AddAsyncDaemon(DaemonMode.HotCold);

using var host = builder.Build();
await host.StartAsync();

snippet source | anchor

Note that all filters are applied with a SQL OR operator in the underlying data fetching.

Using SubscriptionBase

The SubscriptionBase class can be used as a convenient base class for subscriptions that also serves to embed all the various configuration options for that subscription right into the subscription itself. The usage of that base class is shown below:

cs
public class KafkaSubscription: SubscriptionBase
{
    private readonly KafkaProducerConfig _config;

    public KafkaSubscription(KafkaProducerConfig config)
    {
        _config = config;

        SubscriptionName = "Kafka";

        // Access to any or all filtering rules
        IncludeType<InvoiceApproved>();

        // Fine grained control over how the subscription runs
        // in the async daemon
        Options.BatchSize = 1000;
        Options.MaximumHopperSize = 10000;

        // Effectively run as a hot observable
        Options.SubscribeFromPresent();
    }

    // The daemon will "push" a page of events at a time to this subscription
    public override async Task<IChangeListener> ProcessEventsAsync(
        EventRange page,
        ISubscriptionController controller,
        IDocumentOperations operations,
        CancellationToken cancellationToken)
    {
        using var kafkaProducer =
            new ProducerBuilder<string, string>(_config.ProducerConfig).Build();

        foreach (var @event in page.Events)
        {
            await kafkaProducer.ProduceAsync(_config.Topic,
                new Message<string, string>
                {
                    // store event type name in message Key
                    Key = @event.Data.GetType().Name,
                    // serialize event to message Value
                    Value = JsonConvert.SerializeObject(@event.Data)
                }, cancellationToken);

        }

        // We don't need any kind of callback, so the nullo is fine
        return NullChangeListener.Instance;
    }

}

// Just assume this is registered in your IoC container
public class KafkaProducerConfig
{
    public ProducerConfig? ProducerConfig { get; set; }
    public string? Topic { get; set; }
}

snippet source | anchor

Rewinding or Replaying Subscriptions

INFO

There are plans for a commercial add on to Marten to expose this functionality through a user interface control panel

Marten today has a limited ability to rewind a subscription to a certain point, then restart it to run continuously from there on. Note that this only works today within a single process. Here's a sample of doing this operation:

cs
// IProjectionCoordinator is a service from Marten that's added to your IoC
// container and gives you access to the running async daemon instance in
// your process
public static async Task rewinding_subscription(IProjectionCoordinator coordinator)
{
    var daemon = coordinator.DaemonForMainDatabase();

    // Rewind and restart the named subscription at sequence 0
    await daemon.RewindSubscriptionAsync("Kafka",  CancellationToken.None);

    // Rewind and restart the named subscription at sequence 2000
    await daemon.RewindSubscriptionAsync("Kafka",  CancellationToken.None, sequenceFloor:2000);

    // Rewind and restart the named subscription for the events after a certain time
    await daemon.RewindSubscriptionAsync("Kafka",  CancellationToken.None, timestamp:DateTimeOffset.UtcNow.Subtract(1.Days()));
}

snippet source | anchor

Error Handling

WARNING

If you allow an exception to bubble out of the ProcessEventsAsync() method in your subscription, Marten will treat that as a critical exception that will rollback the ongoing work and pause the subscription. The subscription will be "rewound" to its previous position when Marten tries to restart the subscription.

As the author of a subscription, you should strive to handle exceptions internally within the subscription itself as much as possible. You do have the ability to use the ISubscriptionController argument to record "dead letter events" that are skipped internally with an exception or to signal to Marten when a subscription has to be stopped partway thought the current page. Doing this will allow the previous work to go forward, but the subscription will be paused afterward at the point that the controller is told.

The following is an example of using these facilities for error handling:

cs
public class ErrorHandlingSubscription: SubscriptionBase
{
    public override async Task<IChangeListener> ProcessEventsAsync(
        // The current "page" of events in strict sequential order
        // If using conjoined tenancy, this will be a mix of tenants!
        EventRange page,
        ISubscriptionController controller,

        // This gives you access to make "writes" to the
        // underlying Marten store
        IDocumentOperations operations,
        CancellationToken cancellationToken)
    {

        long lastProcessed = page.SequenceFloor;

        // Do any processing of events you want here
        foreach (var e in page.Events)
        {
            Console.WriteLine($"Got event of type {e.Data.GetType().NameInCode()} from stream {e.StreamId}");

            try
            {
                await handleEvent(e);
                lastProcessed = e.Sequence;
            }
            catch (ReallyBadException ex)
            {
                // We've detected some kind of critical exception that makes us
                // want to stop all further processing
                await controller.ReportCriticalFailureAsync(ex, lastProcessed);
            }
            catch (Exception ex)
            {
                // Not great, but hey, we can skip this and keep going!
                await controller.RecordDeadLetterEventAsync(e, ex);
            }
        }

        // This is a mechanism for subscriptions to "know" when the progress for a page of events and any
        // pending writes to the Marten store are about to be committed or have just been committed
        // This was added specifically to enable Wolverine integration with its transactional outbox
        return new Callback();
    }

    private async Task handleEvent(IEvent @event)
    {
        // do some custom work on this event
    }

    // This is a mechanism to allow the subscription to "know" when Marten is about to persist
    internal class Callback: IChangeListener
    {
        public Task AfterCommitAsync(IDocumentSession session, IChangeSet commit, CancellationToken token)
        {
            Console.WriteLine("Marten is about to make a commit for any changes");
            return Task.CompletedTask;
        }

        public Task BeforeCommitAsync(IDocumentSession session, IChangeSet commit, CancellationToken token)
        {
            Console.WriteLine("Marten just made a commit for any changes");
            return Task.CompletedTask;
        }
    }
}

public class ReallyBadException: Exception
{
    public ReallyBadException(string message) : base(message)
    {
    }
}

snippet source | anchor

Released under the MIT License.