Skip to content

Instantly share code, notes, and snippets.

@steklopod
Created July 10, 2018 10:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save steklopod/5caf0971b323b7b20a2d3117c9c386b8 to your computer and use it in GitHub Desktop.
Save steklopod/5caf0971b323b7b20a2d3117c9c386b8 to your computer and use it in GitHub Desktop.
JDBC methods (select, insert)
package ru.gamble.servicebus.core.processor.ifd.manager;
import lombok.extern.slf4j.Slf4j;
import org.apache.camel.Exchange;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import ru.gamble.servicebus.core.domain.Task;
import ru.gamble.servicebus.core.domain.TaskType;
import ru.gamble.servicebus.core.exception.PackageIdUpdaterExeption;
import ru.gamble.servicebus.core.exception.RowException;
import ru.gamble.servicebus.core.exception.StatusUpdaterException;
import ru.gamble.servicebus.core.exception.TaskTypeConfigurationException;
import ru.gamble.servicebus.core.model.ifd.TableRows;
import ru.gamble.servicebus.core.service.ServiceBusMainParamService;
import ru.gamble.servicebus.core.service.TaskTypeService;
import ru.gamble.servicebus.core.util.StringParseUtils;
import javax.sql.DataSource;
import java.sql.*;
import java.util.AbstractMap.SimpleEntry;
import java.util.*;
import java.util.Map.Entry;
import java.util.function.Function;
import static java.util.stream.Collectors.toList;
import static ru.gamble.servicebus.core.model.ifd.SignalTableColumnNames.*;
import static ru.gamble.servicebus.core.model.ifd.SignalTableStatus.*;
import static ru.gamble.servicebus.core.util.DbUtil.generateFilledInClauseForLargeBatches;
import static ru.gamble.servicebus.core.util.DbUtil.getColumnNamesClassesFromResultSet;
@Slf4j
@Service
public class TableWorker extends ParamsHelper {
@Autowired
private ServiceBusMainParamService paramService;
@Autowired
private TaskTypeService taskTypeService;
/**
* Выбор всех строк во входящей таблице для переноса с учетом игнорируемых колонок.
*
* @param ds datasource;
* @param tableName входящая таблица для переноса;
* @param ignoredColumns список игнорируемых колонок.
* @return объект пару: а)таблицу целиком б) список названий колонок для переноса.
*/
public Entry<List<TableRows>, List<String>> selectAllRows(DataSource dsToReadColumnNames, String inRange, DataSource ds, String tableName, List<String> ignoredColumns) {
String whereClause = " WHERE " + PACKAGE_ID.getColumnName() + " IN (" + inRange + ")";
try (Connection connection = dsToReadColumnNames.getConnection();
Statement stmnt = connection.createStatement();
ResultSet rsForColumnNames = stmnt.executeQuery("SELECT * FROM " + tableName + " WHERE 1 = 0")) {
Map<String, Class<?>> columnNamesAndClasses = getColumnNamesClassesFromResultSet(rsForColumnNames);
columnNamesAndClasses.keySet().removeAll(ignoredColumns);
LinkedList<String> colNamesOnly = new LinkedList<>(columnNamesAndClasses.keySet());
String select = generateSelectString(tableName, colNamesOnly) + whereClause;
try (Connection conn = ds.getConnection();
ResultSet resultSet = conn.prepareStatement(select).executeQuery()) {
List<TableRows> tableRows = formTable(resultSet, columnNamesAndClasses);
return new SimpleEntry<>(tableRows, colNamesOnly);
}
} catch (SQLException | TaskTypeConfigurationException e) {
log.error("Can't get data from " + tableName);
throw new RowException(e);
}
}
/**
* Проверка DataSource на признак Oracle для определения SQL-типов.
*/
public static DataSource selectDataSourceToReadColumnNames(DataSource signal, DataSource buffer) {
DataSource dsSignal = signal;
DataSource dsBuffer = buffer;
try (Connection senderConnection = dsSignal.getConnection();
Connection receiverConnection = dsBuffer.getConnection()) {
String senderDbName = senderConnection.getMetaData().getDatabaseProductName().toLowerCase();
String receiverDbName = receiverConnection.getMetaData().getDatabaseProductName().toLowerCase();
if (!senderDbName.equals(receiverDbName) && senderDbName.contains("oracle")) {
return dsBuffer;
} else {
return dsSignal;
}
} catch (SQLException e) {
throw new RowException(e);
}
}
/**
* Инициализация списка имен колонок в целевой таблице.
*/
List<String> initColumnNamesList(ResultSet resultSet) throws SQLException {
ResultSetMetaData metaData = resultSet.getMetaData();
List<String> columnNames = new LinkedList<>();
for (int i = 1; i <= metaData.getColumnCount(); i++) {
columnNames.add(metaData.getColumnName(i));
}
return columnNames;
}
/**
* Вставка всех строк в целевую БУФЕРНУЮ таблицу с учетом изменения `package_id`.
*
* @param ds datasource;
* @param tableName имя таблицы для вставки;
* @param rowList объект таблицы, содержащий в себе все строки {@link TableRows row}
* @param columnNames список названий колонок таблицы;
*/
public void insertRows(DataSource ds, String tableName, List<TableRows> rowList, Map<Integer, Integer> idsNewIds, List<String> columnNames) {
String insert = generateInsertString(tableName, columnNames);
int packageIdIndex = columnNames.indexOf(PACKAGE_ID.getColumnName()) + 1;
try (Connection connection = ds.getConnection();
PreparedStatement stmnt = connection.prepareStatement(insert)) {
for (TableRows tableWorkerCurrent: rowList) {
int z = 1;
List<Object> valuesToInsert = new LinkedList<>(tableWorkerCurrent.getRow());
for (Object valueOfCurrentColumn: valuesToInsert) {
if (valueOfCurrentColumn != null) {
if (z == packageIdIndex) {
Optional<Integer> autoGenId = Optional.ofNullable(idsNewIds.get(valueOfCurrentColumn));
if (autoGenId.isPresent()) {
stmnt.setObject(z++, autoGenId.get());
}
} else {
stmnt.setObject(z++, valueOfCurrentColumn);
}
} else {
stmnt.setNull(z++, Types.NULL);
}
}
stmnt.executeUpdate();
}
} catch (SQLException e) {
log.error("Can't insert data in " + tableName, e.getLocalizedMessage());
throw new RowException(e);
}
}
/**
* Вставка всех строк в целевую СИГНАЛЬНУЮ таблицу.
*
* @param ds datasource;
* @param tableName имя таблицы для вставки;
* @param rowList объект таблицы, содержащий в себе все строки {@link TableRows row};
* @param columnNames список названий колонок таблицы;
* @param systemMethodIdOfRequester новое значение колонки `receiver_id`.
* @return словарь (старый id -> автоинкр. id)
*/
public Map<Integer, Integer> insertRowsInSignal(DataSource ds, String tableName, List<TableRows> rowList, List<String> columnNames, Optional<Integer> systemMethodIdOfRequester) {
String insert = generateInsertString(tableName, columnNames);
ArrayList<Integer> autoIncrementsIds = new ArrayList<>();
Map<Integer, Integer> idsNewId = new HashMap<>();
int k = 0;
int statusCodeIndex = columnNames.indexOf(STATUS_CODE.getColumnName()) + 1;
int packageIdIndex = columnNames.indexOf(PACKAGE_ID.getColumnName()) + 1;
int receiverIdIndex = columnNames.indexOf(RECEIVER_ID.getColumnName()) + 1;
try (Connection connection = ds.getConnection();
PreparedStatement stmnt = connection.prepareStatement(insert, Statement.RETURN_GENERATED_KEYS)) {
for (TableRows listOfRows: rowList) {
List<Object> valuesToInsert = new LinkedList<>(listOfRows.getRow());
setObjectForEachColumnOfRowForSignalTable(valuesToInsert, autoIncrementsIds, stmnt, statusCodeIndex, packageIdIndex, receiverIdIndex, systemMethodIdOfRequester);
stmnt.executeUpdate();
try (ResultSet rs = stmnt.getGeneratedKeys()) {
while (rs.next()) {
Integer key = autoIncrementsIds.get(k++);
idsNewId.put(key, rs.getInt(1));
}
}
}
} catch (SQLException e) {
log.error("Can't insert data in " + tableName, e.getLocalizedMessage());
throw new RowException(e);
}
return idsNewId;
}
private void setObjectForEachColumnOfRowForSignalTable(List<Object> values, List<Integer> autoIncrementsIds, PreparedStatement stmnt,
final int statusCodeIndex, final int packageIdIndex, int receiverIdIndex, Optional<Integer> systemMethodIdOfRequester) throws SQLException {
int z = 1;
for (Object entry: values) {
if (entry != null) {
if (z == statusCodeIndex) {
stmnt.setObject(z++, PROCESSING_BY_SOURCE_SYS.getId());
} else if (z == packageIdIndex) {
autoIncrementsIds.add((Integer) entry);
stmnt.setNull(z++, Types.NULL);
} else if (z == receiverIdIndex && systemMethodIdOfRequester.isPresent()) {
stmnt.setObject(z++, systemMethodIdOfRequester.get());
} else {
stmnt.setObject(z++, entry);
}
} else {
stmnt.setNull(z++, Types.NULL);
}
}
}
/**
* Формирование объекта таблицы, состоящей из кортежей {@link TableRows row}
*
* @param resultSet resultSet;
* @param columnNames список имен колонок таблицы.
* @return список всех строк таблицы.
* @throws SQLException
*/
List<TableRows> formTable(ResultSet resultSet, Map<String, Class<?>> columnNames) throws SQLException {
List<TableRows> table = new LinkedList<>();
if (resultSet != null) {
while (resultSet.next()) {
TableRows tableRow = new TableRows();
for (Map.Entry<String, Class<?>> colNameAndClass: columnNames.entrySet()) {
tableRow.add(resultSet.getObject(colNameAndClass.getKey(), colNameAndClass.getValue()));
}
table.add(tableRow);
}
} else {
log.debug("resultSet is empty");
}
return table;
}
public static List<String> getColumnNames(Entry<List<TableRows>, List<String>> signalTableRows) {
return new LinkedList<>(signalTableRows.getValue());
}
public static List<TableRows> getTableRows(Entry<List<TableRows>, List<String>> signalTableRows) {
return signalTableRows.getKey();
}
public Entry<List<TableRows>, List<String>> selectAllRows(DataSource dsToReadColumnNames, String inRange, DataSource dataSource, String tableName, String... ignoredColumns) {
return selectAllRows(dsToReadColumnNames, inRange, dataSource, tableName, Arrays.asList(ignoredColumns));
}
/**
* Генерация строки для вставки, имея список колонок.
*
* @param tableToWriteIn таблица исходящих сообщений;
* @param columnNames список имен колонок.
* @return подготовленнную SQL строку для вставки PreparedStatement.
*/
public static String generateInsertString(String tableToWriteIn, List<String> columnNames) {
return "INSERT INTO " + tableToWriteIn + " ("
+ String.join(", ", columnNames)
+ ") VALUES ("
+ String.join(", ", Collections.nCopies(columnNames.size(), "?"))
+ ")";
}
/**
* Генерация строки выборки, имея список колонок.
*
* @param tableName таблица из которой читаем данные;
* @param columnNames список названий колонок таблицы;
* @return подготовленнную SQL строку для выборки колонок из переданного списка строк.
*/
public static String generateSelectString(String tableName, Collection<String> columnNames) {
return "SELECT " + String.join(", ", columnNames) + " FROM " + tableName;
}
/**
* @param exchange текущий;
* @param targetTTParamName имя параметра целевых ТТ'ов, содержие целочисленное искомое значение;
* @param currentTTParamName имя параметра текущего ТТ, содержащего список целевых takType'ов.
* @return минимальное значение параметра `targetTTParamName` из целевых takType'ов.
*/
public Optional<Integer> getMinValueFromTargetTaskTypes(Exchange exchange, String targetTTParamName, String currentTTParamName) {
Task task = exchange.getIn().getBody(Task.class);
Function<String, TaskType> taskTypeByName = taskTypeService::getTaskTypeByName;
Function<TaskType, Integer> extractIntParam = taskType -> StringParseUtils.extractIntValueFromString(
paramService.getByNameAndType(targetTTParamName, taskType)
.getValue());
Function<String, Integer> extractIntParamValueFromTaskType = taskTypeByName.andThen(extractIntParam);
return getTTParamsAllAsList(task, currentTTParamName, false).stream()
.map(extractIntParamValueFromTaskType)
.min(Integer::compare);
}
/**
* Обновление `status_code` в #tableName, где `status_code` = 1 и `esb_package_id` меньше #minimum.
*
* @param tableName имя целевой таблицы;
* @param dataSource dataSource;
* @param minimum минимальное значение параметра `targetTTParamName` из целевых takType'ов.
*/
public void updateStatusCode(String tableName, DataSource dataSource, Integer minimum) {
String sqlUpdate = "UPDATE " + tableName + " SET " + STATUS_CODE.getColumnName() + " = " + SUCCESS_COMPLETED.getId() +
" WHERE " + STATUS_CODE.getColumnName() + " = " + PROCESSING_BY_ESB.getId() + " AND " + ESB_PACKAGE_ID.getColumnName() + " <= ?";
try (Connection connection = dataSource.getConnection();
PreparedStatement ps = connection.prepareStatement(sqlUpdate)) {
ps.setInt(1, minimum);
ps.executeUpdate();
} catch (SQLException e) {
throw new StatusUpdaterException("Can't update `status_code` in " + tableName, e);
}
}
/**
* Обновление esb_package_id в сигнальной таблице.
* @param tableName наименование сигнальной таблицы;
* @param ps preparedStatement.
* @param esbPackageToBeInserted значение `esb_package_id` который будет записан;
* @param packageIdOfUpdatedRow id строки в таблице, у которой будет обновлено `esb_package_id`.
*/
public void updateEsbPckgId(String tableName, PreparedStatement ps, int esbPackageToBeInserted, int packageIdOfUpdatedRow) {
try {
ps.clearParameters();
ps.setInt(1, esbPackageToBeInserted);
ps.setInt(2, packageIdOfUpdatedRow);
ps.executeUpdate();
} catch (SQLException e) {
throw new PackageIdUpdaterExeption("Can't update esb_package_id in " + tableName, e);
}
}
public static String getUpdateString(String tableName) {
return "UPDATE " + tableName + " SET " + ESB_PACKAGE_ID.getColumnName() + " = ?" +
" WHERE " + PACKAGE_ID.getColumnName() + " = ?";
}
/**
* Обновление `status_code` в таблице, где `status_code` == 3.
*
* @param statusCodeNewValue новое значение;
* @param tableName таблица;
* @param dataSource dataSource;
* @param fromOldIdsToAutoIncrIds
*/
public void updateStatusCode(int statusCodeNewValue, String tableName, DataSource dataSource, Map<Integer, Integer> fromOldIdsToAutoIncrIds) {
List<String> pckgIds = fromOldIdsToAutoIncrIds.values().stream().map(String::valueOf).collect(toList());
String whereTerm = generateFilledInClauseForLargeBatches(pckgIds, PACKAGE_ID.getColumnName());
String sqlUpdate = "UPDATE " + tableName + " SET " + STATUS_CODE.getColumnName() + " = ?" +
" WHERE " + STATUS_CODE.getColumnName() + " = " + PROCESSING_BY_SOURCE_SYS.getId() +
" AND " + whereTerm;
try (Connection connection = dataSource.getConnection();
PreparedStatement ps = connection.prepareStatement(sqlUpdate)) {
ps.setInt(1, statusCodeNewValue);
ps.executeUpdate();
} catch (SQLException e) {
throw new StatusUpdaterException("Can't update `status_code` in " + tableName, e);
}
}
/**
* Получение списка id, где status_code = 2.
*
* @param tableName наименование сигнальной таблицы;
* @param connection connection.
* @return список `package_id` со статусом 2.
*/
public List<Integer> getAllEsbPackageIdsWithStatus2(String tableName, Connection connection) {
String sqlSelect = "SELECT " + PACKAGE_ID.getColumnName() + " FROM " + tableName +
" WHERE " + STATUS_CODE.getColumnName() + " = " + READY_TO_PROCESS.getId() +
" AND " + ESB_PACKAGE_ID.getColumnName() + " IS NULL ORDER BY 1";
List<Integer> ids = new ArrayList<>();
try (Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery(sqlSelect)) {
while (rs.next()) {
ids.add(rs.getInt(1));
}
} catch (SQLException e) {
throw new PackageIdUpdaterExeption("Can't get ids with status_code = " + READY_TO_PROCESS.getId() + " from " + tableName, e);
}
return ids;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment