Creates a collection containing a specified set of elements.
YAML/JSON-style mappings will be interpreted as Beam rows. For example:

type: Create
config:
  elements:
    - {first: 0, second: {str: "foo", values: [1, 2, 3]}}

will result in a schema of the form (int, Row(string, List[int])).
- elements (Array[?]): The set of elements that should belong to the PCollection. YAML/JSON-style mappings will be interpreted as Beam rows.
- reshuffle (boolean, Optional): Whether to introduce a reshuffle (to possibly redistribute the work) if there is more than one element in the collection. Defaults to True.
type: Create
config:
  elements:
    - elements
    - ...
  reshuffle: true|false
Explodes (aka unnest/flatten) one or more fields, producing multiple rows.
Given one or more fields of iterable type, produces multiple rows, one for each value of that field. For example, a row of the form ('a', [1, 2, 3]) would expand to ('a', 1), ('a', 2), and ('a', 3) when exploded on the second field.
This is akin to a FlatMap when paired with the MapToFields transform.
- fields (?, Optional): The list of fields to expand.
- cross_product (boolean, Optional): If multiple fields are specified, indicates whether the full cross-product of combinations should be produced, or if the first element of the first field corresponds to the first element of the second field, etc. For example, the row (['a', 'b'], [1, 2]) would expand to the four rows ('a', 1), ('a', 2), ('b', 1), and ('b', 2) when cross_product is set to true, but only the two rows ('a', 1) and ('b', 2) when it is set to false. Only meaningful (and required) if multiple fields are specified.
- error_handling (Map[string, ?], Optional): Whether and how to handle errors during iteration.
type: Explode
input: ...
config:
  fields: fields
  cross_product: true|false
  error_handling:
    a: error_handling_value_a
    b: error_handling_value_b
    c: ...
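As a concrete illustration, the following pipeline fragment explodes two iterable fields with the full cross-product. The transform name Source and the element values are hypothetical:

```yaml
pipeline:
  transforms:
    - type: Create
      name: Source
      config:
        elements:
          - {letters: ["a", "b"], numbers: [1, 2]}
    - type: Explode
      input: Source
      config:
        fields: [letters, numbers]
        # With cross_product set to true, this yields four rows:
        # (a, 1), (a, 2), (b, 1), (b, 2)
        cross_product: true
```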
Flattens multiple PCollections into a single PCollection.
The elements of the resulting PCollection will be the (disjoint) union of all the elements of all the inputs.
Note that in YAML, transforms can always take a list of inputs, which will be implicitly flattened.
No configuration parameters.
type: Flatten
input: ...
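A minimal sketch (transform names and element values are hypothetical) showing multiple named producers flattened into one PCollection:

```yaml
pipeline:
  transforms:
    - type: Create
      name: First
      config:
        elements: [{value: 1}, {value: 2}]
    - type: Create
      name: Second
      config:
        elements: [{value: 3}]
    # The result contains the union of all elements of all inputs.
    - type: Flatten
      input: [First, Second]
```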
Logs each element of its input PCollection.
The output of this transform is a copy of its input for ease of use in chain-style pipelines.
No configuration parameters.
type: LogForTesting
input: ...
A Python PTransform identified by fully qualified name.
This allows one to import, construct, and apply any Beam Python transform. This can be useful for using transforms that have not yet been exposed via a YAML interface. Note, however, that conversion may be required if this transform does not accept or produce Beam Rows.
For example,

type: PyTransform
config:
  constructor: apache_beam.pkg.mod.SomeClass
  args: [1, 'foo']
  kwargs:
    baz: 3

can be used to access the transform apache_beam.pkg.mod.SomeClass(1, 'foo', baz=3).
- constructor (string): Fully qualified name of a callable used to construct the transform. Often this is a class such as apache_beam.pkg.mod.SomeClass, but it can also be a function or any other callable that returns a PTransform.
- args (Array[?], Optional): A list of parameters to pass to the callable as positional arguments.
- kwargs (Map[string, ?], Optional): A list of parameters to pass to the callable as keyword arguments.
type: PyTransform
input: ...
config:
  constructor: "constructor"
  args:
    - args
    - ...
  kwargs:
    a: kwargs_value_a
    b: kwargs_value_b
    c: ...
A transform that executes a SQL query on its input PCollections.
If a single input is given, it may be referred to as PCOLLECTION, e.g. the query could be of the form

SELECT a, sum(b) FROM PCOLLECTION

If multiple inputs are given, they should be named as they are in the query, e.g.

SELECT a, b, c FROM pcoll_1 join pcoll_2 using (b)

For more details about Beam SQL in general, see the Beam SQL documentation.
- query (string): SQL query to execute.
type: Sql
config:
  query: "query"
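A short sketch, assuming a hypothetical Orders source, showing aggregation over a single input referred to as PCOLLECTION:

```yaml
pipeline:
  transforms:
    - type: Create
      name: Orders
      config:
        elements:
          - {id: 1, amount: 10}
          - {id: 1, amount: 5}
          - {id: 2, amount: 7}
    - type: Sql
      input: Orders
      config:
        # A single input may be referred to as PCOLLECTION.
        query: "SELECT id, SUM(amount) AS total FROM PCOLLECTION GROUP BY id"
```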
A window transform assigning windows to each element of a PCollection.
The assigned windows will affect all downstream aggregating operations, which will aggregate by window as well as by key.
See the Beam documentation on windowing for more details.
Note that any YAML transform can have a windowing parameter, which is applied to its inputs (if any) or outputs (if there are no inputs), which means that explicit WindowInto operations are not typically needed.
- windowing (?, Optional): The type and parameters of the windowing to perform.
type: WindowInto
input: ...
config:
  windowing: windowing
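A minimal sketch of a fixed-window assignment. The windowing keys (type, size) and the events input name are assumptions; consult the Beam windowing documentation for the full set of window types and parameters:

```yaml
type: WindowInto
input: events
config:
  windowing:
    # Assign each element to a 60-second fixed window.
    type: fixed
    size: 60s
```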
A PTransform for reading records from avro files.
Each record of the resulting PCollection will contain a single record read from a source. Records of simple types will be mapped to Beam rows with a single record field containing the record's value. Records of Avro type RECORD will be mapped to Beam rows that comply with the schema contained in the Avro file that contains those records.
- path (?, Optional)
type: ReadFromAvro
config:
  path: path
A PTransform for writing avro files.
If the input has a schema, a corresponding avro schema will be automatically generated and used to write the output records.
- path (?, Optional)
type: WriteToAvro
input: ...
config:
  path: path
Reads data from BigQuery.
Exactly one of table or query must be set. If query is set, neither row_restriction nor fields should be set.
- query (string, Optional): The SQL query to be executed to read from the BigQuery table.
- table (string, Optional): The fully-qualified name of the BigQuery table to read from. Format: [${PROJECT}:]${DATASET}.${TABLE}
- fields (Array[string], Optional): Read only the specified fields (columns) from the BigQuery table. Fields may not be returned in the order specified. If no value is specified, then all fields are returned. Example: "col1, col2, col3"
- row_restriction (string, Optional): Read only rows that match this filter, which must be compatible with Google Standard SQL. This is not supported when reading via query.
type: ReadFromBigQuery
config:
  query: "query"
  table: "table"
  fields:
    - "fields"
    - ...
  row_restriction: "row_restriction"
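For example, a read restricted to two columns and a row filter might look like the following (the project, dataset, table, and column names are hypothetical):

```yaml
type: ReadFromBigQuery
config:
  table: my_project:my_dataset.my_table
  fields:
    - col1
    - col2
  # Only rows satisfying this Google Standard SQL predicate are read.
  row_restriction: "col1 > 100"
```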
Writes data to BigQuery using the Storage Write API (https://cloud.google.com/bigquery/docs/write-api).
This expects a single PCollection of Beam Rows and outputs two dead-letter queues (DLQs) that contain failed rows. The first DLQ has tag [FailedRows] and contains the failed rows. The second DLQ has tag [FailedRowsWithErrors] and contains the failed rows along with their respective errors.
- table (string): The BigQuery table to write to. Format: [${PROJECT}:]${DATASET}.${TABLE}
- create_disposition (string, Optional): Specifies whether the job is allowed to create new tables. The following values are supported: CREATE_IF_NEEDED (the job may create the table), CREATE_NEVER (the job must fail if the table does not already exist).
- write_disposition (string, Optional): Specifies the action that occurs if the destination table already exists. The following values are supported: WRITE_TRUNCATE (overwrites the table data), WRITE_APPEND (appends the data to the table), WRITE_EMPTY (the job must fail if the table is not empty).
- error_handling (Row, Optional): Specifies whether and where to output unwritable rows. Row fields:
  - output (string): The name of the output PCollection containing failed writes.
type: WriteToBigQuery
input: ...
config:
  table: "table"
  create_disposition: "create_disposition"
  write_disposition: "write_disposition"
  error_handling:
    output: "output"
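A sketch of an appending write with the failed-rows output captured under a user-chosen name (the table and output names are hypothetical):

```yaml
type: WriteToBigQuery
input: ...
config:
  table: my_project:my_dataset.my_table
  create_disposition: CREATE_IF_NEEDED
  write_disposition: WRITE_APPEND
  error_handling:
    # Unwritable rows become available on an output named "failed_rows".
    output: failed_rows
```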
A PTransform for reading comma-separated values (csv) files into a PCollection.
- path (string): The file path to read from. The path can contain glob characters such as * and ?.
type: ReadFromCsv
config:
  path: "path"
A PTransform for writing a schema'd PCollection as a (set of) comma-separated values (csv) files.
- path (string): The file path to write to. The files written will begin with this prefix, followed by a shard identifier (see num_shards) according to the file_naming parameter.
type: WriteToCsv
input: ...
config:
  path: "path"
- driver_class_name (string)
- url (string)
- username (string, Optional)
- password (string, Optional)
- table (string, Optional)
- query (string, Optional)
- driver_jars (string, Optional)
- connection_properties (string, Optional)
- connection_init_sql (Array[string], Optional)
type: ReadFromJdbc
config:
  driver_class_name: "driver_class_name"
  url: "url"
  username: "username"
  password: "password"
  table: "table"
  query: "query"
  driver_jars: "driver_jars"
  connection_properties: "connection_properties"
  connection_init_sql:
    - "connection_init_sql"
    - ...
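As an illustration, a read from a hypothetical PostgreSQL database via JDBC might look like this (the driver class, URL, credentials, and query are placeholders):

```yaml
type: ReadFromJdbc
config:
  driver_class_name: org.postgresql.Driver
  url: jdbc:postgresql://localhost:5432/mydb
  username: my_user
  password: my_password
  # Either table or query may be given to select the data to read.
  query: "SELECT id, name FROM customers"
```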
- driver_class_name (string)
- url (string)
- username (string, Optional)
- password (string, Optional)
- table (string, Optional)
- driver_jars (string, Optional)
- connection_properties (string, Optional)
- connection_init_sql (Array[string], Optional)
type: WriteToJdbc
input: ...
config:
  driver_class_name: "driver_class_name"
  url: "url"
  username: "username"
  password: "password"
  table: "table"
  driver_jars: "driver_jars"
  connection_properties: "connection_properties"
  connection_init_sql:
    - "connection_init_sql"
    - ...
A PTransform for reading json values from files into a PCollection.
- path (string): The file path to read from. The path can contain glob characters such as * and ?.
type: ReadFromJson
config:
  path: "path"
A PTransform for writing a PCollection as json values to files.
- path (string): The file path to write to. The files written will begin with this prefix, followed by a shard identifier (see num_shards) according to the file_naming parameter.
type: WriteToJson
input: ...
config:
  path: "path"
- schema (string, Optional): The schema in which the data is encoded in the Kafka topic. For AVRO data, this is a schema defined with AVRO schema syntax (https://avro.apache.org/docs/1.10.2/spec.html#schemas). For JSON data, this is a schema defined with JSON-schema syntax (https://json-schema.org/). If a URL to Confluent Schema Registry is provided, then this field is ignored, and the schema is fetched from Confluent Schema Registry.
- consumer_config (Map[string, string], Optional): A list of key-value pairs that act as configuration parameters for Kafka consumers. Most of these configurations will not be needed, but if you need to customize your Kafka consumer, you may use this. See a detailed list: https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html
- format (string, Optional): The encoding format for the data stored in Kafka. Valid options are: RAW, AVRO, JSON, PROTO
- topic (string)
- bootstrap_servers (string): A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping; this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,...
- confluent_schema_registry_url (string, Optional)
- confluent_schema_registry_subject (string, Optional)
- auto_offset_reset_config (string, Optional): What to do when there is no initial offset in Kafka or if the current offset no longer exists on the server. (1) earliest: automatically reset the offset to the earliest offset. (2) latest: automatically reset the offset to the latest offset. (3) none: throw an exception to the consumer if no previous offset is found for the consumer's group.
- error_handling (Row, Optional): Specifies whether and where to output unwritable rows. Row fields:
  - output (string): The name of the output PCollection containing failed writes.
- file_descriptor_path (string, Optional): The path to the Protocol Buffer File Descriptor Set file. This file is used for schema definition and message serialization.
- message_name (string, Optional): The name of the Protocol Buffer message to be used for schema extraction and data conversion.
type: ReadFromKafka
config:
  schema: "schema"
  consumer_config:
    a: "consumer_config_value_a"
    b: "consumer_config_value_b"
    c: ...
  format: "format"
  topic: "topic"
  bootstrap_servers: "bootstrap_servers"
  confluent_schema_registry_url: "confluent_schema_registry_url"
  confluent_schema_registry_subject: "confluent_schema_registry_subject"
  auto_offset_reset_config: "auto_offset_reset_config"
  error_handling:
    output: "output"
  file_descriptor_path: "file_descriptor_path"
  message_name: "message_name"
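For instance, reading JSON-encoded records from a hypothetical topic might be configured as follows (the broker addresses, topic name, and JSON-schema are all placeholders):

```yaml
type: ReadFromKafka
config:
  format: JSON
  topic: my_topic
  bootstrap_servers: "broker1:9092,broker2:9092"
  # A JSON-schema describing each record's payload.
  schema: |
    {"type": "object", "properties": {"id": {"type": "integer"}, "name": {"type": "string"}}}
  auto_offset_reset_config: earliest
```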
- format (string): The encoding format for the data stored in Kafka. Valid options are: RAW, JSON, AVRO, PROTO
- topic (string)
- bootstrap_servers (string): A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping; this list only impacts the initial hosts used to discover the full set of servers. Format: host1:port1,host2:port2,...
- producer_config_updates (Map[string, string], Optional): A list of key-value pairs that act as configuration parameters for Kafka producers. Most of these configurations will not be needed, but if you need to customize your Kafka producer, you may use this. See a detailed list: https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html
- file_descriptor_path (string, Optional): The path to the Protocol Buffer File Descriptor Set file. This file is used for schema definition and message serialization.
- message_name (string, Optional): The name of the Protocol Buffer message to be used for schema extraction and data conversion.
type: WriteToKafka
input: ...
config:
  format: "format"
  topic: "topic"
  bootstrap_servers: "bootstrap_servers"
  producer_config_updates:
    a: "producer_config_updates_value_a"
    b: "producer_config_updates_value_b"
    c: ...
  file_descriptor_path: "file_descriptor_path"
  message_name: "message_name"
- url (string)
- username (string, Optional)
- password (string, Optional)
- table (string, Optional)
- query (string, Optional)
- driver_jars (string, Optional)
- connection_properties (string, Optional)
- connection_init_sql (Array[string], Optional)
type: ReadFromMySql
config:
  url: "url"
  username: "username"
  password: "password"
  table: "table"
  query: "query"
  driver_jars: "driver_jars"
  connection_properties: "connection_properties"
  connection_init_sql:
    - "connection_init_sql"
    - ...
- url (string)
- username (string, Optional)
- password (string, Optional)
- table (string, Optional)
- driver_jars (string, Optional)
- connection_properties (string, Optional)
- connection_init_sql (Array[string], Optional)
type: WriteToMySql
input: ...
config:
  url: "url"
  username: "username"
  password: "password"
  table: "table"
  driver_jars: "driver_jars"
  connection_properties: "connection_properties"
  connection_init_sql:
    - "connection_init_sql"
    - ...
- url (string)
- username (string, Optional)
- password (string, Optional)
- table (string, Optional)
- query (string, Optional)
- driver_jars (string, Optional)
- connection_properties (string, Optional)
- connection_init_sql (Array[string], Optional)
type: ReadFromOracle
config:
  url: "url"
  username: "username"
  password: "password"
  table: "table"
  query: "query"
  driver_jars: "driver_jars"
  connection_properties: "connection_properties"
  connection_init_sql:
    - "connection_init_sql"
    - ...
- url (string)
- username (string, Optional)
- password (string, Optional)
- table (string, Optional)
- driver_jars (string, Optional)
- connection_properties (string, Optional)
- connection_init_sql (Array[string], Optional)
type: WriteToOracle
input: ...
config:
  url: "url"
  username: "username"
  password: "password"
  table: "table"
  driver_jars: "driver_jars"
  connection_properties: "connection_properties"
  connection_init_sql:
    - "connection_init_sql"
    - ...
A PTransform for reading Parquet files.
- path (?, Optional)
type: ReadFromParquet
config:
  path: path
A PTransform for writing parquet files.
- path (?, Optional)
type: WriteToParquet
input: ...
config:
  path: path
- url (string)
- username (string, Optional)
- password (string, Optional)
- table (string, Optional)
- query (string, Optional)
- driver_jars (string, Optional)
- connection_properties (string, Optional)
- connection_init_sql (Array[string], Optional)
type: ReadFromPostgres
config:
  url: "url"
  username: "username"
  password: "password"
  table: "table"
  query: "query"
  driver_jars: "driver_jars"
  connection_properties: "connection_properties"
  connection_init_sql:
    - "connection_init_sql"
    - ...
- url (string)
- username (string, Optional)
- password (string, Optional)
- table (string, Optional)
- driver_jars (string, Optional)
- connection_properties (string, Optional)
- connection_init_sql (Array[string], Optional)
type: WriteToPostgres
input: ...
config:
  url: "url"
  username: "username"
  password: "password"
  table: "table"
  driver_jars: "driver_jars"
  connection_properties: "connection_properties"
  connection_init_sql:
    - "connection_init_sql"
    - ...
Reads messages from Cloud Pub/Sub.
- topic (string, Optional): Cloud Pub/Sub topic in the form "projects/<project>/topics/<topic>". If provided, subscription must be None.
- subscription (string, Optional): Existing Cloud Pub/Sub subscription to use in the form "projects/<project>/subscriptions/<subscription>". If not specified, a temporary subscription will be created from the specified topic. If provided, topic must be None.
- format (string): The expected format of the message payload. Currently supported formats are:
  - raw: Produces records with a single payload field whose contents are the raw bytes of the pubsub message.
  - avro: Parses records with a given avro schema.
  - json: Parses records with a given json schema.
- schema (?, Optional): Schema specification for the given format.
- attributes (Array[string], Optional): List of attribute keys whose values will be flattened into the output message as additional fields. For example, if the format is raw and attributes is ["a", "b"], then this read will produce elements of the form Row(payload=..., a=..., b=...).
- attributes_map (string, Optional)
- id_attribute (string, Optional): The attribute on incoming Pub/Sub messages to use as a unique record identifier. When specified, the value of this attribute (which can be any string that uniquely identifies the record) will be used for deduplication of messages. If not provided, we cannot guarantee that no duplicate data will be delivered on the Pub/Sub stream. In this case, deduplication of the stream will be strictly best effort.
- timestamp_attribute (string, Optional): Message value to use as element timestamp. If None, uses message publishing time as the timestamp. Timestamp values should be in one of two formats:
  - A numerical value representing the number of milliseconds since the Unix epoch.
  - A string in RFC 3339 format, UTC timezone. Example: 2015-10-29T23:41:41.123Z. The sub-second component of the timestamp is optional, and digits beyond the first three (i.e., time units smaller than milliseconds) may be ignored.
- error_handling (Row): Row fields:
  - output (string)
type: ReadFromPubSub
config:
  topic: "topic"
  subscription: "subscription"
  format: "format"
  schema: schema
  attributes:
    - "attributes"
    - ...
  attributes_map: "attributes_map"
  id_attribute: "id_attribute"
  timestamp_attribute: "timestamp_attribute"
  error_handling:
    output: "output"
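For example, a raw-format read from an existing subscription that surfaces one message attribute as a field might look like this (the project, subscription, and attribute names are hypothetical):

```yaml
type: ReadFromPubSub
config:
  subscription: "projects/my-project/subscriptions/my-subscription"
  format: raw
  # Produces elements of the form Row(payload=..., origin=...).
  attributes:
    - origin
```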
Writes messages to Cloud Pub/Sub.
- topic (string): Cloud Pub/Sub topic in the form "projects/<project>/topics/<topic>".
- format (string): How to format the message payload. Currently supported formats are:
  - raw: Expects a message with a single field (excluding attribute-related fields) whose contents are used as the raw bytes of the pubsub message.
  - avro: Encodes records with a given avro schema, which may be inferred from the input PCollection schema.
  - json: Formats records with a given json schema, which may be inferred from the input PCollection schema.
- schema (?, Optional): Schema specification for the given format.
- attributes (Array[string], Optional): List of attribute keys whose values will be pulled out as PubSub message attributes. For example, if the format is raw and attributes is ["a", "b"], then elements of the form Row(any_field=..., a=..., b=...) will result in PubSub messages whose payload has the contents of any_field and whose attributes will be populated with the values of a and b.
- attributes_map (string, Optional)
- id_attribute (string, Optional): If set, will set an attribute for each Cloud Pub/Sub message with the given name and a unique value. This attribute can then be used in a ReadFromPubSub PTransform to deduplicate messages.
- timestamp_attribute (string, Optional): If set, will set an attribute for each Cloud Pub/Sub message with the given name and the message's publish time as the value.
- error_handling (Row): Row fields:
  - output (string)
type: WriteToPubSub
input: ...
config:
  topic: "topic"
  format: "format"
  schema: schema
  attributes:
    - "attributes"
    - ...
  attributes_map: "attributes_map"
  id_attribute: "id_attribute"
  timestamp_attribute: "timestamp_attribute"
  error_handling:
    output: "output"
- project (string, Optional): The GCP project where the Pubsub Lite reservation resides. This can be a project number or a project ID.
- schema (string, Optional): The schema in which the data is encoded in the Pubsub Lite topic. For AVRO data, this is a schema defined with AVRO schema syntax (https://avro.apache.org/docs/1.10.2/spec.html#schemas). For JSON data, this is a schema defined with JSON-schema syntax (https://json-schema.org/).
- format (string): The encoding format for the data stored in Pubsub Lite. Valid options are: RAW, AVRO, JSON
- subscription_name (string): The name of the subscription to consume data. This will be concatenated with the project and location parameters to build a full subscription path.
- location (string): The region or zone where the Pubsub Lite reservation resides.
- attributes (Array[string], Optional): List of attribute keys whose values will be flattened into the output message as additional fields. For example, if the format is RAW and attributes is ["a", "b"], then this read will produce elements of the form Row(payload=..., a=..., b=...).
- attribute_map (string, Optional): Name of a field in which to store the full set of attributes associated with this message. For example, if the format is RAW and attribute_map is set to "attrs", then this read will produce elements of the form Row(payload=..., attrs=...) where attrs is a Map type of string to string. If both attributes and attribute_map are set, the overlapping attribute values will be present in both the flattened structure and the attribute map.
- attribute_id (string, Optional): The attribute on incoming Pubsub Lite messages to use as a unique record identifier. When specified, the value of this attribute (which can be any string that uniquely identifies the record) will be used for deduplication of messages. If not provided, we cannot guarantee that no duplicate data will be delivered on the Pub/Sub stream. In this case, deduplication of the stream will be strictly best effort.
type: ReadFromPubSubLite
config:
  project: "project"
  schema: "schema"
  format: "format"
  subscription_name: "subscription_name"
  location: "location"
  attributes:
    - "attributes"
    - ...
  attribute_map: "attribute_map"
  attribute_id: "attribute_id"
- project (string): The GCP project where the Pubsub Lite reservation resides. This can be a project number or a project ID.
- format (string): The encoding format for the data stored in Pubsub Lite. Valid options are: RAW, JSON, AVRO
- topic_name (string): The name of the topic to publish data into. This will be concatenated with the project and location parameters to build a full topic path.
- location (string): The region or zone where the Pubsub Lite reservation resides.
- attributes (Array[string], Optional): List of attribute keys whose values will be pulled out as Pubsub Lite message attributes. For example, if the format is JSON and attributes is ["a", "b"], then elements of the form Row(any_field=..., a=..., b=...) will result in Pubsub Lite messages whose payload has the contents of any_field and whose attributes will be populated with the values of a and b.
- attribute_id (string, Optional): If set, will set an attribute for each Pubsub Lite message with the given name and a unique value. This attribute can then be used in a ReadFromPubSubLite PTransform to deduplicate messages.
type: WriteToPubSubLite
input: ...
config:
  project: "project"
  format: "format"
  topic_name: "topic_name"
  location: "location"
  attributes:
    - "attributes"
    - ...
  attribute_id: "attribute_id"
- url (string)
- username (string, Optional)
- password (string, Optional)
- table (string, Optional)
- query (string, Optional)
- driver_jars (string, Optional)
- connection_properties (string, Optional)
- connection_init_sql (Array[string], Optional)
type: ReadFromSqlServer
config:
  url: "url"
  username: "username"
  password: "password"
  table: "table"
  query: "query"
  driver_jars: "driver_jars"
  connection_properties: "connection_properties"
  connection_init_sql:
    - "connection_init_sql"
    - ...
- url (string)
- username (string, Optional)
- password (string, Optional)
- table (string, Optional)
- driver_jars (string, Optional)
- connection_properties (string, Optional)
- connection_init_sql (Array[string], Optional)
type: WriteToSqlServer
input: ...
config:
  url: "url"
  username: "username"
  password: "password"
  table: "table"
  driver_jars: "driver_jars"
  connection_properties: "connection_properties"
  connection_init_sql:
    - "connection_init_sql"
    - ...
Reads lines from text files.
The resulting PCollection consists of rows with a single string field named "line".
- path (string): The file path to read from. The path can contain glob characters such as * and ?.
type: ReadFromText
config:
  path: "path"
Writes a PCollection to a (set of) text file(s).
The input must be a PCollection whose schema has exactly one field.
- path (string): The file path to write to. The files written will begin with this prefix, followed by a shard identifier.
type: WriteToText
input: ...
config:
  path: "path"
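Putting the two text transforms together, a complete minimal chain pipeline (with hypothetical bucket paths) might be:

```yaml
pipeline:
  type: chain
  transforms:
    # Each matching input file yields rows with a single "line" field.
    - type: ReadFromText
      config:
        path: gs://my-bucket/input*.txt
    # Each output file begins with this prefix plus a shard identifier.
    - type: WriteToText
      config:
        path: gs://my-bucket/output
```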