Skip to content

Instantly share code, notes, and snippets.

@vncloudsco
Created February 28, 2019 07:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vncloudsco/863d56c1b8d2b97859060e9be4cf5712 to your computer and use it in GitHub Desktop.
Save vncloudsco/863d56c1b8d2b97859060e9be4cf5712 to your computer and use it in GitHub Desktop.
# Embedded file name: build/bdist.linux-x86_64/egg/pyovpn/lic/licstore.py
import re
import os
import json
import urllib
import time
from twisted.internet import defer
from pyovpnc import unix_time
from pyovpn.util.error import Passthru, SimpleError, ErrorTuple
from pyovpn.util.atomicfile import atomic_read, atomic_write_new, atomic_write_replace
from pyovpn.util.filemod import DirMod
from pyovpn.util.poversion import pyovpn_version
from pyovpn.util.simplefile import write_string_to_file
from pyovpn.linux.linfo import get_distro_info
from pyovpn.lic.lickey import LicenseKeyString
from pyovpn.lic.licser import LicenseSerialID
from pyovpn.lic.prop import LicenseProperties
from pyovpn.lic.vprop import generate_validation_properties
from pyovpn.lic.ino import generate_marker_file
from pyovpn.lic.licerror import LicError
from pyovpn.lic import lbq
from pyovpn.ssl.sslcli import SSLClientContext
from pyovpn.http.httpcli import https_get
from pyovpn.util.env import get_env_debug
# Module-wide debug switch: when truthy, the classes in this file print extra
# trace lines. The value is whatever get_env_debug('DEBUG_LICSTORE') returns
# (presumably an environment lookup -- confirm in pyovpn.util.env).
DEBUG = get_env_debug('DEBUG_LICSTORE')
class LicenseReaderBase(object):
    """Query helpers shared by license readers.

    The methods read two attributes that this class never assigns itself:
    ``self.uprop`` (a mapping of property name to limit, or ``None``) and
    ``self.licdict``. Both must be provided by the subclass or the caller.
    """

    def ok(self, propname, value):
        """Check ``value`` against the limit stored for ``propname``.

        Returns a ``(allowed, reason)`` tuple:

        * ``(False, message)`` when ``self.uprop`` is ``None`` (nothing to
          validate against) or when ``value`` is not ``<=`` the limit;
        * ``(True, None)`` when ``propname`` has no entry in ``self.uprop``
          or when ``value`` is within the limit.
        """
        if self.uprop is None:
            return (False, 'cannot validate maximum %s' % propname)

        if propname not in self.uprop:
            if DEBUG:
                # Parenthesised single-argument print works the same on
                # Python 2 (print statement) and Python 3 (function).
                print('License Manager: property %s is not limited' % propname)
            return (True, None)

        limit = self.uprop[propname]
        if value <= limit:
            if DEBUG:
                print('License Manager: property %s=%d within limit of %d' % (propname, value, limit))
            return (True, None)
        return (False, 'maximum %s exceeded (%d)' % (propname, limit))

    def info(self):
        """Return ``self.uprop`` unchanged."""
        return self.uprop

    def license_key_info(self):
        """Return ``self.licdict`` unchanged."""
        return self.licdict
class LicenseReader(LicenseReaderBase):
    """Load and verify the ``*.lic`` files found in a license directory.

    Results are cached on the instance: ``licdict`` maps each license key to
    its verified property object, and ``uprop`` holds whatever
    ``uprop_figure.figure(licdict)`` returned for that dictionary.

    NOTE(review): the source this class was recovered from had lost all
    indentation; nesting was reconstructed from syntax and the spots where
    more than one nesting is syntactically possible are marked below.
    """

    ext = 'lic'
    fn_re = re.compile('(.*)\\.%s$' % ext, re.IGNORECASE)
    yyyymmdd_re = re.compile('(\\d\\d\\d\\d)(\\d\\d)(\\d\\d)')

    def __init__(self, dir, sig_verify, vprop_verify, uprop_figure):
        """Remember the directory and the collaborating verifier objects.

        Raises ``SimpleError`` (type ``LIC_DIR_UNDEF``) when ``dir`` is falsy.
        """
        if not dir:
            raise SimpleError('License Manager: license directory is undefined', type='LIC_DIR_UNDEF')
        self.dir = dir
        self.sig_verify = sig_verify
        self.vprop_verify = vprop_verify
        self.uprop_figure = uprop_figure
        self.licdict = None
        self.uprop = None
        self.post_update_callback = None

    def enable_auto_update(self, update_interval, post_update_callback=None, immediate_update=True):
        """Optionally update now, then watch the directory for changes.

        A ``DirMod`` watcher is created only when ``update_interval > 0``.
        The immediate update runs before ``post_update_callback`` is stored,
        so the new callback is not invoked by that first update.
        """
        if immediate_update:
            self.update()
        if update_interval > 0:
            self.dirmod = DirMod(
                self.dir,
                update_interval,
                initial_interval=update_interval,
                notify_callback=lambda x: self.update_noret(),
                file_of_interest_callback=self.is_license_filename,
            )
        # NOTE(review): assumed to sit outside the interval check -- confirm.
        self.post_update_callback = post_update_callback

    def manual_update(self):
        """Trigger the directory watcher, if one was created."""
        if hasattr(self, 'dirmod'):
            self.dirmod.manual_trigger()

    def cancel(self):
        """Cancel the directory watcher, if one was created."""
        if hasattr(self, 'dirmod'):
            self.dirmod.cancel()

    def update_noret(self):
        """Run ``update()`` and discard its return value."""
        self.update()

    def update(self):
        """Re-read the license files and refresh ``licdict`` / ``uprop``.

        On failure the error is printed, the cached state is left untouched
        and ``None`` is returned; otherwise the new dictionary is returned.
        """
        try:
            licdict = self.get_license_files()
        except:
            # Broad catch kept from the original: any failure is reported
            # through Passthru and turned into a ``None`` result.
            licdict = None
            print(Passthru('License manager update: failed to obtain license info'))

        # NOTE(review): everything below is assumed to be nested under this
        # check (cached state and callback only touched on success) -- confirm.
        if licdict is not None:
            # Reset first so uprop stays None if figure() raises.
            self.uprop = None
            self.licdict = licdict
            if self.licdict is not None:
                self.uprop = self.uprop_figure.figure(self.licdict)
            if self.post_update_callback:
                self.post_update_callback(self)
        return licdict

    @classmethod
    def is_license_filename(C, fn):
        """Return the validated key encoded in a ``<key>.lic`` file name.

        The extension match is case-insensitive. Returns ``None`` when ``fn``
        does not end in ``.lic`` or when ``LicenseKeyString.validate`` raises
        (a note is printed in the latter case).
        """
        matched = C.fn_re.match(fn)
        if matched:
            stem = matched.groups()[0]
            try:
                return LicenseKeyString.validate(stem)
            except:
                print(Passthru('NOTE: improperly formatted license filename: %s' % fn, lev=0))

    def get_license_files(self, all=False, rv_filt=None, quiet=False):
        """Verify every license file directly inside ``self.dir``.

        Parameters:
            all: when false, a license carrying an ``'ERROR'`` entry raises
                ``ErrorTuple`` inside the per-file handler, so it is reported
                and left out of the result; when true it is kept.
            rv_filt: optional object queried with ``.get(license_key)``. A
                non-``None`` answer adjusts the license: a truthy answer moves
                an ``'ERROR'`` of type ``'LIC_VPROP'`` to ``'RV_OK'``; a falsy
                answer marks an error-free license as revoked
                (``'LIC_KEY_RREV'``).
            quiet: suppress the per-file failure message.

        Returns a dict mapping license key to its property object.
        Raises ``Passthru`` (type ``LIC_FAIL``) for failures outside the
        per-file handling, e.g. an ``os.walk`` error.
        """
        def err(e):
            # os.walk swallows errors by default; re-raise them instead.
            raise e

        try:
            ret = {}
            self.vprop_verify.update(self.dir)
            for dirpath, dirnames, filenames in os.walk(self.dir, onerror=err):
                for f in filenames:
                    # Fallback label: guarantees the handler below can name
                    # the file even if os.path.join itself raises.
                    fn = f
                    try:
                        fn = os.path.join(dirpath, f)
                        if self.is_license_filename(f):
                            lic = atomic_read(fn)
                            lp = self.verify_license(lic, self.sig_verify, self.vprop_verify)
                            rv_status = None
                            if rv_filt:
                                rv_status = rv_filt.get(lp['LICENSE_KEY'])
                            if rv_status is not None:
                                error = lp.get('ERROR')
                                if rv_status:
                                    if error and error[1] == 'LIC_VPROP':
                                        lp['RV_OK'] = error
                                        del lp['ERROR']
                                elif not error:
                                    lp['ERROR'] = ('license key ID is revoked', 'LIC_KEY_RREV')
                            if not all and 'ERROR' in lp:
                                raise ErrorTuple(lp['ERROR'])
                            lp['IS_RENEWED_FN'] = fn + '.renewed'
                            ret[lp['LICENSE_KEY']] = lp
                    except:
                        # Broad catch kept from the original: one bad file
                        # must not prevent the remaining files from loading.
                        if not quiet:
                            print(Passthru('License manager: exception with license file %s' % fn, type='LIC_FILE'))
                # Only the first os.walk() entry (the top-level directory)
                # is processed; subdirectories are not descended into.
                break
            return ret
        except:
            raise Passthru('could not read license files from %s' % self.dir, type='LIC_FAIL')

    @classmethod
    def verify_license(C, lictxt, sig_verify, vprop_verify):
        """Verify a license text and return its property object.

        Steps, in order: signature check via ``sig_verify.verify``; machine
        property check via ``vprop_verify.verify``; key validation; expiry /
        revocation check. Only the first problem found after the signature
        check is recorded under ``lp['ERROR']`` as a ``(message, type)``
        tuple; it does not raise.

        Raises ``LicError`` when the signature check fails (``LIC_VERIFY``)
        or the text has no ``LICENSE_KEY`` entry (``LIC_KEY_ID``).
        """
        def yyyymmdd_to_unix(yyyymmdd_str):
            # Local-time midnight of the given date as a Unix timestamp;
            # the trailing -1 lets mktime decide whether DST applies.
            match = C.yyyymmdd_re.match(yyyymmdd_str)
            year, month, day = match.groups()
            return int(time.mktime((int(year), int(month), int(day), 0, 0, 0, 0, 0, -1)))

        def error(tup):
            # First recorded error wins; later ones are ignored.
            if 'ERROR' not in lp:
                lp['ERROR'] = tup

        status, text = sig_verify.verify(lictxt)
        if not status:
            raise LicError('signature verification failed', type='LIC_VERIFY')
        lp = LicenseProperties(text)

        vinfo = None
        try:
            vinfo = vprop_verify.verify(lp)
        except:
            pt = Passthru('machine properties validation failed', type='LIC_VPROP')
            error(pt.value_type())

        if 'LICENSE_KEY' not in lp:
            raise LicError('license key ID is missing', type='LIC_KEY_ID')
        key = LicenseKeyString.validate(lp['LICENSE_KEY'])

        ut = unix_time()
        exp = lbq.query(key)
        if exp is None:
            expiry_date_str = lp.get('expiry_date')
            if expiry_date_str:
                try:
                    # Valid through the end of the expiry day.
                    exp = yyyymmdd_to_unix(expiry_date_str) + 86400
                except Exception:
                    # NOTE(review): an unparseable expiry_date leaves exp as
                    # None, so no expiry check is applied (fail-open). Kept
                    # as in the original; confirm this is intended.
                    pass

        if exp is not None:
            if exp == 0:
                error(('license key ID is revoked', 'LIC_KEY_REV'))
            elif exp < ut:
                error(('license key ID is expired', 'LIC_KEY_EXP'))
            # NOTE(review): assumed to be set only when an expiry is known.
            lp['EXP'] = exp

        lp['LICENSE_KEY'] = key
        if vinfo:
            lp['VPROP_INFO'] = vinfo[2]
        return lp
class LicenseUpdater(object):
    """Activate license keys through a remote proxy and manage license files.

    ``mode`` limits what may be done to files in the license directory; it is
    compared against the three class constants below.
    """

    ADD_ONLY = 0     # may only create files that do not exist yet
    ADD_MOD = 1      # may also replace a file whose content differs
    ADD_MOD_DEL = 2  # may also delete files (see _file_delete)

    def __init__(self, ssl_proxy, dir, sig_verify, vprop_verify, mode):
        """Store the collaborators.

        Raises ``SimpleError`` (type ``LIC_DIR_UNDEF``) when ``dir`` is falsy.
        """
        if not dir:
            raise SimpleError('License Manager: license directory is undefined', type='LIC_DIR_UNDEF')
        self.proxy = ssl_proxy
        self.dir = dir
        self.sig_verify = sig_verify
        self.vprop_verify = vprop_verify
        self.mode = mode

    @classmethod
    def mode_from_opt(C, opt_flags):
        """Derive a mode constant from the ``'mod'`` / ``'del'`` flags.

        NOTE(review): the two checks are assumed to be sequential (``'del'``
        wins regardless of ``'mod'``) -- confirm against the original file.
        """
        mode = C.ADD_ONLY
        if opt_flags['mod']:
            mode = C.ADD_MOD
        if opt_flags['del']:
            mode = C.ADD_MOD_DEL
        return mode

    @classmethod
    def mode_from_str(C, mode):
        """Translate ``'add_only'`` / ``'mod'`` / ``'del'`` to a mode constant.

        Raises ``SimpleError`` for any other value.
        """
        modes = {
            'add_only': C.ADD_ONLY,
            'mod': C.ADD_MOD,
            'del': C.ADD_MOD_DEL,
        }
        if mode in modes:
            return modes[mode]
        raise SimpleError('LicenseUpdater: unknown mode option %r: must be one of %s' % (mode, modes.keys()))

    def generate_marker_file(self):
        """Call the module-level ``generate_marker_file`` for ``self.dir``."""
        # Inside a method body the bare name resolves to the imported
        # module-level function, not to this method.
        generate_marker_file(self.dir)

    def activate_key(self, key, foreign_properties=None):
        """Validate ``key`` and start its activation.

        When ``foreign_properties`` is truthy it is sent as the validation
        properties; otherwise a marker file is generated and the properties
        are produced from ``self.dir``.

        Returns whatever ``_activate_key_dispatch`` returns. Any failure is
        re-raised as ``Passthru`` (type ``ACTIVATE_KEY_SETUP``).
        """
        try:
            key = LicenseKeyString.validate(key)
            if foreign_properties:
                vprop = foreign_properties
            else:
                self.generate_marker_file()
                vprop = LicenseProperties(generate_validation_properties(self.dir))
            return self._activate_key_dispatch(key, vprop, bool(foreign_properties))
        except:
            # Broad catch kept from the original: every failure is rewrapped.
            raise Passthru('ActivateKey setup failure', type='ACTIVATE_KEY_SETUP')

    def _activate_extra_info(self):
        """Return the distro / version dict sent along with an activation."""
        return {
            'distro': get_distro_info(),
            'as_version': pyovpn_version(),
        }

    def _activate_key_dispatch(self, key, vprop, foreign_properties_flag):
        """Issue the remote ``ActivateKey`` call and chain the post step.

        Returns the object from ``self.proxy.callRemote`` after
        ``_activate_key_post`` has been added as a callback. Failures are
        re-raised as ``Passthru`` (type ``ACTIVATE_KEY_CALLOUT``).
        """
        try:
            d = self.proxy.callRemote('ActivateKey', key, None, dict(vprop), self._activate_extra_info())
            d.addCallback(self._activate_key_post, foreign_properties_flag)
            return d
        except:
            raise Passthru('ActivateKey callout error', type='ACTIVATE_KEY_CALLOUT')

    def _activate_key_post(self, result, foreign_properties_flag):
        """Save the license text returned by the remote call.

        ``result`` must be a 2-item sequence whose second item is the license
        text. Any failure, including a ``None`` result, is re-raised as
        ``Passthru`` (type ``ACTIVATE_KEY_POST``). Returns ``None``.
        """
        try:
            if result is None:
                raise SimpleError('XML method returned None', type='XML_NONE')
            # The first item is not used here; unpacking still enforces
            # the expected 2-item shape.
            _sid, lictxt = result
            if foreign_properties_flag:
                self._save_license_file_foreign(lictxt)
            else:
                self._save_license_file(lictxt)
        except:
            raise Passthru('ActivateKey post error', type='ACTIVATE_KEY_POST')

    def _save_license_file_foreign(self, lictxt):
        """Write ``lictxt`` to ``<LICENSE_KEY>.lic`` without verifying it."""
        lp = LicenseProperties(lictxt)
        write_string_to_file(lictxt, self._path(lp['LICENSE_KEY'] + '.' + LicenseReader.ext))

    def _save_license_file(self, lictxt):
        """Verify ``lictxt`` and write it through ``_write_file``.

        Returns the property object produced by the verification.
        """
        self.vprop_verify.update(self.dir)
        lp = LicenseReader.verify_license(lictxt, self.sig_verify, self.vprop_verify)
        self._write_file(lp['LICENSE_KEY'] + '.' + LicenseReader.ext, lictxt)
        return lp

    def _path(self, fn):
        """Return ``fn`` joined onto the license directory."""
        return os.path.join(self.dir, fn)

    def _write_file(self, fn, content):
        """Create or replace ``fn`` as far as ``self.mode`` permits.

        Returns ``True`` when the file was written and ``False`` when the
        existing content is already identical. Raises ``SimpleError`` (type
        ``ADD_ONLY``) when the content differs but the mode is below
        ``ADD_MOD``.
        """
        path = self._path(fn)
        try:
            exists = self._read_file(fn)
        except IOError:
            # An unreadable file is treated as "does not exist yet".
            atomic_write_new(path, content)
            if DEBUG:
                print('%s written' % path)
            return True

        if exists == content:
            if DEBUG:
                print('%s not written -- new content is identical to old' % path)
            return False

        if self.mode >= self.ADD_MOD:
            atomic_write_replace(path, content)
            if DEBUG:
                print('%s updated' % path)
            return True
        raise SimpleError('%s cannot be written because mode is set to ADD_ONLY' % path, type='ADD_ONLY')

    def _read_file(self, fn):
        """Return the content of ``fn`` inside the license directory."""
        return atomic_read(self._path(fn))

    def _file_delete(self, keydict):
        """Delete top-level license files whose key is not in ``keydict``.

        Does nothing unless ``self.mode >= ADD_MOD_DEL``. Failures are
        re-raised as ``Passthru`` (type ``FILE_CLEANUP``).
        """
        try:
            if self.mode >= self.ADD_MOD_DEL:
                for dirpath, dirnames, filenames in os.walk(self.dir):
                    for f in filenames:
                        key = LicenseReader.is_license_filename(f)
                        if key and key not in keydict:
                            fn = self._path(f)
                            os.remove(fn)
                            if DEBUG:
                                print('%s deleted' % fn)
                    # Only the top-level directory is processed.
                    break
        except:
            raise Passthru('file delete failed', type='FILE_CLEANUP')
class LicenseRenewer(object):
    """Ask the renewal server whether a license key has a replacement key."""

    renewal_server = 'https://openvpn.net/rest/License'

    def __init__(self, resolver, ca_bundle):
        """Keep the resolver and CA bundle used for the HTTPS request."""
        self.ca_bundle = ca_bundle
        self.resolver = resolver

    @defer.inlineCallbacks
    def get_renewal_key(self, key):
        """Fetch the renewal key for ``key`` from ``renewal_server``.

        The validated ``NewKey`` value is handed back via
        ``defer.returnValue`` when the response has status ``'200'``, content
        type ``'application/json'`` and a ``NewKey`` that is non-empty and not
        the text ``none`` (any case). In every other case, including any
        ``Exception`` (which is printed), no value is returned explicitly.
        """
        try:
            quoted_key = urllib.quote(key)
            request_url = '%s?key=%s' % (self.renewal_server, quoted_key)
            context = SSLClientContext(verify=True, requireCertificate=True, caBundle=self.ca_bundle)
            status, ct, body = yield https_get(request_url, ssl_context=context, resolver=self.resolver)

            if not (status == '200' and ct == 'application/json'):
                raise LicError('problem with https GET, status=%r content_type=%r' % (status, ct))

            candidate = json.loads(body).get('NewKey')
            if candidate and candidate.lower() != 'none':
                defer.returnValue(LicenseKeyString.validate(candidate))
        except Exception:
            # Must stay ``except Exception`` rather than a bare except:
            # per Twisted's documentation, returnValue delivers its result
            # by raising, and that must not be intercepted here.
            print(Passthru('LicenseRenewer fail on %r' % (key,)))
class RemoteValidation(object):
    """Holds the remote-validation server URL plus connection settings."""

    server = 'https://dw.yonan.net/rv'

    def __init__(self, resolver, ca_bundle):
        """Store ``resolver`` and ``ca_bundle`` on the instance."""
        self.ca_bundle = ca_bundle
        self.resolver = resolver
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment