import time
import datetime
import requests
from jira import JIRA
from flask import Flask, request, jsonify

from config import (
    JIRA_SERVER,
    JIRA_API_TOKEN,
    JIRA_EMAIL,
    JIRA_PROJECT_KEY,
    JIRA_EPIC,
    JIRA_LABELS,
    JIRA_ISSUE_TYPE,
    JIRA_USE_ADF,
    JIRA_API_VERSION,
    SERVER_URL,
    POLL_INTERVAL,
)

app = Flask(__name__)
jira_client = None
def safe_extract_status_name(status_field):
    """Safely extract a status name from jira PropertyHolder objects."""
    if status_field is None:
        return ''
    status_name = status_field.name if hasattr(status_field, 'name') else status_field
    # Unwrap nested PropertyHolder objects
    while hasattr(status_name, 'value'):
        status_name = status_name.value
    return str(status_name).lower() if status_name else ''
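# Illustrative sketch (not part of the original script): how the helper behaves
# for a few assumed inputs. `FakeStatus` is a hypothetical stand-in for the
# jira library's PropertyHolder; only the attribute shapes matter here.
#
#   class FakeStatus:
#       name = "In Progress"
#
#   safe_extract_status_name(FakeStatus())  # -> "in progress"
#   safe_extract_status_name("Done")        # -> "done"
#   safe_extract_status_name(None)          # -> ""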
COUNTRY_CODE_TO_NAME = {
    'af': 'afghanistan', 'ax': 'aland islands', 'al': 'albania', 'dz': 'algeria',
    'as': 'american samoa', 'ad': 'andorra', 'ao': 'angola', 'ai': 'anguilla',
    'aq': 'antarctica', 'ag': 'antigua and barbuda', 'ar': 'argentina', 'am': 'armenia',
    'aw': 'aruba', 'au': 'australia', 'at': 'austria', 'az': 'azerbaijan',
    'bs': 'bahamas', 'bh': 'bahrain', 'bd': 'bangladesh', 'bb': 'barbados',
    'by': 'belarus', 'be': 'belgium', 'bz': 'belize', 'bj': 'benin',
    'bm': 'bermuda', 'bt': 'bhutan', 'bo': 'bolivia', 'bq': 'bonaire',
    'ba': 'bosnia and herzegovina', 'bw': 'botswana', 'bv': 'bouvet island', 'br': 'brazil',
    'io': 'british indian ocean territory', 'vg': 'british virgin islands', 'bn': 'brunei', 'bg': 'bulgaria',
    'bf': 'burkina faso', 'bi': 'burundi', 'kh': 'cambodia', 'cm': 'cameroon',
    'ca': 'canada', 'cv': 'cape verde', 'ky': 'cayman islands', 'cf': 'central african republic',
    'td': 'chad', 'cl': 'chile', 'cn': 'china', 'cx': 'christmas island',
    'cc': 'cocos islands', 'co': 'colombia', 'km': 'comoros', 'ck': 'cook islands',
    'cr': 'costa rica', 'hr': 'croatia', 'cu': 'cuba', 'cw': 'curacao',
    'cy': 'cyprus', 'cz': 'czech republic', 'cd': 'democratic republic of the congo', 'dk': 'denmark',
    'dj': 'djibouti', 'dm': 'dominica', 'do': 'dominican republic', 'tl': 'east timor',
    'ec': 'ecuador', 'eg': 'egypt', 'sv': 'el salvador', 'gq': 'equatorial guinea',
    'er': 'eritrea', 'ee': 'estonia', 'sz': 'eswatini', 'et': 'ethiopia',
    'fk': 'falkland islands', 'fo': 'faroe islands', 'fj': 'fiji', 'fi': 'finland',
    'fr': 'france', 'pf': 'french polynesia', 'tf': 'french southern territories', 'ga': 'gabon',
    'gm': 'gambia', 'ge': 'georgia', 'de': 'germany', 'gh': 'ghana',
    'gi': 'gibraltar', 'gr': 'greece', 'gl': 'greenland', 'gd': 'grenada',
    'gu': 'guam', 'gt': 'guatemala', 'gg': 'guernsey', 'gn': 'guinea',
    'gw': 'guinea-bissau', 'gy': 'guyana', 'ht': 'haiti', 'hm': 'heard island and mcdonald islands',
    'hn': 'honduras', 'hk': 'hong kong', 'hu': 'hungary', 'is': 'iceland',
    'in': 'india', 'id': 'indonesia', 'ir': 'iran', 'iq': 'iraq',
    'ie': 'ireland', 'im': 'isle of man', 'il': 'israel', 'it': 'italy',
    'ci': 'ivory coast', 'jm': 'jamaica', 'jp': 'japan', 'je': 'jersey',
    'jo': 'jordan', 'kz': 'kazakhstan', 'ke': 'kenya', 'ki': 'kiribati',
    'xk': 'kosovo', 'kw': 'kuwait', 'kg': 'kyrgyzstan', 'la': 'laos',
    'lv': 'latvia', 'lb': 'lebanon', 'ls': 'lesotho', 'lr': 'liberia',
    'ly': 'libya', 'li': 'liechtenstein', 'lt': 'lithuania', 'lu': 'luxembourg',
    'mo': 'macau', 'mg': 'madagascar', 'mw': 'malawi', 'my': 'malaysia',
    'mv': 'maldives', 'ml': 'mali', 'mt': 'malta', 'mh': 'marshall islands',
    'mr': 'mauritania', 'mu': 'mauritius', 'mx': 'mexico', 'fm': 'micronesia',
    'md': 'moldova', 'mc': 'monaco', 'mn': 'mongolia', 'me': 'montenegro',
    'ms': 'montserrat', 'ma': 'morocco', 'mz': 'mozambique', 'mm': 'myanmar',
    'na': 'namibia', 'nr': 'nauru', 'np': 'nepal', 'nl': 'netherlands',
    'nc': 'new caledonia', 'nz': 'new zealand', 'ni': 'nicaragua', 'ne': 'niger',
    'ng': 'nigeria', 'nu': 'niue', 'nf': 'norfolk island', 'kp': 'north korea',
    'mk': 'north macedonia', 'mp': 'northern mariana islands', 'no': 'norway', 'om': 'oman',
    'pk': 'pakistan', 'pw': 'palau', 'ps': 'palestine', 'pa': 'panama',
    'pg': 'papua new guinea', 'py': 'paraguay', 'pe': 'peru', 'ph': 'philippines',
    'pn': 'pitcairn islands', 'pl': 'poland', 'pt': 'portugal', 'pr': 'puerto rico',
    'qa': 'qatar', 'cg': 'republic of the congo', 'ro': 'romania', 'ru': 'russia',
    'rw': 'rwanda', 'bl': 'saint barthelemy', 'sh': 'saint helena', 'kn': 'saint kitts and nevis',
    'lc': 'saint lucia', 'mf': 'saint martin', 'pm': 'saint pierre and miquelon',
    'vc': 'saint vincent and the grenadines', 'ws': 'samoa', 'sm': 'san marino',
    'st': 'sao tome and principe', 'sa': 'saudi arabia', 'sn': 'senegal', 'rs': 'serbia',
    'sc': 'seychelles', 'sl': 'sierra leone', 'sg': 'singapore', 'sx': 'sint maarten',
    'sk': 'slovakia', 'si': 'slovenia', 'sb': 'solomon islands', 'so': 'somalia',
    'za': 'south africa', 'gs': 'south georgia', 'kr': 'south korea', 'ss': 'south sudan',
    'es': 'spain', 'lk': 'sri lanka', 'sd': 'sudan', 'sr': 'suriname',
    'sj': 'svalbard', 'se': 'sweden', 'ch': 'switzerland', 'sy': 'syria',
    'tw': 'taiwan', 'tj': 'tajikistan', 'tz': 'tanzania', 'th': 'thailand',
    'tg': 'togo', 'tk': 'tokelau', 'to': 'tonga', 'tt': 'trinidad and tobago',
    'tn': 'tunisia', 'tr': 'turkey', 'tm': 'turkmenistan', 'tc': 'turks and caicos islands',
    'tv': 'tuvalu', 'vi': 'u.s. virgin islands', 'ug': 'uganda', 'ua': 'ukraine',
    'ae': 'united arab emirates', 'gb': 'united kingdom', 'us': 'united states', 'uy': 'uruguay',
    'uz': 'uzbekistan', 'vu': 'vanuatu', 'va': 'vatican city', 've': 'venezuela',
    'vn': 'vietnam', 'wf': 'wallis and futuna', 'eh': 'western sahara', 'ye': 'yemen',
    'zm': 'zambia', 'zw': 'zimbabwe',
}
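# Illustrative lookups (values taken from the table above):
#
#   COUNTRY_CODE_TO_NAME['pt']            # -> 'portugal'
#   COUNTRY_CODE_TO_NAME.get('zz', 'zz')  # -> 'zz' (unknown codes fall through)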
HOSTING_LOCATION_IDS = {
    'abkhazia': '11345', 'afghanistan': '11346', 'akrotiri and dhekelia': '11347', 'aland islands': '11348',
    'albania': '11349', 'algeria': '11350', 'american samoa': '11351', 'andorra': '11352',
    'angola': '11353', 'anguilla': '11354', 'antarctica': '11355', 'antigua and barbuda': '11356',
    'argentina': '11357', 'armenia': '11358', 'aruba': '11359', 'ashmore and cartier islands': '11360',
    'australia': '11361', 'austria': '11362', 'azerbaijan': '11363', 'bahamas': '11364',
    'bahrain': '11365', 'bangladesh': '11366', 'barbados': '11367', 'belarus': '11368',
    'belgium': '11369', 'belize': '11370', 'benin': '11371', 'bermuda': '11372',
    'bhutan': '11373', 'bir tawil': '11374', 'bolivia': '11375', 'bonaire': '11376',
    'bosnia and herzegovina': '11377', 'botswana': '11378', 'bouvet island': '11379', 'brazil': '11380',
    'british indian ocean territory': '11381', 'british virgin islands': '11382', 'brunei': '11383', 'bulgaria': '11384',
    'burkina faso': '11385', 'burundi': '11386', 'cambodia': '11387', 'cameroon': '11388',
    'canada': '11389', 'cape verde': '11390', 'cayman islands': '11391', 'central african republic': '11392',
    'chad': '11393', 'chile': '11394', 'china': '11395', 'christmas island': '11396',
    'clipperton island': '11397', 'cocos islands': '11398', 'colombia': '11399', 'comoros': '11400',
    'cook islands': '11401', 'coral sea islands': '11402', 'costa rica': '11403', 'croatia': '11404',
    'cuba': '11405', 'curacao': '11406', 'cyprus': '11407', 'czech republic': '11408',
    'democratic republic of the congo': '11409', 'denmark': '11410', 'djibouti': '11411', 'dominica': '11412',
    'dominican republic': '11413', 'east timor': '11414', 'easter island': '11415', 'ecuador': '11416',
    'egypt': '11417', 'el salvador': '11418', 'equatorial guinea': '11419', 'eritrea': '11420',
    'estonia': '11421', 'eswatini': '11422', 'ethiopia': '11423', 'falkland islands': '11424',
    'faroe islands': '11425', 'fiji': '11426', 'finland': '11427', 'france': '11428',
    'french polynesia': '11429', 'french southern territories': '11430', 'gabon': '11431', 'gambia': '11432',
    'georgia': '11433', 'germany': '11434', 'ghana': '11435', 'gibraltar': '11436',
    'greece': '11437', 'greenland': '11438', 'grenada': '11439', 'guam': '11440',
    'guatemala': '11441', 'guernsey': '11442', 'guinea': '11443', 'guinea-bissau': '11444',
    'guyana': '11445', 'haiti': '11446', 'heard island and mcdonald islands': '11447', 'honduras': '11448',
    'hong kong': '11449', 'hungary': '11450', 'iceland': '11451', 'india': '11452',
    'indonesia': '11453', 'iran': '11454', 'iraq': '11455', 'ireland': '11456',
    'isle of man': '11457', 'israel': '11458', 'italy': '11459', 'ivory coast': '11460',
    'jamaica': '11461', 'jan mayen': '11462', 'japan': '11463', 'jersey': '11464',
    'jordan': '11465', 'kazakhstan': '11466', 'kenya': '11467', 'kiribati': '11468',
    'kosovo': '11469', 'kuwait': '11470', 'kyrgyzstan': '11471', 'laos': '11472',
    'latvia': '11473', 'lebanon': '11474', 'lesotho': '11475', 'liberia': '11476',
    'libya': '11477', 'liechtenstein': '11478', 'lithuania': '11479', 'luxembourg': '11480',
    'macau': '11481', 'madagascar': '11482', 'malawi': '11483', 'malaysia': '11484',
    'maldives': '11485', 'mali': '11486', 'malta': '11487', 'marshall islands': '11488',
    'mauritania': '11489', 'mauritius': '11490', 'mexico': '11491', 'micronesia': '11492',
    'moldova': '11493', 'monaco': '11494', 'mongolia': '11495', 'montenegro': '11496',
    'montserrat': '11497', 'morocco': '11498', 'mozambique': '11499', 'myanmar': '11500',
    'namibia': '11501', 'nauru': '11502', 'nepal': '11503', 'netherlands': '11504',
    'new caledonia': '11505', 'new zealand': '11506', 'nicaragua': '11507', 'niger': '11508',
    'nigeria': '11509', 'niue': '11510', 'norfolk island': '11511', 'north korea': '11512',
    'north macedonia': '11513', 'northern cyprus': '11514', 'northern mariana islands': '11515', 'norway': '11516',
    'oman': '11517', 'pakistan': '11518', 'palau': '11519', 'palestine': '11520',
    'panama': '11521', 'papua new guinea': '11522', 'paraguay': '11523', 'peru': '11524',
    'philippines': '11525', 'pitcairn islands': '11526', 'poland': '11527', 'portugal': '11528',
    'puerto rico': '11529', 'qatar': '11530', 'republic of the congo': '11531', 'romania': '11532',
    'russia': '11533', 'rwanda': '11534', 'saba': '11535', 'saint barthelemy': '11536',
    'saint helena': '11537', 'saint kitts and nevis': '11538', 'saint lucia': '11539', 'saint martin': '11540',
    'saint pierre and miquelon': '11541', 'saint vincent and the grenadines': '11542', 'samoa': '11543', 'san marino': '11544',
    'sao tome and principe': '11545', 'saudi arabia': '11546', 'senegal': '11547', 'serbia': '11548',
    'seychelles': '11549', 'sierra leone': '11550', 'singapore': '11551', 'sint eustatius': '11552',
    'sint maarten': '11553', 'slovakia': '11554', 'slovenia': '11555', 'solomon islands': '11556',
    'somalia': '11557', 'somaliland': '11558', 'south africa': '11559', 'south georgia': '11560',
    'south korea': '11561', 'south ossetia': '11562', 'south sudan': '11563', 'spain': '11564',
    'sri lanka': '11565', 'sudan': '11566', 'suriname': '11567', 'svalbard': '11568',
    'sweden': '11569', 'switzerland': '11570', 'syria': '11571', 'taiwan': '11572',
    'tajikistan': '11573', 'tanzania': '11574', 'thailand': '11575', 'togo': '11576',
    'tokelau': '11577', 'tonga': '11578', 'transnistria': '11579', 'trinidad and tobago': '11580',
    'tunisia': '11581', 'turkey': '11582', 'turkmenistan': '11583', 'turks and caicos islands': '11584',
    'tuvalu': '11585', 'u.s. virgin islands': '11586', 'uganda': '11587', 'ukraine': '11588',
    'united arab emirates': '11589', 'united kingdom': '11590', 'united states': '11591', 'uruguay': '11592',
    'uzbekistan': '11593', 'vanuatu': '11594', 'vatican city': '11595', 'venezuela': '11596',
    'vietnam': '11597', 'wallis and futuna': '11598', 'western sahara': '11599', 'yemen': '11600',
    'zambia': '11601', 'zimbabwe': '11602',
}
# Option IDs for the Target Country custom field (customfield_10107)
TARGET_COUNTRY_IDS = {
    'afghanistan': '10163', 'akrotiri and dhekelia': '10164', 'aland islands': '10165', 'albania': '10167',
    'algeria': '10169', 'american samoa': '10171', 'andorra': '10172', 'angola': '10174',
    'anguilla': '10176', 'antarctica': '10178', 'antigua and barbuda': '10180', 'argentina': '10182',
    'armenia': '10184', 'aruba': '10186', 'ashmore and cartier islands': '10188', 'australia': '10190',
    'austria': '10192', 'azerbaijan': '10194', 'bahamas': '10195', 'bahrain': '10196',
    'bangladesh': '10197', 'barbados': '10198', 'belarus': '10199', 'belgium': '10200',
    'belize': '10201', 'benin': '10202', 'bermuda': '10203', 'bhutan': '10204',
    'bir tawil': '10205', 'bolivia': '10206', 'bonaire': '10207', 'bosnia and herzegovina': '10208',
    'botswana': '10209', 'bouvet island': '10210', 'brazil': '10211', 'british indian ocean territory': '10212',
    'british virgin islands': '10213', 'brunei': '10214', 'bulgaria': '10215', 'burkina faso': '10216',
    'burundi': '10217', 'cambodia': '10218', 'cameroon': '10219', 'canada': '10220',
    'cape verde': '10221', 'cayman islands': '10222', 'central african republic': '10223', 'chad': '10224',
    'chile': '10225', 'china': '10226', 'christmas island': '10227', 'clipperton island': '10228',
    'cocos islands': '10229', 'colombia': '10230', 'comoros': '10231', 'cook islands': '10232',
    'coral sea islands': '10233', 'costa rica': '10234', 'croatia': '10235', 'cuba': '10236',
    'curacao': '10237', 'cyprus': '10238', 'czech republic': '10239', 'democratic republic of the congo': '10240',
    'denmark': '10241', 'djibouti': '10242', 'dominica': '10243', 'dominican republic': '10244',
    'east timor': '10245', 'easter island': '10246', 'ecuador': '10247', 'egypt': '10248',
    'el salvador': '10249', 'equatorial guinea': '10250', 'eritrea': '10251', 'estonia': '10252',
    'eswatini': '10253', 'ethiopia': '10254', 'falkland islands': '10255', 'faroe islands': '10256',
    'fiji': '10257', 'finland': '10258', 'france': '10259', 'french polynesia': '10260',
    'french southern territories': '10261', 'gabon': '10262', 'gambia': '10263', 'georgia': '10264',
    'germany': '10265', 'ghana': '10266', 'gibraltar': '10267', 'greece': '10268',
    'greenland': '10269', 'grenada': '10270', 'guam': '10271', 'guatemala': '10272',
    'guernsey': '10273', 'guinea': '10274', 'guinea-bissau': '10275', 'guyana': '10276',
    'haiti': '10277', 'heard island and mcdonald islands': '10278', 'honduras': '10279', 'hong kong': '10280',
    'hungary': '10281', 'iceland': '10282', 'india': '10283', 'indonesia': '10284',
    'iran': '10285', 'iraq': '10286', 'ireland': '10287', 'isle of man': '10288',
    'israel': '10289', 'italy': '10290', 'ivory coast': '10291', 'jamaica': '10292',
    'jan mayen': '10293', 'japan': '10294', 'jersey': '10295', 'jordan': '10296',
    'kazakhstan': '10297', 'kenya': '10298', 'kiribati': '10299', 'kosovo': '10300',
    'kuwait': '10301', 'kyrgyzstan': '10302', 'laos': '10303', 'latvia': '10304',
    'lebanon': '10305', 'lesotho': '10306', 'liberia': '10307', 'libya': '10308',
    'liechtenstein': '10309', 'lithuania': '10310', 'luxembourg': '10311', 'macau': '10312',
    'madagascar': '10313', 'malawi': '10314', 'malaysia': '10315', 'maldives': '10316',
    'mali': '10317', 'malta': '10318', 'marshall islands': '10319', 'mauritania': '10320',
    'mauritius': '10321', 'mexico': '10322', 'micronesia': '10323', 'moldova': '10324',
    'monaco': '10325', 'mongolia': '10326', 'montenegro': '10327', 'montserrat': '10328',
    'morocco': '10329', 'mozambique': '10330', 'myanmar': '10331', 'namibia': '10332',
    'nauru': '10333', 'nepal': '10334', 'netherlands': '10335', 'new caledonia': '10336',
    'new zealand': '10337', 'nicaragua': '10338', 'niger': '10339', 'nigeria': '10340',
    'niue': '10341', 'norfolk island': '10342', 'north korea': '10343', 'north macedonia': '10344',
    'northern cyprus': '10345', 'northern mariana islands': '10346', 'norway': '10347', 'oman': '10348',
    'pakistan': '10349', 'palau': '10350', 'palestine': '10351', 'panama': '10352',
    'papua new guinea': '10353', 'paraguay': '10354', 'peru': '10355', 'philippines': '10356',
    'pitcairn islands': '10357', 'poland': '10358', 'portugal': '10359', 'puerto rico': '10360',
    'qatar': '10361', 'republic of the congo': '10362', 'romania': '10363', 'russia': '10364',
    'rwanda': '10365', 'saba': '10366', 'saint barthelemy': '10367', 'saint helena': '10368',
    'saint kitts and nevis': '10369', 'saint lucia': '10370', 'saint martin': '10371', 'saint pierre and miquelon': '10372',
    'saint vincent and the grenadines': '10373', 'samoa': '10374', 'san marino': '10375', 'sao tome and principe': '10376',
    'saudi arabia': '10377', 'senegal': '10378', 'serbia': '10379', 'seychelles': '10380',
    'sierra leone': '10381', 'singapore': '10382', 'sint eustatius': '10383', 'sint maarten': '10384',
    'slovakia': '10385', 'slovenia': '10386', 'solomon islands': '10387', 'somalia': '10388',
    'somaliland': '10389', 'south africa': '10390', 'south georgia': '10391', 'south korea': '10392',
    'south ossetia': '10393', 'south sudan': '10394', 'spain': '10395', 'sri lanka': '10396',
    'sudan': '10397', 'suriname': '10398', 'svalbard': '10399', 'sweden': '10400',
    'switzerland': '10401', 'syria': '10402', 'taiwan': '10403', 'tajikistan': '10404',
    'tanzania': '10405', 'thailand': '10406', 'togo': '10407', 'tokelau': '10408',
    'tonga': '10409', 'transnistria': '10410', 'trinidad and tobago': '10411', 'tunisia': '10412',
    'turkey': '10413', 'turkmenistan': '10414', 'turks and caicos islands': '10415', 'tuvalu': '10416',
    'u.s. virgin islands': '10417', 'uganda': '10418', 'ukraine': '10419', 'united arab emirates': '10420',
    'united kingdom': '10421', 'united states': '10422', 'uruguay': '10423', 'uzbekistan': '10424',
    'vanuatu': '10425', 'vatican city': '10426', 'venezuela': '10427', 'vietnam': '10428',
    'wallis and futuna': '10429', 'western sahara': '10430', 'yemen': '10431', 'zambia': '10432',
    'zimbabwe': '10433',
}
def get_hosting_location_id(location_code):
    """Convert a hosting location country code to its JIRA option ID."""
    if not location_code:
        return '-1'  # "None" value
    location_code = location_code.lower().strip()
    country_name = COUNTRY_CODE_TO_NAME.get(location_code)
    if not country_name:
        print(f"[WARN] Unknown country code for hosting location: {location_code}")
        return '-1'
    return HOSTING_LOCATION_IDS.get(country_name, '-1')

def get_target_country_id(country_code):
    """Convert a target country code to its JIRA option ID."""
    if not country_code:
        return '-1'  # "None" value
    country_code = country_code.lower().strip()
    country_name = COUNTRY_CODE_TO_NAME.get(country_code)
    if not country_name:
        print(f"[WARN] Unknown country code for target country: {country_code}")
        return '-1'
    return TARGET_COUNTRY_IDS.get(country_name, '-1')
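# Illustrative sketch (values taken from the tables above): both helpers accept
# an ISO 3166-1 alpha-2 code and return the matching JIRA option ID, or '-1'
# when the code or country is unknown.
#
#   get_hosting_location_id('de')   # -> '11434' (germany)
#   get_target_country_id('DE ')    # -> '10265' (input is lowercased/stripped)
#   get_target_country_id('zz')     # -> '-1' (logs a [WARN])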
def get_domains():
    try:
        resp = requests.get(f"{SERVER_URL}/domains")
        resp.raise_for_status()
        return resp.json()
    except Exception as e:
        print(f"[ERROR] Retrieving domains: {e}")
        return []

def update_jira_ticket(domain_id, ticket):
    try:
        payload = {"jira_ticket": ticket}
        resp = requests.post(f"{SERVER_URL}/jira/update-ticket/{domain_id}", json=payload)
        resp.raise_for_status()
        print(f"[INFO] Updated domain {domain_id} with Jira ticket {ticket}")
        return True
    except Exception as e:
        print(f"[ERROR] Updating Jira ticket for domain {domain_id}: {e}")
        return False
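# Hypothetical example (not from the original source) of the domain record
# shape that create_jira_ticket consumes below. The field names come from the
# lookups in this file; the values are invented for illustration.
#
#   domain = {
#       "id": 42,
#       "url": "https://example-phish.test",
#       "status": "up",
#       "page_title": "Login",
#       "hosting_provider": "ExampleHost",
#       "hosting_location": "nl",
#       "target_country": "us",
#       "abuse_type": "mimic",
#       "corporate_franchise": "corporate",
#       "report_abuse": False,
#       "notes": "",
#   }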
def create_jira_ticket(jira_client, domain):
    summary = f"Domain Abuse - Takedown: {domain['url']}"
    # Try to build an ADF description first; fall back to plain text
    adf_description = create_adf_description(domain)
    if adf_description:
        # Use ADF format for the v3 API
        description = adf_description
        print("[INFO] Using ADF format for ticket description")
    else:
        # Plain text fallback; include hosting provider info if available
        hosting_info = ""
        if domain.get("hosting_provider"):
            hosting_info = f"Hosting Provider: {domain.get('hosting_provider')}\n"
        description = (
            f"New domain abuse ticket created:\n\n"
            f"URL: {domain['url']}\n"
            f"Status: {domain['status']}\n"
            f"Page Title: {domain.get('page_title', '')}\n"
            f"{hosting_info}"
        )
        # Add target country and hosting location if available
        if domain.get("target_country"):
            country_name = COUNTRY_CODE_TO_NAME.get(domain["target_country"].lower(), domain["target_country"])
            description += f"Target Country: {country_name}\n"
        if domain.get("hosting_location"):
            location_name = COUNTRY_CODE_TO_NAME.get(domain["hosting_location"].lower(), domain["hosting_location"])
            description += f"Hosting Location: {location_name}\n"
        # Add abuse type information if available
        if domain.get("abuse_type"):
            description += f"Abuse Type: {domain.get('abuse_type')}\n"
        description += f"Last Checked: {domain.get('last_checked', '')}\n\n"
        description += "Please refer to the attached preview image for more details."
        # Append takedown information if already provided
        if domain.get("report_abuse") and domain.get("report_abuse_timestamp"):
            description += (
                "\n\n### Takedown Reported\n"
                f"Takedown submitted at {domain.get('report_abuse_timestamp')}."
            )
        # Add a Notes section if notes exist
        notes = domain.get('notes')
        if notes and notes.strip():
            description += f"\n\n### Notes\n{notes.strip()}"
        print("[INFO] Using plain text format for ticket description")

    # Prepare the labels from the configuration, then add dynamic labels
    labels = list(JIRA_LABELS)
    # Corporate/franchise label (guarded against None values)
    corporate_franchise = domain.get("corporate_franchise")
    if corporate_franchise and str(corporate_franchise).lower() in ["corporate", "franchise"]:
        labels.append(str(corporate_franchise).lower())
    # Abuse type label
    abuse_type = domain.get("abuse_type")
    if abuse_type and str(abuse_type).lower() in ["mimic", "redirect", "independent", "n/a", "to_validate"]:
        labels.append(f"abuse-type-{str(abuse_type).lower()}")
    # Report abuse label
    if domain.get("report_abuse"):
        labels.append("takedown-requested")

    try:
        # Map corporate/franchise values to their Cloud option IDs
        corp_franchise_map = {
            'corporate': '11343',
            'franchise': '11344',
            'none': '-1',
        }
        # Safely resolve the corporate/franchise value
        corporate_franchise = domain.get('corporate_franchise')
        corp_franchise_id = '-1'  # Default: empty/none
        if corporate_franchise:
            corp_value = str(corporate_franchise).lower().strip()
            if corp_value in corp_franchise_map:
                corp_franchise_id = corp_franchise_map[corp_value]
        # Safely resolve the hosting location and target country IDs
        hosting_location = domain.get('hosting_location')
        hosting_location_id = get_hosting_location_id(hosting_location) if hosting_location else '-1'
        target_country = domain.get('target_country')
        target_country_id = get_target_country_id(target_country) if target_country else '-1'
        # Hosting provider is a plain string field
        hosting_provider = domain.get('hosting_provider')
        hosting_provider_str = str(hosting_provider).strip() if hosting_provider else None

        issue_dict = {
            'project': {'key': JIRA_PROJECT_KEY},
            'summary': summary,
            'description': description,
            'issuetype': {'name': JIRA_ISSUE_TYPE},
            'labels': labels,
            'parent': {'key': JIRA_EPIC},  # The epic must be an object with a "key" property
        }
        # Only add custom fields that have valid values
        if corp_franchise_id != '-1':
            issue_dict['customfield_10367'] = [{"id": corp_franchise_id}]  # Corporate/Franchise
        if hosting_provider_str:
            issue_dict['customfield_10368'] = hosting_provider_str  # Hosting Provider
        if hosting_location_id != '-1':
            issue_dict['customfield_10366'] = [{"id": hosting_location_id}]  # Hosting Location
        if target_country_id != '-1':
            issue_dict['customfield_10107'] = [{"id": target_country_id}]  # Target Country

        issue = jira_client.create_issue(fields=issue_dict)
        print(f"[INFO] Created Jira ticket {issue.key} for domain {domain['url']}")

        # Always move the ticket to "In Progress" after creation;
        # check the available transitions first
        transitions = jira_client.transitions(issue.key)
        transition_id = None
        for t in transitions:
            if 'in progress' in t['name'].lower():
                transition_id = t['id']
                break
        if transition_id:
            jira_client.transition_issue(issue.key, transition_id)
            print(f"[INFO] Set ticket {issue.key} to In Progress")
        else:
            print(f"[WARN] Could not find In Progress transition for ticket {issue.key}")

        # If a preview image exists, download it and attach it to the ticket
        screenshot_url = domain.get('screenshot_url')
        if screenshot_url:
            try:
                response = requests.get(screenshot_url)
                response.raise_for_status()
                from io import BytesIO
                image_file = BytesIO(response.content)
                image_file.name = "preview.png"
                jira_client.add_attachment(issue=issue.key, attachment=image_file, filename="preview.png")
                print(f"[INFO] Attached preview image for domain {domain['url']}")
            except Exception as attach_error:
                print(f"[ERROR] Failed to attach preview image for domain {domain['url']}: {attach_error}")

        # If a takedown was already requested at creation time, transition to Review
        if domain.get("report_abuse") and domain.get("report_abuse_timestamp"):
            transition_to_review(jira_client, issue.key)
            print(f"[INFO] Moved ticket {issue.key} to Review due to existing takedown request")
        # If the domain is already down, transition to Done
        if domain.get("status", "").lower() == "down":
            close_jira_ticket(jira_client, issue.key)
            print(f"[INFO] Moved ticket {issue.key} to Done due to domain being down")
        return issue.key
    except Exception as e:
        print(f"[ERROR] Creating Jira ticket for domain {domain['url']}: {e}")
        return None
def add_jira_comment(jira_client, ticket, comment):
    try:
        # Try ADF format first (v3 API)
        adf_comment = create_adf_comment(comment)
        if adf_comment:
            jira_client.add_comment(ticket, body=adf_comment)
            print(f"[INFO] Added ADF comment to Jira ticket {ticket}")
        else:
            # Fall back to plain text
            jira_client.add_comment(ticket, comment)
            print(f"[INFO] Added plain text comment to Jira ticket {ticket}")
    except Exception as e:
        # If ADF fails, try plain text as a last resort
        try:
            jira_client.add_comment(ticket, comment)
            print(f"[INFO] Added fallback plain text comment to Jira ticket {ticket}")
        except Exception as e2:
            print(f"[ERROR] Adding comment to Jira ticket {ticket}: {e2}")
            print(f"[ERROR] Original ADF error: {e}")
def calculate_time_diff(start_time, end_time):
    """
    Calculate the time difference between two timestamps and format it for a Jira worklog.

    Args:
        start_time: Start timestamp (ISO format string or datetime)
        end_time: End timestamp (ISO format string or datetime)

    Returns:
        Tuple of (total_minutes, formatted_time_for_jira)
    """
    try:
        # Log the initial input values
        print("[DEBUG] Starting time difference calculation")
        print(f"[DEBUG] Start time: {start_time} (type: {type(start_time)})")
        print(f"[DEBUG] End time: {end_time} (type: {type(end_time)})")

        # Convert strings to datetime objects if needed
        if isinstance(start_time, str):
            print(f"[DEBUG] Attempting to parse start time string: {start_time}")
            # Handle different ISO formats
            try:
                # Try standard fromisoformat first (Python 3.7+)
                start_time = datetime.datetime.fromisoformat(start_time.replace('Z', '+00:00'))
                print("[DEBUG] Successfully parsed start time using fromisoformat")
            except ValueError as e:
                print(f"[WARN] Failed to parse start time using fromisoformat: {e}")
                # Fall back to the common JIRA date format (2023-03-15T14:30:45.000+0000)
                if 'T' in start_time:
                    print("[DEBUG] Attempting to parse start time using JIRA format")
                    try:
                        date_part, time_part = start_time.split('T', 1)
                        time_part = time_part.split('.')[0]  # Drop milliseconds and timezone
                        start_time = datetime.datetime.strptime(f"{date_part} {time_part}", "%Y-%m-%d %H:%M:%S")
                        print("[DEBUG] Successfully parsed start time using JIRA format")
                    except ValueError as e:
                        print(f"[ERROR] Failed to parse start time using JIRA format: {e}")
                        raise ValueError(f"Could not parse start time '{start_time}': {str(e)}")
                else:
                    print("[DEBUG] Attempting to parse start time using direct format")
                    try:
                        start_time = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
                        print("[DEBUG] Successfully parsed start time using direct format")
                    except ValueError as e:
                        print(f"[ERROR] Failed to parse start time using direct format: {e}")
                        raise ValueError(f"Could not parse start time '{start_time}': {str(e)}")
        if isinstance(end_time, str):
            print(f"[DEBUG] Attempting to parse end time string: {end_time}")
            # Handle different ISO formats
            try:
                end_time = datetime.datetime.fromisoformat(end_time.replace('Z', '+00:00'))
                print("[DEBUG] Successfully parsed end time using fromisoformat")
            except ValueError as e:
                print(f"[WARN] Failed to parse end time using fromisoformat: {e}")
                # Fall back to the common JIRA date format
                if 'T' in end_time:
                    print("[DEBUG] Attempting to parse end time using JIRA format")
                    try:
                        date_part, time_part = end_time.split('T', 1)
                        time_part = time_part.split('.')[0]  # Drop milliseconds and timezone
                        end_time = datetime.datetime.strptime(f"{date_part} {time_part}", "%Y-%m-%d %H:%M:%S")
                        print("[DEBUG] Successfully parsed end time using JIRA format")
                    except ValueError as e:
                        print(f"[ERROR] Failed to parse end time using JIRA format: {e}")
                        raise ValueError(f"Could not parse end time '{end_time}': {str(e)}")
                else:
                    print("[DEBUG] Attempting to parse end time using direct format")
                    try:
                        end_time = datetime.datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S")
                        print("[DEBUG] Successfully parsed end time using direct format")
                    except ValueError as e:
                        print(f"[ERROR] Failed to parse end time using direct format: {e}")
                        raise ValueError(f"Could not parse end time '{end_time}': {str(e)}")

        # Ensure we have datetime objects
        if not isinstance(start_time, datetime.datetime) or not isinstance(end_time, datetime.datetime):
            error_msg = f"Invalid timestamp formats: start={type(start_time)}, end={type(end_time)}"
            print(f"[ERROR] {error_msg}")
            raise ValueError(error_msg)

        # Calculate the difference in minutes (ensure a positive value)
        diff = end_time - start_time
        if diff.total_seconds() < 0:
            print(f"[WARN] Negative time difference detected: {diff}. Using absolute value.")
            diff = abs(diff)
        total_minutes = diff.total_seconds() / 60
        print(f"[DEBUG] Calculated total minutes: {total_minutes:.2f}")

        # Format for Jira: days, hours, and minutes for readability
        days = int(total_minutes / (60 * 24))
        remaining_minutes = total_minutes % (60 * 24)
        hours = int(remaining_minutes / 60)
        minutes = int(remaining_minutes % 60)
        # Build the time string from whichever units are needed
        time_spent = ""
        if days > 0:
            time_spent += f"{days}d "
        if hours > 0:
            time_spent += f"{hours}h "
        if minutes > 0 or time_spent == "":
            time_spent += f"{minutes}m"
        # Log the calculation for debugging
        print(f"[DEBUG] Time diff calculation: {start_time} to {end_time} = {time_spent} ({total_minutes:.2f} minutes)")
        return total_minutes, time_spent.strip()
    except Exception as e:
        error_msg = f"Failed to calculate time difference: {str(e)}"
        print(f"[ERROR] {error_msg}")
        print(f"[ERROR] Input values: start_time={start_time} ({type(start_time)}), end_time={end_time} ({type(end_time)})")
        raise ValueError(error_msg)
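# Worked example (assumed inputs): 2023-03-15T10:00:00Z to 2023-03-16T11:30:00Z
# spans 1 day, 1 hour, and 30 minutes, so calculate_time_diff returns
# (1530.0, "1d 1h 30m"), which suits Jira's worklog timeSpent format.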
def add_jira_worklog(jira_client, ticket, time_spent, comment):
    """
    Add a worklog entry to a Jira ticket.

    Args:
        jira_client: The Jira client instance
        ticket: The ticket ID
        time_spent: Time spent in Jira format (e.g. "1h 30m")
        comment: Comment for the worklog entry
    """
    try:
        # JIRA requires a worklog of at least 1m
        if not time_spent or time_spent.strip() == "0m" or time_spent.strip() == "":
            time_spent = "1m"
            print(f"[WARN] Empty or zero worklog time for ticket {ticket}, using minimum of 1m")
        # Try ADF format first (v3 API)
        adf_comment = create_adf_comment(comment)
        if adf_comment:
            jira_client.add_worklog(ticket, timeSpent=time_spent, comment=adf_comment)
            print(f"[INFO] Added ADF worklog to Jira ticket {ticket}: {time_spent} - {comment}")
        else:
            # Fall back to plain text
            jira_client.add_worklog(ticket, timeSpent=time_spent, comment=comment)
            print(f"[INFO] Added plain text worklog to Jira ticket {ticket}: {time_spent} - {comment}")
    except Exception as e:
        print(f"[ERROR] Adding worklog to Jira ticket {ticket}: {e}")
        # Try a simpler fixed format if the original fails
        try:
            jira_client.add_worklog(ticket, timeSpent="10m", comment=f"{comment} (Original time: {time_spent})")
            print(f"[INFO] Added fallback worklog to Jira ticket {ticket}")
        except Exception as e2:
            print(f"[ERROR] Fallback worklog also failed for ticket {ticket}: {e2}")
def get_created_time(jira_client, ticket_key):
    """
    Get the creation time of a JIRA ticket in a robust way.

    Args:
        jira_client: The Jira client instance
        ticket_key: The JIRA ticket key

    Returns:
        A datetime object representing the creation time
    """
    try:
        issue = jira_client.issue(ticket_key)
        created_field = issue.fields.created
        # Safely extract the created timestamp (unwrap PropertyHolder nesting)
        created_str = created_field
        while hasattr(created_str, 'value'):
            created_str = created_str.value
        created_str = str(created_str)
        # Handle JIRA's timestamp format
        if 'T' in created_str:
            # Try ISO format first
            try:
                return datetime.datetime.fromisoformat(created_str.replace('Z', '+00:00'))
            except ValueError:
                # Fall back to manual parsing
                date_part, time_part = created_str.split('T', 1)
                # Drop milliseconds and timezone if present
                time_part = time_part.split('.')[0]
                return datetime.datetime.strptime(f"{date_part} {time_part}", "%Y-%m-%d %H:%M:%S")
        else:
            # Not in ISO format: try direct parsing
            return datetime.datetime.strptime(created_str, "%Y-%m-%d %H:%M:%S")
    except Exception as e:
        print(f"[ERROR] Failed to get creation time for ticket {ticket_key}: {e}")
        # Fall back to the current time minus 1 hour
        return datetime.datetime.utcnow() - datetime.timedelta(hours=1)
def close_jira_ticket(jira_client, ticket):
    """
    Close a JIRA ticket (transition it to a Done status).
    """
    try:
        # Check the current status first to avoid unnecessary transitions
        issue = jira_client.issue(ticket)
        current_status = safe_extract_status_name(issue.fields.status)
        # If already closed, don't change anything
        if current_status in ['done', 'closed', 'resolved', 'complete']:
            print(f"[INFO] Ticket {ticket} already in {current_status} status. No transition needed.")
            return True
        transitions = jira_client.transitions(ticket)
        transition_id = None
        # Look for a transition named "Done", "Close", "Closed", etc.,
        # stopping at the first match (a bare `break` in a nested loop would
        # only exit the inner loop, so any() is used instead)
        transition_names = ['done', 'close', 'closed', 'resolve', 'resolved', 'complete']
        for t in transitions:
            if any(name in t['name'].lower() for name in transition_names):
                transition_id = t['id']
                break
        if transition_id:
            jira_client.transition_issue(ticket, transition_id)
            print(f"[INFO] Transitioned Jira ticket {ticket} to closed status.")
            return True
        else:
            print(f"[WARN] No suitable transition found to close Jira ticket {ticket}.")
            return False
    except Exception as e:
        print(f"[ERROR] Closing Jira ticket {ticket}: {e}")
        return False
def reopen_jira_ticket(jira_client, ticket):
    try:
        transitions = jira_client.transitions(ticket)
        transition_id = None
        # Look for transitions that indicate reopening (commonly "In Progress",
        # "To Do", or "Open"), stopping at the first match
        transition_names = ['in progress', 'to do', 'open']
        for t in transitions:
            if any(name in t['name'].lower() for name in transition_names):
                transition_id = t['id']
                break
        if transition_id:
            jira_client.transition_issue(ticket, transition_id)
            print(f"[INFO] Reopened Jira ticket {ticket} due to status change.")
            # After reopening, move the ticket to In Progress
            transition_to_in_progress(jira_client, ticket)
        else:
            print(f"[WARN] No suitable transition found to reopen Jira ticket {ticket}.")
    except Exception as e:
        print(f"[ERROR] Reopening Jira ticket {ticket}: {e}")
def takedown_comment_exists(jira_client, ticket):
    """
    Check whether a takedown comment already exists on the JIRA ticket.
    """
    return comment_exists(jira_client, ticket, "Takedown reported at")
def transition_to_review(jira_client, ticket):
    """
    Transition a JIRA ticket to the In Review status.

    Args:
        jira_client: The Jira client instance
        ticket: The ticket ID

    Returns:
        bool: True if successful, False otherwise
    """
    try:
        # Check the current status first to avoid unnecessary transitions
        issue = jira_client.issue(ticket)
        current_status = safe_extract_status_name(issue.fields.status)
        # If already in a review status, don't change anything
        if 'review' in current_status:
            print(f"[INFO] Ticket {ticket} already in {current_status} status. No transition needed.")
            return True
        transitions = jira_client.transitions(ticket)
        # Log the available transitions for debugging
        print(f"[DEBUG] Available transitions for ticket {ticket}: {[t['name'] for t in transitions]}")
        transition_id = None
        # Look for an exact "In Review" match first (case sensitive)
        for t in transitions:
            if "In Review" in t['name']:
                transition_id = t['id']
                print(f"[INFO] Found exact 'In Review' transition (ID: {transition_id})")
                break
        if transition_id:
            jira_client.transition_issue(ticket, transition_id)
            print(f"[INFO] Transitioned Jira ticket {ticket} to In Review status")
            return True
        else:
            print(f"[WARN] No suitable transition found to set ticket {ticket} to In Review status. Available transitions: {[t['name'] for t in transitions]}")
            return False
    except Exception as e:
        print(f"[ERROR] Transitioning Jira ticket {ticket} to In Review status: {e}")
        return False
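# Note on the shape being iterated above and below: in the jira library,
# jira_client.transitions() returns a list of dicts, e.g.
#
#   [{'id': '11', 'name': 'To Do'},
#    {'id': '21', 'name': 'In Progress'},
#    {'id': '31', 'name': 'In Review'}]
#
# The IDs here are illustrative; they vary per workflow.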
def transition_to_in_progress(jira_client, ticket):
    """
    Transition a JIRA ticket to the In Progress status.
    """
    try:
        # Check the current status first to avoid unnecessary transitions
        issue = jira_client.issue(ticket)
        current_status = safe_extract_status_name(issue.fields.status)
        # If already in progress, don't change anything
        if 'in progress' in current_status:
            print(f"[INFO] Ticket {ticket} already in {current_status} status. No transition needed.")
            return True
        transitions = jira_client.transitions(ticket)
        transition_id = None
        # Look for a transition named "In Progress"
        for t in transitions:
            if 'in progress' in t['name'].lower():
                transition_id = t['id']
                break
        if transition_id:
            jira_client.transition_issue(ticket, transition_id)
            print(f"[INFO] Transitioned Jira ticket {ticket} to In Progress status")
            return True
        else:
            print(f"[WARN] No In Progress transition found for ticket {ticket}")
            return False
    except Exception as e:
        print(f"[ERROR] Transitioning Jira ticket {ticket} to In Progress: {e}")
        return False
def update_jira_labels(jira_client, ticket, domain):
    """
    Update JIRA ticket labels based on domain fields.
    Only handles labels that aren't custom fields.
    """
    try:
        # Get the current issue data
        issue = jira_client.issue(ticket)
        made_changes = False

        def extract_labels(labels_field):
            """Safely extract labels from a PropertyHolder object or list."""
            if labels_field is None:
                return []
            # Unwrap nested PropertyHolder objects
            labels_content = labels_field
            while hasattr(labels_content, 'value'):
                labels_content = labels_content.value
                print(f"[DEBUG] extract_labels: Extracted nested value, type = {type(labels_content)}")
            # If it's a list or already properly structured
            if isinstance(labels_content, (list, tuple)):
                return [str(label) for label in labels_content if label is not None]
            elif labels_content:
                # A single value: return it as a one-element list
                return [str(labels_content)]
            else:
                return []

        # Labels this function manages (existing ones are stripped, then re-derived)
        managed_prefixes = [
            'takedown-requested',
            'malicious',
            'corporate',
            'franchise',
            'abuse-type',
        ]
        # Start with the existing labels, excluding the managed ones
        current_labels_list = extract_labels(issue.fields.labels)
        current_labels = [label for label in current_labels_list if label is not None and
                          not any(label == prefix or label.startswith(prefix) for prefix in managed_prefixes)]
        # Re-derive the managed labels from the current domain state
        new_labels = list(current_labels)
        # Report abuse label
        if domain.get("report_abuse"):
            new_labels.append("takedown-requested")
        # Malicious label
        if domain.get("malicious"):
            new_labels.append("malicious")
        # Corporate/franchise label
        corporate_franchise = domain.get("corporate_franchise")
        if corporate_franchise and str(corporate_franchise).lower() in ["corporate", "franchise"]:
            new_labels.append(str(corporate_franchise).lower())
        # Abuse type label
        abuse_type = domain.get("abuse_type")
        if abuse_type and str(abuse_type).lower() in ["mimic", "redirect", "independent", "n/a", "to_validate"]:
            new_labels.append(f"abuse-type-{str(abuse_type).lower()}")
        # Drop any None values that slipped through
        new_labels = [label for label in new_labels if label is not None]

        # Update the labels only if they've changed
        if set(new_labels) != set(current_labels_list):
            issue.update(fields={"labels": new_labels})
            print(f"[INFO] Updated labels for Jira ticket {ticket}: {new_labels}")
            made_changes = True
            # Add a comment only for significant label changes
            significant_changes = []
            if 'malicious' in new_labels and 'malicious' not in current_labels_list:
                significant_changes.append("Domain marked as MALICIOUS")
            if 'takedown-requested' in new_labels and 'takedown-requested' not in current_labels_list:
                significant_changes.append("Takedown requested")
            # Comment on abuse type changes
            new_abuse_labels = [l for l in new_labels if l.startswith('abuse-type-')]
            old_abuse_labels = [l for l in current_labels_list if l is not None and l.startswith('abuse-type-')]
            if new_abuse_labels != old_abuse_labels and new_abuse_labels:
                abuse_type_value = new_abuse_labels[0].replace('abuse-type-', '')
                significant_changes.append(f"Abuse type updated to: {abuse_type_value}")
            if significant_changes:
                add_jira_comment(
                    jira_client,
                    ticket,
                    f"Domain status updated: {', '.join(significant_changes)}"
                )
        return True
    except Exception as e:
        print(f"[ERROR] Updating labels for Jira ticket {ticket}: {e}")
        return False
def update_jira_custom_fields(jira_client, ticket, domain): | |
""" | |
Update JIRA ticket custom fields based on domain data. | |
""" | |
try: | |
issue = jira_client.issue(ticket) | |
update_fields = {} | |
significant_changes = [] | |
made_changes = False | |
# Fetch all existing comments once to avoid multiple API calls | |
all_comments = jira_client.comments(ticket) | |
comment_bodies = [] | |
for comment in all_comments: | |
# Safely extract comment body from PropertyHolder (handle nested) | |
comment_body = comment.body | |
while hasattr(comment_body, 'value'): | |
comment_body = comment_body.value | |
comment_bodies.append(str(comment_body) if comment_body else '') | |
# Map corporate/franchise values to their NEW CLOUD IDs (updated from discovery) | |
corp_franchise_map = { | |
'corporate': '11343', # Updated from '18955' to new Cloud ID | |
'franchise': '11344', # Updated from '18956' to new Cloud ID | |
'none': '-1' | |
} | |
# Helper function to safely extract ID from PropertyHolder or list | |
def extract_field_id(field_value): | |
"""Extract ID from PropertyHolder object or list of PropertyHolder objects""" | |
if field_value is None: | |
return '-1' | |
def extract_id_from_property_holder(obj, depth=0): | |
"""Recursively extract ID from PropertyHolder with detailed debugging""" | |
indent = " " * depth | |
print(f"[DEBUG] {indent}extract_id_from_property_holder: obj type = {type(obj)}") | |
if obj is None: | |
return '-1' | |
# Handle lists (multi-choice fields typically return first ID) | |
if isinstance(obj, (list, tuple)): | |
print(f"[DEBUG] {indent}Found list/tuple with {len(obj)} items") | |
if len(obj) > 0: | |
first_item_id = extract_id_from_property_holder(obj[0], depth + 1) | |
return first_item_id | |
return '-1' | |
# If it's not a PropertyHolder-like object, try to get ID directly | |
if not hasattr(obj, '__class__') or 'PropertyHolder' not in str(type(obj)): | |
# Special handling for CustomFieldOption objects - extract ID properly | |
if hasattr(obj, 'id'): | |
result = str(obj.id).strip() | |
print(f"[DEBUG] {indent}CustomFieldOption ID result: '{result}'") | |
return result if result else '-1' | |
else: | |
result = str(obj).strip() | |
print(f"[DEBUG] {indent}Non-PropertyHolder ID result: '{result}'") | |
return result if result else '-1' | |
# It's a PropertyHolder, examine its attributes for ID | |
print(f"[DEBUG] {indent}PropertyHolder attributes: {[attr for attr in dir(obj) if not attr.startswith('_')]}") | |
# Try extracting ID from common attributes in order of preference | |
for attr_name in ['id', 'key', 'value', 'name', 'content']: | |
if hasattr(obj, attr_name): | |
attr_value = getattr(obj, attr_name) | |
print(f"[DEBUG] {indent}Found {attr_name} = {attr_value} (type: {type(attr_value)})") | |
if attr_value is not None: | |
if isinstance(attr_value, (list, tuple)): | |
# Handle list attributes | |
if len(attr_value) > 0: | |
return extract_id_from_property_holder(attr_value[0], depth + 1) | |
return '-1' | |
elif 'PropertyHolder' in str(type(attr_value)): | |
# Recursively extract from nested PropertyHolder | |
return extract_id_from_property_holder(attr_value, depth + 1) | |
else: | |
# It's a plain value, check if it looks like an ID | |
result = str(attr_value).strip() | |
if result and not result.startswith('<') and not result.endswith('>'): | |
return result | |
# If no suitable attribute found, return -1 | |
print(f"[DEBUG] {indent}No extractable ID found in PropertyHolder") | |
return '-1' | |
# Start extraction | |
result = extract_id_from_property_holder(field_value) | |
print(f"[DEBUG] Final extracted ID: '{result}'") | |
return result | |
# Helper function to safely extract string value from PropertyHolder | |
def extract_field_string(field_value): | |
"""Extract string value from PropertyHolder object or ADF format""" | |
if field_value is None: | |
return '' | |
def extract_from_property_holder(obj, depth=0): | |
"""Recursively extract value from PropertyHolder with detailed debugging""" | |
indent = " " * depth | |
print(f"[DEBUG] {indent}extract_from_property_holder: obj type = {type(obj)}") | |
if obj is None: | |
return '' | |
# If it's not a PropertyHolder-like object, return it | |
if not hasattr(obj, '__class__') or 'PropertyHolder' not in str(type(obj)): | |
if isinstance(obj, (list, tuple)): | |
print(f"[DEBUG] {indent}Found list/tuple with {len(obj)} items") | |
# Handle list of PropertyHolders (multi-choice fields) | |
results = [] | |
for i, item in enumerate(obj): | |
print(f"[DEBUG] {indent}Processing list item {i}: {type(item)}") | |
item_result = extract_from_property_holder(item, depth + 1) | |
if item_result: | |
results.append(item_result) | |
return ', '.join(results) if results else '' | |
else: | |
result = str(obj).strip() | |
print(f"[DEBUG] {indent}Non-PropertyHolder result: '{result}'") | |
return result | |
# It's a PropertyHolder, examine its attributes | |
print(f"[DEBUG] {indent}PropertyHolder attributes: {[attr for attr in dir(obj) if not attr.startswith('_')]}") | |
# Try extracting from common attributes in order of preference, including ADF 'content' | |
for attr_name in ['text', 'displayName', 'name', 'value', 'content', 'key', 'id']: | |
if hasattr(obj, attr_name): | |
attr_value = getattr(obj, attr_name) | |
print(f"[DEBUG] {indent}Found {attr_name} = {attr_value} (type: {type(attr_value)})") | |
# Special handling for ADF content | |
if attr_name == 'content' and isinstance(attr_value, (list, tuple)): | |
print(f"[DEBUG] {indent}Processing ADF content with {len(attr_value)} paragraphs") | |
# Handle ADF content structure | |
text_parts = [] | |
for paragraph in attr_value: | |
if isinstance(paragraph, dict) and paragraph.get('type') == 'paragraph': | |
paragraph_content = paragraph.get('content', []) | |
for text_node in paragraph_content: | |
if isinstance(text_node, dict) and text_node.get('type') == 'text': | |
text_parts.append(text_node.get('text', '')) | |
elif hasattr(paragraph, '__class__') and 'PropertyHolder' in str(type(paragraph)): | |
# Recursively extract from PropertyHolder paragraph | |
para_result = extract_from_property_holder(paragraph, depth + 1) | |
if para_result: | |
text_parts.append(para_result) | |
result = ' '.join(text_parts).strip() | |
if result: | |
print(f"[DEBUG] {indent}Extracted ADF content: '{result}'") | |
return result | |
# Recursively process the attribute value | |
if attr_value is not None: | |
if isinstance(attr_value, (list, tuple)): | |
# Handle list attributes | |
results = [] | |
for i, item in enumerate(attr_value): | |
item_result = extract_from_property_holder(item, depth + 1) | |
if item_result: | |
results.append(item_result) | |
return ', '.join(results) if results else '' | |
elif 'PropertyHolder' in str(type(attr_value)): | |
# Recursively extract from nested PropertyHolder | |
return extract_from_property_holder(attr_value, depth + 1) | |
else: | |
# It's a plain value | |
result = str(attr_value).strip() | |
if result and not result.startswith('<') and not result.endswith('>'): | |
return result | |
# If no suitable attribute found, return empty string | |
print(f"[DEBUG] {indent}No extractable value found in PropertyHolder") | |
return '' | |
# Start extraction | |
result = extract_from_property_holder(field_value) | |
# Handle ADF format (Atlassian Document Format) if result is a dict | |
if isinstance(result, str): | |
try: | |
import json | |
adf_data = json.loads(result) | |
if isinstance(adf_data, dict) and adf_data.get('type') == 'doc': | |
print(f"[DEBUG] Found ADF in string, parsing...") | |
content = adf_data.get('content', []) | |
text_parts = [] | |
for paragraph in content: | |
if paragraph.get('type') == 'paragraph': | |
paragraph_content = paragraph.get('content', []) | |
for text_node in paragraph_content: | |
if text_node.get('type') == 'text': | |
text_parts.append(text_node.get('text', '')) | |
result = ' '.join(text_parts).strip() | |
print(f"[DEBUG] Extracted from ADF string: '{result}'") | |
except: | |
pass | |
# Check if we need to handle direct ADF dict | |
if isinstance(field_value, dict) and field_value.get('type') == 'doc': | |
print(f"[DEBUG] Processing direct ADF format") | |
try: | |
content = field_value.get('content', []) | |
text_parts = [] | |
for paragraph in content: | |
if paragraph.get('type') == 'paragraph': | |
paragraph_content = paragraph.get('content', []) | |
for text_node in paragraph_content: | |
if text_node.get('type') == 'text': | |
text_parts.append(text_node.get('text', '')) | |
result = ' '.join(text_parts).strip() | |
print(f"[DEBUG] Extracted from direct ADF: '{result}'") | |
except Exception as e: | |
print(f"[DEBUG] ADF parsing failed: {e}") | |
result = str(field_value) | |
print(f"[DEBUG] Final extracted result: '{result}'") | |
return result | |
# Helper function to create ADF format for text fields | |
def create_adf_text(text): | |
"""Create ADF format for plain text fields that require Atlassian Document Format""" | |
if not text or not text.strip(): | |
return None | |
adf_result = { | |
"type": "doc", | |
"version": 1, | |
"content": [{ | |
"type": "paragraph", | |
"content": [{"type": "text", "text": str(text).strip()}] | |
}] | |
} | |
print(f"[DEBUG] create_adf_text: Created ADF for '{text}' = {adf_result}") | |
return adf_result | |
# Safely handle corporate/franchise update | |
corporate_franchise = domain.get('corporate_franchise') | |
if corporate_franchise is not None: | |
corp_value = str(corporate_franchise).lower().strip() | |
corp_franchise_id = corp_franchise_map.get(corp_value, '-1') | |
current_corp = getattr(issue.fields, 'customfield_10367', None) | |
current_corp_id = extract_field_id(current_corp) | |
if corp_franchise_id != '-1' and corp_franchise_id != current_corp_id: | |
new_corp = [{"id": corp_franchise_id}] # Array with single object | |
update_fields['customfield_10367'] = new_corp | |
significant_changes.append(f"Corporate/Franchise: {corporate_franchise}") | |
made_changes = True | |
elif corp_franchise_id == '-1' and current_corp is not None: | |
update_fields['customfield_10367'] = [] # Empty array instead of None | |
made_changes = True | |
# Safely handle hosting provider update (custom field 10368; an earlier revision wrongly used 10366) | |
hosting_provider = domain.get('hosting_provider') | |
if hosting_provider is not None: | |
current_provider = getattr(issue.fields, 'customfield_10368', '') | |
current_provider_str = extract_field_string(current_provider) | |
new_provider = str(hosting_provider).strip() if hosting_provider else '' | |
# Debug logging for hosting provider comparison | |
print(f"[DEBUG] Hosting provider comparison for ticket {ticket}:") | |
print(f"[DEBUG] Current: '{current_provider_str}' (type: {type(current_provider)})") | |
print(f"[DEBUG] New: '{new_provider}'") | |
print(f"[DEBUG] Equal: {current_provider_str == new_provider}") | |
if current_provider_str != new_provider and new_provider: | |
# Use ADF format for hosting provider field since it expects Atlassian Document Format | |
adf_provider = create_adf_text(new_provider) | |
if adf_provider: | |
update_fields['customfield_10368'] = adf_provider | |
significant_changes.append(f"Hosting Provider: {new_provider}") | |
made_changes = True | |
print(f"[INFO] Hosting provider will be updated from '{current_provider_str}' to '{new_provider}'") | |
elif current_provider_str and not new_provider: # Only clear if there was a previous value | |
# Clear with empty ADF document or None | |
update_fields['customfield_10368'] = None | |
made_changes = True | |
print(f"[INFO] Hosting provider will be cleared") | |
else: | |
print(f"[DEBUG] No hosting provider update needed") | |
# Safely handle hosting location update (custom field 10366; an earlier revision wrongly used 10368) | |
hosting_location = domain.get('hosting_location') | |
if hosting_location is not None: | |
hosting_location_id = get_hosting_location_id(hosting_location) | |
current_location = getattr(issue.fields, 'customfield_10366', None) | |
current_location_id = extract_field_id(current_location) | |
# Debug logging for hosting location comparison | |
print(f"[DEBUG] Hosting location comparison for ticket {ticket}:") | |
print(f"[DEBUG] Current: '{current_location_id}' (type: {type(current_location)})") | |
print(f"[DEBUG] New: '{hosting_location_id}'") | |
print(f"[DEBUG] Equal: {hosting_location_id == current_location_id}") | |
if hosting_location_id != '-1' and hosting_location_id != current_location_id: | |
new_location = [{"id": hosting_location_id}] # Array with single object | |
update_fields['customfield_10366'] = new_location | |
location_name = COUNTRY_CODE_TO_NAME.get(str(hosting_location).lower(), hosting_location) | |
significant_changes.append(f"Hosting Location: {location_name}") | |
made_changes = True | |
print(f"[INFO] Hosting location will be updated from '{current_location_id}' to '{hosting_location_id}'") | |
elif hosting_location_id == '-1' and current_location is not None: | |
update_fields['customfield_10366'] = [] # Empty array instead of None | |
made_changes = True | |
print(f"[INFO] Hosting location will be cleared") | |
else: | |
print(f"[DEBUG] No hosting location update needed") | |
# Safely handle target country update | |
target_country = domain.get('target_country') | |
if target_country is not None: | |
target_country_id = get_target_country_id(target_country) | |
current_country = getattr(issue.fields, 'customfield_10107', None) | |
current_country_id = extract_field_id(current_country) | |
# Debug logging for target country comparison | |
print(f"[DEBUG] Target country comparison for ticket {ticket}:") | |
print(f"[DEBUG] Current: '{current_country_id}' (type: {type(current_country)})") | |
print(f"[DEBUG] New: '{target_country_id}'") | |
print(f"[DEBUG] Equal: {target_country_id == current_country_id}") | |
if target_country_id != '-1' and target_country_id != current_country_id: | |
new_country = [{"id": target_country_id}] # Array with single object | |
update_fields['customfield_10107'] = new_country | |
country_name = COUNTRY_CODE_TO_NAME.get(str(target_country).lower(), target_country) | |
significant_changes.append(f"Target Country: {country_name}") | |
made_changes = True | |
print(f"[INFO] Target country will be updated from '{current_country_id}' to '{target_country_id}'") | |
elif target_country_id == '-1' and current_country is not None: | |
update_fields['customfield_10107'] = [] # Empty array instead of None | |
made_changes = True | |
print(f"[INFO] Target country will be cleared") | |
else: | |
print(f"[DEBUG] No target country update needed") | |
# Handle notes in description - only update if notes have actually changed | |
notes = domain.get('notes') | |
notes_updated = False | |
if notes and notes.strip(): | |
# Get current notes from description | |
current_notes = extract_notes_from_description(jira_client, ticket) | |
new_notes = notes.strip() | |
print(f"[DEBUG] Notes comparison for ticket {ticket}:") | |
print(f"[DEBUG] Current notes: '{current_notes}'") | |
print(f"[DEBUG] New notes: '{new_notes}'") | |
print(f"[DEBUG] Equal: {current_notes == new_notes}") | |
# Only update if notes have changed | |
if current_notes != new_notes: | |
if update_description_with_notes(jira_client, ticket, new_notes): | |
# Add a comment about the notes update | |
add_jira_comment(jira_client, ticket, f"Notes updated in description:\n{new_notes}") | |
notes_updated = True | |
print(f"[INFO] Updated notes in description for ticket {ticket}") | |
else: | |
print(f"[ERROR] Failed to update notes in description for ticket {ticket}") | |
else: | |
print(f"[DEBUG] Notes unchanged for ticket {ticket}, no update needed") | |
elif notes is not None and not notes.strip(): | |
# Notes field exists but is empty - check if we need to clear existing notes | |
current_notes = extract_notes_from_description(jira_client, ticket) | |
if current_notes: | |
if update_description_with_notes(jira_client, ticket, ''): | |
add_jira_comment(jira_client, ticket, "Notes cleared from description") | |
notes_updated = True | |
print(f"[INFO] Cleared notes from description for ticket {ticket}") | |
else: | |
print(f"[ERROR] Failed to clear notes from description for ticket {ticket}") | |
else: | |
print(f"[DEBUG] No notes to clear for ticket {ticket}") | |
else: | |
print(f"[DEBUG] No notes provided for ticket {ticket}") | |
# Only update if there are actual field changes | |
if update_fields: | |
issue.update(fields=update_fields) | |
print(f"[INFO] Updated custom fields for Jira ticket {ticket}: {update_fields}") | |
# Only add a domain information comment if there were significant changes | |
if significant_changes: | |
domain_info_comment = "Updated domain information:\n" + "\n".join(significant_changes) | |
# Check if a similar domain info comment already exists | |
similar_comment_exists = False | |
for comment_body in comment_bodies: | |
# Check for exact match of the domain information comment | |
if domain_info_comment.strip() == comment_body.strip(): | |
similar_comment_exists = True | |
print(f"[DEBUG] Exact matching domain info comment found, skipping duplicate") | |
break | |
# Also check if this is a very recent domain information comment with the same changes | |
if "Updated domain information:" in comment_body: | |
# Check if all the same changes are in this comment | |
all_changes_match = True | |
for change in significant_changes: | |
if change not in comment_body: | |
all_changes_match = False | |
break | |
# If all changes match, check if any different changes exist | |
if all_changes_match: | |
# Split the comment into lines to check for extra changes | |
comment_lines = [line.strip() for line in comment_body.split('\n') if line.strip()] | |
expected_lines = [line.strip() for line in domain_info_comment.split('\n') if line.strip()] | |
# If the number of lines matches and all expected lines are present, it's a duplicate | |
if len(comment_lines) == len(expected_lines): | |
similar_comment_exists = True | |
print(f"[DEBUG] Similar domain info comment found with matching changes, skipping duplicate") | |
break | |
if not similar_comment_exists: | |
add_jira_comment(jira_client, ticket, domain_info_comment) | |
print(f"[INFO] Added domain information comment to ticket {ticket}") | |
else: | |
print(f"[INFO] Similar domain information comment already exists in ticket {ticket}, skipping") | |
return True | |
except Exception as e: | |
print(f"[ERROR] Updating custom fields for Jira ticket {ticket}: {e}") | |
return False | |
def comment_exists(jira_client, ticket, text_pattern): | |
""" | |
Check if a comment containing the given text pattern already exists on the ticket. | |
Enhanced version with better pattern matching. | |
""" | |
try: | |
# Fetch all comments on the ticket | |
comments = jira_client.comments(ticket) | |
print(f"[DEBUG] Checking {len(comments)} comments for pattern: '{text_pattern[:50]}{'...' if len(text_pattern) > 50 else ''}' in ticket {ticket}") | |
# Pre-normalize the search pattern once | |
pattern_normalized = normalize_content_for_comparison(text_pattern) | |
# Helper that converts any JIRA comment body (plain-text, ADF dict or PropertyHolder) to plain text | |
def _get_comment_plain_text(body): | |
# Unwrap nested PropertyHolder.value attributes first | |
while hasattr(body, 'value'): | |
body = body.value | |
# Direct ADF dict | |
if isinstance(body, dict) and body.get('type') == 'doc': | |
return extract_text_from_adf(body) | |
# PropertyHolder or other complex objects – try generic extractor | |
if hasattr(body, '__class__') and 'PropertyHolder' in str(type(body)): | |
extracted = extract_field_string_from_propertyholder(body) | |
if isinstance(extracted, dict) and extracted.get('type') == 'doc': | |
return extract_text_from_adf(extracted) | |
return str(extracted) | |
# Fallback – treat as string | |
return str(body) | |
# Check each comment | |
for comment in comments: | |
raw_body = comment.body | |
plain_text = _get_comment_plain_text(raw_body) | |
comment_normalized = normalize_content_for_comparison(plain_text) | |
if pattern_normalized in comment_normalized: | |
print(f"[DEBUG] Found matching comment pattern in ticket {ticket}") | |
return True | |
print(f"[DEBUG] No matching comment pattern found in ticket {ticket}") | |
return False | |
except Exception as e: | |
print(f"[ERROR] Checking for comment pattern on ticket {ticket}: {e}") | |
# Default to False to allow adding the comment if there's an error | |
return False | |
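# Typical usage (illustrative ticket key and pattern; normalize_content_for_comparison | |
# is defined elsewhere in this module): | |
#   if not comment_exists(jira_client, "ABC-123", "Takedown submitted at"): | |
#       add_jira_comment(jira_client, "ABC-123", "Takedown submitted at ...") | |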
def extract_takedown_from_description(jira_client, ticket): | |
""" | |
Extract the current takedown information from the ticket description. | |
Enhanced version with better parsing. | |
""" | |
try: | |
issue = jira_client.issue(ticket) | |
description = issue.fields.description | |
if not description: | |
print(f"[DEBUG] No description found for ticket {ticket}") | |
return '' | |
# Convert description to searchable text | |
description_text = '' | |
# Handle PropertyHolder objects | |
if hasattr(description, '__class__') and 'PropertyHolder' in str(type(description)): | |
# Try to extract text content from PropertyHolder | |
extracted = extract_field_string_from_propertyholder(description) | |
if isinstance(extracted, dict) and extracted.get('type') == 'doc': | |
# It's ADF format, extract text from it | |
description_text = extract_text_from_adf(extracted) | |
else: | |
description_text = str(extracted) | |
elif isinstance(description, dict) and description.get('type') == 'doc': | |
# Direct ADF format | |
description_text = extract_text_from_adf(description) | |
else: | |
# Plain text | |
description_text = str(description) | |
print(f"[DEBUG] Extracted description text for takedown search (length: {len(description_text)})") | |
# Look for takedown patterns - be more flexible | |
takedown_patterns = [ | |
'takedown submitted at', | |
'takedown reported at', | |
'takedown requested at' | |
] | |
lines = description_text.split('\n') | |
for line in lines: | |
line_lower = line.lower().strip() | |
for pattern in takedown_patterns: | |
if pattern in line_lower: | |
print(f"[DEBUG] Found takedown info in description: '{line.strip()}'") | |
return line.strip() | |
print(f"[DEBUG] No takedown info found in description for ticket {ticket}") | |
return '' | |
except Exception as e: | |
print(f"[ERROR] Failed to extract takedown from description for ticket {ticket}: {e}") | |
return '' | |
def extract_text_from_adf(adf_doc): | |
""" | |
Extract plain text from ADF (Atlassian Document Format) document. | |
""" | |
try: | |
# Flatten text recursively so we also capture list items, bullet points and hardBreak nodes | |
def _flatten(node): | |
collected = [] | |
ntype = node.get('type') if isinstance(node, dict) else None | |
if not ntype: | |
return collected | |
# Text node | |
if ntype == 'text': | |
collected.append(node.get('text', '')) | |
# Hard line break – treat as newline so bullets stay on separate lines | |
elif ntype == 'hardBreak': | |
collected.append('\n') | |
# Container nodes that may hold other content | |
else: | |
for child in node.get('content', []) or []: | |
collected.extend(_flatten(child)) | |
# For listItem we force newline after each item | |
if ntype == 'listItem': | |
collected.append('\n') | |
return collected | |
flattened_chars = [] | |
for blk in adf_doc.get('content', []) or []: | |
flattened_chars.extend(_flatten(blk)) | |
# add newline between top-level blocks to preserve separation | |
flattened_chars.append('\n') | |
# Join and normalise multiple consecutive newlines | |
result = ''.join(flattened_chars) | |
import re | |
result = re.sub(r'\n{2,}', '\n', result).strip('\n') | |
print(f"[DEBUG] Extracted {len(result)} characters from ADF document") | |
return result | |
except Exception as e: | |
print(f"[ERROR] Failed to extract text from ADF: {e}") | |
return '' | |
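# Worked example (illustrative helper, never called): hardBreak nodes become | |
# newlines and each listItem / top-level block lands on its own line. | |
def _example_extract_text_from_adf(): | |
    doc = { | |
        "type": "doc", "version": 1, | |
        "content": [ | |
            {"type": "paragraph", "content": [ | |
                {"type": "text", "text": "Line one"}, | |
                {"type": "hardBreak"}, | |
                {"type": "text", "text": "Line two"}]}, | |
            {"type": "bulletList", "content": [ | |
                {"type": "listItem", "content": [ | |
                    {"type": "paragraph", "content": [ | |
                        {"type": "text", "text": "item"}]}]}]}, | |
        ], | |
    } | |
    return extract_text_from_adf(doc)  # -> "Line one\nLine two\nitem" | |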
def poll_domains(domain_statuses): | |
"""Run the main polling loop in a background thread""" | |
while True: | |
current_time = datetime.datetime.utcnow().isoformat() | |
print(f"[INFO] Polling domains at {current_time} UTC...") | |
domains = get_domains() | |
print(f"[DEBUG] Retrieved {len(domains)} domains from {SERVER_URL}.") | |
for domain in domains: | |
domain_id = domain['id'] | |
current_status = domain['status'] | |
# Skip processing if domain status is 'scanning' | |
if current_status.lower() == "scanning": | |
print(f"[DEBUG] Domain ID {domain_id} is scanning. Skipping domain.") | |
continue | |
try: | |
jira_ticket = domain.get('jira_ticket') | |
except Exception as e: | |
print(f"[ERROR] Failed to get Jira ticket for domain ID {domain_id}: {e}") | |
jira_ticket = None | |
if not jira_ticket: | |
print(f"[INFO] No Jira ticket for domain ID {domain_id}. Creating one...") | |
ticket = create_jira_ticket(jira_client, domain) | |
if ticket: | |
if update_jira_ticket(domain_id, ticket): | |
jira_ticket = ticket | |
if jira_ticket: | |
print(f"[DEBUG] Processing ticket {jira_ticket} for domain ID {domain_id}") | |
# Update labels and custom fields (these no longer trigger state transitions) | |
update_jira_labels(jira_client, jira_ticket, domain) | |
update_jira_custom_fields(jira_client, jira_ticket, domain) | |
# Handle content updates (notes and takedown) - only if actually changed | |
check_and_handle_notes_updates(jira_client, jira_ticket, domain) | |
check_and_handle_takedown_updates(jira_client, jira_ticket, domain) | |
# CENTRALIZED STATE MANAGEMENT | |
# Determine what state the ticket should be in | |
target_state = determine_correct_ticket_state(domain, jira_client, jira_ticket) | |
current_state = get_current_ticket_state(jira_client, jira_ticket) | |
# Only transition if needed | |
if target_state != current_state.lower().replace(' ', '_'): | |
print(f"[INFO] Ticket {jira_ticket} needs state change: {current_state} → {target_state}") | |
# Handle special case: domain went down, need to add worklog for takedown resolution | |
if (target_state == "done" and | |
domain.get("report_abuse") and domain.get("report_abuse_timestamp")): | |
# Add worklog for takedown resolution time before closing | |
try: | |
abuse_timestamp = domain.get("report_abuse_timestamp") | |
current_datetime = datetime.datetime.utcnow() | |
# Parse takedown timestamp | |
takedown_time = None | |
if isinstance(abuse_timestamp, str): | |
if 'T' in abuse_timestamp: | |
try: | |
takedown_time = datetime.datetime.fromisoformat(abuse_timestamp.replace('Z', '+00:00')) | |
# fromisoformat() yields a timezone-aware datetime here; normalize to naive UTC | |
# so it can safely be subtracted from the naive utcnow() value above | |
takedown_time = takedown_time.astimezone(datetime.timezone.utc).replace(tzinfo=None) | |
except ValueError: | |
try: | |
takedown_time = datetime.datetime.strptime(abuse_timestamp, "%Y-%m-%d %H:%M:%S") | |
except ValueError: | |
print(f"[ERROR] Could not parse takedown timestamp: {abuse_timestamp}") | |
else: | |
try: | |
takedown_time = datetime.datetime.strptime(abuse_timestamp, "%Y-%m-%d %H:%M:%S") | |
except ValueError: | |
print(f"[ERROR] Could not parse takedown timestamp: {abuse_timestamp}") | |
if takedown_time: | |
# Check if worklog already exists | |
existing_worklogs = jira_client.worklogs(jira_ticket) | |
worklog_exists = False | |
for worklog in existing_worklogs: | |
worklog_comment_body = worklog.comment if hasattr(worklog, 'comment') else '' | |
if isinstance(worklog_comment_body, str) and "Time from takedown request to resolution" in worklog_comment_body: | |
worklog_exists = True | |
break | |
if not worklog_exists: | |
_, time_spent = calculate_time_diff(takedown_time, current_datetime) | |
worklog_comment = f"Time from takedown request to resolution: {time_spent}" | |
add_jira_worklog(jira_client, jira_ticket, time_spent, worklog_comment) | |
print(f"[INFO] Added takedown resolution worklog to ticket {jira_ticket}") | |
else: | |
print(f"[DEBUG] Takedown resolution worklog already exists for ticket {jira_ticket}") | |
except Exception as e: | |
print(f"[ERROR] Failed to add takedown resolution worklog: {e}") | |
# Transition to the correct state | |
transition_ticket_to_correct_state(jira_client, jira_ticket, target_state, current_state) | |
else: | |
print(f"[DEBUG] Ticket {jira_ticket} already in correct state: {current_state}") | |
# Update domain status tracking | |
domain_statuses[domain_id] = current_status | |
time.sleep(POLL_INTERVAL) | |
def main(): | |
# Log basic configuration info at startup (omit sensitive details) | |
print("[INFO] Starting Jira Manager.") | |
print(f"[INFO] Configuration: JIRA_SERVER={JIRA_SERVER}, JIRA_PROJECT_KEY={JIRA_PROJECT_KEY}") | |
print(f"[INFO] SERVER_URL={SERVER_URL}, POLL_INTERVAL={POLL_INTERVAL} seconds.") | |
# Initialize domain status tracking dictionary | |
domain_statuses = {} | |
# Connect to Jira using personal access token (PAT) with Bearer authentication | |
global jira_client | |
try: | |
# Initialize JIRA client with explicit v3 API configuration | |
jira_options = { | |
'server': JIRA_SERVER, | |
'rest_api_version': '3', # Force v3 API | |
'rest_path': 'api', | |
'verify': True | |
} | |
jira_client = JIRA(options=jira_options) | |
# For JIRA Cloud, use email + API token as basic auth instead of a Bearer token: | |
# Cloud rejects Bearer PATs (it tries to parse them as a Connect Session Auth Token and fails) | |
if 'atlassian.net' in JIRA_SERVER: | |
print("[INFO] Detected JIRA Cloud - using email + API token authentication") | |
# For JIRA Cloud, you need email:api_token as basic auth | |
# The API token should be used with your email as username | |
jira_client._session.auth = (JIRA_EMAIL, JIRA_API_TOKEN) | |
jira_client._session.headers.update({ | |
'Accept': 'application/json', | |
'Content-Type': 'application/json' | |
}) | |
print(f"[INFO] Using email: {JIRA_EMAIL} for authentication") | |
else: | |
print("[INFO] Using Bearer token authentication for on-premise JIRA") | |
# Use Bearer token authentication for on-premise | |
jira_client._session.headers.update({ | |
"Authorization": f"Bearer {JIRA_API_TOKEN}", | |
}) | |
# Add the required header for attachment uploads. | |
jira_client._session.headers["X-Atlassian-Token"] = "no-check" | |
print(f"[INFO] Successfully connected to Jira using {'basic auth' if 'atlassian.net' in JIRA_SERVER else 'PAT'}") | |
# Verify v3 API capabilities | |
if verify_jira_v3_api(jira_client): | |
print("[INFO] JIRA v3 API verification successful.") | |
else: | |
print("[WARN] JIRA v3 API verification failed - some features may not work properly.") | |
except Exception as e: | |
print(f"[ERROR] Connecting to Jira: {e}") | |
return | |
print("[INFO] Jira Manager started. Polling for domain changes...") | |
# Start the polling in a background thread | |
import threading | |
polling_thread = threading.Thread(target=poll_domains, args=(domain_statuses,), daemon=True) | |
polling_thread.start() | |
# Start the Flask app for direct API calls | |
app.run(host='0.0.0.0', port=8080) | |
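# NOTE: Flask executes the route decorators below at import time, so the | |
# handlers are registered before app.run() starts serving, provided main() is | |
# only invoked after the module is fully loaded (the conventional __main__ guard). | |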
# API endpoint to directly update a JIRA ticket's labels based on domain data | |
@app.route('/update-labels', methods=['POST']) | |
def api_update_labels(): | |
global jira_client | |
if not jira_client: | |
return jsonify({"error": "JIRA client not initialized"}), 500 | |
try: | |
data = request.json | |
if not data or 'domain' not in data: | |
return jsonify({"error": "Missing domain data"}), 400 | |
domain = data['domain'] | |
jira_ticket = domain.get('jira_ticket') | |
if not jira_ticket: | |
return jsonify({"error": "No JIRA ticket associated with this domain"}), 400 | |
# Update the JIRA ticket labels | |
result = update_jira_labels(jira_client, jira_ticket, domain) | |
if result: | |
return jsonify({"success": True, "message": "JIRA labels updated"}) | |
else: | |
return jsonify({"error": "Failed to update JIRA labels"}), 500 | |
except Exception as e: | |
return jsonify({"error": f"Exception: {str(e)}"}), 500 | |
@app.route('/update-custom-fields', methods=['POST']) | |
def api_update_custom_fields(): | |
global jira_client | |
if not jira_client: | |
return jsonify({"error": "JIRA client not initialized"}), 500 | |
try: | |
data = request.json | |
if not data or 'domain' not in data: | |
return jsonify({"error": "Missing domain data"}), 400 | |
domain = data['domain'] | |
jira_ticket = domain.get('jira_ticket') | |
if not jira_ticket: | |
return jsonify({"error": "No JIRA ticket associated with this domain"}), 400 | |
# Update both labels and custom fields | |
labels_result = update_jira_labels(jira_client, jira_ticket, domain) | |
fields_result = update_jira_custom_fields(jira_client, jira_ticket, domain) | |
if labels_result and fields_result: | |
return jsonify({"success": True, "message": "JIRA ticket updated"}) | |
else: | |
return jsonify({"error": "Failed to update JIRA ticket"}), 500 | |
except Exception as e: | |
return jsonify({"error": f"Exception: {str(e)}"}), 500 | |
def create_adf_description(domain): | |
""" | |
Create an ADF (Atlassian Document Format) description for better formatting in JIRA v3. | |
Falls back to plain text if ADF formatting fails or is disabled. | |
""" | |
# Check if ADF is enabled in configuration | |
if not JIRA_USE_ADF: | |
return None | |
try: | |
# Build structured content using ADF format | |
content = [] | |
# Main heading | |
content.append({ | |
"type": "heading", | |
"attrs": {"level": 3}, | |
"content": [{"type": "text", "text": "Domain Abuse Details"}] | |
}) | |
# Basic info table | |
table_rows = [ | |
["URL", domain['url']], | |
["Status", domain['status']], | |
["Page Title", domain.get('page_title', 'N/A')], | |
["Last Checked", domain.get('last_checked', 'N/A')] | |
] | |
# Add additional fields if available | |
if domain.get("hosting_provider"): | |
table_rows.append(["Hosting Provider", domain.get('hosting_provider')]) | |
if domain.get("target_country"): | |
country_name = COUNTRY_CODE_TO_NAME.get(domain["target_country"].lower(), domain["target_country"]) | |
table_rows.append(["Target Country", country_name]) | |
if domain.get("hosting_location"): | |
location_name = COUNTRY_CODE_TO_NAME.get(domain["hosting_location"].lower(), domain["hosting_location"]) | |
table_rows.append(["Hosting Location", location_name]) | |
if domain.get("abuse_type"): | |
table_rows.append(["Abuse Type", domain.get('abuse_type')]) | |
# Create table structure | |
table_content = { | |
"type": "table", | |
"attrs": {"isNumberColumnEnabled": False, "layout": "default"}, | |
"content": [] | |
} | |
for row_data in table_rows: | |
row = { | |
"type": "tableRow", | |
"content": [ | |
{ | |
"type": "tableHeader", | |
"content": [{"type": "paragraph", "content": [{"type": "text", "text": row_data[0]}]}] | |
}, | |
{ | |
"type": "tableCell", | |
"content": [{"type": "paragraph", "content": [{"type": "text", "text": str(row_data[1])}]}] | |
} | |
] | |
} | |
table_content["content"].append(row) | |
content.append(table_content) | |
# Add note about preview image | |
content.append({ | |
"type": "paragraph", | |
"content": [{"type": "text", "text": "Please refer to the attached preview image for more details."}] | |
}) | |
# Add takedown info if present (above Notes section) | |
if domain.get("report_abuse") and domain.get("report_abuse_timestamp"): | |
content.append({ | |
"type": "heading", | |
"attrs": {"level": 3}, | |
"content": [{"type": "text", "text": "Takedown Reported"}] | |
}) | |
content.append({ | |
"type": "paragraph", | |
"content": [{"type": "text", "text": f"Takedown submitted at {domain.get('report_abuse_timestamp')}."}] | |
}) | |
# Add Notes section if notes exist | |
notes = domain.get('notes') | |
if notes and notes.strip(): | |
content.append({ | |
"type": "heading", | |
"attrs": {"level": 3}, | |
"content": [{"type": "text", "text": "Notes"}] | |
}) | |
content.append({ | |
"type": "paragraph", | |
"content": [{"type": "text", "text": notes.strip()}] | |
}) | |
# Return ADF structure | |
return { | |
"type": "doc", | |
"version": 1, | |
"content": content | |
} | |
except Exception as e: | |
print(f"[WARN] Failed to create ADF description: {e}. Falling back to plain text.") | |
return None | |
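# For reference, each row appended above expands to this ADF shape | |
# (illustrative row ["URL", "https://example.com"]): | |
#   {"type": "tableRow", "content": [ | |
#       {"type": "tableHeader", "content": [{"type": "paragraph", | |
#           "content": [{"type": "text", "text": "URL"}]}]}, | |
#       {"type": "tableCell", "content": [{"type": "paragraph", | |
#           "content": [{"type": "text", "text": "https://example.com"}]}]}]} | |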
def create_adf_comment(text): | |
""" | |
Create an ADF formatted comment for JIRA v3. | |
Falls back to plain text if ADF formatting fails or is disabled. | |
""" | |
# Check if ADF is enabled in configuration | |
if not JIRA_USE_ADF: | |
return None | |
try: | |
return { | |
"type": "doc", | |
"version": 1, | |
"content": [{ | |
"type": "paragraph", | |
"content": [{"type": "text", "text": text}] | |
}] | |
} | |
except Exception: | |
return None | |
def verify_jira_v3_api(jira_client): | |
""" | |
Verify JIRA v3 API connectivity and capabilities. | |
This helps ensure the client is properly configured for v3 API features. | |
""" | |
try: | |
# Display configuration status | |
print(f"[INFO] JIRA API Configuration:") | |
print(f"[INFO] - Server: {JIRA_SERVER}") | |
print(f"[INFO] - Email: {JIRA_EMAIL}") | |
print(f"[INFO] - API Version: {JIRA_API_VERSION}") | |
print(f"[INFO] - ADF Format: {'Enabled' if JIRA_USE_ADF else 'Disabled'}") | |
# Check if email is properly configured for JIRA Cloud | |
if 'atlassian.net' in JIRA_SERVER and JIRA_EMAIL == 'your-email@sixt.com': | |
print(f"[WARN] Please update JIRA_EMAIL in config.py or set JIRA_EMAIL environment variable") | |
print(f"[WARN] Current email placeholder: {JIRA_EMAIL}") | |
# Test basic connectivity | |
server_info = jira_client.server_info() | |
print(f"[INFO] Connected to JIRA server: {server_info.get('serverTitle', 'Unknown')}") | |
print(f"[INFO] JIRA version: {server_info.get('version', 'Unknown')}") | |
# Test if we can access the configured project | |
try: | |
project = jira_client.project(JIRA_PROJECT_KEY) | |
print(f"[INFO] Successfully accessed project: {project.name} ({project.key})") | |
except Exception as e: | |
print(f"[WARN] Could not access project {JIRA_PROJECT_KEY}: {e}") | |
# Test if we can list issue types to verify permissions | |
try: | |
issue_types = jira_client.issue_types() | |
available_types = [it.name for it in issue_types] | |
if JIRA_ISSUE_TYPE in available_types: | |
print(f"[INFO] Issue type '{JIRA_ISSUE_TYPE}' is available") | |
else: | |
print(f"[WARN] Issue type '{JIRA_ISSUE_TYPE}' not found. Available: {available_types}") | |
except Exception as e: | |
print(f"[WARN] Could not retrieve issue types: {e}") | |
# Verify v3 API specific features | |
try: | |
# There is no lightweight probe for ADF support; JIRA Cloud always exposes | |
# the v3 API, so report the configuration rather than creating a throwaway issue | |
print(f"[INFO] JIRA Cloud v3 API features should be available") | |
if JIRA_USE_ADF: | |
print(f"[INFO] ADF formatting enabled for enhanced content display") | |
else: | |
print(f"[INFO] ADF formatting disabled - using plain text format") | |
return True | |
except Exception as e: | |
print(f"[WARN] V3 API features may not be fully available: {e}") | |
return False | |
except Exception as e: | |
print(f"[ERROR] Failed to verify JIRA API connectivity: {e}") | |
return False | |
def test_jira_authentication(): | |
""" | |
Test JIRA authentication with detailed debugging information. | |
This helps identify authentication issues before running the main application. | |
""" | |
print("[INFO] Testing JIRA authentication...") | |
print(f"[INFO] Server: {JIRA_SERVER}") | |
print(f"[INFO] Email: {JIRA_EMAIL}") | |
print(f"[INFO] API Token: {JIRA_API_TOKEN[:10]}...{JIRA_API_TOKEN[-4:] if len(JIRA_API_TOKEN) > 14 else '[SHORT_TOKEN]'}") | |
try: | |
# Test with direct requests first | |
import requests | |
from base64 import b64encode | |
# Create basic auth header | |
credentials = f"{JIRA_EMAIL}:{JIRA_API_TOKEN}" | |
encoded_credentials = b64encode(credentials.encode()).decode() | |
headers = { | |
'Authorization': f'Basic {encoded_credentials}', | |
'Accept': 'application/json', | |
'Content-Type': 'application/json' | |
} | |
# Test 1: Get server info | |
print("\n[TEST 1] Testing server connection...") | |
response = requests.get(f"{JIRA_SERVER}/rest/api/3/serverInfo", headers=headers) | |
print(f"[TEST 1] Status Code: {response.status_code}") | |
if response.status_code == 200: | |
server_info = response.json() | |
print(f"[TEST 1] ✅ Server: {server_info.get('serverTitle', 'Unknown')}") | |
print(f"[TEST 1] ✅ Version: {server_info.get('version', 'Unknown')}") | |
else: | |
print(f"[TEST 1] ❌ Error: {response.text}") | |
return False | |
# Test 2: Get current user | |
print("\n[TEST 2] Testing user authentication...") | |
response = requests.get(f"{JIRA_SERVER}/rest/api/3/myself", headers=headers) | |
print(f"[TEST 2] Status Code: {response.status_code}") | |
if response.status_code == 200: | |
user_info = response.json() | |
print(f"[TEST 2] ✅ User: {user_info.get('displayName', 'Unknown')}") | |
print(f"[TEST 2] ✅ Email: {user_info.get('emailAddress', 'Unknown')}") | |
else: | |
print(f"[TEST 2] ❌ Error: {response.text}") | |
print(f"[TEST 2] ❌ This suggests invalid credentials or insufficient permissions") | |
return False | |
# Test 3: Test project access | |
print(f"\n[TEST 3] Testing project access for {JIRA_PROJECT_KEY}...") | |
response = requests.get(f"{JIRA_SERVER}/rest/api/3/project/{JIRA_PROJECT_KEY}", headers=headers) | |
print(f"[TEST 3] Status Code: {response.status_code}") | |
if response.status_code == 200: | |
project_info = response.json() | |
print(f"[TEST 3] ✅ Project: {project_info.get('name', 'Unknown')}") | |
print(f"[TEST 3] ✅ Key: {project_info.get('key', 'Unknown')}") | |
else: | |
print(f"[TEST 3] ❌ Error: {response.text}") | |
print(f"[TEST 3] ❌ No access to project {JIRA_PROJECT_KEY}") | |
return False | |
# Test 4: Test issue access (try to get a recent issue) | |
print(f"\n[TEST 4] Testing issue search in project {JIRA_PROJECT_KEY}...") | |
jql = f"project = {JIRA_PROJECT_KEY} ORDER BY created DESC" | |
response = requests.get( | |
f"{JIRA_SERVER}/rest/api/3/search", | |
headers=headers, | |
params={'jql': jql, 'maxResults': 1} | |
) | |
print(f"[TEST 4] Status Code: {response.status_code}") | |
if response.status_code == 200: | |
search_results = response.json() | |
total = search_results.get('total', 0) | |
print(f"[TEST 4] ✅ Found {total} issues in project") | |
if total > 0 and search_results.get('issues'): | |
recent_issue = search_results['issues'][0] | |
print(f"[TEST 4] ✅ Recent issue: {recent_issue.get('key', 'Unknown')}") | |
else: | |
print(f"[TEST 4] ❌ Error: {response.text}") | |
return False | |
print("\n[SUCCESS] All authentication tests passed! ✅") | |
return True | |
except Exception as e: | |
print(f"\n[ERROR] Authentication test failed: {e}") | |
return False | |
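# One-off usage sketch (hypothetical module name): | |
#   python -c "import jira_manager; jira_manager.test_jira_authentication()" | |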
def extract_notes_from_description(jira_client, ticket): | |
""" | |
Extract the current notes from the ticket description. | |
Returns the notes content or empty string if no notes section found. | |
""" | |
try: | |
issue = jira_client.issue(ticket) | |
description = issue.fields.description | |
if not description: | |
return '' | |
# Handle PropertyHolder objects first | |
actual_description = description | |
if hasattr(description, '__class__') and 'PropertyHolder' in str(type(description)): | |
print(f"[DEBUG] Description is PropertyHolder, checking for ADF...") | |
# Check if this is an ADF PropertyHolder (has type, version, content) | |
if (hasattr(description, 'type') and hasattr(description, 'version') and | |
hasattr(description, 'content')): | |
doc_type = getattr(description, 'type', None) | |
print(f"[DEBUG] PropertyHolder doc type: {doc_type}") | |
if doc_type == 'doc': | |
print(f"[DEBUG] PropertyHolder is ADF document, reconstructing for notes extraction...") | |
reconstructed_adf = reconstruct_adf_from_propertyholder(description) | |
if reconstructed_adf: | |
actual_description = reconstructed_adf | |
print(f"[DEBUG] Successfully reconstructed ADF for notes extraction") | |
else: | |
print(f"[DEBUG] Failed to reconstruct ADF, using text extraction") | |
actual_description = extract_field_string_from_propertyholder(description) | |
else: | |
print(f"[DEBUG] PropertyHolder not ADF document, using text extraction") | |
actual_description = extract_field_string_from_propertyholder(description) | |
else: | |
print(f"[DEBUG] PropertyHolder missing ADF attributes, using text extraction") | |
# Use our existing PropertyHolder extraction logic | |
actual_description = extract_field_string_from_propertyholder(description) | |
print(f"[DEBUG] Extracted description type: {type(actual_description)}") | |
# If extraction returned a string that looks like JSON (ADF), try to parse it | |
if isinstance(actual_description, str) and actual_description.strip().startswith('{'): | |
try: | |
import json | |
actual_description = json.loads(actual_description) | |
print(f"[DEBUG] Parsed ADF from PropertyHolder string") | |
except (ValueError, TypeError): | |
print(f"[DEBUG] Failed to parse as JSON, treating as plain text") | |
# Handle both ADF and plain text descriptions | |
description_text = '' | |
if isinstance(actual_description, dict) and actual_description.get('type') == 'doc': | |
print(f"[DEBUG] Processing ADF description for notes extraction") | |
# ADF format - extract text content | |
content = actual_description.get('content', []) | |
text_parts = [] | |
for block in content: | |
if block.get('type') == 'paragraph': | |
paragraph_content = block.get('content', []) | |
for text_node in paragraph_content: | |
if text_node.get('type') == 'text': | |
text_parts.append(text_node.get('text', '')) | |
elif block.get('type') == 'heading': | |
heading_content = block.get('content', []) | |
for text_node in heading_content: | |
if text_node.get('type') == 'text': | |
text_parts.append(text_node.get('text', '')) | |
description_text = '\n'.join(text_parts) | |
else: | |
print(f"[DEBUG] Processing plain text description for notes extraction") | |
# Plain text or PropertyHolder - convert to string | |
description_text = str(actual_description) | |
# Look for Notes section | |
lines = description_text.split('\n') | |
notes_section = False | |
notes_lines = [] | |
for line in lines: | |
line = line.strip() | |
# Accept 'Notes', 'Notes:' and the markdown-style '### Notes' heading that | |
# update_text_description_notes() writes into plain-text descriptions | |
normalized_heading = line.lower().lstrip('#').strip() | |
if normalized_heading == 'notes' or normalized_heading.startswith('notes:'): | |
notes_section = True | |
continue | |
elif notes_section: | |
# Stop if we hit another section or empty line after notes | |
if line.startswith('###') or line.startswith('##') or (not line and notes_lines): | |
break | |
if line: # Only add non-empty lines | |
notes_lines.append(line) | |
result = '\n'.join(notes_lines).strip() | |
print(f"[DEBUG] Extracted notes from description for ticket {ticket}: '{result}'") | |
return result | |
except Exception as e: | |
print(f"[ERROR] Failed to extract notes from description for ticket {ticket}: {e}") | |
return '' | |
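# Worked example (illustrative): given the plain-text description | |
#   "Domain Abuse Details\n### Notes\nfirst note\nsecond note\n### Takedown Reported" | |
# the loop above returns "first note\nsecond note". | |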
def extract_field_string_from_propertyholder(field_value): | |
"""Helper function to extract string content from PropertyHolder, including ADF content""" | |
if field_value is None: | |
return '' | |
# First, try to reconstruct ADF if this looks like an ADF PropertyHolder | |
# Be more aggressive about detecting ADF - if it has type, version, and content, it's likely ADF | |
if (hasattr(field_value, 'type') and hasattr(field_value, 'version') and | |
hasattr(field_value, 'content')): | |
print(f"[DEBUG] Detected potential ADF PropertyHolder (has type, version, content), attempting reconstruction") | |
adf_doc = reconstruct_adf_from_propertyholder(field_value) | |
if adf_doc: | |
print(f"[DEBUG] Successfully reconstructed ADF document") | |
return adf_doc | |
else: | |
print(f"[DEBUG] ADF reconstruction failed, but PropertyHolder has ADF structure - creating fallback ADF") | |
# If reconstruction failed but this clearly has ADF structure, create a minimal ADF | |
try: | |
doc_type = getattr(field_value, 'type', None) | |
doc_version = getattr(field_value, 'version', None) | |
doc_content = getattr(field_value, 'content', None) | |
fallback_adf = { | |
"type": "doc", | |
"version": int(doc_version) if doc_version else 1, | |
"content": [{ | |
"type": "paragraph", | |
"content": [{"type": "text", "text": "[Content could not be extracted from PropertyHolder]"}] | |
}] | |
} | |
if validate_adf_document(fallback_adf): | |
print(f"[DEBUG] Created fallback ADF document") | |
return fallback_adf | |
else: | |
print(f"[DEBUG] Fallback ADF validation failed") | |
except Exception as e: | |
print(f"[DEBUG] Error creating fallback ADF: {e}") | |
def extract_recursive(obj, depth=0): | |
indent = " " * depth | |
print(f"[DEBUG] {indent}extract_recursive: obj type = {type(obj)}") | |
if obj is None: | |
return '' | |
# If it's not a PropertyHolder-like object, return it as-is | |
if not hasattr(obj, '__class__') or 'PropertyHolder' not in str(type(obj)): | |
# Return dicts and lists as-is to preserve ADF structure | |
if isinstance(obj, (dict, list)): | |
return obj | |
return str(obj) | |
# It's a PropertyHolder, examine its attributes | |
attrs = [attr for attr in dir(obj) if not attr.startswith('_') and not callable(getattr(obj, attr, None))] | |
print(f"[DEBUG] {indent}PropertyHolder attributes: {attrs}") | |
# For ADF content, prioritize 'value' which might contain the full ADF structure | |
# Then try other attributes | |
for attr_name in ['value', 'text', 'displayName', 'name', 'content', 'key', 'id']: | |
if hasattr(obj, attr_name): | |
try: | |
attr_value = getattr(obj, attr_name) | |
print(f"[DEBUG] {indent}Found {attr_name} = {type(attr_value)}") | |
if attr_value is not None: | |
if isinstance(attr_value, dict): | |
# If it's a dict, check if it's ADF format | |
if attr_value.get('type') == 'doc': | |
print(f"[DEBUG] {indent}Found ADF document in {attr_name}") | |
return attr_value # Return ADF as-is | |
else: | |
print(f"[DEBUG] {indent}Found dict in {attr_name} but not ADF") | |
return attr_value # Return dict as-is | |
elif isinstance(attr_value, str): | |
# Check if string contains JSON (ADF) | |
if attr_value.strip().startswith('{') and 'type' in attr_value and 'doc' in attr_value: | |
try: | |
import json | |
parsed = json.loads(attr_value) | |
if parsed.get('type') == 'doc': | |
print(f"[DEBUG] {indent}Found ADF JSON string in {attr_name}") | |
return parsed # Return parsed ADF | |
except (ValueError, TypeError): | |
pass # Not valid JSON – fall through to the plain-string handling below | |
print(f"[DEBUG] {indent}Found string in {attr_name}") | |
return attr_value | |
elif isinstance(attr_value, (list, tuple)): | |
# Handle list attributes - could be ADF content | |
if attr_name == 'content': | |
print(f"[DEBUG] {indent}Found content list in {attr_name}") | |
# For ADF content, return the whole structure | |
return attr_value | |
else: | |
# For other lists, try to extract text | |
results = [] | |
for item in attr_value: | |
item_result = extract_recursive(item, depth + 1) | |
if item_result: | |
if isinstance(item_result, str): | |
results.append(item_result) | |
else: | |
# If it's not a string, it might be ADF structure | |
return attr_value | |
return ', '.join(results) if results else attr_value | |
elif 'PropertyHolder' in str(type(attr_value)): | |
# Recursively extract from nested PropertyHolder | |
print(f"[DEBUG] {indent}Found nested PropertyHolder in {attr_name}") | |
return extract_recursive(attr_value, depth + 1) | |
else: | |
print(f"[DEBUG] {indent}Found other type in {attr_name}: {type(attr_value)}") | |
return str(attr_value) | |
except Exception as e: | |
print(f"[DEBUG] {indent}Error accessing {attr_name}: {e}") | |
continue | |
print(f"[DEBUG] {indent}No extractable content found in PropertyHolder") | |
return '' | |
result = extract_recursive(field_value) | |
print(f"[DEBUG] Final extracted from PropertyHolder: type={type(result)}") | |
if isinstance(result, dict) and result.get('type') == 'doc': | |
print(f"[DEBUG] Extracted ADF document successfully") | |
elif isinstance(result, str) and len(result) > 100: | |
print(f"[DEBUG] Extracted long string: {result[:100]}...") | |
else: | |
print(f"[DEBUG] Extracted: {result}") | |
return result | |
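# Minimal sketch of the duck typing used above. _DemoPropertyHolder is a | |
# hypothetical stand-in; real PropertyHolder objects come from the jira | |
# library's JSON deserialization. | |
class _DemoPropertyHolder: | |
    def __init__(self, value): | |
        self.value = value | |
def _example_propertyholder_extraction(): | |
    holder = _DemoPropertyHolder("GoDaddy") | |
    # 'PropertyHolder' appears in str(type(holder)), so extract_recursive | |
    # unwraps the .value attribute and returns the plain string | |
    return extract_field_string_from_propertyholder(holder)  # -> "GoDaddy" | |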
def update_description_with_notes(jira_client, ticket, new_notes): | |
""" | |
Update the ticket description with new notes, preserving the existing content. | |
""" | |
try: | |
issue = jira_client.issue(ticket) | |
current_description = issue.fields.description | |
if not current_description: | |
print(f"[WARN] No existing description found for ticket {ticket}") | |
return False | |
print(f"[DEBUG] Current description type: {type(current_description)}") | |
# CRITICAL SAFETY CHECK: Any PropertyHolder with type, version, content is ADF | |
if (hasattr(current_description, '__class__') and 'PropertyHolder' in str(type(current_description)) and | |
hasattr(current_description, 'type') and hasattr(current_description, 'version') and | |
hasattr(current_description, 'content')): | |
print(f"[DEBUG] SAFETY CHECK: PropertyHolder with ADF attributes detected - FORCING ADF format") | |
# Force ADF reconstruction | |
reconstructed_adf = reconstruct_adf_from_propertyholder(current_description) | |
if reconstructed_adf and validate_adf_document(reconstructed_adf): | |
print(f"[DEBUG] SAFETY CHECK: ADF reconstruction successful") | |
actual_description = reconstructed_adf | |
is_adf_format = True | |
# Update with ADF format | |
updated_description = update_adf_description_notes(actual_description, new_notes) | |
if updated_description and validate_adf_document(updated_description): | |
print(f"[DEBUG] SAFETY CHECK: Attempting ADF update with type: {type(updated_description)}") | |
issue.update(fields={"description": updated_description}) | |
print(f"[INFO] SAFETY CHECK: Successfully updated ADF description with notes for ticket {ticket}") | |
return True | |
else: | |
print(f"[ERROR] SAFETY CHECK: Failed to create valid ADF update") | |
else: | |
print(f"[DEBUG] SAFETY CHECK: ADF reconstruction failed, creating minimal ADF") | |
# Create a minimal valid ADF document | |
minimal_adf = { | |
"type": "doc", | |
"version": 1, | |
"content": [ | |
{ | |
"type": "paragraph", | |
"content": [{"type": "text", "text": "Domain information (content extraction failed)"}] | |
} | |
] | |
} | |
# Add notes section | |
if new_notes and new_notes.strip(): | |
minimal_adf["content"].extend([ | |
{ | |
"type": "heading", | |
"attrs": {"level": 3}, | |
"content": [{"type": "text", "text": "Notes"}] | |
}, | |
{ | |
"type": "paragraph", | |
"content": [{"type": "text", "text": new_notes.strip()}] | |
} | |
]) | |
if validate_adf_document(minimal_adf): | |
print(f"[DEBUG] SAFETY CHECK: Using minimal ADF with notes") | |
issue.update(fields={"description": minimal_adf}) | |
print(f"[INFO] SAFETY CHECK: Successfully updated with minimal ADF for ticket {ticket}") | |
return True | |
else: | |
print(f"[ERROR] SAFETY CHECK: Even minimal ADF validation failed") | |
# Handle PropertyHolder objects first to determine the actual format | |
actual_description = current_description | |
is_adf_format = False | |
if hasattr(current_description, '__class__') and 'PropertyHolder' in str(type(current_description)): | |
print(f"[DEBUG] Description is PropertyHolder, checking for ADF format...") | |
# Check if this is an ADF PropertyHolder (has type, version, content) | |
has_type = hasattr(current_description, 'type') | |
has_version = hasattr(current_description, 'version') | |
has_content = hasattr(current_description, 'content') | |
print(f"[DEBUG] PropertyHolder attributes check: type={has_type}, version={has_version}, content={has_content}") | |
if has_type and has_version and has_content: | |
try: | |
doc_type = getattr(current_description, 'type', None) | |
doc_version = getattr(current_description, 'version', None) | |
doc_content = getattr(current_description, 'content', None) | |
print(f"[DEBUG] PropertyHolder values: type='{doc_type}', version='{doc_version}', content_type={type(doc_content)}") | |
# This is clearly an ADF PropertyHolder - try to reconstruct it | |
# Only treat the PropertyHolder as an ADF 'doc' when its type says so; | |
# other types fall through to the dedicated handling below | |
if doc_type == 'doc' or (doc_type and 'doc' in str(doc_type).lower()): | |
print(f"[DEBUG] PropertyHolder detected as ADF document, attempting reconstruction...") | |
reconstructed_adf = reconstruct_adf_from_propertyholder(current_description) | |
if reconstructed_adf: | |
actual_description = reconstructed_adf | |
is_adf_format = True | |
print(f"[DEBUG] Successfully reconstructed ADF from PropertyHolder") | |
else: | |
print(f"[DEBUG] ADF reconstruction failed, this PropertyHolder has ADF structure but reconstruction failed") | |
# Since this is clearly ADF format (has type, version, content), we should treat it as ADF | |
# Try a simpler approach - create a basic ADF structure | |
print(f"[DEBUG] Attempting manual ADF reconstruction for PropertyHolder with ADF attributes...") | |
try: | |
manual_adf = { | |
"type": "doc", | |
"version": int(doc_version) if doc_version else 1, | |
"content": [] | |
} | |
# Try to extract text content and create simple paragraphs | |
if doc_content and isinstance(doc_content, (list, tuple)): | |
for item in doc_content: | |
if hasattr(item, '__class__') and 'PropertyHolder' in str(type(item)): | |
# Try to extract text from this item | |
item_type = getattr(item, 'type', 'paragraph') | |
if item_type in ['paragraph', 'heading']: | |
manual_adf["content"].append({ | |
"type": item_type, | |
"content": [{"type": "text", "text": ""}] | |
}) | |
if manual_adf["content"]: | |
actual_description = manual_adf | |
is_adf_format = True | |
print(f"[DEBUG] Manual ADF reconstruction successful") | |
else: | |
print(f"[DEBUG] Manual ADF reconstruction failed, falling back to text") | |
actual_description = "[ADF Content - Could not extract]" | |
except Exception as e: | |
print(f"[DEBUG] Manual ADF reconstruction error: {e}") | |
actual_description = "[ADF Content - Extraction failed]" | |
else: | |
print(f"[DEBUG] PropertyHolder has ADF attributes but type is not 'doc': {doc_type}") | |
# Still try ADF reconstruction since it has the right attributes | |
reconstructed_adf = reconstruct_adf_from_propertyholder(current_description) | |
if reconstructed_adf: | |
actual_description = reconstructed_adf | |
is_adf_format = True | |
print(f"[DEBUG] Successfully reconstructed ADF despite non-'doc' type") | |
else: | |
extracted_content = extract_field_string_from_propertyholder(current_description) | |
actual_description = extracted_content | |
print(f"[DEBUG] PropertyHolder extraction completed, treating as text") | |
except Exception as e: | |
print(f"[DEBUG] Error accessing PropertyHolder attributes: {e}") | |
extracted_content = extract_field_string_from_propertyholder(current_description) | |
actual_description = extracted_content | |
else: | |
print(f"[DEBUG] PropertyHolder missing ADF attributes, using text extraction") | |
extracted_content = extract_field_string_from_propertyholder(current_description) | |
actual_description = extracted_content | |
elif isinstance(current_description, dict) and current_description.get('type') == 'doc': | |
actual_description = current_description | |
is_adf_format = True | |
print(f"[DEBUG] Description is direct ADF format") | |
else: | |
actual_description = current_description | |
print(f"[DEBUG] Description is plain text or other format") | |
print(f"[DEBUG] Final format detection: is_adf_format={is_adf_format}, actual_description_type={type(actual_description)}") | |
# Update based on the detected format | |
if is_adf_format: | |
print(f"[DEBUG] Updating ADF description with notes") | |
updated_description = update_adf_description_notes(actual_description, new_notes) | |
else: | |
print(f"[DEBUG] Updating plain text description with notes") | |
updated_description = update_text_description_notes(str(actual_description), new_notes) | |
if updated_description: | |
print(f"[DEBUG] Attempting to update description with type: {type(updated_description)}") | |
issue.update(fields={"description": updated_description}) | |
print(f"[INFO] Updated description with new notes for ticket {ticket}") | |
return True | |
else: | |
print(f"[WARN] Failed to create updated description for ticket {ticket}") | |
return False | |
except Exception as e: | |
print(f"[ERROR] Failed to update description for ticket {ticket}: {e}") | |
print(f"[ERROR] Description type was: {type(current_description) if 'current_description' in locals() else 'unknown'}") | |
if 'updated_description' in locals(): | |
print(f"[ERROR] Updated description type was: {type(updated_description)}") | |
return False | |
def update_adf_description_notes(adf_description, new_notes): | |
""" | |
Update ADF description with new notes section. | |
""" | |
try: | |
content = adf_description.get('content', []) | |
updated_content = [] | |
notes_section_found = False | |
in_notes_section = False | |
# Process existing content and replace notes section | |
for block in content: | |
if block.get('type') == 'heading': | |
heading_text = '' | |
heading_content = block.get('content', []) | |
for text_node in heading_content: | |
if text_node.get('type') == 'text': | |
heading_text += text_node.get('text', '') | |
if 'notes' in heading_text.lower(): | |
notes_section_found = True | |
in_notes_section = True | |
# Add updated notes section | |
updated_content.append(block) # Keep the heading | |
if new_notes and new_notes.strip(): | |
# Add notes content as paragraph | |
updated_content.append({ | |
"type": "paragraph", | |
"content": [{"type": "text", "text": new_notes.strip()}] | |
}) | |
continue | |
else: | |
# Any other heading ends the notes section | |
in_notes_section = False | |
updated_content.append(block) | |
elif in_notes_section: | |
# Skip ALL content in the notes section (paragraphs, etc.) | |
continue | |
else: | |
updated_content.append(block) | |
# If no notes section found, add one at the end | |
if not notes_section_found and new_notes and new_notes.strip(): | |
updated_content.append({ | |
"type": "heading", | |
"attrs": {"level": 3}, | |
"content": [{"type": "text", "text": "Notes"}] | |
}) | |
updated_content.append({ | |
"type": "paragraph", | |
"content": [{"type": "text", "text": new_notes.strip()}] | |
}) | |
return { | |
"type": "doc", | |
"version": 1, | |
"content": updated_content | |
} | |
except Exception as e: | |
print(f"[ERROR] Failed to update ADF description with notes: {e}") | |
return None | |
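# Worked example (illustrative helper, never called): the paragraph under the | |
# "Notes" heading is replaced while the rest of the document is preserved. | |
def _example_update_adf_description_notes(): | |
    doc = { | |
        "type": "doc", "version": 1, | |
        "content": [ | |
            {"type": "heading", "attrs": {"level": 3}, | |
             "content": [{"type": "text", "text": "Notes"}]}, | |
            {"type": "paragraph", | |
             "content": [{"type": "text", "text": "old note"}]}, | |
        ], | |
    } | |
    updated = update_adf_description_notes(doc, "new note") | |
    # updated["content"][1]["content"][0]["text"] == "new note" | |
    return updated | |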
def update_text_description_notes(text_description, new_notes): | |
""" | |
Update plain text description with new notes section. | |
""" | |
try: | |
lines = text_description.split('\n') | |
updated_lines = [] | |
notes_section = False | |
notes_section_processed = False | |
for line in lines: | |
line_stripped = line.strip() | |
if line_stripped.lower().startswith('notes:') or line_stripped.lower() == 'notes' or line_stripped.lower().startswith('### notes'): | |
notes_section = True | |
notes_section_processed = True | |
updated_lines.append(line) | |
# Add new notes content | |
if new_notes and new_notes.strip(): | |
updated_lines.append(new_notes.strip()) | |
continue | |
elif notes_section: | |
# Skip lines until we hit another section (any heading) or end | |
if line_stripped.startswith('##') or line_stripped.startswith('###') or line_stripped.lower().startswith('takedown'): | |
notes_section = False | |
updated_lines.append(line) | |
# Skip ALL old notes content - don't add the line | |
continue | |
else: | |
updated_lines.append(line) | |
# If no notes section found, add one at the end | |
if not notes_section_processed and new_notes and new_notes.strip(): | |
updated_lines.append('') | |
updated_lines.append('### Notes') | |
updated_lines.append(new_notes.strip()) | |
return '\n'.join(updated_lines) | |
except Exception as e: | |
print(f"[ERROR] Failed to update text description with notes: {e}") | |
return None | |
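# Worked example (illustrative): | |
#   update_text_description_notes("Summary\n### Notes\nold", "new") | |
#   -> "Summary\n### Notes\nnew" | |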
def validate_adf_document(adf_doc): | |
""" | |
Validate that an ADF document has the required structure. | |
""" | |
if not isinstance(adf_doc, dict): | |
print(f"[DEBUG] ADF validation failed: not a dict") | |
return False | |
if adf_doc.get('type') != 'doc': | |
print(f"[DEBUG] ADF validation failed: type is not 'doc', got {adf_doc.get('type')}") | |
return False | |
if not isinstance(adf_doc.get('content'), list): | |
print(f"[DEBUG] ADF validation failed: content is not a list") | |
return False | |
if 'version' not in adf_doc: | |
print(f"[DEBUG] ADF validation warning: no version specified, adding default") | |
adf_doc['version'] = 1 | |
print(f"[DEBUG] ADF validation passed: doc with {len(adf_doc['content'])} content items") | |
return True | |
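# e.g. validate_adf_document({"type": "doc", "content": []}) -> True (a missing | |
#      "version" is stamped with the default 1 in place), while | |
#      validate_adf_document({"type": "paragraph"}) -> False | |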
def reconstruct_adf_from_propertyholder(property_holder): | |
""" | |
Reconstruct proper ADF format from PropertyHolder objects. | |
This handles cases where JIRA returns PropertyHolder objects that represent ADF content. | |
""" | |
if not property_holder or not hasattr(property_holder, '__class__') or 'PropertyHolder' not in str(type(property_holder)): | |
return None | |
print(f"[DEBUG] Reconstructing ADF from PropertyHolder") | |
try: | |
# Check if this looks like an ADF document root (has type, version, content) | |
if (hasattr(property_holder, 'type') and hasattr(property_holder, 'version') and | |
hasattr(property_holder, 'content')): | |
doc_type = getattr(property_holder, 'type', None) | |
version = getattr(property_holder, 'version', None) | |
content = getattr(property_holder, 'content', None) | |
print(f"[DEBUG] Found ADF document: type={doc_type}, version={version}, content_type={type(content)}") | |
if doc_type == 'doc' and content is not None: | |
# Reconstruct the content array | |
reconstructed_content = [] | |
if isinstance(content, (list, tuple)): | |
print(f"[DEBUG] Processing {len(content)} content items") | |
for i, item in enumerate(content): | |
print(f"[DEBUG] Processing content item {i}: {type(item)}") | |
reconstructed_item = reconstruct_adf_content_item(item) | |
if reconstructed_item: | |
reconstructed_content.append(reconstructed_item) | |
print(f"[DEBUG] Successfully added content item {i}") | |
else: | |
print(f"[DEBUG] Failed to reconstruct content item {i}") | |
adf_doc = { | |
"type": "doc", | |
"version": int(version) if version else 1, | |
"content": reconstructed_content | |
} | |
print(f"[DEBUG] Reconstructed ADF document with {len(reconstructed_content)} content items") | |
# Validate the reconstructed ADF | |
if validate_adf_document(adf_doc): | |
print(f"[DEBUG] ADF reconstruction successful and validated") | |
return adf_doc | |
else: | |
print(f"[ERROR] ADF reconstruction failed validation") | |
return None | |
print(f"[DEBUG] PropertyHolder does not contain ADF document structure") | |
return None | |
except Exception as e: | |
print(f"[ERROR] Exception during ADF reconstruction: {e}") | |
return None | |
def reconstruct_adf_content_item(item): | |
""" | |
Reconstruct individual ADF content items (paragraphs, headings, etc.) from PropertyHolder. | |
""" | |
if not item or not hasattr(item, '__class__') or 'PropertyHolder' not in str(type(item)): | |
print(f"[DEBUG] Item is not PropertyHolder: {type(item)}") | |
return None | |
print(f"[DEBUG] Reconstructing content item: {type(item)}") | |
try: | |
# Get the type and content of this item | |
item_type = getattr(item, 'type', None) | |
item_content = getattr(item, 'content', None) | |
item_attrs = getattr(item, 'attrs', None) | |
# For text nodes, also check for 'text' attribute directly | |
item_text = getattr(item, 'text', None) if item_type == 'text' else None | |
print(f"[DEBUG] Content item: type={item_type}, has_content={item_content is not None}, has_attrs={item_attrs is not None}, has_text={item_text is not None}") | |
if not item_type: | |
print(f"[DEBUG] No type found for content item") | |
return None | |
# Build the content item | |
content_item = {"type": item_type} | |
# Add attributes if present | |
if item_attrs: | |
try: | |
if hasattr(item_attrs, '__dict__'): | |
# Convert PropertyHolder attrs to dict | |
attrs_dict = {} | |
for attr_name in dir(item_attrs): | |
if not attr_name.startswith('_') and not callable(getattr(item_attrs, attr_name, None)): | |
attr_value = getattr(item_attrs, attr_name, None) | |
if attr_value is not None: | |
attrs_dict[attr_name] = attr_value | |
if attrs_dict: | |
content_item["attrs"] = attrs_dict | |
print(f"[DEBUG] Added attrs from PropertyHolder: {attrs_dict}") | |
elif isinstance(item_attrs, dict): | |
content_item["attrs"] = item_attrs | |
print(f"[DEBUG] Added attrs from dict: {item_attrs}") | |
else: | |
# Try to convert to dict if it's a simple value | |
content_item["attrs"] = item_attrs | |
print(f"[DEBUG] Added attrs as-is: {item_attrs}") | |
except Exception as e: | |
print(f"[DEBUG] Error processing attrs: {e}") | |
# Handle different content types | |
if item_type == 'text': | |
# Text nodes require a 'text' property with actual content | |
text_content = None | |
# Try to get text from the 'text' attribute first | |
if item_text is not None: | |
text_content = str(item_text).strip() | |
print(f"[DEBUG] Found text in 'text' attribute: '{text_content}'") | |
# If no text attribute or empty, try content | |
if not text_content and item_content is not None: | |
if isinstance(item_content, str): | |
text_content = item_content.strip() | |
print(f"[DEBUG] Found text in 'content' attribute: '{text_content}'") | |
elif isinstance(item_content, (list, tuple)) and len(item_content) > 0: | |
# Sometimes text is in a list | |
first_item = item_content[0] | |
if isinstance(first_item, str): | |
text_content = first_item.strip() | |
print(f"[DEBUG] Found text in content list: '{text_content}'") | |
# Text nodes MUST have text content - skip if empty | |
if text_content: | |
content_item["text"] = text_content | |
print(f"[DEBUG] Created text node with content: '{text_content}'") | |
else: | |
print(f"[DEBUG] Skipping empty text node") | |
return None | |
elif item_type == 'hardBreak': | |
# Hard breaks don't need content - they're self-closing | |
print(f"[DEBUG] Created hardBreak node (no content needed)") | |
elif item_content is not None: | |
# For other node types (paragraph, heading, etc.), process content | |
try: | |
if isinstance(item_content, (list, tuple)): | |
# Recursively process content array | |
reconstructed_children = [] | |
print(f"[DEBUG] Processing {len(item_content)} child items") | |
for i, child in enumerate(item_content): | |
print(f"[DEBUG] Processing child {i}: {type(child)}") | |
if hasattr(child, '__class__') and 'PropertyHolder' in str(type(child)): | |
child_item = reconstruct_adf_content_item(child) | |
if child_item: | |
reconstructed_children.append(child_item) | |
print(f"[DEBUG] Added reconstructed child {i}") | |
else: | |
print(f"[DEBUG] Skipped empty child {i}") | |
elif isinstance(child, dict): | |
reconstructed_children.append(child) | |
print(f"[DEBUG] Added dict child {i}") | |
elif isinstance(child, str) and child.strip(): | |
# Handle text content - wrap in text node if needed | |
if item_type == 'paragraph': | |
reconstructed_children.append({"type": "text", "text": child.strip()}) | |
print(f"[DEBUG] Added text child {i}: {child.strip()}") | |
else: | |
reconstructed_children.append(child) | |
else: | |
print(f"[DEBUG] Skipped empty or unknown child type {i}: {type(child)}") | |
# Only add content if we have valid children | |
if reconstructed_children: | |
content_item["content"] = reconstructed_children | |
print(f"[DEBUG] Added {len(reconstructed_children)} content children") | |
elif item_type in ['paragraph', 'heading']: | |
# For paragraphs/headings, we need at least one child - add empty text | |
content_item["content"] = [{"type": "text", "text": ""}] | |
print(f"[DEBUG] Added empty text child for {item_type}") | |
elif isinstance(item_content, str) and item_content.strip(): | |
# Single text content - wrap in text node if this is a paragraph | |
if item_type == 'paragraph': | |
content_item["content"] = [{"type": "text", "text": item_content.strip()}] | |
print(f"[DEBUG] Added wrapped text content: {item_content.strip()}") | |
else: | |
# ADF 'content' must be an array of nodes, so wrap the string in a text node | |
content_item["content"] = [{"type": "text", "text": item_content.strip()}] | |
print(f"[DEBUG] Added wrapped text content for {item_type}: {item_content.strip()}") | |
else: | |
print(f"[DEBUG] No valid content found for {item_type}") | |
# For structural elements that need content, add empty text | |
if item_type in ['paragraph', 'heading']: | |
content_item["content"] = [{"type": "text", "text": ""}] | |
print(f"[DEBUG] Added empty content for structural element {item_type}") | |
except Exception as e: | |
print(f"[DEBUG] Error processing content: {e}") | |
print(f"[DEBUG] Reconstructed {item_type} item successfully") | |
return content_item | |
except Exception as e: | |
print(f"[ERROR] Failed to reconstruct content item: {e}") | |
return None | |
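# Illustrative sketch (hypothetical stand-in, for reference only): any class | |
# whose type name contains 'PropertyHolder' satisfies the string-based checks | |
# above, so a paragraph node round-trips like this: | |
# class PropertyHolder: | |
#     def __init__(self, **kw): | |
#         self.__dict__.update(kw) | |
# para = PropertyHolder(type='paragraph', | |
#                       content=[PropertyHolder(type='text', text='hello')]) | |
# reconstruct_adf_content_item(para) | |
# # -> {"type": "paragraph", "content": [{"type": "text", "text": "hello"}]} | |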
def update_description_with_takedown(jira_client, ticket, takedown_info): | |
""" | |
Update the ticket description with takedown information, preserving the existing content. | |
Adds takedown section above Notes section. | |
""" | |
try: | |
issue = jira_client.issue(ticket) | |
current_description = issue.fields.description | |
if not current_description: | |
print(f"[WARN] No existing description found for ticket {ticket}") | |
return False | |
print(f"[DEBUG] Current description type for takedown update: {type(current_description)}") | |
# CRITICAL SAFETY CHECK: Any PropertyHolder with type, version, content is ADF | |
if (hasattr(current_description, '__class__') and 'PropertyHolder' in str(type(current_description)) and | |
hasattr(current_description, 'type') and hasattr(current_description, 'version') and | |
hasattr(current_description, 'content')): | |
print(f"[DEBUG] TAKEDOWN SAFETY CHECK: PropertyHolder with ADF attributes detected - FORCING ADF format") | |
# Force ADF reconstruction | |
reconstructed_adf = reconstruct_adf_from_propertyholder(current_description) | |
if reconstructed_adf and validate_adf_document(reconstructed_adf): | |
print(f"[DEBUG] TAKEDOWN SAFETY CHECK: ADF reconstruction successful") | |
actual_description = reconstructed_adf | |
is_adf_format = True | |
# Update with ADF format | |
updated_description = update_adf_description_takedown(actual_description, takedown_info) | |
if updated_description and validate_adf_document(updated_description): | |
print(f"[DEBUG] TAKEDOWN SAFETY CHECK: Attempting ADF update with type: {type(updated_description)}") | |
issue.update(fields={"description": updated_description}) | |
print(f"[INFO] TAKEDOWN SAFETY CHECK: Successfully updated ADF description with takedown for ticket {ticket}") | |
return True | |
else: | |
print(f"[ERROR] TAKEDOWN SAFETY CHECK: Failed to create valid ADF update") | |
else: | |
print(f"[DEBUG] TAKEDOWN SAFETY CHECK: ADF reconstruction failed, creating minimal ADF") | |
# Create a minimal valid ADF document with takedown info | |
minimal_adf = { | |
"type": "doc", | |
"version": 1, | |
"content": [ | |
{ | |
"type": "paragraph", | |
"content": [{"type": "text", "text": "Domain information (content extraction failed)"}] | |
} | |
] | |
} | |
# Add takedown section | |
if takedown_info and takedown_info.strip(): | |
minimal_adf["content"].extend([ | |
{ | |
"type": "heading", | |
"attrs": {"level": 3}, | |
"content": [{"type": "text", "text": "Takedown Reported"}] | |
}, | |
{ | |
"type": "paragraph", | |
"content": [{"type": "text", "text": takedown_info.strip()}] | |
} | |
]) | |
if validate_adf_document(minimal_adf): | |
print(f"[DEBUG] TAKEDOWN SAFETY CHECK: Using minimal ADF with takedown") | |
issue.update(fields={"description": minimal_adf}) | |
print(f"[INFO] TAKEDOWN SAFETY CHECK: Successfully updated with minimal ADF for ticket {ticket}") | |
return True | |
else: | |
print(f"[ERROR] TAKEDOWN SAFETY CHECK: Even minimal ADF validation failed") | |
# Handle other description formats | |
actual_description = current_description | |
is_adf_format = False | |
if hasattr(current_description, '__class__') and 'PropertyHolder' in str(type(current_description)): | |
extracted_content = extract_field_string_from_propertyholder(current_description) | |
if isinstance(extracted_content, dict) and extracted_content.get('type') == 'doc': | |
actual_description = extracted_content | |
is_adf_format = True | |
else: | |
actual_description = extracted_content | |
elif isinstance(current_description, dict) and current_description.get('type') == 'doc': | |
actual_description = current_description | |
is_adf_format = True | |
# Update based on the detected format | |
if is_adf_format: | |
print(f"[DEBUG] Updating ADF description with takedown") | |
updated_description = update_adf_description_takedown(actual_description, takedown_info) | |
else: | |
print(f"[DEBUG] Updating plain text description with takedown") | |
updated_description = update_text_description_takedown(str(actual_description), takedown_info) | |
if updated_description: | |
print(f"[DEBUG] Attempting to update description with takedown, type: {type(updated_description)}") | |
issue.update(fields={"description": updated_description}) | |
print(f"[INFO] Updated description with takedown information for ticket {ticket}") | |
return True | |
else: | |
print(f"[WARN] Failed to create updated description with takedown for ticket {ticket}") | |
return False | |
except Exception as e: | |
print(f"[ERROR] Failed to update description with takedown for ticket {ticket}: {e}") | |
return False | |
def update_adf_description_takedown(adf_description, takedown_info): | |
""" | |
Update ADF description with takedown section (above Notes section). | |
""" | |
try: | |
content = adf_description.get('content', []) | |
updated_content = [] | |
takedown_section_found = False | |
notes_section_index = -1 | |
# First pass: find existing takedown and notes sections | |
for i, block in enumerate(content): | |
if block.get('type') == 'heading': | |
heading_text = '' | |
heading_content = block.get('content', []) | |
for text_node in heading_content: | |
if text_node.get('type') == 'text': | |
heading_text += text_node.get('text', '') | |
if 'takedown' in heading_text.lower(): | |
takedown_section_found = True | |
elif 'notes' in heading_text.lower(): | |
notes_section_index = i | |
# Second pass: rebuild content with takedown section | |
i = 0 | |
while i < len(content): | |
block = content[i] | |
# Check if we're at the notes section and need to insert takedown before it | |
if (notes_section_index == i and not takedown_section_found and | |
takedown_info and takedown_info.strip()): | |
# Insert takedown section before notes | |
updated_content.append({ | |
"type": "heading", | |
"attrs": {"level": 3}, | |
"content": [{"type": "text", "text": "Takedown Reported"}] | |
}) | |
updated_content.append({ | |
"type": "paragraph", | |
"content": [{"type": "text", "text": takedown_info.strip()}] | |
}) | |
takedown_section_found = True | |
# Handle existing takedown section | |
if block.get('type') == 'heading': | |
heading_text = '' | |
heading_content = block.get('content', []) | |
for text_node in heading_content: | |
if text_node.get('type') == 'text': | |
heading_text += text_node.get('text', '') | |
if 'takedown' in heading_text.lower(): | |
# Replace existing takedown section | |
updated_content.append(block) # Keep the heading | |
if takedown_info and takedown_info.strip(): | |
updated_content.append({ | |
"type": "paragraph", | |
"content": [{"type": "text", "text": takedown_info.strip()}] | |
}) | |
# Skip the old takedown content (next paragraph) | |
if i + 1 < len(content) and content[i + 1].get('type') == 'paragraph': | |
i += 1 # Skip next paragraph | |
i += 1 | |
continue | |
else: | |
updated_content.append(block) | |
else: | |
updated_content.append(block) | |
i += 1 | |
# If no notes section found and no takedown section exists, add takedown at the end | |
if not takedown_section_found and takedown_info and takedown_info.strip(): | |
updated_content.append({ | |
"type": "heading", | |
"attrs": {"level": 3}, | |
"content": [{"type": "text", "text": "Takedown Reported"}] | |
}) | |
updated_content.append({ | |
"type": "paragraph", | |
"content": [{"type": "text", "text": takedown_info.strip()}] | |
}) | |
return { | |
"type": "doc", | |
"version": 1, | |
"content": updated_content | |
} | |
except Exception as e: | |
print(f"[ERROR] Failed to update ADF description with takedown: {e}") | |
return None | |
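# Illustrative sketch: for a body of [intro paragraph, "Notes" heading, notes | |
# paragraph], the function returns [intro paragraph, "Takedown Reported" | |
# heading + paragraph, "Notes" heading, notes paragraph] - the takedown | |
# section is inserted just above Notes, or rewritten in place if one exists. | |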
def update_text_description_takedown(text_description, takedown_info): | |
""" | |
Update plain text description with takedown section (above Notes section). | |
""" | |
try: | |
lines = text_description.split('\n') | |
updated_lines = [] | |
takedown_section = False | |
takedown_section_processed = False | |
notes_section_index = -1 | |
# Find Notes section (matches "Notes", "Notes:", "## Notes", or "### Notes") | |
for i, line in enumerate(lines): | |
stripped_lower = line.strip().lower() | |
if (stripped_lower.startswith('notes') or | |
stripped_lower.startswith('### notes') or | |
stripped_lower.startswith('## notes')): | |
notes_section_index = i | |
break | |
# Process lines | |
for i, line in enumerate(lines): | |
line_stripped = line.strip() | |
# Insert takedown before notes section if not already processed | |
if (i == notes_section_index and not takedown_section_processed and | |
takedown_info and takedown_info.strip()): | |
updated_lines.append('') | |
updated_lines.append('### Takedown Reported') | |
updated_lines.append(takedown_info.strip()) | |
takedown_section_processed = True | |
# Handle existing takedown section | |
if (line_stripped.lower().startswith('takedown') or | |
'takedown submitted' in line_stripped.lower() or | |
'takedown reported' in line_stripped.lower()): | |
takedown_section = True | |
takedown_section_processed = True | |
updated_lines.append('### Takedown Reported') | |
# Add new takedown content | |
if takedown_info and takedown_info.strip(): | |
updated_lines.append(takedown_info.strip()) | |
continue | |
elif takedown_section: | |
# Skip lines until we hit another section or notes | |
if (line_stripped.startswith('##') or | |
line_stripped.lower().startswith('notes')): | |
takedown_section = False | |
updated_lines.append(line) | |
# Skip old takedown content | |
continue | |
else: | |
updated_lines.append(line) | |
# If no notes section found and no takedown section exists, add takedown at the end | |
if not takedown_section_processed and takedown_info and takedown_info.strip(): | |
updated_lines.append('') | |
updated_lines.append('### Takedown Reported') | |
updated_lines.append(takedown_info.strip()) | |
return '\n'.join(updated_lines) | |
except Exception as e: | |
print(f"[ERROR] Failed to update text description with takedown: {e}") | |
return None | |
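# Illustrative sketch of the plain-text path (hypothetical input): | |
# before: "Domain: example.com\n\n### Notes\nwatching this one" | |
# after:  "Domain: example.com\n\n\n### Takedown Reported\n" | |
#         "Takedown submitted at ...\n### Notes\nwatching this one" | |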
def cleanup_duplicate_notes_in_all_tickets(): | |
""" | |
TEMPORARY CLEANUP FUNCTION - Run once to fix duplicate notes sections. | |
This function will find all tickets in the project and clean up duplicate Notes sections. | |
""" | |
if not jira_client: | |
print("[ERROR] JIRA client not initialized. Cannot run cleanup.") | |
return | |
print("[INFO] Starting cleanup of duplicate notes sections in all tickets...") | |
try: | |
# Get all tickets from the project using pagination | |
jql = f"project = {JIRA_PROJECT_KEY} ORDER BY created DESC" | |
print(f"[INFO] Searching for tickets with JQL: {jql}") | |
# Use pagination to get ALL tickets | |
all_issues = [] | |
start_at = 0 | |
max_results = 100 # JIRA's typical page size | |
while True: | |
print(f"[INFO] Fetching tickets {start_at} to {start_at + max_results}...") | |
issues_batch = jira_client.search_issues(jql, startAt=start_at, maxResults=max_results) | |
if not issues_batch: | |
break | |
all_issues.extend(issues_batch) | |
print(f"[INFO] Got {len(issues_batch)} tickets in this batch. Total so far: {len(all_issues)}") | |
# If we got fewer than max_results, we're at the end | |
if len(issues_batch) < max_results: | |
break | |
start_at += max_results | |
print(f"[INFO] Found {len(all_issues)} total tickets to check") | |
cleaned_count = 0 | |
error_count = 0 | |
for i, issue in enumerate(all_issues): | |
ticket_key = issue.key | |
try: | |
print(f"[INFO] Checking ticket {ticket_key} ({i + 1}/{len(all_issues)})...") | |
# Get the current description | |
description = issue.fields.description | |
if not description: | |
print(f"[DEBUG] Ticket {ticket_key} has no description, skipping") | |
continue | |
# Determine if this ticket needs cleaning | |
needs_cleaning = False | |
cleaned_description = None | |
# Handle PropertyHolder (ADF) descriptions | |
if hasattr(description, '__class__') and 'PropertyHolder' in str(type(description)): | |
print(f"[DEBUG] Ticket {ticket_key} has PropertyHolder description") | |
# Try to reconstruct ADF first | |
if (hasattr(description, 'type') and hasattr(description, 'version') and | |
hasattr(description, 'content')): | |
reconstructed_adf = reconstruct_adf_from_propertyholder(description) | |
if reconstructed_adf: | |
# Check if it has duplicate notes before cleaning | |
original_notes_count = count_notes_sections_adf(reconstructed_adf) | |
if original_notes_count > 1: | |
print(f"[INFO] Ticket {ticket_key} has {original_notes_count} notes sections - NEEDS CLEANING") | |
cleaned_description = cleanup_duplicate_notes_adf(reconstructed_adf) | |
needs_cleaning = True | |
else: | |
print(f"[DEBUG] Ticket {ticket_key} has {original_notes_count} notes sections - OK") | |
else: | |
# Fall back to text extraction | |
text_description = extract_field_string_from_propertyholder(description) | |
original_notes_count = count_notes_sections_text(str(text_description)) | |
if original_notes_count > 1: | |
print(f"[INFO] Ticket {ticket_key} has {original_notes_count} text notes sections - NEEDS CLEANING") | |
cleaned_description = cleanup_duplicate_notes_text(str(text_description)) | |
needs_cleaning = True | |
else: | |
print(f"[DEBUG] Ticket {ticket_key} has {original_notes_count} text notes sections - OK") | |
else: | |
# Extract as text | |
text_description = extract_field_string_from_propertyholder(description) | |
original_notes_count = count_notes_sections_text(str(text_description)) | |
if original_notes_count > 1: | |
print(f"[INFO] Ticket {ticket_key} has {original_notes_count} text notes sections - NEEDS CLEANING") | |
cleaned_description = cleanup_duplicate_notes_text(str(text_description)) | |
needs_cleaning = True | |
else: | |
print(f"[DEBUG] Ticket {ticket_key} has {original_notes_count} text notes sections - OK") | |
# Handle direct ADF descriptions | |
elif isinstance(description, dict) and description.get('type') == 'doc': | |
print(f"[DEBUG] Ticket {ticket_key} has direct ADF description") | |
original_notes_count = count_notes_sections_adf(description) | |
if original_notes_count > 1: | |
print(f"[INFO] Ticket {ticket_key} has {original_notes_count} ADF notes sections - NEEDS CLEANING") | |
cleaned_description = cleanup_duplicate_notes_adf(description) | |
needs_cleaning = True | |
else: | |
print(f"[DEBUG] Ticket {ticket_key} has {original_notes_count} ADF notes sections - OK") | |
# Handle plain text descriptions | |
else: | |
print(f"[DEBUG] Ticket {ticket_key} has plain text description") | |
text_desc = str(description) | |
original_notes_count = count_notes_sections_text(text_desc) | |
if original_notes_count > 1: | |
print(f"[INFO] Ticket {ticket_key} has {original_notes_count} text notes sections - NEEDS CLEANING") | |
cleaned_description = cleanup_duplicate_notes_text(text_desc) | |
needs_cleaning = True | |
else: | |
print(f"[DEBUG] Ticket {ticket_key} has {original_notes_count} text notes sections - OK") | |
# Update the ticket if cleaning is needed | |
if needs_cleaning and cleaned_description: | |
print(f"[INFO] Updating ticket {ticket_key} with cleaned description...") | |
issue.update(fields={"description": cleaned_description}) | |
# Add a comment about the cleanup | |
cleanup_comment = "Cleaned up duplicate notes sections caused by a system bug. No content was lost - duplicate sections were consolidated." | |
add_jira_comment(jira_client, ticket_key, cleanup_comment) | |
cleaned_count += 1 | |
print(f"[SUCCESS] Cleaned ticket {ticket_key}") | |
except Exception as e: | |
print(f"[ERROR] Failed to process ticket {ticket_key}: {e}") | |
error_count += 1 | |
continue | |
print(f"\n[CLEANUP SUMMARY]") | |
print(f"Total tickets checked: {len(all_issues)}") | |
print(f"Tickets cleaned: {cleaned_count}") | |
print(f"Errors encountered: {error_count}") | |
print(f"[INFO] Cleanup completed!") | |
except Exception as e: | |
print(f"[ERROR] Failed to run cleanup: {e}") | |
def count_notes_sections_adf(adf_description): | |
""" | |
Count how many notes sections exist in an ADF description. | |
""" | |
try: | |
content = adf_description.get('content', []) | |
notes_count = 0 | |
for block in content: | |
if block.get('type') == 'heading': | |
heading_text = '' | |
heading_content = block.get('content', []) | |
for text_node in heading_content: | |
if text_node.get('type') == 'text': | |
heading_text += text_node.get('text', '') | |
if 'notes' in heading_text.lower(): | |
notes_count += 1 | |
return notes_count | |
except Exception as e: | |
print(f"[ERROR] Failed to count ADF notes sections: {e}") | |
return 0 | |
def count_notes_sections_text(text_description): | |
""" | |
Count how many notes sections exist in a text description. | |
""" | |
try: | |
lines = text_description.split('\n') | |
notes_count = 0 | |
for line in lines: | |
line_stripped = line.strip() | |
if (line_stripped.lower().startswith('notes') or | |
line_stripped.lower().startswith('### notes')): | |
notes_count += 1 | |
return notes_count | |
except Exception as e: | |
print(f"[ERROR] Failed to count text notes sections: {e}") | |
return 0 | |
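# Illustrative sketch: both counters treat any line starting with a notes | |
# heading as a section, so a description containing "### Notes" twice is 2: | |
# count_notes_sections_text("intro\n### Notes\na\n### Notes\nb")  # -> 2 | |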
def cleanup_duplicate_notes_adf(adf_description): | |
""" | |
Helper function to clean duplicate notes sections from ADF description. | |
""" | |
try: | |
content = adf_description.get('content', []) | |
cleaned_content = [] | |
first_notes_content = None | |
found_notes = False | |
# First pass: collect all content except notes sections, but keep the first notes content | |
i = 0 | |
while i < len(content): | |
block = content[i] | |
if block.get('type') == 'heading': | |
heading_text = '' | |
heading_content = block.get('content', []) | |
for text_node in heading_content: | |
if text_node.get('type') == 'text': | |
heading_text += text_node.get('text', '') | |
if 'notes' in heading_text.lower(): | |
# Found a notes section | |
if not found_notes: | |
# This is the first notes section - keep its content | |
found_notes = True | |
notes_content = [] | |
i += 1 # Move past the heading | |
# Collect all paragraphs until next heading or end | |
while i < len(content): | |
next_block = content[i] | |
if next_block.get('type') == 'heading': | |
break # Stop at next heading | |
notes_content.append(next_block) | |
i += 1 | |
# Extract text from the first notes content | |
if notes_content: | |
notes_text_parts = [] | |
for note_block in notes_content: | |
if note_block.get('type') == 'paragraph': | |
para_content = note_block.get('content', []) | |
for text_node in para_content: | |
if text_node.get('type') == 'text': | |
text = text_node.get('text', '').strip() | |
if text: | |
notes_text_parts.append(text) | |
if notes_text_parts: | |
first_notes_content = ' '.join(notes_text_parts) | |
continue # Don't increment i again | |
else: | |
# This is a duplicate notes section - skip it entirely | |
print(f"[DEBUG] Skipping duplicate notes section") | |
i += 1 # Move past the heading | |
# Skip all content until next heading or end | |
while i < len(content): | |
next_block = content[i] | |
if next_block.get('type') == 'heading': | |
break # Stop at next heading | |
i += 1 | |
continue # Don't increment i again | |
else: | |
cleaned_content.append(block) | |
else: | |
cleaned_content.append(block) | |
i += 1 | |
# If any notes sections were found, re-append a single consolidated notes section | |
if found_notes: | |
print("[DEBUG] Consolidating notes sections, keeping only the first one's content") | |
# Add a single notes section at the end with the first content we found | |
if first_notes_content: | |
cleaned_content.append({ | |
"type": "heading", | |
"attrs": {"level": 3}, | |
"content": [{"type": "text", "text": "Notes"}] | |
}) | |
cleaned_content.append({ | |
"type": "paragraph", | |
"content": [{"type": "text", "text": first_notes_content}] | |
}) | |
return { | |
"type": "doc", | |
"version": 1, | |
"content": cleaned_content | |
} | |
# No notes sections found or no changes needed | |
return adf_description | |
except Exception as e: | |
print(f"[ERROR] Failed to clean ADF notes: {e}") | |
return adf_description | |
def cleanup_duplicate_notes_text(text_description): | |
""" | |
Helper function to clean duplicate notes sections from plain text description. | |
""" | |
try: | |
lines = text_description.split('\n') | |
cleaned_lines = [] | |
first_notes_content = None | |
found_notes = False | |
in_notes = False | |
current_notes = [] | |
for line in lines: | |
line_stripped = line.strip() | |
# Check if this is a notes heading | |
if (line_stripped.lower().startswith('notes') or | |
line_stripped.lower().startswith('### notes')): | |
if not found_notes: | |
# This is the first notes section - start collecting its content | |
found_notes = True | |
in_notes = True | |
current_notes = [] | |
else: | |
# Duplicate notes section - save the first section's content before discarding | |
if current_notes and first_notes_content is None: | |
first_notes_content = '\n'.join(current_notes).strip() | |
print(f"[DEBUG] Skipping duplicate text notes section") | |
in_notes = True # Stay "in notes" so the duplicate content gets skipped | |
current_notes = [] | |
continue | |
# Check if we hit another section (end notes) | |
elif (line_stripped.startswith('##') or | |
line_stripped.lower().startswith('takedown')): | |
if in_notes and current_notes and first_notes_content is None: | |
# Save the first notes content we found | |
first_notes_content = '\n'.join(current_notes).strip() | |
in_notes = False | |
cleaned_lines.append(line) | |
# If we're in the first notes section (content not yet saved), collect it | |
elif in_notes and first_notes_content is None: | |
if line.strip(): | |
current_notes.append(line.strip()) | |
elif in_notes: | |
# We're in a duplicate notes section - skip the content | |
continue | |
# Regular content (not in notes) | |
else: | |
cleaned_lines.append(line) | |
# Don't forget to save the notes if we ended in the first notes section | |
if in_notes and current_notes and first_notes_content is None: | |
first_notes_content = '\n'.join(current_notes).strip() | |
# If any notes sections were found, add back a single consolidated one | |
if found_notes: | |
print("[DEBUG] Consolidating text notes sections, keeping only the first one's content") | |
# Add single notes section with first content | |
if first_notes_content: | |
cleaned_lines.append('') | |
cleaned_lines.append('### Notes') | |
cleaned_lines.append(first_notes_content) | |
return '\n'.join(cleaned_lines) | |
# No changes needed | |
return text_description | |
except Exception as e: | |
print(f"[ERROR] Failed to clean text notes: {e}") | |
return text_description | |
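# Illustrative sketch (hypothetical input): duplicate sections collapse into | |
# one, keeping the first section's content and re-appending it at the end: | |
# cleanup_duplicate_notes_text("intro\n### Notes\nfirst\n### Notes\nsecond") | |
# # -> "intro\n\n### Notes\nfirst" | |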
def determine_correct_ticket_state(domain, jira_client, ticket): | |
""" | |
Determine the correct JIRA ticket state based on simple, clear rules. | |
Rules: | |
1. If domain status is "down" → ticket should be "Done" (regardless of anything else) | |
2. If domain status is not "down" and there's a takedown (report_abuse + timestamp) → ticket should be "In Review" | |
3. If domain status is not "down" and no takedown → ticket should be "In Progress" | |
""" | |
domain_status = domain.get('status', '').lower() | |
has_takedown = domain.get("report_abuse") and domain.get("report_abuse_timestamp") | |
print(f"[DEBUG] Determining state for ticket {ticket}: status='{domain_status}', has_takedown={has_takedown}") | |
if domain_status == "down": | |
target_state = "done" | |
print(f"[DEBUG] Domain is DOWN → target state: {target_state}") | |
elif has_takedown: | |
target_state = "review" | |
print(f"[DEBUG] Domain is UP with takedown → target state: {target_state}") | |
else: | |
target_state = "in_progress" | |
print(f"[DEBUG] Domain is UP without takedown → target state: {target_state}") | |
return target_state | |
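# Illustrative sketch of the three rules (hypothetical domain dicts): | |
# determine_correct_ticket_state({'status': 'down'}, jira_client, 'AB-1') | |
# # -> 'done' | |
# determine_correct_ticket_state({'status': 'up', 'report_abuse': True, | |
#                                 'report_abuse_timestamp': '2025-01-01'}, | |
#                                jira_client, 'AB-2') | |
# # -> 'review' | |
# determine_correct_ticket_state({'status': 'up'}, jira_client, 'AB-3') | |
# # -> 'in_progress' | |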
def get_current_ticket_state(jira_client, ticket): | |
""" | |
Get the current state of a JIRA ticket. | |
""" | |
try: | |
issue = jira_client.issue(ticket) | |
current_status = safe_extract_status_name(issue.fields.status) | |
print(f"[DEBUG] Current ticket {ticket} state: '{current_status}'") | |
return current_status | |
except Exception as e: | |
print(f"[ERROR] Failed to get current state for ticket {ticket}: {e}") | |
return "unknown" | |
def transition_ticket_to_correct_state(jira_client, ticket, target_state, current_state): | |
""" | |
Transition a ticket to the correct state if it's not already there. | |
""" | |
# Normalize states for comparison | |
current_normalized = current_state.lower().replace(' ', '_') | |
target_normalized = target_state.lower().replace(' ', '_') | |
# Map of state equivalencies | |
done_states = ['done', 'closed', 'resolved', 'complete'] | |
review_states = ['review', 'in_review'] | |
progress_states = ['in_progress', 'to_do', 'open'] | |
# Check if we're already in the correct state | |
if target_normalized == 'done' and current_normalized in done_states: | |
print(f"[DEBUG] Ticket {ticket} already in correct done state: {current_state}") | |
return True | |
elif target_normalized == 'review' and current_normalized in review_states: | |
print(f"[DEBUG] Ticket {ticket} already in correct review state: {current_state}") | |
return True | |
elif target_normalized == 'in_progress' and current_normalized in progress_states: | |
print(f"[DEBUG] Ticket {ticket} already in correct progress state: {current_state}") | |
return True | |
# Need to transition - call appropriate function | |
if target_normalized == 'done': | |
print(f"[INFO] Transitioning ticket {ticket} to Done") | |
return close_jira_ticket(jira_client, ticket) | |
elif target_normalized == 'review': | |
print(f"[INFO] Transitioning ticket {ticket} to Review") | |
return transition_to_review(jira_client, ticket) | |
elif target_normalized == 'in_progress': | |
print(f"[INFO] Transitioning ticket {ticket} to In Progress") | |
return transition_to_in_progress(jira_client, ticket) | |
else: | |
print(f"[WARN] Unknown target state: {target_state}") | |
return False | |
def check_and_handle_takedown_updates(jira_client, ticket, domain): | |
""" | |
Check if takedown information needs to be updated and handle it properly. | |
Only update if the content has actually changed. | |
""" | |
takedown_requested = domain.get("report_abuse") and domain.get("report_abuse_timestamp") | |
if not takedown_requested: | |
print(f"[DEBUG] No takedown request for ticket {ticket}") | |
return False | |
abuse_timestamp = domain.get("report_abuse_timestamp") | |
new_takedown_info = f"Takedown submitted at {abuse_timestamp}." | |
takedown_comment = f"Takedown reported at {abuse_timestamp}." | |
print(f"[DEBUG] Checking takedown for ticket {ticket}: timestamp={abuse_timestamp}") | |
# PRIMARY CHECK: If this exact comment already exists, skip everything | |
if comment_exists(jira_client, ticket, takedown_comment): | |
print(f"[DEBUG] Takedown comment already exists for ticket {ticket}, skipping all updates") | |
return False | |
# SECONDARY CHECK: a comment already carries the description-style takedown string | |
if comment_exists(jira_client, ticket, f"Takedown submitted at {abuse_timestamp}"): | |
print(f"[DEBUG] Takedown info already recorded for ticket {ticket}, skipping all updates") | |
return False | |
# TERTIARY CHECK: Look for any takedown with this timestamp in description | |
current_takedown = extract_takedown_from_description(jira_client, ticket) | |
if current_takedown and abuse_timestamp in current_takedown: | |
print(f"[DEBUG] Takedown timestamp {abuse_timestamp} already found in description for ticket {ticket}, skipping") | |
return False | |
print(f"[INFO] Adding new takedown info for ticket {ticket}: {new_takedown_info}") | |
# Update description with takedown information | |
if update_description_with_takedown(jira_client, ticket, new_takedown_info): | |
# Add comment about the takedown | |
add_jira_comment(jira_client, ticket, takedown_comment) | |
print(f"[INFO] Added takedown comment to ticket {ticket}") | |
return True | |
else: | |
print(f"[ERROR] Failed to update takedown description for ticket {ticket}") | |
return False | |
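# Note: repeated polls with the same report_abuse_timestamp are idempotent - | |
# the first call updates the description and adds the comment, later calls | |
# hit one of the three existence checks above and return False. | |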
def check_and_handle_notes_updates(jira_client, ticket, domain): | |
""" | |
Check if notes need to be updated and handle them properly. | |
Only update if the content has actually changed. | |
""" | |
notes = domain.get('notes') | |
new_notes = notes.strip() if notes else "" | |
print(f"[DEBUG] Checking notes for ticket {ticket}: notes='{new_notes[:100]}{'...' if len(new_notes) > 100 else ''}'") | |
# If there are no new notes, check if we need to clear existing ones | |
if not new_notes: | |
current_notes = extract_notes_from_description(jira_client, ticket) | |
if not current_notes: | |
print(f"[DEBUG] No notes to add or clear for ticket {ticket}") | |
return False | |
# There are existing notes to clear | |
print(f"[INFO] Clearing notes for ticket {ticket}") | |
clear_comment = "Notes cleared from description." | |
# PRIMARY CHECK: If clear comment already exists, skip | |
if comment_exists(jira_client, ticket, clear_comment): | |
print(f"[DEBUG] Notes cleared comment already exists for ticket {ticket}, skipping") | |
return False | |
if update_description_with_notes(jira_client, ticket, ''): | |
add_jira_comment(jira_client, ticket, clear_comment) | |
print(f"[INFO] Added notes cleared comment to ticket {ticket}") | |
return True | |
return False | |
# There are new notes - check content before updating | |
# PRIMARY CHECK: If "Notes updated" comment with this exact content already exists, skip | |
notes_comment_content = f"Notes updated in description:\n{new_notes}" | |
if comment_exists(jira_client, ticket, notes_comment_content): | |
print(f"[DEBUG] Notes update comment with exact content already exists for ticket {ticket}, skipping all updates") | |
return False | |
# CONTENT CHECK: Compare current vs new notes content | |
current_notes = extract_notes_from_description(jira_client, ticket) | |
# Normalize both for comparison | |
current_normalized = normalize_content_for_comparison(current_notes) | |
new_normalized = normalize_content_for_comparison(new_notes) | |
print(f"[DEBUG] Notes content comparison for ticket {ticket}:") | |
print(f"[DEBUG] Current notes (raw): '{current_notes[:100]}{'...' if len(current_notes) > 100 else ''}'") | |
print(f"[DEBUG] New notes (raw): '{new_notes[:100]}{'...' if len(new_notes) > 100 else ''}'") | |
print(f"[DEBUG] Current (normalized): '{current_normalized[:100]}{'...' if len(current_normalized) > 100 else ''}'") | |
print(f"[DEBUG] New (normalized): '{new_normalized[:100]}{'...' if len(new_normalized) > 100 else ''}'") | |
print(f"[DEBUG] Equal: {current_normalized == new_normalized}") | |
# Only update if content has actually changed | |
if current_normalized != new_normalized: | |
print(f"[INFO] Notes content differs, updating for ticket {ticket}") | |
if update_description_with_notes(jira_client, ticket, new_notes): | |
# Add comment with the actual notes content | |
add_jira_comment(jira_client, ticket, notes_comment_content) | |
print(f"[INFO] Added notes update comment to ticket {ticket}") | |
return True | |
else: | |
print(f"[ERROR] Failed to update notes for ticket {ticket}") | |
return False | |
else: | |
print(f"[DEBUG] Notes content identical for ticket {ticket}, no update needed") | |
return False | |
def normalize_content_for_comparison(content): | |
""" | |
Normalize content for accurate comparison by removing formatting differences. | |
Extracts PropertyHolder/ADF content before normalizing whitespace and line breaks. | |
""" | |
if not content: | |
return "" | |
# CRITICAL: Extract PropertyHolder content first before normalizing | |
content_str = "" | |
if hasattr(content, '__class__') and 'PropertyHolder' in str(type(content)): | |
print(f"[DEBUG] Detected PropertyHolder in normalize_content_for_comparison, extracting content...") | |
# Try to extract the actual content from PropertyHolder | |
extracted_content = extract_field_string_from_propertyholder(content) | |
if isinstance(extracted_content, dict) and extracted_content.get('type') == 'doc': | |
# It's ADF format | |
content_str = extract_text_from_adf(extracted_content) | |
print(f"[DEBUG] Extracted ADF content from PropertyHolder: '{content_str[:100]}...'") | |
else: | |
content_str = str(extracted_content) if extracted_content else "" | |
print(f"[DEBUG] Extracted text content from PropertyHolder: '{content_str[:100]}...'") | |
else: | |
# It's already a string or other format | |
content_str = str(content).strip() | |
if not content_str: | |
return "" | |
# Remove common formatting differences | |
content_str = content_str.replace('\r\n', '\n').replace('\r', '\n') | |
# Normalize multiple consecutive newlines to single newlines | |
import re | |
content_str = re.sub(r'\n\s*\n', '\n', content_str) | |
# Normalize whitespace within lines (but preserve line structure) | |
lines = content_str.split('\n') | |
normalized_lines = [] | |
for line in lines: | |
# Strip each line and normalize internal whitespace | |
normalized_line = re.sub(r'\s+', ' ', line.strip()) | |
if normalized_line: # Only add non-empty lines | |
normalized_lines.append(normalized_line) | |
result = '\n'.join(normalized_lines) | |
# Final cleanup - remove any remaining extra whitespace | |
result = result.strip() | |
print(f"[DEBUG] Normalized content: '{content_str[:50]}...' → '{result[:50]}...'") | |
return result | |
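# Illustrative sketch: differing whitespace and blank lines normalize to the | |
# same string, so the comparison in check_and_handle_notes_updates treats | |
# them as equal: | |
# normalize_content_for_comparison("line one\r\n\r\n  line   two  ") | |
# # -> "line one\nline two" | |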
def extract_notes_from_description(jira_client, ticket): | |
""" | |
Extract the current notes from the ticket description. | |
Handles PropertyHolder, ADF, and plain-text description formats. | |
""" | |
try: | |
issue = jira_client.issue(ticket) | |
description = issue.fields.description | |
if not description: | |
print(f"[DEBUG] No description found for ticket {ticket}") | |
return '' | |
print(f"[DEBUG] Description type for notes extraction: {type(description)}") | |
# Convert description to searchable text | |
description_text = '' | |
# Handle PropertyHolder objects | |
if hasattr(description, '__class__') and 'PropertyHolder' in str(type(description)): | |
print(f"[DEBUG] Extracting from PropertyHolder for notes") | |
# Try to extract text content from PropertyHolder | |
extracted = extract_field_string_from_propertyholder(description) | |
if isinstance(extracted, dict) and extracted.get('type') == 'doc': | |
# It's ADF format, extract text from it | |
description_text = extract_text_from_adf(extracted) | |
print(f"[DEBUG] Extracted text from ADF PropertyHolder: {len(description_text)} chars") | |
else: | |
description_text = str(extracted) | |
print(f"[DEBUG] Extracted text from PropertyHolder: {len(description_text)} chars") | |
elif isinstance(description, dict) and description.get('type') == 'doc': | |
print(f"[DEBUG] Extracting from direct ADF format for notes") | |
# Direct ADF format | |
description_text = extract_text_from_adf(description) | |
print(f"[DEBUG] Extracted text from direct ADF: {len(description_text)} chars") | |
else: | |
print(f"[DEBUG] Treating as plain text for notes") | |
# Plain text | |
description_text = str(description) | |
print(f"[DEBUG] Full description text for notes search (first 200 chars): '{description_text[:200]}{'...' if len(description_text) > 200 else ''}'") | |
# Look for Notes section - be more flexible and thorough | |
lines = description_text.split('\n') | |
notes_section = False | |
notes_lines = [] | |
print(f"[DEBUG] Searching through {len(lines)} lines for notes section") | |
for i, line in enumerate(lines): | |
line_stripped = line.strip() | |
print(f"[DEBUG] Line {i}: '{line_stripped[:50]}{'...' if len(line_stripped) > 50 else ''}' | in_notes={notes_section}") | |
# Check for notes heading (various formats) - case insensitive | |
line_lower = line_stripped.lower() | |
if (line_lower == 'notes' or | |
line_lower.startswith('notes:') or | |
line_lower.startswith('### notes') or | |
line_lower.startswith('## notes')): | |
notes_section = True | |
print(f"[DEBUG] Found notes section header at line {i}: '{line_stripped}'") | |
continue | |
elif notes_section: | |
# Check if we hit another section (any heading or specific sections) | |
if (line_stripped.startswith('##') or | |
line_lower.startswith('takedown') or | |
line_lower.startswith('hosting provider') or | |
(not line_stripped and notes_lines)): # Blank line after notes content ends the section | |
print(f"[DEBUG] End of notes section at line {i}: '{line_stripped}'") | |
break | |
if line_stripped: # Only add non-empty lines | |
notes_lines.append(line_stripped) | |
print(f"[DEBUG] Added to notes: '{line_stripped}'") | |
result = '\n'.join(notes_lines).strip() | |
print(f"[DEBUG] Final extracted notes from description (length: {len(result)}): '{result}'") | |
return result | |
except Exception as e: | |
print(f"[ERROR] Failed to extract notes from description for ticket {ticket}: {e}") | |
import traceback | |
print(f"[ERROR] Full traceback: {traceback.format_exc()}") | |
return '' | |
if __name__ == "__main__": | |
# To run the cleanup, uncomment the following lines: | |
main() # This will initialize the JIRA client | |
# cleanup_duplicate_notes_in_all_tickets() # This will run the cleanup | |
# main() |