Skip to content

Instantly share code, notes, and snippets.

@u1735067
Created January 31, 2023 14:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save u1735067/6100ea0f7bc25b78036078b9ce85e5d4 to your computer and use it in GitHub Desktop.
Save u1735067/6100ea0f7bc25b78036078b9ce85e5d4 to your computer and use it in GitHub Desktop.
An adaptation (NOT cleaned up) of https://github.com/mattstruble/portainer-swarm-migrate that uses the new StackAssociate API, for when you have to recreate your cluster because you locked yourself out :) // Consider it to be under the same license as the original code (MIT).
[portainer]
username=USER
password=PASSWORD
url=http://127.0.0.1:9000
[swarm]
clusterID=ID
# clusterID is the ID of the new swarm cluster, as shown in the output of `docker info`
import requests
import configparser
import json
import time
import logging
import sys
# Module-wide logger, named after this file's path.
logger = logging.getLogger(__file__)
# Root logging configuration: INFO and above, written to stdout.
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
class PortainerAPIError(Exception):
    """Error built from a non-successful Portainer API response.

    Attributes:
        code: The HTTP status code passed by the caller.
        message: The ``message`` field of the JSON error body. Falls back to
            the raw body text when the body is not a JSON object or has no
            ``message`` field.
        details: The ``details`` field of the JSON error body, or an empty
            string when it is absent.
    """

    def __init__(self, code, text, *args):
        """Parse the response body *text* received with status *code*.

        Args:
            code: HTTP status code of the failed response.
            text: Raw response body; expected (but not required) to be a
                JSON object with ``message`` and ``details`` keys.
            *args: Extra positional values stored on the exception.
        """
        # Forward the real arguments so ``.args`` is meaningful (the nested
        # tuple previously stored there was not usable for re-construction).
        super().__init__(code, text, *args)
        self.code = code

        # The body may not be JSON at all (e.g. an HTML page from a proxy);
        # never let error *reporting* fail with a secondary exception.
        try:
            content = json.loads(text)
        except (TypeError, ValueError):
            content = None

        if isinstance(content, dict):
            self.message = content.get("message", text)
            self.details = content.get("details", "")
        else:
            self.message = text
            self.details = ""

    def __str__(self):
        return f"Portainer API returned error code {self.code}: {self.message}\n{self.details}"
class PortainerAPI:
    """Small client for the Portainer HTTP API endpoints used by this script.

    On construction it logs in with the credentials from the ``[portainer]``
    section of the given config parser and keeps the returned JWT, which is
    then sent as a bearer token on every subsequent request.

    Any response whose status code is not 200 raises ``PortainerAPIError``.
    """

    url: str
    jwt: str

    def __init__(self, parser: configparser.ConfigParser):
        """Authenticate against ``<url>/api/auth``.

        Args:
            parser: Config parser providing ``url``, ``username`` and
                ``password`` in its ``portainer`` section.

        Raises:
            PortainerAPIError: If the login request does not return 200.
        """
        # Drop a trailing slash so that f"{self.url}/api/..." never yields "//".
        self.url = parser.get(section="portainer", option="url").strip().rstrip("/")

        headers = {
            "Content-Type": "application/json"
        }
        data = {
            "Username": parser.get(section="portainer", option="username").strip(),
            "Password": parser.get(section="portainer", option="password").strip()
        }

        response = requests.post(f"{self.url}/api/auth", headers=headers, data=json.dumps(data))
        # Raises on a failed login, so the "jwt" lookup below only runs on success.
        self._validate_response(response)

        self.jwt = json.loads(response.text)["jwt"]

    @property
    def headers(self) -> dict:
        """Headers sent with every authenticated request."""
        return {"Authorization": f"Bearer {self.jwt}",
                "Content-Type": "application/json"}

    @staticmethod
    def _validate_response(response):
        """Raise ``PortainerAPIError`` unless *response* has status 200.

        Raising (rather than only printing) is what lets callers such as
        ``stop_stack`` react to specific API errors.
        """
        if response.status_code != 200:
            raise PortainerAPIError(response.status_code, response.text)
        logger.debug("Response text:" + response.text)

    def _get(self, url, **kwargs):
        """Send an authenticated GET; extra kwargs go to ``requests.get``."""
        response = requests.get(url, headers=self.headers, **kwargs)
        self._validate_response(response)
        return response

    def _post(self, url, **kwargs):
        """Send an authenticated POST; extra kwargs go to ``requests.post``."""
        response = requests.post(url, headers=self.headers, **kwargs)
        self._validate_response(response)
        return response

    def _put(self, url, **kwargs):
        """Send an authenticated PUT; extra kwargs go to ``requests.put``."""
        response = requests.put(url, headers=self.headers, **kwargs)
        self._validate_response(response)
        return response

    def get_stacks(self) -> list:
        """Fetch ``/api/stacks``, print the decoded JSON, and return it."""
        response = self._get(f"{self.url}/api/stacks")
        response_json = response.json()
        print(json.dumps(response_json, indent=2))
        return response_json

    def start_stack(self, stack: dict):
        """POST to the ``start`` endpoint of *stack* (uses its ``Id``)."""
        logger.debug(f"Starting: {stack['Name']}")
        self._post(f"{self.url}/api/stacks/{stack['Id']}/start")

    def stop_stack(self, stack: dict):
        """POST to the ``stop`` endpoint of *stack*.

        A 400 response whose message is "Stack is already inactive" is
        treated as success; any other API error propagates.
        """
        logger.debug(f"Stopping: {stack['Name']}")
        try:
            self._post(f"{self.url}/api/stacks/{stack['Id']}/stop")
        except PortainerAPIError as api_error:
            if api_error.code == 400 and api_error.message == "Stack is already inactive":
                print(api_error.message)
            else:
                # Bare raise keeps the original traceback intact.
                raise

    def migrate_stack(self, stack: dict, new_swarm_id: str):
        """POST to the ``migrate`` endpoint to move *stack* to *new_swarm_id*.

        Does nothing (besides printing a notice) when the stack's ``SwarmId``
        already equals *new_swarm_id*.
        """
        logger.info(f"Migrating: {stack['Name']}")

        if stack["SwarmId"] == new_swarm_id:
            print("Stack already exists on the new swarm.")
            return

        data = {
            "endpointID": stack["EndpointId"],
            "name": stack["Name"],
            "swarmID": new_swarm_id
        }
        self._post(f"{self.url}/api/stacks/{stack['Id']}/migrate", data=json.dumps(data))

    def associate_stack(self, stack: dict, new_swarm_id: str):
        """
        PUT to the ``associate`` endpoint to attach *stack* to *new_swarm_id*.

        Does nothing (besides printing a notice) when the stack's ``SwarmId``
        already equals *new_swarm_id*. The stack keeps its current
        ``EndpointId`` and ``orphanedRunning`` is sent as ``False``.

        https://app.swaggerhub.com/apis/portainer/portainer-ce/2.16.2#/stacks/StackAssociate
        https://github.com/portainer/portainer/blob/2.16.2/api/http/handler/stacks/stack_associate.go#L16
        https://github.com/portainer/portainer/issues/4397
        """
        logger.info(f"Migrating: {stack['Name']}")

        if stack["SwarmId"] == new_swarm_id:
            print("Stack already exists on the new swarm.")
            return

        # These are sent as URL query parameters, not as a JSON body.
        params = {
            "endpointId": stack["EndpointId"],
            "swarmId": new_swarm_id,
            "orphanedRunning": False,
        }
        self._put(f"{self.url}/api/stacks/{stack['Id']}/associate", params=params)

    def __repr__(self):
        # NOTE: the representation contains the bearer token; avoid writing
        # it to shared logs.
        return str({
            "url": self.url,
            "jwt": self.jwt
        })
def get_old_cluster_stacks(api: PortainerAPI, new_cluster_id: str) -> list:
    """Return the stacks from ``api.get_stacks()`` not on *new_cluster_id*.

    A stack is kept when its ``SwarmId`` differs from *new_cluster_id*.
    NOTE(review): stacks with an empty ``SwarmId`` also match this test;
    confirm that is intended for non-swarm stacks.
    """
    orphaned = []
    for stack in api.get_stacks():
        if stack["SwarmId"] == new_cluster_id:
            continue
        orphaned.append(stack)
    return orphaned
def _associate_orphaned_stacks(api, stacks, new_cluster_id):
    """Associate each stack with *new_cluster_id*; return names that failed.

    A failure on one stack is logged and does not prevent the remaining
    stacks from being processed.
    """
    failed = []
    for stack in stacks:
        try:
            api.associate_stack(stack, new_cluster_id)
        except PortainerAPIError as api_error:
            logger.error(f"Failed to associate '{stack['Name']}': {api_error}")
            failed.append(stack["Name"])
    return failed


def _stop_and_migrate_stacks(api, old_stacks, new_cluster_id, timeout=10):
    """Legacy workflow: stop every old stack, then migrate and restart it.

    Retained from the original script but NOT called by the default flow
    below, which uses the associate endpoint instead. Exits with status 1
    when some stacks are still running after *timeout* seconds.
    """
    # Stop all old running stacks before migrating
    for stack in old_stacks:
        api.stop_stack(stack)

    def _running_stacks():
        # Status 1 is treated as "running" here - presumably Portainer's
        # "active" stack status; confirm against the API reference.
        return [s for s in get_old_cluster_stacks(api, new_cluster_id) if s['Status'] == 1]

    # Wait for all the stacks to stop before trying to migrate them
    deadline = time.time() + timeout
    while time.time() < deadline:
        time.sleep(1)
        running = _running_stacks()
        if not running:
            break
        for stack in running:
            logger.info(f"Couldn't stop '{stack['Name']}' retrying..")
            api.stop_stack(stack)

    # Re-check once more so a stop issued in the last iteration is honoured.
    running = [s['Name'] for s in _running_stacks()]
    if running:
        running_list = '\n\t'.join(running)
        logger.error(f"The following stacks could not be stopped and will need to be killed in the Docker CLI by running `sudo docker rm STACK_NAME`: {running_list}")
        sys.exit(1)

    # Migrate old stacks
    for stack in old_stacks:
        api.migrate_stack(stack, new_cluster_id)
        time.sleep(0.5)
        api.start_stack(stack)


if __name__ == "__main__":
    parser = configparser.ConfigParser()
    # ConfigParser.read() silently skips missing files and returns the list
    # of files it actually parsed, so check it to fail with a clear message.
    if not parser.read("configuration.cfg"):
        logger.error("Could not read configuration.cfg")
        sys.exit(1)

    api = PortainerAPI(parser)
    new_cluster_id = parser.get("swarm", "clusterID").strip()

    old_stacks = get_old_cluster_stacks(api, new_cluster_id)

    if not old_stacks:
        logger.info("Could not find any orphaned stacks.")
        sys.exit(0)

    logger.info(f"Found {len(old_stacks)} orphaned stacks. Beginning migration.")

    failed = _associate_orphaned_stacks(api, old_stacks, new_cluster_id)
    if failed:
        failed_list = '\n\t'.join(failed)
        logger.error(f"The following stacks could not be associated: {failed_list}")
        sys.exit(1)

    # Exit with status 0 on success; raising SystemExit with a string would
    # print it to stderr and exit with status 1.
    logger.info("Done")
    sys.exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment