Fork me on GitHub

Schema Feature Extensions Edit on GitHub


New in Marten 2.4.0 is the ability to add additional features with custom database schema objects that simply plug into Marten's schema management facilities. The key abstraction is the IFeatureSchema interface shown below:


/// <summary>
/// Key abstraction for a feature that contributes custom database schema
/// objects to Marten's schema management.
/// </summary>
public interface IFeatureSchema
{
    /// <summary>
    /// Name identifying this feature.
    /// </summary>
    string Identifier { get; }

    /// <summary>
    /// A .NET type associated with this feature's storage.
    /// NOTE(review): how Marten uses this type is not visible here - confirm.
    /// </summary>
    Type StorageType { get; }

    /// <summary>
    /// The schema objects that make up this feature.
    /// </summary>
    ISchemaObject[] Objects { get; }

    /// <summary>
    /// Types this feature depends on.
    /// NOTE(review): presumably other features/documents that must be set up
    /// before this one - confirm against the schema management code.
    /// </summary>
    IEnumerable<Type> DependentTypes();

    /// <summary>
    /// Reports whether this feature is active for the supplied store options.
    /// </summary>
    /// <param name="options">The store configuration to evaluate against.</param>
    bool IsActive(StoreOptions options);

    /// <summary>
    /// Writes permission-related output for this feature to <paramref name="writer"/>.
    /// </summary>
    /// <param name="rules">The DDL rules in effect.</param>
    /// <param name="writer">Destination for the generated text.</param>
    void WritePermissions(DdlRules rules, StringWriter writer);
}


You rarely need to implement IFeatureSchema from scratch, though: Marten comes with a base class, FeatureSchemaBase, that makes it simpler to build out new features. Here's a very simple example that defines a custom table with one column:


/// <summary>
/// Minimal example feature that contributes a single table,
/// "mt_fake_table", with one "name" column.
/// </summary>
public class FakeStorage : FeatureSchemaBase
{
    /// <summary>
    /// Creates the feature, passing "fake" and the store options to the base class.
    /// </summary>
    /// <param name="options">The current store configuration.</param>
    public FakeStorage(StoreOptions options) : base("fake", options)
    {
    }

    /// <summary>
    /// Lazily yields the schema objects of this feature: one table placed in
    /// the schema named by <c>Options.DatabaseSchemaName</c>.
    /// </summary>
    protected override IEnumerable<ISchemaObject> schemaObjects()
    {
        var tableName = new DbObjectName(Options.DatabaseSchemaName, "mt_fake_table");

        var fakeTable = new Table(tableName);
        fakeTable.AddColumn("name", "varchar");

        yield return fakeTable;
    }
}


Now, to actually apply this feature to your Marten applications, use this syntax:


var store = DocumentStore.For(opts =>
{
    // Option 1: let Marten construct the FakeStorage instance,
    // handing it the current StoreOptions
    opts.Storage.Add<FakeStorage>();

    // Option 2: construct the feature yourself and register the instance

    opts.Storage.Add(new FakeStorage(opts));
});

Note that when you use the Add<T>() syntax, Marten passes the current StoreOptions to the constructor of T if T has a constructor that accepts a StoreOptions argument. Otherwise, it falls back to the parameterless constructor.

While you can directly implement the ISchemaObject interface for something Marten doesn't already support, it's probably far easier to just configure one of the existing implementations shown in the following sections.

  • Table
  • Function
  • Sequence

Table

PostgreSQL tables can be modeled with the Table class, as shown in this example taken from the event store inside of Marten:


/// <summary>
/// Table definition for "mt_events". The foreign key to "mt_streams" and the
/// stream/version uniqueness constraint depend on the tenancy style of the
/// supplied event graph.
/// </summary>
public class EventsTable: Table
{
    /// <summary>
    /// Builds the columns and constraints of the events table.
    /// </summary>
    /// <param name="events">Supplies the schema name, stream id type and tenancy style.</param>
    public EventsTable(EventGraph events) : base(new DbObjectName(events.DatabaseSchemaName, "mt_events"))
    {
        var streamIdDbType = events.GetStreamIdDBType();
        var conjoined = events.TenancyStyle == TenancyStyle.Conjoined;
        var schemaName = events.DatabaseSchemaName;

        // With conjoined tenancy the reference to mt_streams is declared as a
        // composite table-level constraint further down, not on the column.
        var streamIdDirective = conjoined
            ? null
            : $"REFERENCES {schemaName}.mt_streams ON DELETE CASCADE";

        AddPrimaryKey(new TableColumn("seq_id", "bigint"));
        AddColumn("id", "uuid", "NOT NULL");
        AddColumn("stream_id", streamIdDbType, streamIdDirective);
        AddColumn("version", "integer", "NOT NULL");
        AddColumn("data", "jsonb", "NOT NULL");
        AddColumn("type", "varchar(500)", "NOT NULL");
        AddColumn("timestamp", "timestamptz", "default (now()) NOT NULL");
        AddColumn<TenantIdColumn>();
        AddColumn(new DotNetTypeColumn { Directive = "NULL" });

        if (conjoined)
        {
            Constraints.Add($"FOREIGN KEY(stream_id, {TenantIdColumn.Name}) REFERENCES {schemaName}.mt_streams(id, {TenantIdColumn.Name})");
        }

        // A stream version is unique per stream, and additionally per tenant
        // when tenancy is conjoined.
        Constraints.Add(conjoined
            ? $"CONSTRAINT pk_mt_events_stream_and_version UNIQUE(stream_id, {TenantIdColumn.Name}, version)"
            : "CONSTRAINT pk_mt_events_stream_and_version UNIQUE(stream_id, version)");

        Constraints.Add("CONSTRAINT pk_mt_events_id_unique UNIQUE(id)");
    }
}


Function

PostgreSQL functions can be managed by creating a subclass of the Function base class, as shown below with the large "append event" function from the event store:


    /// <summary>
    /// Schema object for the "mt_append_event" database function, which
    /// appends a batch of events to a stream and returns the new stream
    /// version followed by the sequence number assigned to each event.
    /// </summary>
    public class AppendEventFunction: Function
    {
        private readonly EventGraph _events;

        /// <summary>
        /// Creates the function definition in the database schema of <paramref name="events"/>.
        /// </summary>
        /// <param name="events">Supplies the schema name, stream id type, tenancy style and locking option.</param>
        public AppendEventFunction(EventGraph events) : base(new DbObjectName(events.DatabaseSchemaName, "mt_append_event"))
        {
            _events = events;
        }

        /// <summary>
        /// Writes the CREATE OR REPLACE FUNCTION statement to <paramref name="writer"/>.
        /// </summary>
        /// <param name="rules">The DDL rules in effect (not consulted by this implementation).</param>
        /// <param name="writer">Destination for the generated SQL.</param>
        public override void Write(DdlRules rules, StringWriter writer)
        {
            var streamIdType = _events.GetStreamIdDBType();
            var databaseSchema = _events.DatabaseSchemaName;

            var tenancyStyle = _events.TenancyStyle;

            // Predicate used to locate the stream row; with conjoined tenancy
            // the tenant id is part of the match as well.
            var streamsWhere = "id = stream";

            if (tenancyStyle == TenancyStyle.Conjoined)
            {
                streamsWhere += " AND tenant_id = tenantid";
            }

            // NOTE: the parameter list declared here must stay in sync with
            // the argument type list used by toDropSql().
            writer.WriteLine($@"
CREATE OR REPLACE FUNCTION {Identifier}(stream {streamIdType}, stream_type varchar, tenantid varchar, event_ids uuid[], event_types varchar[], dotnet_types varchar[], bodies jsonb[]) RETURNS int[] AS $$
DECLARE
	event_version int;
	event_type varchar;
	event_id uuid;
	body jsonb;
	index int;
	seq int;
    actual_tenant varchar;
	return_value int[];
BEGIN
	select version into event_version from {databaseSchema}.mt_streams where {streamsWhere}{(_events.UseAppendEventForUpdateLock ? " for update" : string.Empty)};
	if event_version IS NULL then
		event_version = 0;
		insert into {databaseSchema}.mt_streams (id, type, version, timestamp, tenant_id) values (stream, stream_type, 0, now(), tenantid);
    else
        if tenantid IS NOT NULL then
            select tenant_id into actual_tenant from {databaseSchema}.mt_streams where {streamsWhere};
            if actual_tenant != tenantid then
                RAISE EXCEPTION 'Marten: The tenantid does not match the existing stream';
            end if;
        end if;
	end if;

	index := 1;
	return_value := ARRAY[event_version + array_length(event_ids, 1)];

	foreach event_id in ARRAY event_ids
	loop
	    seq := nextval('{databaseSchema}.mt_events_sequence');
		return_value := array_append(return_value, seq);

	    event_version := event_version + 1;
		event_type = event_types[index];
		body = bodies[index];

		insert into {databaseSchema}.mt_events
			(seq_id, id, stream_id, version, data, type, tenant_id, {DocumentMapping.DotNetTypeColumn})
		values
			(seq, event_id, stream, event_version, body, event_type, tenantid, dotnet_types[index]);

		index := index + 1;
	end loop;

	update {databaseSchema}.mt_streams set version = event_version, timestamp = now() where {streamsWhere};

	return return_value;
END
$$ LANGUAGE plpgsql;
");
        }

        /// <summary>
        /// Builds the statement that drops this function.
        /// </summary>
        /// <returns>A "drop function if exists" statement for the function written by <see cref="Write"/>.</returns>
        protected override string toDropSql()
        {
            var streamIdType = _events.GetStreamIdDBType();

            // PostgreSQL identifies a function by its name plus its argument
            // types, so all seven parameters declared in Write() must be
            // listed: stream, stream_type, tenantid, event_ids, event_types,
            // dotnet_types, bodies. A shorter list matches no function and
            // "if exists" would silently turn the drop into a no-op.
            return $"drop function if exists {Identifier} ({streamIdType}, varchar, varchar, uuid[], varchar[], varchar[], jsonb[]);";
        }
    }


Sequence

PostgreSQL sequences can be managed with the Sequence class, as in this usage:


// Name of the sequence within the configured database schema
var sequenceName = new DbObjectName(DatabaseSchemaName, "mt_events_sequence");

// Tie the sequence to the "seq_id" column of the events table
var sequence = new Sequence(sequenceName)
{
    Owner = eventsTable.Identifier,
    OwnerColumn = "seq_id"
};