【feat】 增加全表Bulk Inset ,全表Bulk Update

This commit is contained in:
Kris 2025-07-16 12:05:17 +08:00
parent d920ec792f
commit 4bc2b8075a

View File

@ -92,6 +92,8 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
if (result != null) {
return result;
}
}else {
autoImmigrationBatch(param, futures);
}
return ReturnT.SUCCESS;
} catch (Exception exception) {
@ -102,7 +104,7 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
}
/**
* 组装执行参数
* 单表组装Insert执行参数
*/
public ReturnT<String> manualImmigrationBatch(SalesforceParam param, List<Future<?>> futures) throws Exception {
List<String> apis;
@ -160,7 +162,7 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
for (SalesforceParam salesforceParam : salesforceParams) {
Future<?> future = salesforceExecutor.execute(() -> {
try {
manualCreatedNewIdBatch(salesforceParam, bulkConnection);
createdNewIdBatch(salesforceParam, bulkConnection);
} catch (Throwable throwable) {
log.error("salesforceExecutor error", throwable);
throw new RuntimeException(throwable);
@ -187,10 +189,97 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
return null;
}
/**
* 多表组装Insert执行参数
*/
public void autoImmigrationBatch(SalesforceParam param, List<Future<?>> futures) throws Exception {
// 全量的时候 检测是否有自动任务锁住的表
boolean isFull = CollectionUtils.isEmpty(param.getIds());
if (isFull) {
QueryWrapper<DataObject> qw = new QueryWrapper<>();
qw.eq("data_lock", 1);
List<DataObject> list = dataObjectService.list(qw);
if (CollectionUtils.isNotEmpty(list)) {
String apiNames = list.stream().map(DataObject::getName).collect(Collectors.joining());
String message = "api:" + apiNames + " is locked";
String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s", apiNames, com.alibaba.fastjson2.JSON.toJSONString(param, DataDumpParam.getFilter()), message);
EmailUtil.send("DataDump ERROR", format);
return ;
}
}
BulkConnection bulkConnection = salesforceTargetConnect.createBulkConnect();
QueryWrapper<DataObject> qw = new QueryWrapper<>();
qw.eq("data_work", 1)
.orderByAsc("data_index")
.last(" limit 10");
while (true) {
List<DataObject> dataObjects = dataObjectService.list(qw);
//判断dataObjects是否为空
if (CollectionUtils.isEmpty(dataObjects)) {
return;
}
for (DataObject object : dataObjects) {
TimeUnit.MILLISECONDS.sleep(1);
try {
List<SalesforceParam> salesforceParams = null;
object.setDataLock(1);
dataObjectService.updateById(object);
QueryWrapper<DataBatch> dbQw = new QueryWrapper<>();
dbQw.eq("name", object.getName());
List<DataBatch> list = dataBatchService.list(dbQw);
AtomicInteger batch = new AtomicInteger(1);
if (CollectionUtils.isNotEmpty(list)) {
salesforceParams = list.stream().map(t -> {
SalesforceParam salesforceParam = param.clone();
salesforceParam.setApi(t.getName());
salesforceParam.setBeginCreateDate(t.getSyncStartDate());
salesforceParam.setEndCreateDate(t.getSyncEndDate());
salesforceParam.setBatch(batch.getAndIncrement());
return salesforceParam;
}).collect(Collectors.toList());
}
// 手动任务优先执行
for (SalesforceParam salesforceParam : salesforceParams) {
Future<?> future = salesforceExecutor.execute(() -> {
try {
createdNewIdBatch(salesforceParam, bulkConnection);
} catch (Throwable throwable) {
log.error("salesforceExecutor error", throwable);
throw new RuntimeException(throwable);
}
}, salesforceParam.getBatch(), 1);
futures.add(future);
}
// 等待当前所有线程执行完成
salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
object.setDataWork(0);
} catch (InterruptedException e) {
throw e;
} catch (Throwable e) {
log.error("manualImmigration error", e);
throw new RuntimeException(e);
} finally {
if (isFull) {
object.setDataLock(0);
dataObjectService.updateById(object);
}
}
}
}
}
/**
* 执行数据Insert
*/
public void manualCreatedNewIdBatch(SalesforceParam param, BulkConnection bulkConnection) throws Exception {
public void createdNewIdBatch(SalesforceParam param, BulkConnection bulkConnection) throws Exception {
String api = param.getApi();
QueryWrapper<DataField> dbQw = new QueryWrapper<>();
dbQw.eq("api", api);
@ -411,6 +500,8 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
if (result != null) {
return result;
}
}else {
autoUpdateSfDataBatch(param, futures);
}
return ReturnT.SUCCESS;
} catch (InterruptedException e) {
@ -423,7 +514,7 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
}
/**
* 组装执行参数
* 单彪组装Update执行参数
*/
public ReturnT<String> updateSfDataBatch(SalesforceParam param, List<Future<?>> futures) throws Exception {
List<String> apis;
@ -508,6 +599,88 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
return null;
}
/**
* 多表组装Update执行参数
*/
public void autoUpdateSfDataBatch(SalesforceParam param, List<Future<?>> futures) throws Exception {
// 全量的时候 检测是否有自动任务锁住的表
boolean isFull = CollectionUtils.isEmpty(param.getIds());
if (isFull) {
QueryWrapper<DataObject> qw = new QueryWrapper<>();
qw.eq("data_lock", 1);
List<DataObject> list = dataObjectService.list(qw);
if (CollectionUtils.isNotEmpty(list)) {
String apiNames = list.stream().map(DataObject::getName).collect(Collectors.joining());
String message = "api:" + apiNames + " is locked";
String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s", apiNames, com.alibaba.fastjson2.JSON.toJSONString(param, DataDumpParam.getFilter()), message);
EmailUtil.send("DataDump ERROR", format);
return ;
}
}
BulkConnection bulkConnection = salesforceTargetConnect.createBulkConnect();
QueryWrapper<DataObject> qw = new QueryWrapper<>();
qw.eq("data_work", 1)
.eq("need_update",1)
.orderByAsc("data_index")
.last(" limit 10");
while (true) {
List<DataObject> dataObjects = dataObjectService.list(qw);
//判断dataObjects是否为空
if (CollectionUtils.isEmpty(dataObjects)) {
return;
}
for (DataObject object : dataObjects) {
try {
TimeUnit.MILLISECONDS.sleep(1);
List<SalesforceParam> salesforceParams = null;
QueryWrapper<DataBatch> dbQw = new QueryWrapper<>();
dbQw.eq("name", object.getName());
List<DataBatch> list = dataBatchService.list(dbQw);
AtomicInteger batch = new AtomicInteger(1);
if (CollectionUtils.isNotEmpty(list)) {
salesforceParams = list.stream().map(t -> {
SalesforceParam salesforceParam = param.clone();
salesforceParam.setApi(t.getName());
salesforceParam.setBeginCreateDate(t.getSyncStartDate());
salesforceParam.setEndCreateDate(t.getSyncEndDate());
salesforceParam.setBatch(batch.getAndIncrement());
return salesforceParam;
}).collect(Collectors.toList());
}
// 手动任务优先执行
for (SalesforceParam salesforceParam : salesforceParams) {
Future<?> future = salesforceExecutor.execute(() -> {
try {
manualUpdateSfDataBatch(salesforceParam, bulkConnection,object);
} catch (Throwable throwable) {
log.error("salesforceExecutor error", throwable);
throw new RuntimeException(throwable);
}
}, salesforceParam.getBatch(), 1);
futures.add(future);
}
// 等待当前所有线程执行完成
salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
} catch (Exception e) {
throw e;
} finally {
if (isFull) {
object.setNeedUpdate(false);
object.setDataLock(0);
dataObjectService.updateById(object);
}
}
}
}
}
/**
* 执行数据Update
*/