Skip to content

Instantly share code, notes, and snippets.

@datancoffee
Created April 23, 2024 10:47
Show Gist options
  • Save datancoffee/684927c06630af7ac62b29ef47da694f to your computer and use it in GitHub Desktop.
Save datancoffee/684927c06630af7ac62b29ef47da694f to your computer and use it in GitHub Desktop.
import dlt
from dlt.common.typing import TDataItems
from dlt.common.schema import TTableSchema
from dlt.common.destination import Destination
from typing import Any
from core.actions import Action
class Read(Action):
def __init__(self, actionname: str = None):
super().__init__(actionname)
# needs to say from_reference("destination"... to work
self.dltdestination = Destination.from_reference(
"destination",
destination_name=self.actionname+"_destination",
destination_callable=self.read_destination)
self.dltpipeline = dlt.pipeline(
self.actionname+"_destination_pipeline",
destination = self.dltdestination)
self.clean_state()
def read_destination(self, items: TDataItems, table: TTableSchema) -> None:
tablename = table["name"]
if tablename not in self.readitems:
self.readitems[tablename] = []
self.readitems[tablename].extend(items)
self.readtableschema[tablename] = table
def clean_state(self):
self.readitems={}
self.readtableschema={}
def do(self, *args:Any, **kwargs: Any):
self.clean_state()
self.dltpipeline.run(*args, **kwargs)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment