Skip to content

Instantly share code, notes, and snippets.

@matteobertozzi
Created May 11, 2024 20:11
Show Gist options
  • Save matteobertozzi/31767b48745c9c36675266cbcfb94456 to your computer and use it in GitHub Desktop.
Save matteobertozzi/31767b48745c9c36675266cbcfb94456 to your computer and use it in GitHub Desktop.
HTTP Long Tasks
#!/usr/bin/env bash
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Handling Long Tasks in terms of execution times and large uploads or downloads with HTTP
# - If the endpoint execution is fast and there is a single chunk to download,
# the behaviour is the same as a normal http req/resp. (/fast/small example)
# - If the endpoint execution is fast and there are multiple chunks to download (/fast/large example)
# the response will contain a header "X-Task-HasMore-Chunks: True" which means there is more data to fetch.
# to fetch the data you need the Task ExecutionId that you can find in the response header "X-Task-ExecutionId".
# the endpoint /runtime/download?executionId=ID&chunkId=N will return the data for the specific executionId/chunkId.
# - If the endpoint execution is slow, instead of receiving the response back you'll get a "202 Accepted"
# with an ExecutionId that will be used to poll the result.
# You can use the /runtime/download with chunkId=0 or you can use the /runtime/poll
# If the task is not yet completed you'll get back a "202 Accepted" with state WAITING
# and once ready you'll get back the response. (example /slow/large)
# - If the endpoint takes a large input you can chunk the upload
# the first call to the endpoint will specify the header "X-Task-HasMore-Chunks: True"
# and it will get back a "202 Accepted" with the ExecutionId.
# Using /runtime/upload?executionId=ID&chunkId=N you can upload the chunks and remember the proper "X-Task-HasMore-Chunks".
# Once all the chunks are available the server will start the execution,
# and you'll receive the direct response if the execution is fast, or a "202 Accepted" if the execution is still in progress.
# -------------------------------------------------------------------------------------------------------------------
# Long Tasks Demo Nov 2021
# -------------------------------------------------------------------------------------------------------------------
# Simulate a normal fast call with a single chunk result
# curl -v http://localhost:57025/fast/small
# Simulate a fast task with a large number of chunks to download
# curl -v http://localhost:57025/fast/large
# curl -v "http://localhost:57025/runtime/download?executionId=71cd1ca0-7fca-4ab0-95ce-38dc512a58fa&chunkId=1"
# Simulate a slow task with a small number of chunks (or none) to download
# curl -v http://localhost:57025/slow/small
# curl -v "http://localhost:57025/runtime/poll?executionId=bcee18e7-e600-48a9-9ee1-2b6185e14677"
# Simulate a slow task with a large number of chunks to download
# curl -v http://localhost:57025/slow/large
# curl -v "http://localhost:57025/runtime/poll?executionId=b1a4feda-e89d-46d1-a070-cb4bd92751fe"
# curl -v "http://localhost:57025/runtime/download?executionId=b1a4feda-e89d-46d1-a070-cb4bd92751fe&chunkId=1"
# Simulate a fast task with 3 chunks to upload and a small number of chunks (or none) to download
# curl -v http://localhost:57025/fast/small -H "X-Task-HasMore-Chunks: true" -d '[1, 2, 3]'
# curl -v "http://localhost:57025/runtime/upload?executionId=9e50ac92-4c19-4173-997a-f716391d074a&chunkId=1" -H "X-Task-HasMore-Chunks: true" -d '[4, 5, 6]'
# curl -v "http://localhost:57025/runtime/upload?executionId=9e50ac92-4c19-4173-997a-f716391d074a&chunkId=3" -H "X-Task-HasMore-Chunks: false" -d '[10, 11, 12]'
# curl -v "http://localhost:57025/runtime/upload?executionId=9e50ac92-4c19-4173-997a-f716391d074a&chunkId=2" -H "X-Task-HasMore-Chunks: true" -d '[7, 8, 9]'
from collections.abc import Iterable
from http.server import SimpleHTTPRequestHandler
from urllib.parse import parse_qs, urlparse
from socketserver import TCPServer
from random import randint
from uuid import uuid4
from time import time
import logging
import json
class ExecutionData:
    """Bookkeeping for one simulated task execution.

    Tracks which input chunks have been uploaded and which chunk is the last
    one, and fakes the task outcome: the number of downloadable result chunks
    is drawn at random in the constructor, and "slow" tasks only become ready
    some seconds after the most recent upload.
    """

    # Seconds a simulated slow task (any path not starting with '/fast/')
    # needs after its most recent chunk upload before it reports ready.
    SLOW_TASK_SECONDS = 20

    def __init__(self, path: str) -> None:
        """Create the state for a new execution of the endpoint at *path*."""
        self.execution_id = str(uuid4())
        self.start_time = time()
        self.update_time = self.start_time
        self.path = path
        # chunk_id -> True for every input chunk received so far
        # (the chunk payload itself is not retained by this demo).
        self.chunks = {}
        # Id of the final input chunk, or None until a chunk flagged
        # has_more=False has been received.
        self.last_chunk = None
        # dummy function result
        if path.endswith('/large'):
            self.download_chunks = randint(2, 8)
        else:
            self.download_chunks = randint(0, 1)

    def add_chunk(self, chunk_id: int, chunk: bytes, has_more: bool) -> None:
        """Record that input chunk *chunk_id* has been received.

        Args:
            chunk_id: zero-based index of the chunk.
            chunk: the chunk payload (only its length is logged).
            has_more: False when this is the final chunk of the input.

        Raises:
            ValueError: if *chunk_id* is negative. A negative "last chunk"
                would make is_ready() skip the completeness check entirely.
        """
        if chunk_id < 0:
            raise ValueError('chunk_id must be >= 0, got %d' % chunk_id)

        logger = logging.getLogger('state-store')
        self.update_time = time()
        logger.info('execId:%s - add chunk chunkId:%d hasMore:%d length:%d',
                    self.execution_id, chunk_id, has_more, len(chunk))
        if not has_more:
            self.last_chunk = chunk_id
        self.chunks[chunk_id] = True

    def is_ready(self) -> bool:
        """Return True once all input chunks arrived and the fake task finished.

        The input is complete when the last chunk is known and every chunk id
        below it has been received. '/fast/' paths are ready immediately after
        that; any other path is ready once more than SLOW_TASK_SECONDS have
        elapsed since the most recent add_chunk() call.
        """
        logger = logging.getLogger('state-store')
        if self.last_chunk is None:
            logger.debug('execId:%s - last chunk not yet available', self.execution_id)
            return False

        # The last chunk itself is recorded by add_chunk(), so only the ids
        # below it need to be checked.
        for chunk_id in range(self.last_chunk):
            if chunk_id not in self.chunks:
                logger.debug('execId:%s - chunk %d not yet available', self.execution_id, chunk_id)
                return False

        # dummy function fast/slow simulation
        if self.path.startswith('/fast/'):
            return True

        elapsed = time() - self.update_time
        logger.debug('execId:%s - the function takes %dsec, %.2fsec elapsed',
                     self.execution_id, self.SLOW_TASK_SECONDS, elapsed)
        return elapsed > self.SLOW_TASK_SECONDS
class TaskRuntime:
    """In-memory registry of executions, keyed by execution id.

    Entries are never removed, so the registry grows for as long as the
    process runs.
    """

    def __init__(self) -> None:
        # execution_id -> ExecutionData
        self.state_store = {}

    def add(self, execution_data: 'ExecutionData') -> None:
        """Register *execution_data* under its own ``execution_id``.

        An existing entry with the same id is replaced.
        """
        self.state_store[execution_data.execution_id] = execution_data

    def get_state(self, execution_id: str) -> 'ExecutionData | None':
        """Return the execution registered as *execution_id*, or None if unknown."""
        return self.state_store.get(execution_id)


# Process-wide registry shared by all request handlers.
task_runtime = TaskRuntime()
class HttpHandler(SimpleHTTPRequestHandler):
    """HTTP front-end of the long-task demo.

    Serves the four dummy task endpoints (/fast|slow/small|large) and the
    /runtime/poll, /runtime/upload and /runtime/download endpoints, for both
    GET and POST. Every response body is JSON; error responses carry a
    ``status``, a ``message`` and the handler's ``traceId``.
    """

    def __init__(self, *args, **kwargs):
        # The socketserver base constructor processes the request before it
        # returns, so trace_id has to exist before super().__init__ runs.
        self.trace_id = str(uuid4())
        super().__init__(*args, **kwargs)

    def do_GET(self):
        """Handle a GET request."""
        self._do_exec()

    def do_POST(self):
        """Handle a POST request."""
        self._do_exec()

    def _do_exec(self) -> None:
        """Read the request body and dispatch to the handler for the path.

        Any exception escaping a handler is logged with its traceback and
        answered with a 500.
        """
        logger = logging.getLogger('http-handler')
        try:
            uri = urlparse(self.path)
            query = parse_qs(uri.query)
            content_length = self._read_content_length()
            if content_length is None:
                return
            body = self.rfile.read(content_length)
            logger.debug('received request: %s - query: %s - content-length:%d', uri.path, query, content_length)

            # dummy endpoints & /runtime endpoints
            if uri.path in ('/slow/small', '/slow/large', '/fast/small', '/fast/large'):
                self.runtime_exec(uri.path, body)
            elif uri.path == '/runtime/poll':
                self.runtime_poll(query)
            elif uri.path == '/runtime/upload':
                self.runtime_upload(query, body)
            elif uri.path == '/runtime/download':
                self.runtime_download(query)
            else:
                self.respond_not_found()
        except Exception as e:
            # logger.exception attaches the exception traceback; stack_info
            # would only show where the logging call itself was made.
            logger.exception('unhandled exception during execution: %s', e)
            self.respond_internal_server_error()

    # -----------------------------------------------------------------------------------------------
    # Runtime Handlers
    # -----------------------------------------------------------------------------------------------
    def runtime_poll(self, query: dict[str, list[str]]) -> None:
        """Answer /runtime/poll: first result page if ready, 202 WAITING otherwise."""
        params = self.verify_query_args_avail(query, ['executionId'])
        if not params:
            return

        execution_data = self._get_ready_execution_data(params['executionId'][0])
        if execution_data is None:
            return

        self._fetch_first_page(execution_data)

    def runtime_upload(self, query: dict[str, list[str]], body: bytes) -> None:
        """Answer /runtime/upload: store one input chunk of an existing execution.

        Responds with the first result page when the upload completes the
        input and the task is ready, 202 WAITING otherwise.
        """
        params = self.verify_query_args_avail(query, ('executionId', 'chunkId'))
        if not params:
            return

        # Validate the arguments before touching any execution state.
        chunk_id = self._parse_chunk_id(params['chunkId'][0])
        if chunk_id is None:
            return

        executor = self._get_execution_data(params['executionId'][0])
        if executor is None:
            return

        executor.add_chunk(chunk_id, body, self._request_has_more())
        if executor.is_ready():
            self._fetch_first_page(executor)
        else:
            self._respond_waiting(executor)

    def runtime_download(self, query: dict[str, list[str]]) -> None:
        """Answer /runtime/download: return one result chunk of a ready execution.

        chunkId=0 is answered exactly like /runtime/poll (including the 204
        for an execution without result chunks); a chunkId beyond the
        available chunks yields 404 CHUNK_NOT_FOUND.
        """
        params = self.verify_query_args_avail(query, ('executionId', 'chunkId'))
        if not params:
            return

        chunk_id = self._parse_chunk_id(params['chunkId'][0])
        if chunk_id is None:
            return

        executor = self._get_ready_execution_data(params['executionId'][0])
        if executor is None:
            return

        if chunk_id == 0:
            self._fetch_first_page(executor)
        elif chunk_id < executor.download_chunks:
            has_more = (chunk_id + 1) < executor.download_chunks
            self.respond_json(200, {'result': list(range(chunk_id * 10, (chunk_id + 1) * 10))}, {
                'X-Task-HasMore-Chunks': has_more,
                'X-Task-ExecutionId': executor.execution_id
            })
        else:
            self.respond_status(404, 'CHUNK_NOT_FOUND', 'chunk not found')

    def runtime_exec(self, path: str, body: bytes):
        """Start a new execution for a dummy endpoint, using *body* as input chunk 0."""
        executor = ExecutionData(path)
        executor.add_chunk(0, body, self._request_has_more())
        task_runtime.add(executor)
        if executor.is_ready():
            self._fetch_first_page(executor)
        else:
            self._respond_waiting(executor)

    # -----------------------------------------------------------------------------------------------
    # Runtime Helpers
    # -----------------------------------------------------------------------------------------------
    def _get_execution_data(self, execution_id: str) -> 'ExecutionData | None':
        """Look up an execution; respond 400 and return None when the id is unknown."""
        executor = task_runtime.get_state(execution_id)
        if executor is None:
            self.respond_status(400, 'INVALID_EXECUTION_ID', 'invalid executionId')
            return None
        return executor

    def _get_ready_execution_data(self, execution_id: str) -> 'ExecutionData | None':
        """Look up an execution that must be ready.

        Returns None after having sent the response when the id is unknown
        (400) or the execution is not ready yet (202 WAITING).
        """
        executor = self._get_execution_data(execution_id)
        if executor is None:
            return None
        if not executor.is_ready():
            self._respond_waiting(executor)
            return None
        return executor

    def _fetch_first_page(self, execution_data: 'ExecutionData'):
        """Send result chunk 0, or 204 when the execution produced no chunks."""
        if execution_data.download_chunks > 0:
            has_more = execution_data.download_chunks > 1
            self.respond_json(200, {'result': list(range(0, 10))}, {
                'X-Task-HasMore-Chunks': has_more,
                'X-Task-ExecutionId': execution_data.execution_id
            })
        else:
            self.respond_json(204, None)

    def _respond_waiting(self, executor: 'ExecutionData') -> None:
        """Send the 202 response telling the client to poll for *executor*."""
        self.respond_json(202, {'executionId': executor.execution_id, 'state': 'WAITING'})

    def _request_has_more(self) -> bool:
        """Return True when the request header X-Task-HasMore-Chunks is 'true' (any case)."""
        return self.headers.get('X-Task-HasMore-Chunks', 'False').strip().lower() == 'true'

    def _parse_chunk_id(self, raw: str) -> 'int | None':
        """Parse a chunkId query value; respond 400 and return None if invalid.

        Valid ids are non-negative integers. Note that 0 is a valid result, so
        callers must compare against None rather than test truthiness.
        """
        try:
            chunk_id = int(raw)
        except ValueError:
            chunk_id = -1
        if chunk_id < 0:
            self.respond_status(400, 'VERIFY_ARG', 'chunkId must be a non-negative integer')
            return None
        return chunk_id

    def _read_content_length(self) -> 'int | None':
        """Return the request's Content-Length (0 when absent).

        Responds 400 and returns None when the header is not a non-negative
        integer; a negative length handed to rfile.read() would read until
        the peer closes the connection.
        """
        try:
            content_length = int(self.headers.get('Content-Length', 0))
        except (TypeError, ValueError):
            content_length = -1
        if content_length < 0:
            self.respond_status(400, 'VERIFY_ARG', 'Content-Length must be a non-negative integer')
            return None
        return content_length

    # -----------------------------------------------------------------------------------------------
    # HTTP Helpers
    # -----------------------------------------------------------------------------------------------
    def respond_not_found(self):
        """Send a 404 NOT_FOUND error response."""
        self.respond_status(404, 'NOT_FOUND', 'page not found')

    def respond_internal_server_error(self):
        """Send a 500 INTERNAL_SERVER_ERROR error response."""
        self.respond_status(500, 'INTERNAL_SERVER_ERROR', 'internal server error')

    def respond_status(self, http_code: int, status: str, message: str):
        """Send a JSON error body with *status*, *message* and this handler's traceId."""
        self.respond_json(http_code, {'status': status, 'message': message, 'traceId': self.trace_id})

    def respond_json(self, http_code: int, json_data, headers=None):
        """Send a complete response.

        Args:
            http_code: HTTP status code.
            json_data: None for an empty body, ``bytes`` for an already
                encoded body, anything else is serialized with json.dumps
                (so falsy values such as ``{}``, ``[]`` or ``0`` are sent as
                valid JSON instead of being dropped).
            headers: optional mapping of extra response headers; values are
                converted with str().
        """
        if json_data is None:
            content = b''
        elif isinstance(json_data, bytes):
            content = json_data
        else:
            content = json.dumps(json_data).encode('utf-8')

        # 1xx and 204 responses have no body and must not carry Content-Length.
        bodyless = http_code == 204 or 100 <= http_code < 200
        if bodyless:
            content = b''

        self.send_response(http_code)
        if not bodyless:
            self.send_header('content-type', 'application/json')
            self.send_header('content-length', str(len(content)))
        if headers:
            for key, value in headers.items():
                self.send_header(key, str(value))
        self.end_headers()
        if content:
            self.wfile.write(content)
        self.wfile.flush()

    # -----------------------------------------------------------------------------------------------
    # Param Utils
    # -----------------------------------------------------------------------------------------------
    def verify_query_args_avail(self, query: dict[str, list[str]], keys: Iterable[str]):
        """Return ``{key: values}`` for all *keys*, or None if one is missing.

        On the first key that is absent from *query* (or has no values) a 400
        VERIFY_ARG response is sent and None is returned.
        """
        params = {}
        for k in keys:
            values = query.get(k)
            if not values:
                self.respond_status(400, 'VERIFY_ARG', k + ' is missing')
                return None
            params[k] = values
        return params

    def get_query_param(self, query: dict[str, list[str]], key: str):
        """Return the value of *key* when it occurs exactly once in *query*, else None."""
        values = query.get(key)
        if values and len(values) == 1:
            return values[0]
        return None
class ReuseAddrTCPServer(TCPServer):
    """TCPServer with address reuse enabled, so the port can be re-bound on restart."""

    allow_reuse_address = True
def main(port: int = 57025) -> None:
    """Configure DEBUG logging and serve the demo on *port* until Ctrl-C.

    The server binds to all interfaces (empty host string).
    """
    log_format = '%(asctime)-15s %(levelname)s %(name)-8s %(filename)s %(funcName)s():%(lineno)s - %(message)s'
    logging.basicConfig(level=logging.DEBUG, format=log_format)

    # The context manager closes the listening socket on exit.
    with ReuseAddrTCPServer(("", port), HttpHandler) as httpd:
        try:
            print("serving at port", port)
            httpd.serve_forever()
        except KeyboardInterrupt:
            print("Keyboard interrupt received, exiting.")


if __name__ == '__main__':
    PORT = 57025
    main(PORT)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment