Created
November 12, 2020 10:09
-
-
Save PlugaruT/4666406bd8792b7b196dc1519c8885a2 to your computer and use it in GitHub Desktop.
Apache Beam issue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class WriteToObjectTableBigquery(PTransform):
    """Composite transform that writes events to per-object BigQuery tables.

    Elements are keyed by their ``"object_type"`` entry, assigned to fixed
    windows, grouped per key, run through ``KeepLastEvents`` and then handed
    to ``WriteToBigQuery``. The destination table is resolved for each element
    by ``_choose_table``.
    """

    def __init__(
        self, project: str, dataset: str, window_interval_seconds: int = 20, window_lateness_seconds: int = 10
    ):
        """Store the destination coordinates and the windowing settings.

        Args:
            project: Project part of the destination table reference.
            dataset: Dataset part of the destination table reference.
            window_interval_seconds: Size passed to ``FixedWindows``.
            window_lateness_seconds: Value passed as ``allowed_lateness`` to
                ``WindowInto``.
        """
        super().__init__()
        # Windowing configuration, consumed by expand().
        self.window_interval_seconds = window_interval_seconds
        self.window_lateness_seconds = window_lateness_seconds
        # Destination coordinates, consumed by _choose_table().
        self.project = project
        self.dataset = dataset

    def _choose_table(self, element):
        """Return the ``project:dataset.table`` reference for *element*.

        The table part is taken from the element's ``"table_name"`` entry.

        Raises:
            KeyError: If the element has no ``"table_name"`` or no ``"id"``
                entry (the latter is read for the log message).
        """
        # NOTE(review): pop() mutates the element in place, so "table_name" is
        # gone from the dict afterwards -- presumably so that it is not sent
        # to BigQuery as a column; confirm this is intended.
        object_name = element.pop("table_name")
        table_name = f"{self.project}:{self.dataset}.{object_name}"
        logger.info(f"Writing event for object with id={element['id']} in {table_name}")
        return table_name

    def expand(self, pcoll):
        """Build the sub-pipeline and return the result of the BigQuery write step."""
        # Pair every element with its object type so it can be grouped later.
        # (The step label is kept as-is even though this step only assigns keys.)
        keyed = pcoll | "GroupByObjectType" >> Map(lambda event: (event["object_type"], event))

        windowed = keyed | "Window" >> WindowInto(
            windowfn=FixedWindows(self.window_interval_seconds),
            allowed_lateness=self.window_lateness_seconds,
        )

        grouped = windowed | "GroupByKey" >> GroupByKey()

        # KeepLastEvents is defined outside this block; judging by the step
        # label it presumably reduces each group to its latest event -- verify.
        latest = grouped | "KeepLastEventOnly" >> ParDo(KeepLastEvents())

        # CREATE_NEVER: the transform is told not to create destination tables.
        # RETRY_NEVER: failed inserts are not retried.
        return latest | "WriteToBQ" >> WriteToBigQuery(
            table=self._choose_table,
            create_disposition=BigQueryDisposition.CREATE_NEVER,
            insert_retry_strategy=RetryStrategy.RETRY_NEVER,
        )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment