Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Spark-submit like tool for livy
#!/usr/bin/env python
#
# Very bare bones shell for driving a Livy session. Usage:
#
# livy-shell url [option=value ...]
#
# Options are set directly in the session creation request, so they must match the names of fields
# in the CreateInteractiveRequest structure. Option values should be python-like objects (should be
# parseable by python's "eval" function; naked strings are allowed). For example:
#
# kind=spark
# jars=[ '/foo.jar', '/bar.jar' ]
# conf={ foo : bar, 'spark.option' : opt_value }
#
# By default, a Spark (Scala) session is created.
#
import httplib
import json
import readline
import sys
import time
import urlparse
def check(condition, msg, *args):
if not condition:
if args:
msg = msg % args
print >> sys.stderr, msg
sys.exit(1)
def message(msg, *args):
if args:
msg = msg % args
print msg
class LiteralDict(dict):
def __missing__(self, name):
return name
def request(conn, method, uri, body):
body = json.dumps(body) if body else None
headers = { 'Content-Type' : 'application/json' }
conn.request(method, uri, body=body, headers=headers)
resp = conn.getresponse()
data = resp.read()
if resp.status < 200 or resp.status >= 400:
raise httplib.HTTPException, (resp.status, resp.reason, data)
if resp.status < 300 and resp.status != httplib.NO_CONTENT:
return json.loads(data)
return None
def get(conn, uri):
return request(conn, 'GET', uri, None)
def post(conn, uri, body):
return request(conn, 'POST', uri, body)
def delete(conn, uri):
return request(conn, 'DELETE', uri, None)
def create_session(conn):
request = {
"kind" : "spark"
}
for opt in sys.argv[3:]:
check(opt.find('=') > 0, "Invalid option: %s.", opt)
key, value = opt.split('=', 1)
request[key] = eval(value, LiteralDict())
return post(conn, "/sessions", request)
def wait_for_idle(sid):
session = get(conn, "/sessions/%d" % (sid, ))
while session['state'] == 'starting':
message("Session not ready yet (%s)", session['state'])
time.sleep(5)
session = get(conn, "/sessions/%d" % (sid, ))
if session['state'] != 'idle':
raise Exception, "Session failed to start."
def monitor_statement(conn, sid, s):
cnt = 0
while True:
state = s['state']
if state == 'available':
output = s['output']
status = output['status']
if status == 'ok':
result = output['data']
text = result.get('text/plain', None)
if text is None:
message("Success (non-text result).")
elif text.rstrip():
message("%s", text)
elif status == 'error':
ename = output['ename']
evalue = output['evalue']
traceback = "\n".join(output.get('traceback', []))
message("%s: %s", ename, evalue)
if traceback:
message("%s", traceback)
else:
message("Statement finished with unknown status '%s'.", status)
break
elif state == 'error':
message("%s", s['error'])
break
else:
if cnt == 10:
message("(waiting for result...)")
cnt = 0
else:
cnt += 1
time.sleep(1)
s = get(conn, "/sessions/%d/statements/%s" % (sid, s['id']))
def run_shell(conn, sid):
while True:
cmd = sc.readline().strip()
if len(cmd) == 0:
break
statement = post(conn, "/sessions/%d/statements" % (sid, ), { 'code' : cmd })
print "cmd: " + cmd
monitor_statement(conn, sid, statement)
def open_connection(url):
if url.scheme == "https":
return httplib.HTTPSConnection(url.netloc)
else:
return httplib.HTTPConnection(url.netloc)
#
# main()
#
check(len(sys.argv) > 1, "Missing arguments.")
url = urlparse.urlparse(sys.argv[1])
conn = open_connection(url)
sc = open(sys.argv[2])
sid = -1
try:
message("Creating new session...")
session = create_session(conn)
sid = int(session['id'])
message("New session (id = %d, kind = %s), waiting for idle state...", sid, session['kind'])
wait_for_idle(sid)
message("Session ready.")
run_shell(conn,sid)
except EOFError:
pass
finally:
conn.close()
if sid != -1:
conn = open_connection(url)
try:
delete(conn, "/sessions/%d" % (sid, ))
finally:
conn.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment