Skip to content

Instantly share code, notes, and snippets.

@xynova
Last active May 30, 2022 22:34
Show Gist options
  • Save xynova/93c7d1e6af2a5d1ad9287ed6bccb44d8 to your computer and use it in GitHub Desktop.
Save xynova/93c7d1e6af2a5d1ad9287ed6bccb44d8 to your computer and use it in GitHub Desktop.
Terraform Registry module reference rewrite server
from git import Repo # pip install gitpython
import requests # pip install requests
from http.server import BaseHTTPRequestHandler, HTTPServer
import ssl
import time
import logging
import urllib.parse
import os
import shutil
import fnmatch
import re
import hashlib
import uuid
import getpass
import sys, errno
import logging
# Module-wide logger; basicConfig routes INFO-and-above records to stdout.
logger = logging.getLogger("mylogger")
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
# These get set when deploying as a container
# Public host/port/protocol: used to build the download URLs that are sent
# back to clients and written into rewritten module sources.
# NOTE: values read from the environment are strings, including the port.
serverHost = os.getenv('SERVER_HOST', "tfstacks.ourdomain.com.au")
serverPort = os.getenv('SERVER_PORT', "443")
serverProtocol = os.getenv('SERVER_PROTOCOL', "https")
# Listening interface: address/port/protocol the HTTP server binds to.
# Port and protocol default to the public values above when not overridden.
ifce_address = "0.0.0.0"
ifce_port = os.getenv('IFCE_PORT', serverPort)
ifce_protocol = os.getenv('IFCE_PROTOCOL', serverProtocol)
# TLS key/certificate files, only used when ifce_protocol is "https".
ifce_tls_keyfile = os.getenv('IFCE_TLS_KEYFILE', "/tmp/ssl/tls.key")
ifce_tls_certfile = os.getenv('IFCE_TLS_CERTFILE', "/tmp/ssl/tls.crt")
# Git source: server base URL plus repository path are concatenated to form
# the clone URL.
gitServer = os.getenv('GIT_SERVER', 'https://gitlab.ourdomain.com.au')
gitRepo = os.getenv('GIT_REPO', '/plat/grunt/terraform-stacks.git')
# File from which the GitLab auth token is read at start-up.
gitTokenPath = os.getenv('GIT_TOKEN_PATH', '/usr/src/app/secrets/gitToken')
# GitLab project id used in the merge-request API URL.
gitlabProjectId = os.getenv('GITLAB_PROJECT_ID', '117')
def getGitToken():
    """Return the GitLab auth token.

    The token is the first line (whitespace-stripped) of the file at
    ``gitTokenPath``. If that file cannot be read, the user is prompted
    for a token on the terminal instead.

    Returns:
        The token string; may be empty if the file is empty or the user
        just presses ENTER at the prompt.
    """
    try:
        # ``with`` guarantees the handle is closed even if reading fails.
        with open(gitTokenPath, 'r') as tokenFile:
            token = tokenFile.readline().strip()
    except (OSError, UnicodeError):
        # Only "file missing / unreadable / undecodable" falls back to the
        # prompt; KeyboardInterrupt and programming errors now propagate.
        logger.info('No GitLab token at ' + gitTokenPath + ' prompting for token instead.')
        return getpass.getpass("Enter gitlab auth token (press ENTER if none): ")
    # Logged only after the read actually succeeded.
    logger.info('GitLab token read from ' + gitTokenPath + ' and configured.')
    return token
# Resolved once at import time; getGitToken() may prompt on the terminal
# when no token file is readable.
gitToken = getGitToken()
class MyServer(BaseHTTPRequestHandler):
    """Request handler for the module-reference rewrite server.

    GET requests are dispatched on the URL:

    * ``/healthz`` and ``/ready`` answer with a small JSON status body.
    * a request carrying a ``terraform-get`` query parameter is answered
      with an ``x-terraform-get`` header pointing at ``download.zip``
      under the same path.
    * a path ending in ``.zip`` returns the archive built by ``createZip``.
    * any other path must end with ``/``; the module is built once to
      prove it exists and a plain confirmation text is returned.

    The optional ``mr`` query parameter is forwarded to ``createZip``.
    """

    def write_response(self, bytes):
        """Write ``bytes`` to the client, ignoring a closed connection."""
        try:
            self.wfile.write(bytes)
        except BrokenPipeError:  # ignore broken pipe as socket might have been closed
            pass

    # OVERRIDE DEFAULT LOG HANDLER
    def log_message(self, format, *args):
        """Log a request line through the module logger.

        Health-check requests are not logged.
        """
        # Don't log health checks
        healthRequests = ("GET /healthz HTTP/1.1", "GET /ready HTTP/1.1")
        if any(arg in healthRequests for arg in args):
            return
        logger.info("%s - - [%s] %s\n" %
                    (self.address_string(),
                     self.log_date_time_string(),
                     format % args))

    def do_GET(self):
        """Handle GET; any failure is reported to the client as HTTP 500."""
        o = urllib.parse.urlparse(self.path)
        qs = urllib.parse.parse_qs(o.query)
        mrId = (qs.get("mr") or [""])[0]
        try:
            if o.path in ("/healthz", "/ready"):
                self.send_response(200)
                self.send_header('Content-Type', 'application/json')
                self.end_headers()
                self.write_response(b'{ "status": "ok" }\n')
            elif qs.get('terraform-get'):
                logger.info("PATH -> with-tf-get")
                self.send_response_only(200)
                # parse_qs has percent-decoded mrId, so it can contain CR/LF
                # or other characters; re-encode it before it is placed in a
                # response header / URL to prevent header injection.
                reply = ''.join([serverProtocol, '://', serverHost, ':', str(serverPort),
                                 o.path, 'download.zip?mr=',
                                 urllib.parse.quote(mrId, safe='')])
                self.send_header('x-terraform-get', reply)
                self.end_headers()
            elif o.path.endswith(".zip"):
                logger.info("PATH -> zip file")
                modulePath = os.path.dirname(o.path)
                zipContents = createZip(mrId, modulePath)
                self.send_response(200)
                self.send_header('Content-Type', 'application/zip')
                self.end_headers()
                self.write_response(zipContents)
            else:
                if not o.path.endswith("/"):
                    raise TypeError(f"Path {o.path} must end with '/'")
                modulePath = os.path.dirname(o.path)
                # Built only to verify the module exists; the result is
                # intentionally discarded.
                createZip(mrId, modulePath)
                self.send_response(200)
                self.send_header('Content-Type', 'text/html')
                self.end_headers()
                self.write_response(b'The request is valid and has backing files. \n')
        except Exception as err:
            # Exception (not BaseException): KeyboardInterrupt / SystemExit
            # must reach the serve loop so the process can be stopped.
            msg = f"ERROR: {err}\n"
            logger.exception(msg)
            self.send_response(500)
            self.send_header('Content-Type', 'text/html')
            self.end_headers()
            self.write_response(msg.encode('utf-8'))

    def do_HEAD(self):
        """Handle HEAD: log the parsed URL and answer 200 with no headers."""
        o = urllib.parse.urlparse(self.path)
        logger.info(o)
        logger.info("PATH -> method head")
        self.send_response_only(200)
        self.end_headers()
def createZip(mrId, modulePath):
    """Build a zip of one module with its local sources rewritten.

    The module directory ``modulePath`` (relative to the checkout returned
    by ``ensureGitDir(mrId)``) is copied to a temporary directory, every
    ``source = "/workdir/..."`` line in its ``*.tf`` files is rewritten to
    point at this server's ``download.zip`` URL for the same ``mr``, and
    the result is zipped.

    Args:
        mrId: Merge-request id / tag forwarded to ``ensureGitDir``; may be
            empty.
        modulePath: Path of the module inside the repository, as taken
            from the request URL (starts with ``/``).

    Returns:
        The zip archive contents as ``bytes``.

    Raises:
        ValueError: if ``modulePath`` resolves outside the checkout or
            into its ``.git`` directory.
    """
    logger.info(f'modulePath is {modulePath}')
    gitDir = ensureGitDir(mrId)
    tfPath = gitDir + modulePath

    # modulePath comes straight from the request URL: make sure it cannot
    # climb out of the checkout (e.g. "/../../etc") or reach git metadata,
    # whose config holds the token-bearing clone URL.
    repoRoot = os.path.realpath(gitDir)
    resolved = os.path.realpath(tfPath)
    if resolved != repoRoot and not resolved.startswith(repoRoot + os.sep):
        raise ValueError(f"Module path {modulePath} is outside the repository")
    if '.git' in os.path.relpath(resolved, repoRoot).split(os.sep):
        raise ValueError(f"Module path {modulePath} is not a module directory")

    tmpName = str(uuid.uuid4())
    tmpDir = gitDir + '/' + tmpName
    # shutil.make_archive appends the format extension to the base name.
    zipPath = tmpDir + '.zip'
    try:
        logger.info('Copy ' + tfPath + ' into ' + tmpDir)
        # Never ship git metadata, even when the repository root is requested.
        shutil.copytree(tfPath, tmpDir, ignore=shutil.ignore_patterns('.git'))

        findExp = r'^(\s*)\bsource\s*=\s*\"/workdir((/[\w-]+)+)/?\"'  # this should be configurable
        # Backslashes in the dynamic parts would otherwise be interpreted
        # as escapes by the re replacement template.
        baseUrl = (serverProtocol + '://' + serverHost + ':' + str(serverPort)).replace('\\', '\\\\')
        query = ('mr=' + mrId).replace('\\', '\\\\')
        # \g<1> = original indentation, \g<2> = module path below /workdir.
        replaceExp = '\\g<1>source="' + baseUrl + '\\g<2>/download.zip?' + query + '"'
        findReplace(tmpDir, findExp, replaceExp, "*.tf")

        zipPath = shutil.make_archive(tmpDir, 'zip', tmpDir)
        logger.info('Created modified module @ ' + zipPath)
        with open(zipPath, 'rb') as f:
            zipContents = f.read()
    finally:
        # cleanup, also when any step above failed
        if os.path.exists(tmpDir):
            shutil.rmtree(tmpDir)
        if os.path.isfile(zipPath):
            os.remove(zipPath)
    return zipContents
def ensureGitDir(mrId):
    """Make sure a checkout for ``mrId`` exists and is positioned correctly.

    The checkout directory comes from ``getGitDir(mrId)``. It is cloned on
    first use, then fetched. When ``mrId`` is given:

    * if ``gitServer`` starts with ``git://``, ``mrId`` is checked out
      directly (logged as a tag);
    * otherwise the GitLab merge-request API is asked for the merge
      request's ``sha``, which is fetched and checked out.

    Args:
        mrId: Merge-request id or tag name; empty for the default checkout.

    Returns:
        Path of the checkout directory.

    Raises:
        ValueError: if ``mrId`` starts with ``-``.
        requests.HTTPError: if the GitLab API answers with an error status.
    """
    # mrId is request-supplied; a leading '-' would be parsed by git as a
    # command-line option rather than a revision.
    if mrId and mrId.startswith('-'):
        raise ValueError(f"Invalid merge request / tag reference: {mrId}")

    gitDir = getGitDir(mrId)
    if not os.path.isdir(gitDir):
        logger.info("Cloning into " + gitDir)
        # Embed the token as oauth2 credentials in the clone URL when set.
        gitUrl = gitServer.replace('://', '://oauth2:' + gitToken + '@') if gitToken else gitServer
        Repo.clone_from(gitUrl + gitRepo, gitDir)

    repo = Repo(gitDir)
    logger.info("Fetch latest into " + gitDir)
    repo.git.fetch()
    # NOTE(review): without an mrId nothing is checked out after the fetch,
    # so the working tree stays at whatever was cloned - confirm intended.
    if not mrId:
        return gitDir

    if gitServer.startswith("git://"):
        logger.info(gitServer)
        logger.info("Checkout tag " + mrId + " into " + gitDir)
        repo.git.checkout(mrId)
    else:
        my_headers = {'PRIVATE-TOKEN': gitToken}
        # Percent-encode mrId so it cannot alter the API path or query.
        apiUrl = (gitServer + '/api/v4/projects/' + gitlabProjectId +
                  '/merge_requests/' + urllib.parse.quote(mrId, safe=''))
        # Bounded wait: an unresponsive GitLab must not hang the request.
        response = requests.get(apiUrl, headers=my_headers, timeout=30)
        # Surface 401/404/... explicitly instead of a KeyError on 'sha'.
        response.raise_for_status()
        sha = response.json()['sha']
        logger.info("Fetch " + sha + " into " + gitDir)
        repo.remotes.origin.fetch(sha)
        repo.git.checkout(sha)
        logger.info('MR: ' + mrId + ' is at ' + sha)
    return gitDir
def getGitDir(mrId):
    """Return the checkout directory for ``mrId`` below the work directory.

    Args:
        mrId: Merge-request id or tag name; falsy values map to ``master``.

    Returns:
        ``<work dir>/<mrId or 'master'>``.

    Raises:
        ValueError: if the resulting path is not a normalized path inside
            the work directory (``..``, ``.``, absolute paths, empty or
            doubled separators).
    """
    workDir = getWorkDir()
    gitDir = workDir + '/' + (mrId or 'master')
    # mrId originates from the request query string; refuse anything that
    # would resolve outside (or onto) the work directory itself.
    if os.path.normpath(gitDir) != gitDir or not gitDir.startswith(workDir + '/'):
        raise ValueError(f"Invalid merge request / tag reference: {mrId}")
    return gitDir


def getWorkDir():
    """Return the base directory under which all checkouts are kept."""
    return '/tmp/tf-versioning-wd'
def findReplace(directory, find, replace, filePattern):
    """Apply a multi-line regex substitution to matching files in a tree.

    Walks ``directory`` recursively; every file whose name matches the
    glob ``filePattern`` is read, passed through
    ``re.sub(find, replace, ..., flags=re.M)`` and written back in place.

    Args:
        directory: Root of the tree to process.
        find: Regular expression to search for.
        replace: Replacement passed to ``re.sub``.
        filePattern: ``fnmatch``-style file name pattern, e.g. ``"*.tf"``.
    """
    def rewriteInPlace(target):
        # Read fully first, then reopen for writing, so the file is never
        # truncated before its contents are in memory.
        with open(target) as reader:
            text = reader.read()
        text = re.sub(find, replace, text, flags=re.M)
        with open(target, "w") as writer:
            writer.write(text)

    root = os.path.abspath(directory)
    for folder, _subdirs, names in os.walk(root):
        for name in fnmatch.filter(names, filePattern):
            rewriteInPlace(os.path.join(folder, name))
if __name__ == "__main__":
    # Script entry point: bind the server, optionally enable TLS, and serve
    # until interrupted with Ctrl-C.
    webServer = HTTPServer((ifce_address, int(ifce_port)), MyServer)
    if ifce_protocol == "https":
        # ssl.wrap_socket() is deprecated and was removed in Python 3.12;
        # an explicit server-side SSLContext is the supported replacement.
        tlsContext = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        tlsContext.load_cert_chain(certfile=ifce_tls_certfile, keyfile=ifce_tls_keyfile)
        webServer.socket = tlsContext.wrap_socket(webServer.socket, server_side=True)
    logger.info("Server started %s://%s:%s", ifce_protocol, *webServer.server_address)
    try:
        webServer.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        # Release the listening socket on every exit path.
        webServer.server_close()
    logger.info("Server stopped.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment