Skip to content

Instantly share code, notes, and snippets.

@2bethere
Created February 15, 2021 22:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save 2bethere/e7a500a92e748ac733d549ed7b184523 to your computer and use it in GitHub Desktop.
dump_druid_data_shape
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import urllib3
import requests
import json
import zipfile
import zlib
from datetime import datetime
from os import mkdir, path as os_path
import glob
class ConfigContext:
    """Singleton holding the Druid connection settings shared by all helpers."""

    __instance = None  # the one shared instance, created lazily by getInstance()
    host = ""          # base URL of the Druid router/coordinator, e.g. http://localhost:8080
    username = ""      # HTTP basic-auth user ("" means no auth)
    password = ""      # HTTP basic-auth password ("" means no auth)

    @staticmethod
    def getInstance():
        """Static access method: return the shared instance, creating it on first use."""
        # `is None` is the idiomatic identity check (was `== None`).
        if ConfigContext.__instance is None:
            ConfigContext()
        return ConfigContext.__instance

    def __init__(self):
        """Virtually private constructor; raises if an instance already exists."""
        if ConfigContext.__instance is not None:
            raise Exception("This class is a singleton!")
        ConfigContext.__instance = self
# Check server health status
def check_status(config):
    """Return the HTTP status code of Druid's health endpoint, or 500 if unreachable.

    Any network-level failure (DNS, refused connection, TLS error) is reported
    as 500 so the caller's `!= 200` check treats the server as unhealthy.
    """
    try:
        r = requests.get(config.host + "/status/health",
                         auth=(config.username, config.password), verify=False)
        return r.status_code
    # Catch only requests' network errors instead of a bare `except:`, which
    # would also swallow KeyboardInterrupt and SystemExit.
    except requests.exceptions.RequestException:
        return 500
def exec_query(config, query):
    """POST a SQL query payload to Druid's SQL endpoint and return the parsed JSON."""
    endpoint = config.host + "/druid/v2/sql"
    credentials = (config.username, config.password)
    headers = {"Content-Type": "application/json", "Accept": "application/json"}
    response = requests.post(endpoint, auth=credentials, verify=False,
                             json=query, headers=headers)
    return response.json()
# Get a list of data sources
def get_datasources(config):
    """Return the list of datasource names known to the Druid cluster."""
    url = config.host + "/druid/v2/datasources"
    credentials = (config.username, config.password)
    response = requests.get(url, auth=credentials, verify=False)
    return response.json()
# Get a list of segments and their sizes + time intervals
def get_segments(config, datasource):
    """Return all sys.segments rows for `datasource`.

    Uses a Druid SQL `?` parameter instead of string interpolation so a
    datasource name containing a quote cannot break or inject into the query.
    """
    query = {
        "query": "SELECT * FROM sys.segments WHERE datasource = ?",
        "parameters": [{"type": "VARCHAR", "value": datasource}],
    }
    return exec_query(config, query)
# Get a list of columns & data type
def get_columns(config, datasource):
    """Return the INFORMATION_SCHEMA.COLUMNS rows (name, type, ...) for `datasource`.

    Uses a Druid SQL `?` parameter instead of string interpolation so a
    table name containing a quote cannot break or inject into the query.
    """
    query = {
        "query": "SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = ?",
        "parameters": [{"type": "VARCHAR", "value": datasource}],
    }
    return exec_query(config, query)
# Shared implementation for the per-column statistics queries below.
def _column_shape(config, datasource, expression):
    """Return {"min", "max", "cardinality"} for a SQL expression over `datasource`.

    `expression` is a ready-to-embed SQL fragment (the quoted column name,
    optionally wrapped, e.g. LENGTH("col")); the three public wrappers below
    only differ in which expression they aggregate.
    """
    query = {"query": 'SELECT MIN({0}) AS "MIN", MAX({0}) AS "MAX", '
                      'COUNT(DISTINCT {0}) AS "DISTINCT" FROM "{1}"'
                      .format(expression, datasource)}
    result = exec_query(config, query)[0]
    return {"min": result['MIN'], "max": result['MAX'], "cardinality": result['DISTINCT']}

# For string column: min and max length, plus cardinality.
def get_column_detail_str(config, datasource, column):
    return _column_shape(config, datasource, 'LENGTH("{0}")'.format(column))

# For long/float/double column: min, max, cardinality.
def get_column_detail_num(config, datasource, column):
    return _column_shape(config, datasource, '"{0}"'.format(column))

# For timestamp column: min, max, cardinality (same shape query as numeric).
def get_column_detail_timestamp(config, datasource, column):
    return _column_shape(config, datasource, '"{0}"'.format(column))
# Compress all json objects for shipping
def package(path):
    """Zip every *.json file found in ./<path>/ into <path>.zip (DEFLATE).

    Files are stored flat (basename only) inside the archive. A missing input
    file prints an error instead of raising, and the archive is always closed.
    """
    input_files = map(os_path.basename, glob.glob("./{0}/*.json".format(path)))
    # `with` guarantees the archive is closed even if a write fails,
    # replacing the manual try/finally + zf.close() pattern.
    with zipfile.ZipFile("{0}.zip".format(path), mode="w") as zf:
        try:
            for file_name in input_files:
                # first parameter: file on disk; second: its name inside the zip
                zf.write(path + "/" + file_name, file_name,
                         compress_type=zipfile.ZIP_DEFLATED)
        except FileNotFoundError:
            print("An error occurred")
# Parse command-line options
def parse_arg(config):
    """Populate `config` (host, username, password) from command-line arguments.

    Mutates `config` in place and returns None. Defaults: host
    http://localhost:8080, no basic-auth credentials.
    """
    parser = argparse.ArgumentParser(
        description='Collect data statistics from Druid for performance analysis')
    parser.add_argument('-host', '--host', metavar='IP:PORT', nargs='+',
                        help='Host of Druid coordinator/router server and port. Default port to ...')
    parser.add_argument('-u', '--user', metavar='username', nargs='+',
                        help='The username used for http basic auth for Druid server. Default to no auth')
    parser.add_argument('-p', '--password', metavar='password', nargs='+',
                        help='The password used for the basic auth. Default to no auth')
    args = parser.parse_args()
    # `is not None` is the idiomatic identity check (was `!= None`).
    # nargs='+' makes each value a list, hence the [0] indexing below.
    if args.host is not None:
        print("Using " + str(args.host))
        config.host = args.host[0]
    else:
        print("Default to localhost")
        config.host = "http://localhost:8080"
    if args.user is not None:
        config.username = args.user[0]
    if args.password is not None:
        config.password = args.password[0]
if __name__ == "__main__":
    # Silence the InsecureRequestWarning spam caused by verify=False requests.
    urllib3.disable_warnings()
    config = ConfigContext.getInstance()
    parse_arg(config)
    # Bail out early unless the health endpoint answers 200.
    if check_status(config) != 200:
        print("Cannot get a healthy server, check your settings")
        exit(1)
    # Timestamped output directory, e.g. datashape_20210215-221300.
    path = datetime.now().strftime("datashape_%Y%m%d-%H%M%S")
    mkdir("./"+path)
    datasources = get_datasources(config)
    for datasource in datasources:
        print("Getting segment info for {0}".format(datasource))
        segments = get_segments(config, datasource)
        print("Getting column info for {0}".format(datasource))
        columns = get_columns(config, datasource)
        for i, column in enumerate(columns):
            print("Column {0} of {1} for {2}".format(i + 1, len(columns), datasource))
            # Annotate each column dict with per-type statistics keyed by its
            # SQL type; unrecognized types are reported and left unannotated.
            if column['DATA_TYPE'] == 'BIGINT':
                column['DATA_SHAPE'] = get_column_detail_num(config, datasource, column['COLUMN_NAME'])
            elif column['DATA_TYPE'] == 'FLOAT':
                column['DATA_SHAPE'] = get_column_detail_num(config, datasource, column['COLUMN_NAME'])
            elif column['DATA_TYPE'] == 'DOUBLE':
                column['DATA_SHAPE'] = get_column_detail_num(config, datasource, column['COLUMN_NAME'])
            elif column['DATA_TYPE'] == 'VARCHAR':
                column['DATA_SHAPE'] = get_column_detail_str(config, datasource, column['COLUMN_NAME'])
            elif column['DATA_TYPE'] == 'TIMESTAMP':
                column['DATA_SHAPE'] = get_column_detail_timestamp(config, datasource, column['COLUMN_NAME'])
            else:
                print("No matching type for {0}.{1}".format(column['COLUMN_NAME'], column['DATA_TYPE']))
        # One JSON file per datasource: segment metadata plus annotated columns.
        output = {"datasource": datasource, "segments": segments, "columns": columns}
        with open('./{0}/{1}.json'.format(path, datasource), 'w') as outfile:
            json.dump(output, outfile, indent=4)
    print("Compressing data for shipping...")
    package(path)
    print("Data collection complete, {0}.zip is ready to ship".format(path))
certifi==2020.12.5
chardet==4.0.0
idna==2.10
requests==2.25.1
urllib3==1.26.3
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment