Skip to content

Instantly share code, notes, and snippets.

@nurav
Created June 6, 2016 20:02
Show Gist options
  • Save nurav/675f7b369e648028d6df648350fabdbe to your computer and use it in GitHub Desktop.
Save nurav/675f7b369e648028d6df648350fabdbe to your computer and use it in GitHub Desktop.
def updateRelease(self, name, changed_by, old_data_version, product=None, read_only=None, blob=None, transaction=None):
if product or blob:
self._proceedIfNotReadOnly(name, transaction=transaction)
what = {}
if read_only is not None:
what['read_only'] = read_only
if product:
what['product'] = product
if blob:
blob.validate()
# Blob schemas often require a name property but we can't assume so here
if blob.get("name"):
# If they do, we should not let the column and the in-blob name be different.
if name != blob["name"]:
raise ValueError("name in database (%s) does not match name in blob (%s)" % (name, blob.get("name")))
if self.containsForbiddenDomain(blob, product):
raise ValueError("Release blob contains forbidden domain.")
what['data'] = blob.getJSON()
try:
self.update(where=[self.name == name], what=what, changed_by=changed_by, old_data_version=old_data_version, transaction=transaction)
except OutdatedDataError as e:
import pdb; pdb.set_trace()
if blob:
ancestor_blob = self.history.getChange(data_version=old_data_version,
column_values={'name': name},
transaction=transaction) \
.get('data')
tip_release = self.getReleases(name=name,
transaction=transaction)[0]
tip_blob = tip_release['data']
m = merge.Merger(ancestor_blob, tip_blob, blob, {})
try:
m.run()
unified_blob = patch(m.unified_patches, ancestor_blob)
what['data'] = unified_blob.getJSON()
tip_data_version = tip_release['data_version']
self.update(where=[self.name == name], what=what, changed_by=changed_by,
old_data_version=tip_data_version, transaction=transaction)
except UnresolvedConflictsException:
raise e
new_data_version = old_data_version + 1
cache.put("blob", name, {"data_version": new_data_version, "blob": blob})
cache.put("blob_version", name, new_data_version)
def testReleasePutUpdateMergeableOutdatedData(self):
data = json.dumps(dict(detailsUrl='blah', fakePartials=True, schema_version=1))
blob1 = """
{
"name": "dd",
"schema_version": 1,
"detailsUrl": "blah",
"fakePartials": true,
"hashFunction": "sha512",
"platforms": {
"p": {
"locales": {
"dd": {
"complete": {
"filesize": 1234,
"from": "*",
"hashValue": "abc"
}
},
}
}
}
}"""
blob2 = """
{
"name": "dd",
"schema_version": 1,
"detailsUrl": "blah",
"fakePartials": true,
"hashFunction": "sha512",
"platforms": {
"p": {
"locales": {
"dd": {
"complete": {
"filesize": 1234,
"from": "*",
"hashValue": "abc"
}
}
}
}
}
}"""
result_blob = """
{
"name": "dd",
"schema_version": 1,
"detailsUrl": "blah",
"fakePartials": true,
"hashFunction": "sha512",
"platforms": {
"p": {
"locales": {
"dd": {
"complete": {
"filesize": 1234,
"from": "*",
"hashValue": "abc"
}
},
"dd1": {
"complete": {
"filesize": 1235,
"from": "*",
"hashValue": "abc"
}
}
}
}
}
}"""
# Testing Put request to add new release
ret = self._put('/releases/dd', data=dict(data=data, name='dd',
blob=blob1, product='dd', data_version=1))
self.assertStatusCode(ret, 201)
ret = select([dbo.releases.data]).where(dbo.releases.name == 'dd').execute().fetchone()[0]
self.assertEqual(json.loads(ret), json.loads(blob))
# Updating same release
data = json.dumps(dict(detailsUrl='blah', fakePartials=True, schema_version=1))
ret = self._put('/releases/dd', data=dict(data=data, name='dd',
product='dd', blob=blob1, data_version=1))
self.assertStatusCode(ret, 200)
self.assertEqual(ret.data, json.dumps(dict(new_data_version=2)), "Data: %s" % ret.data)
# Updating release with outdated data, testing if merged correctly
data = json.dumps(dict(detailsUrl='blah', fakePartials=True, schema_version=1))
ret = self._put('/releases/dd', data=dict(data=data, name='dd',
product='dd', blob=blob1, data_version=1))
self.assertStatusCode(ret, 200)
self.assertEqual(ret, json.dumps(dict(new_data_version=2)), "Data: %s" % ret.data)
ret = select([dbo.releases.data]).where(dbo.releases.name == 'dd').execute().fetchone()[0]
self.assertEqual(json.loads(ret), json.loads(result_blob))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment