Skip to content

Instantly share code, notes, and snippets.

@jackghm
Created September 11, 2016 06:46
Show Gist options
  • Save jackghm/b2ea9d5536755288ca5f48cd182731b1 to your computer and use it in GitHub Desktop.
Save jackghm/b2ea9d5536755288ca5f48cd182731b1 to your computer and use it in GitHub Desktop.
nodeSegUDFgen.scala
import java.io._
/*
Argument 0 is the output file from the following
vsql -h hostNme -U jackg -w <redacted> -d pstl -At -c "select get_projection_segments('jackg.testHashNodes_p1_b0');" -o /tmp/get_projection_segments.out
Argument 1 is the output file from the following
Run vsql to create an output file of the node names and the private IP addresses used to ssh to each node.
Typically we run on an edge node, then ssh to the private Vertica host IP address to execute local vsql commands.
vsql -h hostNme -U jackg -w <redacted> -d pstl -At -c "select b.node_name, a.ip_address from v_monitor.network_interfaces a join v_catalog.nodes b on (a.node_name = b.node_name) where b.node_state = 'UP' and a.interface = 'eno16777736' and a.ip_address_family = 'ipv4' order by a.node_name asc;" -o /tmp/NodeIP.out
*/
/**
 * Generates Vertica segmentation helper artifacts from two vsql output files.
 *
 * Inputs (command-line arguments):
 *   - args(0): path to the `get_projection_segments` output. The first three non-blank
 *     rows are used, each `|`-delimited: node names, high segment bounds, low segment bounds.
 *   - args(1): path to a `node_name|ip_address` file, one node per line.
 *
 * Outputs:
 *   - `<args(0)>.sql`: the `test.segmentation` and `test.segmentationNode` SQL UDF definitions.
 *   - `InsSelParallel.sh.conf` in the directory of args(0) (the current directory when
 *     args(0) has no parent): one `ordinal|node_name|ip_address` line per segment,
 *     ordered by low segment bound.
 */
object nodeSegUDFgen {
  import Control._
  import scala.io.Source

  /** One projection segment: original column position, owning node and its bounds. */
  private final case class NodeSeg(index: Int, nodeName: String, segLow: Long, segHigh: Long)

  /** UDF that masks a hash value down to its low 32 bits. */
  private val SegmentationUdfSql =
    "CREATE OR REPLACE FUNCTION test.segmentation(hashint Integer) RETURN INT AS BEGIN RETURN (4294967295 & hashINT); END;\n"

  /** Opening of the node-lookup UDF; the WHEN clauses are appended after `CASE`. */
  private val SegmentationNodeHeader =
    "CREATE OR REPLACE FUNCTION test.segmentationNode(seg INT) RETURN varchar(128) AS BEGIN RETURN (\nCASE"

  /**
   * Entry point.
   *
   * @param args args(0) = get_projection_segments output file, args(1) = node/IP file
   * @throws IllegalArgumentException if either input file is malformed
   * @throws NoSuchElementException   if a segment's node has no entry in the node/IP file
   */
  def main(args: Array[String]): Unit = {
    // Both arguments are mandatory; checking only for "empty" let a single argument
    // through and failed later with an index error.
    if (args.length < 2) {
      println("ERROR: expected 2 arguments. A Path to get_projection_segments file and path to IP/Nodes filename\n")
      System.exit(1)
    }
    val fileIn = args(0) // e.g. /tmp/get_projection_segments.out
    val sNodeNamesFilePath = args(1) // e.g. /tmp/NodeIP.out

    // File.getParent returns null for a bare file name; fall back to the current directory
    // instead of producing the literal path "null/InsSelParallel.sh.conf".
    val parentDir = Option(new File(fileIn).getParent).getOrElse(".")
    val sInsSelParallelConf = parentDir + "/InsSelParallel.sh.conf"
    val sNodeSegUDFgen_sql = fileIn + ".sql"

    // To avoid using JDBC, both inputs are read from files produced by vsql.
    val nodeIps = parseNodeIps(readNonBlankLines(sNodeNamesFilePath))
    val segments = parseSegments(readNonBlankLines(fileIn))

    // Build all output in memory first, so that a failure (for example an unknown node)
    // happens before any output file has been created or truncated.
    val sqlScript = SegmentationUdfSql + buildSegmentationNodeSql(segments)
    val confBody = buildParallelConf(segments, nodeIps)

    writeFile(sNodeSegUDFgen_sql, sqlScript)
    println(s"INFO: vSQL UDF scripts created in file (${sNodeSegUDFgen_sql})")
    writeFile(sInsSelParallelConf, confBody)
    println(s"INFO: InsSelParallel.conf file created (${sInsSelParallelConf})")
  }

  /** Reads a text file and returns its non-blank lines, closing the file afterwards. */
  private def readNonBlankLines(path: String): Vector[String] =
    using(Source.fromFile(path))(_.getLines().filter(_.trim.nonEmpty).toVector)

  /** Parses `node_name|ip_address` lines into a name -> IP map (a later duplicate wins). */
  private def parseNodeIps(lines: Seq[String]): Map[String, String] =
    lines.map { line =>
      line.split('|').map(_.trim) match {
        case Array(name, ip, _*) => name -> ip
        case _ =>
          throw new IllegalArgumentException(
            s"ERROR: malformed node/IP line (expected node_name|ip_address): '$line'")
      }
    }.toMap

  /** Parses the three rows (names, high bounds, low bounds) into segments sorted by low bound. */
  private def parseSegments(lines: Seq[String]): Seq[NodeSeg] = {
    require(lines.length >= 3,
      s"ERROR: expected 3 rows (node names, high bounds, low bounds) but found ${lines.length}")
    val names = lines(0).split('|').map(_.trim)
    val highs = lines(1).split('|').map(_.trim.toLong)
    val lows = lines(2).split('|').map(_.trim.toLong)
    require(highs.length >= names.length && lows.length >= names.length,
      s"ERROR: ${names.length} node names but only ${highs.length} high and ${lows.length} low segment bounds")
    names.indices.map(i => NodeSeg(i, names(i), lows(i), highs(i))).sortBy(_.segLow)
  }

  /** Renders the `test.segmentationNode` UDF, one WHEN clause per segment. */
  private def buildSegmentationNodeSql(segments: Seq[NodeSeg]): String = {
    val whenClauses = segments.map { seg =>
      if (seg.segHigh < seg.segLow)
        // special case logic for lower and upper bounds from Vertica Eng:
        // the range wraps around, so it covers [0, high] and [low, ...)
        s"\n WHEN ( (seg between 0 and ${seg.segHigh}) or (seg >= ${seg.segLow})) then '${seg.nodeName}'"
      else
        s"\n WHEN (seg between ${seg.segLow}\tand ${seg.segHigh})\tthen '${seg.nodeName}'"
    }
    whenClauses.mkString(SegmentationNodeHeader, "", " ELSE 'unk' END);\nEND;\n")
  }

  /** Renders the `ordinal|node_name|ip_address` lines, with ordinals starting at 1. */
  private def buildParallelConf(segments: Seq[NodeSeg], nodeIps: Map[String, String]): String =
    segments.zipWithIndex.map { case (seg, position) =>
      val ip = nodeIps.getOrElse(seg.nodeName,
        throw new NoSuchElementException(
          s"ERROR: node '${seg.nodeName}' has no entry in the node/IP file"))
      s"${position + 1}|${seg.nodeName}|$ip\n"
    }.mkString

  /** Writes `content` to `path`, replacing any existing file and always closing the writer. */
  private def writeFile(path: String, content: String): Unit =
    using(new BufferedWriter(new FileWriter(path)))(_.write(content))

  /** Loan-pattern helper for anything exposing a `close()` method. */
  object Control {
    // The structural type bound below is invoked reflectively; enable the feature explicitly.
    import scala.language.reflectiveCalls

    /**
     * Applies `f` to `param` and closes `param` afterwards, even when `f` throws.
     *
     * @param param the resource to use and then close
     * @param f     the computation to run against the resource
     * @return the result of `f(param)`
     */
    def using[A <: { def close(): Unit }, B](param: A)(f: A => B): B =
      try {
        f(param)
      }
      finally {
        param.close()
      }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment