Skip to content

Instantly share code, notes, and snippets.

@voycey
Created July 19, 2021 02:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save voycey/5f6b93d04e0081d476f17c3fc17924ab to your computer and use it in GitHub Desktop.
Save voycey/5f6b93d04e0081d476f17c3fc17924ab to your computer and use it in GitHub Desktop.
Load into QuestDB
def write_to_quest(df_row):
HOST = args['questdb_host']
PORT = int(args['questdb_port'])
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect((HOST, PORT))
sock.sendall(_row_to_line_protocol(df_row).encode())
except socket.error as e:
print("Got error: %s" % (e))
sock.close()
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
allDaily = glueContext.create_dynamic_frame.from_catalog(database = args['db_name'],
table_name = "daily",
transformation_ctx = "allDaily")
#convert to a DF and write each row to quest
allDaily.toDF().foreach(write_to_quest)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment