Created
March 17, 2018 11:13
-
-
Save djungelorm/1e3925e4edc4011b71439b87f8b4a665 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
from time import sleep | |
from google.protobuf.internal.encoder import _VarintEncoder | |
from google.protobuf.internal.decoder import _DecodeVarint | |
from krpc.schema import KRPC_pb2 as KRPC | |
# ---------------------- | |
""" | |
support functions (e.g. send_message()) omitted for clarity -- see kRPC documents for examples. | |
""" | |
def encode_varint(value):
    """Return the protobuf varint encoding of the integer `value` as bytes."""
    chunks = []
    write_varint = _VarintEncoder()
    # The encoder hands its output to the callback it is given; collect the
    # pieces in a list and join them into a single bytes object.
    write_varint(chunks.append, value, False)
    return b''.join(chunks)
def bytelength(string):
    """Return the number of bytes `string` occupies when encoded as UTF-8."""
    encoded = string.encode('utf-8')
    return len(encoded)
def encode_string(value):
    """Encode `value` as its UTF-8 bytes preceded by their varint-encoded length."""
    length_prefix = encode_varint(bytelength(value))
    payload = value.encode('utf-8')
    return length_prefix + payload
def decode_varint(data):
    """Decode the protobuf varint that starts at offset 0 of `data`; return the int."""
    decoded = _DecodeVarint(data, 0)
    # Only element 0 of the decoder's result (the decoded value) is returned.
    value = decoded[0]
    return value
def send_message(conn, msg):
    """Serialize `msg` and send it over the TCP/IP socket `conn`, size-prefixed.

    The serialized message is preceded by its length encoded as a varint,
    and both parts are written with a single ``sendall`` call.
    """
    payload = msg.SerializeToString()
    header = encode_varint(len(payload))
    conn.sendall(header + payload)
def recv_message(conn, msg_type):
    """Receive a message, prefixed with its size, from a TCP/IP socket.

    Args:
        conn: Connected socket-like object providing ``recv(n)``.
        msg_type: Zero-argument callable that creates the message object;
            the object must provide ``ParseFromString(data)``.

    Returns:
        The ``msg_type`` instance populated from the received bytes.

    Raises:
        EOFError: If the connection is closed before the size prefix or the
            complete message body has been received.
    """
    # Receive the size of the message data one byte at a time, until the
    # accumulated bytes decode as a complete varint.
    header = b''
    while True:
        byte = conn.recv(1)
        if not byte:
            # recv() returns b'' once the peer has closed the connection;
            # without this check the loop would never terminate.
            raise EOFError('Connection closed while reading message size')
        header += byte
        try:
            size = decode_varint(header)
        except IndexError:
            # The varint is still incomplete; read another byte.
            continue
        break

    # Receive the message data. A single recv() call may return fewer bytes
    # than requested, so keep reading until the whole message has arrived.
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = conn.recv(remaining)
        if not chunk:
            raise EOFError('Connection closed while reading message data')
        chunks.append(chunk)
        remaining -= len(chunk)
    data = b''.join(chunks)

    # Decode the message
    msg = msg_type()
    msg.ParseFromString(data)
    return msg
# Establish the TCP/IP connection to the RPC server.
rpc_conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
rpc_conn.connect(('127.0.0.1', 50000))

# Build the RPC connection request in one step and send it.
request = KRPC.ConnectionRequest(
    type=KRPC.ConnectionRequest.RPC,
    client_name='Jeb',
)
send_message(rpc_conn, request)

# Read the server's reply; anything other than OK aborts the script.
response = recv_message(rpc_conn, KRPC.ConnectionResponse)
if not (response.status == KRPC.ConnectionResponse.OK):
    raise RuntimeError('Connection failed: ' + response.message)
print('Connected to RPC server')
# --------------------- | |
# connect over socket to kRPC, request status | |
# Build a request containing a single KRPC.GetStatus procedure call
call = KRPC.ProcedureCall()
call.service = 'KRPC'
call.procedure = 'GetStatus'
request = KRPC.Request()
request.calls.extend([call])
send_message(rpc_conn, request)

# Receive the response
response = recv_message(rpc_conn, KRPC.Response)

# Check for an error in the response as a whole
if response.HasField('error'):
    raise RuntimeError('ERROR: ' + str(response.error))

# One call was sent, so exactly one result is expected. This is raised
# explicitly (not asserted) so the check still runs under ``python -O``.
if len(response.results) != 1:
    raise RuntimeError(
        'ERROR: expected 1 result, got %d' % len(response.results))

# Check for an error in the individual result, and report that result's own
# error field (the response-level error is unset on this path).
if response.results[0].HasField('error'):
    raise RuntimeError('ERROR: ' + str(response.results[0].error))

# Decode the return value as a Status message
status = KRPC.Status()
status.ParseFromString(response.results[0].value)
# Test UI.Message()
# print() with parentheses works on both Python 2 and Python 3 and matches
# the print call used after connecting.
print('Invoking UI.Message')

# Build a request containing a single UI.Message call whose only argument
# (position 0) is the length-prefixed, UTF-8 encoded message text.
call = KRPC.ProcedureCall()
call.service = 'UI'
call.procedure = 'Message'
arg = KRPC.Argument()
arg.position = 0
arg.value = encode_string('nice ship you have there')
call.arguments.extend([arg])
request = KRPC.Request()
request.calls.extend([call])
send_message(rpc_conn, request)

# Receive the response
response = recv_message(rpc_conn, KRPC.Response)

# Check for an error in the response as a whole
if response.HasField('error'):
    raise RuntimeError('ERROR: ' + str(response.error))

# One call was sent, so exactly one result is expected. This is raised
# explicitly (not asserted) so the check still runs under ``python -O``.
if len(response.results) != 1:
    raise RuntimeError(
        'ERROR: expected 1 result, got %d' % len(response.results))

# Check for an error in the individual result, and report that result's own
# error field (the response-level error is unset on this path).
if response.results[0].HasField('error'):
    raise RuntimeError('ERROR: ' + str(response.results[0].error))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I've found that recv_message() will sometimes not retrieve all the data that is being sent by kRPC, and have modified it as shown below. Without these modifications, the GetServices call would almost always fail with a 'truncated message' error.