From 83dde4dbff12dbcc19036bc60f3e9b29ba78db93 Mon Sep 17 00:00:00 2001 From: Kris <2893855659@qq.com> Date: Fri, 13 Jun 2025 10:15:03 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90feat=E3=80=91=20=E6=96=B0=E5=A2=9E?= =?UTF-8?q?=E8=BF=94=E5=86=99=E4=B8=AA=E4=BA=BA=E8=81=94=E7=B3=BB=E4=BA=BA?= =?UTF-8?q?=EF=BC=8C=E5=A2=9E=E9=87=8F=E6=95=B0=E6=8D=AE=E5=90=8C=E6=AD=A5?= =?UTF-8?q?=EF=BC=8C=E5=AD=98=E9=87=8F=E6=95=B0=E6=8D=AE=E5=90=8C=E6=AD=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../datadump/controller/JobController.java | 29 +- .../com/celnet/datadump/job/DataDumpJob.java | 57 +- .../celnet/datadump/job/DataDumpNewJob.java | 127 ++++ .../service/DataImportNewService.java | 21 + .../service/impl/CommonServiceImpl.java | 7 +- .../impl/DataImportNewServiceImpl.java | 560 ++++++++++++++++++ .../service/impl/DataImportServiceImpl.java | 14 +- 7 files changed, 748 insertions(+), 67 deletions(-) create mode 100644 src/main/java/com/celnet/datadump/job/DataDumpNewJob.java create mode 100644 src/main/java/com/celnet/datadump/service/DataImportNewService.java create mode 100644 src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java diff --git a/src/main/java/com/celnet/datadump/controller/JobController.java b/src/main/java/com/celnet/datadump/controller/JobController.java index 20110e8..0c45f42 100644 --- a/src/main/java/com/celnet/datadump/controller/JobController.java +++ b/src/main/java/com/celnet/datadump/controller/JobController.java @@ -44,6 +44,8 @@ public class JobController { private DataImportService dataImportService; @Autowired private DataImportBatchService dataImportBatchService; + @Autowired + private DataImportNewService dataImportNewService; @PostMapping("/fileTransform") @ApiOperation("附件解析") @@ -240,12 +242,35 @@ public class JobController { } catch (Throwable throwable) { return new ReturnT<>(500, "参数解析失败!"); } - param.setType(1); // 参数转换 param.setBeginCreateDate(param.getBeginDate()); param.setEndCreateDate(param.getEndDate()); return dataImportBatchService.immigrationUpdateBatch(param); } - + /** + * 返写个人联系人ID + * @param paramStr + * @author kris + * @return + * @throws Exception + */ + @PostMapping("/getPersonContactJob") + @ApiOperation("返写个人联系人ID") + @LogServiceAnnotation(operateType = OperateTypeConstant.TYPE_UPDATE, remark = "返写个人联系人ID") + public ReturnT getPersonContactJob(String paramStr) throws Exception { + log.info("getPersonContactJob execute start .................."); + SalesforceParam param = new SalesforceParam(); + try { + if (StringUtils.isNotBlank(paramStr)) { + param = JSON.parseObject(paramStr, SalesforceParam.class); + } + } catch (Throwable throwable) { + return new ReturnT<>(500, "参数解析失败!"); + } + // 参数转换 + param.setBeginCreateDate(param.getBeginDate()); + param.setEndCreateDate(param.getEndDate()); + return dataImportNewService.getPersonContact(param); + } } diff --git a/src/main/java/com/celnet/datadump/job/DataDumpJob.java b/src/main/java/com/celnet/datadump/job/DataDumpJob.java index 3422164..2fd6e33 100644 --- a/src/main/java/com/celnet/datadump/job/DataDumpJob.java +++ b/src/main/java/com/celnet/datadump/job/DataDumpJob.java @@ -5,7 +5,6 @@ import com.celnet.datadump.param.SalesforceParam; import com.celnet.datadump.service.CommonService; import com.celnet.datadump.service.DataImportBatchService; import com.celnet.datadump.service.DataImportService; -import com.celnet.datadump.service.impl.DataImportServiceImpl; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.handler.annotation.XxlJob; import lombok.extern.slf4j.Slf4j; @@ -28,10 +27,6 @@ public class DataDumpJob { @Autowired private DataImportService dataImportService; - @Autowired - private DataImportBatchService dataImportBatchService; - - /** * 创建api * @@ -126,31 +121,6 @@ public class DataDumpJob { return dataImportService.immigration(param); } - /** - * bulk批量大数据生成newSFID - * @param paramStr - * @author kris - * @return - * @throws Exception - */ - @XxlJob("dataImportBatchJob") - public ReturnT dataImportBatchJob(String paramStr) throws Exception { - log.info("dataImportBatchJob execute start .................."); - SalesforceParam param = new SalesforceParam(); - try { - if (StringUtils.isNotBlank(paramStr)) { - param = JSON.parseObject(paramStr, SalesforceParam.class); - } - } catch (Throwable throwable) { - return new ReturnT<>(500, "参数解析失败!"); - } - param.setType(1); - // 参数转换 - param.setBeginCreateDate(param.getBeginDate()); - param.setEndCreateDate(param.getEndDate()); - return dataImportBatchService.immigrationBatch(param); - } - /** * 更新目标org数据 @@ -169,7 +139,7 @@ public class DataDumpJob { } catch (Throwable throwable) { return new ReturnT<>(500, "参数解析失败!"); } - param.setType(1); +// param.setType(1); // 参数转换 param.setBeginCreateDate(param.getBeginDate()); param.setEndCreateDate(param.getEndDate()); @@ -177,30 +147,6 @@ public class DataDumpJob { return dataImportService.immigrationUpdate(param); } - /** - * bulk批量大数据更新数据 - * @param paramStr - * @author kris - * @return - * @throws Exception - */ - @XxlJob("dataUpdateBatchJob") - public ReturnT dataUpdateBatchJob(String paramStr) throws Exception { - log.info("dataImportBatchJob execute start .................."); - SalesforceParam param = new SalesforceParam(); - try { - if (StringUtils.isNotBlank(paramStr)) { - param = JSON.parseObject(paramStr, SalesforceParam.class); - } - } catch (Throwable throwable) { - return new ReturnT<>(500, "参数解析失败!"); - } - param.setType(1); - // 参数转换 - param.setBeginCreateDate(param.getBeginDate()); - param.setEndCreateDate(param.getEndDate()); - return dataImportBatchService.immigrationUpdateBatch(param); - } /** * 获取文件关联表 @@ -225,4 +171,5 @@ public class DataDumpJob { return commonService.getAllApi(); } + } diff --git a/src/main/java/com/celnet/datadump/job/DataDumpNewJob.java b/src/main/java/com/celnet/datadump/job/DataDumpNewJob.java new file mode 100644 index 0000000..a4fc0ca --- /dev/null +++ b/src/main/java/com/celnet/datadump/job/DataDumpNewJob.java @@ -0,0 +1,127 @@ +package com.celnet.datadump.job; + +import com.alibaba.fastjson.JSON; +import com.celnet.datadump.param.SalesforceParam; +import com.celnet.datadump.service.CommonService; +import com.celnet.datadump.service.DataImportBatchService; +import com.celnet.datadump.service.DataImportNewService; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.handler.annotation.XxlJob; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * 迁移任务 (新) + * 2024/06/12 + * kris + */ +@Component +@Slf4j +public class DataDumpNewJob { + + @Autowired + private CommonService commonService; + + @Autowired + private DataImportNewService dataImportNewService; + + @Autowired + private DataImportBatchService dataImportBatchService; + + + /** + * bulk批量大数据生成newSFID + * @param paramStr + * @author kris + * @return + * @throws Exception + */ + @XxlJob("dataImportBatchJob") + public ReturnT dataImportBatchJob(String paramStr) throws Exception { + log.info("dataImportBatchJob execute start .................."); + SalesforceParam param = new SalesforceParam(); + try { + if (StringUtils.isNotBlank(paramStr)) { + param = JSON.parseObject(paramStr, SalesforceParam.class); + } + } catch (Throwable throwable) { + return new ReturnT<>(500, "参数解析失败!"); + } + param.setType(1); + // 参数转换 + param.setBeginCreateDate(param.getBeginDate()); + param.setEndCreateDate(param.getEndDate()); + return dataImportBatchService.immigrationBatch(param); + } + + /** + * bulk批量大数据更新数据 + * @param paramStr + * @author kris + * @return + * @throws Exception + */ + @XxlJob("dataUpdateBatchJob") + public ReturnT dataUpdateBatchJob(String paramStr) throws Exception { + log.info("dataImportBatchJob execute start .................."); + SalesforceParam param = new SalesforceParam(); + try { + if (StringUtils.isNotBlank(paramStr)) { + param = JSON.parseObject(paramStr, SalesforceParam.class); + } + } catch (Throwable throwable) { + return new ReturnT<>(500, "参数解析失败!"); + } + // 参数转换 + param.setBeginCreateDate(param.getBeginDate()); + param.setEndCreateDate(param.getEndDate()); + return dataImportBatchService.immigrationUpdateBatch(param); + } + + /** + * 写入个人客户联系人old_id,返写new_id + * @param paramStr 参数json + * @return result + */ + @XxlJob("getPersonContactJob") + public ReturnT getPersonContactJob(String paramStr) throws Exception { + log.info("getPersonContactJob execute start .................."); + SalesforceParam param = new SalesforceParam(); + try { + if (StringUtils.isNotBlank(paramStr)) { + param = JSON.parseObject(paramStr, SalesforceParam.class); + } + } catch (Throwable throwable) { + return new ReturnT<>(500, "参数解析失败!"); + } + // 参数转换 + param.setBeginCreateDate(param.getBeginDate()); + param.setEndCreateDate(param.getEndDate()); + + return dataImportNewService.getPersonContact(param); + } + + /** + * 增量数据更新 + */ + @XxlJob("dataIncrementUpdateJob") + public ReturnT dataIncrementUpdateJob(String paramStr) throws Exception { + log.info("dataUpdateJob execute start .................."); + SalesforceParam param = new SalesforceParam(); + try { + if (StringUtils.isNotBlank(paramStr)) { + param = JSON.parseObject(paramStr, SalesforceParam.class); + } + } catch (Throwable throwable) { + return new ReturnT<>(500, "参数解析失败!"); + } + param.setBeginCreateDate(param.getBeginDate()); + param.setEndCreateDate(param.getEndDate()); + + return dataImportNewService.immigrationIncrementUpdate(param); + } + + +} diff --git a/src/main/java/com/celnet/datadump/service/DataImportNewService.java b/src/main/java/com/celnet/datadump/service/DataImportNewService.java new file mode 100644 index 0000000..6a3938e --- /dev/null +++ b/src/main/java/com/celnet/datadump/service/DataImportNewService.java @@ -0,0 +1,21 @@ +package com.celnet.datadump.service; + +import com.celnet.datadump.param.SalesforceParam; +import com.xxl.job.core.biz.model.ReturnT; + +import java.util.List; +import java.util.concurrent.Future; + +public interface DataImportNewService { + + /** + * 写入个人客户联系人old_id,返写new_id + */ + ReturnT getPersonContact(SalesforceParam param) throws Exception; + + /** + * 增量数据更新 + */ + ReturnT immigrationIncrementUpdate(SalesforceParam param) throws Exception; + +} diff --git a/src/main/java/com/celnet/datadump/service/impl/CommonServiceImpl.java b/src/main/java/com/celnet/datadump/service/impl/CommonServiceImpl.java index 3d86197..4c9b5e6 100644 --- a/src/main/java/com/celnet/datadump/service/impl/CommonServiceImpl.java +++ b/src/main/java/com/celnet/datadump/service/impl/CommonServiceImpl.java @@ -22,6 +22,7 @@ import com.celnet.datadump.util.EmailUtil; import com.celnet.datadump.util.SqlUtil; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.sforce.async.BulkConnection; import com.sforce.soap.partner.*; import com.sforce.soap.partner.sobject.SObject; import com.xxl.job.core.biz.model.ReturnT; @@ -920,12 +921,12 @@ public class CommonServiceImpl implements CommonService { if ("Task".equals(apiName) || "Event".equals(apiName)){ Map LinkedMap = Maps.newHashMap(); - LinkedMap.put("type", "varchar(255)"); + LinkedMap.put("type", "varchar(18)"); LinkedMap.put("comment", "whatTextId"); LinkedMap.put("name", "what_text_id"); list.add(LinkedMap); Map LinkedMap1 = Maps.newHashMap(); - LinkedMap1.put("type", "varchar(255)"); + LinkedMap1.put("type", "varchar(18)"); LinkedMap1.put("comment", "whoTextId"); LinkedMap1.put("name", "who_text_id"); list.add(LinkedMap1); @@ -1180,7 +1181,7 @@ public class CommonServiceImpl implements CommonService { SaveResult[] saveResults = connection.create(accounts); for (int j = 0; j < saveResults.length; j++) { if (!saveResults[j].getSuccess()) { - String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s", api, com.alibaba.fastjson2.JSON.toJSONString(DataDumpParam.getFilter()), com.alibaba.fastjson.JSON.toJSONString(saveResults[j])); + String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s", api, JSON.toJSONString(DataDumpParam.getFilter()), com.alibaba.fastjson.JSON.toJSONString(saveResults[j])); EmailUtil.send("DataDump ContentDocumentLink ERROR", format); } else { List> dList = new ArrayList<>(); diff --git a/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java b/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java new file mode 100644 index 0000000..fe678c7 --- /dev/null +++ b/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java @@ -0,0 +1,560 @@ +package com.celnet.datadump.service.impl; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson2.JSONObject; +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; +import com.celnet.datadump.config.SalesforceExecutor; +import com.celnet.datadump.config.SalesforceTargetConnect; +import com.celnet.datadump.entity.DataBatch; +import com.celnet.datadump.entity.DataBatchHistory; +import com.celnet.datadump.entity.DataField; +import com.celnet.datadump.entity.DataObject; +import com.celnet.datadump.global.Const; +import com.celnet.datadump.global.SystemConfigCode; +import com.celnet.datadump.mapper.CustomMapper; +import com.celnet.datadump.param.DataDumpParam; +import com.celnet.datadump.param.SalesforceParam; +import com.celnet.datadump.service.*; +import com.celnet.datadump.util.DataUtil; +import com.celnet.datadump.util.EmailUtil; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.sforce.soap.partner.*; +import com.sforce.soap.partner.sobject.SObject; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.log.XxlJobLogger; +import com.xxl.job.core.util.DateUtil; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.time.DateUtils; +import org.apache.commons.lang3.ObjectUtils; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.*; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +@Service +@Slf4j +public class DataImportNewServiceImpl implements DataImportNewService { + + @Autowired + private SalesforceTargetConnect salesforceTargetConnect; + + @Autowired + private SalesforceExecutor salesforceExecutor; + + @Autowired + private DataObjectService dataObjectService; + + @Autowired + private DataBatchService dataBatchService; + + @Autowired + private DataFieldService dataFieldService; + + @Autowired + private CustomMapper customMapper; + + @Autowired + private DataBatchHistoryService dataBatchHistoryService; + + @Autowired + private CommonService commonService; + + + /** + * Get入口 + */ + @Override + public ReturnT getPersonContact(SalesforceParam param) throws Exception { + List> futures = Lists.newArrayList(); + try { + if (StringUtils.isNotBlank(param.getApi())) { + // 手动任务 + ReturnT result = manualGetPersonContact(param, futures); + if (result != null) { + return result; + } + } + return ReturnT.SUCCESS; + } catch (Exception exception) { + salesforceExecutor.remove(futures.toArray(new Future[]{})); + log.error("immigration error", exception); + throw exception; + } + } + + /** + * 组装Get执行参数 + */ + public ReturnT manualGetPersonContact(SalesforceParam param, List> futures) throws Exception { + String api = "Contact"; + + TimeUnit.MILLISECONDS.sleep(1); + // 全量的时候 检测是否有自动任务锁住的表 + boolean isFull = CollectionUtils.isEmpty(param.getIds()); + if (isFull) { + QueryWrapper qw = new QueryWrapper<>(); + qw.eq("data_lock", 1); + List list = dataObjectService.list(qw); + if (CollectionUtils.isNotEmpty(list)) { + String apiNames = list.stream().map(DataObject::getName).collect(Collectors.joining()); + return new ReturnT<>(500, "api:" + apiNames + " is locked"); + } + } + + PartnerConnection connect = salesforceTargetConnect.createConnect(); + DataObject update = new DataObject(); + TimeUnit.MILLISECONDS.sleep(1); + try { + List salesforceParams = null; + + update.setName(api); + update.setDataLock(1); + dataObjectService.updateById(update); + + QueryWrapper dbQw = new QueryWrapper<>(); + dbQw.eq("name", api); + List list = dataBatchService.list(dbQw); + AtomicInteger batch = new AtomicInteger(1); + + if (CollectionUtils.isNotEmpty(list)) { + salesforceParams = list.stream().map(t -> { + SalesforceParam salesforceParam = param.clone(); + salesforceParam.setApi(t.getName()); + salesforceParam.setBeginCreateDate(t.getSyncStartDate()); + salesforceParam.setEndCreateDate(t.getSyncEndDate()); + salesforceParam.setBatch(batch.getAndIncrement()); + return salesforceParam; + }).collect(Collectors.toList()); + } + + // 手动任务优先执行 + for (SalesforceParam salesforceParam : salesforceParams) { + Future future = salesforceExecutor.execute(() -> { + try { + manualGetPersonContactId(salesforceParam, connect); + } catch (Throwable throwable) { + log.error("salesforceExecutor error", throwable); + throw new RuntimeException(throwable); + } + }, salesforceParam.getBatch(), 1); + futures.add(future); + } + // 等待当前所有线程执行完成 + salesforceExecutor.waitForFutures(futures.toArray(new Future[]{})); + update.setDataWork(0); + } catch (InterruptedException e) { + throw e; + } catch (Throwable e) { + log.error("manualImmigration error", e); + throw new RuntimeException(e); + } finally { + if (isFull) { + update.setName(api); + update.setDataLock(0); + dataObjectService.updateById(update); + } + } + return null; + } + + /** + * 获取newId,写入oldId + */ + private void manualGetPersonContactId(SalesforceParam param, PartnerConnection partnerConnection) throws Exception { + + String api = param.getApi(); + QueryWrapper dbQw = new QueryWrapper<>(); + dbQw.eq("api", api); + + Date beginDate = param.getBeginCreateDate(); + Date endDate = param.getEndCreateDate(); + String beginDateStr = DateUtil.format(beginDate, "yyyy-MM-dd HH:mm:ss"); + String endDateStr = DateUtil.format(endDate, "yyyy-MM-dd HH:mm:ss"); + + + DescribeSObjectResult dsr = partnerConnection.describeSObject(api); + Field[] dsrFields = dsr.getFields(); + + //表内数据总量 + Integer count = customMapper.countBySQL(api, "where new_id is null and IsPersonAccount = 1 and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "'"); + + log.error("总Insert数据 count:{};-开始时间:{};-结束时间:{};-api:{};", count, beginDateStr, endDateStr, api); + if (count == 0) { + return; + } + + //批量插入200一次 + int page = count%200 == 0 ? count/200 : (count/200) + 1; + + for (int i = 0; i < page; i++) { + List> data = customMapper.list("*", api, "new_id is null and IsPersonAccount = 1 and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "' limit 200"); + int size = data.size(); + log.info("执行api:{}, 执行page:{}, 执行size:{}", api, i+1, size); + SObject[] accounts = new SObject[size]; + + // 新客户ID,旧联系人ID,用于更新本地数据 + Map idMaps = new HashMap<>(); + // 旧客户ID,新联系人ID,用于更新SF数据 + Map idMapa = new HashMap<>(); + + for (Map map : data) { + Map idMap = customMapper.getById("new_id", "Account", map.get("AccountId").toString()); + if(idMap.get("new_id") != null && StringUtils.isNotEmpty(idMap.get("new_id").toString())){ + // 新客户ID,旧联系人ID + idMaps.put(idMap.get("new_id").toString(),map.get("Id").toString()); + } + } + + String idStr = "("; + for (String ids : idMaps.keySet()) { + idStr += "'" + ids + "',"; // 拼接每个ID + } + if (idStr.endsWith(",")) { // 如果最后一个字符是逗号,说明循环正常结束 + idStr = idStr.substring(0, idStr.length() - 1); // 去掉最后一个多余的逗号 + } + idStr += ")"; // 添加右括号 + + try { + String sql = "SELECT Id,AccountId,Account.old_sfdc_id__c FROM Contact where AccountId in " + idStr ; + QueryResult queryResult = partnerConnection.queryAll(sql); + if (ObjectUtils.isEmpty(queryResult) || ObjectUtils.isEmpty(queryResult.getRecords())) { + break; + } + SObject[] records = queryResult.getRecords(); + com.alibaba.fastjson2.JSONArray objects = DataUtil.toJsonArray(records, dsrFields); + for (int z = 0; z < objects.size(); z++) { + JSONObject jsonObject = objects.getJSONObject(z); + String contactId = jsonObject.getString(Const.ID); + String accountId = jsonObject.getString("AccountId"); + String oldAccountId = jsonObject.getString("Account_old_sfdc_id__c"); + String id = idMaps.get(accountId); + List> maps = Lists.newArrayList(); + Map paramMap = Maps.newHashMap(); + paramMap.put("key", "new_id"); + paramMap.put("value", contactId); + maps.add(paramMap); + customMapper.updateById(api, maps, id); + idMapa.put(oldAccountId, contactId); + } + TimeUnit.MILLISECONDS.sleep(1); + + int index = 0; + for (Map map : data) { + SObject account = new SObject(); + account.setType(api); + account.setField("old_owner_id__c", map.get("OwnerId")); + account.setField("old_sfdc_id__c", map.get("Id")); + account.setId(idMapa.get(map.get("AccountId").toString())); + accounts[index] = account; + index++; + } + + SaveResult[] saveResults = partnerConnection.update(accounts); + for (SaveResult saveResult : saveResults) { + if (!saveResult.getSuccess()) { + log.info("-------------saveResults: {}", JSON.toJSONString(saveResult)); + String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s, \n数据实体类:\n%s", api, com.alibaba.fastjson2.JSON.toJSONString(param, DataDumpParam.getFilter()), JSON.toJSONString(saveResult)); + EmailUtil.send("DataDump ERROR", format); + return; + } + } + + } catch (Exception e) { + log.error("manualGetPersonContactId error api:{}", api, e); + throw e; + } + } + SalesforceParam countParam = new SalesforceParam(); + countParam.setApi(api); + countParam.setBeginCreateDate(beginDate); + countParam.setEndCreateDate(DateUtils.addSeconds(endDate, -1)); + // 存在isDeleted 只查询IsDeleted为false的 + if (dataFieldService.hasDeleted(countParam.getApi())) { + countParam.setIsDeleted(false); + } else { + // 不存在 过滤 + countParam.setIsDeleted(null); + } + // sf count + Integer sfNum = commonService.countSfNum(partnerConnection, countParam); + + UpdateWrapper updateQw = new UpdateWrapper<>(); + updateQw.eq("name", api) + .eq("sync_start_date", beginDate) + .eq("sync_end_date", DateUtils.addSeconds(endDate, -1)) + .set("target_sf_num", sfNum); + dataBatchHistoryService.update(updateQw); + + UpdateWrapper updateQw2 = new UpdateWrapper<>(); + updateQw2.eq("name", api) + .eq("sync_start_date", beginDate) + .eq("sync_end_date", endDate) + .set("sf_add_num", sfNum); + dataBatchService.update(updateQw2); + } + + /** + * 增量Update入口 + */ + @Override + public ReturnT immigrationIncrementUpdate(SalesforceParam param) throws Exception { + List> futures = Lists.newArrayList(); + try { + if (StringUtils.isNotBlank(param.getApi())) { + // 手动任务 + ReturnT result = updateIncrementSfData(param, futures); + if (result != null) { + return result; + } + } else { + // 自动更新任务 +// autoUpdateSfData(param, futures); + } + return ReturnT.SUCCESS; + } catch (InterruptedException e) { + throw e; + } catch (Throwable throwable) { + salesforceExecutor.remove(futures.toArray(new Future[]{})); + log.error("immigrationUpdate error", throwable); + throw throwable; + } + } + + /** + * 组装增量Update参数 + */ + public ReturnT updateIncrementSfData(SalesforceParam param, List> futures) throws Exception { + List apis; + String beginDateStr = null; + String endDateStr = null; + apis = DataUtil.toIdList(param.getApi()); + if (param.getBeginCreateDate() != null && param.getEndCreateDate() != null){ + Date beginDate = param.getBeginCreateDate(); + Date endDate = param.getEndCreateDate(); + beginDateStr = DateUtil.format(beginDate, "yyyy-MM-dd HH:mm:ss"); + endDateStr = DateUtil.format(endDate, "yyyy-MM-dd HH:mm:ss"); + } + String join = StringUtils.join(apis, ","); + log.info("immigration apis: {}", join); + XxlJobLogger.log("immigration apis: {}", join); + // 全量的时候 检测是否有自动任务锁住的表 + boolean isFull = CollectionUtils.isEmpty(param.getIds()); + if (isFull) { + QueryWrapper qw = new QueryWrapper<>(); + qw.eq("data_lock", 1).in("name", apis); + List list = dataObjectService.list(qw); + if (CollectionUtils.isNotEmpty(list)) { + String apiNames = list.stream().map(DataObject::getName).collect(Collectors.joining()); + return new ReturnT<>(500, "api:" + apiNames + " is locked"); + } + } + + PartnerConnection partnerConnection = salesforceTargetConnect.createConnect(); + for (String api : apis) { + DataObject update = new DataObject(); + try { + TimeUnit.MILLISECONDS.sleep(1); + List salesforceParams = null; + QueryWrapper dbQw = new QueryWrapper<>(); + dbQw.eq("name", api); + if (StringUtils.isNotEmpty(beginDateStr) && StringUtils.isNotEmpty(endDateStr)) { + dbQw.eq("sync_start_date", beginDateStr); // 等于开始时间 + dbQw.eq("sync_end_date", endDateStr); // 等于结束时间 + } + List list = dataBatchService.list(dbQw); + AtomicInteger batch = new AtomicInteger(1); + if (CollectionUtils.isNotEmpty(list)) { + salesforceParams = list.stream().map(t -> { + SalesforceParam salesforceParam = param.clone(); + salesforceParam.setApi(t.getName()); + salesforceParam.setBeginCreateDate(t.getSyncStartDate()); + salesforceParam.setEndCreateDate(t.getSyncEndDate()); + salesforceParam.setBatch(batch.getAndIncrement()); + return salesforceParam; + }).collect(Collectors.toList()); + } + + // 手动任务优先执行 + for (SalesforceParam salesforceParam : salesforceParams) { + Future future = salesforceExecutor.execute(() -> { + try { + manualIncrementUpdateSfData(salesforceParam, partnerConnection); + } catch (Throwable throwable) { + log.error("salesforceExecutor error", throwable); + throw new RuntimeException(throwable); + } + }, salesforceParam.getBatch(), 1); + futures.add(future); + } + // 等待当前所有线程执行完成 + salesforceExecutor.waitForFutures(futures.toArray(new Future[]{})); + update.setDataWork(0); + } catch (Exception e) { + throw e; + } finally { + if (isFull) { + update.setName(api); + update.setDataLock(0); + dataObjectService.updateById(update); + } + } + } + return null; + } + + /** + * 执行增量Update数据 + */ + private void manualIncrementUpdateSfData(SalesforceParam param, PartnerConnection partnerConnection) throws Exception { + Map infoFlag = customMapper.list("code,value","system_config","code ='"+ SystemConfigCode.INFO_FLAG+"'").get(0); + String api = param.getApi(); + QueryWrapper dbQw = new QueryWrapper<>(); + dbQw.eq("api", api); + List list = dataFieldService.list(dbQw); + TimeUnit.MILLISECONDS.sleep(1); + + Date beginDate = param.getBeginCreateDate(); + Date endDate = param.getEndCreateDate(); + String beginDateStr = DateUtil.format(beginDate, "yyyy-MM-dd HH:mm:ss"); + String endDateStr = DateUtil.format(endDate, "yyyy-MM-dd HH:mm:ss"); + + //表内数据总量 + Integer count = customMapper.countBySQL(api, "where new_id is not null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "'"); + + log.error("总Update数据 count:{};-开始时间:{};-结束时间:{};-api:{};", count, beginDateStr, endDateStr, api); + + if(count == 0){ + return; + } + + int targetCount = 0; + //批量插入200一次 + int page = count%200 == 0 ? count/200 : (count/200) + 1; + for (int i = 0; i < page; i++) { + List> mapList = customMapper.list("*", api, "new_id is not null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "' order by Id asc limit " + i * 200 + ",200"); + SObject[] accounts = new SObject[mapList.size()]; + int j = 0; + for (Map map : mapList) { + SObject account = new SObject(); + account.setType(api); + //给对象赋值 + for (DataField dataField : list) { + String field = dataField.getField(); + String reference_to = dataField.getReferenceTo(); + + //根据旧sfid查找引用对象新sfid + if (field.equals("Id")) { + account.setId(String.valueOf(map.get("new_id"))); + } else if (!DataUtil.isUpdate(field) || (dataField.getIsCreateable() != null && !dataField.getIsCreateable())) { + continue; + } else if (StringUtils.isNotBlank(reference_to) && !"data_picklist".equals(reference_to)) { + if (!"null".equals(map.get(field)) && null != map.get(field) && !"OwnerId".equals(field) + || !"Owner_Type".equals(field)) { + //判断reference_to内是否包含User字符串 + if (reference_to.contains("User")) { + reference_to = "User"; + } + Map m = customMapper.getById("new_id", reference_to, String.valueOf(map.get(field))); + if (m != null && !m.isEmpty()) { + account.setField(field, m.get("new_id")); + } + } + } else { + if (map.get(field) != null && StringUtils.isNotBlank(dataField.getSfType())) { + account.setField(field, DataUtil.localDataToSfData(dataField.getSfType(), String.valueOf(map.get(field)))); + }else { + if (api.equals("Account")){ + if ("1".equals(map.get("IsPersonAccount")) && field.equals("Name")){ + continue; + }else if("0".equals(map.get("IsPersonAccount")) && field.equals("LastName")){ + continue; + } + + } else { + account.setField(field, map.get(field)); + } + } + } + } + account.setField("old_owner_id__c", map.get("OwnerId")); + account.setField("old_sfdc_id__c", map.get("Id")); + + accounts[j++] = account; + } + List> listMap = new ArrayList<>(); + try { + if (infoFlag != null && "1".equals(infoFlag.get("value"))){ + listMap = returnAccountsDetails(accounts,list); + } + SaveResult[] saveResults = partnerConnection.update(accounts); + for (SaveResult saveResult : saveResults) { + if (!saveResult.getSuccess()) { + log.info("-------------saveResults: {}", JSON.toJSONString(saveResult)); + String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s, \n数据实体类:\n%s", api, com.alibaba.fastjson2.JSON.toJSONString(param, DataDumpParam.getFilter()), JSON.toJSONString(saveResult),JSON.toJSONString(listMap)); + EmailUtil.send("DataDump ERROR", format); + return; + }else { + targetCount ++; + } + } + log.info("sf return saveResults------" + JSONArray.toJSONString(saveResults)); + + } catch (Throwable e) { + throw e; + } + } + UpdateWrapper updateQw = new UpdateWrapper<>(); + updateQw.eq("name", api) + .eq("sync_start_date", beginDate) + .eq("sync_end_date", DateUtils.addSeconds(endDate, -1)) + .set("target_update_num", targetCount); + dataBatchHistoryService.update(updateQw); + + UpdateWrapper updateQw2 = new UpdateWrapper<>(); + updateQw2.eq("name", api) + .eq("sync_start_date", beginDate) + .eq("sync_end_date", endDate) + .set("sf_update_num", targetCount); + dataBatchService.update(updateQw2); + + } + + /** + * 打印SF交互数据明细 + */ + public List> returnAccountsDetails(SObject[] accounts,List list) { + ArrayList> arrayList = new ArrayList<>(); + for (int i = 0; i < accounts.length; i++) { + HashMap map = new HashMap<>(); + SObject account = accounts[i]; + System.out.println("--- Account[" + i + "] ---"); + // 获取对象所有字段名 + for (DataField dataField : list) { + try { + Object value = account.getField(dataField.getField()); + map.put(dataField.getField(),String.valueOf(value)); + System.out.println(dataField.getField() + ": " + (value != null ? value.toString() : "null")); + } catch (Exception e) { + System.out.println(dataField.getField() + ": [权限不足或字段不存在]"); + } + } + System.out.println("old_owner_id__c: " + (account.getField("old_owner_id__c") != null ? account.getField("old_owner_id__c").toString() : "null")); + System.out.println("old_sfdc_id__c: " + (account.getField("old_sfdc_id__c") != null ? account.getField("old_sfdc_id__c").toString() : "null")); + map.put("old_owner_id__c",String.valueOf(account.getField("old_owner_id__c"))); + map.put("old_sfdc_id__c",String.valueOf(account.getField("old_sfdc_id__c"))); + arrayList.add(map); + } + return arrayList; + } + + +} diff --git a/src/main/java/com/celnet/datadump/service/impl/DataImportServiceImpl.java b/src/main/java/com/celnet/datadump/service/impl/DataImportServiceImpl.java index 66c12a5..04ceb8c 100644 --- a/src/main/java/com/celnet/datadump/service/impl/DataImportServiceImpl.java +++ b/src/main/java/com/celnet/datadump/service/impl/DataImportServiceImpl.java @@ -3,7 +3,7 @@ package com.celnet.datadump.service.impl; import cn.hutool.core.lang.UUID; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; -import cn.hutool.json.JSONObject; +import com.alibaba.fastjson2.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.celnet.datadump.config.SalesforceExecutor; @@ -21,9 +21,9 @@ import com.celnet.datadump.util.CsvConverterUtil; import com.celnet.datadump.util.DataUtil; import com.celnet.datadump.util.EmailUtil; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.sforce.async.*; -import com.sforce.soap.partner.PartnerConnection; -import com.sforce.soap.partner.SaveResult; +import com.sforce.soap.partner.*; import com.sforce.soap.partner.sobject.SObject; import com.sforce.ws.ConnectionException; import com.xxl.job.core.biz.model.ReturnT; @@ -227,7 +227,7 @@ public class DataImportServiceImpl implements DataImportService { for (SalesforceParam salesforceParam : salesforceParams) { Future future = salesforceExecutor.execute(() -> { try { - autoCreatedNewId(salesforceParam, partnerConnection); + manualCreatedNewId(salesforceParam, partnerConnection); } catch (Throwable throwable) { log.error("salesforceExecutor error", throwable); throw new RuntimeException(throwable); @@ -350,7 +350,7 @@ public class DataImportServiceImpl implements DataImportService { if (api.equals("Event")){ account.setField("EventSubtype", String.valueOf(data.get(j - 1).get("EventSubtype"))); - account.setField("IsRecurrence", String.valueOf(data.get(j - 1).get("IsRecurrence"))); +// account.setField("IsRecurrence", String.valueOf(data.get(j - 1).get("IsRecurrence"))); } if (api.equals("Account")){ Map referenceMap = customMapper.list("new_id","RecordType", "new_id is not null and id = '"+ data.get(j - 1).get("RecordTypeId")+"' limit 1").get(0); @@ -479,8 +479,8 @@ public class DataImportServiceImpl implements DataImportService { || "Id".equals(dataField.getField())){ continue; } - //用完放下面 && !dataField.getIsNillable() && !dataField.getIsDefaultedOnCreate() - if (dataField.getIsCreateable()) { + //用完放下面 + if (dataField.getIsCreateable() && !dataField.getIsNillable() && !dataField.getIsDefaultedOnCreate()) { if ("reference".equals(dataField.getSfType())){ String reference = dataField.getReferenceTo(); if ("Group,User".equals(reference)) {