Custom Projections


Multistream Projections using ViewProjection

The ViewProjection class is an implementation of IProjection that can build a projection from multiple streams.

It can be set up through configuration:


StoreOptions(_ =>
{
    _.AutoCreateSchemaObjects = AutoCreate.All;
    _.Events.InlineProjections.AggregateStreamsWith<QuestParty>();
    _.Events.ProjectView<PersistedView, Guid>()
        .ProjectEvent<ProjectionEvent<QuestStarted>>((view, @event) => { view.Events.Add(@event.Data); view.StreamIdsForEvents.Add(@event.StreamId); })
        .ProjectEvent<MembersJoined>(e => e.QuestId, (view, @event) => { view.Events.Add(@event); })
        .ProjectEvent<ProjectionEvent<MonsterSlayed>>(e => e.Data.QuestId, (view, @event) => { view.Events.Add(@event.Data); view.StreamIdsForEvents.Add(@event.StreamId); })
        .DeleteEvent<QuestEnded>()
        .DeleteEvent<MembersDeparted>(e => e.QuestId)
        .DeleteEvent<MonsterDestroyed>((session, e) => session.Load<QuestParty>(e.QuestId).Id);
});
or through a class like:

public class PersistViewProjection: ViewProjection<PersistedView, Guid>
{
    public PersistViewProjection()
    {
        ProjectEvent<QuestStarted>(Persist);
        ProjectEvent<MembersJoined>(e => e.QuestId, Persist);
        ProjectEvent<MonsterSlayed>((session, e) => session.Load<QuestParty>(e.QuestId).Id, Persist);
        DeleteEvent<QuestEnded>();
        DeleteEvent<MembersDeparted>(e => e.QuestId);
        DeleteEvent<MonsterDestroyed>((session, e) => session.Load<QuestParty>(e.QuestId).Id);
    }

    private void Persist<T>(PersistedView view, T @event)
    {
        view.Events.Add(@event);
    }
}


ProjectEvent by default takes two parameters:

  • a property of the event that is used as the projection document selector,
  • an apply method that defines the projection itself.

DeleteEvent takes only the first parameter, since it only needs to select which document to delete.

Both methods may also select multiple Ids:

  • ProjectEvent: if a List<TId> is passed, the handler method is called for each Id in the collection.
  • DeleteEvent: if a List<TId> is passed, each document tied to an Id in the collection is removed.

Each of these methods has several overloads that allow selecting the Id field implicitly, through a property, or through one of two Funcs: Func<IDocumentSession, TEvent, TId> and Func<TEvent, TId>.
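
The multiple-Id behaviour described above can be illustrated with a small in-memory sketch. It is not Marten's implementation: the Dictionary stands in for the document store, and TagRenamed, ArticleView and MultiIdSketch are hypothetical names introduced only for this example.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event that affects several views at once.
public class TagRenamed
{
    public List<Guid> ArticleIds { get; set; }
    public string NewName { get; set; }
}

// Hypothetical view document.
public class ArticleView
{
    public Guid Id { get; set; }
    public string Tag { get; set; }
}

public static class MultiIdSketch
{
    // Calls the handler once per selected Id, creating missing views on demand.
    public static void Project<TEvent>(
        Dictionary<Guid, ArticleView> store,
        TEvent @event,
        Func<TEvent, List<Guid>> idsSelector,
        Action<ArticleView, TEvent> handler)
    {
        foreach (var id in idsSelector(@event))
        {
            if (!store.TryGetValue(id, out var view))
            {
                view = new ArticleView { Id = id };
                store[id] = view;
            }

            handler(view, @event);
        }
    }

    // Removes every view whose Id was selected from the event.
    public static void Delete<TEvent>(
        Dictionary<Guid, ArticleView> store,
        TEvent @event,
        Func<TEvent, List<Guid>> idsSelector)
    {
        foreach (var id in idsSelector(@event))
        {
            store.Remove(id);
        }
    }
}
```

Calling MultiIdSketch.Project with a selector such as e => e.ArticleIds runs the handler once for each Id in the list, which mirrors what the List<TId> selectors are described as doing for ProjectEvent and DeleteEvent.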

Warning:
The projection view class needs an Id property with a public getter, or a property marked with the Identity attribute.

This follows from how Marten applies a projection:
  1. It tries to find a document whose Id equals the value selected from the event (e.g. UserId for a UserCreated event).
  2. If no such document exists, a new one has to be created. By default, Marten uses the default constructor.
    The default constructor does not have to be public; it can also be private or protected.
    If the class has no default constructor, Marten creates an uninitialized object instead.
    In that case no member initializers run, so all members have to be initialized in the event handler methods.
  3. If a document with that Id is found, it is loaded from the database.
  4. The document is updated with the logic defined in the ViewProjection (the expression passed as the second ProjectEvent parameter).
  5. The created or updated document is upserted to the database.
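
The steps above can be sketched in plain C# without a database. This is a rough illustration under stated assumptions, not Marten's actual code: the Dictionary replaces the database, and ProjectionFlowSketch, WithPrivateCtorView and NoDefaultCtorView are hypothetical names. The fallback uses RuntimeHelpers.GetUninitializedObject from the .NET base class library, which may differ from the mechanism Marten uses internally.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Runtime.CompilerServices;

public static class ProjectionFlowSketch
{
    // Step 2: prefer a parameterless constructor (public or not);
    // otherwise fall back to an uninitialized instance.
    public static T CreateView<T>() where T : class
    {
        var ctor = typeof(T).GetConstructor(
            BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic,
            binder: null, types: Type.EmptyTypes, modifiers: null);

        return ctor != null
            ? (T)ctor.Invoke(null)
            : (T)RuntimeHelpers.GetUninitializedObject(typeof(T));
    }

    public static void Apply<TView, TEvent>(
        Dictionary<Guid, TView> store, Guid id, TEvent @event,
        Action<TView, TEvent> handler) where TView : class
    {
        // Steps 1-3: look the document up by Id, creating it when it is missing.
        if (!store.TryGetValue(id, out var view))
        {
            view = CreateView<TView>();
        }

        handler(view, @event); // step 4: apply the projection logic
        store[id] = view;      // step 5: upsert
    }
}

// Hypothetical view with a private parameterless constructor: initializers run.
public class WithPrivateCtorView
{
    public List<string> Events { get; set; } = new List<string>();

    private WithPrivateCtorView() { }
}

// Hypothetical view without a parameterless constructor: initializers are skipped.
public class NoDefaultCtorView
{
    public List<string> Events { get; set; } = new List<string>();

    public NoDefaultCtorView(string required) { }
}
```

With these types, CreateView<WithPrivateCtorView>() returns an object whose Events list is already initialized, while CreateView<NoDefaultCtorView>() returns an object whose Events property is null, so a handler would have to assign the list before adding to it.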

Using event metadata

If additional Marten event details are needed, then events can use the ProjectionEvent<> generic when setting them up with ProjectEvent. ProjectionEvent exposes the Marten Id, Version, Timestamp and Data.



public class Lap
{
    public Guid Id { get; set; }

    public DateTimeOffset? Start { get; set; }

    public DateTimeOffset? End { get; set; }
}

public class LapStarted
{
    public Guid LapId { get; set; }
}

public class LapFinished
{
    public Guid LapId { get; set; }
}

public class LapViewProjection: ViewProjection<Lap, Guid>
{
    public LapViewProjection()
    {
        ProjectEvent<ProjectionEvent<LapStarted>>(e => e.Data.LapId, Persist);
        ProjectEvent<ProjectionEvent<LapFinished>>(e => e.Data.LapId, Persist);
    }

    private void Persist(Lap view, ProjectionEvent<LapStarted> eventData)
    {
        view.Start = eventData.Timestamp;
    }

    private void Persist(Lap view, ProjectionEvent<LapFinished> eventData)
    {
        view.End = eventData.Timestamp;
    }
}


Injecting helper classes

ViewProjection instances are created (by default) during DocumentStore initialization. Marten also makes it possible to register them with a factory method. With such a registration, projections are created at runtime while events are being applied. This makes it possible to set up custom creation logic or even connect a dependency injection mechanism.


StoreOptions(_ =>
{
    _.AutoCreateSchemaObjects = AutoCreate.All;
    _.Events.TenancyStyle = tenancyStyle;
    _.Events.InlineProjections.AggregateStreamsWith<QuestParty>();
    _.Events.InlineProjections.Add(() => new PersistViewProjectionWithInjection(logger));
});

By convention, the projection needs a default constructor that contains the projection definitions and a second constructor that takes the injected dependencies and calls the default one.


public class PersistViewProjectionWithInjection: PersistViewProjection
{
    private readonly Logger logger;

    public PersistViewProjectionWithInjection() : base()
    {
        ProjectEvent<QuestPaused>(@event => @event.QuestId, LogAndPersist);
    }

    public PersistViewProjectionWithInjection(Logger logger) : this()
    {
        this.logger = logger;
    }

    private void LogAndPersist<T>(PersistedView view, T @event)
    {
        logger.Log($"Handled {typeof(T).Name} event: {@event.ToString()}");
        view.Events.Add(@event);
    }
}
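
The two-constructor convention relies on ordinary C# constructor chaining: the this() call runs first and registers the handlers, and only afterwards is the dependency assigned. That still works because a handler reads the field when it is invoked, not when it is registered. The standalone sketch below shows this ordering; HandlerRegistry and LoggingRegistry are hypothetical stand-ins and not Marten types. It also guards against a missing dependency, because an instance built through the default constructor alone would otherwise throw a NullReferenceException inside the handler.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical base class that stores handlers, standing in for a projection base.
public class HandlerRegistry
{
    protected readonly List<Action<string>> Handlers = new List<Action<string>>();

    public void Dispatch(string eventName)
    {
        foreach (var handler in Handlers)
        {
            handler(eventName);
        }
    }
}

public class LoggingRegistry : HandlerRegistry
{
    private readonly Action<string> log;

    public LoggingRegistry()
    {
        // Registered before `log` is assigned; the delegate captures `this`
        // and reads the field only when the handler is invoked.
        Handlers.Add(LogEvent);
    }

    public LoggingRegistry(Action<string> log) : this()
    {
        this.log = log;
    }

    private void LogEvent(string eventName)
    {
        // The null-conditional call protects instances created
        // through the parameterless constructor.
        log?.Invoke($"Handled {eventName}");
    }
}
```

Creating the registry with new LoggingRegistry(lines.Add), where lines is a List<string>, and then dispatching an event appends one "Handled ..." line to that list, while new LoggingRegistry() dispatches without logging.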


Using async projections

ProjectEvent also has an async version. ProjectEventAsync makes it possible to call async APIs (from Marten or other frameworks) for better resource utilization.

A sample use case is loading another document or projection to build a denormalized view.



// Customer main aggregate
public class Customer
{
    public Guid Id { get; set; }

    public string FullName { get; set; }
}

// Event informing that customer full name was updated
public class CustomerFullNameUpdated
{
    public Guid CustomerId { get; set; }

    public string FullName { get; set; }
}

// Bank Account main aggregate
public class BankAccount
{
    public Guid Id { get; set; }

    // normalized reference with id to related aggregate
    public Guid CustomerId { get; set; }

    public string Number { get; set; }
}

//Bank Account created event with normalized data
public class BankAccountCreated
{
    public Guid BankAccountId { get; set; }

    public Guid CustomerId { get; set; }

    public string Number { get; set; }
}

// Denormalized read model with full data of related document
public class BankAccountView
{
    public Guid Id { get; set; }

    // Full info about customer instead of just CustomerId
    public Customer Customer { get; set; }

    public string Number { get; set; }
}

public class BankAccountViewProjection: ViewProjection<BankAccountView, Guid>
{
    public BankAccountViewProjection()
    {
        ProjectEventAsync<BankAccountCreated>(e => e.BankAccountId, PersistAsync);

        // one customer might have more than one account
        Func<IDocumentSession, CustomerFullNameUpdated, List<Guid>> selectCustomerBankAccountIds =
            (ds, @event) => ds.Query<BankAccountView>()
                              .Where(a => a.Customer.Id == @event.CustomerId)
                              .Select(a => a.Id).ToList();

        ProjectEvent<CustomerFullNameUpdated>(selectCustomerBankAccountIds, Persist);
    }

    private async Task PersistAsync
    (
        IDocumentSession documentSession,
        BankAccountView view,
        BankAccountCreated @event
    )
    {
        // load asynchronously document to use it in denormalized view
        var customer = await documentSession.LoadAsync<Customer>(@event.CustomerId);

        view.Customer = customer;
        view.Number = @event.Number;
    }

    private void Persist(BankAccountView view, CustomerFullNameUpdated @event)
    {
        view.Customer.FullName = @event.FullName;
    }
}

Warning:
Note that the term "async projections" here means projections that use the .NET async/await mechanism, which helps use threads efficiently without blocking them.
It does not refer to eventually consistent async projections. That option is provided by the Async Daemon.

Update only projection

ProjectEvent overloads take an additional boolean parameter, onlyUpdate. By default it is set to false, which means that Marten performs a create-or-update operation on the projection view.

Let's look at the following scenario of a projection that manages a newsletter Subscription.

  1. A new reader subscribes to the newsletter and a ReaderSubscribed event is published. The projection handles the event and creates a new view record in the database.
  2. The user opens the newsletter and a NewsletterOpened event is published. The projection handles the event and updates the view in the database with an incremented opens count.
  3. The user unsubscribes from the newsletter and a ReaderUnsubscribed event is published. The projection removes the view from the database (because we marked the event with DeleteEvent).
  4. The user opens the newsletter after unsubscribing and a NewsletterOpened event is published. As there is no record in the database, the default behaviour would create a new record containing only the data applied for the NewsletterOpened event. That might create views with unexpected state. In that case, onlyUpdate should be set to true. Then, if the view does not exist, the event is not projected and no new view record is created in the database.


public class NewsletterSubscription
{
    public Guid Id { get; set; }

    public Guid NewsletterId { get; set; }

    public Guid ReaderId { get; set; }

    public string FirstName { get; set; }

    public int OpensCount { get; set; }
}

public class ReaderSubscribed
{
    public Guid SubscriptionId { get; }

    public Guid NewsletterId { get; }

    public Guid ReaderId { get; }

    public string FirstName { get; }

    public ReaderSubscribed(Guid subscriptionId, Guid newsletterId, Guid readerId, string firstName)
    {
        SubscriptionId = subscriptionId;
        NewsletterId = newsletterId;
        ReaderId = readerId;
        FirstName = firstName;
    }
}

public class NewsletterOpened
{
    public Guid SubscriptionId { get; }

    public DateTime OpenedAt { get; }

    public NewsletterOpened(Guid subscriptionId, DateTime openedAt)
    {
        SubscriptionId = subscriptionId;
        OpenedAt = openedAt;
    }
}

public class ReaderUnsubscribed
{
    public Guid SubscriptionId { get; }

    public ReaderUnsubscribed(Guid subscriptionId)
    {
        SubscriptionId = subscriptionId;
    }
}

public class NewsletterSubscriptionProjection : ViewProjection<NewsletterSubscription, Guid>
{
    public NewsletterSubscriptionProjection()
    {
        ProjectEvent<ReaderSubscribed>(@event => @event.SubscriptionId, Persist);
        ProjectEvent<NewsletterOpened>(@event => @event.SubscriptionId, Persist, onlyUpdate: true);
        DeleteEvent<ReaderUnsubscribed>(@event => @event.SubscriptionId);
    }

    private void Persist(NewsletterSubscription view, ReaderSubscribed @event)
    {
        view.Id = @event.SubscriptionId;
        view.NewsletterId = @event.NewsletterId;
        view.ReaderId = @event.ReaderId;
        view.FirstName = @event.FirstName;
        view.OpensCount = 0;
    }

    private void Persist(NewsletterSubscription view, NewsletterOpened @event)
    {
        view.OpensCount++;
    }
}
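
The four-step scenario described above can be replayed against an in-memory store to see the difference that onlyUpdate makes. This is only a simulation under stated assumptions: the Dictionary replaces the database, and SubscriptionView and OnlyUpdateSketch are hypothetical names that do not come from Marten.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, trimmed-down view used only in this simulation.
public class SubscriptionView
{
    public Guid Id { get; set; }
    public string FirstName { get; set; }
    public int OpensCount { get; set; }
}

public static class OnlyUpdateSketch
{
    // Create-or-update by default; update-only when onlyUpdate is true.
    public static void Project(
        Dictionary<Guid, SubscriptionView> store, Guid id,
        Action<SubscriptionView> handler, bool onlyUpdate = false)
    {
        if (!store.TryGetValue(id, out var view))
        {
            if (onlyUpdate)
            {
                return; // nothing to update, so the event is skipped
            }

            view = new SubscriptionView { Id = id };
        }

        handler(view);
        store[id] = view;
    }

    // Replays subscribe, open, unsubscribe, open and reports
    // whether a view exists at the end.
    public static bool Replay(bool onlyUpdate)
    {
        var store = new Dictionary<Guid, SubscriptionView>();
        var id = Guid.NewGuid();

        Project(store, id, v => v.FirstName = "Ann");        // ReaderSubscribed
        Project(store, id, v => v.OpensCount++, onlyUpdate); // NewsletterOpened
        store.Remove(id);                                    // ReaderUnsubscribed
        Project(store, id, v => v.OpensCount++, onlyUpdate); // NewsletterOpened again

        return store.ContainsKey(id);
    }
}
```

In this simulation, Replay(false) leaves a half-populated view behind after the last NewsletterOpened event (no FirstName, OpensCount of 1), while Replay(true) leaves the store empty.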