Skip to content

Instantly share code, notes, and snippets.

@davidfstr
Created August 19, 2018 04:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save davidfstr/b524577baebf6bb5c4769b51911210cf to your computer and use it in GitHub Desktop.
Save davidfstr/b524577baebf6bb5c4769b51911210cf to your computer and use it in GitHub Desktop.
def main():
fork_and_continue_in_child()
setup_reactor('AsyncioSelectorReactor')
run_server()
def setup_reactor(reactor_type):
if reactor_type == 'AsyncioSelectorReactor':
from twisted.internet import asyncioreactor
asyncioreactor.install()
elif reactor_type == 'SelectReactor':
pass
else:
raise ValueError('Unrecognized type of reactor: %s' % reactor_type)
# Get the current reactor
from twisted.internet import reactor
print('Reactor implementation: %r' % reactor)
def fork_and_continue_in_child():
import os
import sys
if os.fork() == 0:
# Child
pass
else:
# Parent
sys.exit(0)
def run_server():
from twisted.internet import reactor
from twisted.internet.endpoints import serverFromString
from twisted.web import server, resource
class Counter(resource.Resource):
isLeaf = True
numberRequests = 0
def render_GET(self, request):
self.numberRequests += 1
request.setHeader(b"content-type", b"text/plain")
content = u"I am request #{}\n".format(self.numberRequests)
return content.encode("ascii")
http_factory = server.Site(Counter())
def listen_success(port):
host = port.getHost()
print('Success: %s' % ((host.host, host.port),))
def listen_error(failure):
print(failure)
ep = serverFromString(reactor, 'tcp:8080')
listener = ep.listen(http_factory)
listener.addCallback(listen_success)
listener.addErrback(listen_error)
reactor.run()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment