Skip to content

Instantly share code, notes, and snippets.

@sbellware
Created May 15, 2016 02:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save sbellware/dfea0b5bde42c406cb7870d0e18df2ff to your computer and use it in GitHub Desktop.
Save sbellware/dfea0b5bde42c406cb7870d0e18df2ff to your computer and use it in GitHub Desktop.
Event Sourcing Basics for Postgres
CREATE EXTENSION IF NOT EXISTS pgcrypto WITH SCHEMA public;
-- ----------------------------
-- Table structure for events
-- ----------------------------
DROP TABLE IF EXISTS "public"."events";
CREATE TABLE "public"."events" (
"id" uuid DEFAULT gen_random_uuid() NOT NULL,
"type" varchar NOT NULL COLLATE "default",
"stream" varchar NOT NULL COLLATE "default",
"stream_position" int4 NOT NULL,
"category" varchar NOT NULL COLLATE "default",
"global_position" bigserial NOT NULL ,
"data" jsonb NOT NULL,
"metadata" jsonb,
"created_time" TIMESTAMP WITHOUT TIME ZONE DEFAULT (now() AT TIME ZONE 'utc') NOT NULL
)
WITH (OIDS=FALSE);
-- ----------------------------
-- Primary key structure for table events
-- ----------------------------
ALTER TABLE "public"."events" ADD PRIMARY KEY ("global_position") NOT DEFERRABLE INITIALLY IMMEDIATE;
-- ----------------------------
-- Indexes structure for table events
-- ----------------------------
CREATE INDEX CONCURRENTLY "events_category_global_position_idx" ON "public"."events" USING btree(category COLLATE "default" "pg_catalog"."text_ops" ASC NULLS LAST, "global_position" "pg_catalog"."int8_ops" ASC NULLS LAST);
CREATE INDEX CONCURRENTLY "events_category_idx" ON "public"."events" USING btree(category COLLATE "default" "pg_catalog"."text_ops" ASC NULLS LAST);
CREATE UNIQUE INDEX CONCURRENTLY "events_event_id_uniq_idx" ON "public"."events" USING btree(id "pg_catalog"."uuid_ops" ASC NULLS LAST);
CREATE UNIQUE INDEX CONCURRENTLY "events_stream_stream_position_uniq_idx" ON "public"."events" USING btree(stream COLLATE "default" "pg_catalog"."text_ops" ASC NULLS LAST, "stream_position" "pg_catalog"."int4_ops" ASC NULLS LAST);
CREATE OR REPLACE FUNCTION insert_event(
id uuid,
type varchar,
stream varchar,
data jsonb,
metadata jsonb DEFAULT NULL,
expected_version int DEFAULT NULL
)
RETURNS int
AS $$
DECLARE
stream_version int;
category varchar;
BEGIN
stream_version := stream_version(stream);
if expected_version is not null then
if expected_version != stream_version then
raise exception 'Wrong expected version: % (Stream: %, Stream Version: %)', expected_version, stream, stream_version using
errcode='XPCTV',
hint='The event cannot be written if the stream version and expected verion do not match';
end if;
end if;
stream_version := stream_version + 1;
category := category(stream);
insert into "events"
(
"id",
"type",
"stream",
"stream_position",
"category",
"data",
"metadata"
)
values
(
id,
type,
stream,
stream_version,
category,
data,
metadata
)
;
return stream_version;
END;
$$ LANGUAGE plpgsql;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment