Created
February 28, 2019 07:10
-
-
Save vncloudsco/863d56c1b8d2b97859060e9be4cf5712 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Embedded file name: build/bdist.linux-x86_64/egg/pyovpn/lic/licstore.py | |
import re | |
import os | |
import json | |
import urllib | |
import time | |
from twisted.internet import defer | |
from pyovpnc import unix_time | |
from pyovpn.util.error import Passthru, SimpleError, ErrorTuple | |
from pyovpn.util.atomicfile import atomic_read, atomic_write_new, atomic_write_replace | |
from pyovpn.util.filemod import DirMod | |
from pyovpn.util.poversion import pyovpn_version | |
from pyovpn.util.simplefile import write_string_to_file | |
from pyovpn.linux.linfo import get_distro_info | |
from pyovpn.lic.lickey import LicenseKeyString | |
from pyovpn.lic.licser import LicenseSerialID | |
from pyovpn.lic.prop import LicenseProperties | |
from pyovpn.lic.vprop import generate_validation_properties | |
from pyovpn.lic.ino import generate_marker_file | |
from pyovpn.lic.licerror import LicError | |
from pyovpn.lic import lbq | |
from pyovpn.ssl.sslcli import SSLClientContext | |
from pyovpn.http.httpcli import https_get | |
from pyovpn.util.env import get_env_debug | |
DEBUG = get_env_debug('DEBUG_LICSTORE') | |
class LicenseReaderBase(object): | |
def ok(self, propname, value): | |
if self.uprop != None: | |
if propname in self.uprop: | |
limit = self.uprop[propname] | |
if value <= limit: | |
if DEBUG: | |
print 'License Manager: property %s=%d within limit of %d' % (propname, value, limit) | |
return (True, None) | |
else: | |
return (False, 'maximum %s exceeded (%d)' % (propname, limit)) | |
else: | |
if DEBUG: | |
print 'License Manager: property %s is not limited' % propname | |
return (True, None) | |
else: | |
return (False, 'cannot validate maximum %s' % propname) | |
return | |
def info(self): | |
return self.uprop | |
def license_key_info(self): | |
return self.licdict | |
class LicenseReader(LicenseReaderBase): | |
ext = 'lic' | |
fn_re = re.compile('(.*)\\.%s$' % ext, re.IGNORECASE) | |
yyyymmdd_re = re.compile('(\\d\\d\\d\\d)(\\d\\d)(\\d\\d)') | |
def __init__(self, dir, sig_verify, vprop_verify, uprop_figure): | |
if not dir: | |
raise SimpleError('License Manager: license directory is undefined', type='LIC_DIR_UNDEF') | |
self.dir = dir | |
self.sig_verify = sig_verify | |
self.vprop_verify = vprop_verify | |
self.uprop_figure = uprop_figure | |
self.licdict = None | |
self.uprop = None | |
self.post_update_callback = None | |
return | |
def enable_auto_update(self, update_interval, post_update_callback = None, immediate_update = True): | |
if immediate_update: | |
self.update() | |
if update_interval > 0: | |
self.dirmod = DirMod(self.dir, update_interval, initial_interval=update_interval, notify_callback=lambda x: self.update_noret(), file_of_interest_callback=self.is_license_filename) | |
self.post_update_callback = post_update_callback | |
def manual_update(self): | |
if hasattr(self, 'dirmod'): | |
self.dirmod.manual_trigger() | |
def cancel(self): | |
if hasattr(self, 'dirmod'): | |
self.dirmod.cancel() | |
def update_noret(self): | |
self.update() | |
def update(self): | |
try: | |
licdict = self.get_license_files() | |
except: | |
licdict = None | |
print Passthru('License manager update: failed to obtain license info') | |
if licdict != None: | |
self.uprop = None | |
self.licdict = licdict | |
if self.licdict != None: | |
self.uprop = self.uprop_figure.figure(self.licdict) | |
if self.post_update_callback: | |
self.post_update_callback(self) | |
return licdict | |
@classmethod | |
def is_license_filename(C, fn): | |
m = re.match(C.fn_re, fn) | |
if m: | |
g = m.groups() | |
try: | |
return LicenseKeyString.validate(g[0]) | |
except: | |
print Passthru('NOTE: improperly formatted license filename: %s' % fn, lev=0) | |
def get_license_files(self, all = False, rv_filt = None, quiet = False): | |
def err(e): | |
raise e | |
try: | |
ret = {} | |
self.vprop_verify.update(self.dir) | |
for dirpath, dirnames, filenames in os.walk(self.dir, onerror=err): | |
for f in filenames: | |
try: | |
fn = os.path.join(dirpath, f) | |
if self.is_license_filename(f): | |
lic = atomic_read(fn) | |
lp = self.verify_license(lic, self.sig_verify, self.vprop_verify) | |
rv_status = None | |
if rv_filt: | |
rv_status = rv_filt.get(lp['LICENSE_KEY']) | |
if rv_status is not None: | |
error = lp.get('ERROR') | |
if rv_status: | |
if error and error[1] == 'LIC_VPROP': | |
lp['RV_OK'] = error | |
del lp['ERROR'] | |
elif not error: | |
lp['ERROR'] = ('license key ID is revoked', 'LIC_KEY_RREV') | |
if not all and 'ERROR' in lp: | |
raise ErrorTuple(lp['ERROR']) | |
lp['IS_RENEWED_FN'] = fn + '.renewed' | |
ret[lp['LICENSE_KEY']] = lp | |
except: | |
if not quiet: | |
print Passthru('License manager: exception with license file %s' % fn, type='LIC_FILE') | |
break | |
return ret | |
except: | |
raise Passthru('could not read license files from %s' % self.dir, type='LIC_FAIL') | |
return | |
@classmethod | |
def verify_license(C, lictxt, sig_verify, vprop_verify): | |
def yyyymmdd_to_unix(yyyymmdd_str): | |
m = re.match(C.yyyymmdd_re, yyyymmdd_str) | |
y, m, d = m.groups() | |
return int(time.mktime((int(y), | |
int(m), | |
int(d), | |
0, | |
0, | |
0, | |
0, | |
0, | |
-1))) | |
def error(tup): | |
if 'ERROR' not in lp: | |
lp['ERROR'] = tup | |
status, text = sig_verify.verify(lictxt) | |
if not status: | |
raise LicError('signature verification failed', type='LIC_VERIFY') | |
lp = LicenseProperties(text) | |
vinfo = None | |
try: | |
vinfo = vprop_verify.verify(lp) | |
except: | |
pt = Passthru('machine properties validation failed', type='LIC_VPROP') | |
error(pt.value_type()) | |
if 'LICENSE_KEY' not in lp: | |
raise LicError('license key ID is missing', type='LIC_KEY_ID') | |
key = LicenseKeyString.validate(lp['LICENSE_KEY']) | |
ut = unix_time() | |
exp = lbq.query(key) | |
if exp is None: | |
expiry_date_str = lp.get('expiry_date') | |
if expiry_date_str: | |
try: | |
exp = yyyymmdd_to_unix(expiry_date_str) + 86400 | |
except: | |
pass | |
if exp is not None: | |
if exp == 0: | |
error(('license key ID is revoked', 'LIC_KEY_REV')) | |
elif exp < ut: | |
error(('license key ID is expired', 'LIC_KEY_EXP')) | |
lp['EXP'] = exp | |
lp['LICENSE_KEY'] = key | |
if vinfo: | |
lp['VPROP_INFO'] = vinfo[2] | |
return lp | |
class LicenseUpdater(object): | |
ADD_ONLY = 0 | |
ADD_MOD = 1 | |
ADD_MOD_DEL = 2 | |
def __init__(self, ssl_proxy, dir, sig_verify, vprop_verify, mode): | |
if not dir: | |
raise SimpleError('License Manager: license directory is undefined', type='LIC_DIR_UNDEF') | |
self.proxy = ssl_proxy | |
self.dir = dir | |
self.sig_verify = sig_verify | |
self.vprop_verify = vprop_verify | |
self.mode = mode | |
@classmethod | |
def mode_from_opt(C, opt_flags): | |
mode = C.ADD_ONLY | |
if opt_flags['mod']: | |
mode = C.ADD_MOD | |
if opt_flags['del']: | |
mode = C.ADD_MOD_DEL | |
return mode | |
@classmethod | |
def mode_from_str(C, mode): | |
modes = {'add_only': C.ADD_ONLY, | |
'mod': C.ADD_MOD, | |
'del': C.ADD_MOD_DEL} | |
if mode in modes: | |
return modes[mode] | |
raise SimpleError('LicenseUpdater: unknown mode option %s: must be one of %s' % (mode.__repr__(), modes.keys())) | |
def generate_marker_file(self): | |
generate_marker_file(self.dir) | |
def activate_key(self, key, foreign_properties = None): | |
try: | |
key = LicenseKeyString.validate(key) | |
if foreign_properties: | |
vprop = foreign_properties | |
else: | |
self.generate_marker_file() | |
vprop = LicenseProperties(generate_validation_properties(self.dir)) | |
return self._activate_key_dispatch(key, vprop, bool(foreign_properties)) | |
except: | |
raise Passthru('ActivateKey setup failure', type='ACTIVATE_KEY_SETUP') | |
def _activate_extra_info(self): | |
return {'distro': get_distro_info(), | |
'as_version': pyovpn_version()} | |
def _activate_key_dispatch(self, key, vprop, foreign_properties_flag): | |
try: | |
d = self.proxy.callRemote('ActivateKey', key, None, dict(vprop), self._activate_extra_info()) | |
d.addCallback(self._activate_key_post, foreign_properties_flag) | |
return d | |
except: | |
raise Passthru('ActivateKey callout error', type='ACTIVATE_KEY_CALLOUT') | |
return | |
def _activate_key_post(self, result, foreign_properties_flag): | |
try: | |
if result != None: | |
sid, lictxt = result | |
if foreign_properties_flag: | |
self._save_license_file_foreign(lictxt) | |
else: | |
self._save_license_file(lictxt) | |
else: | |
raise SimpleError('XML method returned None', type='XML_NONE') | |
except: | |
raise Passthru('ActivateKey post error', type='ACTIVATE_KEY_POST') | |
return | |
def _save_license_file_foreign(self, lictxt): | |
lp = LicenseProperties(lictxt) | |
write_string_to_file(lictxt, self._path(lp['LICENSE_KEY'] + '.' + LicenseReader.ext)) | |
def _save_license_file(self, lictxt): | |
self.vprop_verify.update(self.dir) | |
lp = LicenseReader.verify_license(lictxt, self.sig_verify, self.vprop_verify) | |
self._write_file(lp['LICENSE_KEY'] + '.' + LicenseReader.ext, lictxt) | |
return lp | |
def _path(self, fn): | |
return os.path.join(self.dir, fn) | |
def _write_file(self, fn, content): | |
try: | |
exists = self._read_file(fn) | |
except IOError: | |
atomic_write_new(self._path(fn), content) | |
if DEBUG: | |
print '%s written' % self._path(fn) | |
return True | |
if exists != content: | |
if self.mode >= self.ADD_MOD: | |
atomic_write_replace(self._path(fn), content) | |
if DEBUG: | |
print '%s updated' % self._path(fn) | |
return True | |
raise SimpleError('%s cannot be written because mode is set to ADD_ONLY' % self._path(fn), type='ADD_ONLY') | |
else: | |
if DEBUG: | |
print '%s not written -- new content is identical to old' % self._path(fn) | |
return False | |
def _read_file(self, fn): | |
return atomic_read(self._path(fn)) | |
def _file_delete(self, keydict): | |
try: | |
if self.mode >= self.ADD_MOD_DEL: | |
for dirpath, dirnames, filenames in os.walk(self.dir): | |
for f in filenames: | |
key = LicenseReader.is_license_filename(f) | |
if key and key not in keydict: | |
fn = self._path(f) | |
os.remove(fn) | |
if DEBUG: | |
print '%s deleted' % fn | |
break | |
except: | |
raise Passthru('file delete failed', type='FILE_CLEANUP') | |
class LicenseRenewer(object): | |
renewal_server = 'https://openvpn.net/rest/License' | |
def __init__(self, resolver, ca_bundle): | |
self.resolver = resolver | |
self.ca_bundle = ca_bundle | |
@defer.inlineCallbacks | |
def get_renewal_key(self, key): | |
try: | |
url = '%s?key=%s' % (self.renewal_server, urllib.quote(key)) | |
ssl_context = SSLClientContext(verify=True, requireCertificate=True, caBundle=self.ca_bundle) | |
status, ct, res = yield https_get(url, ssl_context=ssl_context, resolver=self.resolver) | |
if status == '200' and ct == 'application/json': | |
dres = json.JSONDecoder().decode(res) | |
newkey = dres.get('NewKey') | |
if newkey and newkey.lower() != 'none': | |
defer.returnValue(LicenseKeyString.validate(newkey)) | |
else: | |
raise LicError('problem with https GET, status=%r content_type=%r' % (status, ct)) | |
except Exception: | |
print Passthru('LicenseRenewer fail on %r' % (key,)) | |
class RemoteValidation(object): | |
server = 'https://dw.yonan.net/rv' | |
def __init__(self, resolver, ca_bundle): | |
self.resolver = resolver | |
self.ca_bundle = ca_bundle |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment