Skip to content

Instantly share code, notes, and snippets.

@xan7r
Last active May 6, 2023 11:10
Show Gist options
  • Save xan7r/b0775e4ec23715f846e54223c4959011 to your computer and use it in GitHub Desktop.
Save xan7r/b0775e4ec23715f846e54223c4959011 to your computer and use it in GitHub Desktop.
Script for basic management of Digital Ocean droplets and DNS records
#!/usr/bin/python3
import digitalocean, time, datetime, sys, re, argparse
from urllib import request
# NOTE: REQUIRES python-digitalocean
#
# examples:
# ./digitalOceanAPI.py --create-droplet "ubuntu 18.04"
# ./digitalOceanAPI.py --destroy-droplet <ubuntu18.04-20180723.185051>
# ./digitalOceanAPI.py --list-droplets
# ./digitalOceanAPI.py --list-images
# ./digitalOceanAPI.py --list-domains
# ./digitalOceanAPI.py --list-records --domain <example.local>
# ./digitalOceanAPI.py --create-record --domain <example.local> --record-name www --record-data 127.0.0.1
# ./digitalOceanAPI.py --destroy-record www --domain <example.local>
# ./digitalOceanAPI.py --dynamic-dns dns --domain <example.local>
apiKey = "" # REPLACE THIS WITH DIGITAL OCEAN API KEY
sshKeyID = 0000000 # REPLACE THIS WITH DEFAULT SSH KEY ID (INTEGER)
manager = digitalocean.Manager(token=apiKey)
def listDroplets():
for i in manager.get_all_droplets():
print("\nName : " + i.name + "\nIP Address : " + i.ip_address + "\nStatus : " + i.status)
def listImages():
for i in manager.get_all_images():
print(i)
def listRegions():
for i in manager.get_all_regions():
print(str(i))
def listSizes():
for i in manager.get_all_sizes():
print(str(i))
def listKeys():
print("")
for i in manager.get_all_sshkeys():
print(str(i))
def listDomains():
print("")
for i in manager.get_all_domains():
print(str(i))
def listDomainRecords(domain):
if domain == None:
print("ERROR: Domain required. Use --list-domains to view all domains")
sys.exit(1)
domainObj = digitalocean.Domain(name=domain, token=apiKey)
try:
print("\nID\t\tTYPE\tNAME\t\tDATA\t\t\t\tTTL")
for i in domainObj.get_records():
print(str(i.id) + "\t" + i.type + "\t" + i.name + "\t\t" + i.data + "\t\t" + str(i.ttl))
except Exception as e:
print("ERROR: Domain " + domain + " not found. Use --list-domains to view all domains")
print(e)
sys.exit(1)
def createDomainRecord(domain, recordType, recordName, recordData):
if domain == None:
print("ERROR: Domain required. Use --list-domains to view all domains")
sys.exit(1)
domainObj = digitalocean.Domain(name=domain, token=apiKey)
if recordType == None:
recordType = "A"
if recordName == None:
print("ERROR: Domain subdomain name required to add record")
sys.exit(1)
if recordData == None:
print("ERROR: Domain subdomain data required to add record")
sys.exit(1)
try:
if recordType == "MX":
domainObj.create_new_domain_record(type=recordType, name=recordName, data=recordData, priority=10)
else:
domainObj.create_new_domain_record(type=recordType, name=recordName, data=recordData)
print("\nCreated Record Successfully.")
listDomainRecords(domain)
except Exception as e:
print("ERROR: Unable to create new record\n" + str(e))
sys.exit(1)
def destroyDomainRecord(domain, recordID, bypassConfirmation=False):
if domain == None:
print("ERROR: Domain required. Use --list-domains to view all domains")
sys.exit(1)
if recordID == None:
print("ERROR: Record ID required.")
listDomainRecords(domain)
domainObj = digitalocean.Domain(name=domain, token=apiKey)
for i in domainObj.get_records():
if str(i.id) == recordID or str(i.name) == recordID:
print("\nDNS Record:\n" + str(i.id) + "\t" + i.type + "\t" + i.name + "\t\t" + i.data + "\t\t" + str(i.ttl))
if bypassConfirmation:
i.destroy()
print("\nDestroyed Record Successfully.")
listDomainRecords(domain)
else:
text = input("\nType CONFIRM to destroy record: ")
if text == "CONFIRM":
i.destroy()
print("\nDestroyed Record Successfully.")
listDomainRecords(domain)
else:
print("Operation aborted. Droplet not destroyed")
sys.exit(1)
def createDroplet(imageName, imageSize, imageRegion, keyID, userDataFile):
if imageSize == None:
imageSize = '512mb'
if imageRegion == None:
imageRegion = 'nyc1'
if keyID == None:
keyID = sshKeyID
userDataContents = ""
if userDataFile:
with open(userDataFile, 'r') as myfile:
userDataContents=myfile.read()
allKeys = manager.get_all_sshkeys()
for i,val in enumerate(allKeys):
if val.id == int(keyID):
myKey = allKeys[i]
break
allImages = manager.get_all_images()
DOimageName = None
print("")
for i in allImages:
curName = str(i).upper()
if curName.find(imageName.upper()) > -1:
DOimageName = str(i.id)
print("Creating droplet from image: " + str(i))
break
if DOimageName == None:
print("Unable to find images matching: " + imageName)
sys.exit()
ts = datetime.datetime.fromtimestamp(time.time()).strftime('%Y%m%d.%H%M%S')
boxName = re.sub(' ', '', imageName) + "-" + ts
droplet = digitalocean.Droplet(token=apiKey,
name=boxName,
region=imageRegion,
image=DOimageName,
ssh_keys=[myKey],
user_data=userDataContents,
size_slug=imageSize)
droplet.create()
currentStatus = droplet.get_actions()[0]
while ( currentStatus.status != 'completed'):
time.sleep(2)
currentStatus.load()
droplet.load()
print("Created Host: " + boxName)
print("In Region: " + imageRegion)
print("With Size: " + imageSize)
print("With SSH Key: " + str(myKey))
print("With Public IP: " + droplet.ip_address)
def destroyDroplet(droplet):
allDroplets = manager.get_all_droplets()
for i in allDroplets:
if (i.name.upper() == droplet.upper()) or (i.ip_address == droplet):
print("\nName : " + i.name + "\nIP Address : " + i.ip_address + "\nStatus : " + i.status)
text = input("\nType CONFIRM to destroy droplet: ")
if text == "CONFIRM":
i.destroy()
print("Destroyed droplet " + droplet)
return
else:
print("Operation aborted. Droplet not destroyed")
return
print("ERROR: Unable to destroy droplet " + droplet)
def checkIP(url):
req = request.urlopen(url)
response = req.read().decode('utf-8').rstrip()
return response
def DynamicDNS(url, domain, recordName):
if domain == None:
print("ERROR: Domain required (e.g. example.local). Use --list-domains to view all domains")
sys.exit(1)
if recordName == None:
print("ERROR: Record Name required. (e.g. dynamicdns)")
sys.exit(1)
currentIP = str(checkIP(url))
dnsIP = None
domainObj = digitalocean.Domain(name=domain, token=apiKey)
try:
for i in domainObj.get_records():
if str(i.name) == recordName:
dnsIP = str(i.data)
except Exception as e:
print("ERROR: Domain " + domain + " not found. Use --list-domains to view all domains")
print(e)
sys.exit(1)
if dnsIP == None:
print("ERROR: Existing record not found. Please set DNS once manually before using Dynamic DNS mode.")
sys.exit(1)
if dnsIP == currentIP:
print("[*] DNS already set correctly, nothing to do")
sys.exit(0)
else:
destroyDomainRecord(domain, recordName, True)
createDomainRecord(domain, "A", recordName, currentIP)
print("[*] DNS record updated to " + currentIP)
def parseArgs():
parser = argparse.ArgumentParser(description="Script to interface with Digital Ocean API")
primaryCommand = parser.add_mutually_exclusive_group(required=True)
primaryCommand.add_argument('-cd','--create-droplet', help='create droplet based on DO image name (fuzzy match)')
parser.add_argument('-r','--region', help='specify region when creating droplet (default: nyc1)')
parser.add_argument('-s','--size', help='specify size when creating droplet (default: 512mb)')
parser.add_argument('-k','--key', help='specify ssh key ID to use when creating droplet')
parser.add_argument('-d','--data', help='specify User Data file (bash script) to apply when creating new droplet')
primaryCommand.add_argument('-dd','--destroy-droplet', help='destroy droplet based on name or IP (exact match)')
parser.add_argument('-D','--domain', help='specify domain to manipulate (exact match)')
primaryCommand.add_argument('-cr','--create-record', help='create new subdomain record', action='store_true')
primaryCommand.add_argument('-dr','--destroy-record', help='destroy subdomain record by ID or Name')
parser.add_argument('-rt','--record-type', help='specify subdomain record type (e.g. A, CNAME)')
parser.add_argument('-rn','--record-name', help='specify subdomain record name (e.g. @, www)')
parser.add_argument('-rd','--record-data', help='specify subdomain record data (e.g. 127.0.0.1)')
primaryCommand.add_argument('-li','--list-images', help='list all available images ', action='store_true')
primaryCommand.add_argument('-ld','--list-droplets', help='list all available droplets', action='store_true')
primaryCommand.add_argument('-lk','--list-keys', help='list all available ssh keys', action='store_true')
primaryCommand.add_argument('-lr','--list-regions', help='list all available regions', action='store_true')
primaryCommand.add_argument('-ls','--list-sizes', help='list all available sizes', action='store_true')
primaryCommand.add_argument('-lD','--list-domains', help='list all available domain names', action='store_true')
primaryCommand.add_argument('-lR','--list-records', help='list records from specified domain', action='store_true')
primaryCommand.add_argument('-CI','--check-ip', help='check current ip using specified url', action='store_true')
primaryCommand.add_argument('-DD','--dynamic-dns', help='Check IP and update Dynamic DNS if required')
parser.add_argument('-u','--url', help='url to use when checking IP (only used in --check-ip and --dynamic-dns)', nargs='?', const='https://ipinfo.io/ip', default='https://ipinfo.io/ip')
args = parser.parse_args()
return args
def main():
args = parseArgs()
if args.list_images:
listImages()
if args.list_droplets:
listDroplets()
if args.list_keys:
listKeys()
if args.list_regions:
listRegions()
if args.list_sizes:
listSizes()
if args.list_domains:
listDomains()
if args.list_records:
listDomainRecords(args.domain)
if args.create_record:
createDomainRecord(args.domain, args.record_type, args.record_name, args.record_data)
if args.destroy_record:
destroyDomainRecord(args.domain, args.destroy_record)
if args.create_droplet:
createDroplet(args.create_droplet, args.size, args.region, args.key, args.data)
if args.destroy_droplet:
destroyDroplet(args.destroy_droplet)
if args.dynamic_dns:
DynamicDNS(args.url, args.domain, args.dynamic_dns)
if args.check_ip:
print(checkIP(args.url))
print("")
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment