Skip to content

Instantly share code, notes, and snippets.

@sonthonaxrk
Last active April 5, 2020 14:03
Show Gist options
  • Save sonthonaxrk/254f61239e4683bc1dd8c12310f36dcb to your computer and use it in GitHub Desktop.
Save sonthonaxrk/254f61239e4683bc1dd8c12310f36dcb to your computer and use it in GitHub Desktop.
import re
import pampy
# Routing table consumed by BaseS3Server.run_flask_request.
#
# Structure:
#   compiled URI regex -> {handler method name -> required request shape}
#
# * Each key is a compiled regular expression. The named groups ``Bucket`` and
#   ``Key`` capture the bucket segment (any run of characters other than "/")
#   and the remainder of the path. Many patterns end in a literal "?" followed
#   by an S3 sub-resource name (``\\?acl``, ``\\?tagging``, ...).
# * Each value maps the name of a method looked up on the server object to a
#   shape dict with four entries: "header", "method", "querystring" and "uri".
#   The shape is compared against the live request with pampy.match_dict;
#   ``str`` is used as a type placeholder and an empty dict names no required
#   entries.
# * The dispatcher iterates this dict with .items(), so the insertion order
#   below is also the order in which rules and handlers are tried.
#
# NOTE(review): the dispatcher matches these patterns against ``request.path``.
# If that attribute excludes the query string (as it does for Flask/Werkzeug
# requests), the patterns containing ``\\?`` cannot match -- confirm.
S3_MATCHING_RULES = {
re.compile("/(?P<Bucket>([^\\/]+))/(?P<Key>(.*))\\?restore"): {
"restore_object": {
"header": {},
"method": "POST",
"querystring": {},
"uri": {"Bucket": str, "Key": str},
}
},
re.compile("/(?P<Bucket>([^\\/]+))/(?P<Key>(.*))\\?select&select-type=2"): {
"select_object_content": {
"header": {},
"method": "POST",
"querystring": {},
"uri": {"Bucket": str, "Key": str},
}
},
re.compile("/(?P<Bucket>([^\\/]+))\\?acl"): {
"get_bucket_acl": {
"header": {},
"method": "GET",
"querystring": {},
"uri": {"Bucket": str},
},
"put_bucket_acl": {
"header": {},
"method": "PUT",
"querystring": {},
"uri": {"Bucket": str},
},
},
re.compile("/(?P<Bucket>([^\\/]+))/(?P<Key>(.*))\\?legal-hold"): {
"get_object_legal_hold": {
"header": {},
"method": "GET",
"querystring": {},
"uri": {"Bucket": str, "Key": str},
},
"put_object_legal_hold": {
"header": {},
"method": "PUT",
"querystring": {},
"uri": {"Bucket": str, "Key": str},
},
},
re.compile("/(?P<Bucket>([^\\/]+))\\?cors"): {
"delete_bucket_cors": {
"header": {},
"method": "DELETE",
"querystring": {},
"uri": {"Bucket": str},
},
"get_bucket_cors": {
"header": {},
"method": "GET",
"querystring": {},
"uri": {"Bucket": str},
},
"put_bucket_cors": {
"header": {},
"method": "PUT",
"querystring": {},
"uri": {"Bucket": str},
},
},
re.compile("/(?P<Bucket>([^\\/]+))\\?uploads"): {
"list_multipart_uploads": {
"header": {},
"method": "GET",
"querystring": {},
"uri": {"Bucket": str},
}
},
re.compile("/(?P<Bucket>([^\\/]+))\\?logging"): {
"get_bucket_logging": {
"header": {},
"method": "GET",
"querystring": {},
"uri": {"Bucket": str},
},
"put_bucket_logging": {
"header": {},
"method": "PUT",
"querystring": {},
"uri": {"Bucket": str},
},
},
re.compile("/(?P<Bucket>([^\\/]+))\\?website"): {
"delete_bucket_website": {
"header": {},
"method": "DELETE",
"querystring": {},
"uri": {"Bucket": str},
},
"get_bucket_website": {
"header": {},
"method": "GET",
"querystring": {},
"uri": {"Bucket": str},
},
"put_bucket_website": {
"header": {},
"method": "PUT",
"querystring": {},
"uri": {"Bucket": str},
},
},
re.compile("/(?P<Bucket>([^\\/]+))\\?tagging"): {
"delete_bucket_tagging": {
"header": {},
"method": "DELETE",
"querystring": {},
"uri": {"Bucket": str},
},
"get_bucket_tagging": {
"header": {},
"method": "GET",
"querystring": {},
"uri": {"Bucket": str},
},
"put_bucket_tagging": {
"header": {},
"method": "PUT",
"querystring": {},
"uri": {"Bucket": str},
},
},
re.compile("/(?P<Bucket>([^\\/]+))\\?metrics"): {
"delete_bucket_metrics_configuration": {
"header": {},
"method": "DELETE",
"querystring": {"id": str},
"uri": {"Bucket": str},
},
"get_bucket_metrics_configuration": {
"header": {},
"method": "GET",
"querystring": {"id": str},
"uri": {"Bucket": str},
},
"list_bucket_metrics_configurations": {
"header": {},
"method": "GET",
"querystring": {},
"uri": {"Bucket": str},
},
"put_bucket_metrics_configuration": {
"header": {},
"method": "PUT",
"querystring": {"id": str},
"uri": {"Bucket": str},
},
},
# Generic object rule (no sub-resource suffix). It is listed ahead of the
# object-level ?retention, ?torrent, ?tagging, ?uploads and ?acl rules below.
# NOTE(review): put_object (no querystring requirement) precedes upload_part
# and upload_part_copy; if an empty shape accepts any request, those later
# handlers may be unreachable -- confirm against pampy.match_dict semantics.
re.compile("/(?P<Bucket>([^\\/]+))/(?P<Key>(.*))"): {
"abort_multipart_upload": {
"header": {},
"method": "DELETE",
"querystring": {"uploadId": str},
"uri": {"Bucket": str, "Key": str},
},
"complete_multipart_upload": {
"header": {},
"method": "POST",
"querystring": {"uploadId": str},
"uri": {"Bucket": str, "Key": str},
},
"copy_object": {
"header": {"x-amz-copy-source": str},
"method": "PUT",
"querystring": {},
"uri": {"Bucket": str, "Key": str},
},
"delete_object": {
"header": {},
"method": "DELETE",
"querystring": {},
"uri": {"Bucket": str, "Key": str},
},
"get_object": {
"header": {},
"method": "GET",
"querystring": {},
"uri": {"Bucket": str, "Key": str},
},
"head_object": {
"header": {},
"method": "HEAD",
"querystring": {},
"uri": {"Bucket": str, "Key": str},
},
"list_parts": {
"header": {},
"method": "GET",
"querystring": {"uploadId": str},
"uri": {"Bucket": str, "Key": str},
},
"put_object": {
"header": {},
"method": "PUT",
"querystring": {},
"uri": {"Bucket": str, "Key": str},
},
"upload_part": {
"header": {},
"method": "PUT",
"querystring": {"partNumber": str, "uploadId": str},
"uri": {"Bucket": str, "Key": str},
},
"upload_part_copy": {
"header": {"x-amz-copy-source": str},
"method": "PUT",
"querystring": {"partNumber": str, "uploadId": str},
"uri": {"Bucket": str, "Key": str},
},
},
re.compile("/(?P<Bucket>([^\\/]+))\\?requestPayment"): {
"get_bucket_request_payment": {
"header": {},
"method": "GET",
"querystring": {},
"uri": {"Bucket": str},
},
"put_bucket_request_payment": {
"header": {},
"method": "PUT",
"querystring": {},
"uri": {"Bucket": str},
},
},
re.compile("/(?P<Bucket>([^\\/]+))\\?publicAccessBlock"): {
"delete_public_access_block": {
"header": {},
"method": "DELETE",
"querystring": {},
"uri": {"Bucket": str},
},
"get_public_access_block": {
"header": {},
"method": "GET",
"querystring": {},
"uri": {"Bucket": str},
},
"put_public_access_block": {
"header": {},
"method": "PUT",
"querystring": {},
"uri": {"Bucket": str},
},
},
re.compile("/(?P<Bucket>([^\\/]+))/(?P<Key>(.*))\\?retention"): {
"get_object_retention": {
"header": {},
"method": "GET",
"querystring": {},
"uri": {"Bucket": str, "Key": str},
},
"put_object_retention": {
"header": {},
"method": "PUT",
"querystring": {},
"uri": {"Bucket": str, "Key": str},
},
},
re.compile("/(?P<Bucket>([^\\/]+))\\?location"): {
"get_bucket_location": {
"header": {},
"method": "GET",
"querystring": {},
"uri": {"Bucket": str},
}
},
re.compile("/(?P<Bucket>([^\\/]+))\\?versions"): {
"list_object_versions": {
"header": {},
"method": "GET",
"querystring": {},
"uri": {"Bucket": str},
}
},
re.compile("/(?P<Bucket>([^\\/]+))/(?P<Key>(.*))\\?torrent"): {
"get_object_torrent": {
"header": {},
"method": "GET",
"querystring": {},
"uri": {"Bucket": str, "Key": str},
}
},
re.compile("/(?P<Bucket>([^\\/]+))/(?P<Key>(.*))\\?tagging"): {
"delete_object_tagging": {
"header": {},
"method": "DELETE",
"querystring": {},
"uri": {"Bucket": str, "Key": str},
},
"get_object_tagging": {
"header": {},
"method": "GET",
"querystring": {},
"uri": {"Bucket": str, "Key": str},
},
"put_object_tagging": {
"header": {},
"method": "PUT",
"querystring": {},
"uri": {"Bucket": str, "Key": str},
},
},
re.compile("/(?P<Bucket>([^\\/]+))/(?P<Key>(.*))\\?uploads"): {
"create_multipart_upload": {
"header": {},
"method": "POST",
"querystring": {},
"uri": {"Bucket": str, "Key": str},
}
},
re.compile("/(?P<Bucket>([^\\/]+))\\?policyStatus"): {
"get_bucket_policy_status": {
"header": {},
"method": "GET",
"querystring": {},
"uri": {"Bucket": str},
}
},
re.compile("/(?P<Bucket>([^\\/]+))\\?notification"): {
"get_bucket_notification": {
"header": {},
"method": "GET",
"querystring": {},
"uri": {"Bucket": str},
},
"get_bucket_notification_configuration": {
"header": {},
"method": "GET",
"querystring": {},
"uri": {"Bucket": str},
},
"put_bucket_notification": {
"header": {},
"method": "PUT",
"querystring": {},
"uri": {"Bucket": str},
},
"put_bucket_notification_configuration": {
"header": {},
"method": "PUT",
"querystring": {},
"uri": {"Bucket": str},
},
},
# Generic bucket rule (no sub-resource suffix).
re.compile("/(?P<Bucket>([^\\/]+))"): {
"create_bucket": {
"header": {},
"method": "PUT",
"querystring": {},
"uri": {"Bucket": str},
},
"delete_bucket": {
"header": {},
"method": "DELETE",
"querystring": {},
"uri": {"Bucket": str},
},
"head_bucket": {
"header": {},
"method": "HEAD",
"querystring": {},
"uri": {"Bucket": str},
},
"list_objects": {
"header": {},
"method": "GET",
"querystring": {},
"uri": {"Bucket": str},
},
},
# Service root. The pattern is a bare "/" with no end anchor.
re.compile("/"): {
"list_buckets": {"header": {}, "method": "GET", "querystring": {}, "uri": {}}
},
re.compile("/(?P<Bucket>([^\\/]+))\\?lifecycle"): {
"delete_bucket_lifecycle": {
"header": {},
"method": "DELETE",
"querystring": {},
"uri": {"Bucket": str},
},
"get_bucket_lifecycle": {
"header": {},
"method": "GET",
"querystring": {},
"uri": {"Bucket": str},
},
"get_bucket_lifecycle_configuration": {
"header": {},
"method": "GET",
"querystring": {},
"uri": {"Bucket": str},
},
"put_bucket_lifecycle": {
"header": {},
"method": "PUT",
"querystring": {},
"uri": {"Bucket": str},
},
"put_bucket_lifecycle_configuration": {
"header": {},
"method": "PUT",
"querystring": {},
"uri": {"Bucket": str},
},
},
re.compile("/(?P<Bucket>([^\\/]+))\\?inventory"): {
"delete_bucket_inventory_configuration": {
"header": {},
"method": "DELETE",
"querystring": {"id": str},
"uri": {"Bucket": str},
},
"get_bucket_inventory_configuration": {
"header": {},
"method": "GET",
"querystring": {"id": str},
"uri": {"Bucket": str},
},
"list_bucket_inventory_configurations": {
"header": {},
"method": "GET",
"querystring": {},
"uri": {"Bucket": str},
},
"put_bucket_inventory_configuration": {
"header": {},
"method": "PUT",
"querystring": {"id": str},
"uri": {"Bucket": str},
},
},
re.compile("/(?P<Bucket>([^\\/]+))\\?analytics"): {
"delete_bucket_analytics_configuration": {
"header": {},
"method": "DELETE",
"querystring": {"id": str},
"uri": {"Bucket": str},
},
"get_bucket_analytics_configuration": {
"header": {},
"method": "GET",
"querystring": {"id": str},
"uri": {"Bucket": str},
},
"list_bucket_analytics_configurations": {
"header": {},
"method": "GET",
"querystring": {},
"uri": {"Bucket": str},
},
"put_bucket_analytics_configuration": {
"header": {},
"method": "PUT",
"querystring": {"id": str},
"uri": {"Bucket": str},
},
},
re.compile("/(?P<Bucket>([^\\/]+))\\?list-type=2"): {
"list_objects_v2": {
"header": {},
"method": "GET",
"querystring": {},
"uri": {"Bucket": str},
}
},
re.compile("/(?P<Bucket>([^\\/]+))\\?object-lock"): {
"get_object_lock_configuration": {
"header": {},
"method": "GET",
"querystring": {},
"uri": {"Bucket": str},
},
"put_object_lock_configuration": {
"header": {},
"method": "PUT",
"querystring": {},
"uri": {"Bucket": str},
},
},
re.compile("/(?P<Bucket>([^\\/]+))/(?P<Key>(.*))\\?acl"): {
"get_object_acl": {
"header": {},
"method": "GET",
"querystring": {},
"uri": {"Bucket": str, "Key": str},
},
"put_object_acl": {
"header": {},
"method": "PUT",
"querystring": {},
"uri": {"Bucket": str, "Key": str},
},
},
re.compile("/(?P<Bucket>([^\\/]+))\\?replication"): {
"delete_bucket_replication": {
"header": {},
"method": "DELETE",
"querystring": {},
"uri": {"Bucket": str},
},
"get_bucket_replication": {
"header": {},
"method": "GET",
"querystring": {},
"uri": {"Bucket": str},
},
"put_bucket_replication": {
"header": {},
"method": "PUT",
"querystring": {},
"uri": {"Bucket": str},
},
},
re.compile("/(?P<Bucket>([^\\/]+))\\?versioning"): {
"get_bucket_versioning": {
"header": {},
"method": "GET",
"querystring": {},
"uri": {"Bucket": str},
},
"put_bucket_versioning": {
"header": {},
"method": "PUT",
"querystring": {},
"uri": {"Bucket": str},
},
},
re.compile("/(?P<Bucket>([^\\/]+))\\?accelerate"): {
"get_bucket_accelerate_configuration": {
"header": {},
"method": "GET",
"querystring": {},
"uri": {"Bucket": str},
},
"put_bucket_accelerate_configuration": {
"header": {},
"method": "PUT",
"querystring": {},
"uri": {"Bucket": str},
},
},
re.compile("/(?P<Bucket>([^\\/]+))\\?delete"): {
"delete_objects": {
"header": {},
"method": "POST",
"querystring": {},
"uri": {"Bucket": str},
}
},
re.compile("/(?P<Bucket>([^\\/]+))\\?policy"): {
"delete_bucket_policy": {
"header": {},
"method": "DELETE",
"querystring": {},
"uri": {"Bucket": str},
},
"get_bucket_policy": {
"header": {},
"method": "GET",
"querystring": {},
"uri": {"Bucket": str},
},
"put_bucket_policy": {
"header": {},
"method": "PUT",
"querystring": {},
"uri": {"Bucket": str},
},
},
re.compile("/(?P<Bucket>([^\\/]+))\\?encryption"): {
"delete_bucket_encryption": {
"header": {},
"method": "DELETE",
"querystring": {},
"uri": {"Bucket": str},
},
"get_bucket_encryption": {
"header": {},
"method": "GET",
"querystring": {},
"uri": {"Bucket": str},
},
"put_bucket_encryption": {
"header": {},
"method": "PUT",
"querystring": {},
"uri": {"Bucket": str},
},
},
}
class BaseS3Server:
    """Route an incoming HTTP request to the handler for an S3 operation.

    Handlers are ordinary methods named after the entries of
    ``S3_MATCHING_RULES`` (``list_buckets``, ``get_object``, ...). Subclasses
    implement the operations they support; a matched operation without a
    handler goes to :meth:`not_implemented`, and a request that matches no
    rule goes to :meth:`invalid_s3_request`.
    """

    def run_flask_request(self, request):
        """Dispatch ``request`` to the first matching S3 operation handler.

        Rules are tried in the iteration order of ``S3_MATCHING_RULES``. For
        every rule whose regex matches ``request.path``, the request is
        reduced to a dict (``uri``, ``header``, ``querystring``, ``method``)
        and compared with each operation shape using ``pampy.match_dict``.
        The first operation whose shape matches decides the outcome.

        Args:
            request: Object exposing ``path``, ``headers``, ``args`` and
                ``method`` (presumably a Flask request, going by the method
                name).

        Returns:
            Whatever the selected handler returns; otherwise the result of
            ``not_implemented`` (operation matched, no callable handler on
            this object) or ``invalid_s3_request`` (nothing matched).

        NOTE(review): for Flask/Werkzeug requests ``path`` excludes the query
        string, so table patterns containing a literal ``?`` would never
        match here -- confirm the intended input before relying on the
        sub-resource rules.
        NOTE(review): header names are compared as given; the table uses
        lower-case names such as ``x-amz-copy-source`` -- confirm the casing
        the framework delivers.
        """
        for regex_path, actions in S3_MATCHING_RULES.items():
            path_match = regex_path.match(request.path)
            if path_match is None:
                continue

            request_dict = {
                'uri': path_match.groupdict(),
                'header': dict(request.headers),
                'querystring': dict(request.args),
                'method': request.method.upper(),
            }

            for method_name, action in actions.items():
                is_match, _ = pampy.match_dict(action, request_dict)
                if not is_match:
                    continue

                handler = getattr(self, method_name, None)
                # Only call real handlers: a missing attribute or a
                # non-callable placeholder counts as "not implemented".
                if callable(handler):
                    return handler(request, **request_dict)
                return self.not_implemented(request, **request_dict)

        # No rule/operation pair accepted the request.
        return self.invalid_s3_request()

    def invalid_s3_request(self, *args, **kwargs):
        """Fallback for requests that match no rule.

        Does nothing and returns ``None``; presumably meant to be overridden
        to produce an error response.
        """
        return None

    def not_implemented(self, *args, **kwargs):
        """Fallback for a matched operation that has no handler.

        Called with the request followed by the ``uri``, ``header``,
        ``querystring`` and ``method`` keyword arguments. Does nothing and
        returns ``None``; presumably meant to be overridden.
        """
        return None

    # The return annotation is quoted because ``Response`` is not imported in
    # this module; an unquoted name would raise NameError while the class body
    # is being executed.
    def list_buckets(self, request, uri, header, querystring, method) -> "Response":
        """Handle the ListBuckets operation (stub).

        Raises:
            NotImplementedError: always; subclasses must override.
        """
        # NotImplementedError is the exception type; ``NotImplemented`` is a
        # non-callable constant, so calling it raises TypeError instead.
        raise NotImplementedError('return a xml reponse')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment