Skip to content

Instantly share code, notes, and snippets.

@eridal
Created Jul 28, 2017
Embed
What would you like to do?
Build a Graphviz DOT model from an AWS Data Pipeline JSON definition
/**
 * Deep-merges the own enumerable properties of `object` into `into`
 * (mutating and returning `into`).
 *
 * - `null` / `undefined` values are skipped, so the existing value is kept.
 * - Arrays are appended to whatever `into` already holds under that key.
 * - Objects are merged recursively when `into` already holds a plain object
 *   there; otherwise they are assigned by reference.
 * - Any other value (including `0`, `false`, `''`) overwrites.
 *
 * @param {object} into - target, mutated in place
 * @param {object} object - source of new values
 * @returns {object} `into`
 */
function merge (into, object) {
  Object
    .keys(object)
    .forEach(k => {
      let val = object[k]
      // Skip only missing values; falsy-but-real values must still be merged.
      if (val === undefined || val === null) {
        return
      }
      if (Array.isArray(val)) {
        into[k] = [].concat(into[k] || [], val)
      } else if (typeof val === 'object') {
        let current = into[k]
        // Recursing into a primitive or array would try to set properties on it.
        let mergeable = current !== null && typeof current === 'object' && !Array.isArray(current)
        into[k] = mergeable ? merge(current, val) : val
      } else {
        into[k] = val
      }
    })
  return into
}
/**
 * Composes node builders: the returned factory calls every `fn(obj, node)`
 * in order, deep-merging each result into one accumulated node object.
 */
function wrap (...fns) {
  return (obj) => {
    let node = {}
    for (const fn of fns) {
      node = merge(node, fn(obj, node))
    }
    return node
  }
}
/**
 * Reads a dotted property path (e.g. `'a.b.c'`) from `object`.
 * As soon as a link in the chain is falsy, that falsy value is returned.
 */
function get (expr, object) {
  let ctx = object
  for (const key of expr.split('.')) {
    if (!ctx) {
      return ctx
    }
    ctx = ctx[key]
  }
  return ctx
}
// Fill colour per node category, plus the stroke colour used for all edges.
let colors = (() => {
  // config, database and format nodes share one hue
  const lavender = '#C5B0D5'
  return {
    data: '#DBDB8D',
    config: lavender,
    activity: '#9EDAE5',
    database: lavender,
    schedule: '#F7B6D2',
    resource: '#C7C7C7',
    format: lavender,
    lines: '#656565', // other candidates tried: #C7C7C7, #8F8F8F
  }
})()
// Builds an edge descriptor pointing at `obj`. Only its id and name are kept,
// and the optional `style` is layered over the default edge colour.
let arrow = (obj, style) => {
  const { id, name } = obj
  return {
    node: { id, name },
    style: merge({ color: colors.lines }, style || {}),
  }
}
// Reusable node builders meant to be composed with `wrap`.
let mixins = {
  // Marks the node filled, using the Graphviz colour list `<color>;0.1:white`
  // (falls back to red when no colour is given).
  color: (color) => () => {
    const fill = color || 'red'
    return {
      styles: {
        style: 'filled',
        fillcolor: `${fill};0.1:white`,
      },
    }
  },
  // Edges read from the given (dotted) fields, stored under `depends`.
  depends: (...fields) => mixins._into('depends', fields),
  // Edges read from the given (dotted) fields, stored under `requires`.
  requires: (...fields) => mixins._into('requires', fields),
  // Looks up each field on the object and turns every truthy hit into an edge.
  _into: (property, fields) => (obj) => {
    const edges = []
    for (const field of fields) {
      const target = get(field, obj)
      if (target) {
        edges.push(arrow(target))
      }
    }
    return { [property]: edges }
  },
}
/**
 * Base graph node for a resolved pipeline object.
 *
 * - `depends` holds edges the renderer draws INTO this node: `dependsOn`,
 *   `runsOn` (dashed) and `input`.
 * - `requires` holds edges drawn OUT of this node: `output`.
 */
function Node (obj) {
  const { id, name, type } = obj

  const depends = []
  if (obj.dependsOn) {
    depends.push(arrow(obj.dependsOn))
  }
  if (obj.runsOn) {
    depends.push(arrow(obj.runsOn, { style: 'dashed' }))
  }
  if (obj.input) {
    depends.push(arrow(obj.input))
  }

  const requires = obj.output ? [arrow(obj.output)] : []

  return {
    id,
    name,
    type,
    styles: { shape: 'box' },
    depends,
    requires,
  }
}
// Per-category builders: the base Node, the category's fill colour, and any
// extra edges that category implies.
let coloredNode = (color, ...extras) => wrap(Node, mixins.color(color), ...extras)
let Config = coloredNode(colors.config, mixins.requires('schedule'))
let Schedule = coloredNode(colors.schedule)
let Activity = coloredNode(colors.activity, mixins.depends('schedule'))
let Data = coloredNode(colors.data, mixins.depends('dataFormat'))
let Resource = coloredNode(colors.resource, mixins.depends('schedule'))
let Format = coloredNode(colors.format)
let Database = coloredNode(colors.database)
// https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-pipeline-objects.html
// Maps a pipeline object's `type` to the builder that turns it into a graph node.
// Objects whose type is not listed here abort the conversion ("missing type").
let types = {
  Config,
  Schedule,

  // data nodes
  Data,
  DynamoDBDataNode: Data,
  MySqlDataNode: Data,
  RedshiftDataNode: wrap(Data, mixins.requires('database')),
  S3DataNode: Data,
  SqlDataNode: wrap(Data, mixins.requires('database')),

  // data formats
  Format,
  CSV: Format,
  Custom: Format,
  DynamoDBDataFormat: Format,
  DynamoDBExportDataFormat: Format,
  RegEx: Format,
  TSV: Format,

  // databases
  Database,
  JdbcDatabase: Database,
  RdsDatabase: Database,
  RedshiftDatabase: Database,

  // resources (WorkerGroup objects are synthesized by resolve)
  Resource,
  WorkerGroup: Resource,
  EmrCluster: Resource,
  Ec2Resource: Resource,

  // activities
  Activity,
  CopyActivity: Activity,
  EmrActivity: Activity,
  HadoopActivity: Activity,
  HiveActivity: Activity,
  HiveCopyActivity: Activity,
  PigActivity: Activity,
  RedshiftCopyActivity: Activity,
  ShellCommandActivity: Activity,
  SqlActivity: wrap(Activity, mixins.requires('database')),
}
// Turns an error factory into a function that throws whatever the factory builds.
let error = (makeError) => {
  return (obj) => {
    const failure = makeError(obj)
    throw failure
  }
}
// Error for a pipeline object whose `type` has no registered builder.
let missingType = (obj) => {
  const id = obj.id
  return new Error(`missing type for node ${id}`)
}
// Loads a pipeline definition file, resolves its references, and builds one
// graph node per object. Throws if an object's type has no builder.
let read = (file) => {
  const objects = resolve(require(file).objects)
  objects.forEach((obj, ix) => {
    obj.ix = ix // position in the resolved list (nothing visible here reads it)
  })
  return objects.map(obj => {
    const build = types[obj.type] || error(missingType)
    return build(obj)
  })
}
// Lists an object's own enumerable properties as `{ key, value }` pairs.
let propsOf = (obj) => Object.entries(obj).map(([key, value]) => ({ key, value }))
/**
 * Normalises the raw `objects` of a pipeline definition, mutating them in
 * place. Returns a new array holding them plus one synthetic WorkerGroup
 * object per distinct `workerGroup` name.
 *
 * Steps:
 *  1. For each distinct `workerGroup` string, add a WorkerGroup object and
 *     point `runsOn` of every object naming it at that group.
 *  2. Replace every `{ ref: id }` field value with the referenced object.
 *     A dangling reference becomes `undefined`.
 *  3. Retype the `Default` object (if present) as `Config`.
 *  4. Give Resource/Cluster objects the `DefaultSchedule` when they have no
 *     schedule of their own.
 *  5. Drop an Activity's `schedule` when the resource it runs on already
 *     carries that default schedule.
 *
 * @param {object[]} objects - the `objects` array of the definition JSON
 * @returns {object[]} resolved objects followed by synthesized worker groups
 */
let resolve = (objects) => {
  // create worker groups
  let workerGroups = []
  objects.forEach(obj => {
    if (obj.workerGroup) {
      let workGroup = workerGroups.find(wg => wg.name === obj.workerGroup)
      if (!workGroup) {
        workGroup = {
          id: `WorkGroup_${workerGroups.length}`,
          name: obj.workerGroup,
          type: 'WorkerGroup',
        }
        workerGroups.push(workGroup)
      }
      obj.runsOn = { ref: workGroup.id }
    }
  })
  objects = objects.concat(workerGroups)

  // Index ids once instead of scanning the list for every reference.
  // The first occurrence wins, matching what Array#find would return.
  let byId = new Map()
  objects.forEach(obj => {
    if (!byId.has(obj.id)) {
      byId.set(obj.id, obj)
    }
  })

  // replace references with objects (null-valued fields are left untouched)
  objects.forEach(object => Object.keys(object).forEach(key => {
    let value = object[key]
    if (value && value.ref) {
      object[key] = byId.get(value.ref)
    }
  }))

  // connect config; a definition without a "Default" object is left as-is
  let config = byId.get('Default')
  if (config) {
    config.type = 'Config'
  }

  // schedules
  let schedule = byId.get('DefaultSchedule')
  objects
    .filter(obj => /(Resource|Cluster)/.test(obj.type))
    .forEach(obj => {
      obj.schedule = obj.schedule || schedule
    })
  objects
    .filter(obj => /(Activity)/.test(obj.type))
    .forEach(obj => {
      // avoid a redundant schedule edge when the resource already has it
      if (obj.runsOn && obj.runsOn.schedule === schedule) {
        delete obj.schedule
      }
    })
  return objects
}
/**
 * Renders graph nodes (as built by the node builders) into a Graphviz
 * `digraph` source string.
 *
 * Each node is expected to carry `id`, `name`, `type`, `styles`, and the edge
 * lists `requires` / `depends`:
 * - a `requires` edge is drawn from the node to `rel.node`;
 * - a `depends` edge is drawn from `rel.node` to the node.
 *
 * Node ids are always quoted, so ids containing `-`, spaces, etc. stay valid
 * DOT. Label text is HTML-escaped because labels are HTML-like tables.
 * The input nodes are not mutated.
 *
 * @param {object[]} nodes
 * @returns {string} DOT source
 */
let render = (nodes) => {
  // DOT double-quoted string: inside quotes only `"` needs escaping.
  let quote = (text) => `"${String(text).replace(/"/g, '\\"')}"`
  // Text placed inside an HTML-like label must not contain raw markup characters.
  let escapeHtml = (text) => String(text)
    .replace(/&/g, '&amp;')
    .replace(/</g, '&lt;')
    .replace(/>/g, '&gt;')
    .replace(/"/g, '&quot;')
  let renders = {
    // `key=value, ...` attribute list
    style: (styles) => Object.keys(styles).map(k => `${k}=${renders.labelOrTable(styles[k])}`).join(', '),
    // HTML-like labels (`<<...>>`) are emitted verbatim; everything else is quoted
    labelOrTable: (text) => String(text).startsWith('<<') ? String(text) : quote(text),
    // unwrap pipeline expressions: `#{myVar}` -> `myVar`
    name: (name) => (name == null ? '' : String(name)).replace(/#\{([^}]+)\}/g, '$1'),
    table: (lines) => `<<TABLE border="0">${
      lines.map(line => `<TR><TD>${line}</TD></TR>`).join('')
    }</TABLE>>`,
    label: (node) => renders.table([
      `<B>${escapeHtml(node.type)}</B>`,
      escapeHtml(renders.name(node.name)),
    ]),
    props: (props) => Object
      .keys(props)
      .map(key => renders.prop({ name: key, styles: props[key] })),
    // `props` keys are DOT keywords such as `node`, so they stay unquoted
    prop: (prop) => `${prop.name} [${renders.style(prop.styles)}];`,
    node: (node) => {
      // copy so the caller's node.styles is not mutated
      let styles = Object.assign({}, node.styles, {
        label: renders.label(node),
      })
      return `${quote(node.id)} [${renders.style(styles)}];`
    },
    arrow: (from, till, styles) => {
      let style = styles ? ` [${renders.style(styles)}]` : ''
      return `${quote(from.id)} -> ${quote(till.id)}${style};`
    },
    each: (xs, fn) => xs.reduce((lines, node, i) => lines.concat(fn(node, i)), []),
  }
  let props = {
    node: {
      shape: 'record',
    },
  }
  let lines = [].concat(
    renders.props(props),
    '// nodes',
    renders.each(nodes, renders.node),
    '// requires',
    renders.each(nodes, node => node.requires.map(rel => renders.arrow(node, rel.node, rel.style))),
    '// depends',
    renders.each(nodes, node => node.depends.map(rel => renders.arrow(rel.node, node, rel.style))),
    '// end'
  )
  return `digraph {\n ${lines.join('\n ')}\n}`
}
// Prints the DOT rendering of a pipeline definition file to stdout.
// The header comment naming the file is printed before the file is read.
let execute = (file) => {
  const header = `// ${file}`
  console.log(header)
  const dot = render(read(file))
  console.log(dot)
}
let path = require('path')
// Entry point: convert the definition file named by the first CLI argument.
// Without an argument, print usage rather than crashing inside path.resolve.
let input = process.argv[2]
if (!input) {
  console.error(`usage: node ${path.basename(process.argv[1] || 'dp2dot.js')} <pipeline-definition.json>`)
  process.exitCode = 1
} else {
  execute(
    path.resolve(input)
  )
}
#!/usr/bin/env bash
# Renders an AWS Data Pipeline definition to SVG, written next to the input
# as "<input>.svg".
# usage: <this script> <pipeline-definition.json>
# pipefail: a converter failure must fail the script, not be masked by `dot`.
set -euo pipefail
# Resolve dp2dot.js relative to this script so it works from any directory.
node "$(dirname "$0")/dp2dot.js" "$1" | dot -Tsvg > "$1.svg"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment