Fork me on GitHub

Projecting by Event Type Edit on GitHub


While projections can target specific stream or streams, it is also possible to project by event types. The following sample demonstrates this with the CandleProjection that implements the ViewProjection interface to build Candle projections from events of type Tick.

Introduce a type to hold candle data:


public sealed class Candle
{
    public string Id { get; set; }
    public string Symbol { get; set; }
    public decimal High { get; set; }
    public decimal Low { get; set; }
    public decimal Open { get; set; }
    public decimal Close { get; set; }
    public DateTime? From { get; set; }
    public DateTime? To { get; set; }
}


This data will then be populated and updated from observing ticks:


public sealed class Tick
{
    public Guid Id { get; set; }
    public readonly DateTime Time;
    public readonly decimal Price;
    public readonly string Symbol;

    public Tick(DateTime time, string symbol, decimal price)
    {
        Time = time;
        Price = price;
        Symbol = symbol;
    }
}


We then introduce a projection that subscribes to the 'Tick' event:


public sealed class CandleProjection: ViewProjection<Candle, string>
{
    public CandleProjection(int transactions)
    {
        // We project from the event Tick
        ProjectEvent<Tick>(
            (session, tick) =>
            {
                // We check the number of events from our stream and assume it as the transaction count
                var version = session.Events.FetchStreamState(tick.Symbol)?.Version ?? 0;
                // We use the transaction count to ensure our candle holds at most data for specified number of transactions
                return tick.Symbol + (version / transactions).ToString();
            },
            OnTick);
    }

    public CandleProjection(TimeSpan window)
    {
        // We project from the event Tick
        // We use the specified time window to ensure our candle holds at most data for window length of ticks
        ProjectEvent<Tick>(tick => tick.Symbol + tick.Time.Ticks / window.Ticks, OnTick);
    }

    // Update our candle with the tick data
    private void OnTick(Candle candle, Tick tick)
    {
        candle.Symbol = tick.Symbol;

        if (!candle.From.HasValue)
        {
            candle.From = tick.Time;
            candle.Open = candle.Low = candle.High = tick.Price;
        }

        if (tick.Price < candle.Low)
        {
            candle.Low = tick.Price;
        }

        if (tick.Price > candle.High)
        {
            candle.High = tick.Price;
        }

        candle.To = tick.Time;
        candle.Close = tick.Price;
    }
}


Lastly, we configure the Event Store to use the newly introduced projection:


StoreOptions(storeOptions =>
{
    // We index our streams by strings
    storeOptions.Events.StreamIdentity = StreamIdentity.AsString;

    // Build candles in 1 second windows
    storeOptions.Events.InlineProjections.Add(new CandleProjection(TimeSpan.FromSeconds(1)));
    // Build candles for 5 transactions
    storeOptions.Events.InlineProjections.Add(new CandleProjection(5));
});

var random = new Random();
var symbols = new[] { "AAPL", "GOOG", "MSFT" };

var ticks = Enumerable.Range(1, 15)
    .Select(i => new Tick(DateTime.UtcNow.AddMinutes(i), symbols[random.Next(symbols.Length)], random.Next(100 * i)))
    .ToList();

using (var s = theStore.OpenSession())
{
    foreach (var tick in ticks)
    {
        // We index the streams by the name of the symbol
        s.Events.Append(tick.Symbol, tick);
    }
    s.SaveChanges();
}

var maxPrice = ticks.Max(x => x.Price);
var maxTick = ticks.First(x => x.Price == maxPrice);

using (var s = theStore.QuerySession())
{
    // We should have a candle with the maximum from our maxTick
    var candle = s.Query<Candle>().Where(x => x.Symbol == maxTick.Symbol).OrderByDescending(x => x.High).First();
    var count = s.Query<Candle>().Count();

    Assert.Equal(maxTick.Price, candle.High);
    // 15*1 second windows + 3*5 transactions
    Assert.Equal(18, count);
}