Skip to content

Instantly share code, notes, and snippets.

@alienrobotwizard
Created October 30, 2014 21:45
Show Gist options
  • Save alienrobotwizard/9a655ed135d3b1bbadb7 to your computer and use it in GitHub Desktop.
Save alienrobotwizard/9a655ed135d3b1bbadb7 to your computer and use it in GitHub Desktop.
Translate Lipstick Graph (P2jPlanPackage) to more general graph
#!/usr/bin/env ruby
require 'rubygems'
require 'json'
class P2jPlanPackage
attr_accessor :optimized
attr_accessor :unoptimized
attr_accessor :script, :status, :userName, :jobName, :uuid
attr_accessor :sampleOutputMap
def initialize optimized, unoptimized, status, script, uuid, jobName, userName, sampleOutputMap
@optimized = optimized
@unoptimized = unoptimized
@script = script
@status = status
@uuid = uuid
@jobName = jobName
@userName = userName
@sampleOutputMap = sampleOutputMap
end
def self.from_json json
data = JSON.parse(json, {:symbolize_names => true})
optimized = P2jPlan.from_hash(data[:optimized])
unoptimized = P2jPlan.from_hash(data[:unoptimized])
status = P2jPlanStatus.from_hash(data[:status])
script = P2jScripts.from_hash(data[:scripts])
uuid = data[:uuid]
jobName = data[:jobName]
userName = data[:userName]
sampleOutputMap = {}
if data[:sampleOutputMap]
sampleOutputMap = data[:sampleOutputMap].inject({}) do |hsh, sol|
if sol.last[:sampleOutputList]
hsh[sol.first.to_s] = sol.last[:sampleOutputList].map{|so| P2jSampleOutput.from_hash(so) }
end
hsh
end
end
P2jPlanPackage.new(optimized, unoptimized, status, script, uuid, jobName, userName, sampleOutputMap)
end
def translate_node nodeMap, nodeGroupMap, prefix, op_id, op
node_id = prefix+'-'+op_id.to_s
ng_id = prefix+'-'+op.mapReduce.jobId
nodeMap[node_id] = op.to_node(prefix)
ng = nodeGroupMap[ng_id]
ng ||= {}
ng[:children] ||= []
ng[:children] << node_id
nodeGroupMap[ng_id] = ng
end
def node_for_node_group id, child
{:id => id, :child => child}
end
def translate_node_group id, name, ng
ng[:id] = id
original_scope = name.split('-',2).last
if status.jobStatusMap
jobStatus = status.jobStatusMap.values.find{|x| x.scope == original_scope}
if jobStatus
mapProgress = jobStatus.mapProgress ? jobStatus.mapProgress.to_f : 0
reduceProgress = jobStatus.reduceProgress ? jobStatus.reduceProgress.to_f : 0
statusText = ""
if jobStatus.isComplete
if jobStatus.isSuccessful
statusText = "finished"
else
statusText = "failed"
end
elsif (mapProgress > 0)
statusText = "running"
end
ng[:status] = {
:startTime => jobStatus.startTime,
:endTime => jobStatus.finishTime
}
ng[:status][:statusText] = statusText if statusText
if jobStatus.isSuccessful
ng[:status][:progress] = 100
else
ng[:status][:progress] = ((mapProgress + reduceProgress)/2 * 100).to_i
end
ng[:properties] = {
:counters => jobStatus.counters.inject({}){|hsh, cnt| hsh[cnt.first] = cnt.last.to_hash; hsh},
:warnings => jobStatus.warnings.inject({}){|hsh, wn| hsh[wn.first] = wn.last.to_hash; hsh}
}
end
end
ng
end
def translate_edge u, v
edge = {:u => u[:id], :v => v[:id]}
startScope = u[:properties][:scope]
endScope = v[:properties][:scope]
if startScope != endScope
actualScope = startScope.split("-",2).last
sampleOutput = sampleOutputMap[actualScope]
if sampleOutput
schema = u[:properties][:schema]
properties = {
:schema => schema,
:sampleOutput => sampleOutput.map{|x| x.sampleOutput}
}
edge[:properties] = properties
end
end
edge
end
def to_graph
graph = {}
nodes = []
node_groups = []
edges = []
properties = {}
nodeMap = {}
nodeGroupMap = {}
optimized.plan.each do |op_id, op|
translate_node(nodeMap, nodeGroupMap, 'optimized', op_id, op)
end
unoptimized.plan.each do |op_id, op|
translate_node(nodeMap, nodeGroupMap, 'unoptimized', op_id, op)
end
nodes += nodeMap.values
idx = 0
optimized_children = []
unoptimized_children = []
nodeGroupMap.each do |ng_name, ng|
nodes << node_for_node_group(ng_name, idx.to_s)
node_groups << translate_node_group(idx.to_s, ng_name, ng)
if ng_name.start_with?('optimized')
optimized_children << ng_name
else
unoptimized_children << ng_name
end
idx += 1
end
# node group for optimized
nodes << {:id => 'optimized', :child => idx.to_s}
node_groups << {:id => idx.to_s, :children => optimized_children}
idx += 1
# node group for unoptimized
nodes << {:id => 'unoptimized', :child => idx.to_s}
node_groups << {:id => idx.to_s, :children => unoptimized_children}
idx += 1
nodeMap.values.each do |node|
succs = node[:properties][:successors]
succs.each do |succ|
edges << translate_edge(node, nodeMap[succ])
end
end
properties[:userName] = userName
properties[:script] = script.to_hash
graph[:id] = uuid
graph[:name] = jobName
graph[:properties] = properties
graph[:status] = status.to_graph_status
graph[:edges] = edges
graph[:node_groups] = node_groups
graph[:nodes] = nodes
graph
end
class P2jPlan
attr_accessor :plan
def initialize plan
@plan = plan
end
def self.from_hash data
plan_map = data[:plan].inject({}) do |plan, op|
plan[op.first] = P2jLogicalRelationalOperator.from_hash(op.last)
plan
end
P2jPlan.new(plan_map)
end
end
class P2jPlanStatus
attr_accessor :startTime, :endTime, :heartbeatTime
attr_accessor :jobStatusMap, :progress, :statusText
def initialize startTime, endTime, heartbeatTime, jobStatusMap, progress, statusText
@startTime = startTime
@endTime = endTime
@heartbeatTime = heartbeatTime
@jobStatusMap = jobStatusMap
@progress = progress
@statusText = statusText
end
def self.from_hash data
startTime = data[:startTime]
endTime = data[:endTime]
heartbeatTime = data[:heartbeatTime]
jobStatusMap = data[:jobStatusMap].inject({}){|hsh, js| hsh[js.first] = P2jJobStatus.from_hash(js.last); hsh}
progress = data[:progress]
statusText = data[:statusText]
P2jPlanStatus.new(startTime, endTime, heartbeatTime, jobStatusMap, progress, statusText)
end
def to_graph_status
r = {}
r[:startTime] = startTime if startTime
r[:endTime] = endTime if endTime
r[:heartbeatTime] = heartbeatTime if heartbeatTime
r[:progress] = progress if progress
r[:statusText] = statusText if statusText
r
end
end
class P2jJobStatus
attr_accessor :counters, :warnings, :scope, :jobId, :jobName, :trackingUrl
attr_accessor :isComplete, :isSuccessful, :mapProgress, :reduceProgress
attr_accessor :totalMappers, :totalReducers, :startTime, :finishTime
attr_accessor :recordsWritten, :bytesWritten
def initialize(counters, warnings, scope, jobId, jobName, trackingUrl,
isComplete, isSuccessful, mapProgress, reduceProgress, totalMappers,
totalReducers, startTime, finishTime, recordsWritten, bytesWritten)
@counters = counters
@warnings = warnings
@scope = scope
@jobId = jobId
@jobName = jobName
@trackingUrl = trackingUrl
@isComplete = isComplete
@isSuccessful = isSuccessful
@mapProgress = mapProgress
@reduceProgress = reduceProgress
@totalMappers = totalMappers
@totalReducers = totalReducers
@startTime = startTime
@finishTime = finishTime
@recordsWritten = recordsWritten
@bytesWritten = bytesWritten
end
def self.from_hash data
counters = data[:counters].inject({}){|hsh, cnter| hsh[cnter.first] = P2jCounters.from_hash(cnter.last); hsh}
warnings = data[:warnings].inject({}){|hsh, wn| hsh[wn.first] = P2jWarning.from_hash(wn.last); hsh}
scope = data[:scope]
jobId = data[:jobId]
jobName = data[:jobName]
trackingUrl = data[:trackingUrl]
isComplete = data[:isComplete]
isSuccessful = data[:isSuccessful]
mapProgress = data[:mapProgress]
reduceProgress = data[:reduceProgress]
totalMappers = data[:totalMappers]
totalReducers = data[:totalReducers]
startTime = data[:startTime]
finishTime = data[:finishTime]
recordsWritten = data[:recordsWritten]
bytesWritten = data[:bytesWritten]
P2jJobStatus.new(counters, warnings, scope, jobId, jobName, trackingUrl,
isComplete, isSuccessful, mapProgress, reduceProgress, totalMappers,
totalReducers, startTime, finishTime, recordsWritten, bytesWritten)
end
end
class P2jCounters
attr_accessor :counters
def initialize counters
@counters = counters
end
def self.from_hash data
P2jCounters.new(data[:counters])
end
def to_hash
counters
end
end
class P2jWarning
attr_accessor :warningAttributes, :jobId, :warningKey
def initialize warningAttributes, jobId, warningKey
@warningAttributes = warningAttributes
@jobId = jobId
@warningKey = warningKey
end
def self.from_hash data
P2jWarning.new(data[:warningAttributes], data[:jobId], data[:warningKey])
end
def to_hash
{
:warningAttributes => warningAttributes, :jobId => jobId, :warningKey => warningKey
}
end
end
class P2jScripts
attr_accessor :script
def initialize script
@script = script
end
def self.from_hash data
P2jScripts.new(data[:script])
end
def to_hash
script
end
end
class P2jSampleOutput
attr_accessor :schemaString, :sampleOutput
def initialize schemaString, sampleOutput
@schemaString = schemaString
@sampleOutput = sampleOutput
end
def self.from_hash data
P2jSampleOutput.new(data[:schemaString], data[:sampleOutput])
end
end
class P2jLogicalRelationalOperator
attr_accessor :alias, :location, :mapReduce, :operator, :uid
attr_accessor :predecessors, :successors, :schema, :schemaString
attr_accessor :expression, :storageFunction, :storageLocation, :group, :join
def initialize aliaz, location, mapReduce, operator, uid, preds, succs, schema, schemaStr
@alias = aliaz
@location = location
@mapReduce = mapReduce
@operator = operator
@uid = uid
@predecessors = preds
@successors = succs
@schema = schema
@schemaString = schemaStr
end
def self.from_hash data
aliaz = data[:alias]
location = Location.from_hash(data[:location])
mapReduce = MRStage.from_hash(data[:mapReduce])
operator = data[:operator]
uid = data[:uid]
preds = data[:predeccessors]
succs = data[:successors]
schema = []
if data[:schema]
schema = data[:schema].map{|s| SchemaElement.from_hash(s)}
end
schemaStr = data[:schemaString]
P2jLogicalRelationalOperator.new(aliaz, location, mapReduce, operator, uid, preds, succs, schema, schemaStr)
end
def to_node prefix
r = {
:type => "PigNode",
:id => "#{prefix}-#{uid}"
}
properties = {
:alias => self.send(:alias),
:schema => schema.map{|se| se.to_hash},
:location => location.to_hash,
:operation => operator.gsub("LO", "").upcase,
:scope => "#{prefix}-#{mapReduce.jobId}",
:successors => successors.map{|succ| "#{prefix}-#{succ}"},
:step_type => mapReduce.stepType
}
properties[:expression] = expression if expression
properties[:storage_function] = storageFunction if storageFunction
properties[:storage_location] = storageLocation if storageLocation
properties[:group] = group.to_hash if group
properties[:join] = join.to_hash if join
r[:properties] = properties
r
end
class Join
attr_accessor :expression, :strategy, :type
def initialize strategy, type, expression
@strategy = strategy
@type = type
@expression = expression
end
def self.from_hash data
strategy = data[:strategy]
type = data[:type]
expression = data[:expression].inject({}){|hsh, exp| hsh[exp.first] = JoinExpression.from_hash(exp.last); hsh}
Join.new(strategy, type, expression)
end
def to_hash
{
:by => expression.inject([]){|arr, e| arr << {:alias => e.first, :fields => e.last.fields}; arr},
:strategy => strategy,
:type => type
}
end
end
class Location
attr_accessor :filename, :line, :macro
def initialize line, filename, macro
@line = line
@filename = filename
@macro = macro
end
def self.from_hash data
Location.new(data[:line], data[:filename], data[:macro])
end
def to_hash
r = {
:line => line, :filename => filename
}
r[:macro] = macro if macro
r
end
end
class MRStage
attr_accessor :jobId, :stepType
def initialize jobId, stepType
@jobId = jobId
@stepType = stepType
end
def self.from_hash data
MRStage.new(data[:jobId], data[:stepType])
end
end
end
class P2jLOCogroup < P2jLogicalRelationalOperator
attr_accessor :group
def initialize group
@group = group
end
def self.from_hash data
P2jLOCogroup.new(Join.from_hash(data[:group]))
end
end
class P2jLOFilter < P2jLogicalRelationalOperator
attr_accessor :expression
def initialize expression
@expression = expression
end
def self.from_hash data
P2jLOFilter.new(data[:expression])
end
end
class P2jLOJoin < P2jLogicalRelationalOperator
attr_accessor :join
def initialize join
@join = join
end
def self.from_hash data
P2jLOJoin.new(Join.from_hash(data[:join]))
end
end
class P2jLOLimit < P2jLogicalRelationalOperator
attr_accessor :rowLimit
def initialize rowLimit
@rowLimit = rowLimit
end
def self.from_hash data
P2jLOLimit.new(data[:rowLimit])
end
end
class P2jLOLoad < P2jLogicalRelationalOperator
attr_accessor :storageLocation, :storageFunction
def initialize storageLocation, storageFunction
@storageLocation = storageLocation
@storageFunction = storageFunction
end
def self.from_hash data
P2jLOLoad.new(data[:storageLocation], data[:storageFunction])
end
end
class P2jLOSplitOutput < P2jLogicalRelationalOperator
attr_accessor :expression
def initialize expression
@expression = expression
end
def self.from_hash data
P2jLOSplitOutput.new(data[:expression])
end
end
class P2jLOStore < P2jLogicalRelationalOperator
attr_accessor :storageLocation, :storageFunction
def initialize storageLocation, storageFunction
@storageLocation = storageLocation
@storageFunction = storageFunction
end
def self.from_hash data
P2jLOStore.new(data[:storageLocation], data[:storageFunction])
end
end
class JoinExpression
attr_accessor :fields
def initialize fields
@fields = fields
end
def self.from_hash data
JoinExpression.new(data)
end
end
class SchemaElement
attr_accessor :alias, :type, :schemaElements
def initialize aliaz, type, schemaElements
@alias = aliaz
@type = type
@schemaElements = schemaElements
end
def self.from_hash data
schemaElements = []
if data[:schemaElements]
schemaElements = schemadata[:schemaElements].map{|se| SchemaElement.from_hash(se) }
end
SchemaElement.new(data[:alias], data[:type], schemaElements)
end
def to_hash
r = {
:alias => self.send(:alias),
:type => type,
}
if schemaElements.size > 0
r[:schema] = schemaElements.map{|se| se.to_hash}
end
r
end
end
end
puts P2jPlanPackage.from_json(File.read(ARGV[0])).to_graph.to_json
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment