Skip to content

Instantly share code, notes, and snippets.

@celesteking
Created January 19, 2024 00:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save celesteking/f3f20aff72594aef785fd67867f07e2f to your computer and use it in GitHub Desktop.
Save celesteking/f3f20aff72594aef785fd67867f07e2f to your computer and use it in GitHub Desktop.
JIRA to GITLAB issue migration script
# Stolen from some other guy
# Sniffs issues from old JIRA and tries to import them into GITLAB.
# Does comments, too.
#
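# Create an additional GitLab user called `defuser`; it is the fallback author for Jira users
# that have no entry in the user map.
# Make every mapped user an Owner of the target GitLab project: GitLab only honours the original
# `created_at` timestamps of imported issues and comments for administrators and project owners.
# Jira user/password authentication is not supported; a Jira personal access token is required.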
import requests
from requests.auth import HTTPBasicAuth
import re
import uuid
import os
from gitlab import Gitlab, exceptions as gitlabexceptions
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, List
import logging
from io import BytesIO
import time
from pprint import pprint
class ImportConfig:
    """Static settings shared by every class of the Jira -> GitLab importer.

    The other classes in this file inherit from ``ImportConfig`` and read the
    values as ``self._NAME``.

    The two access tokens can be supplied through the ``JIRA_PAT`` and
    ``GITLAB_TOKEN`` environment variables so that real credentials do not have
    to be stored in the script; the literals below are only fallbacks.
    """

    # Jira project key that will be imported
    _JIRA_PROJECT = "COOLPROJ"
    _JIRA_URL = "https://jira.matrassian.net/"
    # Jira personal access token (environment variable JIRA_PAT wins over the literal)
    _JIRA_PAT = os.environ.get("JIRA_PAT", "MDgxMzE4MTk11DY0Oo5IyBbDRcDCw1uu6cSKDz/94k2g")
    _JIRA_VERIFY_SSL = True
    # If Jira has the sprints or milestones feature, this is the name of the field that stores them
    _JIRA_MILESTONE_FIELD = "customfield_10000"
    # Jira issue types that will be imported as Incidents in GitLab, otherwise as Issues
    _JIRA_INCIDENT_TYPES = ("bug", "Bug")
    _JIRA_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f%z"
    _JIRA_DATE_FORMAT = "%Y-%m-%d"

    # ID of the destination GitLab project
    _GITLAB_PROJECT = 123
    _GITLAB_URL = "https://gitlab.blah.net/"
    # Personal access token of a user with the Admin role
    # (environment variable GITLAB_TOKEN wins over the literal)
    _GITLAB_TOKEN = os.environ.get("GITLAB_TOKEN", "glpat-E3KhiXZDLE1aKX7UwcBj")
    _GITLAB_HEADERS = {"PRIVATE-TOKEN": _GITLAB_TOKEN}
    _GITLAB_DEFAULT_USER = "joe"
    _GITLAB_SUDO = True
    _GITLAB_VERIFY_SSL = True
    _GITLAB_PREMIUM = False

    # Map users [Jira display name]: [GitLab login]
    # All users must be members of the GitLab project with the Reporter role at least.
    # The "*" entry is the fallback login used for every Jira user that is not listed.
    _USER_NAME_MAP = {
        "John Doe": "jdoe",
        "Patrick Swayze": "psze",
        "*": "defuser",
    }

    # Map Jira issue type to the value of the GitLab "type::" label
    _ISSUE_TYPE_MAP = {
        "Bug": "bug",
        "Improvement": "improvement",
        "Spike": "spike",
        "Story": "story",
        "story": "story",
        "Task": "task",
        "Subtask": "subtask",
        "Epic": "epic",
        "epic": "epic",
    }

    # Media file extensions
    _MEDIA_EXT = ("jpeg", "jpg", "bmp", "png", "gif", "svg", "mp4", "mpeg", "mov", "avi", "mkv")

    # Headers sent with every Jira REST request (bearer authentication with the PAT)
    _JIRA_REST_HEADERS = {"Content-Type": "application/json", "Authorization": ("Bearer " + _JIRA_PAT)}
@dataclass
class JiraUser:
    """A Jira account as referenced by issues, comments and attachments."""

    # Human-readable name of the account.
    display_name: str
    # Jira-side identifier of the account.
    account_id: str
@dataclass
class JiraAttachment:
    """A file attached to a Jira issue, held fully in memory."""

    # Account that uploaded the file in Jira.
    author: JiraUser
    # Original file name as stored in Jira.
    filename: str
    # Raw bytes of the file wrapped in a stream.
    content: BytesIO
@dataclass
class JiraComment:
    """A single comment of a Jira issue."""

    author: JiraUser
    body: str
    created: datetime

    def __init__(self, **kwargs):
        """Build the comment from the ``author``, ``body`` and ``created`` keywords.

        ``created`` is the timestamp string as delivered by Jira and is parsed
        into an aware ``datetime``.  A missing keyword raises ``KeyError``; a
        timestamp in another format raises ``ValueError``.
        """
        author, body, created_raw = (kwargs[key] for key in ("author", "body", "created"))
        self.author = author
        self.body = body
        self.created = datetime.strptime(created_raw, "%Y-%m-%dT%H:%M:%S.%f%z")
@dataclass
class JiraMilestone:
    """A Jira sprint/milestone that an issue belongs to."""

    id: int
    name: str
    state: str
    start_date: datetime
    end_date: datetime

    def __init__(self, **kwargs):
        """Build the milestone from keyword arguments.

        Required keywords: ``id_`` (stored as ``id``), ``name``, ``state``,
        ``start_date`` and ``end_date``.  A missing keyword raises ``KeyError``.
        """
        # The identifier arrives as ``id_`` so that callers do not shadow the builtin.
        self.id = kwargs["id_"]
        for attribute in ("name", "state", "start_date", "end_date"):
            setattr(self, attribute, kwargs[attribute])
@dataclass
class JiraIssue(ImportConfig):
    """A Jira issue parsed from one element of the REST search response.

    Attachments and comments are not part of the search response; they stay
    empty until the ``Jira`` client fills them in for the individual issue.
    """

    id: int
    self_: str
    key: str
    created: datetime
    updated: datetime
    summary: str
    reporter: JiraUser
    assignee: Optional[JiraUser]
    time_spent: Optional[int]
    time_estimate: Optional[int]
    type: str
    priority: Optional[str]
    status: str
    project_name: str
    description: Optional[str] = ' '
    due_date: Optional[datetime] = None
    labels: Optional[List[str]] = None
    parent: Optional[int] = None
    inward: List[int] = field(default_factory=list)
    outward: List[int] = field(default_factory=list)
    attachments: List[JiraAttachment] = field(default_factory=list)
    comments: List[JiraComment] = field(default_factory=list)
    milestones: List[JiraMilestone] = field(default_factory=list)

    def __init__(self, issue):
        """Parse ``issue``, one entry of the ``issues`` list of a Jira search response.

        Mandatory fields (id, key, dates, summary, reporter, type, status,
        project) raise ``KeyError``/``TypeError`` when absent; optional fields
        (due date, assignee, priority, time tracking, labels, parent, links)
        fall back to ``None`` or an empty list.
        """
        raw = issue["fields"]

        self.id = int(issue["id"])
        self.self_ = issue["self"]
        self.key = issue["key"]
        self.created = datetime.strptime(raw["created"], self._JIRA_DATETIME_FORMAT)
        self.updated = datetime.strptime(raw["updated"], self._JIRA_DATETIME_FORMAT)

        due = raw.get("duedate")
        self.due_date = datetime.strptime(due, self._JIRA_DATE_FORMAT) if due else None

        self.summary = raw["summary"]
        self.description = raw.get("description")
        self.reporter = self._parse_user(raw["reporter"])
        assignee = raw.get("assignee")
        self.assignee = self._parse_user(assignee) if assignee else None

        self.time_spent = raw.get("timespent")
        self.time_estimate = raw.get("timeoriginalestimate")
        self.type = raw["issuetype"]["name"]
        # Jira returns null for issues without a priority; consumers already test for a falsy value.
        priority = raw.get("priority")
        self.priority = priority["name"] if priority else None
        # Copy the list so that labels appended during import never touch the raw response.
        self.labels = list(raw.get("labels") or [])
        self.project_name = raw['project']['name']

        # NOTE: milestone/sprint parsing is disabled; the original implementation is kept for reference.
        milestones = []
        # if issue["fields"][self._JIRA_MILESTONE_FIELD]:
        #     for milestone in issue["fields"][self._JIRA_MILESTONE_FIELD]:
        #         start_date = None
        #         if "startDate" in milestone.keys():
        #             start_date = datetime.strptime(milestone["startDate"], self._JIRA_DATETIME_FORMAT)
        #         end_date = None
        #         if "endDate" in milestone.keys():
        #             end_date = datetime.strptime(milestone["endDate"], self._JIRA_DATETIME_FORMAT)
        #         milestones.append(JiraMilestone(id_=milestone["id"], name=milestone["name"],
        #                                         state=milestone["state"],
        #                                         start_date=start_date, end_date=end_date))
        self.milestones = milestones

        self.status = raw["status"]["name"]

        parent = raw.get("parent")
        self.parent = int(parent["id"]) if parent else None

        self.inward = []
        self.outward = []
        for link in raw.get("issuelinks") or []:
            if "inwardIssue" in link:
                self.inward.append(int(link["inwardIssue"]["id"]))
            if "outwardIssue" in link:
                self.outward.append(int(link["outwardIssue"]["id"]))

    @staticmethod
    def _parse_user(raw_user) -> JiraUser:
        """Convert a Jira user object (``displayName`` / ``key``) into a ``JiraUser``."""
        return JiraUser(display_name=raw_user["displayName"], account_id=raw_user["key"])
class Jira(ImportConfig):
    """Read-only client for the Jira REST API of one project.

    Construction fetches the project's assignable users and all of its issues.
    Attachments and comments are fetched per issue, every time an issue is
    handed out by ``_find_jira_issue`` or ``_next_jira_issue``.
    """

    __jira_users: Optional[List[JiraUser]]
    __jira_issues: Optional[List[JiraIssue]]
    __jira_issue_index: int

    @property
    def _jira_issues_count(self) -> int:
        """Number of issues retrieved from Jira (0 when nothing was retrieved)."""
        return len(self.__jira_issues) if self.__jira_issues else 0

    @property
    def _jira_users(self):
        """Users retrieved from Jira, or ``None`` when the lookup failed or was empty."""
        return self.__jira_users

    def __init__(self):
        """Fetch the users and the issue list of ``_JIRA_PROJECT``."""
        logging.info(f"Jira project key {self._JIRA_PROJECT}")
        self.__jira_users = self.__retrieve_jira_users()
        if self.__jira_users:
            logging.info(f"Retrieved {len(self.__jira_users)} Jira users")
        self.__jira_issues = self.__retrieve_jira_issues_list()
        if self.__jira_issues:
            logging.info(f"Retrieved {len(self.__jira_issues)} Jira issues")

    def __retrieve_jira_issues_list(self) -> Optional[List[JiraIssue]]:
        """Page through the project's issues ordered by id.

        Returns the parsed issues, or ``None`` when none could be retrieved.
        """
        self._reset_issue_index()
        raw_issues = []
        start_at = 0
        max_results = 100
        while True:
            page = requests.get(
                f'{self._JIRA_URL}rest/api/2/search?jql=project={self._JIRA_PROJECT}+ORDER+BY+id+ASC&maxResults='
                f'{str(max_results)}&startAt={str(start_at)}',
                verify=self._JIRA_VERIFY_SSL,
                headers=self._JIRA_REST_HEADERS
            ).json()
            if "issues" not in page:
                logging.error(f"Can't retrieve Jira issues, response {page}")
                break
            batch = page["issues"]
            raw_issues.extend(batch)
            # Advance by what was actually returned: the server may cap the page size
            # below ``max_results``, and stepping by the requested size would skip issues.
            start_at += len(batch)
            if not batch or start_at >= page["total"]:
                break
        if raw_issues:
            return [JiraIssue(issue) for issue in raw_issues]
        return None

    def __retrieve_jira_users(self) -> Optional[List[JiraUser]]:
        """Fetch the users assignable in the project.

        Returns ``None`` (after logging) when Jira answers with an error object
        or with an empty list.
        """
        # NOTE(review): only a single response page is read here — confirm that the
        # server's default page size covers all users of the project.
        resp = requests.get(
            f'{self._JIRA_URL}rest/api/2/user/assignable/search?project={self._JIRA_PROJECT}',
            verify=self._JIRA_VERIFY_SSL,
            headers=self._JIRA_REST_HEADERS
        )
        jira_users = resp.json()
        if isinstance(jira_users, list):
            if jira_users:
                return [self.__parse_user(user) for user in jira_users]
            # An empty list is not an error object; indexing it with 'errorMessages' would raise.
            logging.error("Can't retrieve Jira users, error no assignable users returned")
            return None
        errors = jira_users.get('errorMessages') if isinstance(jira_users, dict) else jira_users
        logging.error(f"Can't retrieve Jira users, error {errors}")
        return None

    def _find_jira_user(self, **kwargs) -> Optional[JiraUser]:
        """Return the first user matching ``account_id`` or ``display_name``, else ``None``."""
        for user in self.__jira_users or []:
            if ("account_id" in kwargs and user.account_id == kwargs["account_id"]) or \
                    ("display_name" in kwargs and user.display_name == kwargs["display_name"]):
                return user
        return None

    def _find_jira_issue(self, id_: int) -> Optional[JiraIssue]:
        """Return the issue with Jira id ``id_`` including attachments and comments, else ``None``."""
        for issue in self.__jira_issues or []:
            if id_ == issue.id:
                return self.__retrieve_attachments_and_comments(issue)
        return None

    def _next_jira_issue(self) -> Optional[JiraIssue]:
        """Return the next issue of the iteration (with attachments and comments).

        Returns ``None`` once all issues were handed out; ``_reset_issue_index``
        restarts the iteration.
        """
        issues = self.__jira_issues or []
        if self.__jira_issue_index >= len(issues):
            return None
        issue = self.__retrieve_attachments_and_comments(issues[self.__jira_issue_index])
        self.__jira_issue_index += 1
        return issue

    def _reset_issue_index(self):
        """Restart the ``_next_jira_issue`` iteration at the first issue."""
        self.__jira_issue_index = 0

    @staticmethod
    def __parse_user(raw_user) -> JiraUser:
        """Convert a Jira user object (``displayName`` / ``key``) into a ``JiraUser``."""
        return JiraUser(display_name=raw_user["displayName"], account_id=raw_user["key"])

    def __retrieve_attachments_and_comments(self, issue: JiraIssue) -> JiraIssue:
        """Download the issue's attachments and comments and store them on ``issue``."""
        details = requests.get(
            issue.self_,
            verify=self._JIRA_VERIFY_SSL,
            headers=self._JIRA_REST_HEADERS
        ).json()
        issue_fields = details["fields"]

        attachments = []
        for attachment in issue_fields["attachment"]:
            download = requests.get(
                attachment["content"],
                verify=self._JIRA_VERIFY_SSL,
                headers=self._JIRA_REST_HEADERS
            )
            if not download.ok:
                # Do not store an error page as if it were the attached file.
                logging.warning(f"Attachment {attachment['filename']} of {issue.key} can't be downloaded, "
                                f"error code {download.status_code}")
                continue
            attachments.append(JiraAttachment(
                author=self.__parse_user(attachment["author"]),
                filename=attachment["filename"],
                content=BytesIO(download.content)))
        issue.attachments = attachments

        issue.comments = [
            JiraComment(author=self.__parse_user(comment["author"]),
                        body=comment["body"],
                        created=comment["created"])
            for comment in issue_fields["comment"]["comments"]
        ]
        return issue
@dataclass
class GitLabUser:
    """A member of the target GitLab project."""

    id: int
    login: str
    # Optional; stays None when not supplied so that repr() and == keep working.
    name: Optional[str] = None

    def __init__(self, **kwargs):
        """Build the user from ``id_`` and ``login`` (required) and ``name`` (optional).

        A missing required keyword raises ``KeyError``.
        """
        self.id = kwargs["id_"]
        self.login = kwargs["login"]
        # Previously the attribute was left unset when ``name`` was omitted, which made
        # the dataclass-generated __repr__/__eq__ raise AttributeError.
        self.name = kwargs.get("name")
@dataclass
class GitLabIssue:
    """Field set of an issue that is about to be created in GitLab.

    Every field defaults to ``None``.
    """

    assignee_id: Optional[int] = None
    created_at: Optional[str] = None
    description: Optional[str] = None
    epic_id: Optional[int] = None
    id: Optional[int] = None
    issue_type: Optional[str] = None
    labels: Optional[str] = None
    milestone_id: Optional[int] = None
    title: Optional[str] = None
    weight: Optional[int] = None
    due_date: Optional[str] = None
class GitLabImport(Jira):
    """Import the issues collected by ``Jira`` into a single GitLab project.

    Construction authenticates against GitLab, loads the target project and its
    members and prepares the table that rewrites Jira user mentions into GitLab
    mentions.  ``run_import`` creates the issues (attachments, comments, time
    tracking, labels) and links them afterwards; ``delete_issues`` removes all
    issues and milestones of the target project.
    """

    __gitlab: Gitlab
    __project = None
    __gitlab_default_user: Optional[GitLabUser] = None
    __gitlab_other_user: Optional[GitLabUser] = None
    __gitlab_headers = None

    def __init__(self):
        """Connect to Jira (base class) and GitLab and prepare the user mapping.

        Prompts on stdin when a Jira user cannot be mapped to any GitLab user and
        exits the interpreter when the operator declines to continue.
        """
        super().__init__()

        # Mutable state is created per instance; class-level dicts would be shared
        # between instances and with ImportConfig.
        self.__project = None
        self.__gitlab_users = []
        self.__gitlab_default_user = None
        self.__gitlab_other_user = None
        self.__gitlab_headers = dict(self._GITLAB_HEADERS)
        self.__replace_user_dict = {}
        # Jira issue id -> GitLab issue iid (0 while the issue is being imported)
        self.__jira_gitlab_issues_hash = {}
        # Jira milestone id -> GitLab milestone id
        self.__jira_gitlab_milestone_hash = {}
        # Pairs of GitLab iids that are already linked
        self.__linked_pairs = set()

        logging.info(f"GitLab project id {self._GITLAB_PROJECT}")
        self.__gitlab = Gitlab(self._GITLAB_URL, private_token=self._GITLAB_TOKEN, ssl_verify=self._GITLAB_VERIFY_SSL)
        self.__gitlab.auth()
        try:
            self.__project = self.__gitlab.projects.get(self._GITLAB_PROJECT)
        except (gitlabexceptions.GitlabHttpError, gitlabexceptions.GitlabGetError) as e:
            logging.error(f"Exception {e}")
        self.__gitlab_users = self.__retrieve_gitlab_users()
        logging.info(f"Retrieved {len(self.__gitlab_users)} GitLab users")

        if self.__project is not None:
            for jira_user in self._jira_users or []:
                if self.__map_jira_user(display_name=jira_user.display_name):
                    continue
                logging.warning(f"User {jira_user.display_name} not found in GitLab")
                answer = input("Continue? [y/n]")
                if answer.lower() not in ["y", "yes"]:
                    # ``quit()`` is a helper of the interactive ``site`` module; raise directly.
                    raise SystemExit
        self.__replace_user_dict = self.__jira_users_replace_dict()

    def __jira_users_replace_dict(self) -> dict:
        """Build ``{regex: "@login"}`` for rewriting Jira user mentions.

        Both ``[~accountid:KEY]`` and ``[~KEY]`` are matched.  Returns an empty
        dict when either side has no users.
        """
        replace_dict = {}
        if not self._jira_users or not self.__gitlab_users:
            return replace_dict
        for user in self._jira_users:
            gitlab_user = self.__map_jira_user(display_name=user.display_name)
            if gitlab_user is None:
                continue
            # The id is escaped because it is data, not a pattern.
            pattern = r"\[~(?:accountid:)?" + re.escape(str(user.account_id)) + r"\]"
            replace_dict[pattern] = "@" + gitlab_user.login
        return replace_dict

    def __retrieve_gitlab_users(self) -> list:
        """Load all project members and remember the default and the fallback user."""
        if self.__project is None:
            return []
        # Private copy: adding SUDO must not modify the shared ImportConfig._GITLAB_HEADERS.
        self.__gitlab_headers = dict(self._GITLAB_HEADERS)
        fallback_login = self._USER_NAME_MAP.get('*')
        users = []
        # get_all=True: without it only the first page of members is returned.
        for member in self.__project.users.list(get_all=True):
            login = member.attributes.get("username")
            user = GitLabUser(id_=member.id, login=login)
            if login == self._GITLAB_DEFAULT_USER:
                self.__gitlab_default_user = user
                self.__gitlab_headers["SUDO"] = login
            elif login == fallback_login:
                self.__gitlab_other_user = user
            users.append(user)
        if self.__gitlab_other_user is None:
            logging.warning(f"Fallback user {fallback_login} is not a member of the GitLab project; "
                            f"unmapped Jira users can't be imported")
        return users

    def __map_jira_user(self, **kwargs) -> Optional[GitLabUser]:
        """Map a Jira user (``display_name`` or ``account_id``) to a GitLab user.

        Falls back to the "*" user of ``_USER_NAME_MAP``; returns ``None`` when
        that user is not a member of the project either.
        """
        if "display_name" in kwargs:
            display_name = kwargs["display_name"]
        elif "account_id" in kwargs:
            jira_user = self._find_jira_user(account_id=kwargs["account_id"])
            if jira_user is None:
                return self.__gitlab_other_user
            display_name = jira_user.display_name
        else:
            return self.__gitlab_other_user

        login = self._USER_NAME_MAP.get(display_name)
        if login is not None:
            for gitlab_user in self.__gitlab_users:
                if gitlab_user.login == login:
                    return gitlab_user
        return self.__gitlab_other_user

    def __resolve_gitlab_user(self, display_name: str) -> GitLabUser:
        """Like ``__map_jira_user`` but raise ``LookupError`` instead of returning ``None``."""
        user = self.__map_jira_user(display_name=display_name)
        if user is None:
            raise LookupError(f"Jira user {display_name} can't be mapped: neither a mapped GitLab user nor "
                              f"the fallback user {self._USER_NAME_MAP.get('*')} is a member of the project")
        return user

    def __act_as(self, login: str):
        """Remember ``login`` as the user to impersonate, when impersonation is enabled."""
        if self._GITLAB_SUDO:
            self.__gitlab_headers["SUDO"] = login

    def __sudo_kwargs(self) -> dict:
        """Keyword arguments that make python-gitlab impersonate the current SUDO user."""
        sudo = self.__gitlab_headers.get("SUDO")
        return {"sudo": sudo} if sudo else {}

    def __multiple_replace(self, text: str, dictionary=None) -> Optional[str]:
        """Convert Jira wiki markup to GitLab Markdown, then apply ``dictionary``.

        ``dictionary`` maps regular expressions to literal replacement texts (user
        mentions, uploaded attachments).  ``None`` input yields an empty string.
        The conversion is a best-effort sequence of regex substitutions, so the
        order of the statements matters.
        """
        if dictionary is None:
            dictionary = {}
        if text is None:
            return ''
        t = text
        # Fences must sit on their own line, otherwise the code block never closes.
        t = re.sub(r'\s*\{noformat}\s*', '\n```\n', t)
        # t = re.sub(r'\n\n\*(![\n\*])\*\n\n{{ "": "(.+)"}}', r'<details>\n<summary>\1</summary>\n\n\2</details>\n', t)
        t = re.sub(r'(\r\n){1}', r'  \1', t)  # line breaks: Markdown needs two trailing spaces
        t = re.sub(r'\{code:([a-z]+)}\s*', r'\n```\1\n', t)  # Block code
        t = re.sub(r'\{code}\s*', r'\n```\n', t)  # Block code
        # No backslash before '>': re.sub would emit it literally and Markdown would
        # then render an escaped '>' instead of a quote.
        t = re.sub(r'\n\s*bq\. (.*)\n', r'\n> \1\n', t)  # Block quote
        t = re.sub(r'\{quote}', r'\n>>>\n', t)  # Block quote #2
        t = re.sub(r'\{color:[#\w]+}(.*?)\{color}', r'> **\1**', t)  # Colors (one span at a time)
        # Ruler: also matches lines ending in CRLF/trailing spaces; blank lines keep
        # '---' from turning the previous line into a heading.
        t = re.sub(r'(?m)^-{4,}[ \t\r]*$', '\n---\n', t)
        # t = re.sub(r'\[~([a-z]+)\]', r'@\1', t)  # Links to users
        t = re.sub(r'\[([^~|\]]*)]', r'\1', t)  # Links without alt
        t = re.sub(r'\[([^|\]]+)\|([a-z]+://[^\]]+)]', r'[\1](\2)', t)  # Links with alt
        # t = re.sub(r'(\b%s-\d+\b)' % self._JIRA_PROJECT,
        #            r'[\1](%sbrowse/\1)' % self._JIRA_URL, t)
        # Lists: every nesting level gets its own indentation
        t = re.sub(r'\n *# ', '\n1. ', t)  # Ordered list
        t = re.sub(r'\n *[*\-#]# ', '\n   1. ', t)  # Ordered sub-list
        t = re.sub(r'\n *[*\-#]{2}# ', '\n      1. ', t)  # Ordered sub-sub-list
        t = re.sub(r'\n *[*\-#]{3}# ', '\n         1. ', t)  # Ordered sub-sub-sub-list
        t = re.sub(r'\n *\* ', '\n - ', t)  # Unordered list
        t = re.sub(r'\n *[*\-#][*\-] ', '\n   - ', t)  # Unordered sub-list
        t = re.sub(r'\n *[*\-#]{2}[*\-] ', '\n     - ', t)  # Unordered sub-sub-list
        # Text effects
        t = re.sub(r'(^|[\W])\*(\S.*\S)\*([\W]|$)', r'\1**\2**\3', t)  # Bold
        t = re.sub(r'(^|[\W])_(\S.*\S)_([\W]|$)', r'\1*\2*\3', t)  # Emphasis
        # Deleted / Strikethrough
        t = re.sub(r'(^|[\W])-(\S.*\S)-([\W]|$)', r'\1~~\2~~\3', t)
        t = re.sub(r'(^|[\W])\+(\S.*\S)\+([\W]|$)', r'\1__\2__\3', t)  # Underline
        t = re.sub(r'(^|[\W])\{\{(.*?)}}([\W]|$)', r'\1`\2`\3', t)  # Inline code
        # Titles: '^' inside a character class is a literal caret, so anchor with (?m)^
        # to also convert a heading on the very first line.
        t = re.sub(r'(?m)^h([1-6])\. ', lambda m: '#' * int(m.group(1)) + ' ', t)
        # Emojis : https://emoji.codes
        t = re.sub(r':\)', r':smiley:', t)
        t = re.sub(r':\(', r':disappointed:', t)
        # Letter emoticons only when they stand alone, not inside words or "host:Port".
        t = re.sub(r'(?<!\w):P(?!\w)', r':yum:', t)
        t = re.sub(r'(?<!\w):D(?!\w)', r':grin:', t)
        t = re.sub(r';\)', r':wink:', t)
        t = re.sub(r'\(y\)', r':thumbsup:', t)
        t = re.sub(r'\(n\)', r':thumbsdown:', t)
        t = re.sub(r'\(i\)', r':information_source:', t)
        t = re.sub(r'\(/\)', r':white_check_mark:', t)
        t = re.sub(r'\(x\)', r':x:', t)
        t = re.sub(r'\(!\)', r':warning:', t)
        t = re.sub(r'\(\+\)', r':heavy_plus_sign:', t)
        t = re.sub(r'\(-\)', r':heavy_minus_sign:', t)
        t = re.sub(r'\(\?\)', r':grey_question:', t)
        t = re.sub(r'\(on\)', r':bulb:', t)
        # t = re.sub(r'\(off\)', r'::', t)  # Not found
        t = re.sub(r'\(\*[rgby]?\)', r':star:', t)
        for pattern, replacement in dictionary.items():
            # A callable replacement inserts the text verbatim (no backslash processing).
            t = re.sub(pattern, lambda _match, _text=replacement: _text, t)
        return t

    def __upload_attachment(self, attachment: JiraAttachment) -> Optional[dict]:
        """Upload one attachment to the GitLab project.

        Returns ``{regex matching the Jira embed: Markdown for the uploaded file}``
        or ``None`` when the upload failed.
        """
        self.__act_as(self.__resolve_gitlab_user(attachment.author.display_name).login)
        _, extension = os.path.splitext(attachment.filename)
        # Upload under a random name; only the (lower-cased) extension is kept.
        filename = str(uuid.uuid4()) + extension.lower()
        response = requests.post(
            f"{self._GITLAB_URL}api/v4/projects/{self._GITLAB_PROJECT}/uploads",
            headers=self.__gitlab_headers,
            files={"file": (filename, attachment.content)},
            verify=self._GITLAB_VERIFY_SSL
        )
        if not response.ok:
            logging.warning(f"Attachment {attachment.filename} {int(attachment.content.getbuffer().nbytes/1024)}Kb"
                            f" isn't uploaded, error code {response.status_code}, response {response.content}")
            return None
        file_info = response.json()
        if "url" not in file_info:
            return None
        # The file name is data: escape it so that '(', '+', '[' ... can't break the pattern.
        key = f"!{re.escape(attachment.filename)}[^!]*!"
        value = f"![{attachment.filename}]({file_info['url']}) \n"
        return {key: value}

    def __issue_url(self, iid) -> str:
        """Web URL of the GitLab issue ``iid`` in the target project."""
        return f"{self._GITLAB_URL}{self.__project.attributes.get('path_with_namespace')}/-/issues/{iid}"

    def __imported_issue_urls(self, jira_ids) -> str:
        """URLs (each followed by a space) of those ``jira_ids`` that are already imported."""
        urls = ""
        for jira_id in jira_ids:
            iid = self.__jira_gitlab_issues_hash.get(jira_id, 0)
            if iid > 0:
                urls += f"{self.__issue_url(iid)} "
        return urls

    def __build_description(self, jira_issue: JiraIssue, replace_dict: dict, attachment_description: str) -> str:
        """Compose the GitLab description: converted text, attachment list and Jira footer."""
        description = self.__multiple_replace(jira_issue.description, replace_dict)
        if attachment_description:
            description += f" \n \n--- \n<b>Attachments:</b> \n{attachment_description}"
        description += f" \n \n--- \n<small>" \
                       f"Jira link: [{jira_issue.key}]({self._JIRA_URL}browse/{jira_issue.key}) \n" \
                       f"Created/updated: {jira_issue.created.strftime('%d.%m.%Y %H:%M:%S')}/" \
                       f"{jira_issue.updated.strftime('%d.%m.%Y %H:%M:%S')}</small> \n"
        parent_iid = self.__jira_gitlab_issues_hash.get(jira_issue.parent, 0)
        if parent_iid > 0:
            description += f"<small>Parent issue: {self.__issue_url(parent_iid)}</small> \n"
        # Add linked issues to description
        blocked_issues = self.__imported_issue_urls(jira_issue.inward)
        if blocked_issues:
            description += f"<small>Blocked by: {blocked_issues}</small> \n"
        subtasks = self.__imported_issue_urls(jira_issue.outward)
        if subtasks:
            description += f"<small>Related to: {subtasks}</small>"
        return description

    def __build_labels(self, jira_issue: JiraIssue) -> str:
        """Comma-separated GitLab labels: Jira labels plus project/status/priority/type."""
        # Work on a copy: appending to the issue's own list would duplicate the
        # scoped labels whenever the same issue object is processed again.
        labels = list(jira_issue.labels or [])
        labels.append("project::" + jira_issue.project_name.lower())
        if jira_issue.status:
            labels.append("status::" + jira_issue.status.lower())
        if jira_issue.priority:
            labels.append("priority::" + jira_issue.priority.lower())
        if jira_issue.type:
            labels.append("type::" + self._ISSUE_TYPE_MAP.get(jira_issue.type, jira_issue.type))
        return ",".join(labels)

    def __import_issue(self, jira_issue: JiraIssue):
        """Create ``jira_issue`` in GitLab, importing its parent and linked issues first.

        Returns the created GitLab issue, or ``None`` when ``jira_issue`` is
        ``None`` or was imported before.
        """
        if jira_issue is None:
            return
        # Check if issue already imported
        if jira_issue.id in self.__jira_gitlab_issues_hash:
            return
        logging.info(f"Importing Jira issue {jira_issue.key} id {jira_issue.id}")
        # Placeholder: prevents doubles and terminates the recursion on circular links
        self.__jira_gitlab_issues_hash[jira_issue.id] = 0

        # Recursively creating subtask, linked and parent issues first
        linked_issues = jira_issue.inward + jira_issue.outward
        if jira_issue.parent is not None:
            linked_issues.append(jira_issue.parent)
        for id_ in linked_issues:
            if id_ not in self.__jira_gitlab_issues_hash:
                self.__import_issue(self._find_jira_issue(id_))

        # Per-issue copy: attachment patterns of one issue must not be applied to
        # later issues (and must not pile up in the shared user table).
        replace_dict = dict(self.__replace_user_dict)

        # Importing issue attachments
        attachment_description = ""
        for attachment in jira_issue.attachments:
            attachment_replace = self.__upload_attachment(attachment)
            if attachment_replace is None:
                continue
            replace_dict.update(attachment_replace)
            for markdown in attachment_replace.values():
                # Listed as a plain link: drop the leading '!' of the image syntax.
                attachment_description += f"{re.sub(r'^!', r'', markdown)} \n"

        # Mapping Jira issue to GitLab issue
        gitlab_issue = GitLabIssue()
        gitlab_issue.id = self._GITLAB_PROJECT
        if jira_issue.assignee is not None:
            gitlab_issue.assignee_id = self.__resolve_gitlab_user(jira_issue.assignee.display_name).id
        gitlab_issue.created_at = jira_issue.created.isoformat()
        if jira_issue.due_date is not None:
            gitlab_issue.due_date = jira_issue.due_date.strftime("%Y-%m-%d")
        gitlab_issue.description = self.__build_description(jira_issue, replace_dict, attachment_description)

        # Setting issue type
        if jira_issue.type in self._JIRA_INCIDENT_TYPES:
            gitlab_issue.issue_type = "incident"
        else:
            gitlab_issue.issue_type = "issue"

        # Setting issue labels including priority and Jira type
        gitlab_issue.labels = self.__build_labels(jira_issue)
        gitlab_issue.title = jira_issue.summary
        gitlab_issue.weight = jira_issue.time_estimate

        # Creating milestones
        for jira_milestone in jira_issue.milestones:
            if jira_milestone.id not in self.__jira_gitlab_milestone_hash:
                gitlab_milestone = self.__create_milestone(jira_milestone)
                self.__jira_gitlab_milestone_hash[jira_milestone.id] = gitlab_milestone.attributes.get("id")
            gitlab_issue.milestone_id = self.__jira_gitlab_milestone_hash[jira_milestone.id]

        fallback_login = self._USER_NAME_MAP.get('*')
        jira_login = self.__resolve_gitlab_user(jira_issue.reporter.display_name).login
        # Sudo if allowed
        self.__act_as(jira_login)
        if jira_login == fallback_login:
            # The fallback account hides the real author, so name them in the text.
            gitlab_issue.description = f"Issue by " \
                                       f"`{jira_issue.reporter.display_name}`" \
                                       f"\n\n" + gitlab_issue.description

        # Creating GitLab issue
        payload = {k: v for k, v in vars(gitlab_issue).items() if v is not None}
        created_issue = self.__project.issues.create(payload, **self.__sudo_kwargs())

        # Writing estimate and spent time
        if jira_issue.time_spent:
            created_issue.add_spent_time(str(jira_issue.time_spent) + "s")
            created_issue.save()
        if jira_issue.time_estimate:
            created_issue.time_estimate(str(jira_issue.time_estimate) + "s")
            created_issue.save()

        # Close issue if its in Done status
        if jira_issue.status in ("Done", "Closed", "Resolved"):
            created_issue.state_event = "close"
            created_issue.save()

        # Importing issue comments
        for comment in jira_issue.comments:
            jira_login = self.__resolve_gitlab_user(comment.author.display_name).login
            self.__act_as(jira_login)
            comment_text = ''
            if jira_login == fallback_login:
                comment_text = f"Comment by `{comment.author.display_name}`:\n\n"
            comment_text += self.__multiple_replace(comment.body, replace_dict)
            body = {"body": comment_text, "created_at": comment.created.isoformat()}
            created_issue.notes.create(body, **self.__sudo_kwargs())

        # Dropping the downloaded files to save memory; the attribute itself is kept
        # so that later readers of the issue don't hit AttributeError.
        jira_issue.attachments = []

        # Saving created issue id in hash
        self.__jira_gitlab_issues_hash[jira_issue.id] = created_issue.get_id()
        logging.info(f"Importing issue {jira_issue.key} completed.")
        return created_issue

    # Creating milestone
    def __create_milestone(self, jira_milestone: JiraMilestone):
        """Return the GitLab milestone titled like ``jira_milestone``, creating it if needed."""
        milestones = self.__project.milestones.list(title=jira_milestone.name)
        if len(milestones) > 0:
            return milestones[0]
        data = {"title": jira_milestone.name}
        # Same YYYY-MM-DD format as the issue due date
        if jira_milestone.start_date:
            data["start_date"] = jira_milestone.start_date.strftime("%Y-%m-%d")
        if jira_milestone.end_date:
            data["due_date"] = jira_milestone.end_date.strftime("%Y-%m-%d")
        milestone = self.__project.milestones.create(data)
        if jira_milestone.state == "closed":
            milestone.state_event = "close"
            milestone.save()
        return milestone

    # Iterate via Jira issues and link them
    def __link_imported_issues(self):
        """Create the GitLab links for all imported issues; return how many were created."""
        # NOTE: ``_next_jira_issue`` fetches attachments and comments again for every
        # issue although only the ids are needed here.
        self._reset_issue_index()
        linked = 0
        while True:
            jira_issue = self._next_jira_issue()
            if jira_issue is None:
                break
            if jira_issue.parent is not None:
                linked += self.__create_link(jira_issue.id, jira_issue.parent, "blocks")
            for dst in jira_issue.inward:
                linked += self.__create_link(jira_issue.id, dst, "is_blocked_by")
            for dst in jira_issue.outward:
                linked += self.__create_link(jira_issue.id, dst, "relates_to")
        return linked

    # Link two issues
    def __create_link(self, jira_src_id: int, jira_dst_id: int, link_type: str = "relates_to"):
        """Link the GitLab issues created for two Jira issues.

        Returns 1 when a link was created; 0 when an issue is not imported, the
        pair is already linked, or GitLab rejected the link.
        """
        src_iid = self.__jira_gitlab_issues_hash.get(jira_src_id, 0)
        dst_iid = self.__jira_gitlab_issues_hash.get(jira_dst_id, 0)
        if not src_iid or not dst_iid or src_iid == dst_iid:
            return 0
        # Every Jira link is seen from both of its ends; create it only once.
        pair = frozenset((src_iid, dst_iid))
        if pair in self.__linked_pairs:
            return 0
        if not self._GITLAB_PREMIUM:
            link_type = "relates_to"
        # lazy=True: only the issue's ``links`` manager is needed, not the issue data.
        gitlab_issue = self.__project.issues.get(src_iid, lazy=True)
        data = {
            "target_project_id": self.__project.attributes.get("id"),
            "target_issue_iid": dst_iid,
            "link_type": link_type
        }
        try:
            gitlab_issue.links.create(data)
        except gitlabexceptions.GitlabCreateError as e:
            logging.warning(f"Exception {e} src={jira_src_id} dst={jira_dst_id} link_type={link_type}")
            return 0
        self.__linked_pairs.add(pair)
        return 1

    # Iterate via Jira issues and import it
    def __import_issues(self) -> int:
        """Import every Jira issue; return the number of issues processed."""
        self._reset_issue_index()
        imported = 0
        while True:
            jira_issue = self._next_jira_issue()
            if jira_issue is None:
                break
            self.__import_issue(jira_issue)
            imported += 1
        return imported

    def run_import(self):
        """Ask for confirmation, then import and link all Jira issues."""
        time.sleep(1)
        if self.__project is None:
            return
        if self._jira_issues_count == 0:
            logging.warning("Nothing to import, quiting")
            return
        answer = input(f"Import {self._jira_issues_count} issues from Jira project {self._JIRA_PROJECT} to GitLab project "
                       f"{self.__project.attributes.get('name')}, continue? [y/n]")
        if answer.lower() not in ["y", "yes"]:
            return
        logging.info("Starting import issues")
        logging.info(f"Imported {self.__import_issues()} issues")
        logging.info("Starting linking imported issues")
        logging.info(f"Linked {self.__link_imported_issues()} issues")

    def delete_issues(self):
        """Ask for confirmation, then delete all issues and milestones of the project.

        Nothing is deleted (milestones included) when the project has no issues.
        """
        if self.__project is None:
            return
        issues = self.__project.issues.list(get_all=True)
        if len(issues) == 0:
            logging.info(f"No issues found in GitLab project {self.__project.attributes.get('name')}")
            return
        answer = input(f"Delete {len(issues)} issues in GitLab project {self.__project.attributes.get('name')}, "
                       f"continue? [y/n]")
        if answer.lower() not in ["y", "yes"]:
            return
        for issue in issues:
            issue.delete()
        for milestone in self.__project.milestones.list(get_all=True):
            milestone.delete()

    def test(self, id_: int):
        """Import the single Jira issue ``id_`` and link what has been imported (debug helper)."""
        print(self.__import_issue(self._find_jira_issue(id_=id_)))
        print(self.__link_imported_issues())
def main():
    """Script entry point: set up logging, offer to clean the project, then import."""
    logging.basicConfig(level=logging.INFO)
    importer = GitLabImport()
    # Delete all issues and milestones first
    importer.delete_issues()
    # Import Jira issues
    importer.run_import()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment