Skip to content

Instantly share code, notes, and snippets.

@elpaso
Created October 5, 2017 07:22
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save elpaso/0cd7fb6a0518dc46c45557d9133438b8 to your computer and use it in GitHub Desktop.
Save elpaso/0cd7fb6a0518dc46c45557d9133438b8 to your computer and use it in GitHub Desktop.
A simple QGIS3 WSGI server experiment
#!/usr/bin/env python
# Simple QGIS 3 Server wsgi test
import signal
import sys
from cgi import escape, parse_qs
from urllib.parse import quote
# Python's bundled WSGI server
from wsgiref.simple_server import make_server
from qgis.core import QgsApplication
from qgis.server import *
# Create the QGIS application object first; the server below is built after it.
# NOTE(review): the second argument (False) presumably disables the GUI --
# confirm against the QgsApplication documentation.
qgs_app = QgsApplication([], False)
# Single module-level QgsServer instance; application() calls its
# handleRequest() for every incoming request.
qgs_server = QgsServer()
def reconstruct_url(environ):
    """Rebuild the absolute request URL from a WSGI ``environ`` mapping.

    Implements the URL reconstruction recipe from PEP 333:
    https://www.python.org/dev/peps/pep-0333/#url-reconstruction

    ``HTTP_HOST`` is used verbatim when present and non-empty; otherwise the
    host is ``SERVER_NAME`` plus ``:SERVER_PORT`` unless the port is the
    scheme default ('443' for https, '80' for anything else).
    ``SCRIPT_NAME`` and ``PATH_INFO`` are percent-quoted; ``QUERY_STRING`` is
    appended as-is after a '?' when non-empty.
    """
    scheme = environ['wsgi.url_scheme']
    pieces = [scheme, '://']

    if environ.get('HTTP_HOST'):
        pieces.append(environ['HTTP_HOST'])
    else:
        pieces.append(environ['SERVER_NAME'])
        default_port = '443' if scheme == 'https' else '80'
        server_port = environ['SERVER_PORT']
        if server_port != default_port:
            pieces.extend((':', server_port))

    pieces.append(quote(environ.get('SCRIPT_NAME', '')))
    pieces.append(quote(environ.get('PATH_INFO', '')))

    query = environ.get('QUERY_STRING')
    if query:
        pieces.extend(('?', query))

    return ''.join(pieces)
def application(environ, start_response):
    """WSGI entry point: hand one HTTP request to the QGIS server.

    The request URL is rebuilt with ``reconstruct_url``; the body (if any) is
    read from ``wsgi.input``. Only POST is distinguished: every other request
    method is passed to QGIS as a GET. Request headers are not forwarded
    (an empty dict is passed).

    Args:
        environ: the WSGI environment mapping for this request.
        start_response: the WSGI ``start_response`` callable.

    Returns:
        A one-element list holding the response body as ``bytes``.
    """
    # Local import: only needed here, to look up HTTP reason phrases.
    import http.client

    # CONTENT_LENGTH may be missing, empty or malformed. A negative value is
    # rejected too: read(-1) would read until EOF and can block on a socket.
    try:
        request_body_size = int(environ.get('CONTENT_LENGTH', 0))
        if request_body_size < 0:
            raise ValueError('negative CONTENT_LENGTH')
        request_body = environ['wsgi.input'].read(request_body_size)
    except ValueError:
        request_body = None

    if environ['REQUEST_METHOD'] == 'POST':
        method = QgsServerRequest.PostMethod
    else:
        method = QgsServerRequest.GetMethod

    # Parse request headers from environ here if needed; none are forwarded.
    request = QgsBufferServerRequest(
        reconstruct_url(environ), method, {}, request_body)
    response = QgsBufferServerResponse()
    qgs_server.handleRequest(request, response)

    headers_dict = response.headers()
    # An explicit 'Status' header wins; otherwise use the status code stored
    # on the response, so QGIS errors are not reported as "200 OK".
    status = headers_dict.get('Status')
    if status is None:
        code = response.statusCode()
        status = '{} {}'.format(
            code, http.client.responses.get(code, 'Unknown'))

    start_response(status, list(headers_dict.items()))
    return [bytes(response.body())]
# Build the bundled wsgiref server: host 'localhost', port 8051, with every
# request dispatched to the `application` callable.
httpd = make_server('localhost', 8051, application)
print("Listening to http://localhost:8051 press CTRL+C to quit")
def signal_handler(signal, frame):
    """Signal callback: shut QGIS down, then exit the process with status 0.

    Both arguments are supplied by the signal machinery and are unused here.
    """
    print("\nExiting QGIS...")
    # qgs_app is only read, so no `global` declaration is needed.
    qgs_app.exitQgis()
    raise SystemExit(0)
# Route SIGINT (CTRL+C) to signal_handler; registered before serving starts.
signal.signal(signal.SIGINT, signal_handler)
# Start handling requests on the server created above.
httpd.serve_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment