@pvillard31
Created December 24, 2021 19:29
Script for the NiFi Registry Script Event Hook that automatically syncs versioned flows with the Cloudera DataFlow Catalog
#!/bin/sh
set -e
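# This script automatically syncs flows committed to NiFi Registry into the DataFlow Catalog:
# the first committed version of a flow creates a new catalog entry, and later versions are
# added to that entry.
# It assumes that the CDP CLI has been installed and configured. See:
# https://docs.cloudera.com/dataflow/cloud/cli/index.html
# It also assumes that the NiFi CLI has been configured to interact with the NiFi Registry instance. See:
# https://nifi.apache.org/docs/nifi-docs/html/toolkit-guide.html#nifi_CLI
# jq must be installed as well, since it is used to parse the JSON output of both CLIs.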
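# Installation-specific paths: adjust them to your environment.
NIFICLI="/opt/cloudera/parcels/CFM/TOOLKIT/bin/cli.sh"
CDPCLI="/hadoopfs/fs1/working-dir/cdpclienv/bin/cdp"
# Local file mapping each NiFi Registry flow ID to its DataFlow Catalog CRN (one "FLOWID CRN" pair per line)
MAPPING="/hadoopfs/fs1/working-dir/scripts/mapping.txt"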
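# The Script Event Hook passes the event type as the first argument, followed by the event fields.
# CREATE_FLOW_VERSION is the event fired when a new version of a flow is committed;
# its first fields are the bucket ID, the flow ID and the version number.
if [ "$1" = "CREATE_FLOW_VERSION" ]; then
    BUCKETID="$2"
    FLOWID="$3"
    FLOWVERSION="$4"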
    # Export the flow definition as a JSON file using the NiFi CLI
    "$NIFICLI" registry export-flow-version --flowIdentifier "$FLOWID" --flowVersion "$FLOWVERSION" > /tmp/flow.json
    FLOWNAME=$(jq -r '.flowContents.name' /tmp/flow.json)
    # The flow description is looked up in the list of flows of the bucket.
    # "// empty" turns a missing description or comment into an empty string instead of the text "null".
    FLOWDESCRIPTION=$("$NIFICLI" registry list-flows --bucketIdentifier "$BUCKETID" -ot json |
        jq -r --arg id "$FLOWID" '.[] | select(.identifier == $id) | .description // empty')
    FLOWVERSIONCOMMENT=$(jq -r '.snapshotMetadata.comments // empty' /tmp/flow.json)
    # To know whether a flow has already been synced into the DataFlow Catalog, we keep track of
    # the flows we have imported in the local mapping file.
    # awk compares the flow ID with the whole first field, so a partial ID is never mistaken for a match.
    CRN=""
    if [ -f "$MAPPING" ]; then
        CRN=$(awk -v id="$FLOWID" '$1 == id { print $2; exit }' "$MAPPING")
    fi
    if [ -n "$CRN" ]; then
        # The flow ID is already in the mapping file, so we add the new version
        # to the existing catalog entry identified by its CRN
        "$CDPCLI" df import-flow-definition-version --flow-crn "$CRN" --file /tmp/flow.json --comments "$FLOWVERSIONCOMMENT"
    else
        # The flow ID is not in the mapping file: the flow has not been synced yet,
        # so we import it into the DataFlow Catalog for the first time
        "$CDPCLI" df import-flow-definition --file /tmp/flow.json --name "$FLOWNAME" --description "$FLOWDESCRIPTION" --comments "$FLOWVERSIONCOMMENT" > /tmp/cdpinsert.json
        # Extract the CRN of the newly added flow
        CRN=$(jq -r '.crn // empty' /tmp/cdpinsert.json)
        # Never record an empty CRN: it would break the sync of every later version of this flow
        if [ -z "$CRN" ]; then
            echo "No CRN returned by the DataFlow Catalog for flow $FLOWID" >&2
            exit 1
        fi
        # Record the mapping so that later versions are added to this catalog entry
        echo "$FLOWID $CRN" >> "$MAPPING"
    fi
fi
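The exported flow and the CDP CLI response are written to fixed paths under /tmp, so they are left behind after each run and could collide if two runs of the script ever overlap (for example, a manual run while the hook is active). A minimal sketch of per-run working files, assuming `mktemp` is available (it ships with GNU coreutils and the BSDs, although it is not part of POSIX); the `WORKDIR`, `FLOW_JSON` and `INSERT_JSON` names are hypothetical:

```sh
#!/bin/sh
set -e

# Create a private working directory for this run (mktemp is widely available, but not POSIX)
WORKDIR=$(mktemp -d)
# Remove it when the script exits, whether it succeeds or fails
trap 'rm -rf "$WORKDIR"' EXIT

# Hypothetical replacements for /tmp/flow.json and /tmp/cdpinsert.json
FLOW_JSON="$WORKDIR/flow.json"
INSERT_JSON="$WORKDIR/cdpinsert.json"

# The hook would then write its files there, for example:
#   "$NIFICLI" registry export-flow-version ... > "$FLOW_JSON"
echo '{"example": true}' > "$FLOW_JSON"
echo "working files live in $WORKDIR"
```

Because the trap fires on every exit, including the early exits triggered by `set -e`, no exported flow is left lying around in /tmp.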
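The mapping file holds one `FLOWID CRN` pair per line, looked up by flow ID. Keeping the lookup and the append in two small functions makes this bookkeeping easy to try out against a scratch file, without NiFi Registry or the CDP CLI. A minimal sketch that matches the flow ID exactly against the first field; the function names `lookup_crn` and `record_crn`, as well as the sample ID and CRN, are hypothetical:

```sh
#!/bin/sh
set -e

# Print the catalog CRN recorded for a flow ID, or nothing if the flow is unknown.
# Usage: lookup_crn MAPPING_FILE FLOW_ID
lookup_crn() {
    # A missing mapping file simply means no flow has been synced yet
    [ -f "$1" ] || return 0
    # Compare the whole first field so that one ID cannot match part of another
    awk -v id="$2" '$1 == id { print $2; exit }' "$1"
}

# Append a "FLOWID CRN" pair to the mapping file (created if missing).
# Usage: record_crn MAPPING_FILE FLOW_ID CRN
record_crn() {
    printf '%s %s\n' "$2" "$3" >> "$1"
}

# Demo against a scratch mapping file with made-up values
MAP=$(mktemp)
record_crn "$MAP" "flow-1234" "crn:example:flow-1234"
echo "known:   $(lookup_crn "$MAP" flow-1234)"
echo "unknown: $(lookup_crn "$MAP" flow-12)"
rm -f "$MAP"
```

The second lookup prints nothing: a substring search such as `grep flow-12` would have matched the `flow-1234` line, whereas the field comparison does not.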