Created
February 27, 2017 15:41
-
-
Save vejuhust/b7437e8b015feb87a45cab54e318fd9c to your computer and use it in GitHub Desktop.
Python Wrapper of SoftEtherVPN "vpncmd" Tool
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import logging | |
# Colorful console: ANSI escape sequences used by the console formatter.
RESET_SEQ = "\033[0m"      # switch all attributes off
COLOR_SEQ = "\033[1;%dm"   # bold + color code, filled in with %-formatting
BOLD_SEQ = "\033[1m"       # bold only

# Offsets of the eight basic ANSI colors.
BLACK = 0
RED = 1
GREEN = 2
YELLOW = 3
BLUE = 4
MAGENTA = 5
CYAN = 6
WHITE = 7

# Color offset to use for each standard logging level name.
COLORS = dict(
    WARNING=YELLOW,
    INFO=WHITE,
    DEBUG=BLUE,
    CRITICAL=MAGENTA,
    ERROR=RED,
)
class ColoredFormatter(logging.Formatter):
    """Formatter that wraps the level name in ANSI color codes.

    Args:
        format_string: %-style log format passed to ``logging.Formatter``.
        use_color: when false the record is formatted without any escape
            codes, exactly like a plain ``logging.Formatter``.
    """

    def __init__(self, format_string, use_color=True):
        super().__init__(format_string)
        self.use_color = use_color

    def format(self, record):
        """Return the formatted record, coloring the level name if enabled.

        Only level names present in ``COLORS`` are colored. The record's
        ``levelname`` is restored before returning, so other handlers that
        process the same record never see the escape codes.
        """
        levelname = record.levelname
        if not (self.use_color and levelname in COLORS):
            return super().format(record)

        # Pad to 8 characters inside the colored span: the escape codes would
        # otherwise count towards any width given in the format string.
        record.levelname = (
            COLOR_SEQ % (30 + COLORS[levelname]) + levelname.ljust(8) + RESET_SEQ
        )
        try:
            return super().format(record)
        finally:
            # The LogRecord is shared by every handler of the logger.
            record.levelname = levelname
# Upload to DataDog | |
from datadog import initialize, api, statsd | |
from random import getrandbits | |
# Key sent with every uploaded event; re-rolled whenever a log message starts
# with "###" so that each such message begins a new group of events.
aggregation_key = format(getrandbits(64), "x")


class UploadHandler(logging.NullHandler):
    """Logging handler that posts every record to DataDog as an event."""

    # DataDog alert type per logging level; unlisted levels (e.g. DEBUG) are
    # reported as "info".
    _ALERT_TYPES = {
        logging.INFO: "success",
        logging.WARNING: "warning",
        logging.ERROR: "error",
        logging.CRITICAL: "error",
    }

    def __init__(self):
        # Initialise the base handler so level, formatter, filters and name
        # exist even before setLevel()/setFormatter() are called.
        super().__init__()
        options = {
            "api_key": "YOUR_OWN_KEY",
            "app_key": "YOUR_OWN_APP"
        }
        initialize(**options)

    def handle(self, record):
        """Upload *record* as a DataDog event.

        This overrides ``handle`` directly, so handler-level filters and the
        handler lock are not applied. A failing upload is reported through
        ``handleError`` instead of being raised into the code that logged.
        """
        global aggregation_key
        if record.getMessage().startswith("###"):
            aggregation_key = format(getrandbits(64), "x")
        alert_type = self._ALERT_TYPES.get(record.levelno, "info")
        try:
            api.Event.create(
                title="%s - %s [%s]" % (record.name, record.funcName, record.levelname),
                text=self.format(record),
                tags=["role:master"],
                alert_type=alert_type,
                aggregation_key=aggregation_key)
        except Exception:
            # Logging must never take the application down.
            self.handleError(record)
def GetLogger(loggerName):
    """Return the logger called *loggerName*, configuring it on first use.

    A logger that already has handlers is returned untouched. Otherwise it is
    set to DEBUG and given three handlers, in this order: a file handler
    writing "config.log" (DEBUG), the DataDog upload handler (DEBUG) and a
    colored console handler (INFO).
    """
    logger = logging.getLogger(loggerName)
    if logger.handlers:
        # Configured by an earlier call; adding handlers again would
        # duplicate every message.
        return logger

    logger.setLevel(logging.DEBUG)
    format_string = "%(asctime)-24s| %(name)-22s| %(funcName)-18s| %(lineno)4d | %(levelname)-8s | %(message)s"
    plain_formatter = logging.Formatter(format_string)

    def attach(handler, level, formatter):
        handler.setLevel(level)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

    # Write logs to files
    attach(logging.FileHandler("config.log"), logging.DEBUG, plain_formatter)
    # Upload logs to DataDog
    attach(UploadHandler(), logging.DEBUG, plain_formatter)
    # Output logs to console with color
    attach(logging.StreamHandler(), logging.INFO, ColoredFormatter(format_string))
    return logger
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import shlex
from os import path
from subprocess import Popen, PIPE, TimeoutExpired

from Logger import GetLogger
class VPNServerCommandline:
    """Run SoftEther "vpncmd" commands against one server or one of its hubs.

    Args:
        vpn_server: host name of the VPN server.
        vpn_port: TCP port of the VPN server (integer).
        vpn_password: administrator password passed to vpncmd.
        command_timeout: seconds to wait for a command before killing it.
        hub_name: when non-empty, commands run in the context of this hub
            (``/adminhub``); otherwise they run in server mode.
    """

    # Directory of this module: vpncmd is looked up in, and executed from, it.
    _BASE_DIR = path.dirname(path.realpath(__file__))
    # Last output line printed by vpncmd when a command succeeded.
    _SUCCESS_LINE = "The command completed successfully."
    # Options whose value must never reach the logs.
    _SECRET_OPTIONS = ("/PASSWORD", "/PSK")

    def __init__(self, vpn_server, vpn_port, vpn_password, command_timeout, hub_name=None):
        self._logger = GetLogger("VPNServerCommandline")
        self._logger.info("creating an instance of VPNServerCommandline")
        # Check for the local binary in the directory the command will run in.
        has_local_binary = (
            path.exists(path.join(self._BASE_DIR, "vpncmd"))
            and path.exists(path.join(self._BASE_DIR, "hamcore.se2"))
        )
        if has_local_binary:
            utility_path = "./vpncmd"
        else:
            utility_path = "/usr/local/vpnserver/vpncmd"
            self._logger.warning("failed to find local 'vpncmd'")

        connection = "%s %s:%d /SERVER /PASSWORD:%s" % (utility_path, vpn_server, vpn_port, vpn_password)
        if hub_name:
            connection += " /adminhub:%s" % hub_name
        # Escape literal '%' (e.g. in the password) so that the later
        # "template % command" substitution cannot fail on it.
        self._command_template = connection.replace("%", "%%") + " /CMD %s"
        if hub_name:
            self._logger.info("set to HUB mode with command template: %s", self._redact(self._command_template))
        else:
            self._logger.info("set to SERVER mode with command template: %s", self._redact(self._command_template))
        self._command_timeout = command_timeout
        self._logger.info("set command timeout %s secs", self._command_timeout)

    @classmethod
    def _redact(cls, text):
        """Return *text* with the value of every secret option masked."""
        words = []
        for word in text.split(" "):
            option, separator, _ = word.partition(":")
            if separator and option.upper() in cls._SECRET_OPTIONS:
                word = option + separator + "********"
            words.append(word)
        return " ".join(words)

    def _get_clean_lines(self, byte_raw):
        """Decode *byte_raw* and return its non-blank, stripped lines.

        Returns ``None`` (not an empty list) when nothing is left.
        Undecodable bytes are replaced rather than raising.
        """
        decoded = byte_raw.decode('utf-8', errors='replace')
        stripped = (line.strip() for line in decoded.split('\n'))
        lines_clean = [line for line in stripped if line]
        return lines_clean or None

    def Execute(self, vpn_command):
        """Run *vpn_command* through vpncmd.

        Returns:
            A tuple ``(status, command_line, stdout_lines, stderr_lines)``.
            ``status`` is true only when the last stdout line is vpncmd's
            success message. ``command_line`` is the full, unredacted command.
            The line lists are ``None`` when the stream was empty. If the
            process cannot be started, ``status`` is false and
            ``stderr_lines`` holds the error text.
        """
        command_line = self._command_template % vpn_command
        safe_command_line = self._redact(command_line)
        self._logger.info("execute command: %s", safe_command_line)
        try:
            # Split like a POSIX shell would, but run without a shell so that
            # metacharacters in names or passwords are never interpreted.
            process = Popen(
                shlex.split(command_line),
                cwd=self._BASE_DIR,
                stdout=PIPE,
                stderr=PIPE,
                start_new_session=True)
        except (OSError, ValueError) as error:
            self._logger.error("failed to start command: %s (%s)", safe_command_line, error)
            return (False, command_line, None, [str(error)])

        try:
            output_result, output_error = process.communicate(timeout=self._command_timeout)
        except TimeoutExpired:
            self._logger.error("execution timeout: %s", safe_command_line)
            process.kill()
            # Collect whatever was produced before the kill.
            output_result, output_error = process.communicate()

        clean_result = self._get_clean_lines(output_result)
        clean_error = self._get_clean_lines(output_error)
        # clean_result is None when vpncmd printed nothing.
        response_status = bool(clean_result) and clean_result[-1] == self._SUCCESS_LINE
        if response_status:
            self._logger.info("command completed successfully: %s", safe_command_line)
        else:
            self._logger.warning("command failed: %s", safe_command_line)
        self._logger.debug("result message: %s", clean_result)
        self._logger.debug("error message: %s", clean_error)
        return (
            response_status,
            command_line,
            clean_result,
            clean_error)
if __name__ == '__main__':
    # Manual smoke test; replace the placeholders with real server settings.
    vpn_server = "YOUR_OWN_SERVER.cloudapp.net"
    vpn_port = 8301
    vpn_password = "YOUR_OWN_PASSWORD"
    vpn_command = "UserList"
    hub_name = "VPN"
    command_timeout = 15

    connection = (vpn_server, vpn_port, vpn_password, command_timeout)

    # Server mode: list the hubs.
    vc = VPNServerCommandline(*connection)
    vc.Execute("HubList")

    # Hub mode: list the users, then show one of them.
    vc = VPNServerCommandline(*connection, hub_name)
    for demo_command in (vpn_command, "UserGet Test1"):
        vc.Execute(demo_command)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from VPNServerCommandline import VPNServerCommandline | |
from Logger import GetLogger | |
from datetime import timedelta, datetime | |
class VPNServerManager:
    """High-level hub and user management built on ``VPNServerCommandline``.

    Args:
        vpn_server: host name of the VPN server.
        vpn_port: TCP port of the VPN server.
        vpn_password: server administrator password.
        hub_name: name of the virtual hub to manage.
        hub_password: password given to the hub when it is created.
        preshared_key: IPsec pre-shared key used by ``Setup``.
        command_timeout: seconds to wait for each vpncmd invocation.
    """

    # Options whose value must never reach the logs.
    _SECRET_OPTIONS = ("/PASSWORD", "/PSK")

    def __init__(self, vpn_server, vpn_port, vpn_password, hub_name, hub_password, preshared_key, command_timeout):
        self._logger = GetLogger("VPNServerManager")
        self._logger.info("creating an instance of VPNServerManager")
        self._executor_server = VPNServerCommandline(vpn_server, vpn_port, vpn_password, command_timeout)
        self._executor_hub = VPNServerCommandline(vpn_server, vpn_port, vpn_password, command_timeout, hub_name)
        # Run in server mode: drop any existing hub, then create it afresh.
        self._server_command_list = [
            "HubDelete %s" % hub_name,
            "HubCreate %s /PASSWORD:%s" % (hub_name, hub_password),
        ]
        # Run in hub mode once the hub exists.
        self._hub_command_list = [
            "SecureNatEnable",
            "IPsecEnable /L2TP:yes /L2TPRAW:yes /ETHERIP:yes /PSK:%s /DEFAULTHUB:%s" % (preshared_key, hub_name),
        ]

    @classmethod
    def _redact(cls, command):
        """Return *command* with the value of every secret option masked."""
        words = []
        for word in command.split(" "):
            option, separator, _ = word.partition(":")
            if separator and option.upper() in cls._SECRET_OPTIONS:
                word = option + separator + "********"
            words.append(word)
        return " ".join(words)

    def _execute_command(self, executor, command):
        """Run *command* on *executor*; return True when it reported success."""
        responses = executor.Execute(command)
        safe_command = self._redact(command)
        if responses and responses[0] is True:
            self._logger.info("succeeded to execute command '%s'", safe_command)
            return True
        self._logger.error("failed to execute command '%s'", safe_command)
        return False

    def Setup(self, fault_tolerance=True):
        """Recreate the hub and enable SecureNAT and IPsec on it.

        With ``fault_tolerance`` true, failing commands are logged and the
        remaining ones still run, and the result is always True. With it
        false, the first failure stops the setup and returns False.
        """
        self._logger.info("### setup server & hub")
        for command in self._server_command_list:
            if not self._execute_command(self._executor_server, command) and not fault_tolerance:
                return False
        for command in self._hub_command_list:
            if not self._execute_command(self._executor_hub, command) and not fault_tolerance:
                return False
        return True

    def CreateUser(self, user_name, user_password):
        """Create *user_name* and set its password; False on first failure."""
        # The password is deliberately not logged.
        self._logger.info("### create user '%s'", user_name)
        command_list = [
            "UserCreate %s /GROUP:none /REALNAME:none /NOTE:none" % user_name,
            "UserPasswordSet %s /PASSWORD:%s" % (user_name, user_password),
        ]
        for command in command_list:
            if not self._execute_command(self._executor_hub, command):
                return False
        # Reached only after every command succeeded.
        return True

    def SetUserExpire(self, user_name, duration):
        """Make *user_name* expire *duration* (a timedelta) from now.

        ``duration=None`` removes the expiry ("none").
        """
        self._logger.info("### set user '%s' expire after '%s'", user_name, duration)
        expire_value = "none"
        if duration is not None:
            expire_date = datetime.now() + duration
            # The embedded quotes keep date and time together as one argument.
            expire_value = expire_date.strftime('"%Y/%m/%d %H:%M:%S"')
        self._logger.info("set user '%s' expires at '%s'", user_name, expire_value)
        command = "UserExpiresSet %s /EXPIRES:%s" % (user_name, expire_value)
        return self._execute_command(self._executor_hub, command)

    def DeleteUser(self, user_name):
        """Delete *user_name* from the hub; return True on success."""
        self._logger.info("### delete user '%s'", user_name)
        command = "UserDelete %s" % user_name
        return self._execute_command(self._executor_hub, command)

    def DisconnectUser(self, user_name):
        """Disconnect every current session of *user_name*.

        Returns False when the session list cannot be retrieved, True
        otherwise (even if an individual disconnect fails). User names are
        compared case-insensitively.
        """
        self._logger.info("### disconnect sessions of user '%s'", user_name)
        responses = self._executor_hub.Execute("SessionList")
        if not (len(responses) >= 4 and responses[0] is True):
            self._logger.error("failed to retrieve current session list")
            return False

        target_user = user_name.lower()
        session_list = []
        session_name = None
        # The output is read as "<field> |<value>" lines, where a
        # "Session Name" line precedes the "User Name" line of that session.
        for line in responses[2]:
            if line.startswith("Session Name"):
                pair = line.split('|')
                if len(pair) >= 2:
                    session_name = pair[1].strip()
            elif line.startswith("User Name"):
                pair = line.split('|')
                # Skip user lines seen before any session name.
                if len(pair) >= 2 and session_name is not None:
                    session_user = pair[1].strip()
                    if session_user.lower() == target_user:
                        session_list.append(session_name)
                        self._logger.info("found session '%s' from '%s'", session_name, session_user)

        for session_name in session_list:
            self._logger.info("disconnect session '%s'", session_name)
            command = "SessionDisconnect %s" % session_name
            self._execute_command(self._executor_hub, command)
        return True
if __name__ == '__main__':
    # Manual end-to-end run; replace the placeholders with real settings.
    vpn_server = "YOUR_OWN_SERVER.cloudapp.net"
    vpn_port = 8301
    vpn_password = "YOUR_OWN_PASSWORD"
    hub_name = "VPN"
    hub_password = "YOUR_OWN_PASSWORD"
    preshared_key = "YOUR_OWN_KEY"
    command_timeout = 15
    user_name = "YOUR_OWN_NAME"
    user_password = "YOUR_OWN_PASSWORD"

    vs = VPNServerManager(
        vpn_server=vpn_server,
        vpn_port=vpn_port,
        vpn_password=vpn_password,
        hub_name=hub_name,
        hub_password=hub_password,
        preshared_key=preshared_key,
        command_timeout=command_timeout,
    )

    # Build the hub, then walk one user through its whole life cycle.
    vs.Setup()
    vs.CreateUser(user_name, user_password)
    lifetime = timedelta(hours=3)
    vs.SetUserExpire(user_name, lifetime)
    vs.DeleteUser(user_name)
    vs.DisconnectUser(user_name)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment