Skip to content

Instantly share code, notes, and snippets.

@chartjes
Created January 11, 2016 14:29
Show Gist options
  • Save chartjes/8d643f2de9d288859b60 to your computer and use it in GitHub Desktop.
Save chartjes/8d643f2de9d288859b60 to your computer and use it in GitHub Desktop.
import base64
import datetime
import re
import requests
import time
import unittest
class Kinto_Sync(unittest.TestCase):
    """
    Tests that given two Kinto servers you can successfully sync data
    from one to the other
    """

    def setUp(self):
        """Prepare server URLs, Basic-auth headers and the records path."""
        self.master = "http://192.168.99.100:8888/v1/"
        self.read_only = "http://192.168.99.100:8889/v1/"
        # b64encode() needs bytes on Python 3 and returns bytes there, so
        # encode/decode explicitly; otherwise the header would be sent as
        # "Basic b'...'" (or the call would raise TypeError).
        credentials = '%s:%s' % ("testuser", "abc123")
        self.auth_string = base64.b64encode(
            credentials.encode('utf-8')).decode('ascii')
        self.headers = {
            "Authorization": "Basic %s" % self.auth_string,
            "Content-Type": "application/json"
        }
        self.resource = 'buckets/test_bucket/collections/test_collection/records'

    def test_sync(self):
        """
        Create two records on the master two seconds apart, list the
        records changed since the second one, then repeat the listing using
        the numeric part of the returned ETag together with an If-Match
        header, and print that last response.

        The read-only server (self.read_only) is not contacted here, and the
        test always ends in an explicit failure because no sync assertion
        has been written yet.
        """
        records_url = self.master + self.resource

        # Create some records in that bucket and sleep for a bit
        payload = '{"data": {"uniqid": "1-%s"}}' % self.get_timestamp()
        response = requests.post(records_url, data=payload,
                                 headers=self.headers)
        # Surface a rejected write immediately instead of failing later on
        # with a confusing error.
        response.raise_for_status()
        time.sleep(2)

        since = self.get_timestamp()
        payload = '{"data": {"uniqid": "1-%s"}}' % since
        response = requests.post(records_url, data=payload,
                                 headers=self.headers)
        response.raise_for_status()

        # Get the list of changes since <timestamp>
        # NOTE(review): get_timestamp() is in whole seconds; Kinto record
        # timestamps are presumably in milliseconds - confirm that _since
        # filters as intended.
        # Work on a copy so that adding If-Match below does not leak into
        # self.headers.
        request_headers = dict(self.headers)
        response = requests.get('%s?_since=%s' % (records_url, since),
                                headers=request_headers)
        response.raise_for_status()

        etag = response.headers['ETag']
        request_headers['If-Match'] = etag
        # The first run of digits in the ETag is used as the next _since value.
        match = re.search(r'\d+', etag)
        self.assertIsNotNone(
            match, "ETag %r does not contain a numeric timestamp" % etag)

        response = requests.get(
            '%s?_since=%s' % (records_url, match.group(0)),
            headers=request_headers)
        # Single-argument print() behaves the same on Python 2 and 3.
        print(response.headers)
        print(response.content)
        self.fail("test_sync has no sync assertions yet; "
                  "the last response is printed above for inspection")

    def get_timestamp(self):
        """Return the current Unix time truncated to whole seconds."""
        return int(time.time())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment