@geojins
Created February 23, 2020 01:25
Python script to read data from one Phoenix cluster and write it to another
from pyspark.sql import SparkSession
import sys

def phoenix_data_export_import(spark, sourceZk, destinationZk, table):
    # Read the Phoenix table from the source cluster as a DataFrame.
    df = spark.read \
        .format("org.apache.phoenix.spark") \
        .option("table", table) \
        .option("zkUrl", sourceZk) \
        .load()
    # Write the DataFrame to the destination Phoenix cluster.
    df.write \
        .format("org.apache.phoenix.spark") \
        .mode("overwrite") \
        .option("table", table) \
        .option("zkUrl", destinationZk) \
        .save()

# sys.argv[1] - Source Zookeeper URL (HOST:PORT)
# sys.argv[2] - Destination Zookeeper URL (HOST:PORT)
# sys.argv[3] - File containing the list of tables, one table per line
if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Phoenix Data Export Import Job") \
        .getOrCreate()
    # Path of the file containing the list of tables.
    with open(sys.argv[3], "r") as tableList:
        for table in tableList:
            try:
                phoenix_data_export_import(spark, sys.argv[1], sys.argv[2], table.strip())
            except Exception as e:
                print("Exception exporting/importing table {}: {}".format(table.strip(), e))
    spark.stop()
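Because the driver iterates over the table-list file line by line, blank lines or stray whitespace would otherwise be passed through as table names. A minimal sketch of a hypothetical helper, `load_table_list` (not part of the original script), that filters such lines out before the export loop runs; treating `#`-prefixed lines as comments is an added assumption:

```python
def load_table_list(path):
    """Return non-empty, stripped table names from a text file.

    Blank lines and lines starting with '#' (treated as comments,
    an assumption not made by the original script) are skipped.
    """
    with open(path, "r") as f:
        return [
            line.strip()
            for line in f
            if line.strip() and not line.strip().startswith("#")
        ]
```

The main loop could then iterate over `load_table_list(sys.argv[3])` instead of the raw file handle, so each table name arrives already stripped.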