【feat】Optimize manual batch insert of data

This commit is contained in:
Kris 2025-06-06 14:07:35 +08:00
parent c1fe4914b0
commit 7048ea42b2
2 changed files with 96 additions and 116 deletions
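
The heart of this change is raising the manual bulk-insert batch size from 200 rows to 2000 and deriving the number of batches by ceiling division. A minimal, self-contained sketch of that page-count arithmetic (class and method names here are illustrative, not from the repository):

// Ceiling division used to split `count` rows into batches of 2000.
public class BatchPageMath {
    static int pages(int count, int batchSize) {
        // same result as: count % batchSize == 0 ? count / batchSize : count / batchSize + 1
        return (count + batchSize - 1) / batchSize;
    }

    public static void main(String[] args) {
        System.out.println(pages(4500, 2000)); // 3
        System.out.println(pages(4000, 2000)); // 2
    }
}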

@@ -198,7 +198,7 @@ public class JobController {
* @throws Exception
*/
@PostMapping("/dataImportBatchJob")
@ApiOperation("大数据生成newSFID")
@ApiOperation("生成newSFID(大数据量)")
public ReturnT<String> dataImportBatchJob(String paramStr) throws Exception {
log.info("dataImportBatchJob execute start ..................");
SalesforceParam param = new SalesforceParam();
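
For reference, the endpoint above can be exercised with a plain HTTP POST. The path and the paramStr parameter come from the diff; the host, port, any class-level path prefix, and the JSON shape of paramStr are assumptions made purely for illustration:

import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

public class DataImportBatchJobCall {
    public static void main(String[] args) throws Exception {
        // Hypothetical SalesforceParam payload; the real field names may differ.
        String paramStr = "{\"api\":\"Account\"}";
        String form = "paramStr=" + URLEncoder.encode(paramStr, StandardCharsets.UTF_8);

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8080/dataImportBatchJob")) // assumed host/port
                .header("Content-Type", "application/x-www-form-urlencoded")
                .POST(HttpRequest.BodyPublishers.ofString(form))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body()); // serialized ReturnT<String>
    }
}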

@@ -44,6 +44,10 @@ import java.io.IOException;
import java.io.InputStream;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -550,52 +554,84 @@ public class DataImportServiceImpl implements DataImportService {
}
//bulk insert, 2000 rows at a time
int page = count%200 == 0 ? count/200 : (count/200) + 1;
int page = count%2000 == 0 ? count/2000 : (count/2000) + 1;
//total number of inserted rows
int sfNum = 0;
for (int i = 0; i < page; i++) {
List<JSONObject> data = customMapper.listJsonObject("*", api, "new_id is null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "' limit 200");
List<JSONObject> data = customMapper.listJsonObject("*", api, "new_id is null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "' limit 2000");
int size = data.size();
log.info("执行api:{}, 执行page:{}, 执行size:{}", api, i+1, size);
List<JSONObject> insertList = new ArrayList<>();
//check whether the referenced object already has a new_id
DataObject update = new DataObject();
update.setName(api);
//source record Ids used to write back new_id
String[] ids = new String[size];
// define the input/output formats
DateTimeFormatter inputFormatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME;
DateTimeFormatter outputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSxx");
for (int j = 1; j <= size; j++) {
JSONObject account = new JSONObject();
//find the SF object's required fields and give them default values
for (Map<String, Object> map : data) {
//populate the object's fields
for (DataField dataField : list) {
String field = dataField.getField();
String reference_to = dataField.getReferenceTo();
if (!DataUtil.isUpdate(field) || (dataField.getIsCreateable() != null && !dataField.getIsCreateable())) {
if ("OwnerId".equals(dataField.getField()) || "Owner_Type".equals(dataField.getField())
|| "Id".equals(dataField.getField())){
continue;
} else if (StringUtils.isNotBlank(reference_to) && !"data_picklist".equals(reference_to)) {
if (!"null".equals(map.get(field)) && null != map.get(field) && !"OwnerId".equals(field)
|| !"Owner_Type".equals(field)) {
//check whether reference_to contains the string "User"
if (reference_to.contains("User")) {
reference_to = "User";
}
Map<String, Object> m = customMapper.getById("new_id", reference_to, String.valueOf(map.get(field)));
if (m != null && !m.isEmpty()) {
account.put(field, m.get("new_id"));
}
if (dataField.getIsCreateable() && !dataField.getIsNillable() && !dataField.getIsDefaultedOnCreate()) {
if ("reference".equals(dataField.getSfType())){
String reference = dataField.getReferenceTo();
if (reference == null){
reference = data.get(j-1).get("Parent_Type").toString();
}
List<Map<String, Object>> referenceMap = customMapper.list("new_id", reference, "new_id is not null limit 1");
if (referenceMap.isEmpty()){
QueryWrapper<DataObject> maxIndex = new QueryWrapper<>();
maxIndex.select("IFNULL(max(data_index),0) as data_index");
maxIndex.ne("name", api);
Map<String, Object> map = dataObjectService.getMap(maxIndex);
//skip this object if a required lookup field has no value yet
update.setDataIndex(Integer.parseInt(map.get("data_index").toString()) + 1);
dataObjectService.updateById(update);
return;
}else{
if (map.get(field) != null && StringUtils.isNotBlank(dataField.getSfType())) {
account.put(field, DataUtil.localDataToSfData(dataField.getSfType(), String.valueOf(map.get(field))));
}else {
account.put(field, map.get(field));
account.put(dataField.getField(), referenceMap.get(0).get("new_id"));
continue;
}
}
if ("picklist".equals(dataField.getSfType())){
List<Map<String, Object>> pickList = customMapper.list("value", "data_picklist", "api = '"+api+"' and field = '"+dataField.getField()+"' limit 1");
account.put(dataField.getField(), pickList.get(0).get("value"));
continue;
}
account.put("old_owner_id__c", map.get("OwnerId"));
account.put("old_sfdc_id__c", map.get("Id"));
account.put(dataField.getField(), DataUtil.fieldTypeToSf(dataField));
}
// convert to UTC and format
LocalDateTime localDateTime = LocalDateTime.parse(String.valueOf(data.get(j - 1).get("CreatedDate")), inputFormatter);
ZonedDateTime utcDateTime = localDateTime.atZone(ZoneId.of("UTC"));
String convertedTime = utcDateTime.format(outputFormatter);
account.put("CreatedDate", convertedTime);
Map<String, Object> CreatedByIdMap = customMapper.getById("new_id", "User", data.get(j-1).get("CreatedById").toString());
if(CreatedByIdMap != null && CreatedByIdMap.get("new_id") != null && StringUtils.isNotEmpty(CreatedByIdMap.get("new_id").toString())){
account.put("CreatedById", CreatedByIdMap.get("new_id"));
}
}
ids[j-1] = data.get(j-1).get("Id").toString();
insertList.add(account);
if (i*200+j == count){
if (i*2000+j == count){
break;
}
}
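
The CreatedDate handling added above parses the stored local timestamp, pins it to UTC, and re-renders it with the yyyy-MM-dd'T'HH:mm:ss.SSSxx pattern. A minimal, runnable sketch of just that conversion (the sample timestamp is illustrative):

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class CreatedDateFormatDemo {
    public static void main(String[] args) {
        DateTimeFormatter in = DateTimeFormatter.ISO_LOCAL_DATE_TIME;
        DateTimeFormatter out = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSxx");

        // Parse a local timestamp, treat it as UTC, and format it for Salesforce.
        LocalDateTime local = LocalDateTime.parse("2024-01-15T08:30:00", in);
        ZonedDateTime utc = local.atZone(ZoneId.of("UTC"));

        System.out.println(utc.format(out)); // 2024-01-15T08:30:00.000+0000
    }
}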
@@ -605,8 +641,6 @@ public class DataImportServiceImpl implements DataImportService {
//write to a CSV file
String fullPath = CsvConverterUtil.writeToCsv(insertList, UUID.randomUUID().toString());
// FileInputStream inputStream = new FileInputStream(fullPath);
JobInfo salesforceInsertJob = createSalesforceJob(bulkConnection, api, OperationEnum.insert);
List<BatchInfo> batchInfos = BulkUtil.createBatchesFromCSVFile(bulkConnection, salesforceInsertJob, fullPath);
@@ -615,87 +649,37 @@ public class DataImportServiceImpl implements DataImportService {
BulkUtil.awaitCompletion(bulkConnection, salesforceInsertJob, batchInfos);
checkResults(bulkConnection, salesforceInsertJob, batchInfos);
sfNum = sfNum + checkInsertResults(bulkConnection, salesforceInsertJob, batchInfos, api, ids);
// BatchInfo insertBatch = bulkConnection.createJob(salesforceInsertJob,jsonString);
// BatchInfo insertBatch = bulkConnection.createBatchFromStream(salesforceInsertJob,inputStream);
//
// closeJob(bulkConnection, insertBatch.getJobId());
// // poll until the batch completes
// while (true) {
// BatchInfo batchInfo = bulkConnection.getBatchInfo(insertBatch.getJobId(), insertBatch.getId());
// BatchStateEnum state = batchInfo.getState();
// if (state == BatchStateEnum.Completed) {
// InputStream batchResultStream = bulkConnection.getBatchResultStream(insertBatch.getJobId(), insertBatch.getId());
// List<String> idColumns = CsvConverterUtil.extractIdColumn(batchResultStream);
// int index = 0;
// for (String id : idColumns) {
// List<Map<String, Object>> maps = new ArrayList<>();
// Map<String, Object> m = new HashMap<>();
// m.put("key", "new_id");
// m.put("value", id);
// maps.add(m);
// customMapper.updateById(api, maps, ids[index]);
// }
// break;
// } else if (state == BatchStateEnum.Failed) {
// throw new RuntimeException("Batch failed: " + batchInfo.getStateMessage());
// }
// TimeUnit.SECONDS.sleep(10); // check once every 10 seconds
// }
//
// String soql = "SELECT Id FROM "+ api +" WHERE CreatedDate >= '" + beginDate + "' AND CreatedDate <= '" + endDate + "'";
//
// JobInfo salesforceQueryJob = createSalesforceJob(bulkConnection, api, OperationEnum.query);
//
// ByteArrayInputStream queryStream = new ByteArrayInputStream(soql.getBytes());
//
// BatchInfo queryBatch = bulkConnection.createBatchFromStream(salesforceQueryJob, queryStream);
//
// while (true) {
// BatchInfo info = bulkConnection.getBatchInfo(queryBatch.getJobId(), queryBatch.getId());
// if (info.getState() == BatchStateEnum.Completed) {
// InputStream batchResultStream = bulkConnection.getBatchResultStream(queryBatch.getJobId(), queryBatch.getId());
// List<String> idColumns = CsvConverterUtil.extractIdColumn(batchResultStream);
// Integer sfNum = idColumns.size();
//
// UpdateWrapper<DataBatchHistory> updateQw = new UpdateWrapper<>();
// updateQw.eq("name", api)
// .eq("sync_start_date", beginDate)
// .eq("sync_end_date", DateUtils.addSeconds(endDate, -1))
// .set("target_sf_num", sfNum);
// dataBatchHistoryService.update(updateQw);
//
// UpdateWrapper<DataBatch> updateQw2 = new UpdateWrapper<>();
// updateQw2.eq("name", api)
// .eq("sync_start_date", beginDate)
// .eq("sync_end_date", endDate)
// .set("sf_add_num", sfNum);
// dataBatchService.update(updateQw2);
//
// break;
// }else if (info.getState() == BatchStateEnum.Failed) {
// throw new Exception("Query failed: " + info.getStateMessage());
// }
// TimeUnit.SECONDS.sleep(10); // check once every 10 seconds
// }
} catch (Exception e) {
log.error("manualCreatedNewId error api:{}", api, e);
throw e;
}
}
UpdateWrapper<DataBatchHistory> updateQw = new UpdateWrapper<>();
updateQw.eq("name", api)
.eq("sync_start_date", beginDate)
.eq("sync_end_date", DateUtils.addSeconds(endDate, -1))
.set("target_sf_num", sfNum);
dataBatchHistoryService.update(updateQw);
UpdateWrapper<DataBatch> updateQw2 = new UpdateWrapper<>();
updateQw2.eq("name", api)
.eq("sync_start_date", beginDate)
.eq("sync_end_date", endDate)
.set("sf_add_num", sfNum);
dataBatchService.update(updateQw2);
}
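
BulkUtil.awaitCompletion, called above, takes the place of the commented-out polling loop this commit deletes. A sketch of such a poll-until-complete helper built on the standard Salesforce Bulk API types; it is an illustration under those assumptions, not the project's BulkUtil implementation:

import com.sforce.async.AsyncApiException;
import com.sforce.async.BatchInfo;
import com.sforce.async.BatchStateEnum;
import com.sforce.async.BulkConnection;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class BulkAwaitSketch {
    static void awaitCompletion(BulkConnection conn, String jobId, List<BatchInfo> batches)
            throws AsyncApiException, InterruptedException {
        for (BatchInfo b : batches) {
            while (true) {
                BatchInfo info = conn.getBatchInfo(jobId, b.getId());
                if (info.getState() == BatchStateEnum.Completed) {
                    break; // this batch is done, check the next one
                }
                if (info.getState() == BatchStateEnum.Failed) {
                    throw new RuntimeException("Batch failed: " + info.getStateMessage());
                }
                TimeUnit.SECONDS.sleep(10); // re-check every 10 seconds
            }
        }
    }
}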
/**
* Gets the results of the operation and checks for errors.
*/
private void checkResults(BulkConnection connection, JobInfo job,
List<BatchInfo> batchInfoList)
private int checkInsertResults(BulkConnection connection, JobInfo job,
List<BatchInfo> batchInfoList, String api, String[] ids)
throws AsyncApiException, IOException {
int index = 0;
// batchInfoList was populated when batches were created and submitted
for (BatchInfo b : batchInfoList) {
CSVReader rdr =
@@ -714,27 +698,23 @@ public class DataImportServiceImpl implements DataImportService {
String id = resultInfo.get("Id");
String error = resultInfo.get("Error");
if (success && created) {
System.out.println("Created row with id " + id);
List<Map<String, Object>> maps = new ArrayList<>();
Map<String, Object> m = new HashMap<>();
m.put("key", "new_id");
m.put("value", id);
maps.add(m);
customMapper.updateById(api, maps, ids[index]);
index ++;
log.info("Created row with id " + id);
} else if (!success) {
System.out.println("Failed with error: " + error);
log.info("Failed with error: " + error);
}
}
}
return index;
}
/**
* Update the job state to completed
* @param connection
* @param jobId
* @throws AsyncApiException
*/
private void closeJob(BulkConnection connection, String jobId)
throws AsyncApiException {
JobInfo job = new JobInfo();
job.setId(jobId);
job.setState(JobStateEnum.Closed);
connection.updateJob(job);
}
/**
* Create the job