Created
September 13, 2014 13:30
-
-
Save tzmfreedom/d582dc03691de09199f2 to your computer and use it in GitHub Desktop.
Sample for Salesforce OpenID Connect.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from Crypto.Hash import SHA256 | |
from Crypto.PublicKey import RSA | |
from Crypto.Signature import PKCS1_v1_5 | |
from Crypto.Util.number import bytes_to_long | |
from flask import Flask, request, session, render_template | |
from urllib.error import HTTPError | |
import urllib.request | |
import base64 | |
import json | |
import time | |
import datetime | |
import random | |
import hmac | |
BASE_URL = "https://{community domain}" | |
CLIENT_ID = "input your client_id" | |
CLIENT_SECRET = "input your client_secret" | |
REDIRECT_URI = "input your redirect_uri" | |
def get_random_string(length): | |
""" | |
ランダムな文字列を生成する | |
""" | |
source_str = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890' | |
return "".join([random.choice(source_str) for x in range(length)]) | |
def b64d(b): | |
""" | |
base64urlデコード | |
""" | |
b += "=" * ((4 - len(b)) % 4) | |
return base64.urlsafe_b64decode(b) | |
def b64e(b): | |
""" | |
base64urlエンコード | |
""" | |
return base64.urlsafe_b64encode(b).decode().replace("=", "") | |
def b64_to_long(b): | |
""" | |
base64エンコードした文字列をロング型に変換 | |
""" | |
return bytes_to_long(b64d(b)) | |
def get_rsa_publickey(jwt_header): | |
""" | |
https://{sfdcドメイン}/id/keysから公開鍵を取得する。 | |
""" | |
kid = json.loads(b64d(jwt_header).decode())["kid"] | |
req = urllib.request.Request(url=BASE_URL + "/id/keys") | |
res = urllib.request.urlopen(req) | |
jsonMap = json.loads(res.read().decode()) | |
target_key = {} | |
for key in jsonMap["keys"]: | |
if key["kid"] == kid: | |
target_key = key | |
break | |
modulus = b64_to_long(target_key["n"]) | |
exponent = b64_to_long(target_key["e"]) | |
return RSA.construct((modulus, exponent)) | |
def validate_idtoken(jwt): | |
""" | |
JWSの署名検証 | |
""" | |
jwt_array = jwt.split(".") | |
jws_payload = jwt_array[0] + "." + jwt_array[1] | |
#https://{sfdcドメイン}/id/keysから公開鍵の情報を取得 | |
key = get_rsa_publickey(jwt_array[0]) | |
#jws_payloadのSHA-256ハッシュ値と復号化した署名が一致しているかどうかを検証 | |
signer = PKCS1_v1_5.new(key) | |
local_hash = SHA256.new(jws_payload.encode()) | |
return signer.verify(local_hash, b64d(jwt_array[2])) | |
def create_signature(json_map): | |
""" | |
id,issued_atからSignatureを生成し署名を検証する | |
""" | |
payload = json_map["id"] + json_map["issued_at"] | |
signature = hmac.new(CLIENT_SECRET.encode(), payload.encode(), SHA256).digest() | |
return base64.b64encode(signature).decode() | |
def get_idtoken(code): | |
""" | |
id_token+access_tokenの取得 | |
""" | |
#access_tokenの取得 | |
token_req_param = { | |
"grant_type" : "authorization_code", | |
"client_id" : CLIENT_ID, | |
"client_secret" : CLIENT_SECRET, | |
"redirect_uri" : REDIRECT_URI, | |
"code" : code | |
} | |
req = urllib.request.Request( | |
url=BASE_URL + "/services/oauth2/token", | |
data=urllib.parse.urlencode(token_req_param).encode(), | |
headers={"Content-Type" : "application/x-www-form-urlencoded"} | |
) | |
try: | |
res = urllib.request.urlopen(req) | |
return json.loads(res.read().decode()) | |
except HTTPError as error: | |
print(error.reason) | |
return {} | |
def get_userinfo(): | |
""" | |
ユーザ情報の取得 | |
""" | |
#access_tokenの取得 | |
req = urllib.request.Request( | |
url=session["instance_url"] + "/services/oauth2/userinfo", | |
headers={"Authorization" : "Bearer " + session["access_token"]} | |
) | |
try: | |
res = urllib.request.urlopen(req) | |
return json.loads(res.read().decode()) | |
except HTTPError as error: | |
print(error.reason) | |
return {} | |
app = Flask(__name__) | |
app.debug = True | |
app.secret_key = get_random_string(32) | |
@app.route("/") | |
def index(): | |
""" | |
Authorization Requestの為の初期ページ | |
""" | |
request_param = { | |
"response_type" : "code", | |
"client_id" : CLIENT_ID, | |
"redirect_uri" : REDIRECT_URI, | |
"scope" : "id openid refresh_token", | |
"state" : get_random_string(32), | |
"nonce" : get_random_string(32) | |
} | |
session["state"] = request_param["state"] | |
session["nonce"] = request_param["nonce"] | |
authorization_url = BASE_URL + "/services/oauth2/authorize?" + urllib.parse.urlencode(request_param) | |
return render_template("index.html", authorization_url=authorization_url) | |
@app.route("/callback") | |
def callback(): | |
""" | |
Authorization Requestに対するコールバックを受け取る | |
""" | |
#state値が正しいかどうか確認 | |
if session["state"] != request.args.get("state"): | |
return "invalid parameter error!!" | |
#id_token(+access_token)の取得 | |
json_map = get_idtoken(request.args.get("code")) | |
#signature検証 | |
if json_map["signature"] != create_signature(json_map): | |
return "invalid parameter error!!" | |
#id_tokenの検証 | |
if validate_idtoken(json_map["id_token"]): | |
jwt_array = json_map["id_token"].split(".") | |
jwt_claim = json.loads(b64d(jwt_array[1]).decode()) | |
now = int(time.mktime(datetime.datetime.now().utctimetuple())) | |
exp = int(jwt_claim["exp"]) | |
#token有効期限の確認 | |
if now > exp: | |
return "code is expired!!" | |
if jwt_claim["nonce"] != session["nonce"]: | |
return "invalid parameter error!!" | |
signature = SHA256.new(json_map["refresh_token"].encode()).digest() | |
#signature = SHA256.new(json_map["access_token"].encode()).digest() #at_hash | |
#signature = SHA256.new(request.args.get("code").encode()).digest() #c_hash | |
print(json_map) | |
print(jwt_claim) | |
signature = b64e(signature[:int(len(signature)/2)]).replace("=", "") | |
#hash, issuer, audienceの確認 | |
if signature == jwt_claim["c_hash"] and jwt_claim["iss"] == "https://login.salesforce.com" and jwt_claim["aud"] == CLIENT_ID: | |
session["id"] = jwt_claim["sub"] | |
session["access_token"] = json_map["access_token"] | |
session["instance_url"] = json_map["instance_url"] | |
userinfo = get_userinfo() | |
return "hello " + userinfo["name"] + "<br/>login is successful!!" | |
return "validation error!!" | |
if __name__ == "__main__": | |
app.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment