Skip to content

Instantly share code, notes, and snippets.

@aszlig
Last active December 22, 2015 09:08
Show Gist options
  • Save aszlig/6449773 to your computer and use it in GitHub Desktop.
Save aszlig/6449773 to your computer and use it in GitHub Desktop.
This updates the NixOps state from the old (and **unreleased**) Hetzner backend state format to the new one. So if you always use the release versions, you won't need this.
with import <nixpkgs> {};
stdenv.mkDerivation rec {
name = "nixops-update-hetzner-netinfo";
src = ./update.py;
buildInputs = [ python pythonPackages.wrapPython ];
pythonPath = [ pythonPackages.sqlite3 ];
phases = [ "installPhase" "fixupPhase" ];
installPhase = ''
install -vD "$src" "$out/bin/${name}"
wrapPythonPrograms
'';
}
#!/usr/bin/env python
import os
import re
import sqlite3
import string
import json
from textwrap import dedent
def _fold_string(value, rules):
folder = lambda val, rule: val.replace(rule[0], rule[1])
return reduce(folder, rules, value)
def nixmerge(expr1, expr2):
"""
Merge both expressions into one, merging dictionary keys and appending list
elements if they otherwise would clash.
"""
def _merge_dicts(d1, d2):
out = {}
for key in set(d1.keys()).union(d2.keys()):
if key in d1 and key in d2:
out[key] = _merge(d1[key], d2[key])
elif key in d1:
out[key] = d1[key]
else:
out[key] = d2[key]
return out
def _merge(e1, e2):
if isinstance(e1, dict) and isinstance(e2, dict):
return _merge_dicts(e1, e2)
elif isinstance(e1, list) and isinstance(e2, list):
return list(set(e1).union(e2))
else:
err = "unable to merge {0} with {1}".format(type(e1), type(e2))
raise ValueError(err)
return _merge(expr1, expr2)
class ParseFailure(Exception):
def __init__(self, pos, msg=None):
self.pos = pos
self.msg = msg
def __str__(self):
if self.msg is None:
return "parse error at position {0}".format(self.pos)
else:
return self.msg + " (pos: {0})".format(self.pos)
class ParseSuccess(object):
def __init__(self, pos, data):
self.pos = pos
self.data = data
RE_FLAGS = re.DOTALL | re.IGNORECASE
RE_STRING = re.compile(r"\"\"|''''|\"(.*?[^\\])\"|''(.*?[^'])?''(?!\$\{|')",
RE_FLAGS)
RE_ATTR = re.compile(r'"(.*?(?![^\\]\\))"|([a-z_][a-z0-9_]*)', RE_FLAGS)
RE_FUNHEAD = re.compile(r'(?:\s*(?:{.*?}|[a-z_][a-z0-9_]*)\s*:)+', RE_FLAGS)
RE_RAWVAL = re.compile(r'(?:\s*(?:<[^>]+>|\([^)]+\)|[a-z!+._][a-z0-9_]*))+',
RE_FLAGS)
def nix2py(source):
"""
Parse the given Nix source code into a tree of Python values (dicts, lists,
strings and whatnot) and return it.
This parser is NOT a complete parser and only parses a subset of Nix,
mainly attribute sets, lists and strings. So everything that is out of
scope of this parser will be put into a RawValue, so it can still be pretty
printed.
"""
maxpos = len(source)
def _is_char(pos, char):
return pos < maxpos and source[pos] == char
def _skip_whitespace(pos):
while pos < maxpos and source[pos].isspace():
pos += 1
return pos
def _parse_string(pos):
match = RE_STRING.match(source, pos)
if match is None:
return ParseFailure(pos, "string expected")
if match.group(1) is not None:
data = _fold_string(match.group(1), [
(r'\"', '"'),
(r'\n', "\n"),
(r'\t', "\t"),
(r'\${', "${"),
('\\\\', "\\"),
])
elif match.group(2) is not None:
data = _fold_string(dedent(match.group(2)), [
("'''", "''"),
(r"'\n", "\n"),
(r"'\t", "\t"),
(r"''${", "${"),
]).lstrip('\n')
else:
data = ""
return ParseSuccess(match.end(), data)
def _parse_int(pos):
mul = 1
if source[pos:pos+15] == "builtins.sub 0 ":
pos += 15
mul = -1
data = ""
while pos < maxpos and source[pos].isdigit():
data += source[pos]
pos += 1
if len(data) == 0:
return ParseFailure(pos, "integer expected")
else:
return ParseSuccess(pos, int(data) * mul)
def _parse_bool(pos):
if source[pos:pos+4] == "true":
return ParseSuccess(pos + 4, True)
elif source[pos:pos+5] == "false":
return ParseSuccess(pos + 5, False)
else:
return ParseFailure(pos, "boolean expected")
def _parse_null(pos):
if source[pos:pos+4] == "null":
return ParseSuccess(pos + 4, None)
else:
return ParseFailure(pos, "null expected")
def _parse_list(pos):
items = []
if _is_char(pos, '['):
result = _parse_expr(pos + 1)
while isinstance(result, ParseSuccess):
items.append(result.data)
result = _parse_expr(result.pos)
newpos = _skip_whitespace(result.pos)
if _is_char(newpos, ']'):
return ParseSuccess(newpos + 1, items)
else:
return result
else:
return ParseFailure(pos, "start of list expected")
def _parse_attr(pos):
newpos = _skip_whitespace(pos)
match = RE_ATTR.match(source, newpos)
if match is None:
return ParseFailure(newpos, "invalid attribute set key")
if match.group(1):
data = _fold_string(match.group(1), [
(r'\"', '"'),
('\\\\', "\\"),
])
else:
data = match.group(2)
return ParseSuccess(match.end(), data)
def _parse_dotattr(pos):
attrs = []
attr = _parse_attr(pos)
newpos = pos
while isinstance(attr, ParseSuccess):
attrs.append(attr)
newpos = _skip_whitespace(attr.pos)
if _is_char(newpos, '.'):
newpos += 1
else:
break
attr = _parse_attr(newpos)
if len(attrs) == 0:
return ParseFailure(newpos, "empty attribute set key")
return ParseSuccess(attrs[-1].pos, [attr.data for attr in attrs])
def _parse_keyval(pos):
key = _parse_dotattr(pos)
if not isinstance(key, ParseSuccess):
return key
newpos = _skip_whitespace(key.pos)
if not _is_char(newpos, '='):
return ParseFailure(newpos, "attribute operator expected")
newpos += 1
value = _parse_expr(newpos)
if not isinstance(value, ParseSuccess):
return value
newpos = _skip_whitespace(value.pos)
if not _is_char(newpos, ';'):
return ParseFailure(newpos, "end of attribute expected")
return ParseSuccess(newpos + 1, (key.data, value.data))
def _reduce_keys(keys, value):
if len(keys) == 0:
return value
else:
return {keys[0]: _reduce_keys(keys[1:], value)}
def _postprocess_attrlist(attrs):
dictlist = []
for keys, value in attrs:
dictlist.append({keys[0]: _reduce_keys(keys[1:], value)})
if len(dictlist) == 0:
return {}
else:
return reduce(nixmerge, dictlist)
def _parse_attrset(pos):
attrs = []
if _is_char(pos, '{'):
keyval = _parse_keyval(pos + 1)
newpos = keyval.pos
while isinstance(keyval, ParseSuccess):
attrs.append(keyval.data)
newpos = keyval.pos
keyval = _parse_keyval(newpos)
newpos = _skip_whitespace(newpos)
if _is_char(newpos, '}'):
return ParseSuccess(newpos + 1, _postprocess_attrlist(attrs))
else:
return ParseFailure(newpos, "end of attribute set expected")
else:
return ParseFailure(pos, "start of attribute set expected")
def _parse_expr(pos):
newpos = _skip_whitespace(pos)
for parser in [_parse_string, _parse_int, _parse_bool, _parse_null,
_parse_list, _parse_attrset]:
result = parser(newpos)
if isinstance(result, ParseSuccess):
return result
return ParseFailure(newpos, "invalid expression")
result = _parse_expr(0)
if isinstance(result, ParseSuccess):
return result.data
else:
raise result
def update(conn):
try:
c = conn.cursor()
attrs = c.execute('SELECT machine, name, value FROM ResourceAttrs')
for machine, name, value in attrs.fetchall():
if name == 'hetzner.networkInfo':
try:
new_val = json.dumps(nix2py('{' + value + '}'))
except ParseFailure:
continue
elif name in ('hetzner.fsInfo', 'hetzner.hardwareInfo'):
if value.endswith('}'):
continue
new_val = '{\n' + value + '\n}'
if name == 'hetzner.hardwareInfo':
new_val = "{ pkgs, config, ... }:\n" + new_val
else:
continue
c.execute('UPDATE ResourceAttrs SET value = ? '
'WHERE name = ? AND machine = ?',
(new_val, name, machine))
except:
conn.rollback()
raise
conn.commit()
if __name__ == '__main__':
statefile = os.getenv('NIXOPS_STATE', None)
if statefile is None:
statefile = os.path.expanduser("~/.nixops/deployments.nixops")
update(sqlite3.connect(statefile))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment