【feat】 新增返写个人联系人,增量数据同步,存量数据同步

This commit is contained in:
Kris 2025-06-13 10:15:03 +08:00
parent cadbb3d490
commit 83dde4dbff
7 changed files with 748 additions and 67 deletions

View File

@ -44,6 +44,8 @@ public class JobController {
private DataImportService dataImportService;
@Autowired
private DataImportBatchService dataImportBatchService;
@Autowired
private DataImportNewService dataImportNewService;
@PostMapping("/fileTransform")
@ApiOperation("附件解析")
@ -240,12 +242,35 @@ public class JobController {
} catch (Throwable throwable) {
return new ReturnT<>(500, "参数解析失败!");
}
param.setType(1);
// 参数转换
param.setBeginCreateDate(param.getBeginDate());
param.setEndCreateDate(param.getEndDate());
return dataImportBatchService.immigrationUpdateBatch(param);
}
/**
* 返写个人联系人ID
* @param paramStr
* @author kris
* @return
* @throws Exception
*/
@PostMapping("/getPersonContactJob")
@ApiOperation("返写个人联系人ID")
@LogServiceAnnotation(operateType = OperateTypeConstant.TYPE_UPDATE, remark = "返写个人联系人ID")
public ReturnT<String> getPersonContactJob(String paramStr) throws Exception {
log.info("getPersonContactJob execute start ..................");
SalesforceParam param = new SalesforceParam();
try {
if (StringUtils.isNotBlank(paramStr)) {
param = JSON.parseObject(paramStr, SalesforceParam.class);
}
} catch (Throwable throwable) {
return new ReturnT<>(500, "参数解析失败!");
}
// 参数转换
param.setBeginCreateDate(param.getBeginDate());
param.setEndCreateDate(param.getEndDate());
return dataImportNewService.getPersonContact(param);
}
}

View File

@ -5,7 +5,6 @@ import com.celnet.datadump.param.SalesforceParam;
import com.celnet.datadump.service.CommonService;
import com.celnet.datadump.service.DataImportBatchService;
import com.celnet.datadump.service.DataImportService;
import com.celnet.datadump.service.impl.DataImportServiceImpl;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
@ -28,10 +27,6 @@ public class DataDumpJob {
@Autowired
private DataImportService dataImportService;
@Autowired
private DataImportBatchService dataImportBatchService;
/**
* 创建api
*
@ -126,31 +121,6 @@ public class DataDumpJob {
return dataImportService.immigration(param);
}
/**
* bulk批量大数据生成newSFID
* @param paramStr
* @author kris
* @return
* @throws Exception
*/
@XxlJob("dataImportBatchJob")
public ReturnT<String> dataImportBatchJob(String paramStr) throws Exception {
log.info("dataImportBatchJob execute start ..................");
SalesforceParam param = new SalesforceParam();
try {
if (StringUtils.isNotBlank(paramStr)) {
param = JSON.parseObject(paramStr, SalesforceParam.class);
}
} catch (Throwable throwable) {
return new ReturnT<>(500, "参数解析失败!");
}
param.setType(1);
// 参数转换
param.setBeginCreateDate(param.getBeginDate());
param.setEndCreateDate(param.getEndDate());
return dataImportBatchService.immigrationBatch(param);
}
/**
* 更新目标org数据
@ -169,7 +139,7 @@ public class DataDumpJob {
} catch (Throwable throwable) {
return new ReturnT<>(500, "参数解析失败!");
}
param.setType(1);
// param.setType(1);
// 参数转换
param.setBeginCreateDate(param.getBeginDate());
param.setEndCreateDate(param.getEndDate());
@ -177,30 +147,6 @@ public class DataDumpJob {
return dataImportService.immigrationUpdate(param);
}
/**
* bulk批量大数据更新数据
* @param paramStr
* @author kris
* @return
* @throws Exception
*/
@XxlJob("dataUpdateBatchJob")
public ReturnT<String> dataUpdateBatchJob(String paramStr) throws Exception {
log.info("dataImportBatchJob execute start ..................");
SalesforceParam param = new SalesforceParam();
try {
if (StringUtils.isNotBlank(paramStr)) {
param = JSON.parseObject(paramStr, SalesforceParam.class);
}
} catch (Throwable throwable) {
return new ReturnT<>(500, "参数解析失败!");
}
param.setType(1);
// 参数转换
param.setBeginCreateDate(param.getBeginDate());
param.setEndCreateDate(param.getEndDate());
return dataImportBatchService.immigrationUpdateBatch(param);
}
/**
* 获取文件关联表
@ -225,4 +171,5 @@ public class DataDumpJob {
return commonService.getAllApi();
}
}

View File

@ -0,0 +1,127 @@
package com.celnet.datadump.job;
import com.alibaba.fastjson.JSON;
import com.celnet.datadump.param.SalesforceParam;
import com.celnet.datadump.service.CommonService;
import com.celnet.datadump.service.DataImportBatchService;
import com.celnet.datadump.service.DataImportNewService;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 迁移任务 ()
* 2024/06/12
* kris
*/
@Component
@Slf4j
public class DataDumpNewJob {
@Autowired
private CommonService commonService;
@Autowired
private DataImportNewService dataImportNewService;
@Autowired
private DataImportBatchService dataImportBatchService;
/**
* bulk批量大数据生成newSFID
* @param paramStr
* @author kris
* @return
* @throws Exception
*/
@XxlJob("dataImportBatchJob")
public ReturnT<String> dataImportBatchJob(String paramStr) throws Exception {
log.info("dataImportBatchJob execute start ..................");
SalesforceParam param = new SalesforceParam();
try {
if (StringUtils.isNotBlank(paramStr)) {
param = JSON.parseObject(paramStr, SalesforceParam.class);
}
} catch (Throwable throwable) {
return new ReturnT<>(500, "参数解析失败!");
}
param.setType(1);
// 参数转换
param.setBeginCreateDate(param.getBeginDate());
param.setEndCreateDate(param.getEndDate());
return dataImportBatchService.immigrationBatch(param);
}
/**
* bulk批量大数据更新数据
* @param paramStr
* @author kris
* @return
* @throws Exception
*/
@XxlJob("dataUpdateBatchJob")
public ReturnT<String> dataUpdateBatchJob(String paramStr) throws Exception {
log.info("dataImportBatchJob execute start ..................");
SalesforceParam param = new SalesforceParam();
try {
if (StringUtils.isNotBlank(paramStr)) {
param = JSON.parseObject(paramStr, SalesforceParam.class);
}
} catch (Throwable throwable) {
return new ReturnT<>(500, "参数解析失败!");
}
// 参数转换
param.setBeginCreateDate(param.getBeginDate());
param.setEndCreateDate(param.getEndDate());
return dataImportBatchService.immigrationUpdateBatch(param);
}
/**
* 写入个人客户联系人old_id返写new_id
* @param paramStr 参数json
* @return result
*/
@XxlJob("getPersonContactJob")
public ReturnT<String> getPersonContactJob(String paramStr) throws Exception {
log.info("getPersonContactJob execute start ..................");
SalesforceParam param = new SalesforceParam();
try {
if (StringUtils.isNotBlank(paramStr)) {
param = JSON.parseObject(paramStr, SalesforceParam.class);
}
} catch (Throwable throwable) {
return new ReturnT<>(500, "参数解析失败!");
}
// 参数转换
param.setBeginCreateDate(param.getBeginDate());
param.setEndCreateDate(param.getEndDate());
return dataImportNewService.getPersonContact(param);
}
/**
* 增量数据更新
*/
@XxlJob("dataIncrementUpdateJob")
public ReturnT<String> dataIncrementUpdateJob(String paramStr) throws Exception {
log.info("dataUpdateJob execute start ..................");
SalesforceParam param = new SalesforceParam();
try {
if (StringUtils.isNotBlank(paramStr)) {
param = JSON.parseObject(paramStr, SalesforceParam.class);
}
} catch (Throwable throwable) {
return new ReturnT<>(500, "参数解析失败!");
}
param.setBeginCreateDate(param.getBeginDate());
param.setEndCreateDate(param.getEndDate());
return dataImportNewService.immigrationIncrementUpdate(param);
}
}

View File

@ -0,0 +1,21 @@
package com.celnet.datadump.service;
import com.celnet.datadump.param.SalesforceParam;
import com.xxl.job.core.biz.model.ReturnT;
import java.util.List;
import java.util.concurrent.Future;
public interface DataImportNewService {
/**
* 写入个人客户联系人old_id返写new_id
*/
ReturnT<String> getPersonContact(SalesforceParam param) throws Exception;
/**
* 增量数据更新
*/
ReturnT<String> immigrationIncrementUpdate(SalesforceParam param) throws Exception;
}

View File

@ -22,6 +22,7 @@ import com.celnet.datadump.util.EmailUtil;
import com.celnet.datadump.util.SqlUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.sforce.async.BulkConnection;
import com.sforce.soap.partner.*;
import com.sforce.soap.partner.sobject.SObject;
import com.xxl.job.core.biz.model.ReturnT;
@ -920,12 +921,12 @@ public class CommonServiceImpl implements CommonService {
if ("Task".equals(apiName) || "Event".equals(apiName)){
Map<String, Object> LinkedMap = Maps.newHashMap();
LinkedMap.put("type", "varchar(255)");
LinkedMap.put("type", "varchar(18)");
LinkedMap.put("comment", "whatTextId");
LinkedMap.put("name", "what_text_id");
list.add(LinkedMap);
Map<String, Object> LinkedMap1 = Maps.newHashMap();
LinkedMap1.put("type", "varchar(255)");
LinkedMap1.put("type", "varchar(18)");
LinkedMap1.put("comment", "whoTextId");
LinkedMap1.put("name", "who_text_id");
list.add(LinkedMap1);
@ -1180,7 +1181,7 @@ public class CommonServiceImpl implements CommonService {
SaveResult[] saveResults = connection.create(accounts);
for (int j = 0; j < saveResults.length; j++) {
if (!saveResults[j].getSuccess()) {
String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s", api, com.alibaba.fastjson2.JSON.toJSONString(DataDumpParam.getFilter()), com.alibaba.fastjson.JSON.toJSONString(saveResults[j]));
String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s", api, JSON.toJSONString(DataDumpParam.getFilter()), com.alibaba.fastjson.JSON.toJSONString(saveResults[j]));
EmailUtil.send("DataDump ContentDocumentLink ERROR", format);
} else {
List<Map<String, Object>> dList = new ArrayList<>();

View File

@ -0,0 +1,560 @@
package com.celnet.datadump.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.celnet.datadump.config.SalesforceExecutor;
import com.celnet.datadump.config.SalesforceTargetConnect;
import com.celnet.datadump.entity.DataBatch;
import com.celnet.datadump.entity.DataBatchHistory;
import com.celnet.datadump.entity.DataField;
import com.celnet.datadump.entity.DataObject;
import com.celnet.datadump.global.Const;
import com.celnet.datadump.global.SystemConfigCode;
import com.celnet.datadump.mapper.CustomMapper;
import com.celnet.datadump.param.DataDumpParam;
import com.celnet.datadump.param.SalesforceParam;
import com.celnet.datadump.service.*;
import com.celnet.datadump.util.DataUtil;
import com.celnet.datadump.util.EmailUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.sforce.soap.partner.*;
import com.sforce.soap.partner.sobject.SObject;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.log.XxlJobLogger;
import com.xxl.job.core.util.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.time.DateUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@Service
@Slf4j
public class DataImportNewServiceImpl implements DataImportNewService {
@Autowired
private SalesforceTargetConnect salesforceTargetConnect;
@Autowired
private SalesforceExecutor salesforceExecutor;
@Autowired
private DataObjectService dataObjectService;
@Autowired
private DataBatchService dataBatchService;
@Autowired
private DataFieldService dataFieldService;
@Autowired
private CustomMapper customMapper;
@Autowired
private DataBatchHistoryService dataBatchHistoryService;
@Autowired
private CommonService commonService;
/**
* Get入口
*/
@Override
public ReturnT<String> getPersonContact(SalesforceParam param) throws Exception {
List<Future<?>> futures = Lists.newArrayList();
try {
if (StringUtils.isNotBlank(param.getApi())) {
// 手动任务
ReturnT<String> result = manualGetPersonContact(param, futures);
if (result != null) {
return result;
}
}
return ReturnT.SUCCESS;
} catch (Exception exception) {
salesforceExecutor.remove(futures.toArray(new Future<?>[]{}));
log.error("immigration error", exception);
throw exception;
}
}
/**
* 组装Get执行参数
*/
public ReturnT<String> manualGetPersonContact(SalesforceParam param, List<Future<?>> futures) throws Exception {
String api = "Contact";
TimeUnit.MILLISECONDS.sleep(1);
// 全量的时候 检测是否有自动任务锁住的表
boolean isFull = CollectionUtils.isEmpty(param.getIds());
if (isFull) {
QueryWrapper<DataObject> qw = new QueryWrapper<>();
qw.eq("data_lock", 1);
List<DataObject> list = dataObjectService.list(qw);
if (CollectionUtils.isNotEmpty(list)) {
String apiNames = list.stream().map(DataObject::getName).collect(Collectors.joining());
return new ReturnT<>(500, "api:" + apiNames + " is locked");
}
}
PartnerConnection connect = salesforceTargetConnect.createConnect();
DataObject update = new DataObject();
TimeUnit.MILLISECONDS.sleep(1);
try {
List<SalesforceParam> salesforceParams = null;
update.setName(api);
update.setDataLock(1);
dataObjectService.updateById(update);
QueryWrapper<DataBatch> dbQw = new QueryWrapper<>();
dbQw.eq("name", api);
List<DataBatch> list = dataBatchService.list(dbQw);
AtomicInteger batch = new AtomicInteger(1);
if (CollectionUtils.isNotEmpty(list)) {
salesforceParams = list.stream().map(t -> {
SalesforceParam salesforceParam = param.clone();
salesforceParam.setApi(t.getName());
salesforceParam.setBeginCreateDate(t.getSyncStartDate());
salesforceParam.setEndCreateDate(t.getSyncEndDate());
salesforceParam.setBatch(batch.getAndIncrement());
return salesforceParam;
}).collect(Collectors.toList());
}
// 手动任务优先执行
for (SalesforceParam salesforceParam : salesforceParams) {
Future<?> future = salesforceExecutor.execute(() -> {
try {
manualGetPersonContactId(salesforceParam, connect);
} catch (Throwable throwable) {
log.error("salesforceExecutor error", throwable);
throw new RuntimeException(throwable);
}
}, salesforceParam.getBatch(), 1);
futures.add(future);
}
// 等待当前所有线程执行完成
salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
update.setDataWork(0);
} catch (InterruptedException e) {
throw e;
} catch (Throwable e) {
log.error("manualImmigration error", e);
throw new RuntimeException(e);
} finally {
if (isFull) {
update.setName(api);
update.setDataLock(0);
dataObjectService.updateById(update);
}
}
return null;
}
/**
* 获取newId,写入oldId
*/
private void manualGetPersonContactId(SalesforceParam param, PartnerConnection partnerConnection) throws Exception {
String api = param.getApi();
QueryWrapper<DataField> dbQw = new QueryWrapper<>();
dbQw.eq("api", api);
Date beginDate = param.getBeginCreateDate();
Date endDate = param.getEndCreateDate();
String beginDateStr = DateUtil.format(beginDate, "yyyy-MM-dd HH:mm:ss");
String endDateStr = DateUtil.format(endDate, "yyyy-MM-dd HH:mm:ss");
DescribeSObjectResult dsr = partnerConnection.describeSObject(api);
Field[] dsrFields = dsr.getFields();
//表内数据总量
Integer count = customMapper.countBySQL(api, "where new_id is null and IsPersonAccount = 1 and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "'");
log.error("总Insert数据 count:{}-开始时间:{}-结束时间:{}-api:{}", count, beginDateStr, endDateStr, api);
if (count == 0) {
return;
}
//批量插入200一次
int page = count%200 == 0 ? count/200 : (count/200) + 1;
for (int i = 0; i < page; i++) {
List<Map<String, Object>> data = customMapper.list("*", api, "new_id is null and IsPersonAccount = 1 and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "' limit 200");
int size = data.size();
log.info("执行api:{}, 执行page:{}, 执行size:{}", api, i+1, size);
SObject[] accounts = new SObject[size];
// 新客户ID旧联系人ID用于更新本地数据
Map<String,String> idMaps = new HashMap<>();
// 旧客户ID新联系人ID用于更新SF数据
Map<String,String> idMapa = new HashMap<>();
for (Map<String, Object> map : data) {
Map<String, Object> idMap = customMapper.getById("new_id", "Account", map.get("AccountId").toString());
if(idMap.get("new_id") != null && StringUtils.isNotEmpty(idMap.get("new_id").toString())){
// 新客户ID旧联系人ID
idMaps.put(idMap.get("new_id").toString(),map.get("Id").toString());
}
}
String idStr = "(";
for (String ids : idMaps.keySet()) {
idStr += "'" + ids + "',"; // 拼接每个ID
}
if (idStr.endsWith(",")) { // 如果最后一个字符是逗号说明循环正常结束
idStr = idStr.substring(0, idStr.length() - 1); // 去掉最后一个多余的逗号
}
idStr += ")"; // 添加右括号
try {
String sql = "SELECT Id,AccountId,Account.old_sfdc_id__c FROM Contact where AccountId in " + idStr ;
QueryResult queryResult = partnerConnection.queryAll(sql);
if (ObjectUtils.isEmpty(queryResult) || ObjectUtils.isEmpty(queryResult.getRecords())) {
break;
}
SObject[] records = queryResult.getRecords();
com.alibaba.fastjson2.JSONArray objects = DataUtil.toJsonArray(records, dsrFields);
for (int z = 0; z < objects.size(); z++) {
JSONObject jsonObject = objects.getJSONObject(z);
String contactId = jsonObject.getString(Const.ID);
String accountId = jsonObject.getString("AccountId");
String oldAccountId = jsonObject.getString("Account_old_sfdc_id__c");
String id = idMaps.get(accountId);
List<Map<String, Object>> maps = Lists.newArrayList();
Map<String, Object> paramMap = Maps.newHashMap();
paramMap.put("key", "new_id");
paramMap.put("value", contactId);
maps.add(paramMap);
customMapper.updateById(api, maps, id);
idMapa.put(oldAccountId, contactId);
}
TimeUnit.MILLISECONDS.sleep(1);
int index = 0;
for (Map<String, Object> map : data) {
SObject account = new SObject();
account.setType(api);
account.setField("old_owner_id__c", map.get("OwnerId"));
account.setField("old_sfdc_id__c", map.get("Id"));
account.setId(idMapa.get(map.get("AccountId").toString()));
accounts[index] = account;
index++;
}
SaveResult[] saveResults = partnerConnection.update(accounts);
for (SaveResult saveResult : saveResults) {
if (!saveResult.getSuccess()) {
log.info("-------------saveResults: {}", JSON.toJSONString(saveResult));
String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s, \n数据实体类:\n%s", api, com.alibaba.fastjson2.JSON.toJSONString(param, DataDumpParam.getFilter()), JSON.toJSONString(saveResult));
EmailUtil.send("DataDump ERROR", format);
return;
}
}
} catch (Exception e) {
log.error("manualGetPersonContactId error api:{}", api, e);
throw e;
}
}
SalesforceParam countParam = new SalesforceParam();
countParam.setApi(api);
countParam.setBeginCreateDate(beginDate);
countParam.setEndCreateDate(DateUtils.addSeconds(endDate, -1));
// 存在isDeleted 只查询IsDeleted为false的
if (dataFieldService.hasDeleted(countParam.getApi())) {
countParam.setIsDeleted(false);
} else {
// 不存在 过滤
countParam.setIsDeleted(null);
}
// sf count
Integer sfNum = commonService.countSfNum(partnerConnection, countParam);
UpdateWrapper<DataBatchHistory> updateQw = new UpdateWrapper<>();
updateQw.eq("name", api)
.eq("sync_start_date", beginDate)
.eq("sync_end_date", DateUtils.addSeconds(endDate, -1))
.set("target_sf_num", sfNum);
dataBatchHistoryService.update(updateQw);
UpdateWrapper<DataBatch> updateQw2 = new UpdateWrapper<>();
updateQw2.eq("name", api)
.eq("sync_start_date", beginDate)
.eq("sync_end_date", endDate)
.set("sf_add_num", sfNum);
dataBatchService.update(updateQw2);
}
/**
* 增量Update入口
*/
@Override
public ReturnT<String> immigrationIncrementUpdate(SalesforceParam param) throws Exception {
List<Future<?>> futures = Lists.newArrayList();
try {
if (StringUtils.isNotBlank(param.getApi())) {
// 手动任务
ReturnT<String> result = updateIncrementSfData(param, futures);
if (result != null) {
return result;
}
} else {
// 自动更新任务
// autoUpdateSfData(param, futures);
}
return ReturnT.SUCCESS;
} catch (InterruptedException e) {
throw e;
} catch (Throwable throwable) {
salesforceExecutor.remove(futures.toArray(new Future<?>[]{}));
log.error("immigrationUpdate error", throwable);
throw throwable;
}
}
/**
* 组装增量Update参数
*/
public ReturnT<String> updateIncrementSfData(SalesforceParam param, List<Future<?>> futures) throws Exception {
List<String> apis;
String beginDateStr = null;
String endDateStr = null;
apis = DataUtil.toIdList(param.getApi());
if (param.getBeginCreateDate() != null && param.getEndCreateDate() != null){
Date beginDate = param.getBeginCreateDate();
Date endDate = param.getEndCreateDate();
beginDateStr = DateUtil.format(beginDate, "yyyy-MM-dd HH:mm:ss");
endDateStr = DateUtil.format(endDate, "yyyy-MM-dd HH:mm:ss");
}
String join = StringUtils.join(apis, ",");
log.info("immigration apis: {}", join);
XxlJobLogger.log("immigration apis: {}", join);
// 全量的时候 检测是否有自动任务锁住的表
boolean isFull = CollectionUtils.isEmpty(param.getIds());
if (isFull) {
QueryWrapper<DataObject> qw = new QueryWrapper<>();
qw.eq("data_lock", 1).in("name", apis);
List<DataObject> list = dataObjectService.list(qw);
if (CollectionUtils.isNotEmpty(list)) {
String apiNames = list.stream().map(DataObject::getName).collect(Collectors.joining());
return new ReturnT<>(500, "api:" + apiNames + " is locked");
}
}
PartnerConnection partnerConnection = salesforceTargetConnect.createConnect();
for (String api : apis) {
DataObject update = new DataObject();
try {
TimeUnit.MILLISECONDS.sleep(1);
List<SalesforceParam> salesforceParams = null;
QueryWrapper<DataBatch> dbQw = new QueryWrapper<>();
dbQw.eq("name", api);
if (StringUtils.isNotEmpty(beginDateStr) && StringUtils.isNotEmpty(endDateStr)) {
dbQw.eq("sync_start_date", beginDateStr); // 等于开始时间
dbQw.eq("sync_end_date", endDateStr); // 等于结束时间
}
List<DataBatch> list = dataBatchService.list(dbQw);
AtomicInteger batch = new AtomicInteger(1);
if (CollectionUtils.isNotEmpty(list)) {
salesforceParams = list.stream().map(t -> {
SalesforceParam salesforceParam = param.clone();
salesforceParam.setApi(t.getName());
salesforceParam.setBeginCreateDate(t.getSyncStartDate());
salesforceParam.setEndCreateDate(t.getSyncEndDate());
salesforceParam.setBatch(batch.getAndIncrement());
return salesforceParam;
}).collect(Collectors.toList());
}
// 手动任务优先执行
for (SalesforceParam salesforceParam : salesforceParams) {
Future<?> future = salesforceExecutor.execute(() -> {
try {
manualIncrementUpdateSfData(salesforceParam, partnerConnection);
} catch (Throwable throwable) {
log.error("salesforceExecutor error", throwable);
throw new RuntimeException(throwable);
}
}, salesforceParam.getBatch(), 1);
futures.add(future);
}
// 等待当前所有线程执行完成
salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
update.setDataWork(0);
} catch (Exception e) {
throw e;
} finally {
if (isFull) {
update.setName(api);
update.setDataLock(0);
dataObjectService.updateById(update);
}
}
}
return null;
}
/**
* 执行增量Update数据
*/
private void manualIncrementUpdateSfData(SalesforceParam param, PartnerConnection partnerConnection) throws Exception {
Map<String, Object> infoFlag = customMapper.list("code,value","system_config","code ='"+ SystemConfigCode.INFO_FLAG+"'").get(0);
String api = param.getApi();
QueryWrapper<DataField> dbQw = new QueryWrapper<>();
dbQw.eq("api", api);
List<DataField> list = dataFieldService.list(dbQw);
TimeUnit.MILLISECONDS.sleep(1);
Date beginDate = param.getBeginCreateDate();
Date endDate = param.getEndCreateDate();
String beginDateStr = DateUtil.format(beginDate, "yyyy-MM-dd HH:mm:ss");
String endDateStr = DateUtil.format(endDate, "yyyy-MM-dd HH:mm:ss");
//表内数据总量
Integer count = customMapper.countBySQL(api, "where new_id is not null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "'");
log.error("总Update数据 count:{}-开始时间:{}-结束时间:{}-api:{}", count, beginDateStr, endDateStr, api);
if(count == 0){
return;
}
int targetCount = 0;
//批量插入200一次
int page = count%200 == 0 ? count/200 : (count/200) + 1;
for (int i = 0; i < page; i++) {
List<Map<String, Object>> mapList = customMapper.list("*", api, "new_id is not null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "' order by Id asc limit " + i * 200 + ",200");
SObject[] accounts = new SObject[mapList.size()];
int j = 0;
for (Map<String, Object> map : mapList) {
SObject account = new SObject();
account.setType(api);
//给对象赋值
for (DataField dataField : list) {
String field = dataField.getField();
String reference_to = dataField.getReferenceTo();
//根据旧sfid查找引用对象新sfid
if (field.equals("Id")) {
account.setId(String.valueOf(map.get("new_id")));
} else if (!DataUtil.isUpdate(field) || (dataField.getIsCreateable() != null && !dataField.getIsCreateable())) {
continue;
} else if (StringUtils.isNotBlank(reference_to) && !"data_picklist".equals(reference_to)) {
if (!"null".equals(map.get(field)) && null != map.get(field) && !"OwnerId".equals(field)
|| !"Owner_Type".equals(field)) {
//判断reference_to内是否包含User字符串
if (reference_to.contains("User")) {
reference_to = "User";
}
Map<String, Object> m = customMapper.getById("new_id", reference_to, String.valueOf(map.get(field)));
if (m != null && !m.isEmpty()) {
account.setField(field, m.get("new_id"));
}
}
} else {
if (map.get(field) != null && StringUtils.isNotBlank(dataField.getSfType())) {
account.setField(field, DataUtil.localDataToSfData(dataField.getSfType(), String.valueOf(map.get(field))));
}else {
if (api.equals("Account")){
if ("1".equals(map.get("IsPersonAccount")) && field.equals("Name")){
continue;
}else if("0".equals(map.get("IsPersonAccount")) && field.equals("LastName")){
continue;
}
} else {
account.setField(field, map.get(field));
}
}
}
}
account.setField("old_owner_id__c", map.get("OwnerId"));
account.setField("old_sfdc_id__c", map.get("Id"));
accounts[j++] = account;
}
List<Map<String, String>> listMap = new ArrayList<>();
try {
if (infoFlag != null && "1".equals(infoFlag.get("value"))){
listMap = returnAccountsDetails(accounts,list);
}
SaveResult[] saveResults = partnerConnection.update(accounts);
for (SaveResult saveResult : saveResults) {
if (!saveResult.getSuccess()) {
log.info("-------------saveResults: {}", JSON.toJSONString(saveResult));
String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s, \n数据实体类:\n%s", api, com.alibaba.fastjson2.JSON.toJSONString(param, DataDumpParam.getFilter()), JSON.toJSONString(saveResult),JSON.toJSONString(listMap));
EmailUtil.send("DataDump ERROR", format);
return;
}else {
targetCount ++;
}
}
log.info("sf return saveResults------" + JSONArray.toJSONString(saveResults));
} catch (Throwable e) {
throw e;
}
}
UpdateWrapper<DataBatchHistory> updateQw = new UpdateWrapper<>();
updateQw.eq("name", api)
.eq("sync_start_date", beginDate)
.eq("sync_end_date", DateUtils.addSeconds(endDate, -1))
.set("target_update_num", targetCount);
dataBatchHistoryService.update(updateQw);
UpdateWrapper<DataBatch> updateQw2 = new UpdateWrapper<>();
updateQw2.eq("name", api)
.eq("sync_start_date", beginDate)
.eq("sync_end_date", endDate)
.set("sf_update_num", targetCount);
dataBatchService.update(updateQw2);
}
/**
* 打印SF交互数据明细
*/
public List<Map<String,String>> returnAccountsDetails(SObject[] accounts,List<DataField> list) {
ArrayList<Map<String, String>> arrayList = new ArrayList<>();
for (int i = 0; i < accounts.length; i++) {
HashMap<String, String> map = new HashMap<>();
SObject account = accounts[i];
System.out.println("--- Account[" + i + "] ---");
// 获取对象所有字段名
for (DataField dataField : list) {
try {
Object value = account.getField(dataField.getField());
map.put(dataField.getField(),String.valueOf(value));
System.out.println(dataField.getField() + ": " + (value != null ? value.toString() : "null"));
} catch (Exception e) {
System.out.println(dataField.getField() + ": [权限不足或字段不存在]");
}
}
System.out.println("old_owner_id__c: " + (account.getField("old_owner_id__c") != null ? account.getField("old_owner_id__c").toString() : "null"));
System.out.println("old_sfdc_id__c: " + (account.getField("old_sfdc_id__c") != null ? account.getField("old_sfdc_id__c").toString() : "null"));
map.put("old_owner_id__c",String.valueOf(account.getField("old_owner_id__c")));
map.put("old_sfdc_id__c",String.valueOf(account.getField("old_sfdc_id__c")));
arrayList.add(map);
}
return arrayList;
}
}

View File

@ -3,7 +3,7 @@ package com.celnet.datadump.service.impl;
import cn.hutool.core.lang.UUID;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import cn.hutool.json.JSONObject;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.celnet.datadump.config.SalesforceExecutor;
@ -21,9 +21,9 @@ import com.celnet.datadump.util.CsvConverterUtil;
import com.celnet.datadump.util.DataUtil;
import com.celnet.datadump.util.EmailUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.sforce.async.*;
import com.sforce.soap.partner.PartnerConnection;
import com.sforce.soap.partner.SaveResult;
import com.sforce.soap.partner.*;
import com.sforce.soap.partner.sobject.SObject;
import com.sforce.ws.ConnectionException;
import com.xxl.job.core.biz.model.ReturnT;
@ -227,7 +227,7 @@ public class DataImportServiceImpl implements DataImportService {
for (SalesforceParam salesforceParam : salesforceParams) {
Future<?> future = salesforceExecutor.execute(() -> {
try {
autoCreatedNewId(salesforceParam, partnerConnection);
manualCreatedNewId(salesforceParam, partnerConnection);
} catch (Throwable throwable) {
log.error("salesforceExecutor error", throwable);
throw new RuntimeException(throwable);
@ -350,7 +350,7 @@ public class DataImportServiceImpl implements DataImportService {
if (api.equals("Event")){
account.setField("EventSubtype", String.valueOf(data.get(j - 1).get("EventSubtype")));
account.setField("IsRecurrence", String.valueOf(data.get(j - 1).get("IsRecurrence")));
// account.setField("IsRecurrence", String.valueOf(data.get(j - 1).get("IsRecurrence")));
}
if (api.equals("Account")){
Map<String, Object> referenceMap = customMapper.list("new_id","RecordType", "new_id is not null and id = '"+ data.get(j - 1).get("RecordTypeId")+"' limit 1").get(0);
@ -479,8 +479,8 @@ public class DataImportServiceImpl implements DataImportService {
|| "Id".equals(dataField.getField())){
continue;
}
//用完放下面 && !dataField.getIsNillable() && !dataField.getIsDefaultedOnCreate()
if (dataField.getIsCreateable()) {
//用完放下面
if (dataField.getIsCreateable() && !dataField.getIsNillable() && !dataField.getIsDefaultedOnCreate()) {
if ("reference".equals(dataField.getSfType())){
String reference = dataField.getReferenceTo();
if ("Group,User".equals(reference)) {