From 7048ea42b27725db1dd1ccbb3c075bad26de8c1d Mon Sep 17 00:00:00 2001
From: Kris <2893855659@qq.com>
Date: Fri, 6 Jun 2025 14:07:35 +0800
Subject: [PATCH] 【feat】Optimize manual batch insert of data
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../datadump/controller/JobController.java   |   2 +-
 .../service/impl/DataImportServiceImpl.java  | 210 ++++++++----------
 2 files changed, 96 insertions(+), 116 deletions(-)

diff --git a/src/main/java/com/celnet/datadump/controller/JobController.java b/src/main/java/com/celnet/datadump/controller/JobController.java
index 9b2faf4..4ade6dc 100644
--- a/src/main/java/com/celnet/datadump/controller/JobController.java
+++ b/src/main/java/com/celnet/datadump/controller/JobController.java
@@ -198,7 +198,7 @@ public class JobController {
      * @throws Exception
      */
     @PostMapping("/dataImportBatchJob")
-    @ApiOperation("Generate newSFID (big data)")
+    @ApiOperation("Generate newSFID (large data volumes)")
     public ReturnT<String> dataImportBatchJob(String paramStr) throws Exception {
         log.info("dataImportBatchJob execute start ..................");
         SalesforceParam param = new SalesforceParam();
diff --git a/src/main/java/com/celnet/datadump/service/impl/DataImportServiceImpl.java b/src/main/java/com/celnet/datadump/service/impl/DataImportServiceImpl.java
index e316bec..aea5bbc 100644
--- a/src/main/java/com/celnet/datadump/service/impl/DataImportServiceImpl.java
+++ b/src/main/java/com/celnet/datadump/service/impl/DataImportServiceImpl.java
@@ -44,6 +44,10 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
 import java.util.*;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -550,52 +554,84 @@ public class DataImportServiceImpl implements DataImportService {
         }
         // bulk insert, 2000 rows per page
-        int page = count%200 == 0 ? count/200 : (count/200) + 1;
+        int page = count%2000 == 0 ? count/2000 : (count/2000) + 1;
+
+        // total number of rows created in Salesforce
+        int sfNum = 0;
+
         for (int i = 0; i < page; i++) {
-            List<JSONObject> data = customMapper.listJsonObject("*", api, "new_id is null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "' limit 200");
+            List<JSONObject> data = customMapper.listJsonObject("*", api, "new_id is null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "' limit 2000");
             int size = data.size();
             log.info("executing api:{}, page:{}, size:{}", api, i+1, size);
             List<JSONObject> insertList = new ArrayList<>();
+            // used to re-queue this object when a required reference has no new_id yet
+            DataObject update = new DataObject();
+            update.setName(api);
+
+            // local record ids, used to write new_id back after the insert
+            String[] ids = new String[size];
+
+            // input/output datetime formats
+            DateTimeFormatter inputFormatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME;
+            DateTimeFormatter outputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSxx");
+
             for (int j = 1; j <= size; j++) {
                 JSONObject account = new JSONObject();
-                // find the required fields of the SF object and give them default values
-                for (Map map : data) {
-                    // populate the object
-                    for (DataField dataField : list) {
-                        String field = dataField.getField();
-                        String reference_to = dataField.getReferenceTo();
-
-                        if (!DataUtil.isUpdate(field) || (dataField.getIsCreateable() != null && !dataField.getIsCreateable())) {
-                            continue;
-                        } else if (StringUtils.isNotBlank(reference_to) && !"data_picklist".equals(reference_to)) {
-                            if (!"null".equals(map.get(field)) && null != map.get(field) && !"OwnerId".equals(field)
-                                    || !"Owner_Type".equals(field)) {
-                                // check whether reference_to contains the string "User"
-                                if (reference_to.contains("User")) {
-                                    reference_to = "User";
-                                }
-                                Map m = customMapper.getById("new_id", reference_to, String.valueOf(map.get(field)));
-                                if (m != null && !m.isEmpty()) {
-                                    account.put(field, m.get("new_id"));
-                                }
-                            }
-                        } else {
-                            if (map.get(field) != null && StringUtils.isNotBlank(dataField.getSfType())) {
-                                account.put(field, DataUtil.localDataToSfData(dataField.getSfType(), String.valueOf(map.get(field))));
-                            }else {
-                                account.put(field, map.get(field));
-                            }
-                        }
-                    }
-                    account.put("old_owner_id__c", map.get("OwnerId"));
-                    account.put("old_sfdc_id__c", map.get("Id"));
-                }
+                for (DataField dataField : list) {
+                    if ("OwnerId".equals(dataField.getField()) || "Owner_Type".equals(dataField.getField())
+                            || "Id".equals(dataField.getField())) {
+                        continue;
+                    }
+                    // only required (non-nillable), creatable fields without a create-time default need a value
+                    if (Boolean.TRUE.equals(dataField.getIsCreateable()) && !Boolean.TRUE.equals(dataField.getIsNillable())
+                            && !Boolean.TRUE.equals(dataField.getIsDefaultedOnCreate())) {
+                        if ("reference".equals(dataField.getSfType())) {
+                            String reference = dataField.getReferenceTo();
+                            if (reference == null) {
+                                reference = data.get(j-1).get("Parent_Type").toString();
+                            }
+                            List<Map<String, Object>> referenceMap = customMapper.list("new_id", reference, "new_id is not null limit 1");
+                            if (referenceMap.isEmpty()) {
+                                // the required lookup has no migrated value yet: push this object to the back of the queue and stop
+                                QueryWrapper<DataObject> maxIndex = new QueryWrapper<>();
+                                maxIndex.select("IFNULL(max(data_index),0) as data_index");
+                                maxIndex.ne("name", api);
+                                Map<String, Object> map = dataObjectService.getMap(maxIndex);
+                                update.setDataIndex(Integer.parseInt(map.get("data_index").toString()) + 1);
+                                dataObjectService.updateById(update);
+                                return;
+                            } else {
+                                account.put(dataField.getField(), referenceMap.get(0).get("new_id"));
+                                continue;
+                            }
+                        }
+                        if ("picklist".equals(dataField.getSfType())) {
+                            List<Map<String, Object>> pickList = customMapper.list("value", "data_picklist", "api = '" + api + "' and field = '" + dataField.getField() + "' limit 1");
+                            if (!pickList.isEmpty()) {
+                                account.put(dataField.getField(), pickList.get(0).get("value"));
+                            }
+                            continue;
+                        }
+                        account.put(dataField.getField(), DataUtil.fieldTypeToSf(dataField));
+                    }
+                }
+
+                // audit fields, set once per record: format CreatedDate as UTC and remap CreatedById
+                LocalDateTime localDateTime = LocalDateTime.parse(String.valueOf(data.get(j-1).get("CreatedDate")), inputFormatter);
+                ZonedDateTime utcDateTime = localDateTime.atZone(ZoneId.of("UTC"));
+                String convertedTime = utcDateTime.format(outputFormatter);
+                account.put("CreatedDate", convertedTime);
+
+                Map<String, Object> createdByIdMap = customMapper.getById("new_id", "User", data.get(j-1).get("CreatedById").toString());
+                if (createdByIdMap != null && createdByIdMap.get("new_id") != null && StringUtils.isNotEmpty(createdByIdMap.get("new_id").toString())) {
+                    account.put("CreatedById", createdByIdMap.get("new_id"));
+                }
+
+                ids[j-1] = data.get(j-1).get("Id").toString();
                 insertList.add(account);
-                if (i*200+j == count){
+                if (i*2000+j == count) {
                     break;
                 }
             }
@@ -605,8 +641,6 @@
             // write the rows to a CSV file
             String fullPath = CsvConverterUtil.writeToCsv(insertList, UUID.randomUUID().toString());
 
-//            FileInputStream inputStream = new FileInputStream(fullPath);
-
             JobInfo salesforceInsertJob = createSalesforceJob(bulkConnection, api, OperationEnum.insert);
 
             List<BatchInfo> batchInfos = BulkUtil.createBatchesFromCSVFile(bulkConnection, salesforceInsertJob, fullPath);
@@ -615,87 +649,37 @@
 
             BulkUtil.awaitCompletion(bulkConnection, salesforceInsertJob, batchInfos);
 
-            checkResults(bulkConnection, salesforceInsertJob, batchInfos);
+            sfNum = sfNum + checkInsertResults(bulkConnection, salesforceInsertJob, batchInfos, api, ids);
 
-
-//            BatchInfo insertBatch = bulkConnection.createJob(salesforceInsertJob,jsonString);
-
-//            BatchInfo insertBatch = bulkConnection.createBatchFromStream(salesforceInsertJob,inputStream);
-//
-//            closeJob(bulkConnection, insertBatch.getJobId());
-
-//            // poll until the batch completes
-//            while (true) {
-//                BatchInfo batchInfo = bulkConnection.getBatchInfo(insertBatch.getJobId(), insertBatch.getId());
-//                BatchStateEnum state = batchInfo.getState();
-//                if (state == BatchStateEnum.Completed) {
-//                    InputStream batchResultStream = bulkConnection.getBatchResultStream(insertBatch.getJobId(), insertBatch.getId());
-//                    List<String> idColumns = CsvConverterUtil.extractIdColumn(batchResultStream);
-//                    int index = 0;
-//                    for (String id : idColumns) {
-//                        List<Map<String, Object>> maps = new ArrayList<>();
-//                        Map<String, Object> m = new HashMap<>();
-//                        m.put("key", "new_id");
-//                        m.put("value", id);
-//                        maps.add(m);
-//                        customMapper.updateById(api, maps, ids[index]);
-//                    }
-//                    break;
-//                } else if (state == BatchStateEnum.Failed) {
-//                    throw new RuntimeException("Batch failed: " + batchInfo.getStateMessage());
-//                }
-//                TimeUnit.SECONDS.sleep(10); // check every 10 seconds
-//            }
-//
-//            String soql = "SELECT Id FROM "+ api +" WHERE CreatedDate >= '" + beginDate + "' AND CreatedDate <= '" + endDate + "'";
-//
-//            JobInfo salesforceQueryJob = createSalesforceJob(bulkConnection, api, OperationEnum.query);
-//
-//            ByteArrayInputStream queryStream = new ByteArrayInputStream(soql.getBytes());
-//
-//            BatchInfo queryBatch = bulkConnection.createBatchFromStream(salesforceQueryJob, queryStream);
-//
-//            while (true) {
-//                BatchInfo info = bulkConnection.getBatchInfo(queryBatch.getJobId(), queryBatch.getId());
-//                if (info.getState() == BatchStateEnum.Completed) {
-//                    InputStream batchResultStream = bulkConnection.getBatchResultStream(queryBatch.getJobId(), queryBatch.getId());
-//                    List<String> idColumns = CsvConverterUtil.extractIdColumn(batchResultStream);
-//                    Integer sfNum = idColumns.size();
-//
-//                    UpdateWrapper<DataBatchHistory> updateQw = new UpdateWrapper<>();
-//                    updateQw.eq("name", api)
-//                            .eq("sync_start_date", beginDate)
-//                            .eq("sync_end_date", DateUtils.addSeconds(endDate, -1))
-//                            .set("target_sf_num", sfNum);
-//                    dataBatchHistoryService.update(updateQw);
-//
-//                    UpdateWrapper<DataBatch> updateQw2 = new UpdateWrapper<>();
-//                    updateQw2.eq("name", api)
-//                            .eq("sync_start_date", beginDate)
-//                            .eq("sync_end_date", endDate)
-//                            .set("sf_add_num", sfNum);
-//                    dataBatchService.update(updateQw2);
-//
-//                    break;
-//                } else if (info.getState() == BatchStateEnum.Failed) {
-//                    throw new Exception("Query failed: " + info.getStateMessage());
-//                }
-//                TimeUnit.SECONDS.sleep(10); // check every 10 seconds
-//            }
             } catch (Exception e) {
                 log.error("manualCreatedNewId error api:{}", api, e);
                 throw e;
             }
         }
+
+        UpdateWrapper<DataBatchHistory> updateQw = new UpdateWrapper<>();
+        updateQw.eq("name", api)
+                .eq("sync_start_date", beginDate)
+                .eq("sync_end_date", DateUtils.addSeconds(endDate, -1))
+                .set("target_sf_num", sfNum);
+        dataBatchHistoryService.update(updateQw);
+
+        UpdateWrapper<DataBatch> updateQw2 = new UpdateWrapper<>();
+        updateQw2.eq("name", api)
+                .eq("sync_start_date", beginDate)
+                .eq("sync_end_date", endDate)
+                .set("sf_add_num", sfNum);
+        dataBatchService.update(updateQw2);
     }
 
     /**
      * Gets the results of the insert, checks for errors, and writes new_id back for created rows.
      */
-    private void checkResults(BulkConnection connection, JobInfo job,
-            List<BatchInfo> batchInfoList)
+    private int checkInsertResults(BulkConnection connection, JobInfo job,
+            List<BatchInfo> batchInfoList, String api, String[] ids)
             throws AsyncApiException, IOException {
+        // rowIndex tracks the position in the submitted CSV; it must advance on every
+        // result row (success or failure) so new_id is written back to the right record
+        int rowIndex = 0;
+        int successCount = 0;
         // batchInfoList was populated when batches were created and submitted
         for (BatchInfo b : batchInfoList) {
             CSVReader rdr =
@@ -714,27 +698,23 @@ public class DataImportServiceImpl implements DataImportService {
                 String id = resultInfo.get("Id");
                 String error = resultInfo.get("Error");
                 if (success && created) {
-                    System.out.println("Created row with id " + id);
+                    List<Map<String, Object>> maps = new ArrayList<>();
+                    Map<String, Object> m = new HashMap<>();
+                    m.put("key", "new_id");
+                    m.put("value", id);
+                    maps.add(m);
+                    customMapper.updateById(api, maps, ids[rowIndex]);
+                    successCount++;
+                    log.info("Created row with id {}", id);
                 } else if (!success) {
-                    System.out.println("Failed with error: " + error);
+                    log.error("Failed with error: {}", error);
                 }
+                rowIndex++;
             }
         }
+        return successCount;
     }
 
-    /**
-     * Update the job state to Closed
-     * @param connection
-     * @param jobId
-     * @throws AsyncApiException
-     */
-    private void closeJob(BulkConnection connection, String jobId)
-            throws AsyncApiException {
-        JobInfo job = new JobInfo();
-        job.setId(jobId);
-        job.setState(JobStateEnum.Closed);
-        connection.updateJob(job);
-    }
+
     /**
      * Create the job
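
Note on the CreatedDate conversion in the hunk above: a minimal standalone sketch
for checking the formatter pattern in isolation (the class name and the sample
timestamp are hypothetical, not part of the patch). atZone(ZoneId.of("UTC")) only
labels the parsed wall-clock value as UTC; it does not shift it from another time
zone, which matches the patch's assumption that CreatedDate is stored in UTC. If
the database renders timestamps with a space instead of 'T', ISO_LOCAL_DATE_TIME
will throw a DateTimeParseException, so the stored format is worth verifying.

    import java.time.LocalDateTime;
    import java.time.ZoneId;
    import java.time.ZonedDateTime;
    import java.time.format.DateTimeFormatter;

    public class CreatedDateFormatCheck {
        public static void main(String[] args) {
            // same formatters as DataImportServiceImpl uses above
            DateTimeFormatter inputFormatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME;
            // 'xx' prints the zone offset without a colon, e.g. +0000
            DateTimeFormatter outputFormatter =
                    DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSxx");

            String raw = "2025-06-06T14:07:35";                 // hypothetical DB value
            LocalDateTime local = LocalDateTime.parse(raw, inputFormatter);
            ZonedDateTime utc = local.atZone(ZoneId.of("UTC")); // labels as UTC, no shifting
            System.out.println(utc.format(outputFormatter));    // 2025-06-06T14:07:35.000+0000
        }
    }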