Created
November 7, 2018 20:36
-
-
Save eric-maynard/9a36d32d1d281f4db7373c27c3fdc4eb to your computer and use it in GitHub Desktop.
A bash script that downloads Impala query history from the Cloudera Manager API, loads it into an Impala table via Spark, and refreshes the table — so Impala queries can be analyzed using Impala itself.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
#
# Downloads Impala query history from the Cloudera Manager API, loads it into
# an Impala table via spark2-shell, then refreshes the table in Impala so the
# query history can itself be analyzed with Impala SQL.
#
# Requires: curl, spark2-shell, impala-shell on PATH; CM reachable on ${CM_PORT}.

# Fail fast on any command error, unset variable, or pipeline failure.
set -euo pipefail

##################################################
#                   Constants                    #
##################################################
# curl constants:
HOST="bkestelman-1.gce.cloudera.com"
CM_USER="admin"
CM_PASS="admin"
CM_PORT="7180"
CLUSTER_NAME="Cluster%201" # spaces need to be represented as %20 for curl
IMPALA_SERVICE="IMPALA-1"
FROM_TIME="2018-11-06"
LIMIT="1000"
LOCAL_FILE="queries.json"
# BUG FIX: the original line was `impala-shell=''`, which is not a legal bash
# assignment (variable names may not contain hyphens) — bash executed it as a
# command named "impala-shell=''" and failed. Use a legal name and reference
# it where the shell is invoked below.
IMPALA_SHELL="impala-shell"

# impala table constants
DB_NAME="default"
TABLE_NAME="queries_extract"

##################################################
#                     Script                     #
##################################################
echo "Downloading Impala queries from CM API..."
# Quote all expansions so credentials/paths with special characters survive.
curl -u "${CM_USER}:${CM_PASS}" \
  "${HOST}:${CM_PORT}/api/v14/clusters/${CLUSTER_NAME}/services/${IMPALA_SERVICE}/impalaQueries?from=${FROM_TIME}&limit=${LIMIT}" \
  > "${LOCAL_FILE}"

echo "Loading data into Impala..."
# The block below is Scala source piped into spark2-shell. The single quotes
# are closed and reopened around '${LOCAL_FILE}', '${DB_NAME}' and
# '${TABLE_NAME}' so bash expands them before the text reaches Spark.
# NOTE(review): &> /dev/null hides all Spark output, including errors; drop
# the redirect when debugging a failed load.
echo '
import org.apache.spark.sql.types._
import scala.io.Source
import org.json._
val json = new JSONObject(Source.fromFile("'${LOCAL_FILE}'").mkString)
val queries = json.getJSONArray("queries")
val rdd = sc.parallelize((0 until queries.length).map(i => queries.getJSONObject(i).toString))
val schema = StructType(Seq(StructField("attributes",StructType(Seq(StructField("query_type", StringType, true),StructField("query_state", StringType, true),StructField("query_duration", StringType, true),StructField("rows_produced", StringType, true),StructField("statement", StringType, true),StructField("user", StringType, true),StructField("database", StringType, true),StructField("coordinator_host_id", StringType, true),StructField("query_id", StringType, true),StructField("service_name", StringType, true),StructField("executing", StringType, true),StructField("hdfs_bytes_read", StringType, true),StructField("hdfs_scanner_average_bytes_read_per_second", StringType, true),StructField("hdfs_bytes_skipped", StringType, true),StructField("hdfs_bytes_read_local", StringType, true),StructField("hdfs_bytes_read_local_percentage", StringType, true),StructField("hdfs_bytes_read_remote", StringType, true),StructField("hdfs_bytes_read_remote_percentage", StringType, true),StructField("hdfs_bytes_read_short_circuit", StringType, true),StructField("hdfs_bytes_read_short_circuit_percentage", StringType, true),StructField("hdfs_bytes_read_from_cache", StringType, true),StructField("hdfs_bytes_read_from_cache_percentage", StringType, true),StructField("hdfs_average_scan_range", StringType, true),StructField("bytes_streamed", StringType, true),StructField("file_formats", StringType, true),StructField("query_status", StringType, true),StructField("oom", StringType, true),StructField("admission_result", StringType, true),StructField("admission_wait", StringType, true),StructField("memory_spilled", StringType, true),StructField("hbase_bytes_read", StringType, true),StructField("hbase_scanner_average_bytes_read_per_second", StringType, true),StructField("impala_version", StringType, true),StructField("session_id", StringType, true),StructField("session_type", StringType, true),StructField("network_address", StringType, true),StructField("connected_user", StringType, true),StructField("delegated_user", StringType, true),StructField("memory_per_node_peak", StringType, true),StructField("memory_per_node_peak_node", StringType, true),StructField("memory_aggregate_peak", StringType, true),StructField("memory_accrual", StringType, true),StructField("ddl_type", StringType, true),StructField("thread_total_time", StringType, true),StructField("thread_cpu_time", StringType, true),StructField("thread_cpu_time_percentage", StringType, true),StructField("thread_storage_wait_time", StringType, true),StructField("thread_storage_wait_time_percentage", StringType, true),StructField("thread_network_send_wait_time", StringType, true),StructField("thread_network_send_wait_time_percentage", StringType, true),StructField("thread_network_receive_wait_time", StringType, true),StructField("thread_network_receive_wait_time_percentage", StringType, true),StructField("planning_wait_time", StringType, true),StructField("planning_wait_time_percentage", StringType, true),StructField("client_fetch_wait_time", StringType, true),StructField("client_fetch_wait_time_percentage", StringType, true),StructField("pool", StringType, true),StructField("resources_reserved_wait_time", StringType, true),StructField("resources_reserved_wait_time_percentage", StringType, true),StructField("rows_inserted", StringType, true),StructField("hdfs_bytes_written", StringType, true),StructField("stats_missing", StringType, true),StructField("estimated_per_node_peak_memory", StringType, true),StructField("cm_cpu_milliseconds", StringType, true))),true), StructField("coordinator",StructType(Seq(StructField("hostId",StringType,true))),true), StructField("database",StringType,true), StructField("detailsAvailable",BooleanType,true), StructField("durationMillis",LongType,true), StructField("endTime",StringType,true), StructField("queryId",StringType,true), StructField("queryState",StringType,true), StructField("queryType",StringType,true), StructField("rowsProduced",LongType,true), StructField("startTime",StringType,true), StructField("statement",StringType,true), StructField("user",StringType,true)))
spark.read.schema(schema).json(rdd).write.mode("append").saveAsTable("'${DB_NAME}'.'${TABLE_NAME}'")
System.exit(0)
' | spark2-shell --num-executors 1 --executor-memory 1g &> /dev/null

echo "Refreshing Impala..."
# BUG FIX: qualify the table with its database. The original ran
# `REFRESH ${TABLE_NAME}`, which refreshed the table in whatever database
# impala-shell defaulted to — not necessarily ${DB_NAME}, where Spark wrote.
"${IMPALA_SHELL}" -q "REFRESH ${DB_NAME}.${TABLE_NAME}"

echo "Cleaning up..."
# -f: don't fail the script if the download file is already gone.
rm -f "${LOCAL_FILE}"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment