Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
'''
Author: mjmalone
https://twitter.com/mjmalone/status/1874036416
'''
import struct
import pytyrant
class QueryException(Exception): pass
def validate_str(arg):
return isinstance(arg, basestring)
def validate_str_list(lst):
if not hasattr(lst, '__iter__'):
return False
for item in lst:
if not validate_str(item):
return False
return True
def validate_btwn_list(lst):
return validate_str_list and len(lst) == 2
class Query(object):
# Enumeration of query conditions
OPERATIONS = (
('streq', '0', validate_str), # string is equal to
('strinc', '1', validate_str), # string is included in
('strbw', '2', validate_str), # string begins with
('strew', '3', validate_str), # string ends with
('strand', '4', validate_str_list), # string includes all tokens in
('stror', '5', validate_str_list), # string includes at least one token in
('stroreq', '6', validate_str_list), # string is equal to at least one token in
('strrx', '7', validate_str), # string matches regular expression of
('numeq', '8', validate_str), # number is equal to
('numgt', '9', validate_str), # number is greater than
('numge', '10', validate_str), # number is greater than or equal to
('numlt', '11', validate_str), # number is less than
('numle', '12', validate_str), # number is less than or equal to
('numbt', '13', validate_btwn_list), # number is between two tokens of
('numoreq', '14', validate_str_list), # number is equal to at least one token in
)
def __init__(self):
self.args = []
def get_packed_args(self):
return [struct.pack('>BBIII', pytyrant.MAGIC, pytyrant.C.misc, 6, 0, len(self.args)), "search"] + self.args
def pack_condition(self, field, op, value):
if not isinstance(value, basestring) and hasattr(value, '__iter__'):
# Value is a list of strings. Make it a single comma separated string.
value = ','.join(value)
condition = '\x00'.join(["addcond", field, op, value])
return '%s%s' % (struct.pack('>I', len(condition)), condition)
def get_operator_info(self, op_name):
for name, code, validator in Query.OPERATIONS:
if op_name == name:
return code, validator
raise QueryException("Unknown query condition: '%s'" % operation)
def add_condition(self, **kwargs):
for key, value in kwargs.iteritems():
if '__' not in key:
raise QueryException("Condition arguments should be of the form `field__operation`")
field, operation = key.split('__')
op_code, validator = self.get_operator_info(operation)
if not validator(value):
raise QueryException("Invalid value for query type `%s`: `%s`", (operator, value))
self.args.append(self.pack_condition(field, op_code, value))
return self
def setlimit(self, limit, offset):
condition = '\x00'.join(('setlimit', str(limit), str(offset)))
self.args.append('%s%s' % (struct.pack('>I', len(condition)), condition))
return self
class Tyrant(pytyrant.Tyrant):
def search(self, query):
pytyrant.socksend(self.sock, query.get_packed_args())
pytyrant.socksuccess(self.sock)
return [pytyrant.sockstr(self.sock) for i in xrange(pytyrant.socklen(self.sock))]
if __name__ == '__main__':
t = Tyrant.open('127.0.0.1', 1979)
# Find places that have names equal to California or Maryland
q = Query().add_condition(name__stroreq = ["California", "Maryland"])
r = t.search(q)
for id in r:
print t.get(id)
print len(r)
# Find the first 10 places with a parent WOEID between 2500066 and 2500080
q = Query().add_condition(parent_woeid__numbt = ["2500066", "2500080"]).setlimit(10,0)
r = t.search(q)
for id in r:
print t.get(id)
print len(r)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.