Skip to content

Instantly share code, notes, and snippets.

@FelixWolf
Last active June 23, 2023 03:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save FelixWolf/2d30015b03b46559a9905f389c9e13af to your computer and use it in GitHub Desktop.
Save FelixWolf/2d30015b03b46559a9905f389c9e13af to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import asyncio
import math
def b95encode(i, size = None):
out = b""
while i > 0:
out = bytes([i % 95 + 32]) + out
i = i // 95
#Pad if needed
if size:
out = out[:size]
if len(out) < size:
out = b" "*(size - len(out)) + out
return out
def b95decode(data):
out = 0
for c in data:
if c >= 32 and c < 127:
out = (out * 95) + (c - 32)
else:
raise ValueError("Invalid base95 character!")
return out
class Sounds:
pop = 0
bouncy = 1
sploit = 2
swish2 = 3 #Swing
clank = 4
breathe2 = 5
entity = 6 #Magic
terrier = 7 #Bark
pom = 8 #Yip
class Colors:
black = "a"
darkblue = "b"
blue = "c"
olive = "d"
lime = "e"
grey = "f"
deepred = "g"
lightred = "h"
purple = "i"
lavender = "j"
bronze = "k"
gold = "l"
silver = "m"
white = "n"
clear = "o"
class Sounds:
pop = 0
bouncy = 1
sploit = 2
swish2 = 3 #Swing
clank = 4
breathe2 = 5
entity = 6 #Magic
terrier = 7 #Bark
pom = 8 #Yip
class Objects:
empty = 0
scroll = 1
armor = 2
stone = 3
bottle = 4
sword1 = 5
pouch = 6
whitemark = 7
funball = 8
table = 9
bigfish = 10
chalice = 11
sword2 = 12
bush = 13
tree = 14
chest = 15
pop = 16
throne = 17
chair = 18
bone = 19
smallfish = 20
sign = 21
pillar = 22
statue = 23
mirror = 24
stoplight3 = 63
stoplight2 = 64
stoplight1 = 65
playerSpecies = {
1: [41, 42, 43, 42],
3: [46, 47, 48, 47],
7: [51, 52, 53, 52],
9: [56, 57, 58, 57],
"actions": {
"swing": {
1: [44, 45],
3: [49, 50],
7: [54, 55],
9: [59, 60],
},
"get": [61, 62]
}
}
dogSpecies = {
1: [25, 26, 27, 26],
3: [29, 30, 31, 30],
7: [33, 34, 35, 34],
9: [37, 38, 39, 38],
"actions": {
"bark": {
1: [28],
3: [32],
7: [36],
9: [40],
}
}
}
def movePosSW(x, y, n = 1):
for i in range(n):
y += 1
if y & 1 == 0:
x -= 1
return [x, y]
def movePosSE(x, y, n = 1):
for i in range(n):
y += 1
if y & 1 == 1:
x += 1
return [x, y]
def movePosNW(x, y, n = 1):
for i in range(n):
y -= 1
if y & 1 == 0:
x -= 1
return [x, y]
def movePosNE(x, y, n = 1):
for i in range(n):
y -= 1
if y & 1 == 1:
x += 1
return [x, y]
#TODO: make this non-static because it annoys me
tileLookup = [
1,1,1,1,1,1,1,1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,0,1,1,0,1,0,0,0,0,1,1,1,
0,1,0,0,0,0,0,0,1,1,1,0,0,1
]
class DragonSpiresMap():
def __init__(self, mapdata):
self.players = {}
self.width = 52
self.height = 100
self.tiles = [0] * (self.width * self.height)
self.items = [0] * (self.width * self.height)
for i in range(self.width*self.height):
dt = mapdata.read(2)
self.tiles[i] = dt[0] | dt[1] << 8
for i in range(self.width*self.height):
dt = mapdata.read(2)
self.items[i] = dt[0] | dt[1] << 8
async def addPlayer(self, handle, name):
self.players[handle] = {
"name": name,
"desc": " ",
"color": b" ",
"spawned": False,
"pos": [0,0],
"species": playerSpecies,
"sprite": 41
}
def getItemAt(self, x, y):
return self.items[x * self.height + y]
def getFloorAt(self, x, y):
return self.tiles[x * self.height + y]
async def sendMapUpdate(self, handle):
#TODO: Add object updates
handle[1].write(b"~\r\n")
for player in self.players:
if self.players[player]["spawned"] == False:
continue
handle[1].write(b"<"
+self.players[player]["color"]
+b95encode(self.players[player]["pos"][0])
+b95encode(self.players[player]["pos"][1])
+b95encode(self.players[player]["sprite"])
+b"\r\n"
)
handle[1].write(b"=\r\n")
await handle[1].drain()
async def setPlayerPos(self, handle, x, y):
handle[1].write(b"@"+b95encode(x)+b95encode(y)+b"\r\n")
await handle[1].drain()
item = self.getItemAt(self.players[handle]["pos"][0], self.players[handle]["pos"][1])
oldPosData = b95encode(self.players[handle]["pos"][0])+b95encode(self.players[handle]["pos"][1])+b95encode(item, 1)
newPosData = self.players[handle]["color"]+b95encode(x)+b95encode(y)+b95encode(self.players[handle]["sprite"])
for player in self.players:
player[1].write(b"~\r\n")
player[1].write(b">"+oldPosData+b"\r\n")
player[1].write(b"<"+newPosData+b"\r\n")
player[1].write(b"=\r\n")
await player[1].drain()
self.players[handle]["pos"] = [x, y]
def checkCanMove(self, x, y):
if x < 0 or x > 95 or y < 0 or y > 95:
return False
if tileLookup[self.getFloorAt(x, y)] == 0:
return False
for player in self.players:
player = self.players[player]
if player["pos"][0] == x and player["pos"][1] == y:
return False
return True
async def movePlayer(self, handle, direction):
player = self.players[handle]
current = player["sprite"]
if current not in player["species"][direction]:
current = player["species"][direction][0]
i = player["species"][direction].index(current) + 1
if i >= len(player["species"][direction]):
i = 0
player["sprite"] = player["species"][direction][i]
if direction == 1:
pos = movePosSW(*player["pos"])
elif direction == 3:
pos = movePosSE(*player["pos"])
elif direction == 7:
pos = movePosNW(*player["pos"])
elif direction == 9:
pos = movePosNE(*player["pos"])
if not self.checkCanMove(*pos):
pos = player["pos"]
await self.setPlayerPos(handle, *pos)
async def setPlayerEndurance(self, handle, v):
handle[1].write(b"#" + bytes([97 + math.floor(v/26), 97 + (v%26)]) + b"\r\n")
await handle[1].drain()
async def setPlayerLife(self, handle, v):
handle[1].write(b"$" + str(v).encode() + b"\r\n")
await handle[1].drain()
async def playSound(self, handle, v):
handle[1].write(b"!" + b95encode(v) + b"\r\n")
await handle[1].drain()
async def setPlayerHeldItem(self, handle, v):
handle[1].write(b"^" + b95encode(v) + b"\r\n")
await handle[1].drain()
async def setPlayerFeetItem(self, handle, v):
handle[1].write(b"%" + b95encode(v) + b"\r\n")
await handle[1].drain()
async def sendMessage(self, handle, msg):
handle[1].write(b"(" + msg.encode() + b"\r\n")
await handle[1].drain()
async def sendLook(self, handle):
handle[1].write(b"_\r\n")
await handle[1].drain()
async def playerCommand(self, handle, command):
player = self.players[handle]
tmp = command.split(b" ", 1)
if tmp[0] == b"desc" and len(tmp) > 1:
player["desc"] = tmp[1]
elif tmp[0] == b"color" and len(tmp) > 1:
player["color"] = tmp[1]
if not player["spawned"]:
player["spawned"] = True
await self.setPlayerPos(handle, 33, 15)
await self.setPlayerLife(handle, 3)
await self.setPlayerEndurance(handle, 100)
await self.sendMapUpdate(handle)
elif tmp[0] == b"who":
await self.sendMessage(handle, "Current players: {}".format(", ".join([
self.players[player]["name"] for player in self.players
])))
elif tmp[0] == b"abort":
pass
elif player["spawned"]:
if command.startswith(b"\""):
msg = "{}: {}".format(player["name"], command[1:].decode())
print("[CHAT]", msg)
for other in self.players:
await self.sendMessage(other, msg)
elif command.startswith(b":"):
msg = "{}{}".format(player["name"], command[1:].decode())
print("[CHAT]", msg)
for other in self.players:
await self.sendMessage(other, msg)
elif tmp[0] == b"wh":
tmp = tmp[1].split(b" ", 1)
if len(tmp) == 1:
return
tmp[0] = tmp[0].decode().replace("|"," ").lower()
for other in self.players:
if self.players[other]["name"].lower() == tmp[0]:
await self.sendMessage(other, "[ {} whispers \"{}\" to you. ]".format(player["name"], tmp[1]))
await self.sendMessage(other, "[ You whisper \"{}\" to {}. ]".format(tmp[1], tmp[0]))
return
await self.sendMessage(handle, "Sorry, there are no players around named {}.".format(tmp[0]))
elif tmp[0] == b"look":
await self.sendMessage(handle, "Look not implemented yet!")
elif tmp[0] == b"swing":
await self.sendMessage(handle, "Swing not implemented yet!")
await self.sendUnknown(handle)
elif tmp[0] == b"get":
await self.sendMessage(handle, "Get not implemented yet!")
elif tmp[0] == b"m":
if tmp[1] == b"1":
await self.movePlayer(handle, 1)
elif tmp[1] == b"3":
await self.movePlayer(handle, 3)
elif tmp[1] == b"7":
await self.movePlayer(handle, 7)
elif tmp[1] == b"9":
await self.movePlayer(handle, 9)
elif tmp[0] == b">":
facing = 1
for i in (1,3,7,9):
if player["sprite"] in player["species"][i]:
facing = i
break
if facing == 1:
facing = 3
elif facing == 3:
facing = 9
elif facing == 9:
facing = 7
elif facing == 7:
facing = 1
player["shape"] = player["species"][facing][1]
await self.setPlayerPos(handle, *player["pos"])
elif tmp[0] == b"<":
facing = 1
for i in (1,3,7,9):
if player["sprite"] in player["species"][i]:
facing = i
break
if facing == 1:
facing = 7
elif facing == 7:
facing = 9
elif facing == 9:
facing = 3
elif facing == 3:
facing = 1
player["shape"] = player["species"][facing][1]
await self.setPlayerPos(handle, *player["pos"])
else:
print("[DSPIRE] Unknown command from player {}:".format(player["name"]), command)
else:
print("[DSPIRE] Unknown unspawned command from player {}:".format(player["name"]), command)
async def removePlayer(self, handle):
self.players.pop(handle)
with open("dspire15/LEV01.MAP", "rb") as f:
DSInstance = DragonSpiresMap(f)
async def handleDragonSpires(reader, writer):
addr = writer.get_extra_info('peername')
print("[NET] DragonSpires connection from [{}]:{}".format(*addr))
writer.write(b"Welcome to DragonSpires.\nDragonroar!\r\nV0005\r\n")
await writer.drain()
handle = None
desc = None
color = None
while True:
buffer = b""
while True:
data = await reader.read(1)
if data == b'':
buffer = None
break
if data == b"\n" or data == b"\r":
break
buffer += data
if len(buffer) > 4096:
buffer = buffer[-4096:-1]
if buffer == None:
break
if buffer == b"":
continue
#Login
if buffer.startswith(b"connect ") or buffer.startswith(b"create "):
userdata = buffer.split(b" ")[1:]
if len(userdata) != 2:
print("[AUTH] login failure from [{}:{}]: {}".format(*addr, "(NO USER)" if len(userdata) == 0 else userdata[0].decode()))
writer.write(b"NInvalid credentials.\r\n")
await writer.drain()
continue
print("[AUTH] [{}:{}] logged in as {}".format(*addr, userdata[0].decode()))
writer.write(b"&\n")
await writer.drain()
handle = (reader, writer)
await DSInstance.addPlayer(handle, userdata[0].decode())
elif handle:
await DSInstance.playerCommand(handle, buffer)
elif buffer == b"abort":
break
if handle:
await DSInstance.removePlayer(handle)
print("[NET] [{}:{}] disconnected from DragonSpires".format(*addr))
async def InvalidCommand(writer):
writer.write(b"?Invalid command\n")
await writer.drain()
async def handleDOSBox(reader, writer):
addr = writer.get_extra_info('peername')
print("[NET] Dosbox connection from [{}]:{}".format(*addr))
while True:
buffer = b""
while True:
data = await reader.read(1)
if data == b'':
buffer = None
break
writer.write(data)
await writer.drain()
if data == b"\n" or data == b"\r":
break
buffer += data
if len(buffer) > 4096:
buffer = buffer[-4096:-1]
if buffer == None:
break
if buffer.startswith(b"telnet "):
try:
dest = buffer.decode()
except UnicodeDecodeError:
await InvalidCommand(writer)
continue
dest = dest.split(" ")[1:]
if len(dest) == 0 or len(dest) > 2:
await InvalidCommand(writer)
continue
elif len(dest) == 1:
dest.append("22")
print("[DOSBOX] Connection request to [{}]:{} from [{}]:{}".format(*dest, *addr))
if dest[0] == "boris.eden.com" and dest[1] == "7734":
print("[DOSBOX] Sending [{}]:{} to DragonSpires handler".format(*addr))
await handleDragonSpires(reader, writer)
else:
writer.write(b"?Forbidden\n")
await writer.drain()
elif buffer == b"":
continue
else:
print("[DOSBOX] Unknown command from [{}]:{}".format(*addr), buffer)
await InvalidCommand(writer)
print("[NET] [{}:{}] disconnected from dosbox".format(*addr))
await writer.wait_closed()
async def main():
serverDOSBox = await asyncio.start_server(handleDOSBox, '0.0.0.0', 7733)
addrs = ', '.join(str(sock.getsockname()) for sock in serverDOSBox.sockets)
print(f'[NET] Serving DOSBox telnet on {addrs}')
serverDragonSpires = await asyncio.start_server(handleDragonSpires, '0.0.0.0', 7734)
addrs = ', '.join(str(sock.getsockname()) for sock in serverDragonSpires.sockets)
print(f'[NET] Serving DragonSpires on {addrs}')
async with serverDOSBox, serverDragonSpires:
await asyncio.gather(
serverDOSBox.serve_forever(),
serverDragonSpires.serve_forever()
)
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment