【feat】 一次性插入新增Share对象处理逻辑

This commit is contained in:
Kris 2025-07-25 16:07:43 +08:00
parent ecc449cb8a
commit 2643f3380d

View File

@ -924,6 +924,11 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
update.setDataLock(1); update.setDataLock(1);
dataObjectService.updateById(update); dataObjectService.updateById(update);
if (api.contains("Share")){
insertSingleShareData(api,bulkConnection);
continue;
}
QueryWrapper<DataBatch> dbQw = new QueryWrapper<>(); QueryWrapper<DataBatch> dbQw = new QueryWrapper<>();
dbQw.eq("name", api); dbQw.eq("name", api);
List<DataBatch> list = dataBatchService.list(dbQw); List<DataBatch> list = dataBatchService.list(dbQw);
@ -997,6 +1002,11 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
update.setDataLock(1); update.setDataLock(1);
dataObjectService.updateById(update); dataObjectService.updateById(update);
if (api.contains("Share")){
insertSingleShareData(api,bulkConnection);
continue;
}
List<SalesforceParam> salesforceParams = null; List<SalesforceParam> salesforceParams = null;
QueryWrapper<DataBatch> dbQw = new QueryWrapper<>(); QueryWrapper<DataBatch> dbQw = new QueryWrapper<>();
dbQw.eq("name", api); dbQw.eq("name", api);
@ -1211,7 +1221,135 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
} }
/**
* 执行一次性Insert Share数据
*/
private void insertSingleShareData(String api, BulkConnection bulkConnection) throws Exception {
QueryWrapper<DataField> dbQw = new QueryWrapper<>();
dbQw.eq("api", api);
List<DataField> list = dataFieldService.list(dbQw);
TimeUnit.MILLISECONDS.sleep(1);
String beginDateStr = null;
String endDateStr = null;
//表内数据总量
Integer count = customMapper.countBySQL(api, "where new_id is null and RowCause = 'Manual'");
log.error("总Insert数据 count:{}-开始时间:{}-结束时间:{}-api:{}", count, beginDateStr, endDateStr, api);
if (count == 0) {
return;
}
//批量插入10000一次
int page = count%10000 == 0 ? count/10000 : (count/10000) + 1;
//总插入数
int sfNum = 0;
for (int i = 0; i < page; i++) {
List<JSONObject> data = customMapper.listJsonObject("*", api, "new_id is null and RowCause = 'Manual' limit 10000");
int size = data.size();
log.info("执行api:{}, 执行page:{}, 执行size:{}", api, i+1, size);
List<JSONObject> insertList = new ArrayList<>();
//查询当前对象多态字段映射
QueryWrapper<LinkConfig> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("api",api).eq("is_create",1).eq("is_link",true);
List<LinkConfig> configs = linkConfigService.list(queryWrapper);
Map<String, String> fieldMap = new HashMap<>();
if (!configs.isEmpty()) {
fieldMap = configs.stream()
.collect(Collectors.toMap(
LinkConfig::getField, // Key提取器
LinkConfig::getLinkField, // Value提取器
(oldVal, newVal) -> newVal // 解决重复Key冲突保留新值
));
}
//判断引用对象是否存在new_id
DataObject update = new DataObject();
update.setName(api);
//更新对象的new_id
String[] ids = new String[size];
for (int j = 1; j <= size; j++) {
JSONObject account = new JSONObject();
for (DataField dataField : list) {
if ("Owner_Type".equals(dataField.getField()) || "Id".equals(dataField.getField())){
continue;
}
if (dataField.getIsCreateable() !=null && dataField.getIsCreateable()) {
if ("reference".equals(dataField.getSfType()) && data.get(j - 1).get(dataField.getField()) != null){
//引用类型
String reference_to = dataField.getReferenceTo();
//引用类型字段
String linkfield = fieldMap.get(dataField.getField());
if (StringUtils.isNotBlank(linkfield)){
reference_to = data.get(j-1).get(linkfield)!=null?data.get(j-1).get(linkfield).toString():null;
}
if (reference_to == null){
continue;
}
if (reference_to.contains(",User") || reference_to.contains("User,")) {
reference_to = "User";
}
Map<String, Object> m = customMapper.getById("new_id", reference_to, data.get(j - 1).get(dataField.getField()).toString());
if (m != null && !m.isEmpty()) {
account.put(dataField.getField(), m.get("new_id"));
}else {
String message = "对象类型:" + api + "的数据:"+ data.get(j - 1).get("Id") +"的引用对象:" + reference_to + "的数据:"+ data.get(j - 1).get(dataField.getField()) +"不存在!";
EmailUtil.send("DataDump ERROR", message);
log.info(message);
return;
}
}else {
if (data.get(j - 1).get(dataField.getField()) != null && StringUtils.isNotBlank(dataField.getSfType())) {
account.put(dataField.getField(), DataUtil.localBulkDataToSfData(dataField.getSfType(), data.get(j - 1).get(dataField.getField()).toString()));
}else {
account.put(dataField.getField(), data.get(j - 1).get(dataField.getField()) );
}
}
}
}
ids[j-1] = data.get(j-1).get("Id").toString();
insertList.add(account);
if (i*10000+j == count){
break;
}
}
try {
//写入csv文件
String fullPath = CsvConverterUtil.writeToCsv(insertList, UUID.randomUUID().toString());
JobInfo salesforceInsertJob = BulkUtil.createJob(bulkConnection, api, OperationEnum.insert);
List<BatchInfo> batchInfos = BulkUtil.createBatchesFromCSVFile(bulkConnection, salesforceInsertJob, fullPath);
BulkUtil.awaitCompletion(bulkConnection, salesforceInsertJob, batchInfos);
sfNum = sfNum + checkInsertResults(bulkConnection, salesforceInsertJob, batchInfos, api, ids);
BulkUtil.closeJob(bulkConnection, salesforceInsertJob.getId());
new File(fullPath).delete();
} catch (Exception e) {
log.error("manualCreatedNewId error api:{}", api, e);
throw e;
}
}
}
} }