Skip to content

Instantly share code, notes, and snippets.

@pdparchitect
Created April 25, 2021 21:37
Show Gist options
  • Save pdparchitect/d004863e1e54e980c9635065245787fc to your computer and use it in GitHub Desktop.
Save pdparchitect/d004863e1e54e980c9635065245787fc to your computer and use it in GitHub Desktop.
Graph data structure based on sqlite
const { PassThrough } = require('stream')
const { EventEmitter } = require('events')
const sqlite3 = require('sqlite3').verbose()
const { CssSelectorParser } = require('css-selector-parser')
/**
 * Small graph store backed by SQLite.
 *
 * Nodes live in the `node` table (id, type, label, props) and edges in the
 * `edge` table (id, type, label, props, source, target). `props` is stored as
 * a JSON string. Rows can be queried with a CSS-like selector syntax, e.g.
 * `node[label="test"]`.
 */
class Graph {
  constructor() {
    this.db = null
    this.parser = new CssSelectorParser()
  }

  /**
   * Opens the database file and makes sure the `node` and `edge` tables exist.
   *
   * The returned promise settles only after the file has been opened and both
   * CREATE TABLE statements have completed, so callers can use the graph as
   * soon as `open` resolves.
   *
   * @param {string} [database='./test.db'] Path handed to `sqlite3.Database`.
   * @returns {Promise<void>}
   * @throws Rejects with the driver error when opening or schema creation fails.
   */
  async open(database = './test.db') {
    const db = await new Promise((resolve, reject) => {
      const handle = new sqlite3.Database(database, (err) => {
        if (err) {
          reject(err)
        }
        else {
          resolve(handle)
        }
      })
    })

    const exec = (sql) => new Promise((resolve, reject) => {
      db.run(sql, (err) => {
        if (err) {
          reject(err)
        }
        else {
          resolve()
        }
      })
    })

    try {
      await exec("CREATE TABLE IF NOT EXISTS node (id TEXT PRIMARY KEY, type TEXT NOT NULL, label TEXT NOT NULL, props TEXT)")
      await exec("CREATE TABLE IF NOT EXISTS edge (id TEXT PRIMARY KEY, type TEXT NOT NULL, label TEXT NOT NULL, props TEXT, source TEXT NOT NULL, target TEXT NOT NULL)")
    }
    catch (err) {
      // Do not leave a half-initialised handle open; the schema error is the
      // one worth reporting, so a failure to close is deliberately ignored.
      db.close(() => {})
      throw err
    }

    this.db = db
  }

  /**
   * Closes the database handle. Calling it when no database is open is a no-op.
   *
   * @returns {Promise<void>}
   * @throws Rejects with the driver error when the handle cannot be closed.
   */
  async close() {
    if (!this.db) {
      return
    }

    return new Promise((resolve, reject) => {
      this.db.close((err) => {
        if (err) {
          reject(err)
        }
        else {
          // Back to the same "not open" state the constructor sets up.
          this.db = null
          resolve()
        }
      })
    })
  }

  /**
   * Internal helper: runs one parameterised statement through `Database#run`.
   *
   * Going through `run` instead of `prepare` leaves the statement lifetime to
   * the driver, so no prepared statement is left un-finalized.
   *
   * @param {string} sql SQL text with `?` placeholders.
   * @param {Array} params Values bound to the placeholders, in order.
   * @returns {Promise<void>}
   */
  _run(sql, params) {
    return new Promise((resolve, reject) => {
      this.db.run(sql, params, (err) => {
        if (err) {
          reject(err)
        }
        else {
          resolve()
        }
      })
    })
  }

  /**
   * Inserts one row into the `node` table.
   *
   * @param {object} node
   * @param {string} [node.id='']
   * @param {string} [node.type='']
   * @param {string} [node.label='']
   * @param {object} [node.props={}] Serialised with `JSON.stringify`.
   * @returns {Promise<void>}
   * @throws Rejects with the driver error (e.g. duplicate primary key).
   */
  async addNode({ id = '', type = '', label = '', props = {} }) {
    return this._run(
      "INSERT INTO node (id, type, label, props) VALUES (?, ?, ?, ?)",
      [id, type, label, JSON.stringify(props)]
    )
  }

  /**
   * Inserts one row into the `edge` table.
   *
   * Columns are named explicitly so each value is bound to the column it
   * belongs to (props before source/target, matching the table definition).
   *
   * @param {object} edge
   * @param {string} [edge.id='']
   * @param {string} [edge.type='']
   * @param {string} [edge.label='']
   * @param {object} [edge.props={}] Serialised with `JSON.stringify`.
   * @param {string} [edge.source=''] Id of the node the edge starts at.
   * @param {string} [edge.target=''] Id of the node the edge points to.
   * @returns {Promise<void>}
   * @throws Rejects with the driver error (e.g. duplicate primary key).
   */
  async addEdge({ id = '', type = '', label = '', props = {}, source = '', target = '' }) {
    return this._run(
      "INSERT INTO edge (id, type, label, props, source, target) VALUES (?, ?, ?, ?, ?, ?)",
      [id, type, label, JSON.stringify(props), source, target]
    )
  }

  /**
   * Inserts a node and, optionally, edges attached to it.
   *
   * `edges` may be an array or a single value; `edge` is a shorthand for one
   * edge and is only used when `edges` is not given. Each entry is either an
   * edge object or a bare target id. Entries without a `source` start at the
   * node being added. Falsy entries are ignored.
   *
   * @param {object} spec Node fields plus optional `edge` / `edges`.
   * @returns {Promise<void>} Resolves once the node and every edge are stored.
   * @throws Rejects if the node or any edge insert fails.
   */
  async add({ edge, edges = [edge], ...node }) {
    await this.addNode(node)

    // Normalise to an array first; filtering a non-array would throw.
    const entries = (Array.isArray(edges) ? edges : [edges]).filter(Boolean)

    await Promise.all(entries.map((entry) => {
      const record = typeof entry === 'object' ? { ...entry } : { target: entry }

      if (!record.source) {
        record.source = node.id
      }

      // Returned so Promise.all actually waits for, and reports, each insert.
      return this.addEdge(record)
    }))
  }

  /**
   * Adapts an event emitter to an async iterator.
   *
   * Values from `yieldEvent` are yielded in order, `doneEvent` ends the
   * iteration and `errorEvent` makes it throw. The listeners are removed when
   * the iteration finishes for any reason, including an error or the consumer
   * stopping early.
   *
   * @param {EventEmitter} emitter
   * @param {object} [options]
   * @param {string} [options.yieldEvent='item']
   * @param {string} [options.errorEvent='error']
   * @param {string} [options.doneEvent='end']
   */
  async * iterateOverEmitter(emitter, options) {
    const { yieldEvent = 'item', errorEvent = 'error', doneEvent = 'end' } = options || {}

    // The object-mode stream buffers events that arrive between reads.
    const stream = new PassThrough({ objectMode: true })

    const yieldEventHandler = (item) => {
      stream.write(item)
    }

    const errorEventHandler = (err) => {
      stream.destroy(err)
    }

    const doneEventHandler = () => {
      stream.end()
    }

    emitter.addListener(yieldEvent, yieldEventHandler)
    emitter.addListener(errorEvent, errorEventHandler)
    emitter.addListener(doneEvent, doneEventHandler)

    try {
      for await (const chunk of stream) {
        yield chunk
      }
    }
    finally {
      emitter.removeListener(yieldEvent, yieldEventHandler)
      emitter.removeListener(errorEvent, errorEventHandler)
      emitter.removeListener(doneEvent, doneEventHandler)
      stream.destroy()
    }
  }

  /**
   * Turns a parsed selector into prepared SELECT statements, one per table.
   *
   * A rule without a tag name queries both tables. Table and column names are
   * checked against fixed whitelists before being placed in the SQL text;
   * attribute values are always bound as parameters.
   *
   * @param {object} ast Parser output with `type` 'selectors' or 'ruleSet'.
   * @yields Prepared statements; the consumer is responsible for finalizing them.
   * @throws {Error} On an unknown AST type, tag, attribute or operator.
   */
  async * generateSelectorStatements(ast) {
    const knownTables = ['node', 'edge']
    const knownColumns = ['id', 'type', 'label', 'source', 'target']

    switch (ast.type) {
      case 'selectors': {
        for (const selector of ast.selectors) {
          yield* this.generateSelectorStatements(selector)
        }

        break
      }

      case 'ruleSet': {
        const { tagName, attrs } = ast.rule || {}

        for (const table of tagName ? [tagName] : knownTables) {
          if (!knownTables.includes(table)) {
            throw new Error(`Unexpected tag ${table}`)
          }

          // An empty attribute list must not produce a dangling WHERE clause.
          if (!attrs || attrs.length === 0) {
            yield this.db.prepare(`SELECT * FROM ${table}`)

            continue
          }

          const expressions = []
          const values = []

          for (const { name, operator, value } of attrs) {
            if (!knownColumns.includes(name)) {
              throw new Error(`Unexpected attribute ${name}`)
            }

            switch (operator) {
              case '=':
                expressions.push(`${name} = ?`)

                break

              default:
                throw new Error(`Unsupported operator ${operator}`)
            }

            values.push(value)
          }

          yield this.db.prepare(`SELECT * FROM ${table} WHERE ${expressions.join(' AND ')}`).bind(...values)
        }

        break
      }

      default:
        throw new Error(`Unrecognized type ${ast.type}`)
    }
  }

  /**
   * Streams every row matching a selector such as `node[label="test"]`.
   *
   * @param {string} selector Selector text handed to the CSS selector parser.
   * @yields {object} Rows as delivered by the statement's `each` callback.
   * @throws Rejects the iteration if the selector is invalid or a query fails.
   */
  async * select(selector) {
    for await (const stmt of this.generateSelectorStatements(this.parser.parse(selector))) {
      const em = new EventEmitter()

      // Once the iteration is over nobody listens on `em` any more; emitting
      // 'error' then would throw, so late callbacks are dropped instead.
      let active = true

      stmt.each(
        (err, item) => {
          if (!active) {
            return
          }

          if (err) {
            active = false

            em.emit('error', err)
          }
          else {
            em.emit('item', item)
          }
        },

        (err) => {
          if (!active) {
            return
          }

          active = false

          if (err) {
            em.emit('error', err)
          }
          else {
            em.emit('end')
          }
        }
      )

      try {
        yield* this.iterateOverEmitter(em)
      }
      finally {
        active = false

        // Release the prepared statement whether the query completed, failed
        // or the consumer stopped early.
        stmt.finalize()
      }
    }
  }
}
/**
 * Demo entry point: opens the graph, stores one node labelled "test" under a
 * random id, then prints every node carrying that label.
 */
async function main() {
  const graph = new Graph()

  await graph.open('./test.db')

  // Random base-32 string so repeated runs do not collide on the primary key.
  const id = Math.random().toString(32).slice(2)

  await graph.add({ id, label: 'test' })

  const matches = graph.select('node[label="test"]')

  for await (const row of matches) {
    console.log(row)
  }
}

main().catch(console.error)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment