Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@daniilyar
Created September 8, 2014 10:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save daniilyar/f524306ded618ad591ac to your computer and use it in GitHub Desktop.
Save daniilyar/f524306ded618ad591ac to your computer and use it in GitHub Desktop.
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;
import org.apache.commons.dbutils.DbUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Rebuilds per-security average prices through a single JDBC connection.
 *
 * <p>Lifecycle: {@link #setDataSource(DataSource)}, then {@link #init()}, then
 * {@link #calcAveragePrices()}, then {@link #release()}. The connection runs with
 * auto-commit disabled; a run is committed as a whole or rolled back as a whole.
 *
 * <p>The connection and prepared statements are kept in plain, unsynchronized fields,
 * so an instance is not meant to be shared between threads.
 */
public class Processor {

    private static final Logger LOG = LoggerFactory.getLogger(Processor.class);

    /** Number of securities processed between two flushes of the pending insert batch. */
    private static final int BATCH_SIZE = 1000;

    /** Number of placeholders in {@link #SQL_GET_AVG_PRICES_FOR_SECURITY} that receive the security id. */
    private static final int AVG_PRICES_QUERY_PARAM_COUNT = 6;

    // NOTE(review): the three statements below are blank in this listing; the real SQL
    // has to be supplied before init() can prepare them.
    private static final String SQL_GET_ALL_FS_PERM_SEC_IDS = "";
    private static final String SQL_DELETE_OBSOLETE_SECURITIES = "";
    private static final String SQL_GET_AVG_PRICES_FOR_SECURITY = "";

    // Column order after fs_perm_sec_id must match the declaration order of Period.
    private static final String SQL_APPEND_AVG_PRICES_FOR_SECURITY = "insert into THEMES.AVG_PRICES "
            + "(fs_perm_sec_id, price_1d, price_7d, price_1m, price_3m, price_1y, price_ytd) values (?,?,?,?,?,?,?)";

    private DataSource dataSource;
    private Connection conn;
    private PreparedStatement psGetAllFsPermSecIds;
    private PreparedStatement psGetAvgPricesForSecurity;
    private PreparedStatement psUpdateAvgPrices;

    /**
     * Sets the data source that {@link #init()} obtains its connection from.
     *
     * @param dataSource source of the JDBC connection
     */
    public void setDataSource(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    /**
     * Opens the connection, disables auto-commit and prepares the reusable statements.
     *
     * <p>If any step fails, everything opened so far is closed again so that a failed
     * initialization does not leak the connection.
     *
     * @throws IllegalStateException if no data source was set or a JDBC call fails
     */
    public void init() {
        if (dataSource == null) {
            throw new IllegalStateException("DataSource must be set before init() is called");
        }
        try {
            conn = dataSource.getConnection();
            conn.setAutoCommit(false);
            psGetAllFsPermSecIds = conn.prepareStatement(SQL_GET_ALL_FS_PERM_SEC_IDS);
            psGetAvgPricesForSecurity = conn.prepareStatement(SQL_GET_AVG_PRICES_FOR_SECURITY);
            psUpdateAvgPrices = conn.prepareStatement(SQL_APPEND_AVG_PRICES_FOR_SECURITY);
        } catch (SQLException e) {
            // Do not keep a half-initialized set of JDBC resources around.
            closeStatementsQuietly();
            DbUtils.closeQuietly(conn);
            conn = null;
            throw new IllegalStateException("Error occured when initializing JDBC resources: ", e);
        }
    }

    /**
     * Closes the prepared statements and the connection.
     *
     * <p>Safe to call when {@link #init()} was never called or failed, and safe to call
     * more than once.
     *
     * @throws IllegalStateException if closing the connection fails
     */
    public void release() {
        LOG.info("Releasing all resources ...");
        closeStatementsQuietly();
        if (conn == null) {
            return;
        }
        try {
            conn.close();
        } catch (SQLException e) {
            throw new IllegalStateException("Cannot release the connection: ", e);
        } finally {
            conn = null;
        }
    }

    /**
     * Deletes obsolete securities, then computes and inserts the average prices of every
     * security returned by the id query, and finally commits.
     *
     * <p>Inserts are sent to the database in batches of {@link #BATCH_SIZE} securities.
     * If anything fails before the commit succeeded, the pending batch is discarded and
     * the transaction is rolled back.
     *
     * @throws SQLException if the final commit fails
     * @throws IllegalStateException if the processor is not initialized, or if deleting
     *         or recalculating fails
     */
    public void calcAveragePrices() throws SQLException {
        requireInitialized();
        boolean committed = false;
        try {
            deleteObsoleteSecurities();
            LOG.info("Average prices calculation started");
            int count = 0;
            try (ResultSet allFsPermSecIdsRs = psGetAllFsPermSecIds.executeQuery()) {
                while (allFsPermSecIdsRs.next()) {
                    String fsPermSecId = allFsPermSecIdsRs.getString("FS_PERM_SEC_ID");
                    calcAveragePricesForSecurity(fsPermSecId);
                    count++;
                    if (count % BATCH_SIZE == 0) {
                        LOG.info("Processed {} securities", count);
                        psUpdateAvgPrices.executeBatch();
                    }
                }
                psUpdateAvgPrices.executeBatch(); // push last chunk
                // NOTE(review): the log text names THEMES.AVERAGE_PRICES while the insert
                // targets THEMES.AVG_PRICES - confirm which name is correct.
                LOG.info("Done, totally {} records were merged into THEMES.AVERAGE_PRICES table", count);
            } catch (SQLException e) {
                throw new IllegalStateException("Cannot recalculate THEMES.AVERAGE_PRICES table: ", e);
            }
            LOG.info("Average prices calculation finished, committing ...");
            conn.commit();
            committed = true;
        } finally {
            if (!committed) {
                discardPendingWork();
            }
        }
    }

    /** Runs the delete statement for obsolete securities and logs how many rows it removed. */
    private void deleteObsoleteSecurities() {
        LOG.info("Removing obsolete records from THEMES.AVERAGE_PRICES table ...");
        try (PreparedStatement ps = conn.prepareStatement(SQL_DELETE_OBSOLETE_SECURITIES)) {
            int removedRecords = ps.executeUpdate();
            LOG.info("Removing obsolete records - finished, {} records removed", removedRecords);
        } catch (SQLException e) {
            throw new IllegalStateException("Cannot delete obsolete securities: ", e);
        }
    }

    /**
     * Reads the average prices of one security and queues one insert row for it.
     *
     * @param fsPermSecId id of the security to process
     * @throws SQLException if reading the prices or binding the insert parameters fails
     */
    private void calcAveragePricesForSecurity(String fsPermSecId) throws SQLException {
        LOG.debug("Processing {} ...", fsPermSecId);
        for (int i = 1; i <= AVG_PRICES_QUERY_PARAM_COUNT; i++) {
            psGetAvgPricesForSecurity.setString(i, fsPermSecId);
        }

        Map<String, Double> avgPricesForPeriods = new HashMap<>();
        try (ResultSet rs = psGetAvgPricesForSecurity.executeQuery()) {
            while (rs.next()) {
                String period = rs.getString("PERIOD");
                double avgPrice = rs.getDouble("AVG_PRICE");
                // getDouble() reports SQL NULL as 0.0; keep it as "no value" so that it is
                // written back as NULL instead of as a price of zero.
                avgPricesForPeriods.put(period, rs.wasNull() ? null : Double.valueOf(avgPrice));
            }
        }

        psUpdateAvgPrices.setString(1, fsPermSecId);
        Period[] values = Period.values();
        for (int i = 0; i < values.length; i++) {
            // Parameter 1 is the security id, so the i-th period goes to parameter i + 2.
            Double avgPrice = avgPricesForPeriods.get(values[i].name());
            if (avgPrice == null) {
                psUpdateAvgPrices.setNull(i + 2, Types.DOUBLE);
            } else {
                psUpdateAvgPrices.setDouble(i + 2, avgPrice);
            }
        }
        psUpdateAvgPrices.addBatch();
    }

    /** Fails fast with a clear message when {@link #init()} has not completed successfully. */
    private void requireInitialized() {
        if (conn == null || psGetAllFsPermSecIds == null
                || psGetAvgPricesForSecurity == null || psUpdateAvgPrices == null) {
            throw new IllegalStateException("Processor is not initialized, call init() first");
        }
    }

    /** Drops queued insert rows and rolls the transaction back; failures are only logged. */
    private void discardPendingWork() {
        try {
            psUpdateAvgPrices.clearBatch();
        } catch (SQLException e) {
            LOG.warn("Cannot clear the pending batch of average prices", e);
        }
        try {
            conn.rollback();
        } catch (SQLException e) {
            LOG.warn("Cannot roll back the average prices transaction", e);
        }
    }

    /** Closes all prepared statements, ignoring errors, and forgets them. */
    private void closeStatementsQuietly() {
        DbUtils.closeQuietly(psGetAllFsPermSecIds);
        DbUtils.closeQuietly(psGetAvgPricesForSecurity);
        DbUtils.closeQuietly(psUpdateAvgPrices);
        psGetAllFsPermSecIds = null;
        psGetAvgPricesForSecurity = null;
        psUpdateAvgPrices = null;
    }

    /**
     * Periods an average price is stored for. The names are matched against the PERIOD
     * column of the price query, and the declaration order determines which insert
     * parameter each period is bound to.
     */
    private enum Period {
        DAY,
        WEEK,
        MONTH,
        QUARTER,
        YEAR,
        YTD;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment