Appending Events

TIP

For CQRS style command handlers that append events to an existing event stream, the Marten team very strongly recommends the FetchForWriting API. This API is used underneath the Wolverine Aggregate Handler Workflow, which is probably the simplest way to build command handlers with Marten event sourcing today.
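
As a rough sketch of what a FetchForWriting-based command handler can look like, consider the code below. The QuestParty aggregate, the AddMembers command, and the handler class are hypothetical names used only for illustration, and MembersJoined is reduced to a single property.

```cs
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Marten;

// Hypothetical event and command types, simplified for this sketch
public class MembersJoined { public string[] Members { get; set; } = Array.Empty<string>(); }
public class AddMembers
{
    public Guid QuestId { get; set; }
    public string[] Members { get; set; } = Array.Empty<string>();
}

// Hypothetical aggregate that Marten can build from the events in the stream
public class QuestParty
{
    public Guid Id { get; set; }
    public List<string> Members { get; set; } = new();

    public void Apply(MembersJoined e) => Members.AddRange(e.Members);
}

public static class AddMembersHandler
{
    public static async Task Handle(AddMembers command, IDocumentSession session)
    {
        // Loads the current aggregate state along with the current stream version
        var stream = await session.Events.FetchForWriting<QuestParty>(command.QuestId);

        // Aggregate is null when no events exist for this stream yet
        if (stream.Aggregate is null)
        {
            throw new InvalidOperationException($"Unknown quest {command.QuestId}");
        }

        // Queue the new event against the fetched stream
        stream.AppendOne(new MembersJoined { Members = command.Members });

        // Commits the unit of work; Marten applies its concurrency check here
        await session.SaveChangesAsync();
    }
}
```

The handler never calls Append() directly; the fetched stream object carries both the aggregate state used for validation and the version information used at save time.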

With Marten, events are captured and appended to logical "streams" of events. Marten provides methods to create a new stream with the initial events, append events to an existing stream, and also to append events with some protection for concurrent access to single streams.

The event data is persisted to two tables:

  1. mt_events -- stores the actual event data and some metadata that describes the event
  2. mt_streams -- stores information about the current state of an event stream. There is a foreign key relationship from mt_events to mt_streams
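
To make the split between the two tables more concrete, here is a minimal sketch that reads both the stream-level state and the individual events through the session API. The StreamInspection class and its method name are made up for this example.

```cs
using System;
using System.Threading.Tasks;
using Marten;

public static class StreamInspection
{
    public static async Task Describe(IQuerySession session, Guid streamId)
    {
        // Stream-level information, which corresponds to the mt_streams row
        var state = await session.Events.FetchStreamStateAsync(streamId);
        if (state is null)
        {
            Console.WriteLine($"No stream found for {streamId}");
            return;
        }

        Console.WriteLine($"Stream {state.Id} is at version {state.Version}");

        // The individual events, which correspond to rows in mt_events
        var events = await session.Events.FetchStreamAsync(streamId);
        foreach (var e in events)
        {
            Console.WriteLine($"{e.Version}: {e.EventTypeName} (sequence {e.Sequence})");
        }
    }
}
```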

Events can be captured by either starting a new stream or by appending events to an existing stream. In addition, Marten has some tricks up its sleeve for dealing with concurrency issues that may result from multiple transactions trying to simultaneously append events to the same stream.

"Rich" vs "Quick" Appends (7.25)

TIP

Long story short, the new "Quick" model appears to provide much better performance and scalability.

Before diving into starting new event streams or appending events to existing streams, just know that there are two different modes of event appending you can use with Marten:

cs
var builder = Host.CreateApplicationBuilder();
builder.Services.AddMarten(opts =>
    {
        // Pick one of the two modes. Both assignments appear here only to
        // show the options; the later assignment is the one that takes effect.

        // This is the default Marten behavior from 4.0 on
        opts.Events.AppendMode = EventAppendMode.Rich;

        // Lighter weight mode that should result in better
        // performance, but with a loss of available metadata
        // within inline projections
        opts.Events.AppendMode = EventAppendMode.Quick;
    })
    .UseNpgsqlDataSource();

The classic Rich mode appends events in a two-step process: the local session first determines all possible metadata for the events about to be appended, so that inline projections can use event versions and the global event sequence numbers at the time the inline projections are built.

WARNING

If you are using Inline projections with the "Quick" mode, be aware that you will not have access to the final event sequence or stream version at the time the projections are built. Marten is able to set the stream version on a single stream projection document built Inline, but that happens on the server side.

The newer Quick mode eschews version and sequence metadata in favor of performing the event append and stream creation operations with minimal overhead. The improved performance comes at the cost of not having the IEvent.Version and IEvent.Sequence information available at the time that inline projections are executed.

From initial load testing, the "Quick" mode appears to lead to a 40-50% reduction in the time Marten spends appending events. Your results will vary of course. Maybe more importantly, the "Quick" mode seems to make a large positive difference in the functioning of the asynchronous projections and subscriptions by preventing the event "skipping" issue that can happen with the "Rich" mode when a system becomes slow under heavy loads. Lastly, the Marten team believes that the "Quick" mode can alleviate concurrency issues from trying to append events to the same stream without utilizing optimistic or exclusive locking on the stream.

If using inline projections for a single stream (SingleStreamProjection or snapshots) and the Quick mode, the Marten team highly recommends using the IRevisioned interface on your projected aggregate documents so that Marten can "move" the version set by the database operations to the version of the projected documents loaded from the database later. Mapping a custom member to the Revision metadata will work as well.
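
A minimal sketch of that recommendation might look like the following. The QuestParty aggregate and the MartenSetup class are hypothetical, MembersJoined is simplified, and the namespaces are assumed to be the ones used by Marten 7.

```cs
using System;
using System.Collections.Generic;
using Marten;
using Marten.Events;
using Marten.Events.Projections;
using Marten.Metadata;

// Simplified event type for this sketch
public class MembersJoined { public string[] Members { get; set; } = Array.Empty<string>(); }

// Hypothetical aggregate document that opts into revision tracking
public class QuestParty : IRevisioned
{
    public Guid Id { get; set; }
    public List<string> Members { get; set; } = new();

    // Required by IRevisioned; Marten maps the stored version onto this member
    public int Version { get; set; }

    public void Apply(MembersJoined e) => Members.AddRange(e.Members);
}

public static class MartenSetup
{
    public static void Configure(StoreOptions opts)
    {
        // Use the lighter weight append mode
        opts.Events.AppendMode = EventAppendMode.Quick;

        // Build QuestParty inline as a snapshot of its stream
        opts.Projections.Snapshot<QuestParty>(SnapshotLifecycle.Inline);
    }
}
```

With this in place, the Version member should be read from a document loaded back from the database rather than from the in-memory aggregate at the moment the inline projection runs.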

Starting a new Stream

You can optionally start a new event stream against a .NET type that marks the type of stream you're capturing. Marten does not yet use this type as anything more than metadata, but our thought is that some projections would key off this information, and that a future version could use the aggregate type to perform versioned snapshots of the entire stream. We may also make the aggregate type optional so that you could either supply a string to mark the "stream type" or work without a stream type.

As usual, our sample problem domain is the Lord of the Rings style "Quest." For now, you can either let Marten assign the Guid id for a new stream or supply the stream id yourself:

cs
public async Task start_stream_with_guid_stream_identifiers(IDocumentSession session)
{
    var joined = new MembersJoined { Members = new[] { "Rand", "Matt", "Perrin", "Thom" } };
    var departed = new MembersDeparted { Members = new[] { "Thom" } };

    // Let Marten assign a new Stream Id, and mark the stream with an aggregate type
    // 'Quest'
    var streamId1 = session.Events.StartStream<Quest>(joined, departed).Id;

    // Or pass the aggregate type in without generics
    var streamId2 = session.Events.StartStream(typeof(Quest), joined, departed).Id;

    // Or instead, you tell Marten what the stream id should be.
    // The overloads below are alternatives shown side by side; real code
    // would use a distinct id for each stream it starts.
    var userDefinedStreamId = Guid.NewGuid();
    session.Events.StartStream<Quest>(userDefinedStreamId, joined, departed);

    // Or pass the aggregate type in without generics
    session.Events.StartStream(typeof(Quest), userDefinedStreamId, joined, departed);

    // Or forget about the aggregate type whatsoever
    var streamId4 = session.Events.StartStream(joined, departed).Id;

    // Or start with a known stream id and no aggregate type
    session.Events.StartStream(userDefinedStreamId, joined, departed);

    // And persist the new stream of course
    await session.SaveChangesAsync();
}

For stream identity (strings vs. Guids), see here.

Note that StartStream checks for an existing stream and throws ExistingStreamIdCollisionException if a matching stream already exists.
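
One way to deal with that collision is sketched below. The QuestStarter class, the QuestStarted event, and the stand-in Quest type are hypothetical, and the example assumes the exception lives in the Marten.Exceptions namespace as in Marten 7. Both the StartStream() call and the save are wrapped in the try block so the sketch does not depend on exactly where the exception surfaces.

```cs
using System;
using System.Threading.Tasks;
using Marten;
using Marten.Exceptions;

// Hypothetical stream marker and event types for this sketch
public class Quest { public Guid Id { get; set; } }
public class QuestStarted { public string Name { get; set; } = ""; }

public static class QuestStarter
{
    // Returns false when a stream with this id already exists
    public static async Task<bool> TryStart(IDocumentSession session, Guid questId, string name)
    {
        try
        {
            session.Events.StartStream<Quest>(questId, new QuestStarted { Name = name });
            await session.SaveChangesAsync();
            return true;
        }
        catch (ExistingStreamIdCollisionException)
        {
            // Another transaction (or an earlier call) already created this stream
            return false;
        }
    }
}
```

After a failed save, the session still holds the queued operations, so the safest follow-up is to dispose it and continue with a fresh session.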

Appending Events

TIP

Append() will create a new stream for the stream id if it does not already exist at the time that IDocumentSession.SaveChanges() is called.

If you have an existing stream, you can later append additional events with IEventStore.Append() as shown below:

cs
var joined = new MembersJoined { Members = new[] { "Rand", "Matt", "Perrin", "Thom" } };
var departed = new MembersDeparted { Members = new[] { "Thom" } };

session.Events.Append(id, joined, departed);

await session.SaveChangesAsync();

Mandatory Stream Types (7.30)

WARNING

Absolutely use this flag on new development work or when you want to take advantage of the optimized projection rebuilds introduced in Marten 7.30, but be aware of the consequences outlined in this section.

The default behavior in Marten is to allow you to happily start event streams without a stream type marker (the "T" in StartStream<T>()), but in some cases there are optimizations that Marten can do for performance if it can assume the stream type marker is present in the database:

  • The optimized single stream projection rebuilds
  • Specifying event filtering on a projection running asynchronously where Marten cannot derive the event types itself -- like you'd frequently encounter with projections using explicit code instead of the aggregation method conventions

To make the stream type markers mandatory, you can use this flag in the configuration:

cs
var builder = Host.CreateApplicationBuilder();
builder.Services.AddMarten(opts =>
{
    opts.Connection(builder.Configuration.GetConnectionString("marten"));

    // Force users to supply a stream type on StartStream, and disallow
    // appending events if the stream does not already exist
    opts.Events.UseMandatoryStreamTypeDeclaration = true;
});

This causes a couple side effects that force stricter usage of Marten:

  1. Marten will throw a StreamTypeMissingException if you call a StartStream() overload that doesn't include the stream type
  2. Marten will throw a NonExistentStreamException if you try to append events to a stream that does not already exist
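
With the flag turned on, usage that satisfies both rules could look like this sketch. The Quest, QuestStarted, and MembersJoined types here are simplified stand-ins, and the MandatoryStreamTypeUsage class is a hypothetical name.

```cs
using System;
using System.Threading.Tasks;
using Marten;

// Simplified stand-in types for this sketch
public class Quest { public Guid Id { get; set; } }
public class QuestStarted { public string Name { get; set; } = ""; }
public class MembersJoined { public string[] Members { get; set; } = Array.Empty<string>(); }

public static class MandatoryStreamTypeUsage
{
    public static async Task StartThenAppend(IDocumentSession session)
    {
        var questId = Guid.NewGuid();

        // Rule 1: the stream type is supplied through the generic argument
        session.Events.StartStream<Quest>(questId, new QuestStarted { Name = "Destroy the Ring" });
        await session.SaveChangesAsync();

        // Rule 2: the stream already exists, so a plain Append() is allowed
        session.Events.Append(questId, new MembersJoined { Members = new[] { "Frodo", "Sam" } });
        await session.SaveChangesAsync();
    }
}
```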

Optimistic Versioned Append

TIP

This may not be very effective as it only helps you detect changes between calling AppendOptimistic() and SaveChangesAsync().

You can also use the AppendOptimistic() method for optimistic concurrency against the event stream version. It looks up the current stream version for you when it is called, like this:

cs
public async Task append_optimistic(IDocumentSession session, Guid streamId, object[] events)
{
    // This is doing data access, so it's an async method
    await session.Events.AppendOptimistic(streamId, events);

    // Assume that there is other work happening right here...

    await session.SaveChangesAsync();
}
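
If the version check needs to cover a longer window, for example from the moment a user first read the stream, one option is the Append() overload that takes an expected version. The sketch below rests on two assumptions: that the expected version means the stream version after the new events are appended, and that a mismatch surfaces as a ConcurrencyException (or a subclass of it) from Marten.Exceptions. The VersionedAppend class and the versionSeenByCaller parameter are hypothetical.

```cs
using System;
using System.Threading.Tasks;
using Marten;
using Marten.Exceptions;

public static class VersionedAppend
{
    // versionSeenByCaller is the stream version the caller based its decision on
    public static async Task<bool> TryAppend(
        IDocumentSession session, Guid streamId, long versionSeenByCaller, object[] events)
    {
        // Assumption: the expected version is the version *after* this append
        var expectedVersion = versionSeenByCaller + events.Length;
        session.Events.Append(streamId, expectedVersion, events);

        try
        {
            await session.SaveChangesAsync();
            return true;
        }
        catch (ConcurrencyException)
        {
            // Someone else appended to the stream in the meantime
            return false;
        }
    }
}
```

A caller that gets false back would typically reload the stream, re-evaluate its decision, and retry with a new session.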

Serialized Access to the Stream

The AppendExclusive() method takes a database lock on the stream itself and holds it until the IDocumentSession is saved or disposed. That usage is shown below:

cs
public async Task append_exclusive(IDocumentSession session, Guid streamId)
{
    // You *could* pass in events here too, but doing this establishes a transaction
    // lock on the stream.
    await session.Events.AppendExclusive(streamId);

    var events = determineNewEvents(streamId);

    // The next call can just be Append()
    session.Events.Append(streamId, events);

    // This will commit the unit of work and release the
    // lock on the event stream
    await session.SaveChangesAsync();
}

This usage will in effect serialize access to a single event stream.

Tombstone Events

It's an imperfect world, and sometimes transactions involving Marten events will fail in process. That historically caused issues with Marten's asynchronous projection support when there were "gaps" in the event store sequence due to failed transactions. Marten V4 introduced support for "tombstone" events where Marten tries to insert placeholder rows in the events table with the event sequence numbers that failed in a Marten transaction. This is done strictly to improve the functioning of the async daemon that looks for gaps in the event sequence to "know" how far it's safe to process asynchronous projections. If you see event rows in your database of type "tombstone", it's representative of failed transactions (maybe from optimistic concurrency violations, transient network issues, timeouts, etc.).

While there is not yet formal support in Marten's API surface for deleting "tombstone" events, it is perfectly safe to delete tombstone events from your database:

  • At any time if you do not use any asynchronous projections and do not use the async daemon at runtime
  • Where the seq_id column value is less than the "high water mark" of the async daemon. You can find the "high water mark" value from the mt_event_progression table or through this API call:

cs
public static async Task ShowDaemonDiagnostics(IDocumentStore store)
{
    // This will tell you the current progress of each known projection shard
    // according to the latest recorded mark in the database
    var allProgress = await store.Advanced.AllProjectionProgress();
    foreach (var state in allProgress)
    {
        Console.WriteLine($"{state.ShardName} is at {state.Sequence}");
    }

    // This will allow you to retrieve some basic statistics about the event store
    var stats = await store.Advanced.FetchEventStoreStatistics();
    Console.WriteLine($"The event store highest sequence is {stats.EventSequenceNumber}");

    // This will let you fetch the current shard state of a single projection shard,
    // but in this case we're looking for the daemon high water mark
    var daemonHighWaterMark = await store.Advanced.ProjectionProgressFor(new ShardName(ShardState.HighWaterMark));
    Console.WriteLine($"The daemon high water sequence mark is {daemonHighWaterMark}");
}
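
Since there is no formal API for the cleanup itself, the sketch below falls back to raw SQL queued through the session. It assumes the default schema, the mt_events column names type and seq_id, and Marten's "?" parameter placeholder; the TombstoneCleanup class is a hypothetical name. Treat it as a starting point to verify against your own database, not as a supported Marten feature.

```cs
using System.Threading.Tasks;
using Marten;
using Marten.Events.Daemon;

public static class TombstoneCleanup
{
    public static async Task DeleteTombstonesBelowHighWaterMark(IDocumentStore store)
    {
        // Find how far the async daemon has safely progressed
        var highWaterMark = await store.Advanced
            .ProjectionProgressFor(new ShardName(ShardState.HighWaterMark));

        await using var session = store.LightweightSession();

        // Assumption: events live in the default schema and tombstones
        // are stored with the event type name "tombstone"
        session.QueueSqlCommand(
            "delete from mt_events where type = 'tombstone' and seq_id < ?",
            highWaterMark);

        // Runs the queued delete in a single transaction
        await session.SaveChangesAsync();
    }
}
```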

Released under the MIT License.