Skip to content

Instantly share code, notes, and snippets.

@dooglus
Last active January 2, 2016 22:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dooglus/e9f783b1465c6245cb13 to your computer and use it in GitHub Desktop.
Save dooglus/e9f783b1465c6245cb13 to your computer and use it in GitHub Desktop.
clamd in Python, accepting long commands on standard input
#!/usr/bin/python
from __future__ import print_function
import base64, httplib, json, string, sys, re
USER = 'user'; PASS = 'pass'; HOST = '127.0.0.1'; PORT = 30174
class RPC:
def __init__(self, host, port, username, password):
self.authhdr = "Basic %s" % (base64.b64encode("%s:%s" % (username, password)))
self.conn = httplib.HTTPConnection(host, port, False, 30)
def execute(self, obj, params):
self.conn.request('POST', '/', json.dumps(obj).replace('"'+magic_random_string+'"', params),
{'Authorization':self.authhdr, 'Content-type':'application/json' })
resp = self.conn.getresponse()
if resp is None:
print("JSON-RPC: no response", file=sys.stderr)
return None
return json.loads(resp.read())
def run_line(line):
decoder, pos, j, params = json.JSONDecoder(), 0, string.join(line[1:]), ''
while True: # use the JSON decoder to separate the parameters, but keep them in their original string form
matched = nonspace.search(j, pos)
if not matched: break
try:
decoded, pos = decoder.raw_decode(j, matched.start())
except:
return print('JSON-RPC: parse error in object starting "%s[...]"' % (j[matched.start():matched.start()+30]), file=sys.stderr)
params += j[matched.start():pos] + ','
params = '[%s]' % params[:-1]
# magic_random_string hackery because the JSON isn't always valid, eg. duplicate keys in {a1:v1, a1:v2}
resp = rpc.execute({'version':'1.1', 'method':line[0], 'id':0, 'params':magic_random_string}, params)
if 'error' in resp and resp['error'] is not None: print('JSON-RPC: error: ', json.dumps(resp['error']), file=sys.stderr)
else: print(json.dumps(resp['result'], indent=2))
def prompt():
if sys.stdin.isatty(): sys.stderr.write("> ")
return sys.stdin.readline()
rpc = RPC(HOST, PORT, USER, PASS);
magic_random_string = 'woefh3oifg38hg27gdygf3oirgjowihfiouwh'
nonspace = re.compile(r'\S')
line = prompt()
while line:
split = line[:-1].split()
if len(split): run_line(split)
line = prompt()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment