Skip to content

Instantly share code, notes, and snippets.

@Hc747
Created January 2, 2020 03:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Hc747/129ce75e6b4237791e8d1f33b737d694 to your computer and use it in GitHub Desktop.
Save Hc747/129ce75e6b4237791e8d1f33b737d694 to your computer and use it in GitHub Desktop.
package au.edu.uac.apply.services
import au.edu.uac.agentportal.model.channel.InstitutionChannel
import grails.gorm.transactions.Transactional
import groovy.sql.GroovyRowResult
import groovy.sql.Sql
import java.util.concurrent.ConcurrentHashMap
import static grails.async.Promises.task
/**
 * Copies course offering data from the database behind {@code dataSource_apply} into the
 * database behind {@code dataSource}.
 *
 * For every table listed in {@link #SCHEMA_MAPPING} the service runs a SELECT against the source
 * database, renders the rows as one {@code INSERT ... ON DUPLICATE KEY UPDATE} statement and
 * executes that statement against the target database. The work happens asynchronously in a
 * Grails promise; at most one pull per institution runs at a time.
 */
@Transactional
class CourseOfferingPullService {

    def dataSource
    def dataSource_apply
    def persistenceInterceptor

    /** Institutions that currently have a pull in flight. */
    private final Set<String> processing = ConcurrentHashMap.newKeySet()

    //TODO: ensure transactionality
    //TODO: expose via controller
    /**
     * Starts an asynchronous pull for the given institution.
     *
     * @param institution identifier of the institution; must not be null or empty
     * @return the promise created by {@code task}, or {@code null} when a pull for the same
     *         institution is already running
     * @throws IllegalStateException when {@code institution} is null or empty
     */
    def process(final String institution) {
        if (!institution) {
            throw new IllegalStateException("Institution id not permitted to be null or empty.")
        }
        if (!processing.add(institution)) {
            log.warn("Course Offering pull service is running for institution: ${institution}")
            return null
        }
        task {
            final def start = new Date()
            try {
                final def apply_db = new Sql(dataSource_apply)
                final def inserts = run { download(apply_db, institution) }
                final def agent_portal_db = new Sql(dataSource)
                // run() yields null when the download failed (it has already logged the cause);
                // skip the upload then instead of provoking a secondary NullPointerException.
                final def results = inserts == null ? null : run { upload(agent_portal_db, inserts) }
                log.info("Results: $results")
            } catch (Throwable t) {
                log.error("An exception occurred during the execution of the Course Offering pull service for institution: ${institution}", t)
            } finally {
                log.info("Course Offering pulling service for instititution (${institution}) is finished in ${new Date().time - start.time} ms")
                if (!processing.remove(institution)) {
                    throw new IllegalMonitorStateException("Internal state of CourseOfferingPullService#PROCESSING is invalid.")
                }
            }
        }
    }

    /**
     * Executes {@code action} between {@code persistenceInterceptor.init()} and
     * {@code persistenceInterceptor.destroy()}.
     *
     * @return the action's result, or {@code null} when an exception was thrown (the exception is
     *         logged together with its stack trace and not rethrown)
     */
    private <T> T run(final Closure<T> action) {
        try {
            persistenceInterceptor.init()
            return action()
        } catch (Exception e) {
            // Pass the exception itself as well: e.message alone may be null and hides the origin.
            log.error(e.message, e)
        } finally {
            persistenceInterceptor.destroy()
        }
        return null
    }

    /**
     * Builds one upsert statement per table in {@link #SCHEMA_MAPPING}.
     *
     * @param db          connection wrapper for the source database
     * @param institution currently unused; the queries hard-code their institution filter
     * @return table name mapped to its upsert statement, or to {@code null} when the table's
     *         query returned no rows
     */
    private static Map<String, String> download(final Sql db, final String institution) { //TODO: use institution parameter
        return SCHEMA_MAPPING.collectEntries { table, context ->
            [(table): insert(db, table, context.query, context.transformations)]
        }
    }

    /**
     * Executes every statement in {@code payload} against {@code db}.
     *
     * @return table name mapped to the update count, to {@code -1} when there was no statement
     *         for the table, or to the exception message when execution failed
     */
    private static def upload(final Sql db, final Map<String, String> payload) {
        final def outcome = [:]
        payload.each { table, statement ->
            try {
                outcome[table] = statement == null ? -1 : db.executeUpdate(statement)
            } catch (Exception e) {
                // A failure on one table must not prevent the remaining tables from being written.
                outcome[table] = e.message
            }
        }
        return outcome
    }

    /**
     * Creates a closure {@code (row, key)} that renders the row's {@code prefix + key} value as a
     * SQL literal.
     *
     * The closure returns {@code null} (rendered as SQL NULL once joined into the statement) when
     * the raw value is null or whitespace-only, or when the configured transformation for
     * {@code key} yields null; otherwise it returns the value as a quoted, escaped string literal.
     */
    private static Closure createTransformer(final String prefix, final Map<String, ?> transformations) {
        return { final GroovyRowResult row, final String key ->
            def value = row[prefix + key]
            if (value == null || (value instanceof String && value.isAllWhitespace())) {
                return null
            }
            if (transformations?.containsKey(key)) {
                final def transformation = transformations[key]
                value = transformation(value)
                if (value == null) {
                    // e.g. an unresolved lookup: emit SQL NULL rather than the string 'null'.
                    return null
                }
            }
            return quote(value)
        }
    }

    /**
     * Renders {@code value} as a single-quoted SQL string literal, doubling embedded quotes and
     * backslashes so that values such as {@code Bachelor's} cannot break the statement.
     *
     * NOTE(review): backslash doubling assumes the target server treats backslash as an escape
     * character (the MySQL default); confirm the server does not run with NO_BACKSLASH_ESCAPES.
     */
    private static String quote(final Object value) {
        final String escaped = value.toString().replace('\\', '\\\\').replace("'", "''")
        return "'" + escaped + "'"
    }

    /**
     * Runs {@code query} and renders its rows as one
     * {@code INSERT ... ON DUPLICATE KEY UPDATE} statement for {@code table}.
     *
     * Target column names are derived from the result-set aliases by removing the leading
     * {@code table + '_'} prefix.
     *
     * @return the statement, or {@code null} when the query returned no rows
     */
    private static String insert(final Sql db, final String table, final String query, final Map<String, ?> transformations) {
        final def rows = db.rows(query)
        if (rows.isEmpty()) {
            return null
        }
        final String prefix = table + '_'
        final def transformer = createTransformer(prefix, transformations)
        // Strip the prefix literally and only at the start of the alias (not as a regex).
        final List<String> columns = rows.first().keySet().collect { alias ->
            final String name = alias.toString()
            name.startsWith(prefix) ? name.substring(prefix.length()) : name
        }
        final String values = rows.collect { row ->
            '(' + columns.collect { column -> transformer(row, column) }.join(', ') + ')'
        }.join(', ')
        final String assignments = columns.collect { column -> "$column = VALUES($column)" }.join(', ')
        return [
            "INSERT INTO $table (${columns.join(', ')})",
            "VALUES $values",
            "ON DUPLICATE KEY UPDATE $assignments"
        ].join('\n')
    }

    //TODO: ensure column name are aligned with the database table; parameterise institution_code
    /** Source query for {@code channel_course}; every alias carries the {@code channel_course_} prefix. */
    private static final String CHANNEL_COURSE_QUERY = '''
        SELECT
        c.course_code AS channel_course_id,
        0 AS channel_course_version,
        c.institution_code AS channel_course_institution_channel_id,
        c.course_code AS channel_course_channel_course_code,
        cp.property_value AS channel_course_display_code,
        cc.cricos_course_code AS channel_course_cricos_code,
        c.course_status AS channel_course_status,
        c.course_desc_short AS channel_course_description,
        c.course_level AS channel_course_level,
        c.open_to_year12_flag AS channel_course_open_to_year12_flag,
        c.open_to_os_flag AS channel_course_open_to_os_flag,
        c.campus_location AS channel_course_campus_location,
        c.acknowledgement_text AS channel_course_acknowledgement_text
        FROM course AS c
        LEFT JOIN cricos_course AS cc ON c.course_code = cc.course_code
        LEFT JOIN course_properties AS cp ON c.course_code = cp.domain_code AND cp.property_name = 'ALL-PORTAL-AU01'
        WHERE c.institution_code = 'AU'
    '''.stripIndent().trim()

    //TODO: ensure column name are aligned with the database table; parameterise institution_code
    /** Source query for {@code course_combination}; every alias carries the {@code course_combination_} prefix. */
    private static final String COURSE_COMBINATION_QUERY = '''
        SELECT
        cc.course_code as course_combination_course_code,
        cc.seq_num as course_combination_seq_num,
        cc.course_code_selectable as course_combination_course_code_selectable,
        cc.mandatory_flag as course_combination_mandatory_flag
        FROM course_combination AS cc
        WHERE
        EXISTS (SELECT * FROM course c WHERE c.course_code = cc.course_code AND c.institution_code = 'AU')
        AND
        EXISTS (SELECT * FROM course c WHERE c.course_code = cc.course_code_selectable AND c.institution_code = 'AU')
    '''.stripIndent().trim()

    //TODO: ensure column name are aligned with the database table; parameterise institution_code
    /**
     * Source query for {@code course_combination_invalid}; every alias must carry the
     * {@code course_combination_invalid_} prefix, otherwise {@code insert} can neither derive the
     * target column name nor read the value back from the row.
     */
    private static final String COURSE_COMBINATION_INVALID_QUERY = '''
        SELECT
        cc.course_code_fdd as course_combination_invalid_course_code_fdd,
        cc.seq_num as course_combination_invalid_seq_num,
        cc.course_code_1 as course_combination_invalid_course_code_1,
        cc.course_code_2 as course_combination_invalid_course_code_2
        FROM course_combination_invalid AS cc
        WHERE
        EXISTS (SELECT * FROM course c WHERE c.course_code = cc.course_code_1 AND c.institution_code = 'AU')
        AND
        EXISTS (SELECT * FROM course c WHERE c.course_code = cc.course_code_2 AND c.institution_code = 'AU')
    '''.stripIndent().trim()

    /**
     * Target table name mapped to its source {@code query} and optional per-column
     * {@code transformations} (keyed by target column name, applied to non-blank values only).
     */
    private static final Map<String, Map<String, Object>> SCHEMA_MAPPING = [
        channel_course: [
            query: CHANNEL_COURSE_QUERY,
            transformations: [
                // Keep only the part of the description that precedes the first ' at '.
                description: { String description ->
                    final def index = description.indexOf(' at ')
                    return index > -1 ? description.substring(0, index) : description
                },
                // Replace the institution code with the id of the matching InstitutionChannel, if any.
                institution_channel_id: { String code -> InstitutionChannel.findByInstChannelCode(code)?.id }
            ]
        ],
        course_combination: [
            query: COURSE_COMBINATION_QUERY
        ],
        course_combination_invalid: [
            query: COURSE_COMBINATION_INVALID_QUERY
        ]
    ]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment