Skip to content

CQRS Command Handler Workflow for Capturing Events

TIP

Definitely see the Wolverine Aggregate Handler Workflow for a low ceremony approach to CQRS "writes" that uses the FetchForWriting() API under the covers that is introduced in this topic.

So you're using Marten's event sourcing functionality within some kind architecture (CQRS maybe?) where your business logic needs to emit events modeling business state changes based on external inputs (commands). These commands are most likely working on a single event stream at one time. Your business logic will probably need to evaluate the incoming command against the current state of the event stream to either decide what events should be created, or to reject the incoming command altogether if the system is not in the proper state for the command. And by the way, you probably also need to be concerned with concurrent access to the business data represented by a single event stream.

FetchForWriting

TIP

As of Marten 7, this API is usable with aggregation projections that are running with an asynchronous lifecycle. This is key to create "zero downtime deployments" for projection changes.

To that end, Marten has the FetchForWriting() operation for optimized command handling with Marten.

Let's say that you are building an order fulfillment system, so we're naturally going to model our domain as an Order aggregate:

cs
public class Item
{
    public string Name { get; set; }
    public bool Ready { get; set; }
}

public class Order
{
    // This would be the stream id
    public Guid Id { get; set; }

    // This is important, by Marten convention this would
    // be the
    public int Version { get; set; }

    public Order(OrderCreated created)
    {
        foreach (var item in created.Items)
        {
            Items[item.Name] = item;
        }
    }

    public void Apply(IEvent<OrderShipped> shipped) => Shipped = shipped.Timestamp;
    public void Apply(ItemReady ready) => Items[ready.Name].Ready = true;

    public DateTimeOffset? Shipped { get; private set; }

    public Dictionary<string, Item> Items { get; set; } = new();

    public bool IsReadyToShip()
    {
        return Shipped == null && Items.Values.All(x => x.Ready);
    }
}

snippet source | anchor

And with some events like these:

cs
public record OrderShipped;
public record OrderCreated(Item[] Items);
public record OrderReady;

public record ItemReady(string Name);

snippet source | anchor

Let's jump right into the first sample with simple concurrency handling:

cs
public async Task Handle1(MarkItemReady command, IDocumentSession session)
{
    // Fetch the current value of the Order aggregate
    var stream = await session
        .Events
        .FetchForWriting<Order>(command.OrderId);

    var order = stream.Aggregate;

    if (order.Items.TryGetValue(command.ItemName, out var item))
    {
        // Mark that the this item is ready
        stream.AppendOne(new ItemReady(command.ItemName));
    }
    else
    {
        // Some crude validation
        throw new InvalidOperationException($"Item {command.ItemName} does not exist in this order");
    }

    // If the order is ready to ship, also emit an OrderReady event
    if (order.IsReadyToShip())
    {
        stream.AppendOne(new OrderReady());
    }

    await session.SaveChangesAsync();
}

snippet source | anchor

In this usage, FetchForWriting<Order>() is finding the current state of the stream based on the stream id we passed in. If the Order aggregate is configured as:

  1. Live, Marten is executing the live stream aggregation on the fly by loading all the events for this stream into memory and calculating the full Order state by applying each event in memory
  2. Inline, Marten is loading the persisted Order document directly from the underlying database

Regardless of how Marten is loading or deriving the state of Order, it's also quietly fetching the current version of that Order stream at the point that the aggregate was fetched. Stepping down inside the code, we're doing some crude validation of the current state of the Order and potentially rejecting the entire command. Past that we're appending a new event for ItemReady and conditionally appending a second event for OrderReady if every item within the Order is ready (for shipping I guess, this isn't really a fully formed domain model here).

After appending the events via the new IEventStream.AppendOne() (there's also an AppendMany() method), we're ready to save the new events with the standard IDocumentSession.SaveChangesAsync() method call. At that point, if some other process has managed to commit changes to the same Order stream between our handler calling FetchForWriting() and IDocumentSession.SaveChangesAsync(), the entire command will fail with a Marten ConcurrencyException.

Explicit Optimistic Concurrency

This time let's explicitly opt into optimistic concurrency checks by telling Marten what the expected starting version of the stream should be in order for the command to be processed. In this usage, you're probably assuming that the command message was based on the starting state.

The ever so slightly version of the original handler is shown below:

cs
public async Task Handle2(MarkItemReady command, IDocumentSession session)
{
    // Fetch the current value of the Order aggregate
    var stream = await session
        .Events

        // Explicitly tell Marten the exptected, starting version of the
        // event stream
        .FetchForWriting<Order>(command.OrderId, command.Version);

    var order = stream.Aggregate;

    if (order.Items.TryGetValue(command.ItemName, out var item))
    {
        // Mark that the this item is ready
        stream.AppendOne(new ItemReady(command.ItemName));
    }
    else
    {
        // Some crude validation
        throw new InvalidOperationException($"Item {command.ItemName} does not exist in this order");
    }

    // If the order is ready to ship, also emit an OrderReady event
    if (order.IsReadyToShip())
    {
        stream.AppendOne(new OrderReady());
    }

    await session.SaveChangesAsync();
}

snippet source | anchor

In this case, Marten will throw a ConcurrencyException if the expected starting version being passed to FetchForWriting() has been incremented by some other process before this command. The same expected version check will also be evaluated during the call to IDocumentSession.SaveChangesAsync().

Exclusive Concurrency

The last flavor of concurrency is to leverage Postgresql's ability to do row level locking and wait to achieve an exclusive lock on the event stream. This might be applicable when the result of the command is just dependent upon the initial state of the Order aggregate. This usage is shown below:

cs
public async Task Handle3(MarkItemReady command, IDocumentSession session)
{
    // Fetch the current value of the Order aggregate
    var stream = await session
        .Events

        // Explicitly tell Marten the exptected, starting version of the
        // event stream
        .FetchForExclusiveWriting<Order>(command.OrderId);

    var order = stream.Aggregate;

    if (order.Items.TryGetValue(command.ItemName, out var item))
    {
        // Mark that the this item is ready
        stream.AppendOne(new ItemReady(command.ItemName));
    }
    else
    {
        // Some crude validation
        throw new InvalidOperationException($"Item {command.ItemName} does not exist in this order");
    }

    // If the order is ready to ship, also emit an OrderReady event
    if (order.IsReadyToShip())
    {
        stream.AppendOne(new OrderReady());
    }

    await session.SaveChangesAsync();
}

snippet source | anchor

Do note that the FetchForExclusiveWriting() command can time out if it is unable to achieve a lock in a timely manner. In this case, Marten will throw a StreamLockedException. The lock will be released when either IDocumentSession.SaveChangesAsync() is called or the IDocumentSession is disposed.

WriteToAggregate

Lastly, there are several overloads of a method called IEventStore.WriteToAggregate() that just puts some syntactic sugar over the top of FetchForWriting() to simplify the entire workflow. Using that method, our handler versions above becomes:

cs
public Task Handle4(MarkItemReady command, IDocumentSession session)
{
    return session.Events.WriteToAggregate<Order>(command.OrderId, command.Version, stream =>
    {
        var order = stream.Aggregate;

        if (order.Items.TryGetValue(command.ItemName, out var item))
        {
            // Mark that the this item is ready
            stream.AppendOne(new ItemReady(command.ItemName));
        }
        else
        {
            // Some crude validation
            throw new InvalidOperationException($"Item {command.ItemName} does not exist in this order");
        }

        // If the order is ready to ship, also emit an OrderReady event
        if (order.IsReadyToShip())
        {
            stream.AppendOne(new OrderReady());
        }
    });
}

snippet source | anchor

Released under the MIT License.