import time
import datetime
import requests
from jira import JIRA
from config import (
    JIRA_SERVER,
    JIRA_API_TOKEN,
    JIRA_EMAIL,
    JIRA_PROJECT_KEY,
    JIRA_EPIC,
    JIRA_LABELS,
    JIRA_ISSUE_TYPE,
    JIRA_USE_ADF,
    JIRA_API_VERSION,
    SERVER_URL,
    POLL_INTERVAL,
)
from flask import Flask, request, jsonify
app = Flask(__name__)
jira_client = None
def safe_extract_status_name(status_field):
    """Safely extract status name from PropertyHolder objects"""
    if status_field is None:
        return ''
    status_name = status_field.name if hasattr(status_field, 'name') else status_field
    # Handle nested PropertyHolder objects
    while hasattr(status_name, 'value'):
        status_name = status_name.value
    return str(status_name).lower() if status_name else ''
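# Example (hypothetical): given an issue.fields.status object whose name
# resolves to "In Progress", safe_extract_status_name(...) returns "in progress".

# ISO 3166-1 alpha-2 country code -> lowercase country name, used to resolve
# the Jira option IDs in the mappings below.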
COUNTRY_CODE_TO_NAME = {
    'af': 'afghanistan',
    'ax': 'aland islands',
    'al': 'albania',
    'dz': 'algeria',
    'as': 'american samoa',
    'ad': 'andorra',
    'ao': 'angola',
    'ai': 'anguilla',
    'aq': 'antarctica',
    'ag': 'antigua and barbuda',
    'ar': 'argentina',
    'am': 'armenia',
    'aw': 'aruba',
    'au': 'australia',
    'at': 'austria',
    'az': 'azerbaijan',
    'bs': 'bahamas',
    'bh': 'bahrain',
    'bd': 'bangladesh',
    'bb': 'barbados',
    'by': 'belarus',
    'be': 'belgium',
    'bz': 'belize',
    'bj': 'benin',
    'bm': 'bermuda',
    'bt': 'bhutan',
    'bo': 'bolivia',
    'bq': 'bonaire',
    'ba': 'bosnia and herzegovina',
    'bw': 'botswana',
    'bv': 'bouvet island',
    'br': 'brazil',
    'io': 'british indian ocean territory',
    'vg': 'british virgin islands',
    'bn': 'brunei',
    'bg': 'bulgaria',
    'bf': 'burkina faso',
    'bi': 'burundi',
    'kh': 'cambodia',
    'cm': 'cameroon',
    'ca': 'canada',
    'cv': 'cape verde',
    'ky': 'cayman islands',
    'cf': 'central african republic',
    'td': 'chad',
    'cl': 'chile',
    'cn': 'china',
    'cx': 'christmas island',
    'cc': 'cocos islands',
    'co': 'colombia',
    'km': 'comoros',
    'ck': 'cook islands',
    'cr': 'costa rica',
    'hr': 'croatia',
    'cu': 'cuba',
    'cw': 'curacao',
    'cy': 'cyprus',
    'cz': 'czech republic',
    'cd': 'democratic republic of the congo',
    'dk': 'denmark',
    'dj': 'djibouti',
    'dm': 'dominica',
    'do': 'dominican republic',
    'tl': 'east timor',
    'ec': 'ecuador',
    'eg': 'egypt',
    'sv': 'el salvador',
    'gq': 'equatorial guinea',
    'er': 'eritrea',
    'ee': 'estonia',
    'sz': 'eswatini',
    'et': 'ethiopia',
    'fk': 'falkland islands',
    'fo': 'faroe islands',
    'fj': 'fiji',
    'fi': 'finland',
    'fr': 'france',
    'pf': 'french polynesia',
    'tf': 'french southern territories',
    'ga': 'gabon',
    'gm': 'gambia',
    'ge': 'georgia',
    'de': 'germany',
    'gh': 'ghana',
    'gi': 'gibraltar',
    'gr': 'greece',
    'gl': 'greenland',
    'gd': 'grenada',
    'gu': 'guam',
    'gt': 'guatemala',
    'gg': 'guernsey',
    'gn': 'guinea',
    'gw': 'guinea-bissau',
    'gy': 'guyana',
    'ht': 'haiti',
    'hm': 'heard island and mcdonald islands',
    'hn': 'honduras',
    'hk': 'hong kong',
    'hu': 'hungary',
    'is': 'iceland',
    'in': 'india',
    'id': 'indonesia',
    'ir': 'iran',
    'iq': 'iraq',
    'ie': 'ireland',
    'im': 'isle of man',
    'il': 'israel',
    'it': 'italy',
    'ci': 'ivory coast',
    'jm': 'jamaica',
    'jp': 'japan',
    'je': 'jersey',
    'jo': 'jordan',
    'kz': 'kazakhstan',
    'ke': 'kenya',
    'ki': 'kiribati',
    'xk': 'kosovo',
    'kw': 'kuwait',
    'kg': 'kyrgyzstan',
    'la': 'laos',
    'lv': 'latvia',
    'lb': 'lebanon',
    'ls': 'lesotho',
    'lr': 'liberia',
    'ly': 'libya',
    'li': 'liechtenstein',
    'lt': 'lithuania',
    'lu': 'luxembourg',
    'mo': 'macau',
    'mg': 'madagascar',
    'mw': 'malawi',
    'my': 'malaysia',
    'mv': 'maldives',
    'ml': 'mali',
    'mt': 'malta',
    'mh': 'marshall islands',
    'mr': 'mauritania',
    'mu': 'mauritius',
    'mx': 'mexico',
    'fm': 'micronesia',
    'md': 'moldova',
    'mc': 'monaco',
    'mn': 'mongolia',
    'me': 'montenegro',
    'ms': 'montserrat',
    'ma': 'morocco',
    'mz': 'mozambique',
    'mm': 'myanmar',
    'na': 'namibia',
    'nr': 'nauru',
    'np': 'nepal',
    'nl': 'netherlands',
    'nc': 'new caledonia',
    'nz': 'new zealand',
    'ni': 'nicaragua',
    'ne': 'niger',
    'ng': 'nigeria',
    'nu': 'niue',
    'nf': 'norfolk island',
    'kp': 'north korea',
    'mk': 'north macedonia',
    'mp': 'northern mariana islands',
    'no': 'norway',
    'om': 'oman',
    'pk': 'pakistan',
    'pw': 'palau',
    'ps': 'palestine',
    'pa': 'panama',
    'pg': 'papua new guinea',
    'py': 'paraguay',
    'pe': 'peru',
    'ph': 'philippines',
    'pn': 'pitcairn islands',
    'pl': 'poland',
    'pt': 'portugal',
    'pr': 'puerto rico',
    'qa': 'qatar',
    'cg': 'republic of the congo',
    'ro': 'romania',
    'ru': 'russia',
    'rw': 'rwanda',
    'bl': 'saint barthelemy',
    'sh': 'saint helena',
    'kn': 'saint kitts and nevis',
    'lc': 'saint lucia',
    'mf': 'saint martin',
    'pm': 'saint pierre and miquelon',
    'vc': 'saint vincent and the grenadines',
    'ws': 'samoa',
    'sm': 'san marino',
    'st': 'sao tome and principe',
    'sa': 'saudi arabia',
    'sn': 'senegal',
    'rs': 'serbia',
    'sc': 'seychelles',
    'sl': 'sierra leone',
    'sg': 'singapore',
    'sx': 'sint maarten',
    'sk': 'slovakia',
    'si': 'slovenia',
    'sb': 'solomon islands',
    'so': 'somalia',
    'za': 'south africa',
    'gs': 'south georgia',
    'kr': 'south korea',
    'ss': 'south sudan',
    'es': 'spain',
    'lk': 'sri lanka',
    'sd': 'sudan',
    'sr': 'suriname',
    'sj': 'svalbard',
    'se': 'sweden',
    'ch': 'switzerland',
    'sy': 'syria',
    'tw': 'taiwan',
    'tj': 'tajikistan',
    'tz': 'tanzania',
    'th': 'thailand',
    'tg': 'togo',
    'tk': 'tokelau',
    'to': 'tonga',
    'tt': 'trinidad and tobago',
    'tn': 'tunisia',
    'tr': 'turkey',
    'tm': 'turkmenistan',
    'tc': 'turks and caicos islands',
    'tv': 'tuvalu',
    'vi': 'u.s. virgin islands',
    'ug': 'uganda',
    'ua': 'ukraine',
    'ae': 'united arab emirates',
    'gb': 'united kingdom',
    'us': 'united states',
    'uy': 'uruguay',
    'uz': 'uzbekistan',
    'vu': 'vanuatu',
    'va': 'vatican city',
    've': 'venezuela',
    'vn': 'vietnam',
    'wf': 'wallis and futuna',
    'eh': 'western sahara',
    'ye': 'yemen',
    'zm': 'zambia',
    'zw': 'zimbabwe'
}
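# Jira Cloud option IDs for the Hosting Location custom field (customfield_10366)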
HOSTING_LOCATION_IDS = {
    'abkhazia': '11345',
    'afghanistan': '11346',
    'akrotiri and dhekelia': '11347',
    'aland islands': '11348',
    'albania': '11349',
    'algeria': '11350',
    'american samoa': '11351',
    'andorra': '11352',
    'angola': '11353',
    'anguilla': '11354',
    'antarctica': '11355',
    'antigua and barbuda': '11356',
    'argentina': '11357',
    'armenia': '11358',
    'aruba': '11359',
    'ashmore and cartier islands': '11360',
    'australia': '11361',
    'austria': '11362',
    'azerbaijan': '11363',
    'bahamas': '11364',
    'bahrain': '11365',
    'bangladesh': '11366',
    'barbados': '11367',
    'belarus': '11368',
    'belgium': '11369',
    'belize': '11370',
    'benin': '11371',
    'bermuda': '11372',
    'bhutan': '11373',
    'bir tawil': '11374',
    'bolivia': '11375',
    'bonaire': '11376',
    'bosnia and herzegovina': '11377',
    'botswana': '11378',
    'bouvet island': '11379',
    'brazil': '11380',
    'british indian ocean territory': '11381',
    'british virgin islands': '11382',
    'brunei': '11383',
    'bulgaria': '11384',
    'burkina faso': '11385',
    'burundi': '11386',
    'cambodia': '11387',
    'cameroon': '11388',
    'canada': '11389',
    'cape verde': '11390',
    'cayman islands': '11391',
    'central african republic': '11392',
    'chad': '11393',
    'chile': '11394',
    'china': '11395',
    'christmas island': '11396',
    'clipperton island': '11397',
    'cocos islands': '11398',
    'colombia': '11399',
    'comoros': '11400',
    'cook islands': '11401',
    'coral sea islands': '11402',
    'costa rica': '11403',
    'croatia': '11404',
    'cuba': '11405',
    'curacao': '11406',
    'cyprus': '11407',
    'czech republic': '11408',
    'democratic republic of the congo': '11409',
    'denmark': '11410',
    'djibouti': '11411',
    'dominica': '11412',
    'dominican republic': '11413',
    'east timor': '11414',
    'easter island': '11415',
    'ecuador': '11416',
    'egypt': '11417',
    'el salvador': '11418',
    'equatorial guinea': '11419',
    'eritrea': '11420',
    'estonia': '11421',
    'eswatini': '11422',
    'ethiopia': '11423',
    'falkland islands': '11424',
    'faroe islands': '11425',
    'fiji': '11426',
    'finland': '11427',
    'france': '11428',
    'french polynesia': '11429',
    'french southern territories': '11430',
    'gabon': '11431',
    'gambia': '11432',
    'georgia': '11433',
    'germany': '11434',
    'ghana': '11435',
    'gibraltar': '11436',
    'greece': '11437',
    'greenland': '11438',
    'grenada': '11439',
    'guam': '11440',
    'guatemala': '11441',
    'guernsey': '11442',
    'guinea': '11443',
    'guinea-bissau': '11444',
    'guyana': '11445',
    'haiti': '11446',
    'heard island and mcdonald islands': '11447',
    'honduras': '11448',
    'hong kong': '11449',
    'hungary': '11450',
    'iceland': '11451',
    'india': '11452',
    'indonesia': '11453',
    'iran': '11454',
    'iraq': '11455',
    'ireland': '11456',
    'isle of man': '11457',
    'israel': '11458',
    'italy': '11459',
    'ivory coast': '11460',
    'jamaica': '11461',
    'jan mayen': '11462',
    'japan': '11463',
    'jersey': '11464',
    'jordan': '11465',
    'kazakhstan': '11466',
    'kenya': '11467',
    'kiribati': '11468',
    'kosovo': '11469',
    'kuwait': '11470',
    'kyrgyzstan': '11471',
    'laos': '11472',
    'latvia': '11473',
    'lebanon': '11474',
    'lesotho': '11475',
    'liberia': '11476',
    'libya': '11477',
    'liechtenstein': '11478',
    'lithuania': '11479',
    'luxembourg': '11480',
    'macau': '11481',
    'madagascar': '11482',
    'malawi': '11483',
    'malaysia': '11484',
    'maldives': '11485',
    'mali': '11486',
    'malta': '11487',
    'marshall islands': '11488',
    'mauritania': '11489',
    'mauritius': '11490',
    'mexico': '11491',
    'micronesia': '11492',
    'moldova': '11493',
    'monaco': '11494',
    'mongolia': '11495',
    'montenegro': '11496',
    'montserrat': '11497',
    'morocco': '11498',
    'mozambique': '11499',
    'myanmar': '11500',
    'namibia': '11501',
    'nauru': '11502',
    'nepal': '11503',
    'netherlands': '11504',
    'new caledonia': '11505',
    'new zealand': '11506',
    'nicaragua': '11507',
    'niger': '11508',
    'nigeria': '11509',
    'niue': '11510',
    'norfolk island': '11511',
    'north korea': '11512',
    'north macedonia': '11513',
    'northern cyprus': '11514',
    'northern mariana islands': '11515',
    'norway': '11516',
    'oman': '11517',
    'pakistan': '11518',
    'palau': '11519',
    'palestine': '11520',
    'panama': '11521',
    'papua new guinea': '11522',
    'paraguay': '11523',
    'peru': '11524',
    'philippines': '11525',
    'pitcairn islands': '11526',
    'poland': '11527',
    'portugal': '11528',
    'puerto rico': '11529',
    'qatar': '11530',
    'republic of the congo': '11531',
    'romania': '11532',
    'russia': '11533',
    'rwanda': '11534',
    'saba': '11535',
    'saint barthelemy': '11536',
    'saint helena': '11537',
    'saint kitts and nevis': '11538',
    'saint lucia': '11539',
    'saint martin': '11540',
    'saint pierre and miquelon': '11541',
    'saint vincent and the grenadines': '11542',
    'samoa': '11543',
    'san marino': '11544',
    'sao tome and principe': '11545',
    'saudi arabia': '11546',
    'senegal': '11547',
    'serbia': '11548',
    'seychelles': '11549',
    'sierra leone': '11550',
    'singapore': '11551',
    'sint eustatius': '11552',
    'sint maarten': '11553',
    'slovakia': '11554',
    'slovenia': '11555',
    'solomon islands': '11556',
    'somalia': '11557',
    'somaliland': '11558',
    'south africa': '11559',
    'south georgia': '11560',
    'south korea': '11561',
    'south ossetia': '11562',
    'south sudan': '11563',
    'spain': '11564',
    'sri lanka': '11565',
    'sudan': '11566',
    'suriname': '11567',
    'svalbard': '11568',
    'sweden': '11569',
    'switzerland': '11570',
    'syria': '11571',
    'taiwan': '11572',
    'tajikistan': '11573',
    'tanzania': '11574',
    'thailand': '11575',
    'togo': '11576',
    'tokelau': '11577',
    'tonga': '11578',
    'transnistria': '11579',
    'trinidad and tobago': '11580',
    'tunisia': '11581',
    'turkey': '11582',
    'turkmenistan': '11583',
    'turks and caicos islands': '11584',
    'tuvalu': '11585',
    'u.s. virgin islands': '11586',
    'uganda': '11587',
    'ukraine': '11588',
    'united arab emirates': '11589',
    'united kingdom': '11590',
    'united states': '11591',
    'uruguay': '11592',
    'uzbekistan': '11593',
    'vanuatu': '11594',
    'vatican city': '11595',
    'venezuela': '11596',
    'vietnam': '11597',
    'wallis and futuna': '11598',
    'western sahara': '11599',
    'yemen': '11600',
    'zambia': '11601',
    'zimbabwe': '11602'
}
# Jira Cloud option IDs for the Target Country custom field (customfield_10107)
TARGET_COUNTRY_IDS = {
    'afghanistan': '10163',
    'akrotiri and dhekelia': '10164',
    'aland islands': '10165',
    'albania': '10167',
    'algeria': '10169',
    'american samoa': '10171',
    'andorra': '10172',
    'angola': '10174',
    'anguilla': '10176',
    'antarctica': '10178',
    'antigua and barbuda': '10180',
    'argentina': '10182',
    'armenia': '10184',
    'aruba': '10186',
    'ashmore and cartier islands': '10188',
    'australia': '10190',
    'austria': '10192',
    'azerbaijan': '10194',
    'bahamas': '10195',
    'bahrain': '10196',
    'bangladesh': '10197',
    'barbados': '10198',
    'belarus': '10199',
    'belgium': '10200',
    'belize': '10201',
    'benin': '10202',
    'bermuda': '10203',
    'bhutan': '10204',
    'bir tawil': '10205',
    'bolivia': '10206',
    'bonaire': '10207',
    'bosnia and herzegovina': '10208',
    'botswana': '10209',
    'bouvet island': '10210',
    'brazil': '10211',
    'british indian ocean territory': '10212',
    'british virgin islands': '10213',
    'brunei': '10214',
    'bulgaria': '10215',
    'burkina faso': '10216',
    'burundi': '10217',
    'cambodia': '10218',
    'cameroon': '10219',
    'canada': '10220',
    'cape verde': '10221',
    'cayman islands': '10222',
    'central african republic': '10223',
    'chad': '10224',
    'chile': '10225',
    'china': '10226',
    'christmas island': '10227',
    'clipperton island': '10228',
    'cocos islands': '10229',
    'colombia': '10230',
    'comoros': '10231',
    'cook islands': '10232',
    'coral sea islands': '10233',
    'costa rica': '10234',
    'croatia': '10235',
    'cuba': '10236',
    'curacao': '10237',
    'cyprus': '10238',
    'czech republic': '10239',
    'democratic republic of the congo': '10240',
    'denmark': '10241',
    'djibouti': '10242',
    'dominica': '10243',
    'dominican republic': '10244',
    'east timor': '10245',
    'easter island': '10246',
    'ecuador': '10247',
    'egypt': '10248',
    'el salvador': '10249',
    'equatorial guinea': '10250',
    'eritrea': '10251',
    'estonia': '10252',
    'eswatini': '10253',
    'ethiopia': '10254',
    'falkland islands': '10255',
    'faroe islands': '10256',
    'fiji': '10257',
    'finland': '10258',
    'france': '10259',
    'french polynesia': '10260',
    'french southern territories': '10261',
    'gabon': '10262',
    'gambia': '10263',
    'georgia': '10264',
    'germany': '10265',
    'ghana': '10266',
    'gibraltar': '10267',
    'greece': '10268',
    'greenland': '10269',
    'grenada': '10270',
    'guam': '10271',
    'guatemala': '10272',
    'guernsey': '10273',
    'guinea': '10274',
    'guinea-bissau': '10275',
    'guyana': '10276',
    'haiti': '10277',
    'heard island and mcdonald islands': '10278',
    'honduras': '10279',
    'hong kong': '10280',
    'hungary': '10281',
    'iceland': '10282',
    'india': '10283',
    'indonesia': '10284',
    'iran': '10285',
    'iraq': '10286',
    'ireland': '10287',
    'isle of man': '10288',
    'israel': '10289',
    'italy': '10290',
    'ivory coast': '10291',
    'jamaica': '10292',
    'jan mayen': '10293',
    'japan': '10294',
    'jersey': '10295',
    'jordan': '10296',
    'kazakhstan': '10297',
    'kenya': '10298',
    'kiribati': '10299',
    'kosovo': '10300',
    'kuwait': '10301',
    'kyrgyzstan': '10302',
    'laos': '10303',
    'latvia': '10304',
    'lebanon': '10305',
    'lesotho': '10306',
    'liberia': '10307',
    'libya': '10308',
    'liechtenstein': '10309',
    'lithuania': '10310',
    'luxembourg': '10311',
    'macau': '10312',
    'madagascar': '10313',
    'malawi': '10314',
    'malaysia': '10315',
    'maldives': '10316',
    'mali': '10317',
    'malta': '10318',
    'marshall islands': '10319',
    'mauritania': '10320',
    'mauritius': '10321',
    'mexico': '10322',
    'micronesia': '10323',
    'moldova': '10324',
    'monaco': '10325',
    'mongolia': '10326',
    'montenegro': '10327',
    'montserrat': '10328',
    'morocco': '10329',
    'mozambique': '10330',
    'myanmar': '10331',
    'namibia': '10332',
    'nauru': '10333',
    'nepal': '10334',
    'netherlands': '10335',
    'new caledonia': '10336',
    'new zealand': '10337',
    'nicaragua': '10338',
    'niger': '10339',
    'nigeria': '10340',
    'niue': '10341',
    'norfolk island': '10342',
    'north korea': '10343',
    'north macedonia': '10344',
    'northern cyprus': '10345',
    'northern mariana islands': '10346',
    'norway': '10347',
    'oman': '10348',
    'pakistan': '10349',
    'palau': '10350',
    'palestine': '10351',
    'panama': '10352',
    'papua new guinea': '10353',
    'paraguay': '10354',
    'peru': '10355',
    'philippines': '10356',
    'pitcairn islands': '10357',
    'poland': '10358',
    'portugal': '10359',
    'puerto rico': '10360',
    'qatar': '10361',
    'republic of the congo': '10362',
    'romania': '10363',
    'russia': '10364',
    'rwanda': '10365',
    'saba': '10366',
    'saint barthelemy': '10367',
    'saint helena': '10368',
    'saint kitts and nevis': '10369',
    'saint lucia': '10370',
    'saint martin': '10371',
    'saint pierre and miquelon': '10372',
    'saint vincent and the grenadines': '10373',
    'samoa': '10374',
    'san marino': '10375',
    'sao tome and principe': '10376',
    'saudi arabia': '10377',
    'senegal': '10378',
    'serbia': '10379',
    'seychelles': '10380',
    'sierra leone': '10381',
    'singapore': '10382',
    'sint eustatius': '10383',
    'sint maarten': '10384',
    'slovakia': '10385',
    'slovenia': '10386',
    'solomon islands': '10387',
    'somalia': '10388',
    'somaliland': '10389',
    'south africa': '10390',
    'south georgia': '10391',
    'south korea': '10392',
    'south ossetia': '10393',
    'south sudan': '10394',
    'spain': '10395',
    'sri lanka': '10396',
    'sudan': '10397',
    'suriname': '10398',
    'svalbard': '10399',
    'sweden': '10400',
    'switzerland': '10401',
    'syria': '10402',
    'taiwan': '10403',
    'tajikistan': '10404',
    'tanzania': '10405',
    'thailand': '10406',
    'togo': '10407',
    'tokelau': '10408',
    'tonga': '10409',
    'transnistria': '10410',
    'trinidad and tobago': '10411',
    'tunisia': '10412',
    'turkey': '10413',
    'turkmenistan': '10414',
    'turks and caicos islands': '10415',
    'tuvalu': '10416',
    'u.s. virgin islands': '10417',
    'uganda': '10418',
    'ukraine': '10419',
    'united arab emirates': '10420',
    'united kingdom': '10421',
    'united states': '10422',
    'uruguay': '10423',
    'uzbekistan': '10424',
    'vanuatu': '10425',
    'vatican city': '10426',
    'venezuela': '10427',
    'vietnam': '10428',
    'wallis and futuna': '10429',
    'western sahara': '10430',
    'yemen': '10431',
    'zambia': '10432',
    'zimbabwe': '10433'
}
def get_hosting_location_id(location_code):
    """Convert hosting location code to JIRA ID"""
    if not location_code:
        return '-1'  # None value
    location_code = location_code.lower().strip()
    country_name = COUNTRY_CODE_TO_NAME.get(location_code)
    if not country_name:
        print(f"[WARN] Unknown country code for hosting location: {location_code}")
        return '-1'
    return HOSTING_LOCATION_IDS.get(country_name, '-1')

def get_target_country_id(country_code):
    """Convert target country code to JIRA ID"""
    if not country_code:
        return '-1'  # None value
    country_code = country_code.lower().strip()
    country_name = COUNTRY_CODE_TO_NAME.get(country_code)
    if not country_name:
        print(f"[WARN] Unknown country code for target country: {country_code}")
        return '-1'
    return TARGET_COUNTRY_IDS.get(country_name, '-1')
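# Example (hypothetical input): a domain record with hosting_location "DE"
# normalizes to "de", resolves via COUNTRY_CODE_TO_NAME to "germany", then:
#   get_hosting_location_id("DE")  -> "11434"
#   get_target_country_id("de")    -> "10265"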
def get_domains():
    """Fetch the list of tracked domains from the backend server."""
    try:
        resp = requests.get(f"{SERVER_URL}/domains")
        resp.raise_for_status()
        return resp.json()
    except Exception as e:
        print(f"[ERROR] Retrieving domains: {e}")
        return []

def update_jira_ticket(domain_id, ticket):
    """Record the Jira ticket key for a domain on the backend server."""
    try:
        payload = {"jira_ticket": ticket}
        resp = requests.post(f"{SERVER_URL}/jira/update-ticket/{domain_id}", json=payload)
        resp.raise_for_status()
        print(f"[INFO] Updated domain {domain_id} with Jira ticket {ticket}")
        return True
    except Exception as e:
        print(f"[ERROR] Updating Jira ticket for domain {domain_id}: {e}")
        return False
def create_jira_ticket(jira_client, domain):
    # Build the ticket summary
    summary = f"Domain Abuse - Takedown: {domain['url']}"
    # Try to create an ADF description first, fall back to plain text
    adf_description = create_adf_description(domain)
    if adf_description:
        # Use ADF format for the v3 API
        description = adf_description
        print("[INFO] Using ADF format for ticket description")
    else:
        # Fall back to a plain text description.
        # Include hosting provider info if available
        hosting_info = ""
        if domain.get("hosting_provider"):
            hosting_info = f"Hosting Provider: {domain.get('hosting_provider')}\n"
        # Base description with domain details and a note about the attached image
        description = (
            f"New domain abuse ticket created:\n\n"
            f"URL: {domain['url']}\n"
            f"Status: {domain['status']}\n"
            f"Page Title: {domain.get('page_title', '')}\n"
            f"{hosting_info}"
        )
        # Add target country and hosting location to the description if available
        if domain.get("target_country"):
            country_name = COUNTRY_CODE_TO_NAME.get(domain["target_country"].lower(), domain["target_country"])
            description += f"Target Country: {country_name}\n"
        if domain.get("hosting_location"):
            location_name = COUNTRY_CODE_TO_NAME.get(domain["hosting_location"].lower(), domain["hosting_location"])
            description += f"Hosting Location: {location_name}\n"
        # Add abuse type information to the description if available
        if domain.get("abuse_type"):
            description += f"Abuse Type: {domain.get('abuse_type')}\n"
        description += f"Last Checked: {domain.get('last_checked', '')}\n\n"
        description += "Please refer to the attached preview image for more details."
        # Append takedown information if already provided
        if domain.get("report_abuse") and domain.get("report_abuse_timestamp"):
            description += (
                "\n\n### Takedown Reported\n"
                f"Takedown submitted at {domain.get('report_abuse_timestamp')}."
            )
        # Add a Notes section if notes exist
        notes = domain.get('notes')
        if notes and notes.strip():
            description += f"\n\n### Notes\n{notes.strip()}"
        print("[INFO] Using plain text format for ticket description")
    # Prepare the labels from the configuration and add additional labels
    labels = list(JIRA_LABELS)
    # Add the corporate/franchise label if applicable (guard against None values)
    corporate_franchise = domain.get("corporate_franchise")
    if corporate_franchise and str(corporate_franchise).lower() in ["corporate", "franchise"]:
        labels.append(str(corporate_franchise).lower())
    # Add the abuse type label if applicable
    abuse_type = domain.get("abuse_type")
    if abuse_type and str(abuse_type).lower() in ["mimic", "redirect", "independent", "n/a", "to_validate"]:
        labels.append(f"abuse-type-{str(abuse_type).lower()}")
    # Add the report abuse label if applicable
    if domain.get("report_abuse"):
        labels.append("takedown-requested")
    try:
        # Map corporate/franchise values to their Jira Cloud option IDs
        corp_franchise_map = {
            'corporate': '11343',
            'franchise': '11344',
            'none': '-1'
        }
        # Safely handle the corporate/franchise value
        corporate_franchise = domain.get('corporate_franchise')
        corp_franchise_id = '-1'  # Default to the None/empty value
        if corporate_franchise:
            corp_value = str(corporate_franchise).lower().strip()
            if corp_value in corp_franchise_map:
                corp_franchise_id = corp_franchise_map[corp_value]
        # Safely handle hosting location
        hosting_location = domain.get('hosting_location')
        hosting_location_id = get_hosting_location_id(hosting_location) if hosting_location else '-1'
        # Safely handle target country
        target_country = domain.get('target_country')
        target_country_id = get_target_country_id(target_country) if target_country else '-1'
        # Safely handle hosting provider, as a string
        hosting_provider = domain.get('hosting_provider')
        hosting_provider_str = str(hosting_provider).strip() if hosting_provider else None
        # Prepare the base issue fields
        issue_dict = {
            'project': {'key': JIRA_PROJECT_KEY},
            'summary': summary,
            'description': description,
            'issuetype': {'name': JIRA_ISSUE_TYPE},
            'labels': labels,
            'parent': {'key': JIRA_EPIC},  # The epic must be an object with a key property
        }
        # Only add custom fields if they have valid values
        if corp_franchise_id != '-1':
            issue_dict['customfield_10367'] = [{"id": corp_franchise_id}]  # Corporate/Franchise
        if hosting_provider_str:
            issue_dict['customfield_10368'] = hosting_provider_str  # Hosting Provider
        if hosting_location_id != '-1':
            issue_dict['customfield_10366'] = [{"id": hosting_location_id}]  # Hosting Location
        if target_country_id != '-1':
            issue_dict['customfield_10107'] = [{"id": target_country_id}]  # Target Country
        issue = jira_client.create_issue(fields=issue_dict)
        print(f"[INFO] Created Jira ticket {issue.key} for domain {domain['url']}")
        # Always move the ticket to "In Progress" after creation.
        # First check the available transitions.
        transitions = jira_client.transitions(issue.key)
        transition_id = None
        for t in transitions:
            if 'in progress' in t['name'].lower():
                transition_id = t['id']
                break
        if transition_id:
            jira_client.transition_issue(issue.key, transition_id)
            print(f"[INFO] Set ticket {issue.key} to In Progress")
        else:
            print(f"[WARN] Could not find In Progress transition for ticket {issue.key}")
        # If a preview image exists, download it and attach it to the ticket
        screenshot_url = domain.get('screenshot_url')
        if screenshot_url:
            try:
                response = requests.get(screenshot_url)
                response.raise_for_status()
                from io import BytesIO
                image_file = BytesIO(response.content)
                image_file.name = "preview.png"
                jira_client.add_attachment(issue=issue.key, attachment=image_file, filename="preview.png")
                print(f"[INFO] Attached preview image for domain {domain['url']}")
            except Exception as attach_error:
                print(f"[ERROR] Failed to attach preview image for domain {domain['url']}: {attach_error}")
        # If a takedown was already requested when creating the ticket, transition to Review
        if domain.get("report_abuse") and domain.get("report_abuse_timestamp"):
            transition_to_review(jira_client, issue.key)
            print(f"[INFO] Moved ticket {issue.key} to Review due to existing takedown request")
        # If the domain is already down, transition to Done
        if domain.get("status", "").lower() == "down":
            close_jira_ticket(jira_client, issue.key)
            print(f"[INFO] Moved ticket {issue.key} to Done due to domain being down")
        return issue.key
    except Exception as e:
        print(f"[ERROR] Creating Jira ticket for domain {domain['url']}: {e}")
        return None
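# A minimal sketch of the domain record this function consumes. The keys are
# the ones read above; the values here are illustrative assumptions:
#   domain = {
#       'url': 'https://suspicious.example.com',
#       'status': 'up',
#       'page_title': 'Fake Login',
#       'hosting_provider': 'ExampleHost',
#       'hosting_location': 'de',
#       'target_country': 'us',
#       'abuse_type': 'mimic',
#       'corporate_franchise': 'corporate',
#       'report_abuse': False,
#       'report_abuse_timestamp': None,
#       'screenshot_url': None,
#       'notes': '',
#       'last_checked': '2025-06-28T18:00:00Z',
#   }
#   ticket_key = create_jira_ticket(jira_client, domain)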
def add_jira_comment(jira_client, ticket, comment):
    try:
        # Try ADF format first for the v3 API
        adf_comment = create_adf_comment(comment)
        if adf_comment:
            # Use ADF format
            jira_client.add_comment(ticket, body=adf_comment)
            print(f"[INFO] Added ADF comment to Jira ticket {ticket}")
        else:
            # Fall back to plain text
            jira_client.add_comment(ticket, comment)
            print(f"[INFO] Added plain text comment to Jira ticket {ticket}")
    except Exception as e:
        # If ADF fails, try plain text as a fallback
        try:
            jira_client.add_comment(ticket, comment)
            print(f"[INFO] Added fallback plain text comment to Jira ticket {ticket}")
        except Exception as e2:
            print(f"[ERROR] Adding comment to Jira ticket {ticket}: {e2}")
            print(f"[ERROR] Original ADF error: {e}")
def calculate_time_diff(start_time, end_time):
    """
    Calculate the time difference between two timestamps and format it for a Jira worklog.

    Args:
        start_time: Start timestamp (ISO format string or datetime)
        end_time: End timestamp (ISO format string or datetime)

    Returns:
        Tuple of (total_minutes, formatted_time_for_jira)
    """
    try:
        # Log the initial input values
        print("[DEBUG] Starting time difference calculation")
        print(f"[DEBUG] Start time: {start_time} (type: {type(start_time)})")
        print(f"[DEBUG] End time: {end_time} (type: {type(end_time)})")
        # Convert strings to datetime objects if needed
        if isinstance(start_time, str):
            print(f"[DEBUG] Attempting to parse start time string: {start_time}")
            # Handle different ISO formats
            try:
                # Try standard fromisoformat first (Python 3.7+)
                start_time = datetime.datetime.fromisoformat(start_time.replace('Z', '+00:00'))
                print("[DEBUG] Successfully parsed start time using fromisoformat")
            except ValueError as e:
                print(f"[WARN] Failed to parse start time using fromisoformat: {e}")
                # Fall back to parsing the common JIRA date format (2023-03-15T14:30:45.000+0000)
                if 'T' in start_time:
                    print("[DEBUG] Attempting to parse start time using JIRA format")
                    try:
                        date_part, time_part = start_time.split('T', 1)
                        time_part = time_part.split('.')[0]  # Remove milliseconds
                        start_time = datetime.datetime.strptime(f"{date_part} {time_part}", "%Y-%m-%d %H:%M:%S")
                        print("[DEBUG] Successfully parsed start time using JIRA format")
                    except ValueError as e:
                        print(f"[ERROR] Failed to parse start time using JIRA format: {e}")
                        raise ValueError(f"Could not parse start time '{start_time}': {str(e)}")
                else:
                    print("[DEBUG] Attempting to parse start time using direct format")
                    try:
                        start_time = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
                        print("[DEBUG] Successfully parsed start time using direct format")
                    except ValueError as e:
                        print(f"[ERROR] Failed to parse start time using direct format: {e}")
                        raise ValueError(f"Could not parse start time '{start_time}': {str(e)}")
        if isinstance(end_time, str):
            print(f"[DEBUG] Attempting to parse end time string: {end_time}")
            # Handle different ISO formats
            try:
                end_time = datetime.datetime.fromisoformat(end_time.replace('Z', '+00:00'))
                print("[DEBUG] Successfully parsed end time using fromisoformat")
            except ValueError as e:
                print(f"[WARN] Failed to parse end time using fromisoformat: {e}")
                # Fall back to parsing the common JIRA date format
                if 'T' in end_time:
                    print("[DEBUG] Attempting to parse end time using JIRA format")
                    try:
                        date_part, time_part = end_time.split('T', 1)
                        time_part = time_part.split('.')[0]  # Remove milliseconds
                        end_time = datetime.datetime.strptime(f"{date_part} {time_part}", "%Y-%m-%d %H:%M:%S")
                        print("[DEBUG] Successfully parsed end time using JIRA format")
                    except ValueError as e:
                        print(f"[ERROR] Failed to parse end time using JIRA format: {e}")
                        raise ValueError(f"Could not parse end time '{end_time}': {str(e)}")
                else:
                    print("[DEBUG] Attempting to parse end time using direct format")
                    try:
                        end_time = datetime.datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S")
                        print("[DEBUG] Successfully parsed end time using direct format")
                    except ValueError as e:
                        print(f"[ERROR] Failed to parse end time using direct format: {e}")
                        raise ValueError(f"Could not parse end time '{end_time}': {str(e)}")
        # Ensure we have datetime objects
        if not isinstance(start_time, datetime.datetime) or not isinstance(end_time, datetime.datetime):
            error_msg = f"Invalid timestamp formats: start={type(start_time)}, end={type(end_time)}"
            print(f"[ERROR] {error_msg}")
            raise ValueError(error_msg)
        # Calculate the time difference in minutes (ensure a positive value)
        diff = end_time - start_time
        if diff.total_seconds() < 0:
            print(f"[WARN] Negative time difference detected: {diff}. Using absolute value.")
            diff = abs(diff)
        total_minutes = diff.total_seconds() / 60
        print(f"[DEBUG] Calculated total minutes: {total_minutes:.2f}")
        # Format for Jira: days, hours and minutes for better readability
        days = int(total_minutes / (60 * 24))
        remaining_minutes = total_minutes % (60 * 24)
        hours = int(remaining_minutes / 60)
        minutes = int(remaining_minutes % 60)
        # Build the time string from the units that are needed
        time_spent = ""
        if days > 0:
            time_spent += f"{days}d "
        if hours > 0:
            time_spent += f"{hours}h "
        if minutes > 0 or time_spent == "":
            time_spent += f"{minutes}m"
        # Log the calculation for debugging
        print(f"[DEBUG] Time diff calculation: {start_time} to {end_time} = {time_spent} ({total_minutes:.2f} minutes)")
        return total_minutes, time_spent.strip()
    except Exception as e:
        error_msg = f"Failed to calculate time difference: {str(e)}"
        print(f"[ERROR] {error_msg}")
        print(f"[ERROR] Input values: start_time={start_time} ({type(start_time)}), end_time={end_time} ({type(end_time)})")
        raise ValueError(error_msg)
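# Example (hypothetical inputs): a ticket opened at 10:00 UTC on June 27 and
# closed at 11:30 UTC the next day spans 1530 minutes:
#   calculate_time_diff("2025-06-27T10:00:00Z", "2025-06-28T11:30:00Z")
#   -> (1530.0, "1d 1h 30m")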
def add_jira_worklog(jira_client, ticket, time_spent, comment):
    """
    Add a worklog entry to a Jira ticket.

    Args:
        jira_client: The Jira client instance
        ticket: The ticket ID
        time_spent: Time spent in Jira format (e.g. "1h 30m")
        comment: Comment for the worklog entry
    """
    try:
        # Ensure a minimum worklog time (JIRA requires at least 1m)
        if not time_spent or time_spent.strip() in ("0m", ""):
            time_spent = "1m"
            print(f"[WARN] Empty or zero worklog time for ticket {ticket}, using minimum of 1m")
        # Try ADF format first for the v3 API
        adf_comment = create_adf_comment(comment)
        if adf_comment:
            # Use ADF format for the worklog comment
            jira_client.add_worklog(ticket, timeSpent=time_spent, comment=adf_comment)
            print(f"[INFO] Added ADF worklog to Jira ticket {ticket}: {time_spent} - {comment}")
        else:
            # Fall back to plain text
            jira_client.add_worklog(ticket, timeSpent=time_spent, comment=comment)
            print(f"[INFO] Added plain text worklog to Jira ticket {ticket}: {time_spent} - {comment}")
    except Exception as e:
        print(f"[ERROR] Adding worklog to Jira ticket {ticket}: {e}")
        # Try a simpler format if the original fails
        try:
            jira_client.add_worklog(ticket, timeSpent="10m", comment=f"{comment} (Original time: {time_spent})")
            print(f"[INFO] Added fallback worklog to Jira ticket {ticket}")
        except Exception as e2:
            print(f"[ERROR] Fallback worklog also failed for ticket {ticket}: {e2}")
def get_created_time(jira_client, ticket_key):
    """
    Get the creation time of a JIRA ticket in a robust way.

    Args:
        jira_client: The Jira client instance
        ticket_key: The JIRA ticket key

    Returns:
        A datetime object representing the creation time
    """
    try:
        issue = jira_client.issue(ticket_key)
        created_field = issue.fields.created
        # Safely extract the created timestamp (handle PropertyHolder)
        created_str = created_field
        while hasattr(created_str, 'value'):
            created_str = created_str.value
        created_str = str(created_str)
        # Handle JIRA's timestamp format
        if 'T' in created_str:
            # Try to parse as ISO format first
            try:
                return datetime.datetime.fromisoformat(created_str.replace('Z', '+00:00'))
            except ValueError:
                # Fall back to manual parsing
                date_part, time_part = created_str.split('T', 1)
                # Strip milliseconds and timezone if present
                time_part = time_part.split('.')[0]
                return datetime.datetime.strptime(f"{date_part} {time_part}", "%Y-%m-%d %H:%M:%S")
        else:
            # If it's not in ISO format, try direct parsing
            return datetime.datetime.strptime(created_str, "%Y-%m-%d %H:%M:%S")
    except Exception as e:
        print(f"[ERROR] Failed to get creation time for ticket {ticket_key}: {e}")
        # Return the current time minus 1 hour as a fallback
        return datetime.datetime.utcnow() - datetime.timedelta(hours=1)
def close_jira_ticket(jira_client, ticket):
    """
    Close a JIRA ticket (transition it to a Done status).
    """
    try:
        # First get the current status to avoid unnecessary transitions
        issue = jira_client.issue(ticket)
        current_status = safe_extract_status_name(issue.fields.status)
        # If already closed, don't change it
        if current_status in ['done', 'closed', 'resolved', 'complete']:
            print(f"[INFO] Ticket {ticket} already in {current_status} status. No transition needed.")
            return True
        transitions = jira_client.transitions(ticket)
        transition_id = None
        # Find the first transition named "Done", "Close", "Closed", etc.
        # (a single break here stops the whole scan; the original nested break
        # only exited the inner loop)
        transition_names = ['done', 'close', 'closed', 'resolve', 'resolved', 'complete']
        for t in transitions:
            if any(name in t['name'].lower() for name in transition_names):
                transition_id = t['id']
                break
        if transition_id:
            jira_client.transition_issue(ticket, transition_id)
            print(f"[INFO] Transitioned Jira ticket {ticket} to closed status.")
            return True
        else:
            print(f"[WARN] No suitable transition found to close Jira ticket {ticket}.")
            return False
    except Exception as e:
        print(f"[ERROR] Closing Jira ticket {ticket}: {e}")
        return False
def reopen_jira_ticket(jira_client, ticket):
    try:
        transitions = jira_client.transitions(ticket)
        transition_id = None
        # Look for transitions that indicate reopening (commonly "Reopen", "To Do", "In Progress" or "Open")
        transition_names = ['in progress', 'to do', 'open']
        for t in transitions:
            if any(name in t['name'].lower() for name in transition_names):
                transition_id = t['id']
                break
        if transition_id:
            jira_client.transition_issue(ticket, transition_id)
            print(f"[INFO] Reopened Jira ticket {ticket} due to status change.")
            # After reopening, transition to In Progress
            transition_to_in_progress(jira_client, ticket)
        else:
            print(f"[WARN] No suitable transition found to reopen Jira ticket {ticket}.")
    except Exception as e:
        print(f"[ERROR] Reopening Jira ticket {ticket}: {e}")
def takedown_comment_exists(jira_client, ticket):
    """
    Check whether a takedown comment already exists on the JIRA ticket.
    """
    return comment_exists(jira_client, ticket, "Takedown reported at")
def transition_to_review(jira_client, ticket):
    """
    Transition a JIRA ticket to the In Review status.

    Args:
        jira_client: The Jira client instance
        ticket: The ticket ID

    Returns:
        bool: True if successful, False otherwise
    """
    try:
        # First get the current status to avoid unnecessary transitions
        issue = jira_client.issue(ticket)
        current_status = safe_extract_status_name(issue.fields.status)
        # If already in review, don't change it
        if 'review' in current_status:
            print(f"[INFO] Ticket {ticket} already in {current_status} status. No transition needed.")
            return True
        transitions = jira_client.transitions(ticket)
        # Log the available transitions for debugging
        print(f"[DEBUG] Available transitions for ticket {ticket}: {[t['name'] for t in transitions]}")
        transition_id = None
        # Look for an exact, case-sensitive "In Review" match first
        for t in transitions:
            if "In Review" in t['name']:
                transition_id = t['id']
                print(f"[INFO] Found exact 'In Review' transition (ID: {transition_id})")
                break
        if transition_id:
            jira_client.transition_issue(ticket, transition_id)
            print(f"[INFO] Transitioned Jira ticket {ticket} to In Review status")
            return True
        else:
            print(f"[WARN] No suitable transition found to set ticket {ticket} to In Review status. Available transitions: {[t['name'] for t in transitions]}")
            return False
    except Exception as e:
        print(f"[ERROR] Transitioning Jira ticket {ticket} to In Review status: {e}")
        return False
def transition_to_in_progress(jira_client, ticket):
    """
    Transition a JIRA ticket to the In Progress status.
    """
    try:
        # First get the current status to avoid unnecessary transitions
        issue = jira_client.issue(ticket)
        current_status = safe_extract_status_name(issue.fields.status)
        # If already in progress, don't change it
        if 'in progress' in current_status:
            print(f"[INFO] Ticket {ticket} already in {current_status} status. No transition needed.")
            return True
        transitions = jira_client.transitions(ticket)
        transition_id = None
        # Look for a transition named "In Progress"
        for t in transitions:
            if 'in progress' in t['name'].lower():
                transition_id = t['id']
                break
        if transition_id:
            jira_client.transition_issue(ticket, transition_id)
            print(f"[INFO] Transitioned Jira ticket {ticket} to In Progress status")
            return True
        else:
            print(f"[WARN] No In Progress transition found for ticket {ticket}")
            return False
    except Exception as e:
        print(f"[ERROR] Transitioning Jira ticket {ticket} to In Progress: {e}")
        return False
def update_jira_labels(jira_client, ticket, domain):
    """
    Update JIRA ticket labels based on domain fields.
    Only handles labels that aren't custom fields.
    """
    try:
        # Get the current issue data
        issue = jira_client.issue(ticket)
        made_changes = False

        # Helper to safely extract labels from a PropertyHolder
        def extract_labels(labels_field):
            """Extract labels from a PropertyHolder object or list"""
            if labels_field is None:
                return []
            # Recursively unwrap nested PropertyHolder objects
            labels_content = labels_field
            while hasattr(labels_content, 'value'):
                labels_content = labels_content.value
                print(f"[DEBUG] extract_labels: Extracted nested value, type = {type(labels_content)}")
            # If it's a list or already properly structured
            if isinstance(labels_content, (list, tuple)):
                return [str(label) for label in labels_content if label is not None]
            elif labels_content:
                # If it's a single value, return it as a list
                return [str(labels_content)]
            else:
                return []

        # Labels managed by this function (existing ones are stripped and re-added)
        managed_prefixes = [
            'takedown-requested',
            'malicious',
            'corporate',
            'franchise',
            'abuse-type'
        ]
        # Safely extract the current labels
        current_labels_list = extract_labels(issue.fields.labels)
        current_labels = [label for label in current_labels_list if label is not None and
                          not any(label == prefix or label.startswith(prefix) for prefix in managed_prefixes)]
        # Add back the managed labels based on the current domain state
        new_labels = list(current_labels)
        # Add the report abuse label if applicable
        if domain.get("report_abuse"):
            new_labels.append("takedown-requested")
        # Add the malicious label if applicable
        if domain.get("malicious"):
            new_labels.append("malicious")
        # Add the corporate/franchise label if applicable
        corporate_franchise = domain.get("corporate_franchise")
        if corporate_franchise and str(corporate_franchise).lower() in ["corporate", "franchise"]:
            new_labels.append(str(corporate_franchise).lower())
        # Add the abuse type label if applicable
        abuse_type = domain.get("abuse_type")
        if abuse_type and str(abuse_type).lower() in ["mimic", "redirect", "independent", "n/a", "to_validate"]:
            new_labels.append(f"abuse-type-{str(abuse_type).lower()}")
        # Remove any None values that might have slipped through
        new_labels = [label for label in new_labels if label is not None]
        # Update the labels only if they've changed
        if set(new_labels) != set(current_labels_list):
            issue.update(fields={"labels": new_labels})
            print(f"[INFO] Updated labels for Jira ticket {ticket}: {new_labels}")
            made_changes = True
            # Add a comment only for significant label changes
            significant_changes = []
            if 'malicious' in new_labels and 'malicious' not in current_labels_list:
                significant_changes.append("Domain marked as MALICIOUS")
            if 'takedown-requested' in new_labels and 'takedown-requested' not in current_labels_list:
                significant_changes.append("Takedown requested")
            # Comment on abuse type changes
            new_abuse_labels = [label for label in new_labels if label.startswith('abuse-type-')]
            old_abuse_labels = [label for label in current_labels_list if label is not None and label.startswith('abuse-type-')]
            if new_abuse_labels != old_abuse_labels and new_abuse_labels:
                abuse_type_value = new_abuse_labels[0].replace('abuse-type-', '')
                significant_changes.append(f"Abuse type updated to: {abuse_type_value}")
            if significant_changes:
                add_jira_comment(
                    jira_client,
                    ticket,
                    f"Domain status updated: {', '.join(significant_changes)}"
                )
        return True
    except Exception as e:
        print(f"[ERROR] Updating labels for Jira ticket {ticket}: {e}")
        return False
def update_jira_custom_fields(jira_client, ticket, domain):
    """
    Update JIRA ticket custom fields based on domain data.
    """
    try:
        issue = jira_client.issue(ticket)
        update_fields = {}
        significant_changes = []
        made_changes = False
        # Fetch all existing comments once to avoid multiple API calls
        all_comments = jira_client.comments(ticket)
        comment_bodies = []
        for comment in all_comments:
            # Safely extract the comment body from a PropertyHolder (handle nesting)
            comment_body = comment.body
            while hasattr(comment_body, 'value'):
                comment_body = comment_body.value
            comment_bodies.append(str(comment_body) if comment_body else '')
        # Map corporate/franchise values to their Jira Cloud option IDs
        corp_franchise_map = {
            'corporate': '11343',
            'franchise': '11344',
            'none': '-1'
        }

        # Helper to safely extract an ID from a PropertyHolder or list
        def extract_field_id(field_value):
            """Extract ID from PropertyHolder object or list of PropertyHolder objects"""
            if field_value is None:
                return '-1'

            def extract_id_from_property_holder(obj, depth=0):
                """Recursively extract an ID from a PropertyHolder with detailed debugging"""
                indent = " " * depth
                print(f"[DEBUG] {indent}extract_id_from_property_holder: obj type = {type(obj)}")
                if obj is None:
                    return '-1'
                # Handle lists (multi-choice fields typically return the first ID)
                if isinstance(obj, (list, tuple)):
                    print(f"[DEBUG] {indent}Found list/tuple with {len(obj)} items")
                    if len(obj) > 0:
                        return extract_id_from_property_holder(obj[0], depth + 1)
                    return '-1'
                # If it's not a PropertyHolder-like object, try to get the ID directly
                if not hasattr(obj, '__class__') or 'PropertyHolder' not in str(type(obj)):
                    # Special handling for CustomFieldOption objects: extract the ID properly
                    if hasattr(obj, 'id'):
                        result = str(obj.id).strip()
                        print(f"[DEBUG] {indent}CustomFieldOption ID result: '{result}'")
                        return result if result else '-1'
                    else:
                        result = str(obj).strip()
                        print(f"[DEBUG] {indent}Non-PropertyHolder ID result: '{result}'")
                        return result if result else '-1'
                # It's a PropertyHolder; examine its attributes for an ID
                print(f"[DEBUG] {indent}PropertyHolder attributes: {[attr for attr in dir(obj) if not attr.startswith('_')]}")
                # Try common attributes in order of preference
                for attr_name in ['id', 'key', 'value', 'name', 'content']:
                    if hasattr(obj, attr_name):
                        attr_value = getattr(obj, attr_name)
                        print(f"[DEBUG] {indent}Found {attr_name} = {attr_value} (type: {type(attr_value)})")
                        if attr_value is not None:
                            if isinstance(attr_value, (list, tuple)):
                                # Handle list attributes
                                if len(attr_value) > 0:
                                    return extract_id_from_property_holder(attr_value[0], depth + 1)
                                return '-1'
                            elif 'PropertyHolder' in str(type(attr_value)):
                                # Recurse into the nested PropertyHolder
                                return extract_id_from_property_holder(attr_value, depth + 1)
                            else:
                                # It's a plain value; check if it looks like an ID
                                result = str(attr_value).strip()
                                if result and not result.startswith('<') and not result.endswith('>'):
                                    return result
                # No suitable attribute found
                print(f"[DEBUG] {indent}No extractable ID found in PropertyHolder")
                return '-1'

            # Start the extraction
            result = extract_id_from_property_holder(field_value)
            print(f"[DEBUG] Final extracted ID: '{result}'")
            return result

        # Helper to safely extract a string value from a PropertyHolder
        def extract_field_string(field_value):
            """Extract string value from PropertyHolder object or ADF format"""
            if field_value is None:
                return ''

            def extract_from_property_holder(obj, depth=0):
                """Recursively extract a value from a PropertyHolder with detailed debugging"""
                indent = " " * depth
                print(f"[DEBUG] {indent}extract_from_property_holder: obj type = {type(obj)}")
                if obj is None:
                    return ''
                # If it's not a PropertyHolder-like object, return it
                if not hasattr(obj, '__class__') or 'PropertyHolder' not in str(type(obj)):
                    if isinstance(obj, (list, tuple)):
                        print(f"[DEBUG] {indent}Found list/tuple with {len(obj)} items")
                        # Handle a list of PropertyHolders (multi-choice fields)
                        results = []
                        for i, item in enumerate(obj):
                            print(f"[DEBUG] {indent}Processing list item {i}: {type(item)}")
                            item_result = extract_from_property_holder(item, depth + 1)
                            if item_result:
                                results.append(item_result)
                        return ', '.join(results) if results else ''
                    else:
                        result = str(obj).strip()
                        print(f"[DEBUG] {indent}Non-PropertyHolder result: '{result}'")
                        return result
                # It's a PropertyHolder; examine its attributes
                print(f"[DEBUG] {indent}PropertyHolder attributes: {[attr for attr in dir(obj) if not attr.startswith('_')]}")
                # Try common attributes in order of preference, including ADF 'content'
                for attr_name in ['text', 'displayName', 'name', 'value', 'content', 'key', 'id']:
                    if hasattr(obj, attr_name):
                        attr_value = getattr(obj, attr_name)
                        print(f"[DEBUG] {indent}Found {attr_name} = {attr_value} (type: {type(attr_value)})")
                        # Special handling for ADF content
                        if attr_name == 'content' and isinstance(attr_value, (list, tuple)):
                            print(f"[DEBUG] {indent}Processing ADF content with {len(attr_value)} paragraphs")
                            # Handle the ADF content structure
                            text_parts = []
                            for paragraph in attr_value:
                                if isinstance(paragraph, dict) and paragraph.get('type') == 'paragraph':
                                    paragraph_content = paragraph.get('content', [])
                                    for text_node in paragraph_content:
                                        if isinstance(text_node, dict) and text_node.get('type') == 'text':
                                            text_parts.append(text_node.get('text', ''))
                                elif hasattr(paragraph, '__class__') and 'PropertyHolder' in str(type(paragraph)):
                                    # Recurse into the PropertyHolder paragraph
                                    para_result = extract_from_property_holder(paragraph, depth + 1)
                                    if para_result:
                                        text_parts.append(para_result)
                            result = ' '.join(text_parts).strip()
                            if result:
                                print(f"[DEBUG] {indent}Extracted ADF content: '{result}'")
                                return result
                        # Recursively process the attribute value
                        if attr_value is not None:
                            if isinstance(attr_value, (list, tuple)):
                                # Handle list attributes
                                results = []
                                for i, item in enumerate(attr_value):
                                    item_result = extract_from_property_holder(item, depth + 1)
                                    if item_result:
                                        results.append(item_result)
                                return ', '.join(results) if results else ''
                            elif 'PropertyHolder' in str(type(attr_value)):
                                # Recurse into the nested PropertyHolder
                                return extract_from_property_holder(attr_value, depth + 1)
                            else:
                                # It's a plain value
                                result = str(attr_value).strip()
                                if result and not result.startswith('<') and not result.endswith('>'):
                                    return result
                # No suitable attribute found
                print(f"[DEBUG] {indent}No extractable value found in PropertyHolder")
                return ''

            # Start the extraction
            result = extract_from_property_holder(field_value)
            # Handle ADF (Atlassian Document Format) if the result is a JSON string
            if isinstance(result, str):
                try:
                    import json
                    adf_data = json.loads(result)
                    if isinstance(adf_data, dict) and adf_data.get('type') == 'doc':
                        print("[DEBUG] Found ADF in string, parsing...")
                        content = adf_data.get('content', [])
                        text_parts = []
                        for paragraph in content:
                            if paragraph.get('type') == 'paragraph':
                                paragraph_content = paragraph.get('content', [])
                                for text_node in paragraph_content:
                                    if text_node.get('type') == 'text':
                                        text_parts.append(text_node.get('text', ''))
                        result = ' '.join(text_parts).strip()
                        print(f"[DEBUG] Extracted from ADF string: '{result}'")
                except (ValueError, TypeError):
                    pass
            # Handle a direct ADF dict as well
            if isinstance(field_value, dict) and field_value.get('type') == 'doc':
                print("[DEBUG] Processing direct ADF format")
                try:
                    content = field_value.get('content', [])
                    text_parts = []
                    for paragraph in content:
                        if paragraph.get('type') == 'paragraph':
                            paragraph_content = paragraph.get('content', [])
                            for text_node in paragraph_content:
                                if text_node.get('type') == 'text':
                                    text_parts.append(text_node.get('text', ''))
                    result = ' '.join(text_parts).strip()
                    print(f"[DEBUG] Extracted from direct ADF: '{result}'")
                except Exception as e:
                    print(f"[DEBUG] ADF parsing failed: {e}")
                    result = str(field_value)
            print(f"[DEBUG] Final extracted result: '{result}'")
            return result

        # Helper to create ADF for text fields
        def create_adf_text(text):
            """Create ADF format for plain text fields that require Atlassian Document Format"""
            if not text or not text.strip():
                return None
            adf_result = {
                "type": "doc",
                "version": 1,
                "content": [{
                    "type": "paragraph",
                    "content": [{"type": "text", "text": str(text).strip()}]
                }]
            }
            print(f"[DEBUG] create_adf_text: Created ADF for '{text}' = {adf_result}")
            return adf_result

        # Safely handle the corporate/franchise update
        corporate_franchise = domain.get('corporate_franchise')
        if corporate_franchise is not None:
            corp_value = str(corporate_franchise).lower().strip()
            corp_franchise_id = corp_franchise_map.get(corp_value, '-1')
            current_corp = getattr(issue.fields, 'customfield_10367', None)
            current_corp_id = extract_field_id(current_corp)
            if corp_franchise_id != '-1' and corp_franchise_id != current_corp_id:
                update_fields['customfield_10367'] = [{"id": corp_franchise_id}]  # Array with a single option object
                significant_changes.append(f"Corporate/Franchise: {corporate_franchise}")
                made_changes = True
            elif corp_franchise_id == '-1' and current_corp is not None:
                update_fields['customfield_10367'] = []  # Empty array instead of None
                made_changes = True
        # Safely handle the hosting provider update (customfield_10368)
        hosting_provider = domain.get('hosting_provider')
        if hosting_provider is not None:
            current_provider = getattr(issue.fields, 'customfield_10368', '')
            current_provider_str = extract_field_string(current_provider)
            new_provider = str(hosting_provider).strip() if hosting_provider else ''
            # Debug logging for the hosting provider comparison
            print(f"[DEBUG] Hosting provider comparison for ticket {ticket}:")
            print(f"[DEBUG] Current: '{current_provider_str}' (type: {type(current_provider)})")
            print(f"[DEBUG] New: '{new_provider}'")
            print(f"[DEBUG] Equal: {current_provider_str == new_provider}")
            if current_provider_str != new_provider and new_provider:
                # Use ADF for the hosting provider field, since it expects Atlassian Document Format
                adf_provider = create_adf_text(new_provider)
                if adf_provider:
                    update_fields['customfield_10368'] = adf_provider
                    significant_changes.append(f"Hosting Provider: {new_provider}")
                    made_changes = True
                    print(f"[INFO] Hosting provider will be updated from '{current_provider_str}' to '{new_provider}'")
            elif current_provider_str and not new_provider:  # Only clear if there was a previous value
                update_fields['customfield_10368'] = None
                made_changes = True
                print("[INFO] Hosting provider will be cleared")
            else:
                print("[DEBUG] No hosting provider update needed")
        # Safely handle the hosting location update (customfield_10366)
        hosting_location = domain.get('hosting_location')
        if hosting_location is not None:
            hosting_location_id = get_hosting_location_id(hosting_location)
            current_location = getattr(issue.fields, 'customfield_10366', None)
            current_location_id = extract_field_id(current_location)
            # Debug logging for the hosting location comparison
            print(f"[DEBUG] Hosting location comparison for ticket {ticket}:")
            print(f"[DEBUG] Current: '{current_location_id}' (type: {type(current_location)})")
            print(f"[DEBUG] New: '{hosting_location_id}'")
            print(f"[DEBUG] Equal: {hosting_location_id == current_location_id}")
            if hosting_location_id != '-1' and hosting_location_id != current_location_id:
                update_fields['customfield_10366'] = [{"id": hosting_location_id}]  # Array with a single option object
                location_name = COUNTRY_CODE_TO_NAME.get(hosting_location.lower(), hosting_location)
                significant_changes.append(f"Hosting Location: {location_name}")
                made_changes = True
                print(f"[INFO] Hosting location will be updated from '{current_location_id}' to '{hosting_location_id}'")
            elif hosting_location_id == '-1' and current_location is not None:
                update_fields['customfield_10366'] = []  # Empty array instead of None
                made_changes = True
                print("[INFO] Hosting location will be cleared")
            else:
                print("[DEBUG] No hosting location update needed")
        # Safely handle the target country update (customfield_10107)
        target_country = domain.get('target_country')
        if target_country is not None:
            target_country_id = get_target_country_id(target_country)
            current_country = getattr(issue.fields, 'customfield_10107', None)
            current_country_id = extract_field_id(current_country)
            # Debug logging for the target country comparison
            print(f"[DEBUG] Target country comparison for ticket {ticket}:")
            print(f"[DEBUG] Current: '{current_country_id}' (type: {type(current_country)})")
            print(f"[DEBUG] New: '{target_country_id}'")
            print(f"[DEBUG] Equal: {target_country_id == current_country_id}")
            if target_country_id != '-1' and target_country_id != current_country_id:
                update_fields['customfield_10107'] = [{"id": target_country_id}]  # Array with a single option object
                country_name = COUNTRY_CODE_TO_NAME.get(target_country.lower(), target_country)
                significant_changes.append(f"Target Country: {country_name}")
                made_changes = True
                print(f"[INFO] Target country will be updated from '{current_country_id}' to '{target_country_id}'")
            elif target_country_id == '-1' and current_country is not None:
                update_fields['customfield_10107'] = []  # Empty array instead of None
                made_changes = True
                print("[INFO] Target country will be cleared")
            else:
                print("[DEBUG] No target country update needed")
        # Handle notes in the description; only update if the notes have actually changed
        notes = domain.get('notes')
        notes_updated = False
        if notes and notes.strip():
            # Get the current notes from the description
            current_notes = extract_notes_from_description(jira_client, ticket)
            new_notes = notes.strip()
            print(f"[DEBUG] Notes comparison for ticket {ticket}:")
            print(f"[DEBUG] Current notes: '{current_notes}'")
            print(f"[DEBUG] New notes: '{new_notes}'")
            print(f"[DEBUG] Equal: {current_notes == new_notes}")
            # Only update if the notes have changed
            if current_notes != new_notes:
                if update_description_with_notes(jira_client, ticket, new_notes):
                    # Add a comment about the notes update
                    add_jira_comment(jira_client, ticket, f"Notes updated in description:\n{new_notes}")
                    notes_updated = True
                    print(f"[INFO] Updated notes in description for ticket {ticket}")
                else:
                    print(f"[ERROR] Failed to update notes in description for ticket {ticket}")
            else:
                print(f"[DEBUG] Notes unchanged for ticket {ticket}, no update needed")
        elif notes is not None and not notes.strip():
            # The notes field exists but is empty; clear any existing notes
            current_notes = extract_notes_from_description(jira_client, ticket)
            if current_notes:
                if update_description_with_notes(jira_client, ticket, ''):
                    add_jira_comment(jira_client, ticket, "Notes cleared from description")
                    notes_updated = True
                    print(f"[INFO] Cleared notes from description for ticket {ticket}")
                else:
                    print(f"[ERROR] Failed to clear notes from description for ticket {ticket}")
            else:
                print(f"[DEBUG] No notes to clear for ticket {ticket}")
        else:
            print(f"[DEBUG] No notes provided for ticket {ticket}")
        # Only update if there are actual field changes
        if update_fields:
            issue.update(fields=update_fields)
            print(f"[INFO] Updated custom fields for Jira ticket {ticket}: {update_fields}")
        # Only add a domain information comment if there were significant changes
        if significant_changes:
            domain_info_comment = "Updated domain information:\n" + "\n".join(significant_changes)
            # Check whether a similar domain info comment already exists
            similar_comment_exists = False
            for comment_body in comment_bodies:
                # Check for an exact match of the domain information comment
                if domain_info_comment.strip() == comment_body.strip():
                    similar_comment_exists = True
                    print("[DEBUG] Exact matching domain info comment found, skipping duplicate")
                    break
                # Also check for a recent domain information comment with the same changes
                if "Updated domain information:" in comment_body:
                    # Check if all the same changes are in this comment
                    all_changes_match = all(change in comment_body for change in significant_changes)
                    # If all changes match, check whether any different changes exist
                    if all_changes_match:
                        # Split the comment into lines to check for extra changes
                        comment_lines = [line.strip() for line in comment_body.split('\n') if line.strip()]
                        expected_lines = [line.strip() for line in domain_info_comment.split('\n') if line.strip()]
                        # If the line counts match and all expected lines are present, it's a duplicate
                        if len(comment_lines) == len(expected_lines):
                            similar_comment_exists = True
                            print("[DEBUG] Similar domain info comment found with matching changes, skipping duplicate")
                            break
            if not similar_comment_exists:
                add_jira_comment(jira_client, ticket, domain_info_comment)
                print(f"[INFO] Added domain information comment to ticket {ticket}")
            else:
                print(f"[INFO] Similar domain information comment already exists in ticket {ticket}, skipping")
        return True
    except Exception as e:
        print(f"[ERROR] Updating custom fields for Jira ticket {ticket}: {e}")
        return False
def comment_exists(jira_client, ticket, text_pattern):
"""
Check if a comment containing the given text pattern already exists on the ticket.
Enhanced version with better pattern matching.
"""
try:
# Fetch all comments on the ticket
comments = jira_client.comments(ticket)
print(f"[DEBUG] Checking {len(comments)} comments for pattern: '{text_pattern[:50]}{'...' if len(text_pattern) > 50 else ''}' in ticket {ticket}")
# Pre-normalize the search pattern once
pattern_normalized = normalize_content_for_comparison(text_pattern)
# Helper that converts any JIRA comment body (plain-text, ADF dict or PropertyHolder) to plain text
def _get_comment_plain_text(body):
# Unwrap nested PropertyHolder.value attributes first
while hasattr(body, 'value'):
body = body.value
# Direct ADF dict
if isinstance(body, dict) and body.get('type') == 'doc':
return extract_text_from_adf(body)
# PropertyHolder or other complex objects – try generic extractor
if hasattr(body, '__class__') and 'PropertyHolder' in str(type(body)):
extracted = extract_field_string_from_propertyholder(body)
if isinstance(extracted, dict) and extracted.get('type') == 'doc':
return extract_text_from_adf(extracted)
return str(extracted)
# Fallback – treat as string
return str(body)
# Check each comment
for comment in comments:
raw_body = comment.body
plain_text = _get_comment_plain_text(raw_body)
comment_normalized = normalize_content_for_comparison(plain_text)
if pattern_normalized in comment_normalized:
print(f"[DEBUG] Found matching comment pattern in ticket {ticket}")
return True
print(f"[DEBUG] No matching comment pattern found in ticket {ticket}")
return False
except Exception as e:
print(f"[ERROR] Checking for comment pattern on ticket {ticket}: {e}")
# Default to False to allow adding the comment if there's an error
return False
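
# Usage sketch for comment_exists (the ticket key "ABC-123" is hypothetical, and a
# live, authenticated jira_client is required, so this helper is never invoked by
# the script itself):
def _example_comment_exists(client, ticket="ABC-123"):
    pattern = "Takedown submitted at"
    # Only add the comment when no normalized match is already present
    if not comment_exists(client, ticket, pattern):
        add_jira_comment(client, ticket, f"{pattern} 2024-01-01 00:00:00 UTC.")
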
def extract_takedown_from_description(jira_client, ticket):
"""
Extract the current takedown information from the ticket description.
Enhanced version with better parsing.
"""
try:
issue = jira_client.issue(ticket)
description = issue.fields.description
if not description:
print(f"[DEBUG] No description found for ticket {ticket}")
return ''
# Convert description to searchable text
description_text = ''
# Handle PropertyHolder objects
if hasattr(description, '__class__') and 'PropertyHolder' in str(type(description)):
# Try to extract text content from PropertyHolder
extracted = extract_field_string_from_propertyholder(description)
if isinstance(extracted, dict) and extracted.get('type') == 'doc':
# It's ADF format, extract text from it
description_text = extract_text_from_adf(extracted)
else:
description_text = str(extracted)
elif isinstance(description, dict) and description.get('type') == 'doc':
# Direct ADF format
description_text = extract_text_from_adf(description)
else:
# Plain text
description_text = str(description)
print(f"[DEBUG] Extracted description text for takedown search (length: {len(description_text)})")
# Look for takedown patterns - be more flexible
takedown_patterns = [
'takedown submitted at',
'takedown reported at',
'takedown requested at'
]
lines = description_text.split('\n')
for line in lines:
line_lower = line.lower().strip()
for pattern in takedown_patterns:
if pattern in line_lower:
print(f"[DEBUG] Found takedown info in description: '{line.strip()}'")
return line.strip()
print(f"[DEBUG] No takedown info found in description for ticket {ticket}")
return ''
except Exception as e:
print(f"[ERROR] Failed to extract takedown from description for ticket {ticket}: {e}")
return ''
def extract_text_from_adf(adf_doc):
"""
Extract plain text from ADF (Atlassian Document Format) document.
"""
try:
# Flatten text recursively so we also capture list items, bullet points and hardBreak nodes
def _flatten(node):
collected = []
ntype = node.get('type') if isinstance(node, dict) else None
if not ntype:
return collected
# Text node
if ntype == 'text':
collected.append(node.get('text', ''))
# Hard line break – treat as newline so bullets stay on separate lines
elif ntype == 'hardBreak':
collected.append('\n')
# Container nodes that may hold other content
else:
for child in node.get('content', []) or []:
collected.extend(_flatten(child))
# For listItem we force newline after each item
if ntype == 'listItem':
collected.append('\n')
return collected
flattened_chars = []
for blk in adf_doc.get('content', []) or []:
flattened_chars.extend(_flatten(blk))
# add newline between top-level blocks to preserve separation
flattened_chars.append('\n')
# Join and normalise multiple consecutive newlines
result = ''.join(flattened_chars)
import re
result = re.sub(r'\n{2,}', '\n', result).strip('\n')
print(f"[DEBUG] Extracted {len(result)} characters from ADF document")
return result
except Exception as e:
print(f"[ERROR] Failed to extract text from ADF: {e}")
return ''
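
# Illustrative sketch for extract_text_from_adf (sample ADF only, never called):
# hardBreak nodes become newlines and top-level blocks are newline-separated.
def _example_extract_text_from_adf():
    sample_adf = {
        "type": "doc",
        "version": 1,
        "content": [
            {
                "type": "paragraph",
                "content": [
                    {"type": "text", "text": "Domain is offline"},
                    {"type": "hardBreak"},
                    {"type": "text", "text": "Takedown submitted at 2024-01-01"},
                ],
            }
        ],
    }
    # Expected: "Domain is offline\nTakedown submitted at 2024-01-01"
    return extract_text_from_adf(sample_adf)
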
def poll_domains(domain_statuses):
"""Run the main polling loop in a background thread"""
while True:
current_time = datetime.datetime.utcnow().isoformat()
print(f"[INFO] Polling domains at {current_time} UTC...")
domains = get_domains()
print(f"[DEBUG] Retrieved {len(domains)} domains from {SERVER_URL}.")
for domain in domains:
domain_id = domain['id']
current_status = domain['status']
# Skip processing if domain status is 'scanning'
if current_status.lower() == "scanning":
print(f"[DEBUG] Domain ID {domain_id} is scanning. Skipping domain.")
continue
try:
jira_ticket = domain.get('jira_ticket')
except Exception as e:
print(f"[ERROR] Failed to get Jira ticket for domain ID {domain_id}: {e}")
jira_ticket = None
if not jira_ticket:
print(f"[INFO] No Jira ticket for domain ID {domain_id}. Creating one...")
ticket = create_jira_ticket(jira_client, domain)
if ticket:
if update_jira_ticket(domain_id, ticket):
jira_ticket = ticket
if jira_ticket:
print(f"[DEBUG] Processing ticket {jira_ticket} for domain ID {domain_id}")
# Update labels and custom fields (these no longer trigger state transitions)
update_jira_labels(jira_client, jira_ticket, domain)
update_jira_custom_fields(jira_client, jira_ticket, domain)
# Handle content updates (notes and takedown) - only if actually changed
check_and_handle_notes_updates(jira_client, jira_ticket, domain)
check_and_handle_takedown_updates(jira_client, jira_ticket, domain)
# CENTRALIZED STATE MANAGEMENT
# Determine what state the ticket should be in
target_state = determine_correct_ticket_state(domain, jira_client, jira_ticket)
current_state = get_current_ticket_state(jira_client, jira_ticket)
# Only transition if needed
if target_state != current_state.lower().replace(' ', '_'):
print(f"[INFO] Ticket {jira_ticket} needs state change: {current_state} → {target_state}")
# Handle special case: domain went down, need to add worklog for takedown resolution
if (target_state == "done" and
domain.get("report_abuse") and domain.get("report_abuse_timestamp")):
# Add worklog for takedown resolution time before closing
try:
abuse_timestamp = domain.get("report_abuse_timestamp")
current_datetime = datetime.datetime.utcnow()
# Parse takedown timestamp
takedown_time = None
if isinstance(abuse_timestamp, str):
if 'T' in abuse_timestamp:
try:
takedown_time = datetime.datetime.fromisoformat(abuse_timestamp.replace('Z', '+00:00'))
except ValueError:
try:
takedown_time = datetime.datetime.strptime(abuse_timestamp, "%Y-%m-%d %H:%M:%S")
except ValueError:
print(f"[ERROR] Could not parse takedown timestamp: {abuse_timestamp}")
else:
try:
takedown_time = datetime.datetime.strptime(abuse_timestamp, "%Y-%m-%d %H:%M:%S")
except ValueError:
print(f"[ERROR] Could not parse takedown timestamp: {abuse_timestamp}")
if takedown_time:
# Check if worklog already exists
existing_worklogs = jira_client.worklogs(jira_ticket)
worklog_exists = False
for worklog in existing_worklogs:
worklog_comment_body = worklog.comment if hasattr(worklog, 'comment') else ''
if isinstance(worklog_comment_body, str) and "Time from takedown request to resolution" in worklog_comment_body:
worklog_exists = True
break
if not worklog_exists:
_, time_spent = calculate_time_diff(takedown_time, current_datetime)
worklog_comment = f"Time from takedown request to resolution: {time_spent}"
add_jira_worklog(jira_client, jira_ticket, time_spent, worklog_comment)
print(f"[INFO] Added takedown resolution worklog to ticket {jira_ticket}")
else:
print(f"[DEBUG] Takedown resolution worklog already exists for ticket {jira_ticket}")
except Exception as e:
print(f"[ERROR] Failed to add takedown resolution worklog: {e}")
# Transition to the correct state
transition_ticket_to_correct_state(jira_client, jira_ticket, target_state, current_state)
else:
print(f"[DEBUG] Ticket {jira_ticket} already in correct state: {current_state}")
# Update domain status tracking
domain_statuses[domain_id] = current_status
time.sleep(POLL_INTERVAL)
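
# Standalone sketch of the timestamp parsing used in poll_domains above: ISO-8601
# strings (optionally ending in 'Z') are tried first, then the legacy
# "%Y-%m-%d %H:%M:%S" format; None signals an unparseable value. This mirrors the
# inline logic and is not wired into the loop.
def _example_parse_takedown_timestamp(abuse_timestamp):
    if not isinstance(abuse_timestamp, str):
        return None
    if 'T' in abuse_timestamp:
        try:
            return datetime.datetime.fromisoformat(abuse_timestamp.replace('Z', '+00:00'))
        except ValueError:
            pass
    try:
        return datetime.datetime.strptime(abuse_timestamp, "%Y-%m-%d %H:%M:%S")
    except ValueError:
        return None
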
def main():
# Log basic configuration info at startup (omit sensitive details)
print("[INFO] Starting Jira Manager.")
print(f"[INFO] Configuration: JIRA_SERVER={JIRA_SERVER}, JIRA_PROJECT_KEY={JIRA_PROJECT_KEY}")
print(f"[INFO] SERVER_URL={SERVER_URL}, POLL_INTERVAL={POLL_INTERVAL} seconds.")
# Initialize domain status tracking dictionary
domain_statuses = {}
# Connect to Jira using personal access token (PAT) with Bearer authentication
global jira_client
try:
# Initialize JIRA client with explicit v3 API configuration
jira_options = {
'server': JIRA_SERVER,
'rest_api_version': '3', # Force v3 API
'rest_path': 'api',
'verify': True
}
jira_client = JIRA(options=jira_options)
        # For JIRA Cloud, authenticate with email + API token via basic auth;
        # Bearer PAT headers are only accepted by Server/Data Center instances
if 'atlassian.net' in JIRA_SERVER:
print("[INFO] Detected JIRA Cloud - using email + API token authentication")
# For JIRA Cloud, you need email:api_token as basic auth
# The API token should be used with your email as username
jira_client._session.auth = (JIRA_EMAIL, JIRA_API_TOKEN)
jira_client._session.headers.update({
'Accept': 'application/json',
'Content-Type': 'application/json'
})
print(f"[INFO] Using email: {JIRA_EMAIL} for authentication")
else:
print("[INFO] Using Bearer token authentication for on-premise JIRA")
# Use Bearer token authentication for on-premise
jira_client._session.headers.update({
"Authorization": f"Bearer {JIRA_API_TOKEN}",
})
# Add the required header for attachment uploads.
jira_client._session.headers["X-Atlassian-Token"] = "no-check"
print(f"[INFO] Successfully connected to Jira using {'basic auth' if 'atlassian.net' in JIRA_SERVER else 'PAT'}")
# Verify v3 API capabilities
if verify_jira_v3_api(jira_client):
print("[INFO] JIRA v3 API verification successful.")
else:
print("[WARN] JIRA v3 API verification failed - some features may not work properly.")
except Exception as e:
print(f"[ERROR] Connecting to Jira: {e}")
return
print("[INFO] Jira Manager started. Polling for domain changes...")
# Start the polling in a background thread
import threading
polling_thread = threading.Thread(target=poll_domains, args=(domain_statuses,), daemon=True)
polling_thread.start()
# Start the Flask app for direct API calls
app.run(host='0.0.0.0', port=8080)
# API endpoint to directly update a JIRA ticket's labels based on domain data
@app.route('/update-labels', methods=['POST'])
def api_update_labels():
global jira_client
if not jira_client:
return jsonify({"error": "JIRA client not initialized"}), 500
try:
data = request.json
if not data or 'domain' not in data:
return jsonify({"error": "Missing domain data"}), 400
domain = data['domain']
jira_ticket = domain.get('jira_ticket')
if not jira_ticket:
return jsonify({"error": "No JIRA ticket associated with this domain"}), 400
# Update the JIRA ticket labels
result = update_jira_labels(jira_client, jira_ticket, domain)
if result:
return jsonify({"success": True, "message": "JIRA labels updated"})
else:
return jsonify({"error": "Failed to update JIRA labels"}), 500
except Exception as e:
return jsonify({"error": f"Exception: {str(e)}"}), 500
@app.route('/update-custom-fields', methods=['POST'])
def api_update_custom_fields():
global jira_client
if not jira_client:
return jsonify({"error": "JIRA client not initialized"}), 500
try:
data = request.json
if not data or 'domain' not in data:
return jsonify({"error": "Missing domain data"}), 400
domain = data['domain']
jira_ticket = domain.get('jira_ticket')
if not jira_ticket:
return jsonify({"error": "No JIRA ticket associated with this domain"}), 400
# Update both labels and custom fields
labels_result = update_jira_labels(jira_client, jira_ticket, domain)
fields_result = update_jira_custom_fields(jira_client, jira_ticket, domain)
if labels_result and fields_result:
return jsonify({"success": True, "message": "JIRA ticket updated"})
else:
return jsonify({"error": "Failed to update JIRA ticket"}), 500
except Exception as e:
return jsonify({"error": f"Exception: {str(e)}"}), 500
def create_adf_description(domain):
"""
Create an ADF (Atlassian Document Format) description for better formatting in JIRA v3.
Falls back to plain text if ADF formatting fails or is disabled.
"""
# Check if ADF is enabled in configuration
if not JIRA_USE_ADF:
return None
try:
# Build structured content using ADF format
content = []
# Main heading
content.append({
"type": "heading",
"attrs": {"level": 3},
"content": [{"type": "text", "text": "Domain Abuse Details"}]
})
# Basic info table
table_rows = [
["URL", domain['url']],
["Status", domain['status']],
["Page Title", domain.get('page_title', 'N/A')],
["Last Checked", domain.get('last_checked', 'N/A')]
]
# Add additional fields if available
if domain.get("hosting_provider"):
table_rows.append(["Hosting Provider", domain.get('hosting_provider')])
if domain.get("target_country"):
country_name = COUNTRY_CODE_TO_NAME.get(domain["target_country"].lower(), domain["target_country"])
table_rows.append(["Target Country", country_name])
if domain.get("hosting_location"):
location_name = COUNTRY_CODE_TO_NAME.get(domain["hosting_location"].lower(), domain["hosting_location"])
table_rows.append(["Hosting Location", location_name])
if domain.get("abuse_type"):
table_rows.append(["Abuse Type", domain.get('abuse_type')])
# Create table structure
table_content = {
"type": "table",
"attrs": {"isNumberColumnEnabled": False, "layout": "default"},
"content": []
}
for row_data in table_rows:
row = {
"type": "tableRow",
"content": [
{
"type": "tableHeader",
"content": [{"type": "paragraph", "content": [{"type": "text", "text": row_data[0]}]}]
},
{
"type": "tableCell",
"content": [{"type": "paragraph", "content": [{"type": "text", "text": str(row_data[1])}]}]
}
]
}
table_content["content"].append(row)
content.append(table_content)
# Add note about preview image
content.append({
"type": "paragraph",
"content": [{"type": "text", "text": "Please refer to the attached preview image for more details."}]
})
# Add takedown info if present (above Notes section)
if domain.get("report_abuse") and domain.get("report_abuse_timestamp"):
content.append({
"type": "heading",
"attrs": {"level": 3},
"content": [{"type": "text", "text": "Takedown Reported"}]
})
content.append({
"type": "paragraph",
"content": [{"type": "text", "text": f"Takedown submitted at {domain.get('report_abuse_timestamp')}."}]
})
# Add Notes section if notes exist
notes = domain.get('notes')
if notes and notes.strip():
content.append({
"type": "heading",
"attrs": {"level": 3},
"content": [{"type": "text", "text": "Notes"}]
})
content.append({
"type": "paragraph",
"content": [{"type": "text", "text": notes.strip()}]
})
# Return ADF structure
return {
"type": "doc",
"version": 1,
"content": content
}
except Exception as e:
print(f"[WARN] Failed to create ADF description: {e}. Falling back to plain text.")
return None
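
# Illustrative sketch of the minimal domain dict create_adf_description expects;
# keys mirror the lookups above and all values are made-up sample data.
def _example_create_adf_description():
    sample_domain = {
        "url": "https://example.com",
        "status": "active",
        "page_title": "Example page",
        "last_checked": "2024-01-01 00:00:00",
        "target_country": "de",
        "notes": "Initial triage done.",
    }
    # Returns None when JIRA_USE_ADF is disabled, otherwise a {'type': 'doc', ...} dict
    return create_adf_description(sample_domain)
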
def create_adf_comment(text):
"""
Create an ADF formatted comment for JIRA v3.
Falls back to plain text if ADF formatting fails or is disabled.
"""
# Check if ADF is enabled in configuration
if not JIRA_USE_ADF:
return None
try:
return {
"type": "doc",
"version": 1,
"content": [{
"type": "paragraph",
"content": [{"type": "text", "text": text}]
}]
}
except Exception:
return None
def verify_jira_v3_api(jira_client):
"""
Verify JIRA v3 API connectivity and capabilities.
This helps ensure the client is properly configured for v3 API features.
"""
try:
# Display configuration status
print(f"[INFO] JIRA API Configuration:")
print(f"[INFO] - Server: {JIRA_SERVER}")
print(f"[INFO] - Email: {JIRA_EMAIL}")
print(f"[INFO] - API Version: {JIRA_API_VERSION}")
print(f"[INFO] - ADF Format: {'Enabled' if JIRA_USE_ADF else 'Disabled'}")
# Check if email is properly configured for JIRA Cloud
if 'atlassian.net' in JIRA_SERVER and JIRA_EMAIL == 'your-email@sixt.com':
print(f"[WARN] Please update JIRA_EMAIL in config.py or set JIRA_EMAIL environment variable")
print(f"[WARN] Current email placeholder: {JIRA_EMAIL}")
# Test basic connectivity
server_info = jira_client.server_info()
print(f"[INFO] Connected to JIRA server: {server_info.get('serverTitle', 'Unknown')}")
print(f"[INFO] JIRA version: {server_info.get('version', 'Unknown')}")
# Test if we can access the configured project
try:
project = jira_client.project(JIRA_PROJECT_KEY)
print(f"[INFO] Successfully accessed project: {project.name} ({project.key})")
except Exception as e:
print(f"[WARN] Could not access project {JIRA_PROJECT_KEY}: {e}")
# Test if we can list issue types to verify permissions
try:
issue_types = jira_client.issue_types()
available_types = [it.name for it in issue_types]
if JIRA_ISSUE_TYPE in available_types:
print(f"[INFO] Issue type '{JIRA_ISSUE_TYPE}' is available")
else:
print(f"[WARN] Issue type '{JIRA_ISSUE_TYPE}' not found. Available: {available_types}")
except Exception as e:
print(f"[WARN] Could not retrieve issue types: {e}")
# Verify v3 API specific features
try:
# Test if server supports ADF (indicates v3 API support)
# This is a simple test - if we can create an issue with ADF content, v3 is supported
print(f"[INFO] JIRA Cloud v3 API features should be available")
if JIRA_USE_ADF:
print(f"[INFO] ADF formatting enabled for enhanced content display")
else:
print(f"[INFO] ADF formatting disabled - using plain text format")
return True
except Exception as e:
print(f"[WARN] V3 API features may not be fully available: {e}")
return False
except Exception as e:
print(f"[ERROR] Failed to verify JIRA API connectivity: {e}")
return False
def test_jira_authentication():
"""
Test JIRA authentication with detailed debugging information.
This helps identify authentication issues before running the main application.
"""
print("[INFO] Testing JIRA authentication...")
print(f"[INFO] Server: {JIRA_SERVER}")
print(f"[INFO] Email: {JIRA_EMAIL}")
print(f"[INFO] API Token: {JIRA_API_TOKEN[:10]}...{JIRA_API_TOKEN[-4:] if len(JIRA_API_TOKEN) > 14 else '[SHORT_TOKEN]'}")
try:
        # Test with direct requests first (requests is already imported at module level)
        from base64 import b64encode
# Create basic auth header
credentials = f"{JIRA_EMAIL}:{JIRA_API_TOKEN}"
encoded_credentials = b64encode(credentials.encode()).decode()
headers = {
'Authorization': f'Basic {encoded_credentials}',
'Accept': 'application/json',
'Content-Type': 'application/json'
}
# Test 1: Get server info
print("\n[TEST 1] Testing server connection...")
response = requests.get(f"{JIRA_SERVER}/rest/api/3/serverInfo", headers=headers)
print(f"[TEST 1] Status Code: {response.status_code}")
if response.status_code == 200:
server_info = response.json()
print(f"[TEST 1] ✅ Server: {server_info.get('serverTitle', 'Unknown')}")
print(f"[TEST 1] ✅ Version: {server_info.get('version', 'Unknown')}")
else:
print(f"[TEST 1] ❌ Error: {response.text}")
return False
# Test 2: Get current user
print("\n[TEST 2] Testing user authentication...")
response = requests.get(f"{JIRA_SERVER}/rest/api/3/myself", headers=headers)
print(f"[TEST 2] Status Code: {response.status_code}")
if response.status_code == 200:
user_info = response.json()
print(f"[TEST 2] ✅ User: {user_info.get('displayName', 'Unknown')}")
print(f"[TEST 2] ✅ Email: {user_info.get('emailAddress', 'Unknown')}")
else:
print(f"[TEST 2] ❌ Error: {response.text}")
print(f"[TEST 2] ❌ This suggests invalid credentials or insufficient permissions")
return False
# Test 3: Test project access
print(f"\n[TEST 3] Testing project access for {JIRA_PROJECT_KEY}...")
response = requests.get(f"{JIRA_SERVER}/rest/api/3/project/{JIRA_PROJECT_KEY}", headers=headers)
print(f"[TEST 3] Status Code: {response.status_code}")
if response.status_code == 200:
project_info = response.json()
print(f"[TEST 3] ✅ Project: {project_info.get('name', 'Unknown')}")
print(f"[TEST 3] ✅ Key: {project_info.get('key', 'Unknown')}")
else:
print(f"[TEST 3] ❌ Error: {response.text}")
print(f"[TEST 3] ❌ No access to project {JIRA_PROJECT_KEY}")
return False
# Test 4: Test issue access (try to get a recent issue)
print(f"\n[TEST 4] Testing issue search in project {JIRA_PROJECT_KEY}...")
jql = f"project = {JIRA_PROJECT_KEY} ORDER BY created DESC"
response = requests.get(
f"{JIRA_SERVER}/rest/api/3/search",
headers=headers,
params={'jql': jql, 'maxResults': 1}
)
print(f"[TEST 4] Status Code: {response.status_code}")
if response.status_code == 200:
search_results = response.json()
total = search_results.get('total', 0)
print(f"[TEST 4] ✅ Found {total} issues in project")
if total > 0 and search_results.get('issues'):
recent_issue = search_results['issues'][0]
print(f"[TEST 4] ✅ Recent issue: {recent_issue.get('key', 'Unknown')}")
else:
print(f"[TEST 4] ❌ Error: {response.text}")
return False
print("\n[SUCCESS] All authentication tests passed! ✅")
return True
except Exception as e:
print(f"\n[ERROR] Authentication test failed: {e}")
return False
def extract_notes_from_description(jira_client, ticket):
"""
Extract the current notes from the ticket description.
Returns the notes content or empty string if no notes section found.
"""
try:
issue = jira_client.issue(ticket)
description = issue.fields.description
if not description:
return ''
# Handle PropertyHolder objects first
actual_description = description
if hasattr(description, '__class__') and 'PropertyHolder' in str(type(description)):
print(f"[DEBUG] Description is PropertyHolder, checking for ADF...")
# Check if this is an ADF PropertyHolder (has type, version, content)
if (hasattr(description, 'type') and hasattr(description, 'version') and
hasattr(description, 'content')):
doc_type = getattr(description, 'type', None)
print(f"[DEBUG] PropertyHolder doc type: {doc_type}")
if doc_type == 'doc':
print(f"[DEBUG] PropertyHolder is ADF document, reconstructing for notes extraction...")
reconstructed_adf = reconstruct_adf_from_propertyholder(description)
if reconstructed_adf:
actual_description = reconstructed_adf
print(f"[DEBUG] Successfully reconstructed ADF for notes extraction")
else:
print(f"[DEBUG] Failed to reconstruct ADF, using text extraction")
actual_description = extract_field_string_from_propertyholder(description)
else:
print(f"[DEBUG] PropertyHolder not ADF document, using text extraction")
actual_description = extract_field_string_from_propertyholder(description)
else:
print(f"[DEBUG] PropertyHolder missing ADF attributes, using text extraction")
# Use our existing PropertyHolder extraction logic
actual_description = extract_field_string_from_propertyholder(description)
print(f"[DEBUG] Extracted description type: {type(actual_description)}")
# If extraction returned a string that looks like JSON (ADF), try to parse it
if isinstance(actual_description, str) and actual_description.strip().startswith('{'):
try:
import json
actual_description = json.loads(actual_description)
print(f"[DEBUG] Parsed ADF from PropertyHolder string")
            except ValueError:
                print(f"[DEBUG] Failed to parse as JSON, treating as plain text")
# Handle both ADF and plain text descriptions
description_text = ''
if isinstance(actual_description, dict) and actual_description.get('type') == 'doc':
print(f"[DEBUG] Processing ADF description for notes extraction")
# ADF format - extract text content
content = actual_description.get('content', [])
text_parts = []
for block in content:
if block.get('type') == 'paragraph':
paragraph_content = block.get('content', [])
for text_node in paragraph_content:
if text_node.get('type') == 'text':
text_parts.append(text_node.get('text', ''))
elif block.get('type') == 'heading':
heading_content = block.get('content', [])
for text_node in heading_content:
if text_node.get('type') == 'text':
text_parts.append(text_node.get('text', ''))
description_text = '\n'.join(text_parts)
else:
print(f"[DEBUG] Processing plain text description for notes extraction")
# Plain text or PropertyHolder - convert to string
description_text = str(actual_description)
# Look for Notes section
lines = description_text.split('\n')
notes_section = False
notes_lines = []
for line in lines:
line = line.strip()
if line.lower().startswith('notes:') or line.lower() == 'notes':
notes_section = True
continue
elif notes_section:
# Stop if we hit another section or empty line after notes
if line.startswith('###') or line.startswith('##') or (not line and notes_lines):
break
if line: # Only add non-empty lines
notes_lines.append(line)
result = '\n'.join(notes_lines).strip()
print(f"[DEBUG] Extracted notes from description for ticket {ticket}: '{result}'")
return result
except Exception as e:
print(f"[ERROR] Failed to extract notes from description for ticket {ticket}: {e}")
return ''
def extract_field_string_from_propertyholder(field_value):
"""Helper function to extract string content from PropertyHolder, including ADF content"""
if field_value is None:
return ''
# First, try to reconstruct ADF if this looks like an ADF PropertyHolder
# Be more aggressive about detecting ADF - if it has type, version, and content, it's likely ADF
if (hasattr(field_value, 'type') and hasattr(field_value, 'version') and
hasattr(field_value, 'content')):
print(f"[DEBUG] Detected potential ADF PropertyHolder (has type, version, content), attempting reconstruction")
adf_doc = reconstruct_adf_from_propertyholder(field_value)
if adf_doc:
print(f"[DEBUG] Successfully reconstructed ADF document")
return adf_doc
else:
print(f"[DEBUG] ADF reconstruction failed, but PropertyHolder has ADF structure - creating fallback ADF")
# If reconstruction failed but this clearly has ADF structure, create a minimal ADF
try:
doc_type = getattr(field_value, 'type', None)
doc_version = getattr(field_value, 'version', None)
doc_content = getattr(field_value, 'content', None)
fallback_adf = {
"type": "doc",
"version": int(doc_version) if doc_version else 1,
"content": [{
"type": "paragraph",
"content": [{"type": "text", "text": "[Content could not be extracted from PropertyHolder]"}]
}]
}
if validate_adf_document(fallback_adf):
print(f"[DEBUG] Created fallback ADF document")
return fallback_adf
else:
print(f"[DEBUG] Fallback ADF validation failed")
except Exception as e:
print(f"[DEBUG] Error creating fallback ADF: {e}")
def extract_recursive(obj, depth=0):
indent = " " * depth
print(f"[DEBUG] {indent}extract_recursive: obj type = {type(obj)}")
if obj is None:
return ''
# If it's not a PropertyHolder-like object, return it as-is
if not hasattr(obj, '__class__') or 'PropertyHolder' not in str(type(obj)):
# Return dicts and lists as-is to preserve ADF structure
if isinstance(obj, (dict, list)):
return obj
return str(obj)
# It's a PropertyHolder, examine its attributes
attrs = [attr for attr in dir(obj) if not attr.startswith('_') and not callable(getattr(obj, attr, None))]
print(f"[DEBUG] {indent}PropertyHolder attributes: {attrs}")
# For ADF content, prioritize 'value' which might contain the full ADF structure
# Then try other attributes
for attr_name in ['value', 'text', 'displayName', 'name', 'content', 'key', 'id']:
if hasattr(obj, attr_name):
try:
attr_value = getattr(obj, attr_name)
print(f"[DEBUG] {indent}Found {attr_name} = {type(attr_value)}")
if attr_value is not None:
if isinstance(attr_value, dict):
# If it's a dict, check if it's ADF format
if attr_value.get('type') == 'doc':
print(f"[DEBUG] {indent}Found ADF document in {attr_name}")
return attr_value # Return ADF as-is
else:
print(f"[DEBUG] {indent}Found dict in {attr_name} but not ADF")
return attr_value # Return dict as-is
elif isinstance(attr_value, str):
# Check if string contains JSON (ADF)
if attr_value.strip().startswith('{') and 'type' in attr_value and 'doc' in attr_value:
try:
import json
parsed = json.loads(attr_value)
if parsed.get('type') == 'doc':
print(f"[DEBUG] {indent}Found ADF JSON string in {attr_name}")
return parsed # Return parsed ADF
                                except ValueError:
                                    pass
print(f"[DEBUG] {indent}Found string in {attr_name}")
return attr_value
elif isinstance(attr_value, (list, tuple)):
# Handle list attributes - could be ADF content
if attr_name == 'content':
print(f"[DEBUG] {indent}Found content list in {attr_name}")
# For ADF content, return the whole structure
return attr_value
else:
# For other lists, try to extract text
results = []
for item in attr_value:
item_result = extract_recursive(item, depth + 1)
if item_result:
if isinstance(item_result, str):
results.append(item_result)
else:
# If it's not a string, it might be ADF structure
return attr_value
return ', '.join(results) if results else attr_value
elif 'PropertyHolder' in str(type(attr_value)):
# Recursively extract from nested PropertyHolder
print(f"[DEBUG] {indent}Found nested PropertyHolder in {attr_name}")
return extract_recursive(attr_value, depth + 1)
else:
print(f"[DEBUG] {indent}Found other type in {attr_name}: {type(attr_value)}")
return str(attr_value)
except Exception as e:
print(f"[DEBUG] {indent}Error accessing {attr_name}: {e}")
continue
print(f"[DEBUG] {indent}No extractable content found in PropertyHolder")
return ''
result = extract_recursive(field_value)
print(f"[DEBUG] Final extracted from PropertyHolder: type={type(result)}")
if isinstance(result, dict) and result.get('type') == 'doc':
print(f"[DEBUG] Extracted ADF document successfully")
elif isinstance(result, str) and len(result) > 100:
print(f"[DEBUG] Extracted long string: {result[:100]}...")
else:
print(f"[DEBUG] Extracted: {result}")
return result
def update_description_with_notes(jira_client, ticket, new_notes):
"""
Update the ticket description with new notes, preserving the existing content.
"""
try:
issue = jira_client.issue(ticket)
current_description = issue.fields.description
if not current_description:
print(f"[WARN] No existing description found for ticket {ticket}")
return False
print(f"[DEBUG] Current description type: {type(current_description)}")
# CRITICAL SAFETY CHECK: Any PropertyHolder with type, version, content is ADF
if (hasattr(current_description, '__class__') and 'PropertyHolder' in str(type(current_description)) and
hasattr(current_description, 'type') and hasattr(current_description, 'version') and
hasattr(current_description, 'content')):
print(f"[DEBUG] SAFETY CHECK: PropertyHolder with ADF attributes detected - FORCING ADF format")
# Force ADF reconstruction
reconstructed_adf = reconstruct_adf_from_propertyholder(current_description)
if reconstructed_adf and validate_adf_document(reconstructed_adf):
print(f"[DEBUG] SAFETY CHECK: ADF reconstruction successful")
actual_description = reconstructed_adf
is_adf_format = True
# Update with ADF format
updated_description = update_adf_description_notes(actual_description, new_notes)
if updated_description and validate_adf_document(updated_description):
print(f"[DEBUG] SAFETY CHECK: Attempting ADF update with type: {type(updated_description)}")
issue.update(fields={"description": updated_description})
print(f"[INFO] SAFETY CHECK: Successfully updated ADF description with notes for ticket {ticket}")
return True
else:
print(f"[ERROR] SAFETY CHECK: Failed to create valid ADF update")
else:
print(f"[DEBUG] SAFETY CHECK: ADF reconstruction failed, creating minimal ADF")
# Create a minimal valid ADF document
minimal_adf = {
"type": "doc",
"version": 1,
"content": [
{
"type": "paragraph",
"content": [{"type": "text", "text": "Domain information (content extraction failed)"}]
}
]
}
# Add notes section
if new_notes and new_notes.strip():
minimal_adf["content"].extend([
{
"type": "heading",
"attrs": {"level": 3},
"content": [{"type": "text", "text": "Notes"}]
},
{
"type": "paragraph",
"content": [{"type": "text", "text": new_notes.strip()}]
}
])
if validate_adf_document(minimal_adf):
print(f"[DEBUG] SAFETY CHECK: Using minimal ADF with notes")
issue.update(fields={"description": minimal_adf})
print(f"[INFO] SAFETY CHECK: Successfully updated with minimal ADF for ticket {ticket}")
return True
else:
print(f"[ERROR] SAFETY CHECK: Even minimal ADF validation failed")
# Handle PropertyHolder objects first to determine the actual format
actual_description = current_description
is_adf_format = False
if hasattr(current_description, '__class__') and 'PropertyHolder' in str(type(current_description)):
print(f"[DEBUG] Description is PropertyHolder, checking for ADF format...")
# Check if this is an ADF PropertyHolder (has type, version, content)
has_type = hasattr(current_description, 'type')
has_version = hasattr(current_description, 'version')
has_content = hasattr(current_description, 'content')
print(f"[DEBUG] PropertyHolder attributes check: type={has_type}, version={has_version}, content={has_content}")
if has_type and has_version and has_content:
try:
doc_type = getattr(current_description, 'type', None)
doc_version = getattr(current_description, 'version', None)
doc_content = getattr(current_description, 'content', None)
print(f"[DEBUG] PropertyHolder values: type='{doc_type}', version='{doc_version}', content_type={type(doc_content)}")
                    # Treat the PropertyHolder as ADF when its type reports 'doc';
                    # other types fall through to the fallback reconstruction below
                    if doc_type == 'doc' or (doc_type and 'doc' in str(doc_type).lower()):
print(f"[DEBUG] PropertyHolder detected as ADF document, attempting reconstruction...")
reconstructed_adf = reconstruct_adf_from_propertyholder(current_description)
if reconstructed_adf:
actual_description = reconstructed_adf
is_adf_format = True
print(f"[DEBUG] Successfully reconstructed ADF from PropertyHolder")
else:
print(f"[DEBUG] ADF reconstruction failed, this PropertyHolder has ADF structure but reconstruction failed")
# Since this is clearly ADF format (has type, version, content), we should treat it as ADF
# Try a simpler approach - create a basic ADF structure
print(f"[DEBUG] Attempting manual ADF reconstruction for PropertyHolder with ADF attributes...")
try:
manual_adf = {
"type": "doc",
"version": int(doc_version) if doc_version else 1,
"content": []
}
# Try to extract text content and create simple paragraphs
if doc_content and isinstance(doc_content, (list, tuple)):
for item in doc_content:
if hasattr(item, '__class__') and 'PropertyHolder' in str(type(item)):
# Try to extract text from this item
item_type = getattr(item, 'type', 'paragraph')
if item_type in ['paragraph', 'heading']:
manual_adf["content"].append({
"type": item_type,
"content": [{"type": "text", "text": ""}]
})
if manual_adf["content"]:
actual_description = manual_adf
is_adf_format = True
print(f"[DEBUG] Manual ADF reconstruction successful")
else:
print(f"[DEBUG] Manual ADF reconstruction failed, falling back to text")
actual_description = "[ADF Content - Could not extract]"
except Exception as e:
print(f"[DEBUG] Manual ADF reconstruction error: {e}")
actual_description = "[ADF Content - Extraction failed]"
else:
print(f"[DEBUG] PropertyHolder has ADF attributes but type is not 'doc': {doc_type}")
# Still try ADF reconstruction since it has the right attributes
reconstructed_adf = reconstruct_adf_from_propertyholder(current_description)
if reconstructed_adf:
actual_description = reconstructed_adf
is_adf_format = True
print(f"[DEBUG] Successfully reconstructed ADF despite non-'doc' type")
else:
extracted_content = extract_field_string_from_propertyholder(current_description)
actual_description = extracted_content
print(f"[DEBUG] PropertyHolder extraction completed, treating as text")
except Exception as e:
print(f"[DEBUG] Error accessing PropertyHolder attributes: {e}")
extracted_content = extract_field_string_from_propertyholder(current_description)
actual_description = extracted_content
else:
print(f"[DEBUG] PropertyHolder missing ADF attributes, using text extraction")
extracted_content = extract_field_string_from_propertyholder(current_description)
actual_description = extracted_content
elif isinstance(current_description, dict) and current_description.get('type') == 'doc':
actual_description = current_description
is_adf_format = True
print(f"[DEBUG] Description is direct ADF format")
else:
actual_description = current_description
print(f"[DEBUG] Description is plain text or other format")
print(f"[DEBUG] Final format detection: is_adf_format={is_adf_format}, actual_description_type={type(actual_description)}")
# Update based on the detected format
if is_adf_format:
print(f"[DEBUG] Updating ADF description with notes")
updated_description = update_adf_description_notes(actual_description, new_notes)
else:
print(f"[DEBUG] Updating plain text description with notes")
updated_description = update_text_description_notes(str(actual_description), new_notes)
if updated_description:
print(f"[DEBUG] Attempting to update description with type: {type(updated_description)}")
issue.update(fields={"description": updated_description})
print(f"[INFO] Updated description with new notes for ticket {ticket}")
return True
else:
print(f"[WARN] Failed to create updated description for ticket {ticket}")
return False
except Exception as e:
print(f"[ERROR] Failed to update description for ticket {ticket}: {e}")
print(f"[ERROR] Description type was: {type(current_description) if 'current_description' in locals() else 'unknown'}")
if 'updated_description' in locals():
print(f"[ERROR] Updated description type was: {type(updated_description)}")
return False
def update_adf_description_notes(adf_description, new_notes):
"""
Update ADF description with new notes section.
"""
try:
content = adf_description.get('content', [])
updated_content = []
notes_section_found = False
in_notes_section = False
# Process existing content and replace notes section
for block in content:
if block.get('type') == 'heading':
heading_text = ''
heading_content = block.get('content', [])
for text_node in heading_content:
if text_node.get('type') == 'text':
heading_text += text_node.get('text', '')
if 'notes' in heading_text.lower():
notes_section_found = True
in_notes_section = True
# Add updated notes section
updated_content.append(block) # Keep the heading
if new_notes and new_notes.strip():
# Add notes content as paragraph
updated_content.append({
"type": "paragraph",
"content": [{"type": "text", "text": new_notes.strip()}]
})
continue
else:
# Any other heading ends the notes section
in_notes_section = False
updated_content.append(block)
elif in_notes_section:
# Skip ALL content in the notes section (paragraphs, etc.)
continue
else:
updated_content.append(block)
# If no notes section found, add one at the end
if not notes_section_found and new_notes and new_notes.strip():
updated_content.append({
"type": "heading",
"attrs": {"level": 3},
"content": [{"type": "text", "text": "Notes"}]
})
updated_content.append({
"type": "paragraph",
"content": [{"type": "text", "text": new_notes.strip()}]
})
return {
"type": "doc",
"version": 1,
"content": updated_content
}
except Exception as e:
print(f"[ERROR] Failed to update ADF description with notes: {e}")
return None
def update_text_description_notes(text_description, new_notes):
"""
Update plain text description with new notes section.
"""
try:
lines = text_description.split('\n')
updated_lines = []
notes_section = False
notes_section_processed = False
for line in lines:
line_stripped = line.strip()
if line_stripped.lower().startswith('notes:') or line_stripped.lower() == 'notes' or line_stripped.lower().startswith('### notes'):
notes_section = True
notes_section_processed = True
updated_lines.append(line)
# Add new notes content
if new_notes and new_notes.strip():
updated_lines.append(new_notes.strip())
continue
elif notes_section:
# Skip lines until we hit another section (any heading) or end
if line_stripped.startswith('##') or line_stripped.startswith('###') or line_stripped.lower().startswith('takedown'):
notes_section = False
updated_lines.append(line)
# Skip ALL old notes content - don't add the line
continue
else:
updated_lines.append(line)
# If no notes section found, add one at the end
if not notes_section_processed and new_notes and new_notes.strip():
updated_lines.append('')
updated_lines.append('### Notes')
updated_lines.append(new_notes.strip())
return '\n'.join(updated_lines)
except Exception as e:
print(f"[ERROR] Failed to update text description with notes: {e}")
return None
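
# Behavior sketch for update_text_description_notes (sample text only): an
# existing '### Notes' block is rewritten in place, otherwise one is appended.
def _example_update_text_description_notes():
    before = "Domain Abuse Details\nURL: https://example.com\n### Notes\nold note"
    after = update_text_description_notes(before, "new note")
    # Expected: 'old note' is dropped and 'new note' follows the heading
    return after
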
def validate_adf_document(adf_doc):
"""
Validate that an ADF document has the required structure.
"""
if not isinstance(adf_doc, dict):
print(f"[DEBUG] ADF validation failed: not a dict")
return False
if adf_doc.get('type') != 'doc':
print(f"[DEBUG] ADF validation failed: type is not 'doc', got {adf_doc.get('type')}")
return False
if not isinstance(adf_doc.get('content'), list):
print(f"[DEBUG] ADF validation failed: content is not a list")
return False
if 'version' not in adf_doc:
print(f"[DEBUG] ADF validation warning: no version specified, adding default")
adf_doc['version'] = 1
print(f"[DEBUG] ADF validation passed: doc with {len(adf_doc['content'])} content items")
return True
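
# Sanity-check sketch for validate_adf_document using a minimal document (sample
# content only; note the call also backfills a default 'version' of 1):
def _example_validate_adf_document():
    doc = {
        "type": "doc",
        "content": [{"type": "paragraph",
                     "content": [{"type": "text", "text": "hi"}]}],
    }
    ok = validate_adf_document(doc)  # True, and doc['version'] is now 1
    return ok, doc.get("version")
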
def reconstruct_adf_from_propertyholder(property_holder):
"""
Reconstruct proper ADF format from PropertyHolder objects.
This handles cases where JIRA returns PropertyHolder objects that represent ADF content.
"""
if not property_holder or not hasattr(property_holder, '__class__') or 'PropertyHolder' not in str(type(property_holder)):
return None
print(f"[DEBUG] Reconstructing ADF from PropertyHolder")
try:
# Check if this looks like an ADF document root (has type, version, content)
if (hasattr(property_holder, 'type') and hasattr(property_holder, 'version') and
hasattr(property_holder, 'content')):
doc_type = getattr(property_holder, 'type', None)
version = getattr(property_holder, 'version', None)
content = getattr(property_holder, 'content', None)
print(f"[DEBUG] Found ADF document: type={doc_type}, version={version}, content_type={type(content)}")
if doc_type == 'doc' and content is not None:
# Reconstruct the content array
reconstructed_content = []
if isinstance(content, (list, tuple)):
print(f"[DEBUG] Processing {len(content)} content items")
for i, item in enumerate(content):
print(f"[DEBUG] Processing content item {i}: {type(item)}")
reconstructed_item = reconstruct_adf_content_item(item)
if reconstructed_item:
reconstructed_content.append(reconstructed_item)
print(f"[DEBUG] Successfully added content item {i}")
else:
print(f"[DEBUG] Failed to reconstruct content item {i}")
adf_doc = {
"type": "doc",
"version": int(version) if version else 1,
"content": reconstructed_content
}
print(f"[DEBUG] Reconstructed ADF document with {len(reconstructed_content)} content items")
# Validate the reconstructed ADF
if validate_adf_document(adf_doc):
print(f"[DEBUG] ADF reconstruction successful and validated")
return adf_doc
else:
print(f"[ERROR] ADF reconstruction failed validation")
return None
print(f"[DEBUG] PropertyHolder does not contain ADF document structure")
return None
except Exception as e:
print(f"[ERROR] Exception during ADF reconstruction: {e}")
return None
def reconstruct_adf_content_item(item):
"""
Reconstruct individual ADF content items (paragraphs, headings, etc.) from PropertyHolder.
"""
if not item or not hasattr(item, '__class__') or 'PropertyHolder' not in str(type(item)):
print(f"[DEBUG] Item is not PropertyHolder: {type(item)}")
return None
print(f"[DEBUG] Reconstructing content item: {type(item)}")
try:
# Get the type and content of this item
item_type = getattr(item, 'type', None)
item_content = getattr(item, 'content', None)
item_attrs = getattr(item, 'attrs', None)
# For text nodes, also check for 'text' attribute directly
item_text = getattr(item, 'text', None) if item_type == 'text' else None
print(f"[DEBUG] Content item: type={item_type}, has_content={item_content is not None}, has_attrs={item_attrs is not None}, has_text={item_text is not None}")
if not item_type:
print(f"[DEBUG] No type found for content item")
return None
# Build the content item
content_item = {"type": item_type}
# Add attributes if present
if item_attrs:
try:
if hasattr(item_attrs, '__dict__'):
# Convert PropertyHolder attrs to dict
attrs_dict = {}
for attr_name in dir(item_attrs):
if not attr_name.startswith('_') and not callable(getattr(item_attrs, attr_name, None)):
attr_value = getattr(item_attrs, attr_name, None)
if attr_value is not None:
attrs_dict[attr_name] = attr_value
if attrs_dict:
content_item["attrs"] = attrs_dict
print(f"[DEBUG] Added attrs from PropertyHolder: {attrs_dict}")
elif isinstance(item_attrs, dict):
content_item["attrs"] = item_attrs
print(f"[DEBUG] Added attrs from dict: {item_attrs}")
else:
# Try to convert to dict if it's a simple value
content_item["attrs"] = item_attrs
print(f"[DEBUG] Added attrs as-is: {item_attrs}")
except Exception as e:
print(f"[DEBUG] Error processing attrs: {e}")
# Handle different content types
if item_type == 'text':
# Text nodes require a 'text' property with actual content
text_content = None
# Try to get text from the 'text' attribute first
if item_text is not None:
text_content = str(item_text).strip()
print(f"[DEBUG] Found text in 'text' attribute: '{text_content}'")
# If no text attribute or empty, try content
if not text_content and item_content is not None:
if isinstance(item_content, str):
text_content = item_content.strip()
print(f"[DEBUG] Found text in 'content' attribute: '{text_content}'")
elif isinstance(item_content, (list, tuple)) and len(item_content) > 0:
# Sometimes text is in a list
first_item = item_content[0]
if isinstance(first_item, str):
text_content = first_item.strip()
print(f"[DEBUG] Found text in content list: '{text_content}'")
# Text nodes MUST have text content - skip if empty
if text_content:
content_item["text"] = text_content
print(f"[DEBUG] Created text node with content: '{text_content}'")
else:
print(f"[DEBUG] Skipping empty text node")
return None
elif item_type == 'hardBreak':
# Hard breaks don't need content - they're self-closing
print(f"[DEBUG] Created hardBreak node (no content needed)")
elif item_content is not None:
# For other node types (paragraph, heading, etc.), process content
try:
if isinstance(item_content, (list, tuple)):
# Recursively process content array
reconstructed_children = []
print(f"[DEBUG] Processing {len(item_content)} child items")
for i, child in enumerate(item_content):
print(f"[DEBUG] Processing child {i}: {type(child)}")
if hasattr(child, '__class__') and 'PropertyHolder' in str(type(child)):
child_item = reconstruct_adf_content_item(child)
if child_item:
reconstructed_children.append(child_item)
print(f"[DEBUG] Added reconstructed child {i}")
else:
print(f"[DEBUG] Skipped empty child {i}")
elif isinstance(child, dict):
reconstructed_children.append(child)
print(f"[DEBUG] Added dict child {i}")
elif isinstance(child, str) and child.strip():
# Handle text content - wrap in text node if needed
if item_type == 'paragraph':
reconstructed_children.append({"type": "text", "text": child.strip()})
print(f"[DEBUG] Added text child {i}: {child.strip()}")
else:
reconstructed_children.append(child)
else:
print(f"[DEBUG] Skipped empty or unknown child type {i}: {type(child)}")
# Only add content if we have valid children
if reconstructed_children:
content_item["content"] = reconstructed_children
print(f"[DEBUG] Added {len(reconstructed_children)} content children")
elif item_type in ['paragraph', 'heading']:
# For paragraphs/headings, we need at least one child - add empty text
content_item["content"] = [{"type": "text", "text": ""}]
print(f"[DEBUG] Added empty text child for {item_type}")
elif isinstance(item_content, str) and item_content.strip():
# Single text content - wrap in text node if this is a paragraph
if item_type == 'paragraph':
content_item["content"] = [{"type": "text", "text": item_content.strip()}]
print(f"[DEBUG] Added wrapped text content: {item_content.strip()}")
else:
content_item["content"] = item_content.strip()
print(f"[DEBUG] Added raw content: {item_content.strip()}")
else:
print(f"[DEBUG] No valid content found for {item_type}")
# For structural elements that need content, add empty text
if item_type in ['paragraph', 'heading']:
content_item["content"] = [{"type": "text", "text": ""}]
print(f"[DEBUG] Added empty content for structural element {item_type}")
except Exception as e:
print(f"[DEBUG] Error processing content: {e}")
print(f"[DEBUG] Reconstructed {item_type} item successfully")
return content_item
except Exception as e:
print(f"[ERROR] Failed to reconstruct content item: {e}")
return None
def update_description_with_takedown(jira_client, ticket, takedown_info):
"""
Update the ticket description with takedown information, preserving the existing content.
Adds takedown section above Notes section.
"""
try:
issue = jira_client.issue(ticket)
current_description = issue.fields.description
if not current_description:
print(f"[WARN] No existing description found for ticket {ticket}")
return False
print(f"[DEBUG] Current description type for takedown update: {type(current_description)}")
# CRITICAL SAFETY CHECK: Any PropertyHolder with type, version, content is ADF
if (hasattr(current_description, '__class__') and 'PropertyHolder' in str(type(current_description)) and
hasattr(current_description, 'type') and hasattr(current_description, 'version') and
hasattr(current_description, 'content')):
print(f"[DEBUG] TAKEDOWN SAFETY CHECK: PropertyHolder with ADF attributes detected - FORCING ADF format")
# Force ADF reconstruction
reconstructed_adf = reconstruct_adf_from_propertyholder(current_description)
if reconstructed_adf and validate_adf_document(reconstructed_adf):
print(f"[DEBUG] TAKEDOWN SAFETY CHECK: ADF reconstruction successful")
actual_description = reconstructed_adf
is_adf_format = True
# Update with ADF format
updated_description = update_adf_description_takedown(actual_description, takedown_info)
if updated_description and validate_adf_document(updated_description):
print(f"[DEBUG] TAKEDOWN SAFETY CHECK: Attempting ADF update with type: {type(updated_description)}")
issue.update(fields={"description": updated_description})
print(f"[INFO] TAKEDOWN SAFETY CHECK: Successfully updated ADF description with takedown for ticket {ticket}")
return True
else:
print(f"[ERROR] TAKEDOWN SAFETY CHECK: Failed to create valid ADF update")
else:
print(f"[DEBUG] TAKEDOWN SAFETY CHECK: ADF reconstruction failed, creating minimal ADF")
# Create a minimal valid ADF document with takedown info
minimal_adf = {
"type": "doc",
"version": 1,
"content": [
{
"type": "paragraph",
"content": [{"type": "text", "text": "Domain information (content extraction failed)"}]
}
]
}
# Add takedown section
if takedown_info and takedown_info.strip():
minimal_adf["content"].extend([
{
"type": "heading",
"attrs": {"level": 3},
"content": [{"type": "text", "text": "Takedown Reported"}]
},
{
"type": "paragraph",
"content": [{"type": "text", "text": takedown_info.strip()}]
}
])
if validate_adf_document(minimal_adf):
print(f"[DEBUG] TAKEDOWN SAFETY CHECK: Using minimal ADF with takedown")
issue.update(fields={"description": minimal_adf})
print(f"[INFO] TAKEDOWN SAFETY CHECK: Successfully updated with minimal ADF for ticket {ticket}")
return True
else:
print(f"[ERROR] TAKEDOWN SAFETY CHECK: Even minimal ADF validation failed")
# Handle other description formats
actual_description = current_description
is_adf_format = False
if hasattr(current_description, '__class__') and 'PropertyHolder' in str(type(current_description)):
extracted_content = extract_field_string_from_propertyholder(current_description)
if isinstance(extracted_content, dict) and extracted_content.get('type') == 'doc':
actual_description = extracted_content
is_adf_format = True
else:
actual_description = extracted_content
elif isinstance(current_description, dict) and current_description.get('type') == 'doc':
actual_description = current_description
is_adf_format = True
# Update based on the detected format
if is_adf_format:
print(f"[DEBUG] Updating ADF description with takedown")
updated_description = update_adf_description_takedown(actual_description, takedown_info)
else:
print(f"[DEBUG] Updating plain text description with takedown")
updated_description = update_text_description_takedown(str(actual_description), takedown_info)
if updated_description:
print(f"[DEBUG] Attempting to update description with takedown, type: {type(updated_description)}")
issue.update(fields={"description": updated_description})
print(f"[INFO] Updated description with takedown information for ticket {ticket}")
return True
else:
print(f"[WARN] Failed to create updated description with takedown for ticket {ticket}")
return False
except Exception as e:
print(f"[ERROR] Failed to update description with takedown for ticket {ticket}: {e}")
return False
def update_adf_description_takedown(adf_description, takedown_info):
"""
Update ADF description with takedown section (above Notes section).
"""
try:
content = adf_description.get('content', [])
updated_content = []
takedown_section_found = False
notes_section_index = -1
# First pass: find existing takedown and notes sections
for i, block in enumerate(content):
if block.get('type') == 'heading':
heading_text = ''
heading_content = block.get('content', [])
for text_node in heading_content:
if text_node.get('type') == 'text':
heading_text += text_node.get('text', '')
if 'takedown' in heading_text.lower():
takedown_section_found = True
elif 'notes' in heading_text.lower():
notes_section_index = i
# Second pass: rebuild content with takedown section
i = 0
while i < len(content):
block = content[i]
# Check if we're at the notes section and need to insert takedown before it
if (notes_section_index == i and not takedown_section_found and
takedown_info and takedown_info.strip()):
# Insert takedown section before notes
updated_content.append({
"type": "heading",
"attrs": {"level": 3},
"content": [{"type": "text", "text": "Takedown Reported"}]
})
updated_content.append({
"type": "paragraph",
"content": [{"type": "text", "text": takedown_info.strip()}]
})
takedown_section_found = True
# Handle existing takedown section
if block.get('type') == 'heading':
heading_text = ''
heading_content = block.get('content', [])
for text_node in heading_content:
if text_node.get('type') == 'text':
heading_text += text_node.get('text', '')
if 'takedown' in heading_text.lower():
# Replace existing takedown section
updated_content.append(block) # Keep the heading
if takedown_info and takedown_info.strip():
updated_content.append({
"type": "paragraph",
"content": [{"type": "text", "text": takedown_info.strip()}]
})
# Skip the old takedown content (next paragraph)
if i + 1 < len(content) and content[i + 1].get('type') == 'paragraph':
i += 1 # Skip next paragraph
i += 1
continue
else:
updated_content.append(block)
else:
updated_content.append(block)
i += 1
# If no notes section found and no takedown section exists, add takedown at the end
if not takedown_section_found and takedown_info and takedown_info.strip():
updated_content.append({
"type": "heading",
"attrs": {"level": 3},
"content": [{"type": "text", "text": "Takedown Reported"}]
})
updated_content.append({
"type": "paragraph",
"content": [{"type": "text", "text": takedown_info.strip()}]
})
return {
"type": "doc",
"version": 1,
"content": updated_content
}
except Exception as e:
print(f"[ERROR] Failed to update ADF description with takedown: {e}")
return None
def update_text_description_takedown(text_description, takedown_info):
"""
Update plain text description with takedown section (above Notes section).
"""
try:
lines = text_description.split('\n')
updated_lines = []
takedown_section = False
takedown_section_processed = False
notes_section_index = -1
# Find Notes section
for i, line in enumerate(lines):
if line.strip().lower().startswith('notes') or line.strip().lower() == 'notes':
notes_section_index = i
break
# Process lines
for i, line in enumerate(lines):
line_stripped = line.strip()
# Insert takedown before notes section if not already processed
if (i == notes_section_index and not takedown_section_processed and
takedown_info and takedown_info.strip()):
updated_lines.append('')
updated_lines.append('### Takedown Reported')
updated_lines.append(takedown_info.strip())
takedown_section_processed = True
            # If we're inside an old takedown section, skip its lines until the
            # next section heading or the notes section begins
            if takedown_section:
                if (line_stripped.startswith('##') or
                        line_stripped.lower().startswith('notes')):
                    takedown_section = False
                    updated_lines.append(line)
                continue
            # Handle an existing takedown heading
            if (line_stripped.lower().startswith('takedown') or
                    'takedown submitted' in line_stripped.lower() or
                    'takedown reported' in line_stripped.lower()):
                takedown_section = True
                if not takedown_section_processed:
                    # Replace the first existing takedown section in place
                    takedown_section_processed = True
                    updated_lines.append('### Takedown Reported')
                    if takedown_info and takedown_info.strip():
                        updated_lines.append(takedown_info.strip())
                # Any further takedown section is a stale duplicate - drop it
                continue
            updated_lines.append(line)
# If no notes section found and no takedown section exists, add takedown at the end
if not takedown_section_processed and takedown_info and takedown_info.strip():
updated_lines.append('')
updated_lines.append('### Takedown Reported')
updated_lines.append(takedown_info.strip())
return '\n'.join(updated_lines)
except Exception as e:
print(f"[ERROR] Failed to update text description with takedown: {e}")
return None
def cleanup_duplicate_notes_in_all_tickets():
"""
TEMPORARY CLEANUP FUNCTION - Run once to fix duplicate notes sections.
This function will find all tickets in the project and clean up duplicate Notes sections.
"""
if not jira_client:
print("[ERROR] JIRA client not initialized. Cannot run cleanup.")
return
print("[INFO] Starting cleanup of duplicate notes sections in all tickets...")
try:
# Get all tickets from the project using pagination
jql = f"project = {JIRA_PROJECT_KEY} ORDER BY created DESC"
print(f"[INFO] Searching for tickets with JQL: {jql}")
# Use pagination to get ALL tickets
all_issues = []
start_at = 0
max_results = 100 # JIRA's typical page size
while True:
print(f"[INFO] Fetching tickets {start_at} to {start_at + max_results}...")
issues_batch = jira_client.search_issues(jql, startAt=start_at, maxResults=max_results)
if not issues_batch:
break
all_issues.extend(issues_batch)
print(f"[INFO] Got {len(issues_batch)} tickets in this batch. Total so far: {len(all_issues)}")
# If we got fewer than max_results, we're at the end
if len(issues_batch) < max_results:
break
start_at += max_results
print(f"[INFO] Found {len(all_issues)} total tickets to check")
cleaned_count = 0
error_count = 0
for i, issue in enumerate(all_issues):
ticket_key = issue.key
try:
print(f"[INFO] Checking ticket {ticket_key} ({i + 1}/{len(all_issues)})...")
# Get the current description
description = issue.fields.description
if not description:
print(f"[DEBUG] Ticket {ticket_key} has no description, skipping")
continue
# Determine if this ticket needs cleaning
needs_cleaning = False
cleaned_description = None
# Handle PropertyHolder (ADF) descriptions
if hasattr(description, '__class__') and 'PropertyHolder' in str(type(description)):
print(f"[DEBUG] Ticket {ticket_key} has PropertyHolder description")
# Try to reconstruct ADF first
if (hasattr(description, 'type') and hasattr(description, 'version') and
hasattr(description, 'content')):
reconstructed_adf = reconstruct_adf_from_propertyholder(description)
if reconstructed_adf:
# Check if it has duplicate notes before cleaning
original_notes_count = count_notes_sections_adf(reconstructed_adf)
if original_notes_count > 1:
print(f"[INFO] Ticket {ticket_key} has {original_notes_count} notes sections - NEEDS CLEANING")
cleaned_description = cleanup_duplicate_notes_adf(reconstructed_adf)
needs_cleaning = True
else:
print(f"[DEBUG] Ticket {ticket_key} has {original_notes_count} notes sections - OK")
else:
# Fall back to text extraction
text_description = extract_field_string_from_propertyholder(description)
original_notes_count = count_notes_sections_text(str(text_description))
if original_notes_count > 1:
print(f"[INFO] Ticket {ticket_key} has {original_notes_count} text notes sections - NEEDS CLEANING")
cleaned_description = cleanup_duplicate_notes_text(str(text_description))
needs_cleaning = True
else:
print(f"[DEBUG] Ticket {ticket_key} has {original_notes_count} text notes sections - OK")
else:
# Extract as text
text_description = extract_field_string_from_propertyholder(description)
original_notes_count = count_notes_sections_text(str(text_description))
if original_notes_count > 1:
print(f"[INFO] Ticket {ticket_key} has {original_notes_count} text notes sections - NEEDS CLEANING")
cleaned_description = cleanup_duplicate_notes_text(str(text_description))
needs_cleaning = True
else:
print(f"[DEBUG] Ticket {ticket_key} has {original_notes_count} text notes sections - OK")
# Handle direct ADF descriptions
elif isinstance(description, dict) and description.get('type') == 'doc':
print(f"[DEBUG] Ticket {ticket_key} has direct ADF description")
original_notes_count = count_notes_sections_adf(description)
if original_notes_count > 1:
print(f"[INFO] Ticket {ticket_key} has {original_notes_count} ADF notes sections - NEEDS CLEANING")
cleaned_description = cleanup_duplicate_notes_adf(description)
needs_cleaning = True
else:
print(f"[DEBUG] Ticket {ticket_key} has {original_notes_count} ADF notes sections - OK")
# Handle plain text descriptions
else:
print(f"[DEBUG] Ticket {ticket_key} has plain text description")
text_desc = str(description)
original_notes_count = count_notes_sections_text(text_desc)
if original_notes_count > 1:
print(f"[INFO] Ticket {ticket_key} has {original_notes_count} text notes sections - NEEDS CLEANING")
cleaned_description = cleanup_duplicate_notes_text(text_desc)
needs_cleaning = True
else:
print(f"[DEBUG] Ticket {ticket_key} has {original_notes_count} text notes sections - OK")
# Update the ticket if cleaning is needed
if needs_cleaning and cleaned_description:
print(f"[INFO] Updating ticket {ticket_key} with cleaned description...")
issue.update(fields={"description": cleaned_description})
# Add a comment about the cleanup
cleanup_comment = "Cleaned up duplicate notes sections caused by a system bug. No content was lost - duplicate sections were consolidated."
add_jira_comment(jira_client, ticket_key, cleanup_comment)
cleaned_count += 1
print(f"[SUCCESS] Cleaned ticket {ticket_key}")
except Exception as e:
print(f"[ERROR] Failed to process ticket {ticket_key}: {e}")
error_count += 1
continue
print(f"\n[CLEANUP SUMMARY]")
print(f"Total tickets checked: {len(all_issues)}")
print(f"Tickets cleaned: {cleaned_count}")
print(f"Errors encountered: {error_count}")
print(f"[INFO] Cleanup completed!")
except Exception as e:
print(f"[ERROR] Failed to run cleanup: {e}")
def count_notes_sections_adf(adf_description):
"""
Count how many notes sections exist in an ADF description.
"""
try:
content = adf_description.get('content', [])
notes_count = 0
for block in content:
if block.get('type') == 'heading':
heading_text = ''
heading_content = block.get('content', [])
for text_node in heading_content:
if text_node.get('type') == 'text':
heading_text += text_node.get('text', '')
if 'notes' in heading_text.lower():
notes_count += 1
return notes_count
except Exception as e:
print(f"[ERROR] Failed to count ADF notes sections: {e}")
return 0
def count_notes_sections_text(text_description):
"""
Count how many notes sections exist in a text description.
"""
try:
lines = text_description.split('\n')
notes_count = 0
for line in lines:
line_stripped = line.strip()
            if (line_stripped.lower().startswith('notes') or
                    line_stripped.lower().startswith('### notes') or
                    line_stripped.lower().startswith('## notes')):
notes_count += 1
return notes_count
except Exception as e:
print(f"[ERROR] Failed to count text notes sections: {e}")
return 0
def cleanup_duplicate_notes_adf(adf_description):
"""
Helper function to clean duplicate notes sections from ADF description.
"""
try:
content = adf_description.get('content', [])
cleaned_content = []
first_notes_content = None
found_notes = False
# First pass: collect all content except notes sections, but keep the first notes content
i = 0
while i < len(content):
block = content[i]
if block.get('type') == 'heading':
heading_text = ''
heading_content = block.get('content', [])
for text_node in heading_content:
if text_node.get('type') == 'text':
heading_text += text_node.get('text', '')
if 'notes' in heading_text.lower():
# Found a notes section
if not found_notes:
# This is the first notes section - keep its content
found_notes = True
notes_content = []
i += 1 # Move past the heading
# Collect all paragraphs until next heading or end
while i < len(content):
next_block = content[i]
if next_block.get('type') == 'heading':
break # Stop at next heading
notes_content.append(next_block)
i += 1
# Extract text from the first notes content
if notes_content:
notes_text_parts = []
for note_block in notes_content:
if note_block.get('type') == 'paragraph':
para_content = note_block.get('content', [])
for text_node in para_content:
if text_node.get('type') == 'text':
text = text_node.get('text', '').strip()
if text:
notes_text_parts.append(text)
if notes_text_parts:
first_notes_content = ' '.join(notes_text_parts)
continue # Don't increment i again
else:
# This is a duplicate notes section - skip it entirely
print(f"[DEBUG] Skipping duplicate notes section")
i += 1 # Move past the heading
# Skip all content until next heading or end
while i < len(content):
next_block = content[i]
if next_block.get('type') == 'heading':
break # Stop at next heading
i += 1
continue # Don't increment i again
else:
cleaned_content.append(block)
else:
cleaned_content.append(block)
i += 1
        # If we found any notes sections, re-append a single consolidated
        # section (this helper is only called when duplicates were detected)
        if found_notes:
            print("[DEBUG] Consolidating notes sections, keeping only the first one's content")
# Add a single notes section at the end with the first content we found
if first_notes_content:
cleaned_content.append({
"type": "heading",
"attrs": {"level": 3},
"content": [{"type": "text", "text": "Notes"}]
})
cleaned_content.append({
"type": "paragraph",
"content": [{"type": "text", "text": first_notes_content}]
})
return {
"type": "doc",
"version": 1,
"content": cleaned_content
}
# No notes sections found or no changes needed
return adf_description
except Exception as e:
print(f"[ERROR] Failed to clean ADF notes: {e}")
return adf_description
def cleanup_duplicate_notes_text(text_description):
"""
Helper function to clean duplicate notes sections from plain text description.
"""
try:
lines = text_description.split('\n')
cleaned_lines = []
first_notes_content = None
found_notes = False
in_notes = False
current_notes = []
for line in lines:
line_stripped = line.strip()
# Check if this is a notes heading
            if (line_stripped.lower().startswith('notes') or
                    line_stripped.lower().startswith('### notes') or
                    line_stripped.lower().startswith('## notes')):
                if not found_notes:
                    # This is the first notes section - start collecting its content
                    found_notes = True
                    in_notes = True
                    current_notes = []
                else:
                    # Duplicate notes section - save the first section's content
                    # if we were still collecting it, then skip the duplicate
                    if in_notes and current_notes and first_notes_content is None:
                        first_notes_content = '\n'.join(current_notes).strip()
                    print("[DEBUG] Skipping duplicate text notes section")
                    in_notes = True  # Stay "in notes" so the duplicate content is skipped
                    current_notes = []  # Reset; duplicate content is discarded
                continue
# Check if we hit another section (end notes)
elif (line_stripped.startswith('###') or line_stripped.startswith('##') or
line_stripped.lower().startswith('takedown')):
if in_notes and current_notes and first_notes_content is None:
# Save the first notes content we found
first_notes_content = '\n'.join(current_notes).strip()
in_notes = False
cleaned_lines.append(line)
            # If we're still in the first notes section, collect its content
            elif in_notes and first_notes_content is None:
                if line_stripped:
                    current_notes.append(line_stripped)
elif in_notes:
# We're in a duplicate notes section - skip the content
continue
# Regular content (not in notes)
else:
cleaned_lines.append(line)
# Don't forget to save the notes if we ended in the first notes section
if in_notes and current_notes and first_notes_content is None:
first_notes_content = '\n'.join(current_notes).strip()
        # If we found any notes sections, re-append a single consolidated one
        if found_notes:
            print("[DEBUG] Consolidating text notes sections, keeping only the first one's content")
# Add single notes section with first content
if first_notes_content:
cleaned_lines.append('')
cleaned_lines.append('### Notes')
cleaned_lines.append(first_notes_content)
return '\n'.join(cleaned_lines)
# No changes needed
return text_description
except Exception as e:
print(f"[ERROR] Failed to clean text notes: {e}")
return text_description
def determine_correct_ticket_state(domain, jira_client, ticket):
"""
Determine the correct JIRA ticket state based on simple, clear rules.
Rules:
1. If domain status is "down" → ticket should be "Done" (regardless of anything else)
2. If domain status is not "down" and there's a takedown (report_abuse + timestamp) → ticket should be "In Review"
3. If domain status is not "down" and no takedown → ticket should be "In Progress"
"""
domain_status = domain.get('status', '').lower()
has_takedown = domain.get("report_abuse") and domain.get("report_abuse_timestamp")
print(f"[DEBUG] Determining state for ticket {ticket}: status='{domain_status}', has_takedown={has_takedown}")
if domain_status == "down":
target_state = "done"
print(f"[DEBUG] Domain is DOWN → target state: {target_state}")
elif has_takedown:
target_state = "review"
print(f"[DEBUG] Domain is UP with takedown → target state: {target_state}")
else:
target_state = "in_progress"
print(f"[DEBUG] Domain is UP without takedown → target state: {target_state}")
return target_state
def get_current_ticket_state(jira_client, ticket):
"""
Get the current state of a JIRA ticket.
"""
try:
issue = jira_client.issue(ticket)
current_status = safe_extract_status_name(issue.fields.status)
print(f"[DEBUG] Current ticket {ticket} state: '{current_status}'")
return current_status
except Exception as e:
print(f"[ERROR] Failed to get current state for ticket {ticket}: {e}")
return "unknown"
def transition_ticket_to_correct_state(jira_client, ticket, target_state, current_state):
"""
Transition a ticket to the correct state if it's not already there.
"""
# Normalize states for comparison
current_normalized = current_state.lower().replace(' ', '_')
target_normalized = target_state.lower().replace(' ', '_')
# Map of state equivalencies
done_states = ['done', 'closed', 'resolved', 'complete']
review_states = ['review', 'in_review']
progress_states = ['in_progress', 'to_do', 'open']
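    # These buckets treat JIRA's differently-named workflow states as equivalent,
    # so a ticket already in "Closed" or "Resolved" is not re-transitioned to "Done".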
# Check if we're already in the correct state
if target_normalized == 'done' and current_normalized in done_states:
print(f"[DEBUG] Ticket {ticket} already in correct done state: {current_state}")
return True
elif target_normalized == 'review' and current_normalized in review_states:
print(f"[DEBUG] Ticket {ticket} already in correct review state: {current_state}")
return True
elif target_normalized == 'in_progress' and current_normalized in progress_states:
print(f"[DEBUG] Ticket {ticket} already in correct progress state: {current_state}")
return True
# Need to transition - call appropriate function
if target_normalized == 'done':
print(f"[INFO] Transitioning ticket {ticket} to Done")
return close_jira_ticket(jira_client, ticket)
elif target_normalized == 'review':
print(f"[INFO] Transitioning ticket {ticket} to Review")
return transition_to_review(jira_client, ticket)
elif target_normalized == 'in_progress':
print(f"[INFO] Transitioning ticket {ticket} to In Progress")
return transition_to_in_progress(jira_client, ticket)
else:
print(f"[WARN] Unknown target state: {target_state}")
return False
def check_and_handle_takedown_updates(jira_client, ticket, domain):
"""
Check if takedown information needs to be updated and handle it properly.
Only update if the content has actually changed.
"""
takedown_requested = domain.get("report_abuse") and domain.get("report_abuse_timestamp")
if not takedown_requested:
print(f"[DEBUG] No takedown request for ticket {ticket}")
return False
abuse_timestamp = domain.get("report_abuse_timestamp")
new_takedown_info = f"Takedown submitted at {abuse_timestamp}."
takedown_comment = f"Takedown reported at {abuse_timestamp}."
print(f"[DEBUG] Checking takedown for ticket {ticket}: timestamp={abuse_timestamp}")
# PRIMARY CHECK: If this exact comment already exists, skip everything
if comment_exists(jira_client, ticket, takedown_comment):
print(f"[DEBUG] Takedown comment already exists for ticket {ticket}, skipping all updates")
return False
    # SECONDARY CHECK: Check if the description wording was already posted as a comment
    if comment_exists(jira_client, ticket, f"Takedown submitted at {abuse_timestamp}"):
        print(f"[DEBUG] Takedown wording already posted on ticket {ticket}, skipping all updates")
return False
# TERTIARY CHECK: Look for any takedown with this timestamp in description
current_takedown = extract_takedown_from_description(jira_client, ticket)
if current_takedown and abuse_timestamp in current_takedown:
print(f"[DEBUG] Takedown timestamp {abuse_timestamp} already found in description for ticket {ticket}, skipping")
return False
print(f"[INFO] Adding new takedown info for ticket {ticket}: {new_takedown_info}")
# Update description with takedown information
if update_description_with_takedown(jira_client, ticket, new_takedown_info):
# Add comment about the takedown
add_jira_comment(jira_client, ticket, takedown_comment)
print(f"[INFO] Added takedown comment to ticket {ticket}")
return True
else:
print(f"[ERROR] Failed to update takedown description for ticket {ticket}")
return False
def check_and_handle_notes_updates(jira_client, ticket, domain):
"""
Check if notes need to be updated and handle them properly.
Only update if the content has actually changed.
"""
notes = domain.get('notes')
new_notes = notes.strip() if notes else ""
print(f"[DEBUG] Checking notes for ticket {ticket}: notes='{new_notes[:100]}{'...' if len(new_notes) > 100 else ''}'")
# If there are no new notes, check if we need to clear existing ones
if not new_notes:
current_notes = extract_notes_from_description(jira_client, ticket)
if not current_notes:
print(f"[DEBUG] No notes to add or clear for ticket {ticket}")
return False
# There are existing notes to clear
print(f"[INFO] Clearing notes for ticket {ticket}")
clear_comment = "Notes cleared from description."
# PRIMARY CHECK: If clear comment already exists, skip
if comment_exists(jira_client, ticket, clear_comment):
print(f"[DEBUG] Notes cleared comment already exists for ticket {ticket}, skipping")
return False
if update_description_with_notes(jira_client, ticket, ''):
add_jira_comment(jira_client, ticket, clear_comment)
print(f"[INFO] Added notes cleared comment to ticket {ticket}")
return True
return False
# There are new notes - check content before updating
# PRIMARY CHECK: If "Notes updated" comment with this exact content already exists, skip
notes_comment_content = f"Notes updated in description:\n{new_notes}"
if comment_exists(jira_client, ticket, notes_comment_content):
print(f"[DEBUG] Notes update comment with exact content already exists for ticket {ticket}, skipping all updates")
return False
# CONTENT CHECK: Compare current vs new notes content
current_notes = extract_notes_from_description(jira_client, ticket)
# Normalize both for comparison
current_normalized = normalize_content_for_comparison(current_notes)
new_normalized = normalize_content_for_comparison(new_notes)
print(f"[DEBUG] Notes content comparison for ticket {ticket}:")
print(f"[DEBUG] Current notes (raw): '{current_notes[:100]}{'...' if len(current_notes) > 100 else ''}'")
print(f"[DEBUG] New notes (raw): '{new_notes[:100]}{'...' if len(new_notes) > 100 else ''}'")
print(f"[DEBUG] Current (normalized): '{current_normalized[:100]}{'...' if len(current_normalized) > 100 else ''}'")
print(f"[DEBUG] New (normalized): '{new_normalized[:100]}{'...' if len(new_normalized) > 100 else ''}'")
print(f"[DEBUG] Equal: {current_normalized == new_normalized}")
# Only update if content has actually changed
if current_normalized != new_normalized:
print(f"[INFO] Notes content differs, updating for ticket {ticket}")
if update_description_with_notes(jira_client, ticket, new_notes):
# Add comment with the actual notes content
add_jira_comment(jira_client, ticket, notes_comment_content)
print(f"[INFO] Added notes update comment to ticket {ticket}")
return True
else:
print(f"[ERROR] Failed to update notes for ticket {ticket}")
return False
else:
print(f"[DEBUG] Notes content identical for ticket {ticket}, no update needed")
return False
def normalize_content_for_comparison(content):
"""
Normalize content for accurate comparison by removing formatting differences.
Enhanced version that properly extracts PropertyHolder content first.
"""
if not content:
return ""
# CRITICAL: Extract PropertyHolder content first before normalizing
content_str = ""
if hasattr(content, '__class__') and 'PropertyHolder' in str(type(content)):
print(f"[DEBUG] Detected PropertyHolder in normalize_content_for_comparison, extracting content...")
# Try to extract the actual content from PropertyHolder
extracted_content = extract_field_string_from_propertyholder(content)
if isinstance(extracted_content, dict) and extracted_content.get('type') == 'doc':
# It's ADF format
content_str = extract_text_from_adf(extracted_content)
print(f"[DEBUG] Extracted ADF content from PropertyHolder: '{content_str[:100]}...'")
else:
content_str = str(extracted_content) if extracted_content else ""
print(f"[DEBUG] Extracted text content from PropertyHolder: '{content_str[:100]}...'")
else:
# It's already a string or other format
content_str = str(content).strip()
if not content_str:
return ""
# Remove common formatting differences
content_str = content_str.replace('\r\n', '\n').replace('\r', '\n')
# Normalize multiple consecutive newlines to single newlines
import re
content_str = re.sub(r'\n\s*\n', '\n', content_str)
# Normalize whitespace within lines (but preserve line structure)
lines = content_str.split('\n')
normalized_lines = []
for line in lines:
# Strip each line and normalize internal whitespace
normalized_line = re.sub(r'\s+', ' ', line.strip())
if normalized_line: # Only add non-empty lines
normalized_lines.append(normalized_line)
result = '\n'.join(normalized_lines)
# Final cleanup - remove any remaining extra whitespace
result = result.strip()
print(f"[DEBUG] Normalized content: '{content_str[:50]}...' → '{result[:50]}...'")
return result
def extract_notes_from_description(jira_client, ticket):
"""
Extract the current notes from the ticket description.
Enhanced version with better parsing and debugging.
"""
try:
issue = jira_client.issue(ticket)
description = issue.fields.description
if not description:
print(f"[DEBUG] No description found for ticket {ticket}")
return ''
print(f"[DEBUG] Description type for notes extraction: {type(description)}")
# Convert description to searchable text
description_text = ''
# Handle PropertyHolder objects
if hasattr(description, '__class__') and 'PropertyHolder' in str(type(description)):
print(f"[DEBUG] Extracting from PropertyHolder for notes")
# Try to extract text content from PropertyHolder
extracted = extract_field_string_from_propertyholder(description)
if isinstance(extracted, dict) and extracted.get('type') == 'doc':
# It's ADF format, extract text from it
description_text = extract_text_from_adf(extracted)
print(f"[DEBUG] Extracted text from ADF PropertyHolder: {len(description_text)} chars")
else:
description_text = str(extracted)
print(f"[DEBUG] Extracted text from PropertyHolder: {len(description_text)} chars")
elif isinstance(description, dict) and description.get('type') == 'doc':
print(f"[DEBUG] Extracting from direct ADF format for notes")
# Direct ADF format
description_text = extract_text_from_adf(description)
print(f"[DEBUG] Extracted text from direct ADF: {len(description_text)} chars")
else:
print(f"[DEBUG] Treating as plain text for notes")
# Plain text
description_text = str(description)
print(f"[DEBUG] Full description text for notes search (first 200 chars): '{description_text[:200]}{'...' if len(description_text) > 200 else ''}'")
# Look for Notes section - be more flexible and thorough
lines = description_text.split('\n')
notes_section = False
notes_lines = []
print(f"[DEBUG] Searching through {len(lines)} lines for notes section")
for i, line in enumerate(lines):
line_stripped = line.strip()
print(f"[DEBUG] Line {i}: '{line_stripped[:50]}{'...' if len(line_stripped) > 50 else ''}' | in_notes={notes_section}")
# Check for notes heading (various formats) - case insensitive
line_lower = line_stripped.lower()
if (line_lower == 'notes' or
line_lower == '### notes' or
line_lower == '## notes' or
line_lower.startswith('notes:') or
line_lower.startswith('### notes') or
line_lower.startswith('## notes')):
notes_section = True
print(f"[DEBUG] Found notes section header at line {i}: '{line_stripped}'")
continue
elif notes_section:
# Check if we hit another section (any heading or specific sections)
if (line_stripped.startswith('###') or
line_stripped.startswith('##') or
line_lower.startswith('takedown') or
line_lower.startswith('hosting provider') or
(not line_stripped and notes_lines)): # Empty line after we have notes content
print(f"[DEBUG] End of notes section at line {i}: '{line_stripped}'")
break
if line_stripped: # Only add non-empty lines
notes_lines.append(line_stripped)
print(f"[DEBUG] Added to notes: '{line_stripped}'")
result = '\n'.join(notes_lines).strip()
print(f"[DEBUG] Final extracted notes from description (length: {len(result)}): '{result}'")
return result
except Exception as e:
print(f"[ERROR] Failed to extract notes from description for ticket {ticket}: {e}")
import traceback
print(f"[ERROR] Full traceback: {traceback.format_exc()}")
return ''
if __name__ == "__main__":
    main()  # Initializes the JIRA client and runs the service
    # To run the one-off duplicate-notes cleanup, uncomment the next line:
    # cleanup_duplicate_notes_in_all_tickets()