Skip to content

Instantly share code, notes, and snippets.

@imbaczek
Created January 25, 2020 12:19
Show Gist options
  • Save imbaczek/e87046bb10ab8db40de909f2d44aab07 to your computer and use it in GitHub Desktop.
Save imbaczek/e87046bb10ab8db40de909f2d44aab07 to your computer and use it in GitHub Desktop.
Tailing WebSocket server for Python using the websockets module
# OSI License ID: Apache-2.0
#
# Copyright 2020 Marek Baczyński
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import json
import logging
import os
import time
from collections import deque
import websockets
log = logging.getLogger(__name__)
def _is_inside(root, candidate):
    """Return True if *candidate* is *root* itself or lies beneath it.

    Both arguments are expected to be absolute, normalized paths.  The
    comparison is done on whole path components, so ``/srv/app-secret`` is
    NOT considered to be inside ``/srv/app`` (a plain ``str.startswith``
    prefix test would accept it).
    """
    try:
        return os.path.commonpath([root, candidate]) == root
    except ValueError:
        # commonpath() raises ValueError when the paths cannot be compared,
        # e.g. they live on different drives on Windows.
        return False


async def tailserver(websocket, path):
    """Serve "tail" requests for a single file over one websocket connection.

    The file is chosen by the connection's request ``path``, interpreted
    relative to the current working directory.  Every incoming message must
    be an integer N; the reply is the last N lines of the file, joined into
    one frame.  Any failure is reported to the client as a frame starting
    with ``ERROR:`` and the connection stays open for further requests.

    Args:
        websocket: Connection object handed over by ``websockets.serve``.
            It is iterated for incoming messages, used to send replies and
            queried for ``remote_address``.
        path: Request path of the connection, e.g. ``/app.log``.
    """
    # NOTE(review): the allowed root is the *parent* of the working directory
    # (dirname of cwd), exactly as before.  That permits "/../sibling/file"
    # style requests; confirm whether the root should be the cwd itself.
    serve_root = os.path.dirname(os.path.abspath(os.getcwd()))

    # Build the relative path exactly once.  Prefixing "." inside the message
    # loop turned "./x" into "../x" on the second message of a connection.
    rel_path = "." + path

    async for message in websocket:
        t0 = time.perf_counter()
        # Bound up front so the generic error handler below can always log
        # it, even when the failure happens before the message is parsed.
        lines_to_get = None
        try:
            path_dir = os.path.abspath(os.path.dirname(rel_path))
            if not _is_inside(serve_root, path_dir):
                # Same wording as the "missing file" case on purpose: the
                # client cannot tell a forbidden path from an absent one.
                await websocket.send(f'ERROR: not a file: {rel_path}')
                continue
            if not os.path.isfile(rel_path):
                await websocket.send(f'ERROR: not a file: {rel_path}')
                continue

            try:
                lines_to_get = int(message)
            except ValueError:
                await websocket.send('ERROR: not an int')
                continue
            if lines_to_get < 0:
                # deque() rejects a negative maxlen; answer with a clear
                # client error instead of an "internal error" + traceback.
                await websocket.send('ERROR: not a non-negative int')
                continue

            # A bounded deque keeps only the last `lines_to_get` lines while
            # the file is streamed, so memory use does not grow with the
            # size of the file.
            with open(rel_path) as f:
                buf = deque(f, maxlen=lines_to_get)
            resp = ''.join(buf)
            await websocket.send(resp)

            t1 = time.perf_counter()
            client_addr, client_port = websocket.remote_address[:2]
            log.info('%s:"%s" %.5fs %s %s:%s', lines_to_get, rel_path, t1 - t0, len(resp), client_addr, client_port)
        except Exception as e:
            # Per-request boundary: log the traceback, tell the client, and
            # keep the connection usable for the next request.
            log.exception('Exception when serving request for %s %s', lines_to_get, rel_path)
            await websocket.send(f'ERROR: internal error: {e}')
def main():
    """Configure logging and run the tail server on localhost:8765.

    Blocks until the event loop is stopped or the process is interrupted.
    On the way out the listening server and the event loop are closed; an
    interrupting exception such as ``KeyboardInterrupt`` still propagates to
    the caller.
    """
    logging.basicConfig(level=logging.INFO)
    # Records from 'websockets.server' propagate to the root logger, which
    # has a handler after basicConfig().  Attaching a second StreamHandler
    # here made every record appear twice, so only the level is set.
    logging.getLogger('websockets.server').setLevel(logging.INFO)

    # Create and install the loop explicitly: asyncio.get_event_loop() with
    # no current loop is deprecated and raises RuntimeError on Python 3.14+.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        server = loop.run_until_complete(
            websockets.serve(tailserver, 'localhost', 8765))
        try:
            loop.run_forever()
        finally:
            # Stop accepting connections and wait for the server to finish
            # closing before the loop itself is torn down.
            server.close()
            loop.run_until_complete(server.wait_closed())
    finally:
        loop.close()
# Script entry point: start the server only when this file is executed
# directly, not when it is imported as a module.
if "__main__" == __name__:
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment