Skip to content

Instantly share code, notes, and snippets.

@afonsoaugusto
Created April 7, 2022 18:14
Show Gist options
  • Save afonsoaugusto/47221cad9d95154f0d77675e81876d7f to your computer and use it in GitHub Desktop.
Save afonsoaugusto/47221cad9d95154f0d77675e81876d7f to your computer and use it in GitHub Desktop.
Jira-to-s3.ipynb
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Install and Import Dependent modules"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!pip install tlslite"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!pip install oauth2"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import urllib\n",
"import oauth2 as oauth\n",
"from tlslite.utils import keyfactory\n",
"import json\n",
"import sys\n",
"import os\n",
"import base64\n",
"import boto3\n",
"from boto3.dynamodb.conditions import Key, Attr\n",
"import datetime\n",
"import logging\n",
"import pprint\n",
"import time\n",
"from pytz import timezone\n",
"\n",
"logger = logging.getLogger()\n",
"logger.setLevel(logging.INFO)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Set-up ssm Parameters as shown in blogpost and proceed to next section (below are the ssm parameters)\n",
"- jira_access_urls: Parameter to store URLs to access JIRA via RestAPI\n",
"- jira_access_secrets: Parameter to store Secrets to access JIRA\n",
"- jira_access_private_key: Parameter to store Private Keycorresponding to public key specified in Jira RestAPI configuration."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ssm = boto3.client(\"ssm\", region_name='us-east-1')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Perform 3-Legged OAuth Process Referred to as “OAuth dance”"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Define the Siganture Class to sign JIRA RESTApi requests"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"class SignatureMethod_RSA_SHA1(oauth.SignatureMethod):\n",
" name = 'RSA-SHA1'\n",
"\n",
" def signing_base(self, request, consumer, token):\n",
" if not hasattr(request, 'normalized_url') or request.normalized_url is None:\n",
" raise ValueError(\"Base URL for request is not set.\")\n",
"\n",
" sig = (\n",
" oauth.escape(request.method),\n",
" oauth.escape(request.normalized_url),\n",
" oauth.escape(request.get_normalized_parameters()),\n",
" )\n",
"\n",
" key = '%s&' % oauth.escape(consumer.secret)\n",
" if token:\n",
" key += oauth.escape(token.secret)\n",
" raw = '&'.join(sig)\n",
" return key, raw\n",
"\n",
" def sign(self, request, consumer, token):\n",
"\n",
" key, raw = self.signing_base(request, consumer, token)\n",
"\n",
" # SSM support to fetch private key\n",
" ssm_param = ssm.get_parameter(Name='jira_access_private_key', WithDecryption=True)\n",
" jira_private_key_str = ssm_param['Parameter']['Value']\n",
"\n",
" privateKeyString = jira_private_key_str.strip()\n",
"\n",
" privatekey = keyfactory.parsePrivateKey(privateKeyString)\n",
"\n",
" # Used encode() to convert to bytes\n",
" signature = privatekey.hashAndSign(raw.encode())\n",
" return base64.b64encode(signature)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Get consume_key & consumer_secret from ssm Parameter these were defined in JIRA portal"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"jira_secrets = json.loads(ssm.get_parameter(Name='jira_access_secrets', WithDecryption=True)['Parameter']['Value'])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"jira_secrets"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"consumer_key = jira_secrets[\"consumer_key\"]\n",
"consumer_key"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"consumer_secret = jira_secrets[\"consumer_secret\"]\n",
"consumer_secret"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Define URLs to be used in 3 Legged OAuth Process\n",
"- These URLs are defined while setting-up RestAPI endpoint in jira\n",
"- Here are sample URLs to show the how it is formed and it's components\n",
" - request_token_url = 'https://jiratoawss3.atlassian.net/plugins/servlet/oauth/request-token'\n",
" - access_token_url = 'https://jiratoawss3.atlassian.net/plugins/servlet/oauth/access-token'\n",
" - authorize_url = 'https://jiratoawss3.atlassian.net/plugins/servlet/oauth/authorize'"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"request_token_url = 'input_here'\n",
"access_token_url = 'input_here'\n",
"authorize_url = 'input_here'"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Step-1 of 3-Legged OAuth Process\n",
"- Generate Request Token"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Create Consumer using consumer_key and consumer_secret\n",
"consumer = oauth.Consumer(consumer_key, consumer_secret)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Use Consumer to create oauth client\n",
"client = oauth.Client(consumer)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Add Signature Method to the client\n",
"client.set_signature_method(SignatureMethod_RSA_SHA1())"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Get response from request token URL using the client\n",
"resp, content = client.request(request_token_url, \"POST\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Convert the content received from previous step into a Dictionary\n",
"request_token = dict(urllib.parse.parse_qsl(content))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# request token has two components oauth_token and oauth_token_secret\n",
"request_token"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Step-2 of 3 Legged OAuth Process\n",
"- Manually Approve the Request Token by opening below URL in a Browser\n",
"- Approve the request by opening the below user in a browser\n",
"- Example Value of final autorize user is:\n",
"- https://jiratoawss3.atlassian.net/plugins/servlet/oauth/authorize?oauth_token=wYLlIxmcsnZTHgTy2ZpUmBakqzmqSbww"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"authorize_url + '?oauth_token=' + request_token[b'oauth_token'].decode()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Step-3 of 3 Legged OAuth Process\n",
"- Use Approved Request Token to generate Access Token"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Create an oauth token using components of request token\n",
"token = oauth.Token(request_token[b'oauth_token'], request_token[b'oauth_token_secret'])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Use Consumer and token to create oauth client\n",
"client = oauth.Client(consumer, token)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Add Signature Method to the client\n",
"client.set_signature_method(SignatureMethod_RSA_SHA1())"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Get response from access token URL using the client\n",
"access_token_resp, access_token_content = client.request(access_token_url, \"POST\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"access_token_content"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Update access_token key in SSM Parameter jira_access_secrets with value of access_token_content and go to next step\n",
"- This Access token is valid for 5 years (expires_in key of access_token_content tells when token will expire in seconds)\n",
"- Rotation of Access Key depends on Organization's Security policy and is out of scope of this blogpost"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Test Access to Jira using Access Token"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Convert Access Token to Dictionary\n",
"access_token = dict(urllib.parse.parse_qsl(access_token_content))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Display the value of access token\n",
"access_token"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Create an oauth token using components of access token\n",
"accessToken = oauth.Token(access_token[b'oauth_token'], access_token[b'oauth_token_secret'])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Use Consumer and Access Token to create oauth client\n",
"client = oauth.Client(consumer, accessToken)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Add Signature Method to the client\n",
"client.set_signature_method(SignatureMethod_RSA_SHA1())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### data_url is also defined while setting-up RestAPI endpoint in jira\n",
"- Below is the sample value of data URL\n",
" - https://awsjiratos3.atlassian.net/rest/api/2/search?jql=project=Test_Project_1\n",
"- Data URL has two components\n",
" - RestAPI Endpoint to get data from JIRA\n",
" - JQL (Jira Query Language) to filter data from JIRA example: by project, time frame etc."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Define Data URL to pull data from JIRA mention test project with limited records\n",
"data_url = 'input_here'"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Pull Data for test project from JIRA\n",
"jira_resp, jira_content = client.request(data_url, \"GET\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# jira_content will have data returned from JIRA in json format\n",
"pprint.pprint(jira_content)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Sample Code to Pull Data From JIRA in to s3\n",
"- This process should be done as part of code deployment (Cloudformation or CDK)\n",
"- AWS Region should be parameterized "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Create get_data function to orchestrate data pull from JIRA and extract data to s3\n",
"- As part of Orchestration this function will identify number of records which will be extracted from JIRA\n",
"- Loops through them based on JIRA Page size\n",
"- Calls query_endpoint to get data for a page\n",
"- Writes data to s3"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def get_data(project, load_type, start_date, end_date, s3_end_date_prefix):\n",
" \"\"\"\n",
" :param\n",
" project: JIRA Project for which data needs to be extracted to s3\n",
" load_type: Bulk / Incremental\n",
" start_date: Date as String in 'YYYY-MM-DD HH:00' format (Blank in if load_type is Bulk)\n",
" end_date: Date as String in 'YYYY-MM-DD HH:00' format\n",
" s3_end_date_prefix: End Date formatted as s3 Prefix\n",
"\n",
" :return:\n",
" Total Number of Records Extracted From JIRA\n",
" \"\"\"\n",
" \n",
" # Define s3 client\n",
" s3 = boto3.client('s3')\n",
"\n",
" # Define s3 bucket & key where JIRA Output will be written\n",
" # these can be passed as Glue Job Parameter or Lambda Function Environment Variable\n",
" # or stored as ssm parameter\n",
" output_s3_bucket = 'sample-project-tracking-ingest'\n",
" key_prefix = 'jira_data' + '/' + project + '/' + s3_end_date_prefix + '/'\n",
"\n",
" # Set values for JQL Query Variables\n",
" start_page_index = 0\n",
" start_at_index = 0\n",
" result_size = 50\n",
" max_pages = 100000\n",
"\n",
" # Define Output file name\n",
" file_name = project + '_' + str(start_page_index) + '.json'\n",
"\n",
" # Define variables to create Consumer & Signature for accessing JIRA - jira_access_secrets\n",
" jira_access_secrets = json.loads(ssm.get_parameter(Name='jira_access_secrets', WithDecryption=True)['Parameter']['Value'])\n",
" consumer_key = jira_access_secrets['consumer_key']\n",
" consumer_secret = jira_access_secrets['consumer_secret']\n",
"\n",
" oauth_token_param_str = jira_access_secrets['access_token']\n",
" oauth_token = oauth_token_param_str.encode()\n",
"\n",
" # Define Base URL, Search End Point & JQL Query Template\n",
" jira_access_urls = json.loads(ssm.get_parameter(Name='jira_access_urls', WithDecryption=True)['Parameter']['Value'])\n",
" endpoint_url = jira_access_urls['data_url']\n",
"\n",
" if load_type == 'Bulk':\n",
" query_string_template = \"?jql=project={0}{1}&startAt={2}&maxResults={3}&fields=*all&expand=renderedFields,names,schema,transitions,operations,changelog,projects.issuetypes.fields&fieldsByKeys=true\"\n",
" date_condition = \"&updated<='\" + end_date + \"'\"\n",
" escaped_date_condition = urllib.parse.quote(date_condition, safe='')\n",
" # Replace variables in query string template\n",
" query_string = query_string_template.format(project,\n",
" escaped_date_condition,\n",
" start_at_index,\n",
" result_size)\n",
" else:\n",
" query_string_template = \"?jql=project={0}{1}{2}&startAt={3}&maxResults={4}&fields=*all&expand=renderedFields,names,schema,transitions,operations,changelog,projects.issuetypes.fields&fieldsByKeys=true\"\n",
" date_condition1 = \"&updated>'\" + start_date + \"'\"\n",
" date_condition2 = \"&updated<='\" + end_date + \"'\"\n",
" escaped_date_condition1 = urllib.parse.quote(date_condition1, safe='')\n",
" escaped_date_condition2 = urllib.parse.quote(date_condition2, safe='')\n",
" # Replace variables in query string template\n",
" query_string = query_string_template.format(project,\n",
" escaped_date_condition1,\n",
" escaped_date_condition2,\n",
" start_at_index,\n",
" result_size)\n",
"\n",
" logger.info(\"Generating token consumer\")\n",
"\n",
" # Generate consumer\n",
" consumer = oauth.Consumer(consumer_key, consumer_secret)\n",
"\n",
" # Generate Access Token\n",
" access_token = dict(urllib.parse.parse_qsl(oauth_token))\n",
"\n",
" # Generate Client Token from Access Token\n",
" client_token = oauth.Token(access_token[b'oauth_token'], access_token[b'oauth_token_secret'])\n",
"\n",
" logger.info(\"Retrieving data: {0}\".format(query_string))\n",
" data = {}\n",
"\n",
" # Call query_endpoint function to hit JIRA API Endpoint\n",
" try:\n",
" data = query_endpoint(query_string, endpoint_url, consumer, client_token)\n",
" except Exception as data_err:\n",
" logger.error(\"Unable to retrieve data: {0}\".format(str(data_err)))\n",
" data = {}\n",
"\n",
" #\n",
" num_entries = 0\n",
" json_issues_list = None\n",
"\n",
" # Get Total Rows & Extract Data Records\n",
" total_entries = data.get('total', 0)\n",
" issues_list = data.get('issues', [])\n",
" estimated_max_pages = (total_entries // result_size) + 1\n",
"\n",
" logger.info(\"Total number of issues: {0}. Initial page retrieved {1} issues.\".format(total_entries,\n",
" len(issues_list)))\n",
"\n",
" # Parameter to sleep between two API calls\n",
" api_sleep_in_seconds = 1\n",
"\n",
" # Sleep for api_sleep_in_seconds between two API calls to JIRA\n",
" if total_entries > 0:\n",
" logger.info(\"API sleep for {0} seconds.\".format(api_sleep_in_seconds))\n",
" time.sleep(api_sleep_in_seconds)\n",
"\n",
" # Put the Data to s3\n",
" if len(issues_list) > 0:\n",
" try:\n",
" json_issues_list = json.dumps(issues_list)\n",
" except Exception as json_err:\n",
" logger.error(\"Unable to serialize issues_list to json: {0}\".format(str(json_err)))\n",
" if json_issues_list is not None and type(json_issues_list) is str:\n",
" logger.info(\n",
" \"Attempting to write JSON to buket {0} and key {1}\".format(output_s3_bucket, key_prefix + file_name))\n",
" try:\n",
" s3.put_object(Bucket=output_s3_bucket, Key=key_prefix + file_name, Body=json_issues_list)\n",
" except Exception as e:\n",
" logger.error(\"Unable to write JSON to buket {0} and key {1} because {2}\".format(output_s3_bucket,\n",
" key_prefix + file_name,\n",
" str(e)))\n",
"\n",
" num_entries += len(issues_list)\n",
" start_at_index += len(issues_list)\n",
" start_page_index += 1\n",
"\n",
" # Paginate\n",
" logger.info(\"Query has {0} total entries. Max pages set to {1}\".format(total_entries, max_pages))\n",
"\n",
" while start_at_index < total_entries and start_page_index < max_pages and start_page_index < estimated_max_pages:\n",
"\n",
" file_name = project + '_' + str(start_page_index) + '.json'\n",
"\n",
" if load_type == 'Bulk':\n",
" query_string_template = \"?jql=project={0}{1}&startAt={2}&maxResults={3}&fields=*all&expand=renderedFields,names,schema,transitions,operations,changelog,projects.issuetypes.fields&fieldsByKeys=true\"\n",
" date_condition = \"&updated<='\" + end_date + \"'\"\n",
" escaped_date_condition = urllib.parse.quote(date_condition, safe='')\n",
" # Replace variables in query string template\n",
" query_string = query_string_template.format(project,\n",
" escaped_date_condition,\n",
" start_at_index,\n",
" result_size)\n",
" else:\n",
" query_string_template = \"?jql=project={0}{1}{2}&startAt={3}&maxResults={4}&fields=*all&expand=renderedFields,names,schema,transitions,operations,changelog,projects.issuetypes.fields&fieldsByKeys=true\"\n",
" date_condition1 = \"&updated>'\" + start_date + \"'\"\n",
" date_condition2 = \"&updated<='\" + end_date + \"'\"\n",
" escaped_date_condition1 = urllib.parse.quote(date_condition1, safe='')\n",
" escaped_date_condition2 = urllib.parse.quote(date_condition2, safe='')\n",
" # Replace variables in query string template\n",
" query_string = query_string_template.format(project,\n",
" escaped_date_condition1,\n",
" escaped_date_condition2,\n",
" start_at_index,\n",
" result_size)\n",
"\n",
" logger.info(\"Retrieving data: {0}\".format(endpoint_url + query_string))\n",
"\n",
" try:\n",
" data = query_endpoint(query_string, endpoint_url, consumer, client_token)\n",
" except Exception as data_err:\n",
" logger.error(\"Unable to retrieve data: {0}\".format(str(data_err)))\n",
" data = {}\n",
" break\n",
"\n",
" # Extract Data Records\n",
" issues_list = data.get('issues', [])\n",
"\n",
" if len(issues_list) > 0:\n",
" logger.info(\"Retrieved {0} pages out of max {1}\".format(start_page_index, max_pages))\n",
"\n",
" # Increment counters\n",
" num_entries += len(issues_list)\n",
" start_at_index += len(issues_list)\n",
" start_page_index += 1\n",
"\n",
" # Handle throttling\n",
" if len(issues_list) > 0:\n",
" logger.info(\"Sleep for {0} seconds.\".format(api_sleep_in_seconds))\n",
" time.sleep(api_sleep_in_seconds)\n",
"\n",
" # Handle serialization\n",
" if len(issues_list) > 0:\n",
" # Note: I want this to fail hard\n",
" try:\n",
" json_issues_list = json.dumps(issues_list)\n",
" except Exception as json_err:\n",
" logger.error(\"Unable to serialize issues_list to json: {0}\".format(str(json_err)))\n",
" json_issues_list = None\n",
"\n",
" if json_issues_list is not None and type(json_issues_list) is str:\n",
" try:\n",
" s3.put_object(Bucket=output_s3_bucket, Key=key_prefix + file_name, Body=json_issues_list)\n",
" except Exception as e:\n",
" logger.error(\"Unable to write JSON to bucket {0} and key {1} because {2}\".format(output_s3_bucket,\n",
" key_prefix + file_name,\n",
" str(e)))\n",
"\n",
" logger.info(\"Completed call to Search API for project {0}\".format(project))\n",
" return num_entries\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Create query_endpoint which will be called from get_data function\n",
"- This function will hit the JIRA RESTApi end point to extract the data and return it to get_data function"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def query_endpoint(query_string, endpoint_url, consumer, client_token):\n",
" \"\"\"Query JIRA endpoint.\n",
"\n",
" :param endpoint_url:\n",
" :param query_string:\n",
" :param access_token: Output of get_access_token\n",
" :param consumer_key:\n",
" :param consumer_secret:\n",
" :return:\n",
" \"\"\"\n",
" # Create oauth client\n",
" client = oauth.Client(consumer, client_token)\n",
" # Create Signature for hitting JIRA End Point\n",
" client.set_signature_method(SignatureMethod_RSA_SHA1())\n",
"\n",
" # Retrieve data\n",
" logger.info(\"Attempting to retrieve data from endpoint '{0}' with query '{1}'\".format(endpoint_url,\n",
" query_string))\n",
"\n",
" data_url = endpoint_url + query_string\n",
"\n",
" logger.info(\"Requesting data: {0}\".format(data_url))\n",
"\n",
" data_resp, data_content = None, None\n",
" try:\n",
" data_resp, data_content = client.request(data_url, \"GET\")\n",
" except Exception as dreq_err:\n",
" logger.error(\"Unable to retrieve data: response={0}. Error: {1}\".format(data_resp, str(dreq_err)))\n",
"\n",
" logger.info(\"Response from service endpoint: {0}\".format(data_resp))\n",
"\n",
" content = None\n",
" if data_content is not None:\n",
" logger.info(\"Attempting to deserialize data\")\n",
" # Output needs to be decoded from bytes to utf8 string.\n",
" content = json.loads(data_content.decode('utf8'))\n",
"\n",
" return content\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### main function\n",
"- Loops through Projects in Scope for data extraction from JIRA\n",
"- Inside each loop:\n",
" - Identifies the Load Type for the Project (Bulk vs Incremental)\n",
" - In case of Incremental Load Type Identifies Start Date & End Date i.e. date range for which data needs to be extracted from JIRA\n",
" - Invokes get_data for data extraction\n",
" - Updates DynamoDB with new last_ingestion date which will be set to start date for next execution\n",
"- Define this as main function inside glue Job or as lambda handler inside lambda function"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Define variables DynamoDB, these can be passed as Glue Job Parameter or Lambda Function Environment Variable\n",
"# or stored as ssm parameter\n",
"dynamodb_table = 'jira_batch_tracking'\n",
"dynamodb_partition_key = 'jira_project_name'\n",
"\n",
"# Define AWS Region, this needs to be identified and passed dynamically as aprt of CI/CD code deployment process\n",
"aws_region = 'us-east-1'\n",
"\n",
"# Get the values of JIRA projects, these can be passed as Glue Job Parameter or Lambda Function Environment Variable\n",
"# or stored as ssm parameter\n",
"jira_projects = 'Test_Project_1, Test_Project_2, Test_Project_3'\n",
"\n",
"# Convert the Project String into Project List\n",
"jira_project_list = jira_projects.split(',')\n",
"\n",
"# Specify the Timezone in which JIRA is set-up\n",
"est = timezone('EST')\n",
"\n",
"# Calculate Current System date\n",
"current_DT = datetime.datetime.now(est)\n",
"year_DT = current_DT.year\n",
"month_DT = current_DT.month\n",
"day_DT = current_DT.day\n",
"hour_DT = current_DT.hour + 2\n",
"minute_DT = current_DT.minute\n",
"\n",
"# Prefix Month, Day and Hour with 0 in case it's a single digit value\n",
"month_DT_str = '{0:02}'.format(month_DT)\n",
"day_DT_str = '{0:02}'.format(day_DT)\n",
"hour_DT_str = '{0:02}'.format(hour_DT)\n",
"\n",
"start_date = ''\n",
"s3_end_date_prefix = ''\n",
"\n",
"# Calculate the Datetime till JIRA data will be pulled\n",
"# This is rounded upto the hour\n",
"end_date = \"{0}-{1}-{2} {3}:00\".format(year_DT,month_DT_str,day_DT_str,hour_DT_str)\n",
"logger.info(\"Value of End Date is - {0}\".format(end_date))\n",
"s3_end_date_prefix = 'yr=' + str(year_DT) + '/mo=' + str(month_DT) + '/dt=' + str(day_DT) + '/hr=' + str(\n",
" hour_DT) + ':00'\n",
"\n",
"# Call get_data function to extract data from JIRA one Project at a time\n",
"for project in jira_project_list:\n",
" logger.info(\"JIRA Data Pull for Project - {0}\".format(project))\n",
"\n",
" # Query Project From DynamoDB Tracking Table\n",
" dynamodb_client = boto3.resource('dynamodb', region_name=aws_region)\n",
" db_mtable = dynamodb_client.Table(dynamodb_table)\n",
" response = db_mtable.query(\n",
" KeyConditionExpression=Key(dynamodb_partition_key).eq(project)\n",
" )\n",
"\n",
" # If Project is Not Present in DynamoDB set Load Type as Bulk\n",
" # All the data from Jira will be extracted to s3\n",
" if response['Count'] == 0:\n",
" # Define Load Type as History as DynamoDB doesn't have entry for the Project\n",
" load_type = 'Bulk'\n",
" # Call get_data function to extract data for the Project from JIRA\n",
" total_entries = get_data(project, load_type, start_date, end_date, s3_end_date_prefix)\n",
"\n",
" # If Project is Present in DynamoDB set Load Type as Incremental\n",
" # Set start_date as last_ingest_date from DynamoDB Table\n",
" # Only JIRA Stories modified between start_date and end_date will be extracted to s3\n",
" else:\n",
" # Define Load Type as Incremental as DynamoDB has entry for the Project\n",
" load_type = 'Incremental'\n",
" # Get the Last Ingestion Date from DynamoDB response object and assign it to Start Date\n",
" start_date = response['Items'][0]['last_ingest_date']\n",
" logger.info(\"Value of Start Date is - {0}\".format(start_date))\n",
" # Call get_data function to extract data for the Project from JIRA\n",
" total_entries = get_data(project, load_type, start_date, end_date, s3_end_date_prefix)\n",
"\n",
" # Insert/Update Entry in DynamoDB for the Project and set end_date as last_ingest_date\n",
" put_response = db_mtable.put_item(\n",
" Item={\n",
" dynamodb_partition_key: project,\n",
" 'last_ingest_date': end_date\n",
" }\n",
" )\n",
" logger.info(\"Value of DynamoDB Put Response is - {0}\".format(put_response))\n",
" logger.info(\n",
" \"JIRA Data Pull for Project - {0} completed and total {1} records loaded to landing bucket\".format(project,\n",
" total_entries))"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "conda_python3",
"language": "python",
"name": "conda_python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.5"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment