Skip to content

Instantly share code, notes, and snippets.

@gregglind
Last active February 14, 2017 18:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gregglind/e0f7143894d2c065f6dcac1129ef7b71 to your computer and use it in GitHub Desktop.
shield-studies-addons prototype job
-- Hindsight output-plugin configuration (Lua syntax) for writing
-- 'shield-study-addon' telemetry pings to Parquet files for S3 upload.
-- Loaded by the generic s3_parquet.lua output sandbox named below.
filename = "s3_parquet.lua"
-- Heka message matcher: only telemetry messages whose docType field is
-- 'shield-study-addon' are processed by this plugin instance.
message_matcher = "Type == 'telemetry' && Fields[docType] == 'shield-study-addon'"
-- Seconds between timer events; idle-file checks (see max_file_age below)
-- happen on this cadence.
ticker_interval = 60
-- Do not persist sandbox state across restarts.
preserve_data = false
-- Parquet schema applied to each matched message.
-- NOTE(review): this is a Lua long string ([=[ ... ]=]); every byte inside
-- it is schema data consumed by the parquet writer — do not edit casually.
parquet_schema = [=[
message shield-study-addon {
required group application {
optional group addons {
optional group activeAddons (MAP) {
repeated group key_value {
required binary key (UTF8);
required binary value (UTF8);
}
}
optional group activeExperiment {
optional binary id (UTF8);
optional binary branch (UTF8);
}
optional group activeGMPlugins (MAP) {
repeated group key_value {
required binary key (UTF8);
required binary value (UTF8);
}
}
optional binary activePlugins (UTF8);
optional binary persona (UTF8);
optional group theme {
optional binary id (UTF8);
optional boolean blocklisted;
optional binary description (UTF8);
optional binary name (UTF8);
optional boolean userDisabled;
optional boolean appDisabled;
optional binary version (UTF8);
optional int64 scope;
optional binary foreignInstall (UTF8);
optional boolean hasBinaryComponents;
optional binary installDay (UTF8);
optional int64 updateDay;
}
}
required binary architecture (UTF8);
required binary buildId (UTF8);
required binary channel (UTF8);
required binary name (UTF8);
required binary platformVersion (UTF8);
required binary version (UTF8);
}
required binary clientId (UTF8);
required binary creationDate (UTF8);
optional group environment {
optional group system {
optional group os {
optional binary name (UTF8);
optional binary version (UTF8);
optional binary locale (UTF8);
}
}
optional group profile {
optional int64 creationDate;
optional int64 resetDate;
}
optional group settings {
optional boolean blocklistEnabled;
optional boolean isDefaultBrowser;
optional binary defaultSearchEngine (UTF8);
optional group defaultSearchEngineData {
optional binary name (UTF8);
optional binary loadPath (UTF8);
optional binary submissionURL (UTF8);
optional binary origin (UTF8);
}
optional boolean e10sEnabled;
optional binary e10sCohort (UTF8);
optional binary locale (UTF8);
optional boolean telemetryEnabled;
optional group update {
optional boolean autoDownload;
optional binary channel (UTF8);
optional boolean enabled;
}
}
}
required binary id (UTF8);
required binary type (UTF8);
required binary version (UTF8);
required group payload {
required int64 version;
required binary study_name (UTF8);
required binary branch (UTF8);
required binary addon_version (UTF8);
required binary shield_version (UTF8);
optional boolean testing;
required group data {
required group attributes (MAP) {
repeated group key_value {
required binary key (UTF8);
required binary value (UTF8);
}
}
}
required binary type (UTF8);
}
required group metadata {
required int64 Timestamp;
required binary submissionDate (UTF8);
required binary Date (UTF8);
required binary normalizedChannel (UTF8);
required binary geoCountry (UTF8);
required binary geoCity (UTF8);
}
}
]=]
-- The name of a top level parquet group used to specify additional information
-- to be extracted from the message (using read_message). If the column name
-- matches a Heka message header name the data is extracted from 'msg.name'
-- otherwise the data is extracted from msg.Fields[name]
metadata_group = "metadata"
-- Array of Heka message variables containing JSON strings. The decoded JSON
-- objects are assembled into a record that is dissected based on the parquet
-- schema. This provides a generic way to cherry pick and re-combine the
-- segmented JSON structures like the Mozilla telemetry pings. A table can be
-- passed as the first value either empty or with some pre-seeded values.
-- If not specified the schema is applied directly to the Heka message.
json_objects = {"Fields[submission]", "Fields[environment.profile]", "Fields[environment.settings]", "Fields[environment.system]", "Fields[payload]"}
-- Dimensions used to build the S3 partition path for each output file.
s3_path_dimensions = {
-- access message data with using read_message()
{name = "_submission_date", source = "Fields[submissionDate]"},
-- access the record data with a path array
-- {name = "_submission_date", source = {"metadata", "submissionDate"}}
}
-- directory location to store the intermediate output files
batch_dir = "/var/tmp/parquet"
-- Specifies how many parquet writers can be opened at once. If this value is
-- exceeded the least-recently used writer will have its data finalize and be
-- closed. The default is 100. A value of 0 means no maximum **warning** if
-- there are a large number of partitions this can easily run the system out of
-- file handles and/or memory.
max_writers = 100
-- Specifies how many records to aggregate before creating a rowgroup
-- (default 10000)
max_rowgroup_size = 10000
-- Specifies how much data (in bytes) can be written to a single file before
-- it is finalized. The file size is only checked after each rowgroup write
-- (default 300MiB).
max_file_size = 1024 * 1024 * 300
-- Specifies how long (in seconds) to wait before the file is finalized
-- (default 1 hour). Idle files are only checked every ticker_interval seconds.
max_file_age = 60 * 60
-- Use Hive-compatible (lowercase, underscore) partition/column naming.
hive_compatible = true -- default false
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment