Skip to content

Instantly share code, notes, and snippets.

@danielfaust
Last active November 12, 2023 18:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save danielfaust/a9807b2f2256a49b18564daa8bc6cdf2 to your computer and use it in GitHub Desktop.
Save danielfaust/a9807b2f2256a49b18564daa8bc6cdf2 to your computer and use it in GitHub Desktop.
This file contains code to check a YubiKey OTP against YubiCloud's v2 protocol servers, for Python 3 and 2.7. The code is not well tested, but it works. It is only meant for internal use, for example to prevent accidental execution of scripts that perform database schema migrations or similar operations that really shouldn't be run by accident.
import yubikey_otp_check_v2
def verify(otp):
    """Check a YubiKey OTP for a known user and report the outcome.

    The first 12 bytes of ``otp`` are looked up in a hard-coded table of
    known keys.  Only OTPs whose prefix is in that table are sent to the
    validation service via ``yubikey_otp_check_v2.YubiKeyOTPCheck``.

    Args:
        otp: The OTP as ``bytes`` (the table keys are ``bytes``, so a text
            OTP never matches on Python 3).

    Returns:
        ``b"ok-" + user`` when the prefix is known and the checker reports
        the OTP as valid, ``b"not-ok-1"`` when the prefix is known but the
        OTP is not valid, and ``b"not-ok-2"`` when the prefix is unknown.
    """
    users = {
        b'cccccabcdef1': b'user1',
        b'cccccabcdef2': b'user2',
    }

    # Single lookup: an unknown prefix is rejected before any network call.
    user = users.get(otp[:12])
    if user is None:
        return b"not-ok-2"

    # client details obtained from https://upgrade.yubico.com/getapikey/
    # (the values below are placeholders and must be replaced)
    client = {'id': '0123456', 'secret': 'base64_encoded_secret'}
    yubikey = yubikey_otp_check_v2.YubiKeyOTPCheck(client['id'], client['secret'])
    yubikey.check_otp(otp)

    if not yubikey.valid:
        return b"not-ok-1"
    return b"ok-" + user
# -*- coding: utf-8 -*-
# Works with YubiCloud v2 on Python 3 and Python 2.7
# code is based on https://bitbucket.org/nagylzs/yubistorm
###################################################################################################
import sys
# True when running under Python 3 or newer, False under Python 2;
# used further down to pick the matching urllib imports.
IS_PYTHON_3 = not (sys.version_info[0] < 3)
###################################################################################################
import copy, uuid, hmac, base64, hashlib, random, traceback
if IS_PYTHON_3:
import urllib.request as urllib2
import urllib.parse as urlparse
urlencode = urlparse.urlencode
else:
from urllib import urlencode
import urllib2
import urlparse
##################################################################
class YubiKeyOTPCheck():
    """Check a YubiKey OTP against the YubiCloud ``/wsapi/2.0/verify`` API.

    Usage: construct with the API client id and base64-encoded secret, call
    ``check_otp(otp)``, then read ``valid`` (bool) and ``status`` (str).

    Requests are signed with HMAC-SHA1 using the client secret, and the
    response is only trusted when its own ``h`` signature verifies.
    """
    #-------------------------------------------------------------------------
    # Hosts one of which is picked at random for every attempt.
    # NOTE(review): confirm that all of these hosts are still in service.
    api_servers = [
        "api.yubico.com",
        "api2.yubico.com",
        "api3.yubico.com",
        "api4.yubico.com",
        "api5.yubico.com",
    ]
    # Maximum number of requests tried before giving up.
    max_attempts = 9
    # Per-request network timeout in seconds, so a stalled server cannot
    # block the caller forever.
    timeout = 10
    # When true, progress information (including the signed request URL and
    # the parsed response) is printed to stdout.  Set to False to silence.
    verbose = True
    #-------------------------------------------------------------------------
    def __init__(self, client_id, client_secret):
        """Store the API credentials.

        Args:
            client_id: API client id; sent as the ``id`` request parameter.
            client_secret: Base64-encoded API secret; it is decoded here and
                used as the HMAC key.
        """
        self.status = 'INITIALIZED'
        self.valid = False
        self.client_id = client_id
        self.client_secret = base64.b64decode(client_secret)
    #-------------------------------------------------------------------------
    def _log(self, message):
        """Print ``message`` when ``verbose`` is enabled."""
        if self.verbose:
            print(message)
    #-------------------------------------------------------------------------
    def _create_signature(self, params):
        """Return the base64 HMAC-SHA1 signature of ``params`` as text.

        The signed payload is ``key=value`` for every entry, sorted by key
        and joined with ``&``.  Raises ``UnicodeError`` when the payload is
        not pure ASCII.
        """
        payload = "&".join("%s=%s" % (key, params[key]) for key in sorted(params))
        digest = hmac.new(self.client_secret, payload.encode("ascii"), hashlib.sha1).digest()
        return base64.b64encode(digest).decode("ascii")
    #-------------------------------------------------------------------------
    def _verify_signature(self, params):
        """Return True when ``params['h']`` is a valid signature of the rest.

        Returns False (rather than raising) when ``h`` is missing or when
        the data cannot be signed/compared because it is not ASCII text.
        """
        if "h" not in params:
            return False
        unsigned = dict(params)
        received = unsigned.pop("h")
        try:
            expected = self._create_signature(unsigned).encode("ascii")
            if not isinstance(received, bytes):
                received = received.encode("ascii")
        except (AttributeError, UnicodeError):
            return False
        # Constant-time comparison: do not leak how many leading characters
        # of a forged signature were correct.
        return hmac.compare_digest(expected, received)
    #-------------------------------------------------------------------------
    def _add_query_params(self, url, params):
        """Return ``url`` with ``params`` merged into its query and signed.

        Any existing ``h`` parameter is discarded and replaced by a fresh
        signature computed over all remaining query parameters.
        """
        url_parts = list(urlparse.urlparse(url))
        query = dict(urlparse.parse_qsl(url_parts[4]))
        query.update(params)
        query.pop("h", None)
        query["h"] = self._create_signature(query)
        url_parts[4] = urlencode(query)
        return urlparse.urlunparse(url_parts)
    #-------------------------------------------------------------------------
    def _fetch(self, params):
        """Send the signed request, retrying on network/HTTP errors.

        Returns:
            ``(code, txt)``: the HTTP status code and decoded body of the
            first request that got a response.  ``code`` is 0 when every
            attempt failed; ``txt`` is None when no body could be read.
        """
        code = 0
        txt = None
        remaining_attempts = self.max_attempts
        while remaining_attempts > 0:
            server_host = random.choice(self.api_servers)
            self._log('remaining_attempts = %d ( using server %s )' % (remaining_attempts, server_host))
            url = self._add_query_params("https://" + server_host + "/wsapi/2.0/verify", params)
            self._log(url)
            try:
                result = urllib2.urlopen(url, timeout=self.timeout)
            except IOError:
                # URLError/HTTPError derive from IOError on Python 2 and from
                # OSError (of which IOError is an alias) on Python 3; socket
                # timeouts that are not wrapped in URLError are covered too.
                remaining_attempts -= 1
                continue
            try:
                code = result.getcode()
                try:
                    txt = result.read().decode("UTF-8")
                except Exception:
                    # Best effort: an unreadable body is reported as txt=None.
                    traceback.print_exc()
            finally:
                result.close()
            break
        return code, txt
    #-------------------------------------------------------------------------
    def _parse_response(self, txt):
        """Parse a ``key=value`` body (lines separated by CRLF) into a dict.

        Only the first ``=`` splits a line, so values may contain ``=``
        (as base64 padding does).  Lines without ``=`` are skipped.
        """
        data = {}
        for item in txt.strip().split('\r\n'):
            key, separator, value = item.partition('=')
            if separator:
                data[key] = value
        return data
    #-------------------------------------------------------------------------
    def _check(self, params):
        """Run the request and return the parsed response fields.

        When the server did not answer with HTTP 200 and a readable body,
        the result is ``{'SERVER_CODE': code}`` instead.
        """
        code, txt = self._fetch(params)
        if code == 200 and txt is not None:
            return self._parse_response(txt)
        return {'SERVER_CODE': code}
    #-------------------------------------------------------------------------
    def check_otp(self, otp):
        """Validate ``otp`` and update ``self.valid`` / ``self.status``.

        Args:
            otp: The OTP as ``bytes`` (decoded with the default codec) or as
                text.

        Returns:
            The parsed response dict (see ``_check``).

        ``self.valid`` becomes True only when the response signature
        verifies, its status is ``OK`` and it echoes the ``otp`` and
        ``nonce`` of this request.  Otherwise ``self.status`` is the
        server's status, or one of ``RESPONSE_SIGNATURE_VERIFICATION_FAILED``,
        ``RESPONSE_STATUS_MISSING``, ``RESPONSE_OTP_OR_NONCE_MISMATCH``.
        """
        self.valid = False
        otp_text = otp.decode() if isinstance(otp, bytes) else otp
        nonce = uuid.uuid4().hex
        params = {
            'id': self.client_id,
            'nonce': nonce,
            'timestamp': "1",
            'otp': otp_text,
        }
        response = self._check(params)
        self._log(response)
        if not self._verify_signature(response):
            self.status = 'RESPONSE_SIGNATURE_VERIFICATION_FAILED'
            return response
        self.status = response.get('status', 'RESPONSE_STATUS_MISSING')
        if self.status == 'OK':
            # A correctly signed OK is only meaningful for THIS request;
            # requiring the echoed otp and nonce rejects replayed responses.
            if response.get('otp') == otp_text and response.get('nonce') == nonce:
                self.valid = True
            else:
                self.status = 'RESPONSE_OTP_OR_NONCE_MISMATCH'
        return response
    #-------------------------------------------------------------------------
##################################################################
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment