Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@vejuhust
Created February 27, 2017 15:41
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vejuhust/b7437e8b015feb87a45cab54e318fd9c to your computer and use it in GitHub Desktop.
Save vejuhust/b7437e8b015feb87a45cab54e318fd9c to your computer and use it in GitHub Desktop.
Python Wrapper of SoftEtherVPN "vpncmd" Tool
#!/usr/bin/env python3
import logging
# Colorful console: ANSI escape sequences used to decorate log output.
RESET_SEQ = "\033[0m"
COLOR_SEQ = "\033[1;%dm"
BOLD_SEQ = "\033[1m"

# Color indexes 0..7; ColoredFormatter adds 30 to an index before
# substituting it into COLOR_SEQ.
(BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE) = range(8)

# Logging level name -> color index used for that level on the console.
COLORS = dict(
    WARNING=YELLOW,
    INFO=WHITE,
    DEBUG=BLUE,
    CRITICAL=MAGENTA,
    ERROR=RED,
)
class ColoredFormatter(logging.Formatter):
    """Formatter that wraps the level name in an ANSI color sequence.

    The level name is padded to eight characters and colored according to
    the module-level ``COLORS`` table. Level names missing from that table,
    or any record when ``use_color`` is false, are formatted unchanged.
    """

    def __init__(self, format_string, use_color = True):
        """Create the formatter.

        Args:
            format_string: Format string handed to ``logging.Formatter``.
            use_color: When false, behave exactly like ``logging.Formatter``.
        """
        logging.Formatter.__init__(self, format_string)
        self.use_color = use_color

    def format(self, record):
        """Return the formatted record, with a colored level name if enabled.

        The record object is shared by every handler attached to the logger,
        so the original ``levelname`` is restored before returning; otherwise
        handlers that run later would receive the escape codes as well.
        """
        plain_levelname = record.levelname
        if not (self.use_color and plain_levelname in COLORS):
            return logging.Formatter.format(self, record)

        padding = " " * (8 - len(plain_levelname))
        color_prefix = COLOR_SEQ % (30 + COLORS[plain_levelname])
        record.levelname = color_prefix + plain_levelname + padding + RESET_SEQ
        try:
            return logging.Formatter.format(self, record)
        finally:
            # Undo the temporary mutation even if formatting raised.
            record.levelname = plain_levelname
# Upload to DataDog
from datadog import initialize, api, statsd
from random import getrandbits
# Key shared by all uploaded events until the next "###" message; events
# created with the same key are passed to Datadog as one aggregation group.
aggregation_key = format(getrandbits(64), "x")


class UploadHandler(logging.NullHandler):
    """Logging handler that uploads every record as a Datadog event.

    A record whose message starts with ``###`` starts a new aggregation
    group: a fresh random ``aggregation_key`` is generated and used for that
    record and all following ones.
    """

    # Record level -> Datadog alert type; any other level maps to "info".
    _ALERT_TYPES = {
        logging.INFO: "success",
        logging.WARNING: "warning",
        logging.ERROR: "error",
    }

    def __init__(self, api_key="YOUR_OWN_KEY", app_key="YOUR_OWN_APP"):
        """Initialize the handler and the Datadog client.

        Args:
            api_key: Datadog API key passed to ``datadog.initialize``.
            app_key: Datadog application key passed to ``datadog.initialize``.
        """
        # Run the base initializer so level, filters and formatter exist
        # before setLevel()/setFormatter() are called on this handler.
        logging.NullHandler.__init__(self)
        initialize(api_key=api_key, app_key=app_key)

    def handle(self, record):
        """Upload ``record`` as an event; never raise into the logging caller."""
        global aggregation_key
        if record.getMessage().startswith("###"):
            aggregation_key = format(getrandbits(64), "x")
        alert_type = self._ALERT_TYPES.get(record.levelno, "info")
        try:
            api.Event.create(
                title="%s - %s [%s]" % (record.name, record.funcName, record.levelname),
                text=self.format(record),
                tags=["role:master"],
                alert_type=alert_type,
                aggregation_key=aggregation_key)
        except Exception:
            # A failed upload must not break the code that merely logged a
            # message; report it the way standard handlers do.
            self.handleError(record)
def GetLogger(loggerName):
    """Return the logger called ``loggerName``, configuring it on first use.

    A logger that already has handlers is returned untouched. Otherwise it is
    set to DEBUG and given three handlers, in this order: a DEBUG file handler
    writing to ``config.log``, a DEBUG ``UploadHandler``, and an INFO console
    handler that uses ``ColoredFormatter``.
    """
    logger = logging.getLogger(loggerName)
    if logger.handlers:
        # Already configured by an earlier call; do not add duplicates.
        return logger

    logger.setLevel(logging.DEBUG)
    layout = "%(asctime)-24s| %(name)-22s| %(funcName)-18s| %(lineno)4d | %(levelname)-8s | %(message)s"

    def attach(handler, level, formatter):
        handler.setLevel(level)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

    # Write logs to files
    uncolored = logging.Formatter(layout)
    attach(logging.FileHandler("config.log"), logging.DEBUG, uncolored)

    # Upload logs to DataDog
    attach(UploadHandler(), logging.DEBUG, uncolored)

    # Output logs to console with color
    colored = ColoredFormatter(layout)
    attach(logging.StreamHandler(), logging.INFO, colored)

    return logger
#!/usr/bin/env python3
from subprocess import Popen, PIPE, TimeoutExpired
from os import path
from Logger import GetLogger
class VPNServerCommandline:
    """Run ``vpncmd`` commands against one server, optionally scoped to a hub.

    The constructor builds a command-line template; ``Execute`` substitutes a
    vpncmd command into it, runs it through the shell from this script's
    directory and reports whether the tool printed its success line.
    """

    # Execute() treats a command as successful when this is the last
    # non-empty stdout line.
    _SUCCESS_LINE = "The command completed successfully."
    # Switches whose values are secrets and are masked before logging.
    _SECRET_SWITCHES = ("/PASSWORD:", "/PSK:")

    def __init__(self, vpn_server, vpn_port, vpn_password, command_timeout, hub_name=None):
        """Prepare the command template.

        Args:
            vpn_server: Host name of the VPN server.
            vpn_port: Port of the VPN server (formatted with ``%d``).
            vpn_password: Administration password passed via ``/PASSWORD:``.
            command_timeout: Seconds to wait for each command.
            hub_name: When non-empty, commands run in hub mode (``/adminhub:``).
        """
        self._logger = GetLogger("VPNServerCommandline")
        self._logger.info("creating an instance of VPNServerCommandline")

        # Commands run with cwd set to this script's directory, so look for
        # the bundled tool there rather than in the process working directory.
        script_dir = path.dirname(path.realpath(__file__))
        has_local_tool = (path.exists(path.join(script_dir, "vpncmd"))
                          and path.exists(path.join(script_dir, "hamcore.se2")))
        if has_local_tool:
            utility_path = "./vpncmd"
        else:
            utility_path = "/usr/local/vpnserver/vpncmd"
            self._logger.warning("failed to find local 'vpncmd'")

        prefix = "%s %s:%d /SERVER /PASSWORD:%s" % (utility_path, vpn_server, vpn_port, vpn_password)
        if hub_name:
            prefix += " /adminhub:%s" % hub_name
        # Escape literal '%' so the later "template % command" cannot fail on
        # a password or host name that contains one.
        self._command_template = prefix.replace("%", "%%") + " /CMD %s"
        if hub_name:
            self._logger.info("set to HUB mode with command template: %s", self._redact(self._command_template))
        else:
            self._logger.info("set to SERVER mode with command template: %s", self._redact(self._command_template))

        self._command_timeout = command_timeout
        self._logger.info("set command timeout %s secs", self._command_timeout)

    @staticmethod
    def _redact(text):
        """Return ``text`` with the values of secret switches replaced by ``***``."""
        def mask(word):
            upper = word.upper()
            for switch in VPNServerCommandline._SECRET_SWITCHES:
                if upper.startswith(switch):
                    return word[:len(switch)] + "***"
            return word
        # Splitting and joining on a single space keeps the spacing intact.
        return " ".join(mask(word) for word in text.split(" "))

    def _get_clean_lines(self, byte_raw):
        """Decode ``byte_raw`` and return its non-blank, stripped lines.

        Returns:
            A list of lines, or ``None`` when there is no non-blank line.
        """
        # Undecodable bytes are replaced instead of aborting the command.
        text = byte_raw.decode('utf-8', errors='replace')
        lines_clean = [line.strip() for line in text.split('\n') if line.strip()]
        return lines_clean or None

    def Execute(self, vpn_command):
        """Run one vpncmd command.

        Args:
            vpn_command: Command text substituted after ``/CMD``.

        Returns:
            A tuple ``(status, command_line, stdout_lines, stderr_lines)``.
            ``status`` is True only when the last stdout line is the success
            line; each lines entry is a list or ``None`` when empty.
        """
        command_line = self._command_template % vpn_command
        loggable = self._redact(command_line)
        # NOTE(review): the command is a shell string; commands built from
        # untrusted input would need quoting before reaching this point.
        process = Popen(command_line, shell=True, cwd=path.dirname(path.realpath(__file__)), stdout=PIPE, stderr=PIPE, start_new_session=True)
        self._logger.info("execute command: %s", loggable)
        try:
            output_result, output_error = process.communicate(timeout=self._command_timeout)
        except TimeoutExpired:
            self._logger.error("execution timeout: %s", loggable)
            # NOTE(review): with shell=True this presumably signals only the
            # shell, not vpncmd itself -- confirm whether killing the whole
            # process group is needed here.
            process.kill()
            output_result, output_error = process.communicate()
        clean_result = self._get_clean_lines(output_result)
        clean_error = self._get_clean_lines(output_error)
        # clean_result is None when nothing was printed (e.g. after a
        # timeout); that is a failure, not a crash.
        response_status = bool(clean_result) and clean_result[-1] == self._SUCCESS_LINE
        if response_status:
            self._logger.info("command completed successfully: %s", loggable)
        else:
            self._logger.warning("command failed: %s", loggable)
        self._logger.debug("result message: %s", clean_result)
        self._logger.debug("error message: %s", clean_error)
        return (
            response_status,
            command_line,
            clean_result,
            clean_error)
if __name__ == '__main__':
    # Manual smoke test: replace the placeholders with real connection
    # details before running this module directly.
    vpn_server = "YOUR_OWN_SERVER.cloudapp.net"
    vpn_port = 8301
    vpn_password = "YOUR_OWN_PASSWORD"
    vpn_command = "UserList"
    hub_name = "VPN"
    command_timeout = 15

    # Server mode: list the hubs.
    vc = VPNServerCommandline(vpn_server, vpn_port, vpn_password, command_timeout)
    vc.Execute("HubList")

    # Hub mode: run the configured command, then query one user.
    vc = VPNServerCommandline(vpn_server, vpn_port, vpn_password, command_timeout, hub_name)
    vc.Execute(vpn_command)
    vc.Execute("UserGet Test1")
#!/usr/bin/env python3
from VPNServerCommandline import VPNServerCommandline
from Logger import GetLogger
from datetime import timedelta, datetime
class VPNServerManager:
    """High-level hub and user management built on ``VPNServerCommandline``.

    Two executors are held: one in server mode (hub creation/deletion) and one
    in hub mode (everything else). Public methods return True on success and
    False on failure.
    """

    # Switches whose values are secrets and are masked before logging.
    _SECRET_SWITCHES = ("/PASSWORD:", "/PSK:")

    def __init__(self, vpn_server, vpn_port, vpn_password, hub_name, hub_password, preshared_key, command_timeout):
        """Create the executors and the command lists used by ``Setup``.

        Args:
            vpn_server: Host name of the VPN server.
            vpn_port: Port of the VPN server.
            vpn_password: Server administration password.
            hub_name: Hub to (re)create and manage.
            hub_password: Password given to the hub on creation.
            preshared_key: Pre-shared key passed to ``IPsecEnable``.
            command_timeout: Seconds to wait for each command.
        """
        self._logger = GetLogger("VPNServerManager")
        self._logger.info("creating an instance of VPNServerManager")
        self._executor_server = VPNServerCommandline(vpn_server, vpn_port, vpn_password, command_timeout)
        self._executor_hub = VPNServerCommandline(vpn_server, vpn_port, vpn_password, command_timeout, hub_name)
        self._server_command_list = [
            "HubDelete %s" % hub_name,
            "HubCreate %s /PASSWORD:%s" % (hub_name, hub_password),
        ]
        self._hub_command_list = [
            "SecureNatEnable",
            "IPsecEnable /L2TP:yes /L2TPRAW:yes /ETHERIP:yes /PSK:%s /DEFAULTHUB:%s" % (preshared_key, hub_name),
        ]

    @staticmethod
    def _mask_secrets(command):
        """Return ``command`` with the values of secret switches replaced by ``***``."""
        def mask(word):
            upper = word.upper()
            for switch in VPNServerManager._SECRET_SWITCHES:
                if upper.startswith(switch):
                    return word[:len(switch)] + "***"
            return word
        return " ".join(mask(word) for word in command.split(" "))

    @staticmethod
    def _table_value(line):
        """Return the stripped text after the first ``|`` in ``line``, or None."""
        cells = line.split('|')
        return cells[1].strip() if len(cells) >= 2 else None

    def _execute_command(self, executor, command):
        """Run ``command`` on ``executor`` and return whether it succeeded."""
        responses = executor.Execute(command)
        succeeded = bool(responses) and responses[0] is True
        if succeeded:
            self._logger.info("succeeded to execute command '%s'", self._mask_secrets(command))
        else:
            self._logger.error("failed to execute command '%s'", self._mask_secrets(command))
        return succeeded

    def Setup(self, fault_tolerance = True):
        """Recreate the hub and enable SecureNAT and IPsec on it.

        Args:
            fault_tolerance: When true, failures are logged and the remaining
                commands still run; when false, the first failure aborts.

        Returns:
            False if a command failed and ``fault_tolerance`` is false,
            otherwise True.
        """
        self._logger.info("### setup server & hub")
        plan = (
            (self._executor_server, self._server_command_list),
            (self._executor_hub, self._hub_command_list),
        )
        for executor, commands in plan:
            for command in commands:
                if not self._execute_command(executor, command) and not fault_tolerance:
                    return False
        return True

    def CreateUser(self, user_name, user_password):
        """Create ``user_name`` and set its password.

        Returns:
            True only when both commands succeed; the password is not set if
            creating the user fails.
        """
        # The password is deliberately left out of the log message.
        self._logger.info("### create user '%s'", user_name)
        command_list = [
            "UserCreate %s /GROUP:none /REALNAME:none /NOTE:none" % user_name,
            "UserPasswordSet %s /PASSWORD:%s" % (user_name, user_password),
        ]
        for command in command_list:
            if not self._execute_command(self._executor_hub, command):
                return False
        return True

    def SetUserExpire(self, user_name, duration):
        """Set when ``user_name`` expires.

        Args:
            user_name: User to update.
            duration: A ``timedelta`` added to the current local time, or
                ``None`` to send ``/EXPIRES:none``.
        """
        self._logger.info("### set user '%s' expire after '%s'", user_name, duration)
        expire_value = "none"
        if duration is not None:
            expire_date = datetime.now() + duration
            # The double quotes are part of the value so the date and time
            # stay one argument on the command line.
            expire_value = expire_date.strftime('"%Y/%m/%d %H:%M:%S"')
        self._logger.info("set user '%s' expires at '%s'", user_name, expire_value)
        command = "UserExpiresSet %s /EXPIRES:%s" % (user_name, expire_value)
        return self._execute_command(self._executor_hub, command)

    def DeleteUser(self, user_name):
        """Delete ``user_name`` from the hub."""
        self._logger.info("### delete user '%s'", user_name)
        command = "UserDelete %s" % user_name
        return self._execute_command(self._executor_hub, command)

    def DisconnectUser(self, user_name):
        """Disconnect every session whose user matches ``user_name``.

        The ``SessionList`` output is scanned for ``Session Name`` lines and
        the ``User Name`` line that follows each; user names are compared
        case-insensitively.

        Returns:
            False when the session list could not be retrieved, otherwise True
            (individual disconnect failures are logged but not reported).
        """
        self._logger.info("### disconnect sessions of user '%s'", user_name)
        responses = self._executor_hub.Execute("SessionList")
        if not (len(responses) >= 4 and responses[0] is True):
            self._logger.error("failed to retrieve current session list")
            return False

        wanted_user = user_name.lower()
        session_list = []
        session_name = None
        for line in responses[2]:
            if line.startswith("Session Name"):
                value = self._table_value(line)
                if value is not None:
                    session_name = value
                continue
            if line.startswith("User Name"):
                session_user = self._table_value(line)
                # Skip rows without a value and users seen before any
                # session name, which would yield "SessionDisconnect None".
                if session_user is None or session_name is None:
                    continue
                if session_user.lower() == wanted_user:
                    session_list.append(session_name)
                    self._logger.info("found session '%s' from '%s'", session_name, session_user)

        for session_name in session_list:
            self._logger.info("disconnect session '%s'", session_name)
            command = "SessionDisconnect %s" % session_name
            self._execute_command(self._executor_hub, command)
        return True
if __name__ == '__main__':
    # Manual walk-through against a real server; fill in the placeholders
    # before running this module directly.
    user_name = "YOUR_OWN_NAME"
    user_password = "YOUR_OWN_PASSWORD"

    vs = VPNServerManager(
        vpn_server="YOUR_OWN_SERVER.cloudapp.net",
        vpn_port=8301,
        vpn_password="YOUR_OWN_PASSWORD",
        hub_name="VPN",
        hub_password="YOUR_OWN_PASSWORD",
        preshared_key="YOUR_OWN_KEY",
        command_timeout=15,
    )

    # Recreate the hub, then exercise the user lifecycle in order.
    vs.Setup()
    vs.CreateUser(user_name, user_password)
    vs.SetUserExpire(user_name, timedelta(hours=3))
    vs.DeleteUser(user_name)
    vs.DisconnectUser(user_name)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment