Skip to content

Instantly share code, notes, and snippets.

@asdaraujo
Created May 3, 2023 03:14
Show Gist options
  • Save asdaraujo/cd4a996d4bdab62bcc454dd868feb1a8 to your computer and use it in GitHub Desktop.
Save asdaraujo/cd4a996d4bdab62bcc454dd868feb1a8 to your computer and use it in GitHub Desktop.
#!/bin/bash
# Creates Kafka topics and Schema Registry schemas on a CDP DataHub cluster
# and then prints the Kafka broker list and the Schema Registry API URL.
#
# Every <PLACEHOLDER> below is a template value that must be replaced before
# the script is run; left as written, the shell parses the angle brackets as
# redirection operators rather than as part of the value.

# Treat expansion of an unset variable as an error.
set -u
# Exit as soon as a command returns a non-zero status.
set -e
# Exported so that child processes (the `cdp` CLI calls below) inherit it.
export CDP_PROFILE=<CDP_PROFILE>
# DataHub cluster name, passed to `cdp datahub describe-cluster --cluster-name`
# and used in the cluster path of the CM API URLs.
CLUSTER_NAME=<DATAHUB_NAME>
# Credentials handed to curl via -u for every REST call.
# NOTE(review): the password ends up on curl's command line, so it is visible
# in the process list while curl is running.
WORKLOAD_USR=<WORKLOAD_USER_NAME>
WORKLOAD_PWD=<WORKLOAD_PASSWORD>
# Base curl command shared by all REST calls: silent mode, JSON Accept and
# Content-Type headers, and user:password authentication. Kept as an array so
# that each argument stays a separate word when expanded as "${CURL[@]}".
CURL=(curl -s -H 'Accept: application/json' -H 'Content-Type: application/json' -u "${WORKLOAD_USR}:${WORKLOAD_PWD}")
#######################################
# Create a Kafka topic through the cluster's SMM API.
# Globals:
#   CURL - (read) base curl command array carrying headers and credentials
# Arguments:
#   $1 - DataHub cluster name, used to look up the SMM-API endpoint via `cdp`
#   $2 - name of the topic to create
#   $3 - number of partitions (default: 10)
#   $4 - replication factor (default: 3)
# Outputs:
#   The API response on stdout; diagnostics on stderr.
# Returns:
#   Non-zero if the cluster cannot be described, no SMM-API endpoint is
#   found, or curl fails.
#######################################
function create_topic() {
  local cluster_name=$1
  local topic_name=$2
  local partitions=${3:-10}
  local replication_factor=${4:-3}

  # Declaration and assignment are separate on purpose: `local v=$(cmd)`
  # reports the status of `local`, hiding a failing command from `set -e`.
  # `cdp` is also run outside a pipeline so that its own failure is seen.
  local cluster_json smm_api
  cluster_json=$(cdp datahub describe-cluster --cluster-name "$cluster_name") || return
  smm_api=$(jq -r '.cluster.endpoints.endpoints[] | select(.knoxService == "SMM-API").serviceUrl' <<<"$cluster_json") || return

  # jq prints nothing when no endpoint matches and "null" when the matching
  # endpoint has no serviceUrl; neither can be used to build a URL.
  if [[ -z "$smm_api" || "$smm_api" == "null" ]]; then
    printf 'create_topic: no SMM-API endpoint found for cluster "%s"\n' "$cluster_name" >&2
    return 1
  fi

  # ${smm_api%/} drops one trailing slash so the joined URL has no "//".
  "${CURL[@]}" -X POST "${smm_api%/}/api/v1/admin/topics" \
    -d '
{
"newTopics": [
{
"name": "'"$topic_name"'",
"numPartitions": '"$partitions"',
"replicationFactor": '"$replication_factor"',
"configs": {
"cleanup.policy": "delete"
}
}
],
"allTopicNames": [
"'"$topic_name"'"
]
}'
}
#######################################
# Register a schema and its first version through the Schema Registry API.
# Two requests are sent: one creating the schema metadata, one adding the
# schema text as a version on the requested branch.
# Globals:
#   CURL - (read) base curl command array carrying headers and credentials
# Arguments:
#   $1 - DataHub cluster name, used to look up the SCHEMA-REGISTRY-API
#        endpoint via `cdp`
#   $2 - schema name
#   $3 - schema text; must be valid JSON
#   $4 - schema type (default: avro)
#   $5 - schema group (default: Kafka)
#   $6 - branch the version is added to (default: MASTER)
# Outputs:
#   The API responses on stdout; diagnostics on stderr.
# Returns:
#   Non-zero if the schema text is empty or not valid JSON, the cluster
#   cannot be described, no SCHEMA-REGISTRY-API endpoint is found, or the
#   final curl call fails.
#######################################
function create_schema() {
  local cluster_name=$1
  local schema_name=$2
  local schema_text=$3
  local schema_type=${4:-avro}
  local schema_group=${5:-Kafka}
  local schema_branch=${6:-MASTER}

  # Validate and encode the schema before sending anything, so that a broken
  # schema can never leave half-created metadata behind or produce a
  # malformed request body. `jq -c .` compacts the JSON; `jq -R .` then
  # turns that text into a JSON string literal for the "schemaText" field.
  local compact_schema encoded_schema
  compact_schema=$(jq -c . <<<"$schema_text") || {
    printf 'create_schema: schema text for "%s" is not valid JSON\n' "$schema_name" >&2
    return 1
  }
  if [[ -z "$compact_schema" ]]; then
    printf 'create_schema: schema text for "%s" is empty\n' "$schema_name" >&2
    return 1
  fi
  encoded_schema=$(jq -R . <<<"$compact_schema") || return

  # Declaration and assignment are separate on purpose: `local v=$(cmd)`
  # reports the status of `local`, hiding a failing command from `set -e`.
  local cluster_json sr_api
  cluster_json=$(cdp datahub describe-cluster --cluster-name "$cluster_name") || return
  sr_api=$(jq -r '.cluster.endpoints.endpoints[] | select(.knoxService == "SCHEMA-REGISTRY-API").serviceUrl' <<<"$cluster_json") || return

  # jq prints nothing when no endpoint matches and "null" when the matching
  # endpoint has no serviceUrl; neither can be used to build a URL.
  if [[ -z "$sr_api" || "$sr_api" == "null" ]]; then
    printf 'create_schema: no SCHEMA-REGISTRY-API endpoint found for cluster "%s"\n' "$cluster_name" >&2
    return 1
  fi

  # Request 1: schema metadata. ${sr_api%/} drops one trailing slash.
  "${CURL[@]}" -X POST "${sr_api%/}/schemaregistry/schemas" \
    -d '
{
"type": "'"$schema_type"'",
"schemaGroup": "'"$schema_group"'",
"name": "'"$schema_name"'",
"description": "Schema for '"$schema_name"'",
"compatibility": "BACKWARD",
"validationLevel": "LATEST"
}
'
  # Request 2: the schema text itself, as a new version on the branch.
  "${CURL[@]}" -X POST "${sr_api%/}/schemaregistry/schemas/${schema_name}/versions?branch=${schema_branch}" \
    -d '
{
"description": "string",
"schemaText": '"$encoded_schema"'
}
'
}
#######################################
# Print the connection details required for the workshop: the Kafka broker
# list (from the SMM API) and the Schema Registry API URL (assembled from
# host and port values read from the CM API).
# Globals:
#   CURL - (read) base curl command array carrying headers and credentials
# Arguments:
#   $1 - DataHub cluster name; used for the `cdp` lookup and for the cluster
#        segment of the CM API URLs
# Outputs:
#   Two labelled lines on stdout (preceded by two blank lines); diagnostics
#   on stderr.
# Returns:
#   Non-zero if the cluster cannot be described, a required endpoint or the
#   SCHEMAREGISTRY service is missing, or any curl/jq step fails.
#######################################
function print_details() {
  local cluster_name=$1

  # Declaration and assignment are kept separate throughout: `local v=$(cmd)`
  # reports the status of `local`, hiding a failing command from `set -e`.
  # The cluster description is fetched once and reused for both endpoints.
  local cluster_json
  cluster_json=$(cdp datahub describe-cluster --cluster-name "$cluster_name") || return

  # --- Kafka brokers, via the SMM API -------------------------------------
  local smm_api brokers_json brokers
  smm_api=$(jq -r '.cluster.endpoints.endpoints[] | select(.knoxService == "SMM-API").serviceUrl' <<<"$cluster_json") || return
  if [[ -z "$smm_api" || "$smm_api" == "null" ]]; then
    printf 'print_details: no SMM-API endpoint found for cluster "%s"\n' "$cluster_name" >&2
    return 1
  fi
  brokers_json=$("${CURL[@]}" -X GET "${smm_api%/}/api/v1/admin/brokers") || return
  brokers=$(jq -r '[.[] | "\(.host):\(.port)"] | join(",")' <<<"$brokers_json") || return
  printf "\n\n%-20s %s\n" "Kafka Brokers:" "$brokers"

  # --- Schema Registry host and port, via the CM API ----------------------
  local cm_api
  cm_api=$(jq -r '.cluster.endpoints.endpoints[] | select(.knoxService == "CM-API").serviceUrl' <<<"$cluster_json") || return
  if [[ -z "$cm_api" || "$cm_api" == "null" ]]; then
    printf 'print_details: no CM-API endpoint found for cluster "%s"\n' "$cluster_name" >&2
    return 1
  fi

  # Built from the function argument rather than the global CLUSTER_NAME so
  # that the CM lookups always target the same cluster as the cdp lookup.
  local services_url="${cm_api%/}/v40/clusters/${cluster_name}/services"

  local services_json sr_svc_name
  services_json=$("${CURL[@]}" "$services_url") || return
  sr_svc_name=$(jq -r '.items[] | select(.type == "SCHEMAREGISTRY").name' <<<"$services_json") || return
  if [[ -z "$sr_svc_name" || "$sr_svc_name" == "null" ]]; then
    printf 'print_details: no SCHEMAREGISTRY service found in cluster "%s"\n' "$cluster_name" >&2
    return 1
  fi

  # The roles listing is fetched once and queried twice. The comparison must
  # be `==`: a single `=` is jq's assignment operator, which yields the
  # (truthy) modified object and would make select() accept every role.
  local roles_json sr_host sr_rcg_name
  roles_json=$("${CURL[@]}" "${services_url}/${sr_svc_name}/roles") || return
  sr_host=$(jq -r '.items[] | select(.type == "SCHEMA_REGISTRY_SERVER").hostRef.hostname' <<<"$roles_json") || return
  sr_rcg_name=$(jq -r '.items[] | select(.type == "SCHEMA_REGISTRY_SERVER").roleConfigGroupRef.roleConfigGroupName' <<<"$roles_json") || return

  # `.value // .default` falls back to the default when no value is set.
  local config_json sr_port
  config_json=$("${CURL[@]}" "${services_url}/${sr_svc_name}/roleConfigGroups/${sr_rcg_name}/config?view=full") || return
  sr_port=$(jq -r '.items[] | select(.name == "schema.registry.ssl.port") | .value // .default' <<<"$config_json") || return

  printf "%-20s %s\n" "Schema Registry API:" "https://${sr_host}:${sr_port}/api/v1"
}
# MAIN
# Schema texts are held in named variables so the provisioning steps at the
# bottom read as a short sequence of calls.

# Schema registered under the name "syslog".
syslog_schema='
{
"name": "syslog",
"type": "record",
"namespace": "com.cloudera",
"fields": [
{ "name": "priority", "type": "int" },
{ "name": "severity", "type": "int" },
{ "name": "facility", "type": "int" },
{ "name": "version", "type": "int" },
{ "name": "timestamp", "type": "long" },
{ "name": "hostname", "type": "string" },
{ "name": "body", "type": "string" },
{ "name": "appName", "type": "string" },
{ "name": "procid", "type": "string" },
{ "name": "messageid", "type": "string" },
{ "name": "structuredData",
"type": {
"name": "structuredData",
"type": "record",
"fields": [
{ "name": "SDID",
"type": {
"name": "SDID",
"type": "record",
"fields": [
{ "name": "eventId", "type": "string" },
{ "name": "eventSource", "type": "string" },
{ "name": "iut", "type": "string" }
]
}
}
]
}
}
]
}
'

# Schema registered under the name "syslog_transformed"; field names are
# lower-cased and the timestamp field is called event_timestamp.
syslog_transformed_schema='
{
"name": "syslog_transformed",
"type": "record",
"namespace": "com.cloudera",
"fields": [
{ "name": "priority", "type": "int" },
{ "name": "severity", "type": "int" },
{ "name": "facility", "type": "int" },
{ "name": "version", "type": "int" },
{ "name": "event_timestamp", "type": "long" },
{ "name": "hostname", "type": "string" },
{ "name": "body", "type": "string" },
{ "name": "appname", "type": "string" },
{ "name": "procid", "type": "string" },
{ "name": "messageid", "type": "string" },
{ "name": "structureddata",
"type": {
"name": "structuredData",
"type": "record",
"fields": [
{ "name": "sdid",
"type": {
"name": "sdid",
"type": "record",
"fields": [
{ "name": "eventid", "type": "string" },
{ "name": "eventsource", "type": "string" },
{ "name": "iut", "type": "string" }
]
}
}
]
}
}
]
}
'

# Both topics use 3 partitions and a replication factor of 3.
for topic in syslog_avro syslog_critical; do
  create_topic "$CLUSTER_NAME" "$topic" 3 3
done

create_schema "$CLUSTER_NAME" "syslog" "$syslog_schema"
create_schema "$CLUSTER_NAME" "syslog_transformed" "$syslog_transformed_schema"

print_details "$CLUSTER_NAME"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment