Skip to content

Instantly share code, notes, and snippets.

@tzmfreedom
Created September 13, 2014 13:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tzmfreedom/d582dc03691de09199f2 to your computer and use it in GitHub Desktop.
Save tzmfreedom/d582dc03691de09199f2 to your computer and use it in GitHub Desktop.
Sample for Salesforce OpenID Connect.
# -*- coding: utf-8 -*-
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
from Crypto.Util.number import bytes_to_long
from flask import Flask, request, session, render_template
from urllib.error import HTTPError
import urllib.request
import base64
import json
import time
import datetime
import random
import hmac
BASE_URL = "https://{community domain}"
CLIENT_ID = "input your client_id"
CLIENT_SECRET = "input your client_secret"
REDIRECT_URI = "input your redirect_uri"
def get_random_string(length):
"""
ランダムな文字列を生成する
"""
source_str = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890'
return "".join([random.choice(source_str) for x in range(length)])
def b64d(b):
"""
base64urlデコード
"""
b += "=" * ((4 - len(b)) % 4)
return base64.urlsafe_b64decode(b)
def b64e(b):
"""
base64urlエンコード
"""
return base64.urlsafe_b64encode(b).decode().replace("=", "")
def b64_to_long(b):
"""
base64エンコードした文字列をロング型に変換
"""
return bytes_to_long(b64d(b))
def get_rsa_publickey(jwt_header):
"""
https://{sfdcドメイン}/id/keysから公開鍵を取得する。
"""
kid = json.loads(b64d(jwt_header).decode())["kid"]
req = urllib.request.Request(url=BASE_URL + "/id/keys")
res = urllib.request.urlopen(req)
jsonMap = json.loads(res.read().decode())
target_key = {}
for key in jsonMap["keys"]:
if key["kid"] == kid:
target_key = key
break
modulus = b64_to_long(target_key["n"])
exponent = b64_to_long(target_key["e"])
return RSA.construct((modulus, exponent))
def validate_idtoken(jwt):
"""
JWSの署名検証
"""
jwt_array = jwt.split(".")
jws_payload = jwt_array[0] + "." + jwt_array[1]
#https://{sfdcドメイン}/id/keysから公開鍵の情報を取得
key = get_rsa_publickey(jwt_array[0])
#jws_payloadのSHA-256ハッシュ値と復号化した署名が一致しているかどうかを検証
signer = PKCS1_v1_5.new(key)
local_hash = SHA256.new(jws_payload.encode())
return signer.verify(local_hash, b64d(jwt_array[2]))
def create_signature(json_map):
"""
id,issued_atからSignatureを生成し署名を検証する
"""
payload = json_map["id"] + json_map["issued_at"]
signature = hmac.new(CLIENT_SECRET.encode(), payload.encode(), SHA256).digest()
return base64.b64encode(signature).decode()
def get_idtoken(code):
"""
id_token+access_tokenの取得
"""
#access_tokenの取得
token_req_param = {
"grant_type" : "authorization_code",
"client_id" : CLIENT_ID,
"client_secret" : CLIENT_SECRET,
"redirect_uri" : REDIRECT_URI,
"code" : code
}
req = urllib.request.Request(
url=BASE_URL + "/services/oauth2/token",
data=urllib.parse.urlencode(token_req_param).encode(),
headers={"Content-Type" : "application/x-www-form-urlencoded"}
)
try:
res = urllib.request.urlopen(req)
return json.loads(res.read().decode())
except HTTPError as error:
print(error.reason)
return {}
def get_userinfo():
"""
ユーザ情報の取得
"""
#access_tokenの取得
req = urllib.request.Request(
url=session["instance_url"] + "/services/oauth2/userinfo",
headers={"Authorization" : "Bearer " + session["access_token"]}
)
try:
res = urllib.request.urlopen(req)
return json.loads(res.read().decode())
except HTTPError as error:
print(error.reason)
return {}
app = Flask(__name__)
app.debug = True
app.secret_key = get_random_string(32)
@app.route("/")
def index():
"""
Authorization Requestの為の初期ページ
"""
request_param = {
"response_type" : "code",
"client_id" : CLIENT_ID,
"redirect_uri" : REDIRECT_URI,
"scope" : "id openid refresh_token",
"state" : get_random_string(32),
"nonce" : get_random_string(32)
}
session["state"] = request_param["state"]
session["nonce"] = request_param["nonce"]
authorization_url = BASE_URL + "/services/oauth2/authorize?" + urllib.parse.urlencode(request_param)
return render_template("index.html", authorization_url=authorization_url)
@app.route("/callback")
def callback():
"""
Authorization Requestに対するコールバックを受け取る
"""
#state値が正しいかどうか確認
if session["state"] != request.args.get("state"):
return "invalid parameter error!!"
#id_token(+access_token)の取得
json_map = get_idtoken(request.args.get("code"))
#signature検証
if json_map["signature"] != create_signature(json_map):
return "invalid parameter error!!"
#id_tokenの検証
if validate_idtoken(json_map["id_token"]):
jwt_array = json_map["id_token"].split(".")
jwt_claim = json.loads(b64d(jwt_array[1]).decode())
now = int(time.mktime(datetime.datetime.now().utctimetuple()))
exp = int(jwt_claim["exp"])
#token有効期限の確認
if now > exp:
return "code is expired!!"
if jwt_claim["nonce"] != session["nonce"]:
return "invalid parameter error!!"
signature = SHA256.new(json_map["refresh_token"].encode()).digest()
#signature = SHA256.new(json_map["access_token"].encode()).digest() #at_hash
#signature = SHA256.new(request.args.get("code").encode()).digest() #c_hash
print(json_map)
print(jwt_claim)
signature = b64e(signature[:int(len(signature)/2)]).replace("=", "")
#hash, issuer, audienceの確認
if signature == jwt_claim["c_hash"] and jwt_claim["iss"] == "https://login.salesforce.com" and jwt_claim["aud"] == CLIENT_ID:
session["id"] = jwt_claim["sub"]
session["access_token"] = json_map["access_token"]
session["instance_url"] = json_map["instance_url"]
userinfo = get_userinfo()
return "hello " + userinfo["name"] + "<br/>login is successful!!"
return "validation error!!"
if __name__ == "__main__":
app.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment