-
-
Save geojins/09096637dac8e9beda370233c3bdfce8 to your computer and use it in GitHub Desktop.
Python Script to Read data from one Phoenix cluster and Write to another
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyspark.sql import SparkSession | |
import sys | |
def phoenix_data_export_import(spark, sourceZk, destinationZk, table):
    """Copy one Phoenix table from a source cluster to a destination cluster.

    Loads *table* from the Phoenix cluster reachable via *sourceZk* into a
    Spark DataFrame, then writes that DataFrame to the same table name on
    the cluster reachable via *destinationZk*, overwriting existing data.

    :param spark: active SparkSession used for both read and write.
    :param sourceZk: source Zookeeper quorum URL (HOST:PORT).
    :param destinationZk: destination Zookeeper quorum URL (HOST:PORT).
    :param table: Phoenix table name to copy.
    """
    # Pull the source table into a DataFrame through the Phoenix connector.
    source_df = (
        spark.read.format("org.apache.phoenix.spark")
        .option("table", table)
        .option("zkUrl", sourceZk)
        .load()
    )
    # Push it to the destination cluster, replacing any existing rows.
    (
        source_df.write.format("org.apache.phoenix.spark")
        .mode("overwrite")
        .option("table", table)
        .option("zkUrl", destinationZk)
        .save()
    )
# Usage:
#   sys.argv[1] - Source Zookeeper URL (HOST:PORT)
#   sys.argv[2] - Destination Zookeeper URL (HOST:PORT)
#   sys.argv[3] - File containing list of tables, one table per line
#     (original comment mislabeled this as sys.argv[2])
if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Phoenix Data Export Import Job") \
        .getOrCreate()
    # `with` guarantees the table-list file is closed even if the copy
    # loop raises (the original leaked the handle on an uncaught error).
    with open(sys.argv[3], "r") as tableList:
        for table in tableList:
            table = table.strip()
            if not table:
                # Skip blank lines so we never request an empty table name.
                continue
            try:
                phoenix_data_export_import(spark, sys.argv[1], sys.argv[2], table)
            except Exception:
                # Narrowed from a bare `except:` so Ctrl-C / SystemExit
                # still terminate the job; a failed table is reported and
                # the remaining tables are still attempted.
                print("Exception in exporting/import of table: {} ".format(table))
    spark.stop()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment