Skip to content

Instantly share code, notes, and snippets.

@saterus
Created July 22, 2021 16:21
Show Gist options
  • Save saterus/a67776f64aea31fde8f47bed26958853 to your computer and use it in GitHub Desktop.
Save saterus/a67776f64aea31fde8f47bed26958853 to your computer and use it in GitHub Desktop.
Flow Document JSON Schema
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Catalog",
"description": "Each catalog source defines a portion of a Flow Catalog, by defining collections, derivations, tests, and materializations of the Catalog. Catalog sources may reference and import other sources, in order to collections and other entities that source defines.",
"type": "object",
"properties": {
"$schema": {
"title": "JSON-Schema against which the Catalog is validated.",
"default": null,
"type": [
"string",
"null"
]
},
"captures": {
"title": "Captures of this Catalog.",
"default": {},
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/CaptureDef"
}
},
"collections": {
"title": "Collections defined by this Catalog.",
"default": {},
"examples": [
{
"acmeCo/collection": {
"derivation": null,
"key": [
"/json/ptr"
],
"projections": {},
"schema": "../path/to/local.yaml"
}
}
],
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/CollectionDef"
}
},
"import": {
"title": "Import other Flow catalog sources.",
"description": "By importing another Flow catalog source, the collections, schemas, and derivations it defines become usable within this Catalog source. Each import is an absolute URI, or a URI which is relative to this source location.",
"default": [],
"examples": [
[
"../path/to/local.yaml",
"https://example/resource"
]
],
"type": "array",
"items": {
"$ref": "#/definitions/RelativeUrl"
}
},
"journalRules": {
"title": "Journal rules of the Catalog.",
"description": "Rules which template and modify the JournalSpecs managed by Flow. Each rule may specify a label selector, which must be matched for the rule to apply. Rules are evaluated in ascending global lexical order of their rule name.",
"default": {},
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/JournalRule"
}
},
"materializations": {
"title": "Materializations of this Catalog.",
"default": {},
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/MaterializationDef"
}
},
"npmDependencies": {
"title": "NPM package dependencies of the Catalog.",
"description": "Dependencies are included when building the catalog's build NodeJS package, as {\"package-name\": \"version\"}. I.e. {\"moment\": \"^2.24\"}.\n\nVersion strings can take any form understood by NPM. See https://docs.npmjs.com/files/package.json#dependencies",
"default": {
"a-npm-package": "^1.2.3"
},
"type": "object",
"additionalProperties": {
"type": "string"
}
},
"tests": {
"default": {
"Test that fob quips ipsum": []
},
"examples": [
{
"Test that fob quips ipsum": [
{
"ingest": {
"collection": "acmeCo/collection",
"documents": [
{
"example": "document"
},
{
"another": "document"
}
]
}
},
{
"verify": {
"collection": "acmeCo/collection",
"documents": [
{
"expected": "document"
}
],
"partitions": null
}
}
]
}
],
"type": "object",
"additionalProperties": {
"type": "array",
"items": {
"$ref": "#/definitions/TestStep"
}
}
}
},
"additionalProperties": false,
"definitions": {
"AirbyteSourceConfig": {
"description": "Airbyte source connector specification.",
"type": "object",
"required": [
"config",
"image"
],
"properties": {
"config": {
"title": "Configuration of the connector.",
"type": "object",
"additionalProperties": true
},
"image": {
"title": "Image of the connector.",
"type": "string"
}
}
},
"CaptureBinding": {
"examples": [
{
"resource": {
"stream": "a_stream"
},
"target": "target/collection"
}
],
"type": "object",
"required": [
"resource",
"target"
],
"properties": {
"resource": {
"title": "Endpoint resource to capture from.",
"type": "object",
"additionalProperties": true
},
"target": {
"title": "Name of the collection to capture into.",
"allOf": [
{
"$ref": "#/definitions/Collection"
}
]
}
},
"additionalProperties": false
},
"CaptureDef": {
"description": "A Capture binds an external system and target (e.x., a SQL table or cloud storage bucket) from which data should be continuously captured, with a Flow collection into that captured data is ingested. Multiple Captures may be bound to a single collection, but only one capture may exist for a given endpoint and target.",
"type": "object",
"required": [
"bindings",
"endpoint"
],
"properties": {
"bindings": {
"title": "Bound collections to capture from the endpoint.",
"type": "array",
"items": {
"$ref": "#/definitions/CaptureBinding"
}
},
"endpoint": {
"title": "Endpoint to capture from.",
"allOf": [
{
"$ref": "#/definitions/CaptureEndpoint"
}
]
},
"interval": {
"title": "Interval of time between invocations of the capture.",
"description": "Configured intervals are applicable only to connectors which are unable to continuously tail their source, and which instead produce a current quantity of output and then exit. Flow will start the connector again after the given interval of time has passed.\n\nIntervals are relative to the start of an invocation and not its completion. For example, if the interval is five minutes, and an invocation of the capture finishes after two minutes, then the next invocation will be started after three additional minutes.",
"default": "5m",
"type": [
"string",
"null"
],
"pattern": "^\\d+(s|m|h)$"
}
},
"additionalProperties": false
},
"CaptureEndpoint": {
"description": "An Endpoint connector used for Flow captures.",
"anyOf": [
{
"title": "An Airbyte source connector.",
"type": "object",
"required": [
"airbyteSource"
],
"properties": {
"airbyteSource": {
"$ref": "#/definitions/AirbyteSourceConfig"
}
}
},
{
"title": "A remote implementation of an endpoint gRPC driver.",
"type": "object",
"required": [
"remote"
],
"properties": {
"remote": {
"$ref": "#/definitions/RemoteDriverConfig"
}
}
}
]
},
"Collection": {
"description": "Collection names consist of Unicode letters, numbers, and symbols: - _ . /\n\nSpaces and other special characters are disallowed.",
"examples": [
"acmeCo/collection"
],
"type": "string",
"pattern": "^[^ \t\n\\!@#$%^&*()+=\\<\\>?;:'\"\\[\\]\\|~`]+$"
},
"CollectionDef": {
"description": "Collection describes a set of related documents, where each adheres to a common schema and grouping key. Collections are append-only: once a document is added to a collection, it is never removed. However, it may be replaced or updated (either in whole, or in part) by a future document sharing its key. Each new document of a given key is \"reduced\" into existing documents of the key. By default, this reduction is achieved by completely replacing the previous document, but much richer reduction behaviors can be specified through the use of annotated reduction strategies of the collection schema.",
"examples": [
{
"derivation": null,
"key": [
"/json/ptr"
],
"projections": {},
"schema": "../path/to/local.yaml"
}
],
"type": "object",
"required": [
"key",
"schema"
],
"properties": {
"derivation": {
"title": "Derivation which builds this collection from others.",
"anyOf": [
{
"$ref": "#/definitions/Derivation"
},
{
"type": "null"
}
]
},
"key": {
"title": "Composite key of this collection.",
"allOf": [
{
"$ref": "#/definitions/CompositeKey"
}
]
},
"projections": {
"title": "Projections and logical partitions of this collection.",
"default": {
"a_field": "/json/ptr",
"a_partition": {
"location": "/json/ptr",
"partition": true
}
},
"allOf": [
{
"$ref": "#/definitions/Projections"
}
]
},
"schema": {
"title": "Schema against which collection documents are validated and reduced.",
"examples": [
"../path/to/schema#/$defs/subPath"
],
"allOf": [
{
"$ref": "#/definitions/Schema"
}
]
}
},
"additionalProperties": false
},
"CompositeKey": {
"description": "Ordered JSON-Pointers which define how a composite key may be extracted from a collection document.",
"examples": [
[
"/json/ptr"
]
],
"type": "array",
"items": {
"$ref": "#/definitions/JsonPointer"
}
},
"CompressionCodec": {
"examples": [
"GZIP_OFFLOAD_DECOMPRESSION"
],
"type": "string",
"enum": [
"INVALID",
"NONE",
"GZIP",
"ZSTANDARD",
"SNAPPY",
"GZIP_OFFLOAD_DECOMPRESSION"
]
},
"Derivation": {
"description": "A derivation specifies how a collection is derived from other collections. A collection without a derivation is a \"captured\" collection, into which documents are directly ingested.",
"examples": [
{
"register": {
"initial": null,
"schema": true
},
"transform": {
"nameOfTransform": {
"priority": 0,
"publish": {
"lambda": "typescript"
},
"readDelay": null,
"shuffle": null,
"source": {
"name": "source/collection",
"partitions": null,
"schema": null
},
"update": null
}
}
}
],
"type": "object",
"properties": {
"register": {
"title": "Register configuration of this derivation.",
"default": {
"initial": null,
"schema": true
},
"allOf": [
{
"$ref": "#/definitions/Register"
}
]
},
"transform": {
"title": "Transforms which make up this derivation.",
"default": {
"nameOfTransform": {
"source": {
"name": "a/source/collection"
}
}
},
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/Transform"
}
}
},
"additionalProperties": false
},
"FlowSinkConfig": {
"description": "Flow sink connector specification.",
"type": "object",
"required": [
"config",
"image"
],
"properties": {
"config": {
"title": "Configuration of the connector.",
"type": "object",
"additionalProperties": true
},
"image": {
"title": "Image of the connector.",
"type": "string"
}
}
},
"JournalRule": {
"examples": [
{
"selector": {
"exclude": {
"labels": []
},
"include": {
"labels": [
{
"name": "estuary.dev/collection",
"value": "a/collection"
}
]
}
},
"template": {
"fragment": {
"compression_codec": "INVALID",
"flush_interval": "0s",
"length": 0,
"path_postfix_template": "",
"refresh_interval": "0s",
"retention": "0s",
"stores": [
"s3://my-bucket/path"
]
},
"labels": {
"labels": []
},
"max_append_rate": 0,
"replication": 0
}
}
],
"type": "object",
"required": [
"template"
],
"properties": {
"selector": {
"title": "Selector which determines whether the rule applies.",
"default": {
"exclude": {
"labels": []
},
"include": {
"labels": []
}
},
"allOf": [
{
"$ref": "#/definitions/LabelSelector"
}
]
},
"template": {
"title": "Template applied to the journal's specification.",
"allOf": [
{
"$ref": "#/definitions/JournalSpec"
}
]
}
},
"additionalProperties": false
},
"JournalSpec": {
"examples": [
{
"fragment": {
"compression_codec": "INVALID",
"flush_interval": "0s",
"length": 0,
"path_postfix_template": "",
"refresh_interval": "0s",
"retention": "0s",
"stores": [
"s3://bucket/and/path"
]
},
"labels": {
"labels": []
},
"max_append_rate": 0,
"replication": 0
}
],
"type": "object",
"properties": {
"fragment": {
"title": "Fragment defines how journal content is mapped to fragment files.",
"default": {
"compression_codec": "INVALID",
"flush_interval": "0s",
"length": 0,
"path_postfix_template": "",
"refresh_interval": "0s",
"retention": "0s",
"stores": []
},
"allOf": [
{
"$ref": "#/definitions/JournalSpecFragment"
}
]
},
"labels": {
"title": "Assigned journal labels.",
"description": "Labels are a multi-map, where each label name may include multiple values.",
"default": {
"labels": []
},
"allOf": [
{
"$ref": "#/definitions/LabelSet"
}
]
},
"max_append_rate": {
"title": "Maximum rate, in bytes-per-second, at which data may be written to a journal.",
"description": "If zero, no journal-specific rate limiting is applied.",
"default": 0,
"type": "integer",
"format": "int64"
},
"replication": {
"title": "Desired replication of journals.",
"description": "The default value is three, meaning that three machines in two deployment zones must fail before data loss can occur.",
"default": 0,
"type": "integer",
"format": "int32"
}
},
"additionalProperties": false
},
"JournalSpecFragment": {
"examples": [
{
"compression_codec": "INVALID",
"flush_interval": "0s",
"length": 0,
"path_postfix_template": "",
"refresh_interval": "0s",
"retention": "0s",
"stores": [
"s3://bucket/and/path"
]
}
],
"type": "object",
"properties": {
"compression_codec": {
"title": "Codec used to compress Journal Fragments.",
"default": "INVALID",
"allOf": [
{
"$ref": "#/definitions/CompressionCodec"
}
]
},
"flush_interval": {
"title": "Maximum flush delay before in-progress fragments are closed and persisted into cloud storage.",
"description": "Intervals are converted into uniform time segments: 24h will \"roll\" all fragments at midnight UTC every day, 1h at the top of every hour, 15m a :00, :15, :30, :45 past the hour, and so on.",
"default": "0s",
"type": [
"string",
"null"
],
"pattern": "^\\d+(s|m|h)$"
},
"length": {
"title": "Desired byte content length of each fragment, before compression.",
"description": "When a journal fragment reaches this threshold, it will be closed off and a new one started, making its way to cloud storage.",
"default": 0,
"type": "integer",
"format": "int64"
},
"path_postfix_template": {
"title": "Path postfix template evaluates to a partial path under which fragments are persisted to the store.",
"default": "",
"type": "string"
},
"refresh_interval": {
"title": "Period between refreshes of fragment listings from configured stores.",
"default": "0s",
"type": [
"string",
"null"
],
"pattern": "^\\d+(s|m|h)$"
},
"retention": {
"title": "Duration for which historical data of a journal should be kept.",
"description": "If zero, then fragments are retained indefinitely.",
"default": "0s",
"type": [
"string",
"null"
],
"pattern": "^\\d+(s|m|h)$"
},
"stores": {
"title": "Storage backend base path for this Journal's Fragments.",
"description": "Must be in URL form, with the choice of backend defined by the scheme. The persisted path of a journal fragment is determined by joining the current store path with the journal name, and finally a content-address fragment file name.\n\nEg, given a store of \"s3://My-AWS-bucket/a/prefix\" and a journal of name \"my/journal\", a complete persisted path might be: \"s3://My-AWS-bucket/a/prefix/my/journal/000123-000456-789abcdef.gzip\n\nMultiple stores may be specified, and all stores are reguarly scanned to index applicable journal fragments. New fragments are always persisted to the first store in the list.\n\nThis can be helpful in performing bucket migrations: adding a new store to the front of the list causes ongoing data to be written to that location, while historical data continues to be read and served from the prior stores.\n\nAt least one store must be specified.",
"default": [],
"type": "array",
"items": {
"type": "string"
}
}
},
"additionalProperties": false
},
"JsonPointer": {
"description": "JSON Pointer which identifies a location in a document.",
"examples": [
"/json/ptr"
],
"type": "string",
"pattern": "^(/[^/]+)*$"
},
"Label": {
"examples": [
{
"name": "a/label",
"value": "value"
}
],
"type": "object",
"required": [
"name"
],
"properties": {
"name": {
"title": "Name of the Label.",
"type": "string"
},
"value": {
"title": "Value of the Label.",
"description": "When used within a selector, if value is empty or omitted than the label selection matches any value.",
"default": "",
"type": "string"
}
},
"additionalProperties": false
},
"LabelSelector": {
"examples": [
{
"exclude": {
"labels": [
{
"name": "a/label",
"value": "value"
}
]
},
"include": {
"labels": [
{
"name": "a/label",
"value": "value"
}
]
}
}
],
"type": "object",
"properties": {
"exclude": {
"title": "Excluded labels of the selector.",
"default": {
"labels": []
},
"allOf": [
{
"$ref": "#/definitions/LabelSet"
}
]
},
"include": {
"title": "Included labels of the selector.",
"default": {
"labels": []
},
"allOf": [
{
"$ref": "#/definitions/LabelSet"
}
]
}
},
"additionalProperties": false
},
"LabelSet": {
"examples": [
{
"labels": [
{
"name": "a/label",
"value": "value"
}
]
}
],
"type": "object",
"required": [
"labels"
],
"properties": {
"labels": {
"description": "Labels of the set.",
"type": "array",
"items": {
"$ref": "#/definitions/Label"
}
}
},
"additionalProperties": false
},
"Lambda": {
"description": "Lambdas are user functions which are invoked by the Flow runtime to process and transform source collection documents into derived collections. Flow supports multiple lambda run-times, with a current focus on TypeScript and remote HTTP APIs.\n\nTypeScript lambdas are invoked within on-demand run-times, which are automatically started and scaled by Flow's task distribution in order to best co-locate data and processing, as well as to manage fail-over.\n\nRemote lambdas may be called from many Flow tasks, and are up to the API provider to provision and scale.",
"examples": [
"typescript",
{
"remote": "http://example/api"
}
],
"anyOf": [
{
"type": "string",
"enum": [
"typescript"
]
},
{
"type": "object",
"required": [
"remote"
],
"properties": {
"remote": {
"type": "string"
}
}
}
]
},
"MaterializationBinding": {
"examples": [
{
"fields": {
"exclude": [],
"include": {},
"recommended": true
},
"partitions": null,
"resource": {
"table": "a_table"
},
"source": "source/collection"
}
],
"type": "object",
"required": [
"resource",
"source"
],
"properties": {
"fields": {
"title": "Selected projections for this materialization.",
"default": {
"exclude": [],
"include": {},
"recommended": true
},
"allOf": [
{
"$ref": "#/definitions/MaterializationFields"
}
]
},
"partitions": {
"title": "Selector over partitions of the source collection to read.",
"default": {
"exclude": {
"other_partition": [
32,
64
]
},
"include": {
"a_partition": [
"A",
"B"
]
}
},
"anyOf": [
{
"$ref": "#/definitions/PartitionSelector"
},
{
"type": "null"
}
]
},
"resource": {
"title": "Endpoint resource to materialize into.",
"type": "object",
"additionalProperties": true
},
"source": {
"title": "Name of the collection to be materialized.",
"allOf": [
{
"$ref": "#/definitions/Collection"
}
]
}
},
"additionalProperties": false
},
"MaterializationDef": {
"description": "A Materialization binds a Flow collection with an external system & target (e.x, a SQL table) into which the collection is to be continuously materialized.",
"type": "object",
"required": [
"bindings",
"endpoint"
],
"properties": {
"bindings": {
"title": "Bound collections to materialize into the endpoint.",
"type": "array",
"items": {
"$ref": "#/definitions/MaterializationBinding"
}
},
"endpoint": {
"title": "Endpoint to materialize into.",
"allOf": [
{
"$ref": "#/definitions/MaterializationEndpoint"
}
]
}
},
"additionalProperties": false
},
"MaterializationEndpoint": {
"description": "An Endpoint connector used for Flow materializations.",
"anyOf": [
{
"title": "A PostgreSQL database.",
"type": "object",
"required": [
"postgres"
],
"properties": {
"postgres": {
"$ref": "#/definitions/PostgresConfig"
}
}
},
{
"title": "A remote implementation of an endpoint gRPC driver.",
"type": "object",
"required": [
"remote"
],
"properties": {
"remote": {
"$ref": "#/definitions/RemoteDriverConfig"
}
}
},
{
"title": "A SQLite database.",
"type": "object",
"required": [
"sqlite"
],
"properties": {
"sqlite": {
"$ref": "#/definitions/SqliteConfig"
}
}
},
{
"title": "A Snowflake database.",
"type": "object",
"required": [
"snowflake"
],
"properties": {
"snowflake": {
"$ref": "#/definitions/SnowflakeConfig"
}
}
},
{
"title": "A Webhook.",
"type": "object",
"required": [
"webhook"
],
"properties": {
"webhook": {
"$ref": "#/definitions/WebhookConfig"
}
}
},
{
"title": "A Flow sink.",
"type": "object",
"required": [
"flowSink"
],
"properties": {
"flowSink": {
"$ref": "#/definitions/FlowSinkConfig"
}
}
}
]
},
"MaterializationFields": {
"description": "MaterializationFields defines a selection of projections to materialize, as well as optional per-projection, driver-specific configuration.",
"examples": [
{
"exclude": [
"removed"
],
"include": {
"added": {}
},
"recommended": true
}
],
"type": "object",
"required": [
"recommended"
],
"properties": {
"exclude": {
"title": "Fields to exclude.",
"description": "This removes from recommended projections, where enabled.",
"default": [],
"type": "array",
"items": {
"type": "string"
}
},
"include": {
"title": "Fields to include.",
"description": "This supplements any recommended fields, where enabled. Values are passed through to the driver, e.x. for customization of the driver's schema generation or runtime behavior with respect to the field.",
"default": {},
"type": "object",
"additionalProperties": {
"type": "object",
"additionalProperties": true
}
},
"recommended": {
"title": "Should recommended projections for the endpoint be used?",
"type": "boolean"
}
},
"additionalProperties": false
},
"PartitionSelector": {
"description": "Partition selectors identify a desired subset of the available logical partitions of a collection.",
"examples": [
{
"exclude": {
"other_partition": [
32,
64
]
},
"include": {
"a_partition": [
"A",
"B"
]
}
}
],
"type": "object",
"properties": {
"exclude": {
"description": "Partition field names and values which are excluded from the source collection. Any documents matching *any one* of the partition values will be excluded.",
"default": {},
"type": "object",
"additionalProperties": {
"type": "array",
"items": true
}
},
"include": {
"description": "Partition field names and corresponding values which must be matched from the Source collection. Only documents having one of the specified values across all specified partition names will be matched. For example, source: [App, Web] region: [APAC] would mean only documents of 'App' or 'Web' source and also occurring in the 'APAC' region will be processed.",
"default": {},
"type": "object",
"additionalProperties": {
"type": "array",
"items": true
}
}
},
"additionalProperties": false
},
"PostgresConfig": {
"description": "PostgreSQL endpoint configuration. Compare to https://pkg.go.dev/github.com/lib/pq#hdr-Connection_String_Parameters",
"type": "object",
"required": [
"host",
"password",
"user"
],
"properties": {
"dbname": {
"title": "Logical database (default: $user).",
"type": [
"string",
"null"
]
},
"host": {
"title": "Host address of the database.",
"type": "string"
},
"password": {
"title": "Connection password.",
"type": "string"
},
"port": {
"title": "Port of the database (default: 5432).",
"type": [
"integer",
"null"
],
"format": "uint16",
"minimum": 0.0
},
"user": {
"title": "Connection user.",
"type": "string"
}
},
"additionalProperties": true
},
"Projection": {
"description": "A projection representation that allows projections to be specified either as a simple JSON Pointer, or as an object with separate properties for the location and partition indicator.",
"anyOf": [
{
"$ref": "#/definitions/JsonPointer"
},
{
"type": "object",
"required": [
"location"
],
"properties": {
"location": {
"title": "Location of this projection.",
"allOf": [
{
"$ref": "#/definitions/JsonPointer"
}
]
},
"partition": {
"title": "Is this projection a logical partition?",
"default": false,
"type": "boolean"
}
}
}
]
},
"Projections": {
"description": "Projections are named locations within a collection document which may be used for logical partitioning or directly exposed to databases into which collections are materialized.",
"examples": [
{
"a_field": "/json/ptr",
"a_partition": {
"location": "/json/ptr",
"partition": true
}
}
],
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/Projection"
}
},
"Publish": {
"description": "Publish lambdas take a source document, a current register and (if there is also an \"update\" lambda) a previous register, and transform them into one or more documents to be published into a derived collection.",
"examples": [
{
"lambda": "typescript"
}
],
"type": "object",
"required": [
"lambda"
],
"properties": {
"lambda": {
"title": "Lambda invoked by the publish.",
"allOf": [
{
"$ref": "#/definitions/Lambda"
}
]
}
},
"additionalProperties": false
},
"Register": {
"description": "Registers are the internal states of a derivation, which can be read and updated by all of its transformations. They're an important building block for joins, aggregations, and other complex stateful workflows.\n\nRegisters are implemented using JSON-Schemas, often ones with reduction annotations. When reading source documents, every distinct shuffle key by which the source collection is read is mapped to a corresponding register value (or, if no shuffle key is defined, the source collection's key is used instead).\n\nThen, an \"update\" lambda of the transformation produces updates which are reduced into the register, and a \"publish\" lambda reads the current (and previous, if updated) register value.",
"type": "object",
"required": [
"schema"
],
"properties": {
"initial": {
"title": "Initial value of a keyed register which has never been updated.",
"description": "If not specified, the default is \"null\".",
"default": null
},
"schema": {
"title": "Schema which validates and reduces register documents.",
"allOf": [
{
"$ref": "#/definitions/Schema"
}
]
}
},
"additionalProperties": false
},
"RelativeUrl": {
"description": "A URL identifying a resource, which may be a relative local path with respect to the current resource (i.e, ../path/to/flow.yaml), or may be an external absolute URL (i.e., http://example/flow.yaml).",
"examples": [
"../path/to/local.yaml",
"https://example/resource"
],
"type": "string"
},
"RemoteDriverConfig": {
"type": "object",
"required": [
"address"
],
"properties": {
"address": {
"title": "gRPC address of the driver.",
"type": "string"
}
},
"additionalProperties": true
},
"Schema": {
"description": "A schema is a draft 2019-09 JSON Schema which validates Flow documents. Schemas also provide annotations at document locations, such as reduction strategies for combining one document into another.\n\nSchemas may be defined inline to the catalog, or given as a relative or absolute URI. URIs may optionally include a JSON fragment pointer that locates a specific sub-schema therein.\n\nI.e, \"schemas/marketing.yaml#/$defs/campaign\" would reference the schema at location {\"$defs\": {\"campaign\": ...}} within ./schemas/marketing.yaml.",
"examples": [
"http://example/schema#/$defs/subPath",
"../path/to/schema#/$defs/subPath",
{
"properties": {
"bar": {
"const": 42
},
"foo": {
"type": "integer"
}
},
"type": "object"
},
{
"properties": {
"foo_count": {
"reduce": {
"strategy": "sum"
},
"type": "integer"
}
},
"reduce": {
"strategy": "merge"
},
"type": "object"
}
],
"anyOf": [
{
"$ref": "#/definitions/RelativeUrl"
},
{
"type": "object",
"additionalProperties": true
},
{
"type": "boolean"
}
]
},
"Shuffle": {
"description": "A Shuffle specifies how a shuffling key is to be extracted from collection documents.",
"examples": [
{
"priority": 0,
"publish": {
"lambda": "typescript"
},
"readDelay": null,
"shuffle": null,
"source": {
"name": "source/collection",
"partitions": null,
"schema": null
},
"update": null
}
],
"anyOf": [
{
"description": "Shuffle by extracting the given fields.",
"type": "object",
"required": [
"key"
],
"properties": {
"key": {
"$ref": "#/definitions/CompositeKey"
}
}
},
{
"description": "Invoke the lambda for each source document, and shuffle on its returned key.",
"type": "object",
"required": [
"lambda"
],
"properties": {
"lambda": {
"$ref": "#/definitions/Lambda"
}
}
}
]
},
"SnowflakeConfig": {
"description": "Snowflake endpoint configuration. Compare to https://pkg.go.dev/github.com/snowflakedb/gosnowflake#Config",
"type": "object",
"required": [
"account",
"database",
"password",
"role",
"schema",
"user",
"warehouse"
],
"properties": {
"account": {
"type": "string"
},
"database": {
"type": "string"
},
"password": {
"type": "string"
},
"role": {
"type": "string"
},
"schema": {
"type": "string"
},
"user": {
"type": "string"
},
"warehouse": {
"type": "string"
}
},
"additionalProperties": true
},
"SqliteConfig": {
"description": "Sqlite endpoint configuration. Compare to https://github.com/mattn/go-sqlite3#connection-string",
"type": "object",
"required": [
"path"
],
"properties": {
"path": {
"title": "Path of the database, relative to this catalog source.",
"allOf": [
{
"$ref": "#/definitions/RelativeUrl"
}
]
}
},
"additionalProperties": true
},
"TestStep": {
"examples": [
{
"ingest": {
"collection": "acmeCo/collection",
"documents": [
{
"example": "document"
},
{
"another": "document"
}
]
}
},
{
"verify": {
"collection": "acmeCo/collection",
"documents": [
{
"expected": "document"
}
],
"partitions": null
}
}
],
"anyOf": [
{
"description": "Ingest document fixtures into a collection.",
"type": "object",
"required": [
"ingest"
],
"properties": {
"ingest": {
"$ref": "#/definitions/TestStepIngest"
}
}
},
{
"description": "Verify the contents of a collection match a set of document fixtures.",
"type": "object",
"required": [
"verify"
],
"properties": {
"verify": {
"$ref": "#/definitions/TestStepVerify"
}
}
}
]
},
"TestStepIngest": {
"description": "An ingestion test step ingests document fixtures into the named collection.",
"examples": [
{
"collection": "acmeCo/collection",
"documents": [
{
"example": "document"
},
{
"another": "document"
}
]
}
],
"type": "object",
"required": [
"collection",
"documents"
],
"properties": {
"collection": {
"title": "Name of the collection into which the test will ingest.",
"allOf": [
{
"$ref": "#/definitions/Collection"
}
]
},
"documents": {
"title": "Documents to ingest.",
"description": "Each document must conform to the collection's schema.",
"type": "array",
"items": true
}
},
"additionalProperties": false
},
"TestStepVerify": {
"description": "A verification test step verifies that the contents of the named collection match the expected fixtures, after fully processing all preceding ingestion test steps.",
"examples": [
{
"collection": "acmeCo/collection",
"documents": [
{
"expected": "document"
}
],
"partitions": null
}
],
"type": "object",
"required": [
"collection",
"documents"
],
"properties": {
"collection": {
"title": "Collection into which the test will ingest.",
"allOf": [
{
"$ref": "#/definitions/Collection"
}
]
},
"documents": {
"title": "Documents to verify.",
"description": "Each document may contain only a portion of the matched document's properties, and any properties present in the actual document but not in this document fixture are ignored. All other values must match or the test will fail.",
"type": "array",
"items": true
},
"partitions": {
"title": "Selector over partitions to verify.",
"default": {
"exclude": {
"other_partition": [
32,
64
]
},
"include": {
"a_partition": [
"A",
"B"
]
}
},
"anyOf": [
{
"$ref": "#/definitions/PartitionSelector"
},
{
"type": "null"
}
]
}
},
"additionalProperties": false
},
"Transform": {
"description": "A Transform reads and shuffles documents of a source collection, and processes each document through either one or both of a register \"update\" lambda and a derived document \"publish\" lambda.",
"examples": [
{
"priority": 0,
"publish": {
"lambda": "typescript"
},
"readDelay": null,
"shuffle": null,
"source": {
"name": "source/collection",
"partitions": null,
"schema": null
},
"update": null
}
],
"type": "object",
"required": [
"source"
],
"properties": {
"priority": {
"title": "Priority applied to documents processed by this transform.",
"description": "When all transforms are of equal priority, Flow processes documents according to their associated publishing time, as encoded in the document UUID.\n\nHowever, when one transform has a higher priority than others, then *all* ready documents are processed through the transform before *any* documents of other transforms are processed.",
"default": 0,
"type": "integer",
"format": "uint32",
"minimum": 0.0
},
"publish": {
"title": "Publish that maps a source document and registers into derived documents of the collection.",
"default": {
"lambda": "typescript"
},
"anyOf": [
{
"$ref": "#/definitions/Publish"
},
{
"type": "null"
}
]
},
"readDelay": {
"title": "Delay applied to documents processed by this transform.",
"description": "Delays are applied as an adjustment to the UUID clock encoded within each document, which is then used to impose a relative ordering of all documents read by this derivation. This means that read delays are applied in a consistent way, even when back-filling over historical documents. When caught up and tailing the source collection, delays also \"gate\" documents such that they aren't processed until the current wall-time reflects the delay.",
"default": null,
"type": [
"string",
"null"
],
"pattern": "^\\d+(s|m|h)$"
},
"shuffle": {
"title": "Shuffle by which source documents are mapped to registers.",
"description": "If empty, the key of the source collection is used.",
"default": [
"/json/ptr"
],
"anyOf": [
{
"$ref": "#/definitions/Shuffle"
},
{
"type": "null"
}
]
},
"source": {
"title": "Source collection read by this transform.",
"allOf": [
{
"$ref": "#/definitions/TransformSource"
}
]
},
"update": {
"title": "Update that maps a source document into register updates.",
"default": {
"lambda": "typescript"
},
"anyOf": [
{
"$ref": "#/definitions/Update"
},
{
"type": "null"
}
]
}
},
"additionalProperties": false
},
"TransformSource": {
"description": "SourcePartitions is optional partitions of a read source collection.",
"examples": [
{
"name": "source/collection",
"partitions": null,
"schema": null
}
],
"type": "object",
"required": [
"name"
],
"properties": {
"name": {
"title": "Name of the collection to be materialized.",
"allOf": [
{
"$ref": "#/definitions/Collection"
}
]
},
"partitions": {
"title": "Selector over partition of the source collection to read.",
"default": {
"exclude": {
"other_partition": [
32,
64
]
},
"include": {
"a_partition": [
"A",
"B"
]
}
},
"anyOf": [
{
"$ref": "#/definitions/PartitionSelector"
},
{
"type": "null"
}
]
},
"schema": {
"title": "Optional JSON-Schema to validate against the source collection.",
"description": "All data in the source collection is already validated against the schema of that collection, so providing a source schema is only used for _additional_ validation beyond that.\n\nThis is useful in building \"Extract Load Transform\" patterns, where a collection is captured with minimal schema applied (perhaps because it comes from an uncontrolled third party), and is then progressively verified as collections are derived. If None, the principal schema of the collection is used instead.",
"default": "../path/to/schema#/$defs/subPath",
"anyOf": [
{
"$ref": "#/definitions/Schema"
},
{
"type": "null"
}
]
}
},
"additionalProperties": false
},
"Update": {
"description": "Update lambdas take a source document and transform it into one or more register updates, which are then reduced into the associated register by the runtime. For example these register updates might update counters, or update the state of a \"join\" window.",
"examples": [
{
"lambda": "typescript"
}
],
"type": "object",
"required": [
"lambda"
],
"properties": {
"lambda": {
"title": "Lambda invoked by the update.",
"allOf": [
{
"$ref": "#/definitions/Lambda"
}
]
}
},
"additionalProperties": false
},
"WebhookConfig": {
"description": "Webhook configuration.",
"type": "object",
"required": [
"address"
],
"properties": {
"address": {
"title": "URL address of the Webhook.",
"type": "string"
}
},
"additionalProperties": true
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment