Skip to content

Explicit Aggregations

The original concept for Marten projections was the conventional method model (Apply() / Create() / ShouldDelete() methods), but we quickly found out that the workflow generated from these methods just isn't sufficient for many user needs. At the same time, other users just prefer explicit code anyway, so Marten provides the CustomProjection<TDoc, TId> base class as a way to configure custom projections that use explicit code for the actual work of building projected, aggregate documents from raw events.

Alright, let's jump right into an example. Two of the drivers for this feature were for aggregations to document types that were soft-deleted or aggregations where some events should only apply to the aggregate document if the document already existed. To illustrate this with a contrived example, let's say that we've got these event types:

cs
public class Start
{
}

public class End
{
}

public class Restart
{
}

public class Increment
{
}

snippet source | anchor

And a simple aggregate document type like this:

cs
public class StartAndStopAggregate: ISoftDeleted
{
    public int Count { get; set; }

    public Guid Id { get; set; }

    // These are Marten controlled
    public bool Deleted { get; set; }
    public DateTimeOffset? DeletedAt { get; set; }

    public void Increment()
    {
        Count++;
    }
}

snippet source | anchor

As you can see, StartAndStopAggregate as a Guid as its identity and is also soft-deleted when stored by Marten by virtue of implementing the ISoftDeleted interface.

With all that being done, here's a sample aggregation that inherits from the Marten Marten.Events.Aggregation.CustomAggregation<TDoc, TId> base class:

cs
public class StartAndStopProjection: CustomProjection<StartAndStopAggregate, Guid>
{
    public StartAndStopProjection()
    {
        // I'm telling Marten that events are assigned to the aggregate
        // document by the stream id
        AggregateByStream();

        // This is an optional, but potentially important optimization
        // for the async daemon so that it sets up an allow list
        // of the event types that will be run through this projection
        IncludeType<Start>();
        IncludeType<End>();
        IncludeType<Restart>();
        IncludeType<Increment>();
    }

    public override ValueTask ApplyChangesAsync(DocumentSessionBase session,
        EventSlice<StartAndStopAggregate, Guid> slice, CancellationToken cancellation,
        ProjectionLifecycle lifecycle = ProjectionLifecycle.Inline)
    {
        var aggregate = slice.Aggregate;

        foreach (var data in slice.AllData())
        {
            switch (data)
            {
                case Start:
                    aggregate = new StartAndStopAggregate
                    {
                        // Have to assign the identity ourselves
                        Id = slice.Id
                    };
                    break;
                case Increment when aggregate is { Deleted: false }:
                    // Use explicit code to only apply this event
                    // if the aggregate already exists
                    aggregate.Increment();
                    break;
                case End when aggregate is { Deleted: false }:
                    // This will be a "soft delete" because the aggregate type
                    // implements the IDeleted interface
                    session.Delete(aggregate);
                    aggregate.Deleted = true; // Got to help Marten out a little bit here
                    break;
                case Restart when aggregate == null || aggregate.Deleted:
                    // Got to "undo" the soft delete status
                    session
                        .UndoDeleteWhere<StartAndStopAggregate>(x => x.Id == slice.Id);
                    break;
            }
        }

        // Apply any updates!
        if (aggregate != null)
        {
            session.Store(aggregate);
        }

        // We didn't do anything that required an asynchronous call
        return new ValueTask();
    }
}

snippet source | anchor

Custom Grouping

All aggregations in Marten come in two parts:

  1. Grouping incoming events into "slices" of events that should be applied to an aggregate by id
  2. Applying incoming events from each slice into the identified aggregate

CustomAggregate supports aggregating by the stream identity as shown above. You can also use all the same customizable grouping functionality as the older MultiStreamProjection subclass.

Simple Workflows 7.28

The base class can be used for strictly live aggregations. If all you're doing is using this mechanism for Live aggregation, or have a simple workflow where the aggregate is always going to be built strictly from the event data, you can override only the Apply() method as shown below:

cs
public class ExplicitCounter: CustomProjection<SimpleAggregate, Guid>
{
    public override SimpleAggregate Apply(SimpleAggregate snapshot, IReadOnlyList<IEvent> events)
    {
        snapshot ??= new SimpleAggregate();
        foreach (var e in events.Select(x => x.Data))
        {
            if (e is AEvent) snapshot.ACount++;
            if (e is BEvent) snapshot.BCount++;
            if (e is CEvent) snapshot.CCount++;
            if (e is DEvent) snapshot.DCount++;
        }

        // You have to explicitly return the new value
        // of the aggregated document no matter what!
        return snapshot;
    }
}

snippet source | anchor

Note that this usage is valid for all possible projection lifecycles now (Live, Inline, and Async).

Released under the MIT License.