Skip to content

Instantly share code, notes, and snippets.

@vinayak-mehta
Last active October 29, 2020 11:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vinayak-mehta/7828b64987395235f8253d9885090250 to your computer and use it in GitHub Desktop.
Save vinayak-mehta/7828b64987395235f8253d9885090250 to your computer and use it in GitHub Desktop.
import sys
from jupyter_client import KernelManager
try:
manager = KernelManager()
manager.start_kernel()
client = manager.client()
execution_state = "idle"
execution_count = 1
while True:
code = input(f"In [{execution_count}]: ")
if not code.strip():
continue
msg_id = client.execute(code)
execution_state = "busy"
# https://github.com/jupyter/jupyter_console/blob/c0bdeb1918f1235cf620ea10290a4c448aa56c68/jupyter_console/ptshell.py#L717
while execution_state != "idle" and client.is_alive():
# https://github.com/jupyter/jupyter_console/blob/c0bdeb1918f1235cf620ea10290a4c448aa56c68/jupyter_console/ptshell.py#L845
while client.iopub_channel.msg_ready():
msg = client.iopub_channel.get_msg()
msg_type = msg["header"]["msg_type"]
if msg_type == "status":
execution_state = msg["content"]["execution_state"]
elif msg_type == "stream":
if msg["content"]["name"] == "stdout":
print(msg["content"]["text"])
sys.stdout.flush()
elif msg["content"]["name"] == "stderr":
print(msg["content"]["text"], file=sys.stderr)
sys.stderr.flush()
elif msg_type == "execute_result":
pass
elif msg_type == "display_data":
pass
elif msg_type == "execute_input":
execution_count = int(msg["content"]["execution_count"]) + 1
elif msg_type == "clear_output":
pass
elif msg_type == "error":
for frame in msg["content"]["traceback"]:
print(frame, file=sys.stderr)
except KeyboardInterrupt:
manager.shutdown_kernel()
except Exception as e:
print(e)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment