Skip to content

Instantly share code, notes, and snippets.

@Roman-Port
Last active June 4, 2018 13:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Roman-Port/28251b7d332062481fd4157cdaae1baf to your computer and use it in GitHub Desktop.
Save Roman-Port/28251b7d332062481fd4157cdaae1baf to your computer and use it in GitHub Desktop.
updated
import socket
import json
import turtle
import tkinter
import random
import math
import time
import datetime
#Written by RomanPort
print("Written by RomanPort - Revision 1")
class RPPacket:
def __init__(self,packetType,msgId,parseType,token,body):
self.packetType = packetType
self.msgId = msgId
self.parseType = parseType
self.token = token
self.body = body
@staticmethod
def CreatePacket(packetType, msgId, parseType, token, body):
#Create packet to send.
data = bytearray()
#Convert strings.
a_body = RPPacket.ToByteFromString(body)
a_token = RPPacket.ToByteFromString(token)
#Add to the data
data = RPPacket.WriteBytesToBytearray(data, RPPacket.ToByteFromInt(len(a_body)) ) #Write int32 body size
data = RPPacket.WriteBytesToBytearray(data, RPPacket.ToByteFromInt( packetType ) ) #Write int32 packet type
data = RPPacket.WriteBytesToBytearray(data, RPPacket.ToByteFromInt( msgId ) ) #Write int32 packet id
data = RPPacket.WriteBytesToBytearray(data, RPPacket.ToByteFromInt( parseType ) ) #Write int32 parse type for the server
#Write strings, first the access token.
data = RPPacket.WriteBytesToBytearray(data, a_token)
#Now, write the body
data = RPPacket.WriteBytesToBytearray(data, a_body)
#Write end padding
data.append(0)
#Data has been written
return data
@staticmethod
def ImportPacket(raw):
#This will take in the data and turn it back into a class.
bodyLen = RPPacket.FromByteToInt( RPPacket.SubArray(raw,0,4) ) #Read in int32 body length
packetType = RPPacket.FromByteToInt( RPPacket.SubArray(raw,4,4) ) #Read in int32 packet type
messageId = RPPacket.FromByteToInt( RPPacket.SubArray(raw,8,4) ) #Read in int32 message id
parseType = RPPacket.FromByteToInt( RPPacket.SubArray(raw,12,4) ) #Read in int32 parse type
#Now read in the strings
token = RPPacket.FromByteToString( RPPacket.SubArray(raw,16,16) )
body = RPPacket.FromByteToString( RPPacket.SubArray(raw,32,bodyLen) )
#Pack this into the packet
out = RPPacket(packetType,messageId,parseType,token,body)
return out
@staticmethod
def SubArray(data,offset,length):
i = 0
output = bytearray()
while(i<length):
output.append(data[i+offset])
i+=1
return output
@staticmethod
def WriteBytesToBytearray(data,write):
i = 0
while(i<len(write)):
data.append(write[i])
i+=1
return data
@staticmethod
def DebugWriteToDisk(data):
f = open("C:\\Users\\Roman\\Desktop\\test.bin",'wb')
f.write(data)
f.close()
@staticmethod
def ToByteFromInt(data):
return int(data).to_bytes(4,byteorder='little')
@staticmethod
def FromByteToInt(data):
i = int(0)
return i.from_bytes(data,byteorder='little')
@staticmethod
def ToByteFromString(data):
return bytes(data, 'ascii')
@staticmethod
def FromByteToString(data):
return str(data,'ascii')
class TcpConnection:
def __init__(self):
self.CONN_IP = '10.0.1.13';
self.CONN_PORT = 23082
self.CONN_BUFFER_SIZE = 99999
self.token = " "
self.password = "hello"
#connect
self.conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.conn.connect((self.CONN_IP, self.CONN_PORT))
print("Connected")
#Login and handshake with the server. We'll need to get a token.
#Create data
loginData = {
"password":self.password,
"wasAuthOkay":False,
"token":""
}
loginRes = self.SendData(TcpConnection.SerJSON(loginData),0,self.token,1)
#Check the handshake status
#Read in
logReq = TcpConnection.DeSerJSON(loginRes.body)
if(logReq["wasAuthOkay"]==True):
print("RSN auth okay!")
self.token = logReq["token"]
else:
print("Auth failed. "+logReq["wasAuthOkay"])
def SendData(self,body,parseType, token=None, packetType=2):
if(token==None):
token = self.token
#Create packet.
data = RPPacket.CreatePacket(packetType,0,parseType,token,body)
#Send this data.
self.conn.send(data)
#Get
got = self.conn.recv(self.CONN_BUFFER_SIZE)
#Convert this to a readable packet
packet = RPPacket.ImportPacket(got)
return packet
@staticmethod
def SerJSON(data):
return json.dumps(data)
@staticmethod
def DeSerJSON(data):
return json.loads(data)
class TronServerConn:
def __init__(self, user):
self.conn = TcpConnection()
self.lastUpdateIndex = 0
#Login and handshake with the game server.
loginData = {
"username":user
}
loginRes = self.conn.SendData(TcpConnection.SerJSON(loginData),1)
#Check the handshake status
#Read in
logReq = TcpConnection.DeSerJSON(loginRes.body)
token = logReq["serverToken"]
userId = logReq["id"]
okay = logReq["wasLoginOkay"]
self.gameToken = token
self.gameId = userId
self.isLoggedIn = okay
print("Connected to game server!")
def SendGameplayEvent(self,action):
#Create request
d = {
"action":action,
"id":self.gameId,
"token":self.gameToken
}
#Send
loginRes = self.conn.SendData(TcpConnection.SerJSON(d),1)
res = TcpConnection.DeSerJSON(loginRes.body)
#Done
def GetLatestUpdates(self):
d = {
"startingIndex":self.lastUpdateIndex
}
#Request
loginRes = self.conn.SendData(TcpConnection.SerJSON(d),3)
res = TcpConnection.DeSerJSON(loginRes.body)
#Decode
self.lastUpdateIndex = res["startingIndex"]
#Return data
return res["updates"]
def SendTurn(self,action):
d = {
"id":self.gameId,
"token":self.gameToken,
"action":action
}
loginRes = self.conn.SendData(TcpConnection.SerJSON(d),2)
res = TcpConnection.DeSerJSON(loginRes.body)
#Ignore the output.
class Vector3:
def __init__(self,x,y,z):
self.x = float(x)
self.y = float(y)
self.z = float(z)
class GamePlayer:
def __init__(self,serverData,color):
#Create turtle.
self.draw = turtle.Turtle()
self.ts = self.draw.getscreen()
self.draw.pensize(3)
self.draw.pencolor(color)
self.draw.speed(0)
self.draw.penup()
self.color=color
#Get entries
self.UpdateFromServer(serverData)
self.updateTime = GetMs()
self.ts.ontimer(self.InterpolateMovement, 50)
self.pauseInterpolation = False
self.draw.pendown()
def InterpolateMovement(self):
if(self.isMoving and self.pauseInterpolation == False):
xx = 0
zz = 0
rr = self.heading;
updateTime = float(self.updateTime - GetMs())/1000
print(updateTime)
angleInRadian = float(float(rr) / 180) * float(3.14159265359)
xx = math.sin(angleInRadian) * 1;
zz = math.cos(angleInRadian) * 1;
xx = float(xx) * float(updateTime) * float(self.speed) * 1.5;
zz = float(zz) * float(updateTime) * float(self.speed) * 1.5;
self.pos.x += float(xx);
self.pos.z += float(zz);
self.UpdateTurtle()
self.updateTime = GetMs()
self.ts.ontimer(self.InterpolateMovement, 50)
def UpdateFromServer(self,serverData):
self.deaths = serverData["deaths"]
self.heading = (serverData["heading"] % 360 ) -180
self.playerId = serverData["id"]
self.isLiving = serverData["isLiving"]
self.isMoving = serverData["isMoving"]
self.score = serverData["score"]
self.screenname = serverData["screenname"]
self.speed = serverData["speed"]
self.stateId = serverData["stateId"]
self.wins = serverData["wins"]
self.pos = Vector3(serverData["x"],serverData["y"],serverData["z"])
#If turtle exists, update it.
self.UpdateTurtle()
def UpdateTurtle(self):
self.draw.setheading(self.heading)
print(self.heading)
self.draw.setpos(self.pos.x,self.pos.z)
print("Please type in your username.")
gameConn = TronServerConn(input())
players = {}
def GetMs():
return int(round(time.time() * 1000))
def UpdateFromServer():
#Called a bunch a second.
#Fetch updates.
newUpdates = gameConn.GetLatestUpdates()
i = 0
while(i<len(newUpdates)):
#Update player.
UpdatePlayerList(newUpdates[i])
i+=1
#Queue again
global ts
ts.ontimer(UpdateFromServer, 200)
def UpdatePlayerList(serverData):
i=0
data = serverData["players"]
global players
while(i<len(data)):
key = data[i]["id"]
players[key].UpdateFromServer(data[i])
#Unpause interpolation
players[key].pauseInterpolation = False
i+=1
#Check if we need to show anything in chat.
if(serverData["type"]==2):
#Someone died.
vic = players[serverData["playerConcerned"]]
killer = players[serverData["otherPlayerConcerned"]]
if(killer.playerId == vic.playerId):
WriteLog(vic.screenname+" killed themself!")
else:
WriteLog(vic.screenname+" was killed by "+killer.screenname+"!")
def SendRight():
SendTurn(1)
def SendLeft():
SendTurn(0)
def SendTurn(action):
global gameConn
global players
gameConn.SendTurn(action)
#Pause interpolation.
players[gameConn.gameId].pauseInterpolation = True
#Launch UI and wait for the game to start.
draw = turtle.Turtle()
draw.hideturtle()
ts = draw.getscreen()
incomingPlayers = []
#ts.ontimer(self.KeepMoving, 50)
#self.ts.onkey(self.Right,userPrefs.rightKey)
#Create text turtle.
textTurtle = turtle.Turtle()
logPos = (textTurtle.getscreen().screensize()[1]/4)*3.25
def WriteText(text, align="center", isLog=False, font=("Arial",12,"normal")):
textTurtle.hideturtle()
textTurtle.speed(0)
textTurtle.penup()
textTurtle.setx(0)
size = textTurtle.getscreen().screensize()[0]
if(isLog):
global logPos
textTurtle.sety(logPos)
logPos-=font[1]+3
print(size)
if(align=="left"):
textTurtle.setx(-((size/4)*3.25))
textTurtle.hideturtle()
textTurtle.write(text,False,align=align,font=font)
def WriteLog(text):
WriteText(text,"left",True)
print("Waiting for game start...")
WriteLog("Wating for the game to begin...")
while(True):
#This while true is only used to stop the main thread until the game has started.
#Fetch updates
newUpdates = gameConn.GetLatestUpdates()
i=0
exit = False
while(i<len(newUpdates)):
if(newUpdates[i]["type"]==0):
#Game started
#Set players
incomingPlayers = newUpdates[i]["players"]
WriteLog("Game started.")
exit=True
#Check if it is a player join
if(newUpdates[i]["type"]==3):
#Find the ID of the player.
incomingPlayers = newUpdates[i]["players"]
ii=0
while(ii<len(incomingPlayers)):
if(incomingPlayers[ii]["id"]==newUpdates[i]["playerConcerned"]):
WriteLog("Player "+incomingPlayers[ii]["screenname"]+" joined.")
ii+=1
i+=1
time.sleep(0.3)
if(exit):
break
print("Game started. Creating characters...")
i=0
while(i<len(incomingPlayers)):
name = incomingPlayers[i]["id"]
color = "black"
if(name == gameConn.gameId):
color="red"
players[name] = GamePlayer(incomingPlayers[i],color)
i+=1
print("Players created.")
#Set events and let the turtles loose.
ts.ontimer(UpdateFromServer, 200)
ts.onkey(SendRight,"d")
ts.onkey(SendLeft,"a")
ts.listen()
tkinter.mainloop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment