Skip to content

Instantly share code, notes, and snippets.

@hashbrowncipher
Last active September 21, 2018 09:00
Show Gist options
  • Save hashbrowncipher/33e7f70c8929d8ba47728800bb30f852 to your computer and use it in GitHub Desktop.
Save hashbrowncipher/33e7f70c8929d8ba47728800bb30f852 to your computer and use it in GitHub Desktop.
Multiprocess WSGI server using gevent==1.3.6, demonstrating sendfile
#!/usr/bin/python3
"""
Copyright 2018 Josh Snyder
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from gevent.monkey import patch_all
patch_all()
from errno import EAGAIN
from os import WIFSIGNALED
from socket import SOL_SOCKET
from socket import SO_REUSEADDR
from socket import SO_REUSEPORT
from socket import socket
import os
import sys
from gevent import pywsgi
from gevent.socket import wait_write
from gevent.queue import Channel
import gevent
def cooperative_sendfile(out_fh, in_fh, count):
while count > 0:
try:
sent = os.sendfile(out_fh, in_fh, None, count)
if sent == 0:
raise EOFError('Reached EOF with {} remaining bytes'.format(count))
count -= sent
except OSError as e:
if e.args[0] == errno.EAGAIN:
wait_write(out_fh)
else:
raise
class FileChunk:
def __init__(self, size):
self._size = size
def __len__(self):
return self._size
class FileWrapper(object):
__len__ = None
def __init__(self, fh):
self._fh = fh
self._size = os.stat(fh.fileno()).st_size
def __iter__(self):
yield FileChunk(self._size)
def write(self, socket):
cooperative_sendfile(socket.fileno(), self._fh.fileno(), self._size)
class WSGIHandler(pywsgi.WSGIHandler):
def process_result(self):
if not isinstance(self.result, FileWrapper):
return super().process_result()
self._write_with_headers(b'')
self.result.write(self.socket)
class WSGIServer(pywsgi.WSGIServer):
handler_class = WSGIHandler
def set_environ(self, environ=None):
super().set_environ(environ)
if self.environ.get('wsgi.file_wrapper') is None:
self.environ['wsgi.file_wrapper'] = FileWrapper
def application(environ, start_response):
if environ['PATH_INFO'] == '/':
start_response('200 OK', [('Content-Type', 'text/plain')])
return FileWrapper(open('multicore_server.py', 'rb'))
start_response('404 Not Found', [('Content-Type', 'text/html')])
return [b'<h1>Not Found</h1>']
def listen(address, backlog=32):
"""Create a TCP socket, bind it and listen on it.
This represents a tradeoff. We could have created the listen socket in the
master and forked it into each child process, but doing so would cause
all of the processes to wake up each time a new connection is formed.
Only one of them would be able to successfully accept().
One solution is to use SO_REUSEPORT, as below.
Another would be to use EPOLLEXCLUSIVE.
"""
ret = socket()
ret.setsockopt(SOL_SOCKET, SO_REUSEADDR, True)
ret.setsockopt(SOL_SOCKET, SO_REUSEPORT, True)
ret.bind(address)
ret.listen(backlog)
return ret
def serve(identifier, port):
sock = listen(('0.0.0.0', port))
server = WSGIServer(sock, application)
server.serve_forever()
deathpipe = None
def die_on_eof(fd):
try:
while os.read(fd, 4096):
pass
except:
pass
sys.exit(1)
def wait_on_process(pid, done_queue):
done_queue.put(os.waitpid(pid, 0))
def spawn_process(done_queue, target, args=()):
global deathpipe
if deathpipe is None:
deathpipe = os.pipe()
pid = os.fork()
if pid:
gevent.spawn(wait_on_process, pid, done_queue)
return
sys.stdin.close()
os.close(0)
os.close(deathpipe[1])
gevent.spawn(die_on_eof, deathpipe[0])
return target(*args)
def main():
port = int(sys.argv[1])
done_queue = Channel()
def launch_process():
spawn_process(done_queue, serve, args=(i, port))
for i in range(20):
launch_process()
for pid, status in done_queue:
if WIFSIGNALED(status):
print('Process {} exited with signal {}'.format(pid, status))
launch_process()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment