Skip to content

Instantly share code, notes, and snippets.

@fabware
Created May 11, 2011 07:17
Show Gist options
  • Save fabware/966036 to your computer and use it in GitHub Desktop.
Save fabware/966036 to your computer and use it in GitHub Desktop.
#------------- consumer ---------------
import logging

import simplejson
def check_callable(cb):
    """Ensure *cb* can be called.

    Returns None when *cb* is callable; otherwise raises TypeError.
    """
    if callable(cb):
        return
    raise TypeError("callback isn't a callable")
class ConsumerBase(object):
    """Base class for consumers that read one channel through an HTTP resource.

    ``fetch`` performs a single request; ``wait`` and ``wait_async`` are
    left for subclasses to implement.
    """

    def __init__(self, res, channel, **kwargs):
        """Store the resource and channel used by later requests.

        Args:
            res: object exposing ``get(path, **params)``.
            channel: channel name interpolated into the request path.
            **kwargs: accepted but not used by this base class.
        """
        self.res = res
        self.channel = channel

    def fetch(self, cb=None, **params):
        """Issue one ``SUBSCRIBE/<channel>`` GET and deliver its JSON body.

        Args:
            cb: optional callable. When given, it is called with
                ``resp.json_body`` and nothing is returned.
            **params: forwarded unchanged to ``res.get``.

        Returns:
            ``resp.json_body`` when *cb* is None, otherwise None.

        Raises:
            TypeError: if *cb* is given but is not callable.
        """
        # Validate the callback first so a bad argument fails before any
        # request is sent.
        if cb is not None:
            check_callable(cb)
        resp = self.res.get("SUBSCRIBE/%s" % self.channel, **params)
        if cb is None:
            return resp.json_body
        cb(resp.json_body)

    def wait(self, cb, **params):
        """Consume messages and pass them to *cb*; subclasses must override."""
        raise NotImplementedError

    def wait_async(self, cb, **params):
        """Asynchronous variant of ``wait``; subclasses must override."""
        raise NotImplementedError
class SyncWebdisConsumer(ConsumerBase):
    """Blocking consumer that streams a ``SUBSCRIBE/<channel>`` response."""

    def wait(self, cb, **params):
        """Read the response body line by line and feed each JSON document to *cb*.

        The loop ends when ``readline`` returns an empty value, or when
        KeyboardInterrupt / SystemExit is raised while reading or inside
        *cb*; those two exceptions are absorbed so the method returns quietly.

        Args:
            cb: callable invoked with the result of ``simplejson.loads`` for
                every non-empty line.
            **params: forwarded unchanged to ``res.get``.

        Raises:
            TypeError: if *cb* is not callable (checked before the request).
        """
        check_callable(cb)
        log = logging.getLogger(__name__)
        subscribe_path = "SUBSCRIBE/%s" % self.channel
        resp = self.res.get(subscribe_path, **params)
        with resp.body_stream() as body:
            try:
                while True:
                    log.debug("before read line")
                    line = body.readline()
                    log.debug("get line: %r", line)
                    if not line:
                        break
                    # Remove only a real line terminator; a final line without
                    # a trailing newline must keep its last character.
                    if line.endswith("\r\n"):
                        line = line[:-2]
                    elif line.endswith("\n"):
                        line = line[:-1]
                    if not line:
                        # Nothing left to decode on a blank line.
                        continue
                    cb(simplejson.loads(line))
            except (KeyboardInterrupt, SystemExit):
                # Interrupt/exit requests end the consume loop instead of
                # propagating to the caller.
                pass
class SyncCouchdbConsumer(ConsumerBase):
    """Blocking consumer that streams a ``_changes`` response with ``feed='continuous'``."""

    def wait(self, cb, **params):
        """Read the changes response line by line and feed each JSON document to *cb*.

        The loop ends when ``readline`` returns an empty value, or when
        KeyboardInterrupt / SystemExit is raised while reading or inside
        *cb*; those two exceptions are absorbed so the method returns quietly.

        Args:
            cb: callable invoked with the result of ``simplejson.loads`` for
                every non-empty line.
            **params: forwarded to ``res.get`` alongside ``feed='continuous'``.

        Raises:
            TypeError: if *cb* is not callable (checked before the request).
        """
        check_callable(cb)
        log = logging.getLogger(__name__)
        # NOTE(review): the database path is hard-coded and self.channel is
        # not consulted here; kept as-is for compatibility - confirm intent.
        subscribe_path = 'kw_operate/_changes'
        resp = self.res.get(subscribe_path, feed='continuous', **params)
        with resp.body_stream() as body:
            try:
                while True:
                    log.debug("before read line")
                    line = body.readline()
                    log.debug("get line: %r", line)
                    if not line:
                        break
                    # Remove only a real line terminator; a final line without
                    # a trailing newline must keep its last character.
                    if line.endswith("\r\n"):
                        line = line[:-2]
                    elif line.endswith("\n"):
                        line = line[:-1]
                    if not line:
                        # Nothing left to decode on a blank line.
                        continue
                    cb(simplejson.loads(line))
            except (KeyboardInterrupt, SystemExit):
                # Interrupt/exit requests end the consume loop instead of
                # propagating to the caller.
                pass
#------- test_webdis_consumer.py --------------
from restkit import Resource
from consumer import SyncWebdisConsumer
# HTTP resource rooted at the local endpoint on port 7379; the consumer below
# issues its GET requests through it.
res = Resource('http://127.0.0.1:7379')
def printer(line):
    """Print one consumed message prefixed with ``consuming:``.

    Args:
        line: the value handed over by the consumer; it is rendered with
            ``%s`` formatting.
    """
    # A single pre-formatted argument produces the same output whether
    # ``print`` is the Python 2 statement or the Python 3 function.
    print('consuming: %s' % (line,))
# Consume the 'ground_profile' channel; every message passed to the callback
# is printed by printer().
consumer = SyncWebdisConsumer(res, 'ground_profile')
consumer.wait(printer)
#------ webdis: curl trace of GET /SUBSCRIBE/ground_profile -------
== Info: About to connect() to 127.0.0.1 port 7379 (#0)
== Info: Trying 127.0.0.1... == Info: connected
== Info: Connected to 127.0.0.1 (127.0.0.1) port 7379 (#0)
=> Send header, 177 bytes (0xb1)
0000: 47 45 54 20 2f 53 55 42 53 43 52 49 42 45 2f 67 GET /SUBSCRIBE/g
0010: 72 6f 75 6e 64 5f 70 72 6f 66 69 6c 65 20 48 54 round_profile HT
0020: 54 50 2f 31 2e 31 0d 0a 55 73 65 72 2d 41 67 65 TP/1.1..User-Age
0030: 6e 74 3a 20 63 75 72 6c 2f 37 2e 31 39 2e 37 20 nt: curl/7.19.7
0040: 28 69 34 38 36 2d 70 63 2d 6c 69 6e 75 78 2d 67 (i486-pc-linux-g
0050: 6e 75 29 20 6c 69 62 63 75 72 6c 2f 37 2e 31 39 nu) libcurl/7.19
0060: 2e 37 20 4f 70 65 6e 53 53 4c 2f 30 2e 39 2e 38 .7 OpenSSL/0.9.8
0070: 6b 20 7a 6c 69 62 2f 31 2e 32 2e 33 2e 33 20 6c k zlib/1.2.3.3 l
0080: 69 62 69 64 6e 2f 31 2e 31 35 0d 0a 48 6f 73 74 ibidn/1.15..Host
0090: 3a 20 31 32 37 2e 30 2e 30 2e 31 3a 37 33 37 39 : 127.0.0.1:7379
00a0: 0d 0a 41 63 63 65 70 74 3a 20 2a 2f 2a 0d 0a 0d ..Accept: */*...
00b0: 0a .
<= Recv header, 17 bytes (0x11)
0000: 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d HTTP/1.1 200 OK.
0010: 0a .
<= Recv header, 16 bytes (0x10)
0000: 53 65 72 76 65 72 3a 20 57 65 62 64 69 73 0d 0a Server: Webdis..
<= Recv header, 29 bytes (0x1d)
0000: 41 6c 6c 6f 77 3a 20 47 45 54 2c 50 4f 53 54 2c Allow: GET,POST,
0010: 50 55 54 2c 4f 50 54 49 4f 4e 53 0d 0a PUT,OPTIONS..
<= Recv header, 32 bytes (0x20)
0000: 41 63 63 65 73 73 2d 43 6f 6e 74 72 6f 6c 2d 41 Access-Control-A
0010: 6c 6c 6f 77 2d 4f 72 69 67 69 6e 3a 20 2a 0d 0a llow-Origin: *..
<= Recv header, 32 bytes (0x20)
0000: 43 6f 6e 74 65 6e 74 2d 54 79 70 65 3a 20 61 70 Content-Type: ap
0010: 70 6c 69 63 61 74 69 6f 6e 2f 6a 73 6f 6e 0d 0a plication/json..
<= Recv header, 24 bytes (0x18)
0000: 43 6f 6e 6e 65 63 74 69 6f 6e 3a 20 4b 65 65 70 Connection: Keep
0010: 2d 41 6c 69 76 65 0d 0a -Alive..
<= Recv header, 28 bytes (0x1c)
0000: 54 72 61 6e 73 66 65 72 2d 45 6e 63 6f 64 69 6e Transfer-Encodin
0010: 67 3a 20 63 68 75 6e 6b 65 64 0d 0a g: chunked..
<= Recv header, 2 bytes (0x2)
0000: 0d 0a ..
<= Recv data, 52 bytes (0x34)
0000: 32 65 0d 0a 7b 22 53 55 42 53 43 52 49 42 45 22 2e..{"SUBSCRIBE"
0010: 3a 5b 22 73 75 62 73 63 72 69 62 65 22 2c 22 67 :["subscribe","g
0020: 72 6f 75 6e 64 5f 70 72 6f 66 69 6c 65 22 2c 31 round_profile",1
0030: 5d 7d 0d 0a ]}..
<= Recv data, 53 bytes (0x35)
0000: 32 66 0d 0a 7b 22 53 55 42 53 43 52 49 42 45 22 2f..{"SUBSCRIBE"
0010: 3a 5b 22 6d 65 73 73 61 67 65 22 2c 22 67 72 6f :["message","gro
0020: 75 6e 64 5f 70 72 6f 66 69 6c 65 22 2c 22 68 69 und_profile","hi
0030: 22 5d 7d 0d 0a "]}..
#---------- couchdb: curl trace of GET /kw_game/_changes -------------
== Info: About to connect() to 192.168.60.60 port 5984 (#0)
== Info: Trying 192.168.60.60... == Info: connected
== Info: Connected to 192.168.60.60 (192.168.60.60) port 5984 (#0)
=> Send header, 173 bytes (0xad)
0000: 47 45 54 20 2f 6b 77 5f 67 61 6d 65 2f 5f 63 68 GET /kw_game/_ch
0010: 61 6e 67 65 73 20 48 54 54 50 2f 31 2e 31 0d 0a anges HTTP/1.1..
0020: 55 73 65 72 2d 41 67 65 6e 74 3a 20 63 75 72 6c User-Agent: curl
0030: 2f 37 2e 31 39 2e 37 20 28 69 34 38 36 2d 70 63 /7.19.7 (i486-pc
0040: 2d 6c 69 6e 75 78 2d 67 6e 75 29 20 6c 69 62 63 -linux-gnu) libc
0050: 75 72 6c 2f 37 2e 31 39 2e 37 20 4f 70 65 6e 53 url/7.19.7 OpenS
0060: 53 4c 2f 30 2e 39 2e 38 6b 20 7a 6c 69 62 2f 31 SL/0.9.8k zlib/1
0070: 2e 32 2e 33 2e 33 20 6c 69 62 69 64 6e 2f 31 2e .2.3.3 libidn/1.
0080: 31 35 0d 0a 48 6f 73 74 3a 20 31 39 32 2e 31 36 15..Host: 192.16
0090: 38 2e 36 30 2e 36 30 3a 35 39 38 34 0d 0a 41 63 8.60.60:5984..Ac
00a0: 63 65 70 74 3a 20 2a 2f 2a 0d 0a 0d 0a cept: */*....
<= Recv header, 17 bytes (0x11)
0000: 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d HTTP/1.1 200 OK.
0010: 0a .
<= Recv header, 28 bytes (0x1c)
0000: 54 72 61 6e 73 66 65 72 2d 45 6e 63 6f 64 69 6e Transfer-Encodin
0010: 67 3a 20 63 68 75 6e 6b 65 64 0d 0a g: chunked..
<= Recv header, 42 bytes (0x2a)
0000: 53 65 72 76 65 72 3a 20 43 6f 75 63 68 44 42 2f Server: CouchDB/
0010: 30 2e 31 31 2e 30 20 28 45 72 6c 61 6e 67 20 4f 0.11.0 (Erlang O
0020: 54 50 2f 52 31 33 42 29 0d 0a TP/R13B)..
<= Recv header, 35 bytes (0x23)
0000: 45 74 61 67 3a 20 22 36 4a 4e 32 35 59 36 31 30 Etag: "6JN25Y610
0010: 34 34 34 43 4b 51 43 4e 38 44 48 58 36 36 59 4a 444CKQCN8DHX66YJ
0020: 22 0d 0a "..
<= Recv header, 37 bytes (0x25)
0000: 44 61 74 65 3a 20 57 65 64 2c 20 31 31 20 4d 61 Date: Wed, 11 Ma
0010: 79 20 32 30 31 31 20 30 33 3a 34 37 3a 32 33 20 y 2011 03:47:23
0020: 47 4d 54 0d 0a GMT..
<= Recv header, 40 bytes (0x28)
0000: 43 6f 6e 74 65 6e 74 2d 54 79 70 65 3a 20 74 65 Content-Type: te
0010: 78 74 2f 70 6c 61 69 6e 3b 63 68 61 72 73 65 74 xt/plain;charset
0020: 3d 75 74 66 2d 38 0d 0a =utf-8..
<= Recv header, 32 bytes (0x20)
0000: 43 61 63 68 65 2d 43 6f 6e 74 72 6f 6c 3a 20 6d Cache-Control: m
0010: 75 73 74 2d 72 65 76 61 6c 69 64 61 74 65 0d 0a ust-revalidate..
<= Recv header, 2 bytes (0x2)
0000: 0d 0a ..
<= Recv data, 18 bytes (0x12)
0000: 64 0d 0a 7b 22 72 65 73 75 6c 74 73 22 3a 5b 0a d..{"results":[.
0010: 0d 0a ..
<= Recv data, 96 bytes (0x60)
0000: 35 61 0d 0a 7b 22 73 65 71 22 3a 31 2c 22 69 64 5a..{"seq":1,"id
0010: 22 3a 22 30 22 2c 22 63 68 61 6e 67 65 73 22 3a ":"0","changes":
0020: 5b 7b 22 72 65 76 22 3a 22 33 2d 37 34 64 31 62 [{"rev":"3-74d1b
0030: 38 31 38 30 62 39 35 61 33 64 30 31 35 65 63 32 8180b95a3d015ec2
0040: 63 63 64 30 39 32 32 30 33 39 37 22 7d 5d 2c 22 ccd09220397"}],"
0050: 64 65 6c 65 74 65 64 22 3a 74 72 75 65 7d 0d 0a deleted":true}..
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment