【feat】 新增Bulk Api一次性插入数据任务

This commit is contained in:
Kris 2025-07-15 12:03:17 +08:00
parent 37d8d94004
commit 6ec1b3f206
4 changed files with 372 additions and 1 deletions

View File

@ -187,5 +187,27 @@ public class DataDumpNewJob {
return commonService.updateLinkType(param);
}
/**
 * XXL-Job entry point: one-shot bulk insert of data via the Salesforce Bulk API.
 *
 * @param paramStr optional JSON payload deserialized into a {@link SalesforceParam};
 *                 a blank string yields a default (empty) parameter object
 * @return the result of {@code dataImportBatchService.insertSingleBatch}, or a
 *         500 {@link ReturnT} when the JSON payload cannot be parsed
 * @throws Exception propagated from the batch service
 * @author kris
 */
@XxlJob("insertSingleBatchJob")
public ReturnT<String> insertSingleJob(String paramStr) throws Exception {
    log.info("insertSingleBatchJob execute start ..................");
    SalesforceParam param;
    try {
        // Blank payload -> run with an empty parameter object (full/auto mode).
        param = StringUtils.isNotBlank(paramStr)
                ? JSON.parseObject(paramStr, SalesforceParam.class)
                : new SalesforceParam();
    } catch (Throwable throwable) {
        // Bad JSON is a caller error: report it instead of failing the job run.
        return new ReturnT<>(500, "参数解析失败!");
    }
    // Map the job's begin/end window onto the created-date filter fields.
    param.setBeginCreateDate(param.getBeginDate());
    param.setEndCreateDate(param.getEndDate());
    return dataImportBatchService.insertSingleBatch(param);
}
}

View File

@ -10,4 +10,6 @@ public interface DataImportBatchService {
ReturnT<String> immigrationUpdateBatch(SalesforceParam param) throws Exception;
ReturnT<String> insertSingleBatch(SalesforceParam param) throws Exception;
}

View File

@ -40,6 +40,7 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
@ -684,6 +685,350 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
return index;
}
/**
 * Entry point for the one-shot bulk insert.
 * Dispatches to the manual variant when an explicit API list is supplied,
 * otherwise runs the automatic multi-table variant.
 *
 * @param param job parameters; a non-blank {@code api} selects manual mode
 * @return {@link ReturnT#SUCCESS}, or the manual variant's failure result
 * @throws Exception rethrown after cancelling any queued executor futures
 */
@Override
public ReturnT<String> insertSingleBatch(SalesforceParam param) throws Exception {
    List<Future<?>> futures = Lists.newArrayList();
    try {
        if (StringUtils.isBlank(param.getApi())) {
            // Automatic task: scan for pending tables.
            autoInsertSingle(param, futures);
            return ReturnT.SUCCESS;
        }
        // Manual task: a non-null result is an early failure to surface.
        ReturnT<String> failure = manualInsertSingle(param, futures);
        return failure == null ? ReturnT.SUCCESS : failure;
    } catch (Exception exception) {
        // Drop any work still queued before propagating.
        salesforceExecutor.remove(futures.toArray(new Future<?>[]{}));
        log.error("insertSingle error", exception);
        throw exception;
    }
}
/**
 * Manual variant: builds one-shot insert work for an explicit list of table APIs.
 * Each table is locked, its DataBatch windows are fanned out to the executor,
 * and the lock is released when the table completes (full runs only — see NOTE).
 *
 * @param param   job parameters; {@code param.getApi()} holds a delimited API list
 * @param futures accumulator for executor futures so the caller can cancel on error
 * @return null on success, or a 500 {@link ReturnT} when a requested table is
 *         already locked by the automatic task
 * @throws Exception propagated (interrupts rethrown as-is, others wrapped)
 */
public ReturnT<String> manualInsertSingle(SalesforceParam param, List<Future<?>> futures) throws Exception {
    List<String> apis;
    apis = DataUtil.toIdList(param.getApi());
    String join = StringUtils.join(apis, ",");
    log.info("insertSingle apis: {}", join);
    XxlJobLogger.log("insertSingle apis: {}", join);
    TimeUnit.MILLISECONDS.sleep(1);
    // Full run (no explicit ids): refuse to start if the auto task holds any table lock.
    boolean isFull = CollectionUtils.isEmpty(param.getIds());
    if (isFull) {
        QueryWrapper<DataObject> qw = new QueryWrapper<>();
        qw.eq("data_lock", 1).in("name", apis);
        List<DataObject> list = dataObjectService.list(qw);
        if (CollectionUtils.isNotEmpty(list)) {
            // FIX: join with a delimiter — joining() ran multiple table names together.
            String apiNames = list.stream().map(DataObject::getName).collect(Collectors.joining(","));
            return new ReturnT<>(500, "api:" + apiNames + " is locked");
        }
    }
    BulkConnection bulkConnection = salesforceTargetConnect.createBulkConnect();
    for (String api : apis) {
        DataObject update = new DataObject();
        TimeUnit.MILLISECONDS.sleep(1);
        try {
            update.setName(api);
            update.setDataLock(1);
            dataObjectService.updateById(update);
            QueryWrapper<DataBatch> dbQw = new QueryWrapper<>();
            dbQw.eq("name", api);
            List<DataBatch> list = dataBatchService.list(dbQw);
            AtomicInteger batch = new AtomicInteger(1);
            // FIX: default to an empty list — previously this stayed null when the
            // table had no DataBatch rows and the for-each below threw an NPE.
            List<SalesforceParam> salesforceParams = new ArrayList<>();
            if (CollectionUtils.isNotEmpty(list)) {
                // One work item per configured sync window, numbered by `batch`.
                salesforceParams = list.stream().map(t -> {
                    SalesforceParam salesforceParam = param.clone();
                    salesforceParam.setApi(t.getName());
                    salesforceParam.setBeginCreateDate(t.getSyncStartDate());
                    salesforceParam.setEndCreateDate(t.getSyncEndDate());
                    salesforceParam.setBatch(batch.getAndIncrement());
                    return salesforceParam;
                }).collect(Collectors.toList());
            }
            // Manual tasks run at priority 1 on the shared executor.
            for (SalesforceParam salesforceParam : salesforceParams) {
                Future<?> future = salesforceExecutor.execute(() -> {
                    try {
                        insertSingleData(salesforceParam, bulkConnection);
                    } catch (Throwable throwable) {
                        log.error("salesforceExecutor error", throwable);
                        throw new RuntimeException(throwable);
                    }
                }, salesforceParam.getBatch(), 1);
                futures.add(future);
            }
            // Block until every window for this table has finished.
            salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
            update.setDataWork(0);
        } catch (InterruptedException e) {
            throw e;
        } catch (Throwable e) {
            log.error("insertSingle error", e);
            throw new RuntimeException(e);
        } finally {
            // NOTE(review): the lock set above is only released for full runs; a
            // partial (ids-filtered) run leaves data_lock=1 — confirm this is intended.
            if (isFull) {
                update.setName(api);
                update.setDataLock(0);
                dataObjectService.updateById(update);
            }
        }
    }
    return null;
}
/**
 * Automatic variant: repeatedly claims up to 10 unlocked tables flagged
 * data_work=1 (ordered by data_index), runs their one-shot inserts, and
 * loops until no pending table remains.
 *
 * @param param   job parameters cloned per sync window
 * @param futures accumulator for executor futures so the caller can cancel on error
 * @throws Exception propagated (interrupts rethrown as-is, others wrapped)
 */
public void autoInsertSingle(SalesforceParam param, List<Future<?>> futures) throws Exception {
    QueryWrapper<DataObject> qw = new QueryWrapper<>();
    qw.eq("data_work", 1)
            .eq("data_lock", 0)
            .orderByAsc("data_index")
            .last(" limit 10");
    BulkConnection bulkConnection = salesforceTargetConnect.createBulkConnect();
    while (true) {
        List<DataObject> dataObjects = dataObjectService.list(qw);
        if (CollectionUtils.isEmpty(dataObjects)) {
            break;
        }
        for (DataObject dataObject : dataObjects) {
            DataObject update = new DataObject();
            log.info("insertSingle api: {}", dataObject.getName());
            TimeUnit.MILLISECONDS.sleep(1);
            try {
                String api = dataObject.getName();
                // Lock the table so a concurrent manual run skips it.
                update.setName(dataObject.getName());
                update.setDataLock(1);
                dataObjectService.updateById(update);
                QueryWrapper<DataBatch> dbQw = new QueryWrapper<>();
                dbQw.eq("name", api);
                List<DataBatch> list = dataBatchService.list(dbQw);
                AtomicInteger batch = new AtomicInteger(1);
                // FIX: default to an empty list — previously this stayed null when the
                // table had no DataBatch rows and the for-each below threw an NPE.
                List<SalesforceParam> salesforceParams = new ArrayList<>();
                if (CollectionUtils.isNotEmpty(list)) {
                    // One work item per configured sync window, numbered by `batch`.
                    salesforceParams = list.stream().map(t -> {
                        SalesforceParam salesforceParam = param.clone();
                        salesforceParam.setApi(t.getName());
                        salesforceParam.setBeginCreateDate(t.getSyncStartDate());
                        salesforceParam.setEndCreateDate(t.getSyncEndDate());
                        salesforceParam.setBatch(batch.getAndIncrement());
                        return salesforceParam;
                    }).collect(Collectors.toList());
                }
                // Automatic tasks run at priority 0 (manual tasks preempt at 1).
                for (SalesforceParam salesforceParam : salesforceParams) {
                    Future<?> future = salesforceExecutor.execute(() -> {
                        try {
                            insertSingleData(salesforceParam, bulkConnection);
                        } catch (Throwable throwable) {
                            log.error("salesforceExecutor error", throwable);
                            throw new RuntimeException(throwable);
                        }
                    }, salesforceParam.getBatch(), 0);
                    futures.add(future);
                }
                // Block until every window for this table has finished.
                salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
                // Mark the table done so the next scan skips it.
                update.setDataWork(0);
            } catch (InterruptedException e) {
                throw e;
            } catch (Throwable e) {
                // Consistency fix: log before rethrowing, like the manual variant.
                log.error("insertSingle error", e);
                throw new RuntimeException(e);
            } finally {
                // Always release the lock (also persists dataWork=0 on success).
                update.setDataLock(0);
                dataObjectService.updateById(update);
            }
        }
        // Drain and reset the future list before claiming the next batch of tables.
        salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
        futures.clear();
    }
}
/**
 * Performs the one-shot Bulk API insert for a single table and created-date window:
 * pages through local rows whose new_id is still null, maps each row to a Salesforce
 * record (resolving reference fields via new_id lookups), streams the page to the
 * Bulk API as CSV, and finally records the inserted count on the batch tables.
 *
 * NOTE(review): the where-clauses below are built by string concatenation; the date
 * strings come from internal formatting, but confirm no user-controlled text can
 * reach `api` before exposing this path further.
 */
private void insertSingleData(SalesforceParam param, BulkConnection bulkConnection) throws Exception {
    String api = param.getApi();
    // Field mapping definitions for this table.
    QueryWrapper<DataField> dbQw = new QueryWrapper<>();
    dbQw.eq("api", api);
    List<DataField> list = dataFieldService.list(dbQw);
    TimeUnit.MILLISECONDS.sleep(1);
    Date beginDate = param.getBeginCreateDate();
    Date endDate = param.getEndCreateDate();
    String beginDateStr = DateUtil.format(beginDate, "yyyy-MM-dd HH:mm:ss");
    String endDateStr = DateUtil.format(endDate, "yyyy-MM-dd HH:mm:ss");
    // Total rows still to insert (new_id is null) inside the window.
    Integer count = customMapper.countBySQL(api, "where new_id is null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "'");
    // NOTE(review): logged at error level although this is a progress message — consider info.
    log.error("总Insert数据 count:{}-开始时间:{}-结束时间:{}-api:{}", count, beginDateStr, endDateStr, api);
    if (count == 0) {
        return;
    }
    // Insert in pages of 10000 rows (ceil(count / 10000) pages).
    int page = count%10000 == 0 ? count/10000 : (count/10000) + 1;
    // Running total of rows successfully inserted into Salesforce.
    int sfNum = 0;
    for (int i = 0; i < page; i++) {
        // No offset: each pass re-selects rows with new_id still null, so paging
        // relies on checkInsertResults stamping new_id — TODO confirm it does.
        List<JSONObject> data = customMapper.listJsonObject("*", api, "new_id is null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "' limit 10000");
        int size = data.size();
        log.info("执行api:{}, 执行page:{}, 执行size:{}", api, i+1, size);
        List<JSONObject> insertList = new ArrayList<>();
        // Polymorphic-reference configuration for this table: local field -> column
        // holding the actual target object name.
        QueryWrapper<LinkConfig> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("api",api).eq("is_create",1).eq("is_link",true);
        List<LinkConfig> configs = linkConfigService.list(queryWrapper);
        Map<String, String> fieldMap = new HashMap<>();
        if (!configs.isEmpty()) {
            fieldMap = configs.stream()
                    .collect(Collectors.toMap(
                            LinkConfig::getField,       // key: field name
                            LinkConfig::getLinkField,   // value: column naming the target object
                            (oldVal, newVal) -> newVal  // on duplicate key, keep the latest
                    ));
        }
        // Placeholder update object (its new_id bookkeeping happens elsewhere).
        DataObject update = new DataObject();
        update.setName(api);
        // Source-row ids for this page, passed to checkInsertResults to map results back.
        String[] ids = new String[size];
        // Input/output formats for the CreatedDate conversion below.
        DateTimeFormatter inputFormatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME;
        DateTimeFormatter outputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSxx");
        for (int j = 1; j <= size; j++) {
            JSONObject account = new JSONObject();
            for (DataField dataField : list) {
                // Never send the local-only Owner_Type column or the source Id.
                if ("Owner_Type".equals(dataField.getField()) || "Id".equals(dataField.getField())){
                    continue;
                }
                if ("CreatedDate".equals(dataField.getField()) && dataField.getIsCreateable()){
                    // Shift local timestamp to UTC and format for Salesforce.
                    // NOTE(review): the fixed -8h assumes source times are UTC+8 — confirm.
                    LocalDateTime localDateTime = LocalDateTime.parse(String.valueOf(data.get(j - 1).get("CreatedDate")), inputFormatter);
                    ZonedDateTime utcDateTime = localDateTime.atZone(ZoneId.of("UTC")).minusHours(8) ;
                    String convertedTime = utcDateTime.format(outputFormatter);
                    account.put("CreatedDate", convertedTime);
                    continue;
                }
                if ("CreatedById".equals(dataField.getField()) && dataField.getIsCreateable()){
                    // Map the creator to the migrated User's new_id, when present.
                    Map<String, Object> CreatedByIdMap = customMapper.getById("new_id", "User", data.get(j-1).get("CreatedById").toString());
                    if(CreatedByIdMap.get("new_id") != null && StringUtils.isNotEmpty(CreatedByIdMap.get("new_id").toString())){
                        account.put("CreatedById", CreatedByIdMap.get("new_id"));
                    }
                    continue;
                }
                if (dataField.getIsCreateable() !=null && dataField.getIsCreateable()) {
                    if ("reference".equals(dataField.getSfType())){
                        // Reference field: resolve the target object name...
                        String reference_to = dataField.getReferenceTo();
                        // ...overridden per-row by the configured link column, if any.
                        String linkfield = fieldMap.get(dataField.getApi());
                        if (StringUtils.isNotBlank(linkfield)){
                            reference_to = data.get(j-1).get(linkfield)!=null?data.get(j-1).get(linkfield).toString():null;
                        }
                        if (reference_to == null){
                            continue;
                        }
                        // Multi-target references that include User collapse to User.
                        if (reference_to.contains(",User") || reference_to.contains("User,")) {
                            reference_to = "User";
                        }
                        // NOTE(review): the raw field value may be null here, which would
                        // NPE on toString() before the m==null check — confirm upstream data.
                        Map<String, Object> m = customMapper.getById("new_id", reference_to, data.get(j - 1).get(dataField.getField()).toString());
                        if (m != null && !m.isEmpty()) {
                            account.put(dataField.getField(), m.get("new_id"));
                        }else {
                            // Missing referenced row: notify and abort this whole window.
                            String message = "对象类型:" + api + "的数据:"+ data.get(j - 1).get("Id") +"的引用对象:" + reference_to + "的数据:"+ data.get(j - 1).get(dataField.getField()) +"不存在!";
                            EmailUtil.send("DataDump ERROR", message);
                            log.info(message);
                            return;
                        }
                    }else {
                        // Plain field: convert to the Salesforce representation when a type is known.
                        if (data.get(j - 1).get(dataField.getField()) != null && StringUtils.isNotBlank(dataField.getSfType())) {
                            account.put(dataField.getField(), DataUtil.localDataToSfData(dataField.getSfType(), data.get(j - 1).get(dataField.getField()).toString()));
                        }else {
                            account.put(dataField.getField(), data.get(j - 1).get(dataField.getField()) );
                        }
                    }
                }
            }
            ids[j-1] = data.get(j-1).get("Id").toString();
            insertList.add(account);
            // Stop once the global row count is reached (last, partial page).
            if (i*10000+j == count){
                break;
            }
        }
        try {
            // Write the page to a temp CSV and push it through a Bulk API insert job.
            String fullPath = CsvConverterUtil.writeToCsv(insertList, UUID.randomUUID().toString());
            JobInfo salesforceInsertJob = BulkUtil.createJob(bulkConnection, api, OperationEnum.insert);
            List<BatchInfo> batchInfos = BulkUtil.createBatchesFromCSVFile(bulkConnection, salesforceInsertJob, fullPath);
            BulkUtil.awaitCompletion(bulkConnection, salesforceInsertJob, batchInfos);
            // checkInsertResults returns the per-page success count (and presumably
            // writes back new_id — TODO confirm).
            sfNum = sfNum + checkInsertResults(bulkConnection, salesforceInsertJob, batchInfos, api, ids);
            BulkUtil.closeJob(bulkConnection, salesforceInsertJob.getId());
            new File(fullPath).delete();
        } catch (Exception e) {
            log.error("manualCreatedNewId error api:{}", api, e);
            throw e;
        }
    }
    // Record the inserted total on the history row (whose end date is stored
    // as the exclusive bound minus one second)...
    UpdateWrapper<DataBatchHistory> updateQw = new UpdateWrapper<>();
    updateQw.eq("name", api)
            .eq("sync_start_date", beginDate)
            .eq("sync_end_date", DateUtils.addSeconds(endDate, -1))
            .set("target_sf_num", sfNum);
    dataBatchHistoryService.update(updateQw);
    // ...and on the live batch row keyed by the exact window.
    UpdateWrapper<DataBatch> updateQw2 = new UpdateWrapper<>();
    updateQw2.eq("name", api)
            .eq("sync_start_date", beginDate)
            .eq("sync_end_date", endDate)
            .set("sf_add_num", sfNum);
    dataBatchService.update(updateQw2);
}
}

View File

@ -32,6 +32,8 @@ import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;