Skip to content

Instantly share code, notes, and snippets.

@mrn-aglic
Created October 11, 2022 19:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mrn-aglic/50174b840094d428ceff93f4cde74a5a to your computer and use it in GitHub Desktop.
Save mrn-aglic/50174b840094d428ceff93f4cde74a5a to your computer and use it in GitHub Desktop.
class TransferPsqlToMysql(BaseOperator):
    """Copy the result of a PostgreSQL query into a MySQL table.

    The query result is exported with ``COPY ... TO STDOUT`` into a temporary
    file, which is then handed to the MySQL hook's ``bulk_load``. The
    destination table can optionally be truncated before loading.

    :param postgres_conn_id: connection id passed to ``PostgresHook``.
    :param mysql_conn_id: connection id passed to ``MySqlHook``.
    :param sql: query whose result set is exported (templated; may be given
        as a path to a ``.sql`` file).
    :param dst_table_name: name of the MySQL table that receives the rows.
    :param truncate_table: when truthy, ``TRUNCATE TABLE`` is run on the
        destination table before the load.
    """

    template_fields = ["sql"]
    template_ext = [".sql"]
    template_fields_renderers = {"sql": "sql"}

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        postgres_conn_id: str,
        mysql_conn_id: str,
        sql: str,
        dst_table_name: str,
        truncate_table: bool,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.postgres_conn_id = postgres_conn_id
        self.mysql_conn_id = mysql_conn_id
        self.sql = sql
        self.dst_table_name = dst_table_name
        self.truncate_table = truncate_table

    def execute(self, context: Context) -> None:
        """Export the query result to a temp file and bulk load it into MySQL."""
        postgres_hook = PostgresHook(self.postgres_conn_id)
        mysql_hook = MySqlHook(self.mysql_conn_id)

        # The query is embedded inside ``COPY (...)``; a trailing ';' (usual
        # at the end of a .sql file) would end up inside the parentheses and
        # make the COPY statement invalid, so strip it together with any
        # trailing whitespace.
        query = str(self.sql).rstrip("; \t\r\n")

        with NamedTemporaryFile("w+b") as file:
            logging.info("Exporting data to csv file")
            # Only the path is passed on: both hooks work with the file by
            # name, so nothing is written through this handle and no
            # flush/seek is required before loading.
            # NOTE(review): FORMAT csv quotes fields when needed and writes
            # NULL as an empty value; presumably MySQL's bulk load expects
            # its default tab-separated format (unquoted, \N for NULL) --
            # confirm the data never relies on that difference.
            postgres_hook.copy_expert(
                f"COPY ({query}) TO STDOUT WITH (FORMAT csv, DELIMITER '\t')",
                file.name,
            )

            if self.truncate_table:
                mysql_hook.run(f"TRUNCATE TABLE {self.dst_table_name}")

            mysql_hook.bulk_load(self.dst_table_name, file.name)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment