Instantly share code, notes, and snippets.
hartror/authorizer.py
Created Sep 23, 2017
AWS API Gateway Custom Authorizer PEP8'd
""" | |
Copyright 2015-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. | |
Licensed under the Apache License, Version 2.0 (the "License"). You may not use | |
this file except in compliance with the License. A copy of the License is | |
located at | |
http://aws.amazon.com/apache2.0/ | |
or in the "license" file accompanying this file. This file is distributed on an | |
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
implied. See the License for the specific language governing permissions and | |
limitations under the License. | |
""" | |
from __future__ import print_function | |
import re | |
def lambda_handler(event, context):
    """Entry point of the API Gateway custom authorizer.

    Reads ``authorizationToken`` and ``methodArn`` from ``event``, builds an
    ``AuthPolicy`` scoped to the account, region, API id and stage parsed out
    of the method ARN, denies every method, and returns the built policy with
    an extra ``context`` mapping attached.

    Args:
        event: mapping that must contain the keys ``authorizationToken`` and
            ``methodArn``.
        context: Lambda context object; not used by this handler.

    Returns:
        The dict produced by ``AuthPolicy.build()`` with an additional
        ``'context'`` key.

    Raises:
        KeyError: if ``event`` lacks one of the two required keys.
        IndexError: if ``methodArn`` does not have the expected
            ``a:b:c:region:account:apiId/stage/...`` layout.
    """
    # The token is a credential, so its value is deliberately kept out of the
    # logs; only its length is reported for troubleshooting.
    token = event['authorizationToken']
    methodArn = event['methodArn']
    print("Client token received (%d characters)" % len(token))
    print("Method ARN: " + methodArn)

    # validate the incoming token
    # and produce the principal user identifier associated with the token
    # this could be accomplished in a number of ways:
    # 1. Call out to OAuth provider
    # 2. Decode a JWT token inline
    # 3. Lookup in a self-managed DB
    principalId = "user|a1b2c3d4"

    # you can send a 401 Unauthorized response to the client by failing like
    # so:
    # raise Exception('Unauthorized')

    # if the token is valid, a policy must be generated which will allow or
    # deny access to the client
    # if access is denied, the client will receive a 403 Access Denied response
    # if access is allowed, API Gateway will proceed with the backend
    # integration configured on the method that was called

    # this function must generate a policy that is associated with the
    # recognized principal user identifier. depending on your use case, you
    # might store policies in a DB, or generate them on the fly

    # keep in mind, the policy is cached for 5 minutes by default (TTL is
    # configurable in the authorizer) and will apply to subsequent calls to any
    # method/resource in the RestApi made with the same token

    # the example policy below denies access to all resources in the RestApi
    arnParts = methodArn.split(':')
    apiGatewayArnParts = arnParts[5].split('/')
    awsAccountId = arnParts[4]

    policy = AuthPolicy(principalId, awsAccountId)
    policy.restApiId = apiGatewayArnParts[0]
    policy.region = arnParts[3]
    policy.stage = apiGatewayArnParts[1]
    policy.denyAllMethods()
    # policy.allowMethod(HttpVerb.GET, "/pets/*")

    # Finally, build the policy
    authResponse = policy.build()

    # new! -- add additional key-value pairs associated with the authenticated
    # principal these are made available by APIGW like so:
    # $context.authorizer.<key>
    # additional context is cached
    # (named authContext so the Lambda ``context`` argument is not shadowed)
    authContext = {
        'key': 'value',  # $context.authorizer.key -> value
        'number': 1,
        'bool': True
    }
    # authContext['arr'] = ['foo'] <- this is invalid, APIGW will not accept it
    # authContext['obj'] = {'foo':'bar'} <- also invalid

    authResponse['context'] = authContext

    return authResponse
class HttpVerb:
    """Namespace of the HTTP verb names used when adding policy methods."""

    # Wildcard standing for every verb.
    ALL = "*"

    # Verbs that only read.
    GET = "GET"
    HEAD = "HEAD"
    OPTIONS = "OPTIONS"

    # Verbs that modify.
    POST = "POST"
    PUT = "PUT"
    PATCH = "PATCH"
    DELETE = "DELETE"
class AuthPolicy(object):
    """Collects allowed/denied API Gateway methods and builds the policy dict.

    Methods are registered through the ``allow*`` / ``deny*`` helpers, each of
    which records a resource ARN (built from ``region``, ``awsAccountId``,
    ``restApiId`` and ``stage``) together with an optional conditions object.
    ``build()`` turns those records into a dict with ``principalId`` and
    ``policyDocument`` keys.
    """

    # The AWS account id the policy will be generated for. This is used to
    # create the method ARNs.
    awsAccountId = ""
    # The principal used for the policy, this should be a unique identifier for
    # the end user.
    principalId = ""
    # The policy version used for the evaluation. This should always be
    # '2012-10-17'
    version = "2012-10-17"
    # The regular expression used to validate resource paths for the policy.
    # Written as a raw string so that "\*" is not an invalid string escape;
    # the pattern text itself is unchanged.
    pathRegex = r"^[/.a-zA-Z0-9-\*]+$"

    # these are the internal lists of allowed and denied methods. These are
    # lists of objects and each object has 2 properties: A resource ARN and a
    # nullable conditions statement. The build method processes these lists and
    # generates the appropriate statements for the final policy.
    # __init__ replaces both with fresh per-instance lists.
    allowMethods = []
    denyMethods = []

    # The API Gateway API id. By default this is set to '*'
    restApiId = "*"
    # The region where the API is deployed. By default this is set to '*'
    region = "*"
    # The name of the stage used in the policy. By default this is set to '*'
    stage = "*"

    def __init__(self, principal, awsAccountId):
        """Store the principal and account id and start with empty lists."""
        self.awsAccountId = awsAccountId
        self.principalId = principal
        self.allowMethods = []
        self.denyMethods = []

    @staticmethod
    def _knownVerbs():
        """Return the set of verb values declared on HttpVerb."""
        return set(
            value for name, value in vars(HttpVerb).items()
            if not name.startswith("_"))

    def _addMethod(self, effect, verb, resource, conditions):
        """Adds a method to the internal lists of allowed or denied methods.
        Each object in the internal list contains a resource ARN and a
        condition statement. The condition statement can be null.

        Raises NameError when the verb is not one of the HttpVerb values or
        when the resource does not match ``pathRegex``. An effect other than
        "allow"/"deny" (case-insensitive) records nothing."""
        # Compare against the declared verb *values*: checking attribute names
        # would let strings such as "ALL" or "__doc__" through and produce an
        # ARN that matches no method.
        if verb != "*" and verb not in self._knownVerbs():
            raise NameError(
                "Invalid HTTP verb " + verb +
                ". Allowed verbs in HttpVerb class")

        if not re.match(self.pathRegex, resource):
            raise NameError(
                "Invalid resource path: " + resource +
                ". Path should match " + self.pathRegex)

        # The ARN already supplies the separating slash.
        if resource[:1] == "/":
            resource = resource[1:]

        resourceArn = ":".join([
            "arn", "aws", "execute-api",
            self.region,
            self.awsAccountId,
            "/".join([self.restApiId, self.stage, verb, resource]),
        ])

        entry = {
            'resourceArn': resourceArn,
            'conditions': conditions
        }
        normalizedEffect = effect.lower()
        if normalizedEffect == "allow":
            self.allowMethods.append(entry)
        elif normalizedEffect == "deny":
            self.denyMethods.append(entry)

    def _getEmptyStatement(self, effect):
        """Returns an empty statement object prepopulated with the correct
        action and the desired effect."""
        return {
            'Action': 'execute-api:Invoke',
            'Effect': effect[:1].upper() + effect[1:].lower(),
            'Resource': []
        }

    def _getStatementForEffect(self, effect, methods):
        """This function loops over an array of objects containing a
        resourceArn and conditions statement and generates the array of
        statements for the policy.

        Methods without conditions share a single statement, which is placed
        after the per-method conditional statements. That shared statement is
        omitted when it would have no resources."""
        statements = []
        if not methods:
            return statements

        sharedStatement = self._getEmptyStatement(effect)
        for curMethod in methods:
            conditions = curMethod['conditions']
            if not conditions:
                sharedStatement['Resource'].append(curMethod['resourceArn'])
                continue
            conditionalStatement = self._getEmptyStatement(effect)
            conditionalStatement['Resource'].append(curMethod['resourceArn'])
            conditionalStatement['Condition'] = conditions
            statements.append(conditionalStatement)

        # A statement with an empty Resource list is not a usable policy
        # statement, so only emit the shared one when something was added.
        if sharedStatement['Resource']:
            statements.append(sharedStatement)

        return statements

    def allowAllMethods(self):
        """Adds a '*' allow to the policy to authorize access to all methods of
        an API"""
        self._addMethod("Allow", HttpVerb.ALL, "*", [])

    def denyAllMethods(self):
        """Adds a '*' deny to the policy to deny access to all methods of an
        API"""
        self._addMethod("Deny", HttpVerb.ALL, "*", [])

    def allowMethod(self, verb, resource):
        """Adds an API Gateway method (Http verb + Resource path) to the list
        of allowed methods for the policy"""
        self._addMethod("Allow", verb, resource, [])

    def denyMethod(self, verb, resource):
        """Adds an API Gateway method (Http verb + Resource path) to the list
        of denied methods for the policy"""
        self._addMethod("Deny", verb, resource, [])

    def allowMethodWithConditions(self, verb, resource, conditions):
        """Adds an API Gateway method (Http verb + Resource path) to the list
        of allowed methods and includes a condition for the policy statement.
        More on AWS policy conditions here:
        http://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements.html#Condition"""
        self._addMethod("Allow", verb, resource, conditions)

    def denyMethodWithConditions(self, verb, resource, conditions):
        """Adds an API Gateway method (Http verb + Resource path) to the list
        of denied methods and includes a condition for the policy statement.
        More on AWS policy conditions here:
        http://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements.html#Condition"""
        self._addMethod("Deny", verb, resource, conditions)

    def build(self):
        """Generates the policy document based on the internal lists of allowed
        and denied conditions. This will generate a policy with two main
        statements for the effect:
        one statement for Allow and one statement for Deny.
        Methods that includes conditions will have their own statement in the
        policy.

        Raises NameError when neither list contains any method."""
        if not self.allowMethods and not self.denyMethods:
            raise NameError("No statements defined for the policy")

        statements = []
        statements.extend(
            self._getStatementForEffect("Allow", self.allowMethods))
        statements.extend(
            self._getStatementForEffect("Deny", self.denyMethods))

        return {
            'principalId': self.principalId,
            'policyDocument': {
                'Version': self.version,
                'Statement': statements
            }
        }
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment