Created
January 31, 2023 14:58
-
-
Save u1735067/6100ea0f7bc25b78036078b9ce85e5d4 to your computer and use it in GitHub Desktop.
An adaptation (NOT cleaned up) of https://github.com/mattstruble/portainer-swarm-migrate that uses the newer StackAssociate API, for when you have to recreate your cluster because you locked yourself out :) // Consider it to be under the same license as the original code (MIT).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[portainer]
username=USER
password=PASSWORD
url=http://127.0.0.1:9000
[swarm]
clusterID=ID
# clusterID is the ClusterID of the new swarm, as shown by `docker info`
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import configparser | |
import json | |
import time | |
import logging | |
import sys | |
# Send log records to stdout (not the default stderr) at INFO level so that
# progress messages interleave with the script's other console output.
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
# Name the logger after the module rather than the file path: `__file__`
# would embed a filesystem path in every log line's prefix.
logger = logging.getLogger(__name__)
class PortainerAPIError(Exception):
    """Error built from a failed Portainer API response.

    Attributes:
        code: The status code passed in by the caller.
        message: The ``message`` field of the JSON error body. Falls back to
            the raw body text when the body is not a JSON object or has no
            ``message`` field.
        details: The ``details`` field of the JSON error body, or ``""`` when
            it is absent.
    """

    def __init__(self, code, text, *args):
        """Parse the response body *text* into ``message`` and ``details``.

        Args:
            code: Status code of the failed response.
            text: Raw response body; expected (but not required) to be a JSON
                object with ``message`` and ``details`` keys.
            *args: Extra positional arguments forwarded to ``Exception``.
        """
        # Forward the extras individually; passing the tuple itself would
        # nest it inside ``self.args``.
        super().__init__(*args)
        self.code = code
        try:
            content = json.loads(text)
        except (TypeError, ValueError):
            # Not JSON at all (HTML error page, empty body, None...). The
            # exception must still be constructible so the real failure is
            # reported instead of a parsing error.
            content = None
        if isinstance(content, dict):
            self.message = content.get("message", text)
            self.details = content.get("details", "")
        else:
            self.message = text
            self.details = ""

    def __str__(self):
        return f"Portainer API returned error code {self.code}: {self.message}\n{self.details}"
class PortainerAPI:
    """Small authenticated client for the Portainer HTTP API.

    The constructor logs in with the credentials from the ``[portainer]``
    config section and keeps the returned JWT; every later request sends it
    as a bearer token.
    """

    url: str  # Base URL of the Portainer instance, without a trailing slash.
    jwt: str  # Token returned by ``/api/auth``.

    def __init__(self, parser: configparser.ConfigParser):
        """Authenticate against ``{url}/api/auth`` and store the JWT.

        Args:
            parser: Loaded configuration providing ``url``, ``username`` and
                ``password`` in its ``portainer`` section.

        Raises:
            PortainerAPIError: If the auth request does not return status 200.
        """
        # Drop a trailing slash so f"{self.url}/api/..." never yields "//api".
        self.url = parser.get(section="portainer", option="url").strip().rstrip("/")
        credentials = {
            "Username": parser.get(section="portainer", option="username").strip(),
            "Password": parser.get(section="portainer", option="password").strip(),
        }
        response = requests.post(
            f"{self.url}/api/auth",
            headers={"Content-Type": "application/json"},
            data=json.dumps(credentials),
        )
        self._validate_response(response)
        self.jwt = response.json()["jwt"]

    @property
    def headers(self) -> dict:
        """Headers sent with every authenticated request."""
        return {
            "Authorization": f"Bearer {self.jwt}",
            "Content-Type": "application/json",
        }

    @staticmethod
    def _validate_response(response):
        """Raise ``PortainerAPIError`` unless *response* has status 200.

        The error used to be printed and then discarded, which left callers
        working with an error payload and made the ``except
        PortainerAPIError`` handler in ``stop_stack`` unreachable.
        """
        logger.debug("Response text:" + response.text)
        if response.status_code != 200:
            raise PortainerAPIError(response.status_code, response.text)

    def _request(self, method, url, **kwargs):
        """Send an authenticated request, validate it and return the response."""
        response = requests.request(method, url, headers=self.headers, **kwargs)
        self._validate_response(response)
        return response

    def _get(self, url, **kwargs):
        return self._request("GET", url, **kwargs)

    def _post(self, url, **kwargs):
        return self._request("POST", url, **kwargs)

    def _put(self, url, **kwargs):
        return self._request("PUT", url, **kwargs)

    def get_stacks(self) -> list:
        """Return the decoded JSON body of ``GET /api/stacks``."""
        stacks = self._get(f"{self.url}/api/stacks").json()
        # Dump at debug level only, rather than printing every stack to
        # stdout on each call.
        logger.debug("Stacks:\n%s", json.dumps(stacks, indent=2))
        return stacks

    def start_stack(self, stack: dict):
        """POST to the stack's ``/start`` endpoint."""
        logger.debug(f"Starting: {stack['Name']}")
        self._post(f"{self.url}/api/stacks/{stack['Id']}/start")

    def stop_stack(self, stack: dict):
        """POST to the stack's ``/stop`` endpoint.

        A 400 response whose message is "Stack is already inactive" is logged
        and ignored; any other API error propagates.
        """
        logger.debug(f"Stopping: {stack['Name']}")
        try:
            self._post(f"{self.url}/api/stacks/{stack['Id']}/stop")
        except PortainerAPIError as api_error:
            if api_error.code == 400 and api_error.message == "Stack is already inactive":
                logger.info(api_error.message)
            else:
                raise

    def migrate_stack(self, stack: dict, new_swarm_id: str):
        """POST to the stack's ``/migrate`` endpoint targeting *new_swarm_id*.

        Does nothing when the stack's ``SwarmId`` already equals
        *new_swarm_id*.
        """
        logger.info(f"Migrating: {stack['Name']}")
        if stack["SwarmId"] == new_swarm_id:
            logger.info("Stack already exists on the new swarm.")
            return
        data = {
            "endpointID": stack["EndpointId"],
            "name": stack["Name"],
            "swarmID": new_swarm_id,
        }
        self._post(f"{self.url}/api/stacks/{stack['Id']}/migrate", data=json.dumps(data))

    def associate_stack(self, stack: dict, new_swarm_id: str, orphaned_running: bool = False):
        """PUT to the stack's ``/associate`` endpoint targeting *new_swarm_id*.

        Does nothing when the stack's ``SwarmId`` already equals
        *new_swarm_id*.

        https://app.swaggerhub.com/apis/portainer/portainer-ce/2.16.2#/stacks/StackAssociate
        https://github.com/portainer/portainer/blob/2.16.2/api/http/handler/stacks/stack_associate.go#L16
        https://github.com/portainer/portainer/issues/4397

        Args:
            stack: Stack record as returned by ``get_stacks``.
            new_swarm_id: Cluster ID to associate the stack with.
            orphaned_running: Value of the ``orphanedRunning`` query
                parameter (defaults to ``False``, as before).
        """
        logger.info(f"Migrating: {stack['Name']}")
        if stack["SwarmId"] == new_swarm_id:
            logger.info("Stack already exists on the new swarm.")
            return
        params = {
            "endpointId": stack["EndpointId"],
            "swarmId": new_swarm_id,
            # requests would serialise a Python bool as "True"/"False"; send
            # the lowercase spelling conventional in query strings instead.
            "orphanedRunning": "true" if orphaned_running else "false",
        }
        self._put(f"{self.url}/api/stacks/{stack['Id']}/associate", params=params)

    def __repr__(self):
        return str({
            "url": self.url,
            "jwt": self.jwt,
        })
def get_old_cluster_stacks(api: PortainerAPI, new_cluster_id: str) -> list:
    """Return the stacks from ``api.get_stacks()`` whose ``SwarmId`` differs from *new_cluster_id*."""
    orphaned = []
    for candidate in api.get_stacks():
        if candidate["SwarmId"] == new_cluster_id:
            # Already attached to the new cluster.
            continue
        orphaned.append(candidate)
    return orphaned
def _stop_migrate_start(api, old_stacks, new_cluster_id, stop_timeout=10):
    """Legacy flow: stop every old stack, migrate it, then start it again.

    This is the flow that used to sit (unreachable) after the associate loop.
    It only runs when the script is started with ``--legacy-migrate``.

    Args:
        api: Authenticated ``PortainerAPI`` client.
        old_stacks: Stacks to move, as returned by ``get_old_cluster_stacks``.
        new_cluster_id: Cluster ID the stacks are migrated to.
        stop_timeout: Seconds to keep retrying the stop requests.

    Returns:
        True when every stack was migrated and started, False when some
        stacks were still running once *stop_timeout* elapsed.
    """

    def still_running():
        # Status == 1 is what this script treats as "running".
        return [s for s in get_old_cluster_stacks(api, new_cluster_id) if s["Status"] == 1]

    # Stop all old running stacks before migrating.
    for stack in old_stacks:
        api.stop_stack(stack)

    # Wait for all the stacks to stop before trying to migrate them.
    start_time = time.time()
    while time.time() - start_time < stop_timeout:
        time.sleep(1)
        running = still_running()
        if not running:
            break
        for stack in running:
            logger.info(f"Couldn't stop '{stack['Name']}' retrying..")
            api.stop_stack(stack)
    else:
        # Timed out: check once more, since the last retry may have worked.
        running = still_running()
        if running:
            running_list = "\n\t".join(s["Name"] for s in running)
            logger.error(f"The following stacks could not be stopped and will need to be killed in the Docker CLI by running `sudo docker stack rm STACK_NAME`: {running_list}")
            return False

    # Migrate old stacks.
    for stack in old_stacks:
        api.migrate_stack(stack, new_cluster_id)
        time.sleep(0.5)
        api.start_stack(stack)
    return True


def main(argv=None):
    """Re-associate every stack that is not on the configured swarm cluster.

    Reads ``configuration.cfg`` from the working directory. With
    ``--legacy-migrate`` in *argv* the stop/migrate/start flow is used
    instead of the associate endpoint.

    Returns:
        Process exit status: 0 on success (including "nothing to do"),
        1 when the configuration is missing or any stack failed.
    """
    argv = sys.argv[1:] if argv is None else argv

    parser = configparser.ConfigParser()
    # ConfigParser.read() silently skips missing files and returns the list
    # of files it actually parsed, so an empty result means "no config".
    if not parser.read("configuration.cfg"):
        logger.error("Could not read configuration.cfg")
        return 1

    api = PortainerAPI(parser)
    new_cluster_id = parser.get("swarm", "clusterID").strip()
    old_stacks = get_old_cluster_stacks(api, new_cluster_id)
    if not old_stacks:
        logger.info("Could not find any orphaned stacks.")
        return 0

    logger.info(f"Found {len(old_stacks)} orphaned stacks. Beginning migration.")

    if "--legacy-migrate" in argv:
        return 0 if _stop_migrate_start(api, old_stacks, new_cluster_id) else 1

    failed = []
    for stack in old_stacks:
        try:
            api.associate_stack(stack, new_cluster_id)
        except PortainerAPIError as api_error:
            # Keep going: one failing stack should not block the others.
            logger.error(f"Could not associate '{stack['Name']}': {api_error}")
            failed.append(stack["Name"])
    if failed:
        logger.error(f"{len(failed)} stack(s) could not be associated: {', '.join(failed)}")
        return 1

    # A plain `raise SystemExit('Done')` would print to stderr and exit with
    # status 1; report success through the logger and exit 0 instead.
    logger.info("Done")
    return 0


if __name__ == "__main__":
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment