Created
September 8, 2014 10:28
-
-
Save daniilyar/f524306ded618ad591ac to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.sql.Connection; | |
import java.sql.PreparedStatement; | |
import java.sql.ResultSet; | |
import java.sql.SQLException; | |
import java.sql.Types; | |
import java.util.HashMap; | |
import java.util.Map; | |
import javax.sql.DataSource; | |
import org.apache.commons.dbutils.DbUtils; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.stereotype.Component; | |
public class Processor { | |
private static final Logger LOG = LoggerFactory.getLogger(Processor.class); | |
private static final int BATCH_SIZE = 1000; | |
private static final String SQL_GET_ALL_FS_PERM_SEC_IDS = ""; | |
private static final String SQL_DELETE_OBSOLETE_SECURITIES = ""; | |
private static final String SQL_GET_AVG_PRICES_FOR_SECURITY = ""; | |
private static final String SQL_APPEND_AVG_PRICES_FOR_SECURITY = "insert into THEMES.AVG_PRICES " | |
+ "(fs_perm_sec_id, price_1d, price_7d, price_1m, price_3m, price_1y, price_ytd) values (?,?,?,?,?,?,?)"; | |
private DataSource dataSource; | |
private Connection conn; | |
private PreparedStatement psGetAllFsPermSecIds; | |
private PreparedStatement psGetAvgPricesForSecurity; | |
private PreparedStatement psUpdateAvgPrices; | |
public void setDataSource(DataSource dataSource) { | |
this.dataSource = dataSource; | |
} | |
public void init() { | |
try { | |
conn = dataSource.getConnection(); | |
conn.setAutoCommit(false); | |
psGetAllFsPermSecIds = conn.prepareStatement(SQL_GET_ALL_FS_PERM_SEC_IDS); | |
psGetAvgPricesForSecurity = conn.prepareStatement(SQL_GET_AVG_PRICES_FOR_SECURITY); | |
psUpdateAvgPrices = conn.prepareStatement(SQL_APPEND_AVG_PRICES_FOR_SECURITY); | |
} catch (SQLException e) { | |
throw new IllegalStateException("Error occured when initializing JDBC resources: ", e); | |
} | |
} | |
public void release() { | |
try { | |
LOG.info("Releasing all resources ..."); | |
DbUtils.closeQuietly(psGetAllFsPermSecIds); | |
DbUtils.closeQuietly(psGetAvgPricesForSecurity); | |
DbUtils.closeQuietly(psUpdateAvgPrices); | |
conn.close(); | |
} catch (SQLException e) { | |
throw new IllegalStateException("Cannot release the connection: ", e); | |
} | |
} | |
public void calcAveragePrices() throws SQLException { | |
deleteObsoleteSecurities(); | |
LOG.info("Average prices calculation started"); | |
int count = 0; | |
try (ResultSet allFsPermSecIdsRs = psGetAllFsPermSecIds.executeQuery();) { | |
while (allFsPermSecIdsRs.next()) { | |
String fsPermSecId = allFsPermSecIdsRs.getString("FS_PERM_SEC_ID"); | |
calcAveragePricesForSecurity(fsPermSecId); | |
count++; | |
if (count % BATCH_SIZE == 0) { | |
LOG.info("Processed {} securities", count); | |
psUpdateAvgPrices.executeBatch(); | |
} | |
} | |
psUpdateAvgPrices.executeBatch(); // push last chunk | |
LOG.info("Done, totally {} records were merged into THEMES.AVERAGE_PRICES table", count); | |
} catch (SQLException e) { | |
throw new IllegalStateException("Cannot recalculate THEMES.AVERAGE_PRICES table: ", e); | |
} | |
LOG.info("Average prices calculation finished, committing ..."); | |
conn.commit(); | |
} | |
private void deleteObsoleteSecurities() { | |
LOG.info("Removing obsolete records from THEMES.AVERAGE_PRICES table ..."); | |
try (PreparedStatement ps = conn.prepareStatement(SQL_DELETE_OBSOLETE_SECURITIES);) { | |
int removedRecords = ps.executeUpdate(); | |
LOG.info("Removing obsolete records - finished, {} records removed", removedRecords); | |
} catch (SQLException e) { | |
throw new IllegalStateException("Cannot delete obsolete securities: ", e); | |
} | |
} | |
private void calcAveragePricesForSecurity(String fsPermSecId) throws SQLException { | |
LOG.debug("Processing {} ...", fsPermSecId); | |
for (int i = 1; i <= 6; i++) { | |
psGetAvgPricesForSecurity.setString(i, fsPermSecId); | |
} | |
Map<String, Double> avgPricesForPeriods = new HashMap<>(); | |
try (ResultSet rs = psGetAvgPricesForSecurity.executeQuery();) { | |
while (rs.next()) { | |
String period = rs.getString("PERIOD"); | |
double avgPrice = rs.getDouble("AVG_PRICE"); | |
avgPricesForPeriods.put(period, avgPrice); | |
} | |
} | |
psUpdateAvgPrices.setString(1, fsPermSecId); | |
Period[] values = Period.values(); | |
for (int i = 0; i < values.length; i++) { | |
Double avgPrice = avgPricesForPeriods.get(values[i].name()); | |
if (avgPrice == null) { | |
psUpdateAvgPrices.setNull(i + 2, Types.DOUBLE); | |
} else { | |
psUpdateAvgPrices.setDouble(i + 2, avgPrice); | |
} | |
} | |
psUpdateAvgPrices.addBatch(); | |
} | |
private enum Period { | |
DAY, | |
WEEK, | |
MONTH, | |
QUARTER, | |
YEAR, | |
YTD; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment