Skip to content

Instantly share code, notes, and snippets.

@jnhdny
Last active May 25, 2017 08:52
Show Gist options
  • Save jnhdny/98a8e3c5400b0f47a5dd5828f26764cd to your computer and use it in GitHub Desktop.
Save jnhdny/98a8e3c5400b0f47a5dd5828f26764cd to your computer and use it in GitHub Desktop.
Read PSM LMD template from S3 bucket and hide specified columns
from __future__ import print_function
import urllib
import os
from io import BytesIO
import boto3
import openpyxl
from openpyxl.utils import get_column_interval, get_column_letter
from openpyxl.writer.excel import save_virtual_workbook
s3 = boto3.client('s3')
PASSWORD = os.environ['EXCEL_PASSWORD']
NON_COLD_CHAIN_PRODUCTS = 55
COLD_CHAIN_PRODUCTS = 10
REPEATING = 6
START_COLUMN = 9
def hide_columns(input_worksheet, start_column, ranges, number_of_products, repeating):
"""Hide columns in *ranges* in worksheet, starting from start_column
and running for *number_of_products* column groups.
Column groups span *repeating* columns
ranges is a list of two-tuples like [(0,2), (4,5)]
"""
for product_start in range(number_of_products):
for beg, end in ranges:
range_start = start_column + (product_start * repeating) + beg
range_end = start_column + (product_start * repeating) + end
input_worksheet.column_dimensions.group(get_column_letter(range_start),
get_column_letter(range_end),
hidden=True)
return input_worksheet
def show_columns(input_worksheet, start_column, ranges, number_of_products, repeating):
"""Hide columns in *ranges* in worksheet, starting from start_column
and running for *number_of_products* column groups.
Column groups span *repeating* columns
ranges is a list of two-tuples like [(4,6),]
"""
for product_start in range(number_of_products):
for beg, end in ranges:
range_start = start_column + (product_start * repeating) + beg
range_end = start_column + (product_start * repeating) + end
for letter in get_column_interval(range_start, range_end):
try:
del input_worksheet.column_dimensions[letter]
except KeyError:
# print "Error deleting %s" % letter
pass
return input_worksheet
def disable_protection(input_worksheet, password):
"""Disable protection for input worksheet"""
worksheet_protection = input_worksheet.protection
worksheet_protection.set_password(password)
worksheet_protection.disable()
return input_worksheet
def is_psm_non_cold_chain(input_worksheet):
"""Returns True if input_worksheet is cold chain PSM HIV template"""
i3_cell = input_worksheet.cell('I3').value
kk3_cell = input_worksheet.cell('KK3').value
a3_cell = input_worksheet.cell('A3').value
try:
valid = "AZT/3TC" in i3_cell and "CAP" in kk3_cell and "Profile" in a3_cell
except TypeError:
valid = False
return valid
def is_psm_cold_chain(input_worksheet):
"""Returns True if input_worksheet is cold chain PSM HIV template"""
i3_cell = input_worksheet.cell('I3').value
ay3_cell = input_worksheet.cell('AY3').value
a3_cell = input_worksheet.cell('A3').value
try:
valid = "BD FACSCount" in i3_cell and "CAP-CTM" in ay3_cell and "Profile" in a3_cell
except TypeError:
valid = False
return valid
def process_lmd(lmd_file):
"""If workbook has qualifying sheet, returns processed workbook with only VQTO columns visible in those sheets"""
workbook = openpyxl.load_workbook(lmd_file)
sheet_names = workbook.get_sheet_names()
has_valid_sheet = False
for sheet_name in sheet_names:
worksheet = workbook.get_sheet_by_name(sheet_name)
if is_psm_non_cold_chain(worksheet):
has_valid_sheet = True
worksheet = disable_protection(worksheet, PASSWORD)
worksheet = show_columns(worksheet, START_COLUMN, [(4, 6)], NON_COLD_CHAIN_PRODUCTS, REPEATING)
worksheet = hide_columns(worksheet, START_COLUMN, [(0, 2), (4, 5)], NON_COLD_CHAIN_PRODUCTS, REPEATING)
elif is_psm_cold_chain(worksheet):
has_valid_sheet = True
worksheet = disable_protection(worksheet, PASSWORD)
worksheet = show_columns(worksheet, START_COLUMN, [(4, 6)], COLD_CHAIN_PRODUCTS, REPEATING)
worksheet = hide_columns(worksheet, START_COLUMN, [(0, 2), (4, 5)], COLD_CHAIN_PRODUCTS, REPEATING)
if has_valid_sheet:
return workbook
else:
return None
def lambda_handler(event, context):
source_bucket = event['Records'][0]['s3']['bucket']['name']
key = urllib.unquote_plus(
event['Records'][0]['s3']['object']['key'].encode('utf8'))
file_name = key.split('/')[-1]
new_key = 'reviewed/' + file_name
try:
response = s3.get_object(Bucket=source_bucket, Key=key)
print("CONTENT TYPE: " + response['ContentType'])
wb = response['Body']
processed_wb = process_lmd(BytesIO(wb.read()))
if not(processed_wb is None):
s3.put_object(Body=BytesIO(save_virtual_workbook(
processed_wb)), Key=new_key, Bucket=source_bucket)
return response['ContentType']
except Exception as e:
print(e)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment