CQRS Command Handler Workflow for Capturing Events
TIP
Definitely see the Wolverine Aggregate Handler Workflow for a low ceremony approach to CQRS "writes" that uses the FetchForWriting() API introduced in this topic under the covers.
So you're using Marten's event sourcing functionality within some kind of architecture (CQRS maybe?) where your business logic needs to emit events modeling business state changes based on external inputs (commands). These commands are most likely working on a single event stream at one time. Your business logic will probably need to evaluate the incoming command against the current state of the event stream to either decide what events should be created, or to reject the incoming command altogether if the system is not in the proper state for the command. And by the way, you probably also need to be concerned with concurrent access to the business data represented by a single event stream.
FetchForWriting 7.0
TIP
This API is usable with aggregation projections that are running with an asynchronous lifecycle. This is key to creating "zero downtime deployments" for projection changes.
TIP
The more recent FetchLatest API is a lighter weight, read only version of FetchForWriting that may be slightly more performant if all you care about is getting the latest data. Do note that there are significant optimizations for using FetchForWriting, then appending new events, saving the session, and using FetchLatest to get the current state of the aggregate being updated.
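A rough sketch of that combination, using the Order example introduced below and a hypothetical ShipOrder command (and assuming FetchLatest<T>() is exposed on session.Events):
public record ShipOrder(Guid OrderId);

public async Task<Order> Handle(ShipOrder command, IDocumentSession session)
{
    // Open the stream for writing; this also gives us the current Order state
    var stream = await session.Events.FetchForWriting<Order>(command.OrderId);

    stream.AppendOne(new OrderShipped());
    await session.SaveChangesAsync();

    // FetchLatest can reuse the work already done by FetchForWriting in this
    // session to hand back the Order state including the newly appended event
    return await session.Events.FetchLatest<Order>(command.OrderId);
}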
WARNING
FetchForWriting() is only possible with single stream aggregation projections, which includes the "self-aggregating" snapshot feature. This API assumes that it's working with one stream, and directly accesses the stream table. Multi-stream projections will not work with this feature.
To that end, Marten has the FetchForWriting() operation for optimized command handling.
Let's say that you are building an order fulfillment system, so we're naturally going to model our domain as an Order aggregate:
public class Item
{
    public string Name { get; set; }
    public bool Ready { get; set; }
}

public class Order
{
    // This would be the stream id
    public Guid Id { get; set; }

    // This is important, by Marten convention this would
    // be the version of the Order stream
    public int Version { get; set; }

    public Order(OrderCreated created)
    {
        foreach (var item in created.Items)
        {
            Items[item.Name] = item;
        }
    }

    public void Apply(IEvent<OrderShipped> shipped) => Shipped = shipped.Timestamp;
    public void Apply(ItemReady ready) => Items[ready.Name].Ready = true;

    public DateTimeOffset? Shipped { get; private set; }

    public Dictionary<string, Item> Items { get; set; } = new();

    public bool IsReadyToShip()
    {
        return Shipped == null && Items.Values.All(x => x.Ready);
    }
}
And with some events like these:
public record OrderShipped;
public record OrderCreated(Item[] Items);
public record OrderReady;
public record ItemReady(string Name);
Let's jump right into the first sample with simple concurrency handling:
public async Task Handle1(MarkItemReady command, IDocumentSession session)
{
    // Fetch the current value of the Order aggregate
    var stream = await session
        .Events
        .FetchForWriting<Order>(command.OrderId);

    var order = stream.Aggregate;

    if (order.Items.TryGetValue(command.ItemName, out var item))
    {
        // Mark that this item is ready
        stream.AppendOne(new ItemReady(command.ItemName));
    }
    else
    {
        // Some crude validation
        throw new InvalidOperationException($"Item {command.ItemName} does not exist in this order");
    }

    // If the order is ready to ship, also emit an OrderReady event
    if (order.IsReadyToShip())
    {
        stream.AppendOne(new OrderReady());
    }

    await session.SaveChangesAsync();
}
In this usage, FetchForWriting<Order>() is finding the current state of the stream based on the stream id we passed in. If the Order aggregate is configured as:

- Live, Marten is executing the live stream aggregation on the fly by loading all the events for this stream into memory and calculating the full Order state by applying each event in memory
- Inline, Marten is loading the persisted Order document directly from the underlying database
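For reference, the lifecycle is chosen when the aggregate is registered. A minimal sketch; the Inline registration matches the one used later in this guide, while the commented-out LiveStreamAggregation call is only one way to opt into live aggregation:
var builder = Host.CreateApplicationBuilder();
builder.Services.AddMarten(opts =>
{
    opts.Connection("some connection string");

    // Inline: the Order document is written in the same
    // transaction as the appended events
    opts.Projections.Snapshot<Order>(SnapshotLifecycle.Inline);

    // Live: no persisted document, Marten aggregates the events on demand.
    // Self-aggregating types like Order also work without any explicit registration.
    // opts.Projections.LiveStreamAggregation<Order>();
});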
Regardless of how Marten is loading or deriving the state of Order, it's also quietly fetching the current version of that Order stream at the point that the aggregate was fetched. Stepping down inside the code, we're doing some crude validation of the current state of the Order and potentially rejecting the entire command. Past that, we're appending a new event for ItemReady and conditionally appending a second event for OrderReady if every item within the Order is ready (for shipping, I guess; this isn't really a fully formed domain model here).
After appending the events via the new IEventStream.AppendOne() (there's also an AppendMany() method), we're ready to save the new events with the standard IDocumentSession.SaveChangesAsync() method call. At that point, if some other process has managed to commit changes to the same Order stream between our handler calling FetchForWriting() and IDocumentSession.SaveChangesAsync(), the entire command will fail with a Marten ConcurrencyException.
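If you'd rather surface that failure explicitly instead of letting it bubble up, a minimal sketch (assuming ConcurrencyException from the Marten.Exceptions namespace and reusing Handle1 from above) could look like this:
public async Task HandleWithConflictReporting(MarkItemReady command, IDocumentSession session)
{
    try
    {
        // Reuse the handler shown above
        await Handle1(command, session);
    }
    catch (ConcurrencyException)
    {
        // Another process appended to this Order stream between our
        // FetchForWriting() call and SaveChangesAsync(). Retry the command
        // or report the conflict back to the caller as appropriate.
        throw;
    }
}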
Inline Optimization 7.25
If you are using an Inline single stream projection for the aggregate being targeted by FetchForWriting(), you can make a performance optimization with this setting:
var builder = Host.CreateApplicationBuilder();
builder.Services.AddMarten(opts =>
{
    opts.Connection("some connection string");

    // Force Marten to use the identity map for only the aggregate type
    // that is the targeted "T" in FetchForWriting<T>() when using
    // an Inline projection for the "T". Saves on Marten doing an extra
    // database fetch of the same data you already fetched from FetchForWriting()
    // when Marten needs to apply the Inline projection as part of SaveChanges()
    opts.Events.UseIdentityMapForAggregates = true;
})
// This is a non-trivial performance optimization if you never
// need identity map mechanics in your commands or query handlers
.UseLightweightSessions();
It's pretty involved, but the key takeaway is that if you are using lightweight sessions for a performance optimization -- and you probably should even though that's not a Marten default! -- and also using FetchForWriting<T>() with Inline projections, this optimizes your system to make fewer network round trips to the database and to reuse the data you already fetched when applying the Inline projection. This is an opt-in setting because it can be harmful to existing code that might be modifying the aggregate document fetched by FetchForWriting() outside of Marten itself.
Explicit Optimistic Concurrency
This time let's explicitly opt into optimistic concurrency checks by telling Marten what the expected starting version of the stream should be in order for the command to be processed. In this usage, you're probably assuming that the command message was based on the starting state.
The ever so slightly different version of the original handler is shown below:
public async Task Handle2(MarkItemReady command, IDocumentSession session)
{
    // Fetch the current value of the Order aggregate
    var stream = await session
        .Events

        // Explicitly tell Marten the expected, starting version of the
        // event stream
        .FetchForWriting<Order>(command.OrderId, command.Version);

    var order = stream.Aggregate;

    if (order.Items.TryGetValue(command.ItemName, out var item))
    {
        // Mark that this item is ready
        stream.AppendOne(new ItemReady(command.ItemName));
    }
    else
    {
        // Some crude validation
        throw new InvalidOperationException($"Item {command.ItemName} does not exist in this order");
    }

    // If the order is ready to ship, also emit an OrderReady event
    if (order.IsReadyToShip())
    {
        stream.AppendOne(new OrderReady());
    }

    await session.SaveChangesAsync();
}
In this case, Marten will throw a ConcurrencyException if the expected starting version being passed to FetchForWriting() has been incremented by some other process before this command. The same expected version check will also be evaluated during the call to IDocumentSession.SaveChangesAsync().
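For this flavor to work, the expected version has to travel with the command itself. A plausible, purely hypothetical shape for MarkItemReady and a helper that captures the version from a previously loaded Order:
public record MarkItemReady(Guid OrderId, string ItemName, int Version);

// Build the command from the Order the client was looking at so that the
// expected starting version is carried along with the request
public static MarkItemReady ToMarkItemReady(Order order, string itemName)
    => new MarkItemReady(order.Id, itemName, order.Version);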
Exclusive Concurrency
The last flavor of concurrency is to leverage PostgreSQL's ability to do row level locking and wait to achieve an exclusive lock on the event stream. This might be applicable when the result of the command is just dependent upon the initial state of the Order aggregate. This usage is shown below:
public async Task Handle3(MarkItemReady command, IDocumentSession session)
{
    // Fetch the current value of the Order aggregate and take an
    // exclusive lock on the event stream until this session is
    // saved or disposed
    var stream = await session
        .Events
        .FetchForExclusiveWriting<Order>(command.OrderId);

    var order = stream.Aggregate;

    if (order.Items.TryGetValue(command.ItemName, out var item))
    {
        // Mark that this item is ready
        stream.AppendOne(new ItemReady(command.ItemName));
    }
    else
    {
        // Some crude validation
        throw new InvalidOperationException($"Item {command.ItemName} does not exist in this order");
    }

    // If the order is ready to ship, also emit an OrderReady event
    if (order.IsReadyToShip())
    {
        stream.AppendOne(new OrderReady());
    }

    await session.SaveChangesAsync();
}
Do note that the FetchForExclusiveWriting() command can time out if it is unable to achieve a lock in a timely manner. In this case, Marten will throw a StreamLockedException. The lock will be released when either IDocumentSession.SaveChangesAsync() is called or the IDocumentSession is disposed.
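A minimal sketch of guarding against that timeout, assuming StreamLockedException lives in the Marten.Exceptions namespace and reusing Handle3 from above:
public async Task HandleExclusively(MarkItemReady command, IDocumentSession session)
{
    try
    {
        await Handle3(command, session);
    }
    catch (StreamLockedException)
    {
        // Another session is currently holding the exclusive lock on this
        // Order stream. Requeue or retry the command after a short delay,
        // or report the contention back to the caller.
        throw;
    }
}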
WriteToAggregate
Lastly, there are several overloads of a method called IEventStore.WriteToAggregate() that just put some syntactic sugar over the top of FetchForWriting() to simplify the entire workflow. Using that method, our handler from above becomes:
public Task Handle4(MarkItemReady command, IDocumentSession session)
{
    return session.Events.WriteToAggregate<Order>(command.OrderId, command.Version, stream =>
    {
        var order = stream.Aggregate;

        if (order.Items.TryGetValue(command.ItemName, out var item))
        {
            // Mark that this item is ready
            stream.AppendOne(new ItemReady(command.ItemName));
        }
        else
        {
            // Some crude validation
            throw new InvalidOperationException($"Item {command.ItemName} does not exist in this order");
        }

        // If the order is ready to ship, also emit an OrderReady event
        if (order.IsReadyToShip())
        {
            stream.AppendOne(new OrderReady());
        }
    });
}
Optimizing FetchForWriting with Inline Aggregates
If you are utilizing FetchForWriting() for your command handlers -- and you really, really should! -- and at least some of your aggregates are updated Inline, configure the optimization shown below:
var builder = Host.CreateApplicationBuilder();
builder.Services.AddMarten(opts =>
{
    opts.Connection("some connection string");

    // The Order aggregate is updated Inline inside the
    // same transaction as the events being appended
    opts.Projections.Snapshot<Order>(SnapshotLifecycle.Inline);

    // Opt into an optimization for the inline aggregates
    // used with FetchForWriting()
    opts.Projections.UseIdentityMapForAggregates = true;
})
// This is also a performance optimization in Marten to disable the
// identity map tracking overall in Marten sessions if you don't
// need that tracking at runtime
.UseLightweightSessions();
You can potentially gain some significant performance optimization by using the UseIdentityMapForAggregates flag shown above. To be clear, this optimization mostly helps when you have the combination in a command handler that:

- Uses FetchForWriting for an aggregate type
- That aggregate type is updated or built through an Inline projection or snapshot
With this optimization, Marten will take steps to make sure that it uses the version of the aggregate document that was originally fetched by FetchForWriting() as the starting point for updating that aggregate in its Inline projection with the events that were appended by the command itself.
This optimization will be harmful if you alter the loaded aggregate in any way between FetchForWriting() and SaveChangesAsync(), because the projected data being saved could then be invalid.
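As a purely illustrative (and hypothetical) example of the pattern that warning is about:
public async Task DoNotDoThis(Guid orderId, IDocumentSession session)
{
    var stream = await session.Events.FetchForWriting<Order>(orderId);

    // Hand-editing the fetched aggregate outside of Marten is the kind of code
    // this opt-in flag can break: with the identity map optimization on, this
    // mutated instance can become the starting point for the Inline projection,
    // so the change below could leak into the persisted Order document
    stream.Aggregate.Items.Remove("some item");

    stream.AppendOne(new ItemReady("some other item"));
    await session.SaveChangesAsync();
}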