Skip to content

Instantly share code, notes, and snippets.

@yanweijia
Last active November 13, 2023 12:38
Show Gist options
  • Save yanweijia/208e3262e0ce44c069ddd870230d6639 to your computer and use it in GitHub Desktop.
Save yanweijia/208e3262e0ce44c069ddd870230d6639 to your computer and use it in GitHub Desktop.
外部数据与 db 数据比对 & 增量更新工具类
import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
 * Incremental-update utility.<br/>
 * Splits a full external data set into the records to insert, update and remove
 * relative to what is already stored in the db.
 * <p>
 * Scenario: the db holds the full data set and a third-party interface returns the full
 * data set as well; the two have to be compared so the db can be updated incrementally.
 * This avoids statements such as {@code INSERT ... ON DUPLICATE KEY UPDATE} and thereby
 * reduces db deadlocks. The comparison is done in memory, so the data volume should not be
 * too large (fewer than roughly 500,000 records is recommended).
 * <p>
 * Records are identified by the key produced by {@code keyMapper}. When a list is indexed
 * into a map and the same key occurs more than once, the later item replaces the earlier one.
 * <p>
 * Returned lists follow the iteration order of the input they are taken from (for the
 * {@code List} overloads: the order in which each key first appears in the list).
 *
 * @author yanweijia
 */
public class IncreUpdateUtil {

    /**
     * Returns the items that have to be inserted: those of {@code allDataList} whose key
     * does not occur in {@code dbDataList}.
     *
     * @param allDataList      full data set returned by the external interface, must not be null
     * @param dbDataList       data currently stored in the db, must not be null
     * @param keyMapper        extracts the unique identifier, e.g. {@code CorpTagGroup::getGroupId}
     * @param newDataOperation initialisation applied to every item that will be inserted; may be null
     * @return the items to insert, in the order of {@code allDataList}
     * @throws NullPointerException if {@code allDataList} or {@code dbDataList} is null
     * @see #filterNewDataList(Map, Map, Consumer)
     */
    public static <K, V> List<V> filterNewDataList(List<V> allDataList, List<V> dbDataList, Function<V, K> keyMapper, Consumer<V> newDataOperation) {
        Map<K, V> allDataMap = indexByKey(allDataList, keyMapper, "allDataList");
        Map<K, V> dbDataMap = indexByKey(dbDataList, keyMapper, "dbDataList");
        return filterNewDataList(allDataMap, dbDataMap, newDataOperation);
    }

    /**
     * Returns the db items that have to be removed: those of {@code dbDataList} whose key
     * does not occur in {@code allDataList}.
     *
     * @param allDataList full data set returned by the external interface, must not be null
     * @param dbDataList  data currently stored in the db, must not be null
     * @param keyMapper   extracts the unique identifier
     * @return the db items to remove, in the order of {@code dbDataList}
     * @throws NullPointerException if {@code allDataList} or {@code dbDataList} is null
     * @see #filterRemoveList(Map, Map)
     */
    public static <K, V> List<V> filterRemoveList(List<V> allDataList, List<V> dbDataList, Function<V, K> keyMapper) {
        Map<K, V> allDataMap = indexByKey(allDataList, keyMapper, "allDataList");
        Map<K, V> dbDataMap = indexByKey(dbDataList, keyMapper, "dbDataList");
        return filterRemoveList(allDataMap, dbDataMap);
    }

    /**
     * Returns the db items that have to be updated and applies {@code updateFieldBiConsumer}
     * to each of them.
     *
     * @param allDataList           full data set returned by the external interface, must not be null
     * @param dbDataList            data currently stored in the db, must not be null
     * @param keyMapper             extracts the unique identifier
     * @param condition             receives (external item, db item) for every shared key;
     *                              {@code true} means the db item needs an update
     * @param updateFieldBiConsumer copies the changed fields from the external item to the db item; may be null
     * @return the db items to update, in the order of {@code dbDataList}
     * @throws NullPointerException if {@code allDataList} or {@code dbDataList} is null
     * @see #filterUpdateList(Map, Map, BiPredicate, BiConsumer)
     */
    public static <K, V> List<V> filterUpdateList(List<V> allDataList, List<V> dbDataList, Function<V, K> keyMapper, BiPredicate<V, V> condition, BiConsumer<V, V> updateFieldBiConsumer) {
        Map<K, V> allDataMap = indexByKey(allDataList, keyMapper, "allDataList");
        Map<K, V> dbDataMap = indexByKey(dbDataList, keyMapper, "dbDataList");
        return filterUpdateList(allDataMap, dbDataMap, condition, updateFieldBiConsumer);
    }

    /**
     * Returns the values of {@code allMap} whose key is absent from {@code dbDataMap},
     * i.e. the items that have to be inserted into the db.
     *
     * @param allMap           data returned by the external interface, keyed by unique identifier
     * @param dbDataMap        data read from the db, keyed by unique identifier
     * @param newDataOperation initialisation applied to every item that will be inserted; may be null
     * @return the items to insert, in the iteration order of {@code allMap}
     * @throws NullPointerException if {@code allMap} or {@code dbDataMap} is null
     */
    public static <K, V> List<V> filterNewDataList(Map<K, V> allMap, Map<K, V> dbDataMap, Consumer<V> newDataOperation) {
        Objects.requireNonNull(allMap, "allMap must not be null");
        Objects.requireNonNull(dbDataMap, "dbDataMap must not be null");
        // Difference "external minus db" = the records that still have to be inserted.
        List<V> newDataList = new ArrayList<>();
        for (Map.Entry<K, V> entry : allMap.entrySet()) {
            if (!dbDataMap.containsKey(entry.getKey())) {
                newDataList.add(entry.getValue());
            }
        }
        // Run the callback only after the map iteration has finished, so the callback
        // never executes while a map is being iterated.
        if (newDataOperation != null) {
            newDataList.forEach(newDataOperation);
        }
        return newDataList;
    }

    /**
     * Returns the values of {@code dbDataMap} whose key is absent from {@code allMap},
     * i.e. the db items that have to be removed.
     *
     * @param allMap    data returned by the external interface, keyed by unique identifier
     * @param dbDataMap data read from the db, keyed by unique identifier
     * @return the db items to remove, in the iteration order of {@code dbDataMap}
     * @throws NullPointerException if {@code allMap} or {@code dbDataMap} is null
     */
    public static <K, V> List<V> filterRemoveList(Map<K, V> allMap, Map<K, V> dbDataMap) {
        Objects.requireNonNull(allMap, "allMap must not be null");
        Objects.requireNonNull(dbDataMap, "dbDataMap must not be null");
        List<V> removeDataList = new ArrayList<>();
        for (Map.Entry<K, V> dbEntry : dbDataMap.entrySet()) {
            if (!allMap.containsKey(dbEntry.getKey())) {
                removeDataList.add(dbEntry.getValue());
            }
        }
        return removeDataList;
    }

    /**
     * Returns the db items that have to be updated.
     * <p>
     * For every key present in both maps, {@code condition} is evaluated with the external
     * item and the db item. When it returns {@code true}, {@code updateFieldBiConsumer}
     * (if not null) is applied and the db item is added to the result.
     *
     * @param allMap                data returned by the external interface
     * @param dbDataMap             data read from the db
     * @param condition             decides whether an update is required; {@code true} means update
     * @param updateFieldBiConsumer fills the db item with the changed fields of the external item; may be null
     * @return the db items to update, in the iteration order of {@code dbDataMap}
     * @throws NullPointerException if {@code allMap} or {@code dbDataMap} is null
     */
    public static <K, V> List<V> filterUpdateList(Map<K, V> allMap, Map<K, V> dbDataMap, BiPredicate<V, V> condition, BiConsumer<V, V> updateFieldBiConsumer) {
        Objects.requireNonNull(allMap, "allMap must not be null");
        Objects.requireNonNull(dbDataMap, "dbDataMap must not be null");
        // Snapshot the shared keys first so that the callbacks below are never invoked
        // while one of the maps is being iterated.
        List<K> sharedKeys = new ArrayList<>();
        for (K key : dbDataMap.keySet()) {
            if (allMap.containsKey(key)) {
                sharedKeys.add(key);
            }
        }
        List<V> updateDataList = new ArrayList<>();
        for (K key : sharedKeys) {
            V data1 = allMap.get(key);
            V dbData = dbDataMap.get(key);
            if (condition.test(data1, dbData)) {
                // Copy the changed fields of data1 onto dbData.
                if (updateFieldBiConsumer != null) {
                    updateFieldBiConsumer.accept(data1, dbData);
                }
                updateDataList.add(dbData);
            }
        }
        return updateDataList;
    }

    /**
     * Indexes {@code dataList} by key. A later item replaces an earlier item with the same
     * key; the map iterates in the order in which each key first appeared.
     * <p>
     * A null list is rejected instead of being treated as empty: an "empty" external data
     * set would classify every db record as removable.
     */
    private static <K, V> Map<K, V> indexByKey(List<V> dataList, Function<V, K> keyMapper, String listName) {
        Objects.requireNonNull(dataList, listName + " must not be null");
        return dataList.stream()
                .collect(Collectors.toMap(keyMapper, item -> item, (oldVal, newVal) -> newVal, LinkedHashMap::new));
    }
}
/*
// 测试样例:
public void zengliangUtil() {
List<CorpTagGroup> corpTagListWx = new ArrayList<>();
List<CorpTagGroup> corpTagListDb = new ArrayList<>();
corpTagListWx.add(new CorpTagGroup(null, "groupId1", "标签组1", null, "companyCode", 1, null, null, null));
corpTagListWx.add(new CorpTagGroup(null, "groupId2", "标签组2", null, "companyCode", 2, null, null, null));
corpTagListWx.add(new CorpTagGroup(null, "groupId3", "标签组3改名了", null, "companyCode", 3, null, null, null));
corpTagListDb.add(new CorpTagGroup(2L, "groupId2", "标签组2", null, "companyCode", 2, null, null, true));
corpTagListDb.add(new CorpTagGroup(3L, "groupId3", "标签组3", null, "companyCode", 3, null, null, true));
corpTagListDb.add(new CorpTagGroup(4L, "groupId4", "标签组4", null, "companyCode", 4, null, null, true));
List<CorpTagGroup> listToInsert = IncreUpdateUtil.filterNewDataList(corpTagListWx, corpTagListDb, CorpTagGroup::getGroupId
, item -> {
item.setInserttime(new Date());
item.setIsactive(true);
});
List<CorpTagGroup> listToRemove = IncreUpdateUtil.filterRemoveList(corpTagListWx, corpTagListDb, CorpTagGroup::getGroupId);
List<CorpTagGroup> listToUpdate = IncreUpdateUtil.filterUpdateList(corpTagListWx, corpTagListDb, CorpTagGroup::getGroupId
, (wx, db) -> !StringUtils.equals(wx.getGroupName(), db.getGroupName()) || !wx.getGroupOrder().equals(db.getGroupOrder())
, (wx, db) -> {
db.setGroupOrder(wx.getGroupOrder());
db.setGroupName(wx.getGroupName());
db.setUpdatetime(new Date());
db.setIsactive(true);
});
System.out.println(listToInsert);
System.out.println(listToRemove);
System.out.println(listToUpdate);
System.out.println("done");
}
//运行结果:
//Connected to the target VM, address: '127.0.0.1:64356', transport: 'socket'
//[CorpTagGroup(id=null, groupId=groupId1, groupName=标签组1, createTime=null, companyCode=companyCode, groupOrder=1, inserttime=Wed Jul 29 10:50:11 CST 2020, updatetime=null, isactive=true)]
//[CorpTagGroup(id=4, groupId=groupId4, groupName=标签组4, createTime=null, companyCode=companyCode, groupOrder=4, inserttime=null, updatetime=null, isactive=true)]
//[CorpTagGroup(id=3, groupId=groupId3, groupName=标签组3改名了, createTime=null, companyCode=companyCode, groupOrder=3, inserttime=null, updatetime=Wed Jul 29 10:50:11 CST 2020, isactive=true)]
//done
//Disconnected from the target VM, address: '127.0.0.1:64356', transport: 'socket'
//Process finished with exit code 0
*/
import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
 * Incremental-update utility.<br/>
 * Splits a full external data set into the records to insert, update and remove
 * relative to what is already stored in the db.
 * <p>
 * Scenario: the db holds the full data set and a third-party interface returns the full
 * data set as well; the two have to be compared so the db can be updated incrementally.
 * This avoids statements such as {@code INSERT ... ON DUPLICATE KEY UPDATE} and thereby
 * reduces db deadlocks. The comparison is done in memory, so the data volume should not be
 * too large (fewer than roughly 500,000 records is recommended).
 * <p>
 * Records are identified by the key produced by the key mappers. When a list is indexed
 * into a map and the same key occurs more than once, the later item replaces the earlier one.
 * <p>
 * Returned lists follow the iteration order of the input they are taken from (for the
 * {@code List} overloads: the order in which each key first appears in the list).
 *
 * @author weijia.yan
 * @date 2023/11/13
 */
public class IncreUpdateUtil {

    /**
     * Returns the external items that have to be inserted: those of {@code allDataList}
     * whose key does not occur in {@code dbDataList}.
     *
     * @param allDataList      full data set returned by the external interface, must not be null
     * @param dbDataList       data currently stored in the db, must not be null
     * @param key1Mapper       extracts the unique identifier of an external item, e.g. {@code CorpTagGroup::getGroupId}
     * @param key2Mapper       extracts the unique identifier of a db item, e.g. {@code CorpTagGroup::getGroupId}
     * @param newDataOperation initialisation applied to every item that will be inserted; may be null
     * @return the external items to insert, in the order of {@code allDataList}
     * @throws NullPointerException if {@code allDataList} or {@code dbDataList} is null
     * @see #filterNewDataList(Map, Map, Consumer)
     */
    public static <K, V1, V2> List<V1> filterNewDataList(List<V1> allDataList, List<V2> dbDataList, Function<V1, K> key1Mapper, Function<V2, K> key2Mapper, Consumer<V1> newDataOperation) {
        Map<K, V1> allDataMap = indexByKey(allDataList, key1Mapper, "allDataList");
        Map<K, V2> dbDataMap = indexByKey(dbDataList, key2Mapper, "dbDataList");
        return filterNewDataList(allDataMap, dbDataMap, newDataOperation);
    }

    /**
     * Returns the db items that have to be removed: those of {@code dbDataList} whose key
     * does not occur in {@code allDataList}.
     *
     * @param allDataList full data set returned by the external interface, must not be null
     * @param dbDataList  data currently stored in the db, must not be null
     * @param key1Mapper  extracts the unique identifier of an external item
     * @param key2Mapper  extracts the unique identifier of a db item
     * @return the db items to remove, in the order of {@code dbDataList}
     * @throws NullPointerException if {@code allDataList} or {@code dbDataList} is null
     * @see #filterRemoveList(Map, Map)
     */
    public static <K, V1, V2> List<V2> filterRemoveList(List<V1> allDataList, List<V2> dbDataList, Function<V1, K> key1Mapper, Function<V2, K> key2Mapper) {
        Map<K, V1> allDataMap = indexByKey(allDataList, key1Mapper, "allDataList");
        Map<K, V2> dbDataMap = indexByKey(dbDataList, key2Mapper, "dbDataList");
        return filterRemoveList(allDataMap, dbDataMap);
    }

    /**
     * Returns the db items that have to be updated and applies {@code updateFieldBiConsumer}
     * to each of them.
     *
     * @param allDataList           full data set returned by the external interface, must not be null
     * @param dbDataList            data currently stored in the db, must not be null
     * @param key1Mapper            extracts the unique identifier of an external item
     * @param key2Mapper            extracts the unique identifier of a db item
     * @param condition             compares the non-key data of (external item, db item) for every
     *                              shared key; {@code true} means the db item needs an update, e.g.
     *                              {@code (ext, db) -> !Objects.equals(ext.getName(), db.getName())}
     * @param updateFieldBiConsumer may be null; executed when an update is required, the returned
     *                              list then contains the updated db items, e.g.
     *                              {@code (ext, db) -> db.setName(ext.getName())}
     * @return the db items to update, in the order of {@code dbDataList}
     * @throws NullPointerException if {@code allDataList} or {@code dbDataList} is null
     * @see #filterUpdateList(Map, Map, BiPredicate, BiConsumer)
     */
    public static <K, V1, V2> List<V2> filterUpdateList(List<V1> allDataList, List<V2> dbDataList, Function<V1, K> key1Mapper, Function<V2, K> key2Mapper, BiPredicate<V1, V2> condition, BiConsumer<V1, V2> updateFieldBiConsumer) {
        Map<K, V1> allDataMap = indexByKey(allDataList, key1Mapper, "allDataList");
        Map<K, V2> dbDataMap = indexByKey(dbDataList, key2Mapper, "dbDataList");
        return filterUpdateList(allDataMap, dbDataMap, condition, updateFieldBiConsumer);
    }

    /**
     * Returns the values of {@code allMap} whose key is absent from {@code dbDataMap},
     * i.e. the items that have to be inserted into the db.
     *
     * @param allMap           data returned by the external interface, keyed by unique identifier
     * @param dbDataMap        data read from the db, keyed by unique identifier
     * @param newDataOperation initialisation applied to every item that will be inserted; may be null
     * @return the items to insert, in the iteration order of {@code allMap}
     * @throws NullPointerException if {@code allMap} or {@code dbDataMap} is null
     */
    public static <K, V1, V2> List<V1> filterNewDataList(Map<K, V1> allMap, Map<K, V2> dbDataMap, Consumer<V1> newDataOperation) {
        Objects.requireNonNull(allMap, "allMap must not be null");
        Objects.requireNonNull(dbDataMap, "dbDataMap must not be null");
        // Difference "external minus db" = the records that still have to be inserted.
        List<V1> newDataList = new ArrayList<>();
        for (Map.Entry<K, V1> entry : allMap.entrySet()) {
            if (!dbDataMap.containsKey(entry.getKey())) {
                newDataList.add(entry.getValue());
            }
        }
        // Run the callback only after the map iteration has finished, so the callback
        // never executes while a map is being iterated.
        if (newDataOperation != null) {
            newDataList.forEach(newDataOperation);
        }
        return newDataList;
    }

    /**
     * Returns the values of {@code dbDataMap} whose key is absent from {@code allMap},
     * i.e. the db items that have to be removed.
     *
     * @param allMap    data returned by the external interface, keyed by unique identifier
     * @param dbDataMap data read from the db, keyed by unique identifier
     * @return the db items to remove, in the iteration order of {@code dbDataMap}
     * @throws NullPointerException if {@code allMap} or {@code dbDataMap} is null
     */
    public static <K, V1, V2> List<V2> filterRemoveList(Map<K, V1> allMap, Map<K, V2> dbDataMap) {
        Objects.requireNonNull(allMap, "allMap must not be null");
        Objects.requireNonNull(dbDataMap, "dbDataMap must not be null");
        List<V2> removeDataList = new ArrayList<>();
        for (Map.Entry<K, V2> dbEntry : dbDataMap.entrySet()) {
            if (!allMap.containsKey(dbEntry.getKey())) {
                removeDataList.add(dbEntry.getValue());
            }
        }
        return removeDataList;
    }

    /**
     * Returns the db items that have to be updated.
     * <p>
     * For every key present in both maps, {@code condition} is evaluated with the external
     * item and the db item. When it returns {@code true}, {@code updateFieldBiConsumer}
     * (if not null) is applied and the db item is added to the result.
     *
     * @param allMap                data returned by the external interface
     * @param dbDataMap             data read from the db
     * @param condition             decides whether an update is required; {@code true} means update
     * @param updateFieldBiConsumer fills the db item with the changed fields of the external item; may be null
     * @return the db items to update, in the iteration order of {@code dbDataMap}
     * @throws NullPointerException if {@code allMap} or {@code dbDataMap} is null
     */
    public static <K, V1, V2> List<V2> filterUpdateList(Map<K, V1> allMap, Map<K, V2> dbDataMap, BiPredicate<V1, V2> condition, BiConsumer<V1, V2> updateFieldBiConsumer) {
        Objects.requireNonNull(allMap, "allMap must not be null");
        Objects.requireNonNull(dbDataMap, "dbDataMap must not be null");
        // Snapshot the shared keys first so that the callbacks below are never invoked
        // while one of the maps is being iterated.
        List<K> sharedKeys = new ArrayList<>();
        for (K key : dbDataMap.keySet()) {
            if (allMap.containsKey(key)) {
                sharedKeys.add(key);
            }
        }
        List<V2> updateDataList = new ArrayList<>();
        for (K key : sharedKeys) {
            V1 data1 = allMap.get(key);
            V2 dbData = dbDataMap.get(key);
            if (condition.test(data1, dbData)) {
                // Copy the changed fields of data1 onto dbData.
                if (updateFieldBiConsumer != null) {
                    updateFieldBiConsumer.accept(data1, dbData);
                }
                updateDataList.add(dbData);
            }
        }
        return updateDataList;
    }

    /**
     * Indexes {@code dataList} by key. A later item replaces an earlier item with the same
     * key; the map iterates in the order in which each key first appeared.
     * <p>
     * A null list is rejected instead of being treated as empty: an "empty" external data
     * set would classify every db record as removable.
     */
    private static <K, V> Map<K, V> indexByKey(List<V> dataList, Function<V, K> keyMapper, String listName) {
        Objects.requireNonNull(dataList, listName + " must not be null");
        return dataList.stream()
                .collect(Collectors.toMap(keyMapper, item -> item, (oldVal, newVal) -> newVal, LinkedHashMap::new));
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment