Aggregate Projections
TIP
Definitely check out the content on CQRS Command Handler Workflow for Capturing Events and Reading Aggregates to get the best possible performance and development usability for aggregate projections with Marten. Also see the combination with Wolverine in its Aggregate Handler Workflow for literally the lowest code ceremony possible to use Marten within a CQRS architecture.
Aggregate Projections in Marten combine some sort of grouping of events and process them to create a single aggregated document representing the state of those events. To jump into a simple example, here's a simple aggregated view called QuestParty
that creates an aggregated view of MembersJoined
, MembersDeparted
, and QuestStarted
events related to a group of heroes traveling on a quest in your favorite fantasy novel:
public sealed record QuestParty(Guid Id, List<string> Members)
{
// These methods take in events and update the QuestParty
public static QuestParty Create(QuestStarted started) => new(started.QuestId, []);
public static QuestParty Apply(MembersJoined joined, QuestParty party) =>
party with
{
Members = party.Members.Union(joined.Members).ToList()
};
public static QuestParty Apply(MembersDeparted departed, QuestParty party) =>
party with
{
Members = party.Members.Where(x => !departed.Members.Contains(x)).ToList()
};
public static QuestParty Apply(MembersEscaped escaped, QuestParty party) =>
party with
{
Members = party.Members.Where(x => !escaped.Members.Contains(x)).ToList()
};
}
Once again, here's the class diagram of the key projection types inside of Marten, but please note the SingleStreamProjection<T>
:
Marten supports a few different types of aggregated projections:
- Single Stream Projections -- creating a rolled up view of all or a segment of the events within an event stream. This is done through either a live stream aggregation, or
SingleStreamProjection<T>
as a base class for your projection or by doing snapshots. - Multi Stream Projections -- creating a rolled up view of a user-defined grouping of events across streams. These projections are done by sub-classing the
MultiStreamProjection<TDoc, TId>
class and is further described in Multi-Stream Projections.
Please note that all aggregated projections share the same set of method conventions described in this page.
Aggregate by Stream
TIP
Projection types and the associated aggregate types need to be scoped as public because of Marten's internal code generation techniques. Some methods discovered by the method conventions can be internal or private, but the holding type must be public.
The easiest type of aggregate to create is a document that rolls up the state of a single event stream. You can do that by either creating a public aggregate document that directly mutates itself through method conventions or by sub-classing the SingleStreamProjection<T>
class like this sample for a fictional Trip
aggregate document:
public class TripProjection: SingleStreamProjection<Trip>
{
public TripProjection()
{
DeleteEvent<TripAborted>();
DeleteEvent<Breakdown>(x => x.IsCritical);
DeleteEvent<VacationOver>((trip, _) => trip.Traveled > 1000);
}
// These methods can be either public, internal, or private but there's
// a small performance gain to making them public
public void Apply(Arrival e, Trip trip) => trip.State = e.State;
public void Apply(Travel e, Trip trip) => trip.Traveled += e.TotalDistance();
public void Apply(TripEnded e, Trip trip)
{
trip.Active = false;
trip.EndedOn = e.Day;
}
public Trip Create(TripStarted started)
{
return new Trip { StartedOn = started.Day, Active = true };
}
}
And register that projection like this:
TIP
Remember to start the Async Daemon when using async projections, see Asynchronous Projections Daemon
var store = DocumentStore.For(opts =>
{
opts.Connection("some connection string");
// Register as inline
opts.Projections.Add<TripProjection>(ProjectionLifecycle.Inline);
// Or instead, register to run asynchronously
opts.Projections.Add<TripProjection>(ProjectionLifecycle.Async);
});
Any projection based on SingleStreamProjection<T>
will allow you to define steps by event type to either create, delete, or mutate an aggregate document through a mix of inline Lambda expressions in the constructor function of the projection class or by using specially named methods on the projection class. It's completely up to your preference to decide which to use.
Alternatively, if your aggregate will never be deleted you can use a stream aggregation as explained in the last section of this page.
To create aggregate projections that include events in multiple streams, see Multi-Stream Projections.
Strong Typed Identifiers 7.29
Marten supports using strong-typed identifiers as the document identity for aggregated documents. Here's an example:
[StronglyTypedId(Template.Guid)]
public readonly partial struct PaymentId;
public class Payment
{
[JsonInclude] public PaymentId? Id { get; private set; }
[JsonInclude] public DateTimeOffset CreatedAt { get; private set; }
[JsonInclude] public PaymentState State { get; private set; }
public static Payment Create(IEvent<PaymentCreated> @event)
{
return new Payment
{
Id = new PaymentId(@event.StreamId), CreatedAt = @event.Data.CreatedAt, State = PaymentState.Created
};
}
public void Apply(PaymentCanceled @event)
{
State = PaymentState.Canceled;
}
public void Apply(PaymentVerified @event)
{
State = PaymentState.Verified;
}
}
Just note that for single stream aggregations, your strong typed identifier types will need to wrap either a Guid
or string
depending on your application's StreamIdentity
.
Aggregate Creation
TIP
As of Marten 7, if your aggregation projection has both a Create()
function or constructor for an event type, and an Apply()
method for the same event type, Marten will only call one or the other method depending on whether the aggregate already exists but never both for one single event.
Aggregates can initially be created behind the scenes by Marten if there's a no-arg constructor function on the aggregate document type -- which doesn't have to be public by the way.
You can also use a constructor that takes an event type as shown in this sample of a Trip
stream aggregation:
public class Trip
{
// Probably safest to have an empty, default
// constructor unless you can guarantee that
// a certain event type will always be first in
// the event stream
public Trip()
{
}
// Create a new aggregate based on the initial
// event type
internal Trip(TripStarted started)
{
StartedOn = started.Day;
Active = true;
}
public Guid Id { get; set; }
public int EndedOn { get; set; }
public double Traveled { get; set; }
public string State { get; set; }
public bool Active { get; set; }
public int StartedOn { get; set; }
public Guid? RepairShopId { get; set; }
// The Apply() methods would mutate the aggregate state
internal void Apply(Arrival e) => State = e.State;
internal void Apply(Travel e) => Traveled += e.TotalDistance();
internal void Apply(TripEnded e)
{
Active = false;
EndedOn = e.Day;
}
// We think stream aggregation is mostly useful for live aggregations,
// but hey, if you want to use a aggregation as an asynchronous projection,
// you can also specify when the aggregate document should be deleted
internal bool ShouldDelete(TripAborted e) => true;
internal bool ShouldDelete(Breakdown e) => e.IsCritical;
internal bool ShouldDelete(VacationOver e) => Traveled > 1000;
}
Or finally, you can use a method named Create()
on a projection type as shown in this sample:
public class TripProjection: SingleStreamProjection<Trip>
{
public TripProjection()
{
DeleteEvent<TripAborted>();
DeleteEvent<Breakdown>(x => x.IsCritical);
DeleteEvent<VacationOver>((trip, _) => trip.Traveled > 1000);
}
// These methods can be either public, internal, or private but there's
// a small performance gain to making them public
public void Apply(Arrival e, Trip trip) => trip.State = e.State;
public void Apply(Travel e, Trip trip) => trip.Traveled += e.TotalDistance();
public void Apply(TripEnded e, Trip trip)
{
trip.Active = false;
trip.EndedOn = e.Day;
}
public Trip Create(TripStarted started)
{
return new Trip { StartedOn = started.Day, Active = true };
}
}
The Create()
method has to return either the aggregate document type or Task<T>
where T
is the aggregate document type. There must be an argument for the specific event type or IEvent<T>
where T
is the event type if you need access to event metadata. You can also take in an IQuerySession
if you need to look up additional data as part of the transformation or IEvent
in addition to the exact event type just to get at event metadata.
Applying Changes to the Aggregate Document
TIP
Apply()
methods or ProjectEvent<T>()
method calls can also use interfaces or abstract types that are implemented by specific event types, and Marten will apply all those event types that can be cast to the interface or abstract type to that method when executing the projection.
To make changes to an existing aggregate, you can either use inline Lambda functions per event type with one of the overloads of ProjectEvent()
:
public class TripProjection: SingleStreamProjection<Trip>
{
public TripProjection()
{
ProjectEvent<Arrival>((trip, e) => trip.State = e.State);
ProjectEvent<Travel>((trip, e) => trip.Traveled += e.TotalDistance());
ProjectEvent<TripEnded>((trip, e) =>
{
trip.Active = false;
trip.EndedOn = e.Day;
});
ProjectEventAsync<Breakdown>(async (session, trip, e) =>
{
var repairShop = await session.Query<RepairShop>()
.Where(x => x.State == trip.State)
.FirstOrDefaultAsync();
trip.RepairShopId = repairShop?.Id;
});
}
}
I'm not personally that wild about using lots of inline Lambdas like the example above, and to that end, Marten now supports the Apply()
method convention. Here's the same TripProjection
, but this time using methods to mutate the Trip
document:
public class TripProjection: SingleStreamProjection<Trip>
{
public TripProjection()
{
DeleteEvent<TripAborted>();
DeleteEvent<Breakdown>(x => x.IsCritical);
DeleteEvent<VacationOver>((trip, _) => trip.Traveled > 1000);
}
// These methods can be either public, internal, or private but there's
// a small performance gain to making them public
public void Apply(Arrival e, Trip trip) => trip.State = e.State;
public void Apply(Travel e, Trip trip) => trip.Traveled += e.TotalDistance();
public void Apply(TripEnded e, Trip trip)
{
trip.Active = false;
trip.EndedOn = e.Day;
}
public Trip Create(TripStarted started)
{
return new Trip { StartedOn = started.Day, Active = true };
}
}
The Apply()
methods can accept any combination of these arguments:
- The actual event type
IEvent<T>
where theT
is the actual event type. Use this if you want access to the event metadata like versions or timestamps.IEvent
access the event metadata. It's perfectly valid to accept bothIEvent
for the metadata and the specific event type just out of convenience.IQuerySession
if you need to do additional data lookups- The aggregate type
The valid return types are:
void
if you are mutating the aggregate document- The aggregate type itself, and this allows you to use immutable aggregate types
Task
if you are mutating the aggregate document with the use of external data read throughIQuerySession
Task<T>
whereT
is the aggregate type. This allows you to use immutable aggregate types while also using external data read throughIQuerySession
Deleting the Aggregate Document
In asynchronous or inline projections, receiving a certain event may signal that the projected document is now obsolete and should be deleted from document storage. If a certain event type always signals a deletion to the aggregated view, you can use this mechanism inside of the constructor function of your aggregate projection type:
public class TripProjection: SingleStreamProjection<Trip>
{
public TripProjection()
{
// The current Trip aggregate would be deleted if
// the projection encountered a TripAborted event
DeleteEvent<TripAborted>();
}
}
If the deletion of the aggregate document needs to be done by testing some combination of the current aggregate state, the event, and maybe even other document state in your Marten database, you can use more overloads of DeleteEvent()
as shown below:
public class TripProjection: SingleStreamProjection<Trip>
{
public TripProjection()
{
// The current Trip aggregate would be deleted if
// the Breakdown event is "critical"
DeleteEvent<Breakdown>(x => x.IsCritical);
// Alternatively, delete the aggregate if the trip
// is currently in New Mexico and the breakdown is critical
DeleteEvent<Breakdown>((trip, e) => e.IsCritical && trip.State == "New Mexico");
DeleteEventAsync<Breakdown>(async (session, trip, e) =>
{
var anyRepairShopsInState = await session.Query<RepairShop>()
.Where(x => x.State == trip.State)
.AnyAsync();
// Delete the trip if there are no repair shops in
// the current state
return !anyRepairShopsInState;
});
}
}
Another option is to use a method convention with a method named ShouldDelete()
, with this equivalent using the ShouldDelete() : bool
method convention:
public class TripProjection: SingleStreamProjection<Trip>
{
// The current Trip aggregate would be deleted if
// the Breakdown event is "critical"
public bool ShouldDelete(Breakdown breakdown) => breakdown.IsCritical;
// Alternatively, delete the aggregate if the trip
// is currently in New Mexico and the breakdown is critical
public bool ShouldDelete(Trip trip, Breakdown breakdown)
=> breakdown.IsCritical && trip.State == "New Mexico";
public async Task<bool> ShouldDelete(IQuerySession session, Trip trip, Breakdown breakdown)
{
var anyRepairShopsInState = await session.Query<RepairShop>()
.Where(x => x.State == trip.State)
.AnyAsync();
// Delete the trip if there are no repair shops in
// the current state
return !anyRepairShopsInState;
}
}
The ShouldDelete()
method can take any combination of these arguments:
- The actual event type
IEvent<T>
where theT
is the actual event type. Use this if you want access to the event metadata like versions or timestamps.IQuerySession
if you need to do additional data lookups- The aggregate type
Additionally, ShouldDelete()
methods should return either a Boolean
or Task<Boolean>
if doing data lookups with IQuerySession
-- and we'very strongly recommend using strictly asynchronous APIs if running the projection asynchronously or using SaveChangesAsync()
when executing projections inline.
Stream Aggregations
You can use the SingleStreamProjection<T>
method conventions for stream aggregations, which we just mean to be an aggregate document type that implements its own Apply()
or ShouldDelete()
methods to mutate itself. Using that concept, let's take the TripProjection
we have been using and apply that instead to a Trip
type:
public class Trip
{
// Probably safest to have an empty, default
// constructor unless you can guarantee that
// a certain event type will always be first in
// the event stream
public Trip()
{
}
// Create a new aggregate based on the initial
// event type
internal Trip(TripStarted started)
{
StartedOn = started.Day;
Active = true;
}
public Guid Id { get; set; }
public int EndedOn { get; set; }
public double Traveled { get; set; }
public string State { get; set; }
public bool Active { get; set; }
public int StartedOn { get; set; }
public Guid? RepairShopId { get; set; }
// The Apply() methods would mutate the aggregate state
internal void Apply(Arrival e) => State = e.State;
internal void Apply(Travel e) => Traveled += e.TotalDistance();
internal void Apply(TripEnded e)
{
Active = false;
EndedOn = e.Day;
}
// We think stream aggregation is mostly useful for live aggregations,
// but hey, if you want to use a aggregation as an asynchronous projection,
// you can also specify when the aggregate document should be deleted
internal bool ShouldDelete(TripAborted e) => true;
internal bool ShouldDelete(Breakdown e) => e.IsCritical;
internal bool ShouldDelete(VacationOver e) => Traveled > 1000;
}
Here's an example of using the various ways of doing Trip
stream aggregation:
internal async Task use_a_stream_aggregation()
{
var store = DocumentStore.For(opts =>
{
opts.Connection("some connection string");
// Run the Trip as an inline projection
opts.Projections.Snapshot<Trip>(SnapshotLifecycle.Inline);
// Or run it as an asynchronous projection
opts.Projections.Snapshot<Trip>(SnapshotLifecycle.Async);
});
// Or more likely, use it as a live aggregation:
// Just pretend you already have the id of an existing
// trip event stream id here...
var tripId = Guid.NewGuid();
// We'll open a read only query session...
await using var session = store.QuerySession();
// And do a live aggregation of the Trip stream
var trip = await session.Events.AggregateStreamAsync<Trip>(tripId);
}
Aggregate Versioning
It's frequently valuable to know the version of the underlying event stream that a single stream aggregate represents. Marten 5.4 added a new, built in convention to automatically set the aggregate version on the aggregate document itself. The immediate usage is probably to help Marten users opt into Marten's optimistic concurrency for appending events by making it easier to get the current aggregate (stream) version that you need in order to opt into the optimistic concurrency check.
To start with, let's say we have an OrderAggregate
defined like this:
public class OrderAggregate
{
// This is most likely the stream id
public Guid Id { get; set; }
// This would be set automatically by Marten if
// used as the target of a SingleStreamAggregation
public int Version { get; set; }
public void Apply(OrderShipped shipped) => HasShipped = true;
public bool HasShipped { get; private set; }
}
Notice the Version
property of that document above. Using a naming convention (we'll talk about how to go around the convention in just a second), Marten "knows" that that property should reflect the latest versioned event within the individual stream encountered by this projection. So if there have been 5 events captured for a particular stream and all five events have been processed through the projection, the value of the Version
property will be 5.
There are of course some restrictions:
- The version member can be either a field or a property
- The getter can be internal or private (but the mechanics are a tiny bit smoother with a public setter)
- The version member can be either an
int
(Int32) orlong
(Int64)
Marten determines whether a member is the version of the aggregate by first finding all public members of either type int
or long
, then running down these rules:
- A member marked with the
[Version]
attribute will override the naming convention - Look for an member named "version" (it's not case sensitive)
- But, ignore any member marked with
[MartenIgnore]
in case "Version" has a different meaning on your aggregate document
Using Event Metadata in Aggregates
All the previous examples showed Apply
/ Create
/ ShouldDelete
methods that accepted the specific event type as the first argument. If there is a need for accessing the event metadata (timestamps, causation/correlation information, custom event headers), you can alternatively accept an argument of type IEvent<T>
where T
is the actual event type (do this in place of the event body) or by accepting an additional argument of type IEvent
just to access the event metadata.
Below is a small example of accessing event metadata during aggregation:
public class TripProjection: SingleStreamProjection<Trip>
{
// Access event metadata through IEvent<T>
public Trip Create(IEvent<TripStarted> @event)
{
var trip = new Trip
{
Id = @event.StreamId, // Marten does this for you anyway
Started = @event.Timestamp,
CorrelationId = @event.Timestamp, // Open telemetry type tracing
Description = @event.Data.Description // Still access to the event body
};
// Use a custom header
if (@event.Headers.TryGetValue("customer", out var customerId))
{
trip.CustomerId = (string)customerId;
}
return trip;
}
public void Apply(TripEnded ended, Trip trip, IEvent @event)
{
trip.Ended = @event.Timestamp;
}
// Other Apply/ShouldDelete methods
public override ValueTask RaiseSideEffects(IDocumentOperations operations, IEventSlice<Trip> slice)
{
// Emit other events or messages during asynchronous projection
// processing
// Access to the current state as of the projection
// event page being processed *right* now
var currentTrip = slice.Aggregate;
if (currentTrip.TotalMiles > 1000)
{
// Append a new event to this stream
slice.AppendEvent(new PassedThousandMiles());
// Append a new event to a different event stream by
// first specifying a different stream id
slice.AppendEvent(currentTrip.InsuranceCompanyId, new IncrementThousandMileTrips());
// "Publish" outgoing messages when the event page is successfully committed
slice.PublishMessage(new SendCongratulationsOnLongTrip(currentTrip.Id));
// And yep, you can make additional changes to Marten
operations.Store(new CompletelyDifferentDocument
{
Name = "New Trip Segment",
OriginalTripId = currentTrip.Id
});
}
// This usage has to be async in case you're
// doing any additional data access with the
// Marten operations
return new ValueTask();
}
}
Working with Event Metadata 7.12
INFO
As of Marten 7.33, this mechanism executes for every single event in the current event slice in order.
At any point in an Apply()
or Create()
or ShouldDelete()
method, you can take in either the generic IEvent
wrapper or the specific IEvent<T>
wrapper type for the specific event. Sometimes though, you may want to automatically take your aggregated document with metadata from the very last event the projection is encountering at one time. If you are using either SingleStreamProjection<T>
or MultiStreamProjection<TDoc, TId>
as the base class for a projection, you can override the ApplyMetadata(T aggregate, IEvent lastEvent)
method in your projection to manually map event metadata to your aggregate in any way you wish.
Here's an example of using a custom header value of the events captured to update an aggregate based on the last event encountered:
public class Item
{
public Guid Id { get; set; }
public string Description { get; set; }
public bool Started { get; set; }
public DateTimeOffset WorkedOn { get; set; }
public bool Completed { get; set; }
public string LastModifiedBy { get; set; }
public DateTimeOffset? LastModified { get; set; }
}
public record ItemStarted(string Description);
public record ItemWorked;
public record ItemFinished;
public class ItemProjection: SingleStreamProjection<Item>
{
public void Apply(Item item, ItemStarted started)
{
item.Started = true;
item.Description = started.Description;
}
public void Apply(Item item, IEvent<ItemWorked> worked)
{
// Nothing, I know, this is weird
}
public void Apply(Item item, ItemFinished finished)
{
item.Completed = true;
}
public override Item ApplyMetadata(Item aggregate, IEvent lastEvent)
{
// Apply the last timestamp
aggregate.LastModified = lastEvent.Timestamp;
if (lastEvent.Headers.TryGetValue("last-modified-by", out var person))
{
aggregate.LastModifiedBy = person?.ToString() ?? "System";
}
return aggregate;
}
}
And the same projection in usage in a unit test to see how it's all put together:
public class using_apply_metadata : OneOffConfigurationsContext
{
[Fact]
public async Task apply_metadata()
{
StoreOptions(opts =>
{
opts.Projections.Add<ItemProjection>(ProjectionLifecycle.Inline);
// THIS IS NECESSARY FOR THIS SAMPLE!
opts.Events.MetadataConfig.HeadersEnabled = true;
});
// Setting a header value on the session, which will get tagged on each
// event captured by the current session
theSession.SetHeader("last-modified-by", "Glenn Frey");
var id = theSession.Events.StartStream<Item>(new ItemStarted("Blue item")).Id;
await theSession.SaveChangesAsync();
theSession.Events.Append(id, new ItemWorked(), new ItemWorked(), new ItemFinished());
await theSession.SaveChangesAsync();
var item = await theSession.LoadAsync<Item>(id);
// RIP Glenn Frey, take it easy!
item.LastModifiedBy.ShouldBe("Glenn Frey");
}
}
Raising Events, Messages, or other Operations in Aggregation Projections 7.27
Man, that's a mouthful of a title. Sometimes, it can be valuable to emit new events during the processing of a projection when you first know the new state of the projected aggregate documents. Or maybe what you might want to do is to send a message for the new state of an updated projection. Here's a couple possible scenarios that might lead you here:
- There's some kind of business logic that can be processed against an aggregate to "decide" what the system can do next
- You need to send updates about the aggregated projection state to clients via web sockets
- You need to replicate the Marten projection data in a completely different database
- There are business processes that can be kicked off for updates to the aggregated state
To do any of this, you can override the RaiseSideEffects()
method in any aggregated projection that uses one of the following base classes:
SingleStreamProjection
MultiStreamProjection
CustomStreamProjection
Here's an example of that method overridden in a projection:
public class TripProjection: SingleStreamProjection<Trip>
{
// Access event metadata through IEvent<T>
public Trip Create(IEvent<TripStarted> @event)
{
var trip = new Trip
{
Id = @event.StreamId, // Marten does this for you anyway
Started = @event.Timestamp,
CorrelationId = @event.Timestamp, // Open telemetry type tracing
Description = @event.Data.Description // Still access to the event body
};
// Use a custom header
if (@event.Headers.TryGetValue("customer", out var customerId))
{
trip.CustomerId = (string)customerId;
}
return trip;
}
public void Apply(TripEnded ended, Trip trip, IEvent @event)
{
trip.Ended = @event.Timestamp;
}
// Other Apply/ShouldDelete methods
public override ValueTask RaiseSideEffects(IDocumentOperations operations, IEventSlice<Trip> slice)
{
// Emit other events or messages during asynchronous projection
// processing
// Access to the current state as of the projection
// event page being processed *right* now
var currentTrip = slice.Aggregate;
if (currentTrip.TotalMiles > 1000)
{
// Append a new event to this stream
slice.AppendEvent(new PassedThousandMiles());
// Append a new event to a different event stream by
// first specifying a different stream id
slice.AppendEvent(currentTrip.InsuranceCompanyId, new IncrementThousandMileTrips());
// "Publish" outgoing messages when the event page is successfully committed
slice.PublishMessage(new SendCongratulationsOnLongTrip(currentTrip.Id));
// And yep, you can make additional changes to Marten
operations.Store(new CompletelyDifferentDocument
{
Name = "New Trip Segment",
OriginalTripId = currentTrip.Id
});
}
// This usage has to be async in case you're
// doing any additional data access with the
// Marten operations
return new ValueTask();
}
}
A couple important facts about this new functionality:
- The
RaiseSideEffects()
method is only called during continuous asynchronous projection execution, and will not be called during projection rebuilds orInline
projection usage - Events emitted during the side effect method are not immediately applied to the current projected document value by Marten
- You can alter the aggregate value or replace it yourself in this side effect method to reflect new events, but the onus is on you the user to apply idempotent updates to the aggregate based on these new events in the actual handlers for the new events when those events are handled by the daemon in a later batch
- There is a Wolverine integration (of course) to publish the messages through Wolverine if using the
AddMarten()IntegrateWithWolverine()
option
This relatively new behavior that was built for a specific JasperFx Software client project, but has been on the backlog for quite some time. If there are any difficulties with this approach, please feel free to join the Marten Discord room.