From 2643f3380d48476b5492b5c3d3825c8dcab54ce1 Mon Sep 17 00:00:00 2001 From: Kris <2893855659@qq.com> Date: Fri, 25 Jul 2025 16:07:43 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90feat=E3=80=91=20=E4=B8=80=E6=AC=A1?= =?UTF-8?q?=E6=80=A7=E6=8F=92=E5=85=A5=E6=96=B0=E5=A2=9EShare=E5=AF=B9?= =?UTF-8?q?=E8=B1=A1=E5=A4=84=E7=90=86=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../impl/DataImportBatchServiceImpl.java | 138 ++++++++++++++++++ 1 file changed, 138 insertions(+) diff --git a/src/main/java/com/celnet/datadump/service/impl/DataImportBatchServiceImpl.java b/src/main/java/com/celnet/datadump/service/impl/DataImportBatchServiceImpl.java index 0444009..d0689a0 100644 --- a/src/main/java/com/celnet/datadump/service/impl/DataImportBatchServiceImpl.java +++ b/src/main/java/com/celnet/datadump/service/impl/DataImportBatchServiceImpl.java @@ -924,6 +924,11 @@ public class DataImportBatchServiceImpl implements DataImportBatchService { update.setDataLock(1); dataObjectService.updateById(update); + if (api.contains("Share")){ + insertSingleShareData(api,bulkConnection); + continue; + } + QueryWrapper dbQw = new QueryWrapper<>(); dbQw.eq("name", api); List list = dataBatchService.list(dbQw); @@ -997,6 +1002,11 @@ public class DataImportBatchServiceImpl implements DataImportBatchService { update.setDataLock(1); dataObjectService.updateById(update); + if (api.contains("Share")){ + insertSingleShareData(api,bulkConnection); + continue; + } + List salesforceParams = null; QueryWrapper dbQw = new QueryWrapper<>(); dbQw.eq("name", api); @@ -1211,7 +1221,135 @@ public class DataImportBatchServiceImpl implements DataImportBatchService { } + /** + * 执行一次性Insert Share数据 + */ + private void insertSingleShareData(String api, BulkConnection bulkConnection) throws Exception { + QueryWrapper dbQw = new QueryWrapper<>(); + dbQw.eq("api", api); + List list = dataFieldService.list(dbQw); + TimeUnit.MILLISECONDS.sleep(1); + String beginDateStr = null; + String endDateStr = null; + + //表内数据总量 + Integer count = customMapper.countBySQL(api, "where new_id is null and RowCause = 'Manual'"); + log.error("总Insert数据 count:{};-开始时间:{};-结束时间:{};-api:{};", count, beginDateStr, endDateStr, api); + if (count == 0) { + return; + } + //批量插入10000一次 + int page = count%10000 == 0 ? count/10000 : (count/10000) + 1; + //总插入数 + int sfNum = 0; + for (int i = 0; i < page; i++) { + + List data = customMapper.listJsonObject("*", api, "new_id is null and RowCause = 'Manual' limit 10000"); + int size = data.size(); + + log.info("执行api:{}, 执行page:{}, 执行size:{}", api, i+1, size); + List insertList = new ArrayList<>(); + + //查询当前对象多态字段映射 + QueryWrapper queryWrapper = new QueryWrapper<>(); + queryWrapper.eq("api",api).eq("is_create",1).eq("is_link",true); + List configs = linkConfigService.list(queryWrapper); + Map fieldMap = new HashMap<>(); + + if (!configs.isEmpty()) { + fieldMap = configs.stream() + .collect(Collectors.toMap( + LinkConfig::getField, // Key提取器 + LinkConfig::getLinkField, // Value提取器 + (oldVal, newVal) -> newVal // 解决重复Key冲突(保留新值) + )); + } + + //判断引用对象是否存在new_id + DataObject update = new DataObject(); + update.setName(api); + + //更新对象的new_id + String[] ids = new String[size]; + + for (int j = 1; j <= size; j++) { + JSONObject account = new JSONObject(); + for (DataField dataField : list) { + + if ("Owner_Type".equals(dataField.getField()) || "Id".equals(dataField.getField())){ + continue; + } + + if (dataField.getIsCreateable() !=null && dataField.getIsCreateable()) { + if ("reference".equals(dataField.getSfType()) && data.get(j - 1).get(dataField.getField()) != null){ + //引用类型 + String reference_to = dataField.getReferenceTo(); + //引用类型字段 + String linkfield = fieldMap.get(dataField.getField()); + + if (StringUtils.isNotBlank(linkfield)){ + reference_to = data.get(j-1).get(linkfield)!=null?data.get(j-1).get(linkfield).toString():null; + } + + if (reference_to == null){ + continue; + } + + if (reference_to.contains(",User") || reference_to.contains("User,")) { + reference_to = "User"; + } + Map m = customMapper.getById("new_id", reference_to, data.get(j - 1).get(dataField.getField()).toString()); + if (m != null && !m.isEmpty()) { + account.put(dataField.getField(), m.get("new_id")); + }else { + String message = "对象类型:" + api + "的数据:"+ data.get(j - 1).get("Id") +"的引用对象:" + reference_to + "的数据:"+ data.get(j - 1).get(dataField.getField()) +"不存在!"; + EmailUtil.send("DataDump ERROR", message); + log.info(message); + return; + } + }else { + if (data.get(j - 1).get(dataField.getField()) != null && StringUtils.isNotBlank(dataField.getSfType())) { + account.put(dataField.getField(), DataUtil.localBulkDataToSfData(dataField.getSfType(), data.get(j - 1).get(dataField.getField()).toString())); + }else { + account.put(dataField.getField(), data.get(j - 1).get(dataField.getField()) ); + } + } + } + + } + + ids[j-1] = data.get(j-1).get("Id").toString(); + insertList.add(account); + if (i*10000+j == count){ + break; + } + } + + try { + + //写入csv文件 + String fullPath = CsvConverterUtil.writeToCsv(insertList, UUID.randomUUID().toString()); + + JobInfo salesforceInsertJob = BulkUtil.createJob(bulkConnection, api, OperationEnum.insert); + + List batchInfos = BulkUtil.createBatchesFromCSVFile(bulkConnection, salesforceInsertJob, fullPath); + + BulkUtil.awaitCompletion(bulkConnection, salesforceInsertJob, batchInfos); + + sfNum = sfNum + checkInsertResults(bulkConnection, salesforceInsertJob, batchInfos, api, ids); + + BulkUtil.closeJob(bulkConnection, salesforceInsertJob.getId()); + + new File(fullPath).delete(); + + } catch (Exception e) { + log.error("manualCreatedNewId error api:{}", api, e); + throw e; + } + } + + } }