Aggregation with Conventional Methods
Aggregate Creation
TIP
As of Marten 7, if your aggregation projection has both a Create() function or constructor for an event type, and an Apply() method for the same event type, Marten will only call one or the other method depending on whether the aggregate already exists but never both for one single event.
Aggregates can initially be created behind the scenes by Marten if there's a no-arg constructor function on the aggregate document type -- which doesn't have to be public by the way.
You can also use a constructor that takes an event type as shown in this sample of a Trip stream aggregation:
public class Trip
{
// Probably safest to have an empty, default
// constructor unless you can guarantee that
// a certain event type will always be first in
// the event stream
public Trip()
{
}
// Create a new aggregate based on the initial
// event type
internal Trip(TripStarted started)
{
StartedOn = started.Day;
Active = true;
}
public Guid Id { get; set; }
public int EndedOn { get; set; }
public double Traveled { get; set; }
public string State { get; set; }
public bool Active { get; set; }
public int StartedOn { get; set; }
public Guid? RepairShopId { get; set; }
// The Apply() methods would mutate the aggregate state
internal void Apply(Arrival e) => State = e.State;
internal void Apply(Travel e) => Traveled += e.TotalDistance();
internal void Apply(TripEnded e)
{
Active = false;
EndedOn = e.Day;
}
// We think stream aggregation is mostly useful for live aggregations,
// but hey, if you want to use a aggregation as an asynchronous projection,
// you can also specify when the aggregate document should be deleted
internal bool ShouldDelete(TripAborted e) => true;
internal bool ShouldDelete(Breakdown e) => e.IsCritical;
internal bool ShouldDelete(VacationOver e) => Traveled > 1000;
}Or finally, you can use a method named Create() on a projection type as shown in this sample:
public class TripProjection: SingleStreamProjection<Trip, Guid>
{
public TripProjection()
{
DeleteEvent<TripAborted>();
DeleteEvent<Breakdown>(x => x.IsCritical);
DeleteEvent<VacationOver>((trip, _) => trip.Traveled > 1000);
}
// These methods can be either public, internal, or private but there's
// a small performance gain to making them public
public void Apply(Arrival e, Trip trip) => trip.State = e.State;
public void Apply(Travel e, Trip trip)
{
Debug.WriteLine($"Trip {trip.Id} Traveled " + e.TotalDistance());
trip.Traveled += e.TotalDistance();
Debug.WriteLine("New total distance is " + e.TotalDistance());
}
public void Apply(TripEnded e, Trip trip)
{
trip.Active = false;
trip.EndedOn = e.Day;
}
public Trip Create(IEvent<TripStarted> started)
{
return new Trip { Id = started.StreamId, StartedOn = started.Data.Day, Active = true };
}
}The Create() method has to return either the aggregate document type or Task<T> where T is the aggregate document type. There must be an argument for the specific event type or IEvent<T> where T is the event type if you need access to event metadata. You can also take in an IQuerySession if you need to look up additional data as part of the transformation or IEvent in addition to the exact event type just to get at event metadata.
Applying Changes to the Aggregate Document
TIP
Apply() methods or ProjectEvent<T>() method calls can also use interfaces or abstract types that are implemented by specific event types, and Marten will apply all those event types that can be cast to the interface or abstract type to that method when executing the projection.
To make changes to an existing aggregate, you can either use inline Lambda functions per event type with one of the overloads of ProjectEvent():
public class TripProjection: SingleStreamProjection<Trip, Guid>
{
public TripProjection()
{
ProjectEvent<Arrival>((trip, e) => trip.State = e.State);
ProjectEvent<Travel>((trip, e) => trip.Traveled += e.TotalDistance());
ProjectEvent<TripEnded>((trip, e) =>
{
trip.Active = false;
trip.EndedOn = e.Day;
});
ProjectEventAsync<Breakdown>(async (session, trip, e) =>
{
var repairShop = await session.Query<RepairShop>()
.Where(x => x.State == trip.State)
.FirstOrDefaultAsync();
trip.RepairShopId = repairShop?.Id;
});
}
}I'm not personally that wild about using lots of inline Lambdas like the example above, and to that end, Marten now supports the Apply() method convention. Here's the same TripProjection, but this time using methods to mutate the Trip document:
public class TripProjection: SingleStreamProjection<Trip, Guid>
{
public TripProjection()
{
DeleteEvent<TripAborted>();
DeleteEvent<Breakdown>(x => x.IsCritical);
DeleteEvent<VacationOver>((trip, _) => trip.Traveled > 1000);
}
// These methods can be either public, internal, or private but there's
// a small performance gain to making them public
public void Apply(Arrival e, Trip trip) => trip.State = e.State;
public void Apply(Travel e, Trip trip)
{
Debug.WriteLine($"Trip {trip.Id} Traveled " + e.TotalDistance());
trip.Traveled += e.TotalDistance();
Debug.WriteLine("New total distance is " + e.TotalDistance());
}
public void Apply(TripEnded e, Trip trip)
{
trip.Active = false;
trip.EndedOn = e.Day;
}
public Trip Create(IEvent<TripStarted> started)
{
return new Trip { Id = started.StreamId, StartedOn = started.Data.Day, Active = true };
}
}The Apply() methods can accept any combination of these arguments:
- The actual event type
IEvent<T>where theTis the actual event type. Use this if you want access to the event metadata like versions or timestamps.IEventaccess the event metadata. It's perfectly valid to accept bothIEventfor the metadata and the specific event type just out of convenience.IQuerySessionif you need to do additional data lookups- The aggregate type
The valid return types are:
voidif you are mutating the aggregate document- The aggregate type itself, and this allows you to use immutable aggregate types
Taskif you are mutating the aggregate document with the use of external data read throughIQuerySessionTask<T>whereTis the aggregate type. This allows you to use immutable aggregate types while also using external data read throughIQuerySession
Deleting the Aggregate Document
In asynchronous or inline projections, receiving a certain event may signal that the projected document is now obsolete and should be deleted from document storage. If a certain event type always signals a deletion to the aggregated view, you can use this mechanism inside of the constructor function of your aggregate projection type:
public class TripProjection: SingleStreamProjection<Trip, Guid>
{
public TripProjection()
{
// The current Trip aggregate would be deleted if
// the projection encountered a TripAborted event
DeleteEvent<TripAborted>();
}
}If the deletion of the aggregate document needs to be done by testing some combination of the current aggregate state, the event, and maybe even other document state in your Marten database, you can use more overloads of DeleteEvent() as shown below:
public class TripProjection: SingleStreamProjection<Trip, Guid>
{
public TripProjection()
{
// The current Trip aggregate would be deleted if
// the Breakdown event is "critical"
DeleteEvent<Breakdown>(x => x.IsCritical);
// Alternatively, delete the aggregate if the trip
// is currently in New Mexico and the breakdown is critical
DeleteEvent<Breakdown>((trip, e) => e.IsCritical && trip.State == "New Mexico");
DeleteEventAsync<Breakdown>(async (session, trip, e) =>
{
var anyRepairShopsInState = await session.Query<RepairShop>()
.Where(x => x.State == trip.State)
.AnyAsync();
// Delete the trip if there are no repair shops in
// the current state
return !anyRepairShopsInState;
});
}
}Another option is to use a method convention with a method named ShouldDelete(), with this equivalent using the ShouldDelete() : bool method convention:
public class TripProjection: SingleStreamProjection<Trip, Guid>
{
// The current Trip aggregate would be deleted if
// the Breakdown event is "critical"
public bool ShouldDelete(Breakdown breakdown) => breakdown.IsCritical;
// Alternatively, delete the aggregate if the trip
// is currently in New Mexico and the breakdown is critical
public bool ShouldDelete(Trip trip, Breakdown breakdown)
=> breakdown.IsCritical && trip.State == "New Mexico";
public async Task<bool> ShouldDelete(IQuerySession session, Trip trip, Breakdown breakdown)
{
var anyRepairShopsInState = await session.Query<RepairShop>()
.Where(x => x.State == trip.State)
.AnyAsync();
// Delete the trip if there are no repair shops in
// the current state
return !anyRepairShopsInState;
}
}The ShouldDelete() method can take any combination of these arguments:
- The actual event type
IEvent<T>where theTis the actual event type. Use this if you want access to the event metadata like versions or timestamps.IQuerySessionif you need to do additional data lookups- The aggregate type
Additionally, ShouldDelete() methods should return either a Boolean or Task<Boolean> if doing data lookups with IQuerySession -- and we'very strongly recommend using strictly asynchronous APIs if running the projection asynchronously or using SaveChangesAsync() when executing projections inline.

