From 51b4ebde80dc8b3f550b54ef702b78dc03e2522a Mon Sep 17 00:00:00 2001 From: Kris <2893855659@qq.com> Date: Mon, 23 Jun 2025 11:09:29 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90feat=E3=80=91=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E9=82=AE=E4=BB=B6=E5=8F=91=E9=80=81=E9=94=99=E8=AF=AF=E6=97=A5?= =?UTF-8?q?=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../datadump/config/SalesforceConnect.java | 7 ++ .../config/SalesforceTargetConnect.java | 8 +++ .../celnet/datadump/job/DataDumpNewJob.java | 4 +- .../impl/DataImportBatchServiceImpl.java | 67 +++++++++++-------- .../impl/DataImportNewServiceImpl.java | 16 +++++ .../service/impl/DataImportServiceImpl.java | 8 +-- .../celnet/datadump/DataDumpConnetTests.java | 7 +- 7 files changed, 79 insertions(+), 38 deletions(-) diff --git a/src/main/java/com/celnet/datadump/config/SalesforceConnect.java b/src/main/java/com/celnet/datadump/config/SalesforceConnect.java index b271339..a7e9f0e 100644 --- a/src/main/java/com/celnet/datadump/config/SalesforceConnect.java +++ b/src/main/java/com/celnet/datadump/config/SalesforceConnect.java @@ -3,6 +3,7 @@ package com.celnet.datadump.config; import com.alibaba.fastjson.JSONArray; import com.celnet.datadump.mapper.CustomMapper; import com.celnet.datadump.util.BulkUtil; +import com.celnet.datadump.util.EmailUtil; import com.google.common.collect.Lists; import com.sforce.async.BulkConnection; import com.sforce.soap.partner.PartnerConnection; @@ -63,6 +64,9 @@ public class SalesforceConnect { config.setReadTimeout(60 * 60 * 1000); return new PartnerConnection(config); } catch (ConnectionException e) { + String message = "源ORG连接配置错误!"; + String format = String.format("ORG连接异常!, \ncause:\n%s", message); + EmailUtil.send("DataDump ERROR", format); log.error("exception message", e); return null; } @@ -103,6 +107,9 @@ public class SalesforceConnect { // config.setSessionId(connection.getSessionHeader().getSessionId()); return BulkUtil.getBulkConnection(map.get("username"),map.get("password"),map.get("url")); } catch (Exception e) { + String message = "源ORG连接配置错误!"; + String format = String.format("ORG连接异常!, \ncause:\n%s", message); + EmailUtil.send("DataDump ERROR", format); log.error("exception message", e); return null; } diff --git a/src/main/java/com/celnet/datadump/config/SalesforceTargetConnect.java b/src/main/java/com/celnet/datadump/config/SalesforceTargetConnect.java index 6ebf955..369322d 100644 --- a/src/main/java/com/celnet/datadump/config/SalesforceTargetConnect.java +++ b/src/main/java/com/celnet/datadump/config/SalesforceTargetConnect.java @@ -1,7 +1,9 @@ package com.celnet.datadump.config; import com.celnet.datadump.mapper.CustomMapper; +import com.celnet.datadump.param.DataDumpParam; import com.celnet.datadump.util.BulkUtil; +import com.celnet.datadump.util.EmailUtil; import com.google.common.collect.Lists; import com.sforce.async.BulkConnection; import com.sforce.soap.partner.PartnerConnection; @@ -58,6 +60,9 @@ public class SalesforceTargetConnect { String orgId = connection.getUserInfo().getOrganizationId(); return connection; } catch (ConnectionException e) { + String message = "目标ORG连接配置错误!"; + String format = String.format("ORG连接异常!, \ncause:\n%s", message); + EmailUtil.send("DataDump ERROR", format); log.error("exception message", e); return null; } @@ -99,6 +104,9 @@ public class SalesforceTargetConnect { // config.setSessionId(connection.getSessionHeader().getSessionId()); return BulkUtil.getBulkConnection(map.get("username"),map.get("password"),map.get("url")); } catch (Exception e) { + String message = "目标ORG连接配置错误!"; + String format = String.format("ORG连接异常!, \ncause:\n%s", message); + EmailUtil.send("DataDump ERROR", format); log.error("exception message", e); return null; } diff --git a/src/main/java/com/celnet/datadump/job/DataDumpNewJob.java b/src/main/java/com/celnet/datadump/job/DataDumpNewJob.java index 57084f7..6dc7724 100644 --- a/src/main/java/com/celnet/datadump/job/DataDumpNewJob.java +++ b/src/main/java/com/celnet/datadump/job/DataDumpNewJob.java @@ -140,8 +140,8 @@ public class DataDumpNewJob { * @return result */ @XxlJob("uploadDocumentLinkJob") - public ReturnT pullDocumentLinkJob(String paramStr) throws Exception{ - log.info("pullDocumentLinkJob execute start .................."); + public ReturnT uploadDocumentLinkJob(String paramStr) throws Exception{ + log.info("uploadDocumentLinkJob execute start .................."); return dataImportNewService.uploadDocumentLinkJob(paramStr); } diff --git a/src/main/java/com/celnet/datadump/service/impl/DataImportBatchServiceImpl.java b/src/main/java/com/celnet/datadump/service/impl/DataImportBatchServiceImpl.java index ba51f8e..d122d5b 100644 --- a/src/main/java/com/celnet/datadump/service/impl/DataImportBatchServiceImpl.java +++ b/src/main/java/com/celnet/datadump/service/impl/DataImportBatchServiceImpl.java @@ -35,6 +35,7 @@ import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; @@ -73,9 +74,6 @@ public class DataImportBatchServiceImpl implements DataImportBatchService { @Autowired private DataBatchHistoryService dataBatchHistoryService; - @Autowired - private DataLogService dataLogService; - /** * Insert入口 @@ -119,7 +117,11 @@ public class DataImportBatchServiceImpl implements DataImportBatchService { List list = dataObjectService.list(qw); if (CollectionUtils.isNotEmpty(list)) { String apiNames = list.stream().map(DataObject::getName).collect(Collectors.joining()); - return new ReturnT<>(500, "api:" + apiNames + " is locked"); + String message = "api:" + apiNames + " is locked"; + log.info(message); + String format = String.format("数据Insert error, api name: %s, \nparam: %s, \ncause:\n%s", apiNames, com.alibaba.fastjson2.JSON.toJSONString(param, DataDumpParam.getFilter()), message); + EmailUtil.send("DataDump ERROR", format); + return new ReturnT<>(500, message); } } @@ -247,6 +249,10 @@ public class DataImportBatchServiceImpl implements DataImportBatchService { //如果必填lookup字段没有值,跳过 update.setDataIndex(Integer.parseInt(map.get("data_index").toString()+1)); dataObjectService.updateById(update); + String message = "api:" + api + "的引用对象:" + reference + "不存在数据!"; + String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s", api, com.alibaba.fastjson2.JSON.toJSONString(param, DataDumpParam.getFilter()), message); + EmailUtil.send("DataDump ERROR", format); + log.info(message); return; }else{ account.put(dataField.getField(), referenceMap.get(0).get("new_id")); @@ -298,8 +304,7 @@ public class DataImportBatchServiceImpl implements DataImportBatchService { sfNum = sfNum + checkInsertResults(bulkConnection, salesforceInsertJob, batchInfos, api, ids); -// Files.delete(Paths.get(fullPath)); - + new File(fullPath).delete(); } catch (Exception e) { log.error("manualCreatedNewId error api:{}", api, e); throw e; @@ -354,16 +359,9 @@ public class DataImportBatchServiceImpl implements DataImportBatchService { maps.add(m); customMapper.updateById(api, maps, ids[index]); index ++; - log.info("Created row with id " + id); + log.info("Created Success row with id " + id); } else if (!insertStatus) { -// DataLog dataLog = new DataLog(); -// dataLog.setRequestData("BulkInsert,api:" + api); -// dataLog.setEndTime(new Date()); -// dataLog.setStartTime(new Date()); -// dataLog.setRequestType("Insert"); -// dataLog.setMessage(error); -// dataLogService.save(dataLog); - log.info("Failed with error: " + error); + log.info("Created Fail with error: " + error); } } } @@ -419,7 +417,11 @@ public class DataImportBatchServiceImpl implements DataImportBatchService { List list = dataObjectService.list(qw); if (CollectionUtils.isNotEmpty(list)) { String apiNames = list.stream().map(DataObject::getName).collect(Collectors.joining()); - return new ReturnT<>(500, "api:" + apiNames + " is locked"); + String message = "api:" + apiNames + " is locked"; + log.info(message); + String format = String.format("数据Update error, api name: %s, \nparam: %s, \ncause:\n%s", apiNames, com.alibaba.fastjson2.JSON.toJSONString(param, DataDumpParam.getFilter()), message); + EmailUtil.send("DataDump ERROR", format); + return new ReturnT<>(500, message); } } @@ -462,11 +464,11 @@ public class DataImportBatchServiceImpl implements DataImportBatchService { } // 等待当前所有线程执行完成 salesforceExecutor.waitForFutures(futures.toArray(new Future[]{})); - update.setDataWork(0); } catch (Exception e) { throw e; } finally { if (isFull) { + update.setNeedUpdate(false); update.setName(api); update.setDataLock(0); dataObjectService.updateById(update); @@ -498,6 +500,11 @@ public class DataImportBatchServiceImpl implements DataImportBatchService { if(count == 0){ return; } + + //判断引用对象是否存在new_id + DataObject update = new DataObject(); + update.setName(api); + // 总更新数 int sfNum = 0; // 批量更新10000一次 @@ -530,6 +537,18 @@ public class DataImportBatchServiceImpl implements DataImportBatchService { Map m = customMapper.getById("new_id", reference_to, String.valueOf(map.get(field))); if (m != null && !m.isEmpty()) { account.put(field, m.get("new_id")); + }else { + QueryWrapper maxIndex = new QueryWrapper<>(); + maxIndex.select("IFNULL(max(data_index),0) as data_index"); + maxIndex.ne("name", api); + Map mapTo = dataObjectService.getMap(maxIndex); + //如果必填lookup字段没有值,跳过 + update.setDataIndex(Integer.parseInt(mapTo.get("data_index").toString()+1)); + dataObjectService.updateById(update); + String message = "对象类型:" + api + "的数据:"+ m.get("Id") +"的引用对象:" + dataField.getReferenceTo() + "的数据:"+ map.get(field) +"不存在!"; String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s", api, com.alibaba.fastjson2.JSON.toJSONString(param, DataDumpParam.getFilter()), message); + EmailUtil.send("DataDump ERROR", format); + log.info(message); + return; } } } else { @@ -560,8 +579,7 @@ public class DataImportBatchServiceImpl implements DataImportBatchService { sfNum = sfNum + checkUpdateResults(bulkConnection, salesforceInsertJob, batchInfos,api); -// Files.delete(Paths.get(fullPath)); - + new File(fullPath).delete(); } catch (Throwable e) { log.info(e.getMessage()); throw e; @@ -609,16 +627,9 @@ public class DataImportBatchServiceImpl implements DataImportBatchService { String error = resultInfo.get("Error"); if (updateStatus) { index ++; - log.info("Update row with id " + id); + log.info("Update Success row with id " + id); } else { -// DataLog dataLog = new DataLog(); -// dataLog.setRequestData("BulkUpdate,api:" + api); -// dataLog.setEndTime(new Date()); -// dataLog.setStartTime(new Date()); -// dataLog.setRequestType("Update"); -// dataLog.setMessage(error); -// dataLogService.save(dataLog); - log.info("Failed with error: " + error); + log.info("Update Fail with error: " + error); } } } diff --git a/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java b/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java index fac77ad..b68b4e0 100644 --- a/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java +++ b/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java @@ -723,6 +723,10 @@ public class DataImportNewServiceImpl implements DataImportNewService { return; } + //判断引用对象是否存在new_id + DataObject update = new DataObject(); + update.setName(api); + int targetCount = 0; //批量插入200一次 int page = count%200 == 0 ? count/200 : (count/200) + 1; @@ -753,6 +757,18 @@ public class DataImportNewServiceImpl implements DataImportNewService { Map m = customMapper.getById("new_id", reference_to, String.valueOf(map.get(field))); if (m != null && !m.isEmpty()) { account.setField(field, m.get("new_id")); + }else { + QueryWrapper maxIndex = new QueryWrapper<>(); + maxIndex.select("IFNULL(max(data_index),0) as data_index"); + maxIndex.ne("name", api); + Map mapTo = dataObjectService.getMap(maxIndex); + //如果必填lookup字段没有值,跳过 + update.setDataIndex(Integer.parseInt(mapTo.get("data_index").toString()) +1); + dataObjectService.updateById(update); + String message = "对象类型:" + api + "的数据:"+ m.get("Id") +"的引用对象:" + dataField.getReferenceTo() + "的数据:"+ map.get(field) +"不存在!"; String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s", api, com.alibaba.fastjson2.JSON.toJSONString(param, DataDumpParam.getFilter()), message); + EmailUtil.send("DataDump ERROR", format); + log.info(message); + return; } } } else { diff --git a/src/main/java/com/celnet/datadump/service/impl/DataImportServiceImpl.java b/src/main/java/com/celnet/datadump/service/impl/DataImportServiceImpl.java index f3e7055..2eb4411 100644 --- a/src/main/java/com/celnet/datadump/service/impl/DataImportServiceImpl.java +++ b/src/main/java/com/celnet/datadump/service/impl/DataImportServiceImpl.java @@ -352,10 +352,10 @@ public class DataImportServiceImpl implements DataImportService { account.setField("EventSubtype", String.valueOf(data.get(j - 1).get("EventSubtype"))); // account.setField("IsRecurrence", String.valueOf(data.get(j - 1).get("IsRecurrence"))); } - if (api.equals("Account")){ - Map referenceMap = customMapper.list("new_id","RecordType", "new_id is not null and id = '"+ data.get(j - 1).get("RecordTypeId")+"' limit 1").get(0); - account.setField("RecordTypeId", referenceMap.get("new_id") ); - } +// if (api.equals("Account")){ +// Map referenceMap = customMapper.list("new_id","RecordType", "new_id is not null and id = '"+ data.get(j - 1).get("RecordTypeId")+"' limit 1").get(0); +// account.setField("RecordTypeId", referenceMap.get("new_id") ); +// } if (api.equals("vlink__Wechat_User__c")){ List> maps = customMapper.list("new_id", "vlink__Wechat_Account__c", "new_id is not null and id = '" + data.get(j - 1).get("vlink__Wechat_Account__c") + "' limit 1"); if (!maps.isEmpty()){ diff --git a/src/test/java/com/celnet/datadump/DataDumpConnetTests.java b/src/test/java/com/celnet/datadump/DataDumpConnetTests.java index 6e54bc8..6539e8e 100644 --- a/src/test/java/com/celnet/datadump/DataDumpConnetTests.java +++ b/src/test/java/com/celnet/datadump/DataDumpConnetTests.java @@ -38,7 +38,6 @@ public class DataDumpConnetTests { @Test public void createConnect() throws Exception { try { - List> poll = customerMapper.list("code,value", "org_config", null); // //遍历poll,找出code值为TARGET_ORG_URL,TARGET_ORG_USERNAME,TARGET_ORG_PASSWORD的value值 Map map = new HashMap<>(); // Map map = new HashMap<>(); @@ -57,9 +56,9 @@ public class DataDumpConnetTests { // } // } //遍历poll,找出code值为TARGET_ORG_URL,TARGET_ORG_USERNAME,TARGET_ORG_PASSWORD的value值 - map.put("url", "https://cookchina--sandbox.sandbox.my.sfcrmproducts.cn/services/Soap/u/56.0"); - map.put("username", "cong.chen@cookmedicalasia.com.sandbox"); - map.put("password", "cook202504"); + map.put("url", "https://steco-process.my.sfcrmproducts.cn/services/Soap/u/56.0"); + map.put("username", "binxu@steco-process.com"); + map.put("password", "AAM0902!"); String username = map.get("username").toString(); ConnectorConfig config = new ConnectorConfig(); config.setUsername(username);