Skip to content

Instantly share code, notes, and snippets.

@danslinky
Created February 28, 2024 18:15
Show Gist options
  • Save danslinky/8f544dc88e08675acd2d1a780bec303e to your computer and use it in GitHub Desktop.
Save danslinky/8f544dc88e08675acd2d1a780bec303e to your computer and use it in GitHub Desktop.
digitalocean_droplet_wireguard_cloud-config_python

Create a DigitalOcean Droplet, using Python and cloud-config and setup some Wireguard

Code Review for DigitalOcean API Integration Script

The provided script is a Python script that interacts with the DigitalOcean API to perform various tasks related to managing resources like account info, SSH keys, VPCs, droplets, and firewalls.

Here are some aspects to consider for a code review:

Code Structure

  • The script is well-structured with functions for different API calls, which helps in maintaining and understanding the code.
  • The script imports necessary modules at the beginning and defines global variables for API base URL and headers.

Error Handling

  • The script handles HTTP status codes appropriately by checking for success (status code 200) and printing error messages with the response text if there is a failure.
  • The script exits with status 1 if there is an error response from the API, ensuring that the script stops execution on critical failures.

Security

  • The script uses an API token stored in an environment variable for authentication, which is a good practice for security.
  • The script sends requests over HTTPS to the DigitalOcean API endpoint, ensuring data security during transmission.

Improvements

  • The get_firewall_uuid function is incomplete and ends abruptly. It seems like there is a typo in r = request which should be r = requests.get(api_url, headers=do_headers).
  • The script could benefit from more detailed error handling, such as logging errors to a file or service for better monitoring and debugging.
  • Adding more comments within the functions to explain the purpose of each API call and the expected response structure would improve code readability.

Best Practices

  • It would be beneficial to add more input validation checks to ensure that the data being passed to API calls is in the correct format.
  • Consider using a library like logging for consistent and structured logging throughout the script.

Performance

  • The script makes multiple API calls synchronously, which could be optimized by using asynchronous requests for better performance, especially when dealing with multiple resources.

Testing

  • Unit tests could be added to validate the functions' behavior under different scenarios, ensuring the script functions correctly in various situations.

Documentation

  • Adding docstrings to each function explaining their purpose, parameters, and return values would enhance code documentation and readability.

Overall

The script demonstrates good integration with the DigitalOcean API and performs essential operations effectively. With some improvements in error handling, documentation, and testing, the script can be more robust and maintainable.

If you have any specific questions or need further assistance with this code, feel free to ask!

#cloud-config
disable_root: true
users:
- name: username
ssh-authorized-keys:
- sysadmin
sudo: ['ALL=(ALL) NOPASSWD:ALL']
groups: sudo
shell: /bin/bash
package_update: true
package_upgrade: true
packages:
- wireguard-tools
- net-tools
wireguard:
interfaces:
- name: wg0
config_path: /etc/wireguard/wg0.conf
content: |
[Interface]
PrivateKey = TmV2ZXIgR29ubmEgR2l2ZSBZb3UgVXA=
ListenPort = 51820
Address = 42.69.4.2/32
# DNS = 42.1.8.9
[Peer]
PublicKey = TmV2ZXIgR29ubmEgR2l2ZSBZb3UgVXA=
PresharedKey =
Endpoint = 1.2.2.3:51820
AllowedIPs = 42.0.0.0/8
# The idea behind readiness probes is to ensure Wireguard connectivity before continuing
# the cloud-init process. This could be useful if you need access to specific services like
# an internal APT Repository Server (e.g Landscape) to install/update packages.
readinessprobe:
- 'systemctl start wg-quick@wg0.service'
- 'systemctl enable wg-quick@wg0.service'
- 'ping -c 5 $TUNNEL_ENDPOINT'
final_message: |
The system is _finally_ up, after $UPTIME seconds
Cloud-init v. $UPSTART_JOB $UPSTART_EVENTS
$SYSTEM_INFO
$FINAL_MESSAGE
cat /etc/wireguard/wg0.conf
#!/usr/bin/env python3
import os
import sys
import json
import requests
import time
# get api token from DO_API_TOKEN environment var
do_api_token = os.environ['DO_API_TOKEN']
do_api_url_base = 'https://api.digitalocean.com/v2/'
do_headers = {'Content-Type': 'application/json',
'Authorization': 'Bearer {0}'.format(do_api_token)}
def get_account_info():
"""
Get DigitalOcean account info.
"""
api_url = '{0}account'.format(do_api_url_base)
r = requests.get(api_url, headers=do_headers)
if r.status_code == 200:
return json.loads(r.content.decode('utf-8'))
else:
return None
def get_ssh_keys():
"""
Get all DigitalOcean SSH keys, return their IDs.
"""
api_url = '{0}account/keys'.format(do_api_url_base)
r = requests.get(api_url, headers=do_headers)
if r.status_code != 200:
print(r.text)
sys.exit(1)
ssh_keys = json.loads(r.content.decode('utf-8'))
key_ids = []
if ssh_keys is not None:
for key, details in enumerate(ssh_keys['ssh_keys']):
key_ids.append(details['id'])
return key_ids
def get_vpcs():
"""
Get all DigitalOcean VPCs.
"""
api_url = '{0}vpcs'.format(do_api_url_base)
r = requests.get(api_url, headers=do_headers)
if r.status_code != 200:
print(r.text)
sys.exit(1)
vpcs = json.loads(r.content.decode('utf-8'))
return vpcs
def create_droplet(config):
"""
Create a new droplet of config, return the droplet id
"""
api_url = '{0}droplets'.format(do_api_url_base)
r = requests.post(api_url, headers=do_headers, data=json.dumps(config))
if r.status_code != 202:
print(r.text)
sys.exit(1)
return r.json()['droplet']['id']
def get_action_status(droplet_id):
api_url = '{0}droplets/{1}/actions'.format(do_api_url_base, droplet_id)
r = requests.get(api_url, headers=do_headers)
if r.status_code != 200:
print(r.text)
sys.exit(1)
return r.json()['actions'][0]['status']
def get_droplet_ip(droplet_id):
api_url = '{0}droplets/{1}'.format(do_api_url_base, droplet_id)
r = requests.get(api_url, headers=do_headers)
if r.status_code != 200:
print(r.text)
sys.exit(1)
droplet_ip = None
for network in r.json()['droplet']['networks']['v4']:
if network['type'] == 'public':
droplet_ip = network['ip_address']
break
return droplet_ip
def add_droplet_to_firewall(firewall_id, droplet_id):
api_url = '{0}firewalls/{1}/droplets'.format(do_api_url_base, firewall_id)
data = json.dumps({"droplet_ids": [droplet_id]})
r = requests.post(api_url, headers=do_headers, data=data)
if r.status_code != 204:
print(r.text)
sys.exit(1)
print("Added droplet {0} to firewall {1}".format(droplet_id, firewall_id))
def get_firewall_uuid(firewall_name):
api_url = '{0}firewalls'.format(do_api_url_base)
r = request
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment