Skip to content

Instantly share code, notes, and snippets.

@eric-maynard
Created November 7, 2018 20:36
Show Gist options
  • Save eric-maynard/9a36d32d1d281f4db7373c27c3fdc4eb to your computer and use it in GitHub Desktop.
A script to analyze Impala queries using Impala
#!/bin/bash
##################################################
# Constants #
##################################################
# Cloudera Manager API connection details (used by curl below):
HOST="bkestelman-1.gce.cloudera.com"
CM_USER="admin"
CM_PASS="admin"
CM_PORT="7180"
CLUSTER_NAME="Cluster%201" # spaces need to be represented as %20 for curl
IMPALA_SERVICE="IMPALA-1"
FROM_TIME="2018-11-06"     # only fetch queries started on/after this date
LIMIT="1000"               # maximum number of queries returned by the API
LOCAL_FILE="queries.json"  # temporary file holding the downloaded JSON
# NOTE: removed the original line `impala-shell=''` -- hyphens are not legal in
# bash variable names, so that line was executed as a (nonexistent) command at
# runtime instead of performing an assignment.
# impala table constants
DB_NAME="default"
TABLE_NAME="queries_extract"
##################################################
# Script #
##################################################
# Phase 1: pull recent Impala query metadata from the Cloudera Manager API.
echo "Downloading Impala queries from CM API..."
# Quote the URL so '&' is not treated as the shell background operator, and
# quote the credentials and output path to survive word splitting/globbing.
curl -u "${CM_USER}:${CM_PASS}" "${HOST}:${CM_PORT}/api/v14/clusters/${CLUSTER_NAME}/services/${IMPALA_SERVICE}/impalaQueries?from=${FROM_TIME}&limit=${LIMIT}" > "${LOCAL_FILE}"
echo "Loading data into Impala..."
# Phase 2: the single-quoted block below is Scala source piped into
# spark2-shell. It parses the downloaded CM API JSON with org.json, flattens
# the "queries" array into an RDD of JSON strings, and appends the result to a
# table using an explicit schema (so Spark does not infer types). Bash expands
# ${LOCAL_FILE}, ${DB_NAME} and ${TABLE_NAME} because those interpolations sit
# outside the single quotes.
# NOTE(review): '&> /dev/null' discards all spark2-shell output, including
# errors -- any failure in this step will be silent. Consider logging to a
# file instead when debugging.
echo '
import org.apache.spark.sql.types._
import scala.io.Source
import org.json._
val json = new JSONObject(Source.fromFile("'${LOCAL_FILE}'").mkString)
val queries = json.getJSONArray("queries")
val rdd = sc.parallelize((0 until queries.length).map(i => queries.getJSONObject(i).toString))
val schema = StructType(Seq(StructField("attributes",StructType(Seq(StructField("query_type", StringType, true),StructField("query_state", StringType, true),StructField("query_duration", StringType, true),StructField("rows_produced", StringType, true),StructField("statement", StringType, true),StructField("user", StringType, true),StructField("database", StringType, true),StructField("coordinator_host_id", StringType, true),StructField("query_id", StringType, true),StructField("service_name", StringType, true),StructField("executing", StringType, true),StructField("hdfs_bytes_read", StringType, true),StructField("hdfs_scanner_average_bytes_read_per_second", StringType, true),StructField("hdfs_bytes_skipped", StringType, true),StructField("hdfs_bytes_read_local", StringType, true),StructField("hdfs_bytes_read_local_percentage", StringType, true),StructField("hdfs_bytes_read_remote", StringType, true),StructField("hdfs_bytes_read_remote_percentage", StringType, true),StructField("hdfs_bytes_read_short_circuit", StringType, true),StructField("hdfs_bytes_read_short_circuit_percentage", StringType, true),StructField("hdfs_bytes_read_from_cache", StringType, true),StructField("hdfs_bytes_read_from_cache_percentage", StringType, true),StructField("hdfs_average_scan_range", StringType, true),StructField("bytes_streamed", StringType, true),StructField("file_formats", StringType, true),StructField("query_status", StringType, true),StructField("oom", StringType, true),StructField("admission_result", StringType, true),StructField("admission_wait", StringType, true),StructField("memory_spilled", StringType, true),StructField("hbase_bytes_read", StringType, true),StructField("hbase_scanner_average_bytes_read_per_second", StringType, true),StructField("impala_version", StringType, true),StructField("session_id", StringType, true),StructField("session_type", StringType, true),StructField("network_address", StringType, true),StructField("connected_user", StringType, 
true),StructField("delegated_user", StringType, true),StructField("memory_per_node_peak", StringType, true),StructField("memory_per_node_peak_node", StringType, true),StructField("memory_aggregate_peak", StringType, true),StructField("memory_accrual", StringType, true),StructField("ddl_type", StringType, true),StructField("thread_total_time", StringType, true),StructField("thread_cpu_time", StringType, true),StructField("thread_cpu_time_percentage", StringType, true),StructField("thread_storage_wait_time", StringType, true),StructField("thread_storage_wait_time_percentage", StringType, true),StructField("thread_network_send_wait_time", StringType, true),StructField("thread_network_send_wait_time_percentage", StringType, true),StructField("thread_network_receive_wait_time", StringType, true),StructField("thread_network_receive_wait_time_percentage", StringType, true),StructField("planning_wait_time", StringType, true),StructField("planning_wait_time_percentage", StringType, true),StructField("client_fetch_wait_time", StringType, true),StructField("client_fetch_wait_time_percentage", StringType, true),StructField("pool", StringType, true),StructField("resources_reserved_wait_time", StringType, true),StructField("resources_reserved_wait_time_percentage", StringType, true),StructField("rows_inserted", StringType, true),StructField("hdfs_bytes_written", StringType, true),StructField("stats_missing", StringType, true),StructField("estimated_per_node_peak_memory", StringType, true),StructField("cm_cpu_milliseconds", StringType, true))),true), StructField("coordinator",StructType(Seq(StructField("hostId",StringType,true))),true), StructField("database",StringType,true), StructField("detailsAvailable",BooleanType,true), StructField("durationMillis",LongType,true), StructField("endTime",StringType,true), StructField("queryId",StringType,true), StructField("queryState",StringType,true), StructField("queryType",StringType,true), StructField("rowsProduced",LongType,true), 
StructField("startTime",StringType,true), StructField("statement",StringType,true), StructField("user",StringType,true)))
spark.read.schema(schema).json(rdd).write.mode("append").saveAsTable("'${DB_NAME}'.'${TABLE_NAME}'")
System.exit(0)
' | spark2-shell --num-executors 1 --executor-memory 1g &> /dev/null
# Phase 3: make the newly appended data visible to Impala, then clean up.
echo "Refreshing Impala..."
# Qualify the table with its database: the Spark job wrote to
# ${DB_NAME}.${TABLE_NAME}, and impala-shell's default database may differ,
# in which case an unqualified REFRESH would target the wrong table (or fail).
impala-shell -q "REFRESH ${DB_NAME}.${TABLE_NAME}"
echo "Cleaning up..."
# -f: do not error if the download step failed and the file never existed.
rm -f "${LOCAL_FILE}"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment