# Bulk Appending Events
> **TIP (8.x):** This feature is intended for data seeding, migration from other event stores, load testing, and importing events from external systems. For normal application event appending, use the standard Appending Events API instead.
Marten provides a high-throughput bulk event append API that uses PostgreSQL's `COPY ... FROM STDIN BINARY` protocol to efficiently load large numbers of events into the event store. This bypasses the normal append pipeline for maximum speed, making it suitable for scenarios where you need to insert millions or even billions of events.
## How It Works
The bulk append API:

- Pre-allocates event sequence numbers from the `mt_events_sequence` sequence
- Uses `NpgsqlBinaryImporter` to COPY stream records into `mt_streams`
- Uses `NpgsqlBinaryImporter` to COPY event records into `mt_events`
- Updates the high water mark in `mt_event_progression` so the async daemon knows where to start
This approach is significantly faster than the normal append path because it avoids per-row function calls, version checking, and individual INSERT statements.
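To make the underlying mechanism concrete, here is an illustrative sketch of a binary COPY using the `Npgsql` package directly. This is *not* Marten's actual internal code, and the table and column names are simplified assumptions; it only shows why a single COPY stream beats per-row INSERTs:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Npgsql;
using NpgsqlTypes;

public static class BinaryCopySketch
{
    // Streams all event rows through one COPY operation in PostgreSQL's
    // binary format, avoiding a round trip and an INSERT per event.
    public static async Task CopyEventsAsync(NpgsqlConnection conn,
        IReadOnlyList<(long Seq, Guid StreamId, string EventType, string JsonData)> rows)
    {
        await using var importer = await conn.BeginBinaryImportAsync(
            "COPY mt_events (seq_id, stream_id, type, data) FROM STDIN (FORMAT BINARY)");

        foreach (var row in rows)
        {
            await importer.StartRowAsync();
            await importer.WriteAsync(row.Seq, NpgsqlDbType.Bigint);
            await importer.WriteAsync(row.StreamId, NpgsqlDbType.Uuid);
            await importer.WriteAsync(row.EventType, NpgsqlDbType.Varchar);
            await importer.WriteAsync(row.JsonData, NpgsqlDbType.Jsonb);
        }

        // CompleteAsync commits the COPY; disposing without completing cancels it
        await importer.CompleteAsync();
    }
}
```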
## Basic Usage
Build a list of `StreamAction` objects representing new event streams, then call `BulkInsertEventsAsync` on the document store:
```csharp
public static async Task BulkAppendBasicExample(DocumentStore store)
{
    // Build up a list of stream actions with events
    var streams = new List<StreamAction>();

    for (int i = 0; i < 1000; i++)
    {
        var streamId = Guid.NewGuid();
        var events = new object[]
        {
            new BulkOrderPlaced(streamId, "Widget", 5),
            new BulkOrderShipped(streamId, $"TRACK-{i}"),
            new BulkOrderDelivered(streamId, DateTimeOffset.UtcNow)
        };

        streams.Add(StreamAction.Start(store.Events, streamId, events));
    }

    // Bulk insert all events using PostgreSQL COPY for maximum throughput
    await store.BulkInsertEventsAsync(streams);
}
```

## Multi-Tenancy
When using conjoined multi-tenancy, use the tenant-specific overload:
```csharp
public static async Task BulkAppendWithTenantExample(DocumentStore store)
{
    var streams = new List<StreamAction>();

    for (int i = 0; i < 500; i++)
    {
        var streamId = Guid.NewGuid();
        var events = new object[]
        {
            new BulkOrderPlaced(streamId, "Gadget", 2),
            new BulkOrderShipped(streamId, $"TRACK-{i}")
        };

        streams.Add(StreamAction.Start(store.Events, streamId, events));
    }

    // Bulk insert events for a specific tenant when using conjoined tenancy
    await store.BulkInsertEventsAsync("tenant-abc", streams);
}
```

## Event Metadata
You can set metadata on individual events before bulk inserting. This works with any combination of enabled metadata columns (correlation ID, causation ID, headers, user name):
```csharp
public static async Task BulkAppendWithMetadataExample(DocumentStore store)
{
    var streamId = Guid.NewGuid();
    var events = new object[]
    {
        new BulkOrderPlaced(streamId, "Widget", 10),
        new BulkOrderShipped(streamId, "TRACK-123")
    };

    var action = StreamAction.Start(store.Events, streamId, events);

    // Set metadata on individual events before bulk inserting
    foreach (var e in action.Events)
    {
        e.CorrelationId = "import-batch-42";
        e.CausationId = "migration-job";
        e.SetHeader("source", "legacy-system");
    }

    await store.BulkInsertEventsAsync(new[] { action });
}
```

## Controlling Batch Size
For very large imports, you can control the COPY batch size. Each batch is a separate PostgreSQL COPY operation, which helps manage memory usage:
```csharp
public static async Task BulkAppendWithBatchSizeExample(DocumentStore store)
{
    var streams = new List<StreamAction>();

    // Generate a large number of streams
    for (int i = 0; i < 100_000; i++)
    {
        var streamId = Guid.NewGuid();
        streams.Add(StreamAction.Start(store.Events, streamId,
            new object[] { new BulkOrderPlaced(streamId, "Item", 1) }));
    }

    // Control the COPY batch size for memory management.
    // Each batch is a separate PostgreSQL COPY operation.
    await store.BulkInsertEventsAsync(streams, batchSize: 5000);
}
```

## String Stream Identity
The bulk append API works with both `Guid` and `string` stream identities:

```csharp
public static async Task BulkAppendWithStringIdentityExample(DocumentStore store)
{
    // When using StreamIdentity.AsString, use string-keyed stream actions
    var streams = new List<StreamAction>();

    for (int i = 0; i < 100; i++)
    {
        var key = $"order-{Guid.NewGuid():N}";
        var events = new object[]
        {
            new BulkOrderPlaced(Guid.NewGuid(), "Widget", 1),
            new BulkOrderShipped(Guid.NewGuid(), $"TRACK-{i}")
        };

        streams.Add(StreamAction.Start(store.Events, key, events));
    }

    await store.BulkInsertEventsAsync(streams);
}
```

## Supported Configurations
The bulk append API supports all combinations of:
| Configuration | Options |
|---|---|
| Stream identity | `AsGuid`, `AsString` |
| Tenancy | Single, Conjoined |
| Archived stream partitioning | On, Off |
| Metadata columns | Correlation ID, Causation ID, Headers, User Name (any combination) |
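These options are set on `StoreOptions` when the store is configured. As a reference point, a sketch combining several of them might look like the following (connection string and option values are placeholders; property names follow the standard Marten event store configuration, so verify them against your Marten version):

```csharp
var store = DocumentStore.For(opts =>
{
    // Placeholder connection string
    opts.Connection("Host=localhost;Database=events;Username=marten;Password=secret");

    // String stream identity instead of the default Guid
    opts.Events.StreamIdentity = StreamIdentity.AsString;

    // Conjoined tenancy: all tenants share the event tables, keyed by tenant id
    opts.Events.TenancyStyle = TenancyStyle.Conjoined;

    // Opt in to the metadata columns you plan to populate during bulk append
    opts.Events.MetadataConfig.CorrelationIdEnabled = true;
    opts.Events.MetadataConfig.CausationIdEnabled = true;
    opts.Events.MetadataConfig.HeadersEnabled = true;
});
```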
## Limitations
The bulk append API intentionally trades off features for throughput:
- **No inline projections** -- events are written directly without triggering inline projections. Use async projections and rebuild them after bulk loading.
- **No optimistic concurrency** -- there is no version checking against existing streams. This API is designed for initial data loading, not concurrent writes.
- **New streams only** -- bulk append creates new streams. It does not support appending to existing streams.
- **No event tags** -- DCB tag operations are not included in the COPY pipeline. Tags would need to be handled separately after bulk loading.
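Because inline projections are skipped, a typical follow-up after a bulk load is to rebuild the affected projections with the async daemon. A minimal sketch, assuming a hypothetical `OrderSummaryProjection` is registered as an async projection on this store (daemon method names may vary slightly across Marten versions):

```csharp
// OrderSummaryProjection is a hypothetical async projection registered on this store
using var daemon = await store.BuildProjectionDaemonAsync();

// Replay all events (including the bulk-loaded ones) through the projection
await daemon.RebuildProjectionAsync<OrderSummaryProjection>(CancellationToken.None);
```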
## Performance
In local benchmarks, the bulk append API achieves approximately 80,000-110,000 events/second depending on event complexity and PostgreSQL configuration. This compares to approximately 60,000-80,000 events/second using Marten's QuickAppend mode with parallel sessions.
The bulk append approach is especially advantageous when loading tens of millions of events or more, where the reduced per-event overhead of PostgreSQL COPY becomes significant.

