Last active
June 23, 2023 03:49
-
-
Save FelixWolf/2d30015b03b46559a9905f389c9e13af to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import asyncio | |
import math | |
def b95encode(i, size = None):
    """Encode a non-negative integer as printable base-95 ASCII.

    Each base-95 digit is stored as one byte in the range 32 (space, digit 0)
    to 126 ("~", digit 94), most significant digit first.

    i    -- integer to encode; must be >= 0.
    size -- optional fixed width. Shorter results are left-padded with
            spaces (the zero digit); longer results are cut down to their
            first ``size`` bytes. A falsy size means "no fixed width".

    Returns the encoded bytes. Zero is encoded as a single space so that the
    value always occupies at least one character in a packet.

    Raises ValueError if ``i`` is negative.
    """
    if i < 0:
        raise ValueError("Cannot base95-encode a negative number!")
    digits = []
    while i > 0:
        i, digit = divmod(i, 95)
        digits.append(digit + 32)
    #The loop emits nothing for 0; fall back to the explicit zero digit so
    #callers that concatenate fields never get an empty (missing) field.
    out = bytes(reversed(digits)) or b" "
    #Pad / truncate if a fixed width was requested
    if size:
        out = out[:size].rjust(size, b" ")
    return out
def b95decode(data):
    """Decode base-95 bytes (digits stored as byte values 32..126) to an int.

    The first byte is the most significant digit. An empty input decodes to 0.
    Raises ValueError if any byte falls outside 32..126.
    """
    value = 0
    for byte in data:
        if not 32 <= byte < 127:
            raise ValueError("Invalid base95 character!")
        value = value * 95 + (byte - 32)
    return value
class Sounds:
    """Numeric ids of the sound effects (presumably the values given to
    DragonSpiresMap.playSound -- confirm against the client)."""
    #Ids are consecutive, starting at 0.
    (
        pop,
        bouncy,
        sploit,
        swish2,     #Swing
        clank,
        breathe2,
        entity,     #Magic
        terrier,    #Bark
        pom,        #Yip
    ) = range(9)
class Colors:
    """Single-letter color codes, "a" through "o", in the order listed."""
    (
        black,
        darkblue,
        blue,
        olive,
        lime,
        grey,
        deepred,
        lightred,
        purple,
        lavender,
        bronze,
        gold,
        silver,
        white,
        clear,
    ) = "abcdefghijklmno"
#NOTE(review): this repeats the Sounds class declared earlier in the file
#with identical values; rebinding the name is harmless but redundant.
class Sounds:
    """Numeric ids of the sound effects (presumably the values given to
    DragonSpiresMap.playSound -- confirm against the client)."""
    #Ids are consecutive, starting at 0.
    (
        pop,
        bouncy,
        sploit,
        swish2,     #Swing
        clank,
        breathe2,
        entity,     #Magic
        terrier,    #Bark
        pom,        #Yip
    ) = range(9)
class Objects:
    """Numeric ids of map objects / items."""
    #Ids 0..24 are consecutive.
    (
        empty,
        scroll,
        armor,
        stone,
        bottle,
        sword1,
        pouch,
        whitemark,
        funball,
        table,
        bigfish,
        chalice,
        sword2,
        bush,
        tree,
        chest,
        pop,
        throne,
        chair,
        bone,
        smallfish,
        sign,
        pillar,
        statue,
        mirror,
    ) = range(25)
    #The stoplights sit apart from the rest, numbered 63..65 in reverse
    #name order.
    stoplight3, stoplight2, stoplight1 = range(63, 66)
#Sprite ids for the player character.
#Integer keys are the movement directions used by DragonSpiresMap.movePlayer
#(1 = SW, 3 = SE, 7 = NW, 9 = NE); each maps to a four-entry walk sequence
#of the form [a, a+1, a+2, a+1]. "actions" holds the sprite ids for
#non-walking actions.
playerSpecies = {
    **{
        facing: [first, first + 1, first + 2, first + 1]
        for facing, first in ((1, 41), (3, 46), (7, 51), (9, 56))
    },
    "actions": {
        "swing": {
            facing: [first, first + 1]
            for facing, first in ((1, 44), (3, 49), (7, 54), (9, 59))
        },
        "get": [61, 62],
    },
}
#Sprite ids for the dog, laid out like playerSpecies: integer keys
#1 / 3 / 7 / 9 map to a four-entry walk sequence [a, a+1, a+2, a+1], and
#"actions" holds one bark sprite per key.
dogSpecies = {
    **{
        facing: [first, first + 1, first + 2, first + 1]
        for facing, first in ((1, 25), (3, 29), (7, 33), (9, 37))
    },
    "actions": {
        "bark": {
            facing: [sprite]
            for facing, sprite in ((1, 28), (3, 32), (7, 36), (9, 40))
        },
    },
}
def movePosSW(x, y, n = 1):
    """Step (x, y) south-west n times and return the result as [x, y].

    Every step increases y by one; x decreases by one whenever the new y is
    even.
    """
    for _ in range(n):
        y += 1
        if not y & 1:
            x -= 1
    return [x, y]
def movePosSE(x, y, n = 1):
    """Step (x, y) south-east n times and return the result as [x, y].

    Every step increases y by one; x increases by one whenever the new y is
    odd.
    """
    for _ in range(n):
        y += 1
        if y & 1:
            x += 1
    return [x, y]
def movePosNW(x, y, n = 1):
    """Step (x, y) north-west n times and return the result as [x, y].

    Every step decreases y by one; x decreases by one whenever the new y is
    even.
    """
    for _ in range(n):
        y -= 1
        if not y & 1:
            x -= 1
    return [x, y]
def movePosNE(x, y, n = 1):
    """Step (x, y) north-east n times and return the result as [x, y].

    Every step decreases y by one; x increases by one whenever the new y is
    odd.
    """
    for _ in range(n):
        y -= 1
        if y & 1:
            x += 1
    return [x, y]
#Walkability table indexed by floor tile id: DragonSpiresMap.checkCanMove
#refuses to enter a tile whose entry here is 0.
#TODO: make this non-static because it annoys me
tileLookup = [
    1,1,1,1,1,1,1,1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
    0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,0,1,1,0,1,0,0,0,0,1,1,1,
    0,1,0,0,0,0,0,0,1,1,1,0,0,1
]
class DragonSpiresMap():
    """One DragonSpires map plus the players currently connected to it.

    A player is identified by its "handle". The connection handler in this
    file builds it as a (reader, writer) tuple; this class only ever uses
    handle[1], the writer, to send packets. Every packet is a one-character
    opcode followed by its payload and "\\r\\n".
    """

    #Coordinates and sprite ids are concatenated into packets without any
    #separator, so each one has to fit in a single base95 digit (0..94).
    _COORD_LIMIT = 95

    def __init__(self, mapdata):
        """Load the floor and item layers from a binary file-like object.

        mapdata must yield width*height little-endian 16-bit floor ids
        followed by the same number of item ids.

        Raises ValueError if the data is too short.
        """
        self.players = {}
        self.width = 52
        self.height = 100
        cells = self.width * self.height
        self.tiles = self._readLayer(mapdata, cells)
        self.items = self._readLayer(mapdata, cells)

    @staticmethod
    def _readLayer(mapdata, cells):
        """Read `cells` little-endian unsigned 16-bit values in one go."""
        raw = mapdata.read(cells * 2)
        if len(raw) != cells * 2:
            raise ValueError("Map data is truncated!")
        return [raw[i] | raw[i + 1] << 8 for i in range(0, len(raw), 2)]

    @staticmethod
    def _b95(value):
        """Base95-encode value, guaranteeing at least one character.

        An empty encoding of 0 would silently drop a field from the packet,
        so it is replaced by the zero digit (a space).
        """
        return b95encode(value) or b" "

    async def addPlayer(self, handle, name):
        """Register a new, not yet spawned player under handle."""
        self.players[handle] = {
            "name": name,
            "desc": " ",
            "color": b" ",
            "spawned": False,
            "pos": [0,0],
            "species": playerSpecies,
            "sprite": 41,
            #Index of "sprite" inside the current walk sequence
            "frame": 0
        }

    def getItemAt(self, x, y):
        """Return the item id stored at map cell (x, y)."""
        return self.items[x * self.height + y]

    def getFloorAt(self, x, y):
        """Return the floor tile id stored at map cell (x, y)."""
        return self.tiles[x * self.height + y]

    async def sendMapUpdate(self, handle):
        """Send handle one "<" record per spawned player, framed by ~ and =."""
        #TODO: Add object updates
        writer = handle[1]
        writer.write(b"~\r\n")
        for player in self.players.values():
            if not player["spawned"]:
                continue
            writer.write(b"<"
                +player["color"]
                +self._b95(player["pos"][0])
                +self._b95(player["pos"][1])
                +self._b95(player["sprite"])
                +b"\r\n"
            )
        writer.write(b"=\r\n")
        await writer.drain()

    async def setPlayerPos(self, handle, x, y):
        """Move the player to (x, y) and tell every connected player.

        The mover gets an "@" packet; everybody (mover included) gets a
        ">" record for the vacated cell and a "<" record for the new one.
        """
        player = self.players[handle]
        handle[1].write(b"@"+self._b95(x)+self._b95(y)+b"\r\n")
        await handle[1].drain()
        oldX, oldY = player["pos"]
        item = self.getItemAt(oldX, oldY)
        oldPosData = self._b95(oldX)+self._b95(oldY)+b95encode(item, 1)
        newPosData = player["color"]+self._b95(x)+self._b95(y)+self._b95(player["sprite"])
        #Iterate over a snapshot: drain() may yield to other connections,
        #which are free to add or remove players in the meantime.
        for other in list(self.players):
            other[1].write(b"~\r\n")
            other[1].write(b">"+oldPosData+b"\r\n")
            other[1].write(b"<"+newPosData+b"\r\n")
            other[1].write(b"=\r\n")
            await other[1].drain()
        player["pos"] = [x, y]

    def checkCanMove(self, x, y):
        """Return True if (x, y) is inside the map, walkable and unoccupied."""
        maxX = min(self.width, self._COORD_LIMIT)
        maxY = min(self.height, self._COORD_LIMIT)
        if not (0 <= x < maxX and 0 <= y < maxY):
            return False
        floor = self.getFloorAt(x, y)
        #Tile ids the lookup table does not know are treated as blocked
        if floor >= len(tileLookup) or tileLookup[floor] == 0:
            return False
        for player in self.players.values():
            #Players that have not spawned are not on the map yet
            if not player["spawned"]:
                continue
            if player["pos"][0] == x and player["pos"][1] == y:
                return False
        return True

    async def movePlayer(self, handle, direction):
        """Advance the walk animation and step once in direction.

        direction is 1 (SW), 3 (SE), 7 (NW) or 9 (NE). If the target cell
        is blocked the player stays put but still shows the next sprite.
        """
        player = self.players[handle]
        frames = player["species"][direction]
        #The walk sequences repeat a sprite (a, b, c, b), so the position
        #in the sequence is tracked explicitly; looking the sprite up with
        #index() would never get past the first "b".
        frame = player.get("frame", 0)
        if not (frame < len(frames) and frames[frame] == player["sprite"]):
            #Stored index does not match (turned or changed direction)
            frame = frames.index(player["sprite"]) if player["sprite"] in frames else 0
        frame = (frame + 1) % len(frames)
        player["frame"] = frame
        player["sprite"] = frames[frame]
        step = {
            1: movePosSW,
            3: movePosSE,
            7: movePosNW,
            9: movePosNE,
        }[direction]
        pos = step(*player["pos"])
        if not self.checkCanMove(*pos):
            pos = player["pos"]
        await self.setPlayerPos(handle, *pos)

    async def setPlayerEndurance(self, handle, v):
        """Send "#" plus v as two letters: "a" + v // 26, then "a" + v % 26."""
        handle[1].write(b"#" + bytes([97 + v // 26, 97 + (v % 26)]) + b"\r\n")
        await handle[1].drain()

    async def setPlayerLife(self, handle, v):
        """Send "$" plus v as decimal text."""
        handle[1].write(b"$" + str(v).encode() + b"\r\n")
        await handle[1].drain()

    async def playSound(self, handle, v):
        """Send "!" plus the base95-encoded sound id v."""
        handle[1].write(b"!" + self._b95(v) + b"\r\n")
        await handle[1].drain()

    async def setPlayerHeldItem(self, handle, v):
        """Send "^" plus the base95-encoded item id v."""
        handle[1].write(b"^" + self._b95(v) + b"\r\n")
        await handle[1].drain()

    async def setPlayerFeetItem(self, handle, v):
        """Send "%" plus the base95-encoded item id v."""
        handle[1].write(b"%" + self._b95(v) + b"\r\n")
        await handle[1].drain()

    async def sendMessage(self, handle, msg):
        """Send "(" plus the text msg (a str) to handle."""
        handle[1].write(b"(" + msg.encode() + b"\r\n")
        await handle[1].drain()

    async def sendLook(self, handle):
        """Send a bare "_" packet to handle."""
        handle[1].write(b"_\r\n")
        await handle[1].drain()

    async def playerCommand(self, handle, command):
        """Run one command line (bytes, without line ending) from a player.

        "desc", "color", "who" and "abort" work at any time; "color" also
        spawns the player on first use. Everything else needs a spawned
        player. Unrecognised commands are only logged.
        """
        player = self.players[handle]
        tmp = command.split(b" ", 1)
        verb = tmp[0]
        hasArg = len(tmp) > 1
        if verb == b"desc" and hasArg:
            player["desc"] = tmp[1]
        elif verb == b"color" and hasArg:
            player["color"] = tmp[1]
            if not player["spawned"]:
                player["spawned"] = True
                await self.setPlayerPos(handle, 33, 15)
                await self.setPlayerLife(handle, 3)
                await self.setPlayerEndurance(handle, 100)
                await self.sendMapUpdate(handle)
        elif verb == b"who":
            await self.sendMessage(handle, "Current players: {}".format(", ".join(
                other["name"] for other in self.players.values()
            )))
        elif verb == b"abort":
            pass
        elif player["spawned"]:
            await self._spawnedCommand(handle, player, command, tmp)
        else:
            print("[DSPIRE] Unknown unspawned command from player {}:".format(player["name"]), command)

    async def _spawnedCommand(self, handle, player, command, tmp):
        """Handle the commands that are only valid once the player spawned."""
        verb = tmp[0]
        hasArg = len(tmp) > 1
        if command.startswith(b"\""):
            #Client input is untrusted: never let bad UTF-8 kill the handler
            msg = "{}: {}".format(player["name"], command[1:].decode(errors="replace"))
            await self._broadcastChat(msg)
        elif command.startswith(b":"):
            msg = "{}{}".format(player["name"], command[1:].decode(errors="replace"))
            await self._broadcastChat(msg)
        elif verb == b"wh":
            await self._whisper(handle, player, tmp[1] if hasArg else b"")
        elif verb == b"look":
            await self.sendMessage(handle, "Look not implemented yet!")
        elif verb == b"swing":
            await self.sendMessage(handle, "Swing not implemented yet!")
        elif verb == b"get":
            await self.sendMessage(handle, "Get not implemented yet!")
        elif verb == b"m":
            direction = {b"1": 1, b"3": 3, b"7": 7, b"9": 9}.get(tmp[1]) if hasArg else None
            if direction is not None:
                await self.movePlayer(handle, direction)
        elif verb == b">":
            await self._turnPlayer(handle, player, {1: 3, 3: 9, 9: 7, 7: 1})
        elif verb == b"<":
            await self._turnPlayer(handle, player, {1: 7, 7: 9, 9: 3, 3: 1})
        else:
            print("[DSPIRE] Unknown command from player {}:".format(player["name"]), command)

    async def _broadcastChat(self, msg):
        """Log msg and send it to every connected player."""
        print("[CHAT]", msg)
        #Snapshot: sendMessage awaits, and the player list may change
        for other in list(self.players):
            await self.sendMessage(other, msg)

    async def _whisper(self, handle, player, args):
        """Deliver "<name> <text>" (bytes) as a private message.

        "|" in the name stands for a space; names match case-insensitively.
        Input without a text part is ignored.
        """
        parts = args.split(b" ", 1)
        if len(parts) < 2:
            return
        target = parts[0].decode(errors="replace").replace("|"," ").lower()
        text = parts[1].decode(errors="replace")
        for other in list(self.players):
            if self.players[other]["name"].lower() == target:
                await self.sendMessage(other, "[ {} whispers \"{}\" to you. ]".format(player["name"], text))
                #The confirmation belongs to the sender, not the recipient
                await self.sendMessage(handle, "[ You whisper \"{}\" to {}. ]".format(text, target))
                return
        await self.sendMessage(handle, "Sorry, there are no players around named {}.".format(target))

    async def _turnPlayer(self, handle, player, nextFacing):
        """Turn the player on the spot; nextFacing maps facing -> new facing."""
        facing = 1
        for i in (1,3,7,9):
            if player["sprite"] in player["species"][i]:
                facing = i
                break
        facing = nextFacing[facing]
        #"sprite" is the key every packet reads, so that is what must change
        player["sprite"] = player["species"][facing][1]
        player["frame"] = 1
        await self.setPlayerPos(handle, *player["pos"])

    async def removePlayer(self, handle):
        """Forget the player registered under handle, if there is one."""
        self.players.pop(handle, None)
#Load the level once at import time; both connection handlers use this
#single shared DragonSpiresMap instance.
with open("dspire15/LEV01.MAP", "rb") as f:
    DSInstance = DragonSpiresMap(f)
async def handleDragonSpires(reader, writer):
    """Serve one DragonSpires client on an asyncio stream pair.

    Sends the greeting, then reads lines terminated by "\\r" or "\\n".
    "connect <name> <password>" or "create <name> <password>" logs the
    client in; afterwards every line goes to DSInstance.playerCommand.
    Before login, "abort" ends the session.

    The writer is deliberately left open: handleDOSBox calls this function
    too and keeps using the connection after it returns.
    """
    #peername can be missing, or longer than (host, port) for IPv6
    addr = writer.get_extra_info('peername') or ("?", "?")
    host, port = addr[0], addr[1]
    print("[NET] DragonSpires connection from [{}]:{}".format(host, port))
    handle = None
    try:
        writer.write(b"Welcome to DragonSpires.\nDragonroar!\r\nV0005\r\n")
        await writer.drain()
        while True:
            buffer = b""
            while True:
                data = await reader.read(1)
                if data == b'':
                    buffer = None
                    break
                if data == b"\n" or data == b"\r":
                    break
                buffer += data
                #Cap the line length, keeping the most recent bytes
                if len(buffer) > 4096:
                    buffer = buffer[-4096:]
            if buffer is None:
                break
            if buffer == b"":
                continue
            #Login
            if buffer.startswith((b"connect ", b"create ")):
                userdata = buffer.split(b" ")[1:]
                if len(userdata) != 2:
                    print("[AUTH] login failure from [{}:{}]: {}".format(
                        host, port,
                        "(NO USER)" if len(userdata) == 0 else userdata[0].decode(errors="replace")
                    ))
                    writer.write(b"NInvalid credentials.\r\n")
                    await writer.drain()
                    continue
                #The name comes from the network; tolerate invalid UTF-8
                username = userdata[0].decode(errors="replace")
                print("[AUTH] [{}:{}] logged in as {}".format(host, port, username))
                writer.write(b"&\n")
                await writer.drain()
                handle = (reader, writer)
                await DSInstance.addPlayer(handle, username)
            elif handle:
                await DSInstance.playerCommand(handle, buffer)
            elif buffer == b"abort":
                break
    except ConnectionError:
        #A dropped connection is just another way of disconnecting
        pass
    finally:
        #Always unregister, otherwise a crashed session leaves a ghost
        #player that every broadcast keeps writing to.
        if handle:
            await DSInstance.removePlayer(handle)
        print("[NET] [{}:{}] disconnected from DragonSpires".format(host, port))
async def InvalidCommand(writer):
    """Send the "?Invalid command" reply to writer and flush it."""
    reply = b"?Invalid command\n"
    writer.write(reply)
    await writer.drain()
async def handleDOSBox(reader, writer):
    """Serve a minimal fake shell for a DOSBox client on a stream pair.

    Every received byte is echoed back. Lines end at "\\r" or "\\n". The
    only command is "telnet <host> [port]"; a request for boris.eden.com
    port 7734 hands the connection to handleDragonSpires, any other
    destination is answered with "?Forbidden". The connection is closed
    when the client disconnects.
    """
    #peername can be missing, or longer than (host, port) for IPv6
    addr = writer.get_extra_info('peername') or ("?", "?")
    host, port = addr[0], addr[1]
    print("[NET] Dosbox connection from [{}]:{}".format(host, port))
    try:
        while True:
            buffer = b""
            while True:
                data = await reader.read(1)
                if data == b'':
                    buffer = None
                    break
                writer.write(data)
                await writer.drain()
                if data == b"\n" or data == b"\r":
                    break
                buffer += data
                #Cap the line length, keeping the most recent bytes
                if len(buffer) > 4096:
                    buffer = buffer[-4096:]
            if buffer is None:
                break
            if buffer.startswith(b"telnet "):
                try:
                    dest = buffer.decode()
                except UnicodeDecodeError:
                    await InvalidCommand(writer)
                    continue
                #split() without a separator ignores repeated spaces
                dest = dest.split()[1:]
                if len(dest) == 0 or len(dest) > 2:
                    await InvalidCommand(writer)
                    continue
                elif len(dest) == 1:
                    #No port given: assume the standard telnet port
                    dest.append("23")
                print("[DOSBOX] Connection request to [{}]:{} from [{}]:{}".format(dest[0], dest[1], host, port))
                if dest[0] == "boris.eden.com" and dest[1] == "7734":
                    print("[DOSBOX] Sending [{}]:{} to DragonSpires handler".format(host, port))
                    await handleDragonSpires(reader, writer)
                else:
                    writer.write(b"?Forbidden\n")
                    await writer.drain()
            elif buffer == b"":
                continue
            else:
                print("[DOSBOX] Unknown command from [{}]:{}".format(host, port), buffer)
                await InvalidCommand(writer)
    except ConnectionError:
        #A dropped connection is just another way of disconnecting
        pass
    finally:
        print("[NET] [{}:{}] disconnected from dosbox".format(host, port))
        #wait_closed() only returns once close() has been requested
        writer.close()
        try:
            await writer.wait_closed()
        except ConnectionError:
            pass
async def main():
    """Start both listeners and serve until cancelled.

    Port 7733 runs the DOSBox shell handler and port 7734 the DragonSpires
    handler, both bound to 0.0.0.0.
    """
    dosboxServer = await asyncio.start_server(handleDOSBox, '0.0.0.0', 7733)
    bound = [str(sock.getsockname()) for sock in dosboxServer.sockets]
    addrs = ', '.join(bound)
    print(f'[NET] Serving DOSBox telnet on {addrs}')

    spiresServer = await asyncio.start_server(handleDragonSpires, '0.0.0.0', 7734)
    bound = [str(sock.getsockname()) for sock in spiresServer.sockets]
    addrs = ', '.join(bound)
    print(f'[NET] Serving DragonSpires on {addrs}')

    async with dosboxServer:
        async with spiresServer:
            await asyncio.gather(
                dosboxServer.serve_forever(),
                spiresServer.serve_forever()
            )
#Only start the servers when run as a script, so that importing this module
#does not block forever inside the event loop.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment