Skip to content

Instantly share code, notes, and snippets.

@paulharter
Created March 13, 2015 15:39
Show Gist options
  • Save paulharter/ce78fa79ac2d6d384941 to your computer and use it in GitHub Desktop.
Save paulharter/ce78fa79ac2d6d384941 to your computer and use it in GitHub Desktop.
gateway_view_tests.py
import unittest
import time
import requests
import json
import uuid
SYNC_GATEWAY_HOST = "192.168.59.103"
SYNC_GATEWAY_PORT = "4984"
SYNC_GATEWAY_ADMIN_PORT = "4985"
SYNC_GATEWAY_NAME = "sync_gateway"
SYNC_GATEWAY_URL = "http://%s:%s/%s" % (SYNC_GATEWAY_HOST, SYNC_GATEWAY_ADMIN_PORT, SYNC_GATEWAY_NAME)
VIEW = {"_id": "_design/raw", "views": {"all": {"map": "function(doc) {emit(doc.type, doc);}"}}}
HEADERS = {"Content-Type": "application/json", "Accept": "application/json"}
class GateWayViewTests(unittest.TestCase):
def test_query_a_view(self):
"""
This fails
"""
# add a design doc with one view that emits (doc.type, doc)
view_id = "_design/raw"
url = "%s/%s" % (SYNC_GATEWAY_URL, view_id)
rsp = requests.put(url, data=json.dumps(VIEW), headers=HEADERS)
self.assertEqual(rsp.status_code, 201)
# add a document of type "dog"
dog_id = str(uuid.uuid4())
print "new dog id", dog_id
url = "%s/%s" % (SYNC_GATEWAY_URL, dog_id)
rsp = requests.put(url, data=json.dumps({"_id": dog_id, "type": "dog", "name": "fly"}), headers=HEADERS)
self.assertEqual(rsp.status_code, 201)
print rsp.content
# use the view to get all dogs
view_url = "%s/_design/raw/_view/all?key=\"dog\"" % SYNC_GATEWAY_URL
rsp = requests.get(view_url, headers={"Accept": "application/json"})
if rsp.status_code == 200:
results = rsp.json()
print "count", results["total_rows"]
for row in results["rows"]:
print "indexed dog id", row["id"]
self.assertTrue(any(row["id"] == unicode(dog_id) for row in results["rows"]))
else:
self.fail("couldn't query the view")
def test_query_a_view_twice_with_pauses(self):
"""
This passes
"""
time.sleep(2)
# add a design doc with one view that emits (doc.type, doc)
view_id = "_design/raw"
url = "%s/%s" % (SYNC_GATEWAY_URL, view_id)
rsp = requests.put(url, data=json.dumps(VIEW), headers=HEADERS)
self.assertEqual(rsp.status_code, 201)
# add a document of type "dog"
dog_id = str(uuid.uuid4())
print "new dog id", dog_id
url = "%s/%s" % (SYNC_GATEWAY_URL, dog_id)
rsp = requests.put(url, data=json.dumps({"_id": dog_id, "type": "dog", "name": "fly"}), headers=HEADERS)
self.assertEqual(rsp.status_code, 201)
print rsp.content
# use the view to get all dogs
view_url = "%s/_design/raw/_view/all?key=\"dog\"" % SYNC_GATEWAY_URL
rsp = requests.get(view_url, headers={"Accept": "application/json"})
# wait two seconds and repeat the request
print "*** PAUSE ***"
time.sleep(2)
# then re run exactly the same request
rsp = requests.get(view_url, headers={"Accept": "application/json"})
# check to see if the new dog is in the list of dogs
if rsp.status_code == 200:
results = rsp.json()
print "count", results["total_rows"]
for row in results["rows"]:
print "indexed dog id", row["id"]
self.assertTrue(any(row["id"] == unicode(dog_id) for row in results["rows"]))
else:
self.fail("couldn't query the view")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment