Skip to content

Instantly share code, notes, and snippets.

@PlugaruT
Created November 12, 2020 10:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save PlugaruT/4666406bd8792b7b196dc1519c8885a2 to your computer and use it in GitHub Desktop.
Save PlugaruT/4666406bd8792b7b196dc1519c8885a2 to your computer and use it in GitHub Desktop.
Apache Beam issue
class WriteToObjectTableBigquery(PTransform):
    """Composite transform that windows events and writes them to BigQuery.

    Elements are keyed by their ``"object_type"`` entry, placed into fixed
    windows, grouped per key, passed through ``KeepLastEvents`` and finally
    written to a BigQuery table that is picked per element from its
    ``"table_name"`` entry.
    """

    def __init__(
        self,
        project: str,
        dataset: str,
        window_interval_seconds: int = 20,
        window_lateness_seconds: int = 10,
    ):
        """Store the destination and windowing settings.

        Args:
            project: Project part of the ``project:dataset.table`` destination.
            dataset: Dataset part of the ``project:dataset.table`` destination.
            window_interval_seconds: Size handed to ``FixedWindows``.
            window_lateness_seconds: Value handed to ``WindowInto`` as
                ``allowed_lateness``.
        """
        super().__init__()
        self.project = project
        self.dataset = dataset
        self.window_interval_seconds = window_interval_seconds
        self.window_lateness_seconds = window_lateness_seconds

    def _choose_table(self, element):
        """Return the ``project:dataset.table`` destination for ``element``.

        The ``"table_name"`` entry is removed from ``element`` as a side
        effect, and the element's ``"id"`` is logged together with the
        resolved destination.
        """
        # NOTE(review): pop() mutates the incoming element, so a second call
        # with the same dict would raise KeyError. Presumably intentional so
        # the written row does not carry "table_name" - confirm.
        target = element.pop("table_name")
        destination = f"{self.project}:{self.dataset}.{target}"
        logger.info(f"Writing event for object with id={element['id']} in {destination}")
        return destination

    def expand(self, pcoll):
        """Apply the key -> window -> group -> filter -> write stages to ``pcoll``.

        Returns:
            Whatever the final ``WriteToBigQuery`` application returns.
        """
        # Pair every element with its object type so it can be grouped later.
        keyed = pcoll | "GroupByObjectType" >> Map(lambda event: (event["object_type"], event))

        windowed = keyed | "Window" >> WindowInto(
            windowfn=FixedWindows(self.window_interval_seconds),
            allowed_lateness=self.window_lateness_seconds,
        )

        grouped = windowed | "GroupByKey" >> GroupByKey()

        # KeepLastEvents is defined elsewhere; presumably it reduces each
        # group to its most recent event - TODO confirm.
        latest = grouped | "KeepLastEventOnly" >> ParDo(KeepLastEvents())

        # The destination table is resolved per element by _choose_table.
        return latest | "WriteToBQ" >> WriteToBigQuery(
            table=self._choose_table,
            create_disposition=BigQueryDisposition.CREATE_NEVER,
            insert_retry_strategy=RetryStrategy.RETRY_NEVER,
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment