Last active
July 27, 2020 19:14
-
-
Save dotcomboom/59e4bb63ac154fbf0bcfaba3096e3fc2 to your computer and use it in GitHub Desktop.
CCSO/Ph Server in Python 3
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
## Read-only CCSO server for Python 3.7+ by dotcomboom for somnol | |
## Originally worked on ~5/29/2019, finished 7/6/2020 | |
## What is implemented: | |
## - "status" command | |
## - "fields" command | |
## - Title-cases fields for the frontend's description | |
## - query command | |
## - reload when any command passed has "reload" in it (with cooldown) | |
## - reading entries from json | |
## If using OverbiteFF, queries must be like name="bob" at this time | |
## Sample entries.json | |
# [ | |
# { | |
# "alias": "b-smith", | |
# "name": "smith bob c.", | |
# "discord": "bsmith#0000", | |
# "email": "b-smith@example.edu" | |
# }, | |
# { | |
# "alias": "j-smith", | |
# "name": "smith john z.", | |
# "slack": "\"delete this\"", | |
# "email": "j-smith@example.edu" | |
# } | |
# ] | |
## Add as many fields as necessary | |
### Config | |
port = 105 # Typical CCSO port is 105 (as for S/Gopher, no thank you) | |
reload_cooldown = 60 # how frequently "reload" can be used in a command to reload the database | |
# (in seconds) | |
search_fields = ["alias"] # Fields that are labeled as indexable | |
filterable_fields = ["alias", "name", "discord", "email"] # When doing returned data option "selected" only these and the indexable fields can be chosen to view | |
### | |
import asyncio | |
import traceback | |
import json | |
import time | |
import re | |
encoding = "ascii" # or utf-8? | |
newline = "\r\n" | |
x = ['quit', 'stop', 'exit'] # exit commands | |
last_reload = 0 | |
database = [] | |
def reload_db(): | |
global database | |
with open('entries.json', 'r') as f: | |
database = json.load(f) | |
print('Database read from entries.json') | |
reload_db() | |
def nl(x=''): | |
return str(x) + newline | |
def to_bytes(x): | |
if isinstance(x, list): | |
return bytes(newline.join(x), encoding) | |
else: | |
return bytes(str(x), encoding) | |
class PhProtocol(asyncio.Protocol): | |
def connection_made(self, transport): | |
self.transport = transport | |
print('Connected by', transport.get_extra_info('peername')) | |
def data_received(self, data): | |
global last_reload | |
# self.transport.write(data) | |
request = data.decode('utf-8') | |
print("Client: " + request) | |
commands = request.split('\r\n') | |
print(commands) | |
#print(args[0]) | |
for cmd in commands: | |
if 'reload' in cmd: | |
if (last_reload + 60) <= time.time(): | |
reload_db() | |
last_reload = time.time() | |
else: | |
print('-- Please wait', (last_reload + reload_cooldown) - time.time(), 'seconds to reload --') | |
args = cmd.split(' ') | |
print('', args) | |
try: | |
if args[0] == 'status': | |
self.transport.write(to_bytes(nl('201:Database ready, read-only.'))) | |
elif args[0] == 'fields': | |
unique_fields = [] | |
for entry in database: | |
for field in entry: | |
if not field in unique_fields: | |
unique_fields.append(field) | |
results = [] | |
_id = 0 | |
for field in unique_fields: | |
_id += 1 | |
if field in search_fields or len(search_fields) == 0: | |
results.append('-200:' + str(_id) + ':' + field + ':max 64 Indexed Lookup Public Default') | |
results.append('-200:' + str(_id) + ':' + field + ':' + field.title()) | |
elif field in filterable_fields: | |
results.append('-200:' + str(_id) + ':' + field + ':max 64 Default') | |
results.append('-200:' + str(_id) + ':' + field + ':' + field.title()) | |
results.append(nl('200:Ok.')) | |
# for r in results: | |
# print(r) | |
resp = to_bytes(results) | |
self.transport.write(resp) | |
elif args[0] == 'query': | |
# query email="something something" return all | |
# query email="something something" return discord, email | |
# query email="something something" | |
if not 'return' in cmd: | |
cmd += " return all" | |
criteria = {} | |
matches = re.finditer(r'(\S*)="([^"]*)"', cmd) | |
for match in matches: | |
criteria[match.group(1)] = match.group(2) | |
return_fields = re.match(r'.* return (.*)', cmd).group(1).split(' ') | |
_all = False | |
if 'all' in return_fields: | |
_all = True | |
results = [] | |
entry = 0 | |
for item in database: | |
entry += 1 | |
meets_criteria = True | |
for key in criteria: | |
if key in item: | |
if not criteria[key].lower() in item[key].lower(): | |
meets_criteria = False | |
break | |
else: | |
meets_criteria = False | |
break | |
if meets_criteria: | |
for field in database[entry - 1]: | |
if field in return_fields or _all: | |
results.append('-200:' + str(entry) + ': ' + field + ': ' + database[entry - 1][field]) | |
results.append(nl('200:Ok.')) | |
for r in results: | |
print(r) | |
resp = to_bytes(results) | |
self.transport.write(resp) | |
elif args[0] in x: | |
print('Client wants to exit') | |
self.transport.write(to_bytes(nl('200:Bye!'))) | |
self.transport.close() | |
break | |
elif args[0] != "": | |
self.transport.write(to_bytes(nl('514:Unknown command.'))) | |
except Exception as e: | |
traceback.print_exc() | |
resp = to_bytes(nl("400:Server error occurred. That gets a yikes from me.")) | |
self.transport.write(resp) | |
async def main(h, p): | |
loop = asyncio.get_running_loop() | |
server = await loop.create_server(PhProtocol, h, p) | |
await server.serve_forever() | |
print('Server is now running') | |
asyncio.run(main('0.0.0.0', port)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment