Fork me on GitHub

Querying Event and Stream Data Edit on GitHub


Fetch Events for a Stream

You can retrieve the events for a single stream at any time with the IEventStore.FetchStream() methods shown below:


public void load_event_stream(IDocumentSession session, Guid streamId)
{
    // Fetch *all* of the events for this stream
    var events1 = session.Events.FetchStream(streamId);

    // Fetch the events for this stream up to and including version 5
    var events2 = session.Events.FetchStream(streamId, 5);

    // Fetch the events for this stream at this time yesterday
    var events3 = session.Events
                .FetchStream(streamId, timestamp: DateTime.UtcNow.AddDays(-1));
}

public async Task load_event_stream_async(IDocumentSession session, Guid streamId)
{
    // Fetch *all* of the events for this stream
    var events1 = await session.Events.FetchStreamAsync(streamId);

    // Fetch the events for this stream up to and including version 5
    var events2 = await session.Events.FetchStreamAsync(streamId, 5);

    // Fetch the events for this stream at this time yesterday
    var events3 = await session.Events
                .FetchStreamAsync(streamId, timestamp: DateTime.UtcNow.AddDays(-1));
}


The data returned is a list of IEvent objects, where each is a strongly-typed Event<T> object shown below:


public interface IEvent
{
    Guid Id { get; set; }
    int Version { get; set; }

    long Sequence { get; set; }

    object Data { get; }

    Guid StreamId { get; set; }

    string StreamKey { get; set; }

    DateTimeOffset Timestamp { get; set; }

    string TenantId { get; set; }

    void Apply<TAggregate>(TAggregate state, IAggregator<TAggregate> aggregator)
        where TAggregate : class;
}


Stream State

If you just need to check on the state of an event stream - what version (effectively the number of events in the stream) it is and what, if any, aggregate type it represents - you can use the IEventStore.FetchStreamState()/FetchStreamStateAsync() methods or IBatchQuery.Events.FetchStreamState(), as shown below:


public class fetching_stream_state: IntegrationContextWithIdentityMap<NulloIdentityMap>
{
    private Guid theStreamId;

    public fetching_stream_state(DefaultStoreFixture fixture) : base(fixture)
    {
        var joined = new MembersJoined { Members = new string[] { "Rand", "Matt", "Perrin", "Thom" } };
        var departed = new MembersDeparted { Members = new[] { "Thom" } };

        theStreamId = theSession.Events.StartStream<Quest>(joined, departed).Id;
        theSession.SaveChanges();
    }

    [Fact]
    public void can_fetch_the_stream_version_and_aggregate_type()
    {
        var state = theSession.Events.FetchStreamState(theStreamId);

        state.Id.ShouldBe(theStreamId);
        state.Version.ShouldBe(2);
        state.AggregateType.ShouldBe(typeof(Quest));
        state.LastTimestamp.ShouldNotBe(DateTime.MinValue);
        state.Created.ShouldNotBe(DateTime.MinValue);
    }

    [Fact]
    public async Task can_fetch_the_stream_version_and_aggregate_type_async()
    {
        var state = await theSession.Events.FetchStreamStateAsync(theStreamId).ConfigureAwait(false);

        state.Id.ShouldBe(theStreamId);
        state.Version.ShouldBe(2);
        state.AggregateType.ShouldBe(typeof(Quest));
        state.LastTimestamp.ShouldNotBe(DateTime.MinValue);
        state.Created.ShouldNotBe(DateTime.MinValue);
    }

    [Fact]
    public async Task can_fetch_the_stream_version_through_batch_query()
    {
        var batch = theSession.CreateBatchQuery();

        var stateTask = batch.Events.FetchStreamState(theStreamId);

        await batch.Execute().ConfigureAwait(false);

        var state = await stateTask.ConfigureAwait(false);

        state.Id.ShouldBe(theStreamId);
        state.Version.ShouldBe(2);
        state.AggregateType.ShouldBe(typeof(Quest));
        state.LastTimestamp.ShouldNotBe(DateTime.MinValue);
    }

    [Fact]
    public async Task can_fetch_the_stream_events_through_batch_query()
    {
        var batch = theSession.CreateBatchQuery();

        var eventsTask = batch.Events.FetchStream(theStreamId);

        await batch.Execute().ConfigureAwait(false);

        var events = await eventsTask.ConfigureAwait(false);

        events.Count.ShouldBe(2);
    }
}


Furthermore, StreamState contains metadata for when the stream was created, StreamState.Created, and when the stream was last updated, StreamState.LastTimestamp.

Fetch a Single Event

You can fetch the information for a single event by id, including its version number within the stream, by using IEventStore.Load() as shown below:


public void load_a_single_event_synchronously(IDocumentSession session, Guid eventId)
{
    // If you know what the event type is already
    var event1 = session.Events.Load<MembersJoined>(eventId);

    // If you do not know what the event type is
    var event2 = session.Events.Load(eventId);
}

public async Task load_a_single_event_asynchronously(IDocumentSession session, Guid eventId)
{
    // If you know what the event type is already
    var event1 = await session.Events.LoadAsync<MembersJoined>(eventId)
        .ConfigureAwait(false);

    // If you do not know what the event type is
    var event2 = await session.Events.LoadAsync(eventId)
        .ConfigureAwait(false);
}


Querying Directly Against Event Data

We urge caution about this functionality because it requires a search against the entire mt_events table. To issue Linq queries against any specific event type, use the method shown below:


[Fact]
public void can_query_against_event_type()
{
    theSession.Events.StartStream<Quest>(joined1, departed1);
    theSession.Events.StartStream<Quest>(joined2, departed2);

    theSession.SaveChanges();

    theSession.Events.QueryRawEventDataOnly<MembersJoined>().Count().ShouldBe(2);
    theSession.Events.QueryRawEventDataOnly<MembersJoined>().ToArray().SelectMany(x => x.Members).Distinct()
        .OrderBy(x => x)
        .ShouldHaveTheSameElementsAs("Egwene", "Matt", "Nynaeve", "Perrin", "Rand", "Thom");

    theSession.Events.QueryRawEventDataOnly<MembersDeparted>().Where(x => x.Members.Contains("Matt"))
        .Single().Id.ShouldBe(departed2.Id);
}


You can use any Linq operator that Marten supports to query against event data. We think that this functionality is probably more useful for diagnostics or troubleshooting rather than something you would routinely use to support your application. We recommend that you favor event projection views over querying within the raw event table.

With Marten 1.0, you can issue queries with Marten's full Linq support against the raw event data with this method:


public void example_of_querying_for_event_data(IDocumentSession session, Guid stream)
{
    var events = session.Events.QueryAllRawEvents()
        .Where(x => x.StreamId == stream)
        .OrderBy(x => x.Sequence)
        .ToList();
}


This mechanism will allow you to query by any property of the IEvent interface shown above.