Skip to content

Instantly share code, notes, and snippets.

@edgarrmondragon
Created January 8, 2024 16:58
Show Gist options
  • Save edgarrmondragon/0d39ec2b309c0ef7b055ca18be9584c7 to your computer and use it in GitHub Desktop.
Save edgarrmondragon/0d39ec2b309c0ef7b055ca18be9584c7 to your computer and use it in GitHub Desktop.
Singer SDK: split an API response into multiple streams
from typing_extensions import override
from singer_sdk import Stream, Tap
class ParentStream(Stream):
    """Top-level stream that splits nested arrays out to child streams.

    Each element of the response's ``"C"`` array becomes one parent record
    carrying ``W`` and ``X``. The nested ``"Y"`` and ``"Z"`` arrays are moved
    out of the record and into the child context, where ``ChildStreamY`` and
    ``ChildStreamZ`` read them.
    """

    name = "parent"
    # "type": "object" makes this a complete JSON Schema for an object record;
    # without it the schema does not constrain the record's top-level type.
    schema = {
        "type": "object",
        "properties": {
            "W": {"type": "string"},
            "X": {"type": "string"},
        },
    }

    def get_response(self):
        """Return a hard-coded sample payload in place of a real API response.

        ``"C"`` holds the parent rows; each row nests ``"Y"`` and ``"Z"``
        arrays whose items belong to the child streams.
        """
        return {
            "A": "",
            "B": "",
            "C": [
                {
                    "W": "",
                    "X": "",
                    "Y": [{"L": "", "M": "", "N": ""}],
                    "Z": [{"P": "", "Q": "", "R": ""}],
                }
            ],
        }

    @override
    def get_records(self, context):
        """Yield each element of the response's ``"C"`` array as a record."""
        response = self.get_response()
        yield from response["C"]

    @override
    def get_child_context(self, record, context):
        """Build the context handed to the child streams for *record*.

        ``"Y"`` and ``"Z"`` are popped rather than copied, so the nested
        arrays are removed from the parent record, whose schema declares only
        ``W`` and ``X``. A missing array defaults to ``[]``.

        Args:
            record: A parent record; mutated in place (``Y``/``Z`` removed).
            context: The parent stream's own context (unused here).

        Returns:
            A dict with the parent key ``W`` and the ``Y``/``Z`` child rows.
        """
        return {
            "W": record["W"],
            "Y": record.pop("Y", []),
            "Z": record.pop("Z", []),
        }
class ChildStreamY(Stream):
    """Child of ``ParentStream`` that emits the rows of each parent's ``"Y"`` array."""

    name = "child_y"
    # "type": "object" makes this a complete JSON Schema for an object record.
    schema = {
        "type": "object",
        "properties": {
            "W": {"type": "string"},
            "L": {"type": "string"},
            "M": {"type": "string"},
            "N": {"type": "string"},
        },
    }
    parent_stream_type = ParentStream
    # Only the parent key "W" from the child context is used for state
    # partitioning; the "Y"/"Z" row arrays that also travel in the context
    # are deliberately excluded.
    state_partitioning_keys = ["W"]

    @override
    def get_records(self, context):
        """Yield the ``"Y"`` rows that the parent placed in *context*.

        The rows themselves carry no ``W``. It is declared in the schema and
        presumably filled in from the partition context by the SDK.
        NOTE(review): confirm this against the Singer SDK version in use.
        """
        yield from context["Y"]
class ChildStreamZ(Stream):
    """Child of ``ParentStream`` that emits the rows of each parent's ``"Z"`` array."""

    name = "child_z"
    # "type": "object" makes this a complete JSON Schema for an object record.
    schema = {
        "type": "object",
        "properties": {
            "W": {"type": "string"},
            "P": {"type": "string"},
            "Q": {"type": "string"},
            "R": {"type": "string"},
        },
    }
    parent_stream_type = ParentStream
    # Only the parent key "W" from the child context is used for state
    # partitioning; the "Y"/"Z" row arrays that also travel in the context
    # are deliberately excluded.
    state_partitioning_keys = ["W"]

    @override
    def get_records(self, context):
        """Yield the ``"Z"`` rows that the parent placed in *context*.

        The rows themselves carry no ``W``. It is declared in the schema and
        presumably filled in from the partition context by the SDK.
        NOTE(review): confirm this against the Singer SDK version in use.
        """
        yield from context["Z"]
class MyTap(Tap):
    """Tap exposing the parent stream and its two child streams."""

    name = "my-tap"

    @override
    def discover_streams(self):
        """Return one instance of each stream class, bound to this tap."""
        return [ParentStream(self), ChildStreamY(self), ChildStreamZ(self)]
if __name__ == "__main__":
    # Running the module as a script hands control to the tap's CLI entry point.
    run_cli = MyTap.cli
    run_cli()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment