Last active
May 25, 2017 08:52
-
-
Save jnhdny/98a8e3c5400b0f47a5dd5828f26764cd to your computer and use it in GitHub Desktop.
Read PSM LMD template from S3 bucket and hide specified columns
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import print_function | |
import urllib | |
import os | |
from io import BytesIO | |
import boto3 | |
import openpyxl | |
from openpyxl.utils import get_column_interval, get_column_letter | |
from openpyxl.writer.excel import save_virtual_workbook | |
s3 = boto3.client('s3') | |
PASSWORD = os.environ['EXCEL_PASSWORD'] | |
NON_COLD_CHAIN_PRODUCTS = 55 | |
COLD_CHAIN_PRODUCTS = 10 | |
REPEATING = 6 | |
START_COLUMN = 9 | |
def hide_columns(input_worksheet, start_column, ranges, number_of_products, repeating): | |
"""Hide columns in *ranges* in worksheet, starting from start_column | |
and running for *number_of_products* column groups. | |
Column groups span *repeating* columns | |
ranges is a list of two-tuples like [(0,2), (4,5)] | |
""" | |
for product_start in range(number_of_products): | |
for beg, end in ranges: | |
range_start = start_column + (product_start * repeating) + beg | |
range_end = start_column + (product_start * repeating) + end | |
input_worksheet.column_dimensions.group(get_column_letter(range_start), | |
get_column_letter(range_end), | |
hidden=True) | |
return input_worksheet | |
def show_columns(input_worksheet, start_column, ranges, number_of_products, repeating): | |
"""Hide columns in *ranges* in worksheet, starting from start_column | |
and running for *number_of_products* column groups. | |
Column groups span *repeating* columns | |
ranges is a list of two-tuples like [(4,6),] | |
""" | |
for product_start in range(number_of_products): | |
for beg, end in ranges: | |
range_start = start_column + (product_start * repeating) + beg | |
range_end = start_column + (product_start * repeating) + end | |
for letter in get_column_interval(range_start, range_end): | |
try: | |
del input_worksheet.column_dimensions[letter] | |
except KeyError: | |
# print "Error deleting %s" % letter | |
pass | |
return input_worksheet | |
def disable_protection(input_worksheet, password): | |
"""Disable protection for input worksheet""" | |
worksheet_protection = input_worksheet.protection | |
worksheet_protection.set_password(password) | |
worksheet_protection.disable() | |
return input_worksheet | |
def is_psm_non_cold_chain(input_worksheet): | |
"""Returns True if input_worksheet is cold chain PSM HIV template""" | |
i3_cell = input_worksheet.cell('I3').value | |
kk3_cell = input_worksheet.cell('KK3').value | |
a3_cell = input_worksheet.cell('A3').value | |
try: | |
valid = "AZT/3TC" in i3_cell and "CAP" in kk3_cell and "Profile" in a3_cell | |
except TypeError: | |
valid = False | |
return valid | |
def is_psm_cold_chain(input_worksheet): | |
"""Returns True if input_worksheet is cold chain PSM HIV template""" | |
i3_cell = input_worksheet.cell('I3').value | |
ay3_cell = input_worksheet.cell('AY3').value | |
a3_cell = input_worksheet.cell('A3').value | |
try: | |
valid = "BD FACSCount" in i3_cell and "CAP-CTM" in ay3_cell and "Profile" in a3_cell | |
except TypeError: | |
valid = False | |
return valid | |
def process_lmd(lmd_file): | |
"""If workbook has qualifying sheet, returns processed workbook with only VQTO columns visible in those sheets""" | |
workbook = openpyxl.load_workbook(lmd_file) | |
sheet_names = workbook.get_sheet_names() | |
has_valid_sheet = False | |
for sheet_name in sheet_names: | |
worksheet = workbook.get_sheet_by_name(sheet_name) | |
if is_psm_non_cold_chain(worksheet): | |
has_valid_sheet = True | |
worksheet = disable_protection(worksheet, PASSWORD) | |
worksheet = show_columns(worksheet, START_COLUMN, [(4, 6)], NON_COLD_CHAIN_PRODUCTS, REPEATING) | |
worksheet = hide_columns(worksheet, START_COLUMN, [(0, 2), (4, 5)], NON_COLD_CHAIN_PRODUCTS, REPEATING) | |
elif is_psm_cold_chain(worksheet): | |
has_valid_sheet = True | |
worksheet = disable_protection(worksheet, PASSWORD) | |
worksheet = show_columns(worksheet, START_COLUMN, [(4, 6)], COLD_CHAIN_PRODUCTS, REPEATING) | |
worksheet = hide_columns(worksheet, START_COLUMN, [(0, 2), (4, 5)], COLD_CHAIN_PRODUCTS, REPEATING) | |
if has_valid_sheet: | |
return workbook | |
else: | |
return None | |
def lambda_handler(event, context): | |
source_bucket = event['Records'][0]['s3']['bucket']['name'] | |
key = urllib.unquote_plus( | |
event['Records'][0]['s3']['object']['key'].encode('utf8')) | |
file_name = key.split('/')[-1] | |
new_key = 'reviewed/' + file_name | |
try: | |
response = s3.get_object(Bucket=source_bucket, Key=key) | |
print("CONTENT TYPE: " + response['ContentType']) | |
wb = response['Body'] | |
processed_wb = process_lmd(BytesIO(wb.read())) | |
if not(processed_wb is None): | |
s3.put_object(Body=BytesIO(save_virtual_workbook( | |
processed_wb)), Key=new_key, Bucket=source_bucket) | |
return response['ContentType'] | |
except Exception as e: | |
print(e) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment