【feat】 增量同步任务新增
This commit is contained in:
parent
1318cfca60
commit
2b49cd4e00
@ -273,4 +273,30 @@ public class JobController {
|
|||||||
param.setEndCreateDate(param.getEndDate());
|
param.setEndCreateDate(param.getEndDate());
|
||||||
return dataImportNewService.getPersonContact(param);
|
return dataImportNewService.getPersonContact(param);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 数据更新同步(新)
|
||||||
|
* @param paramStr
|
||||||
|
* @author kris
|
||||||
|
* @return
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@PostMapping("/dataUpdateNewJob")
|
||||||
|
@ApiOperation("数据更新同步(新)")
|
||||||
|
@LogServiceAnnotation(operateType = OperateTypeConstant.TYPE_UPDATE, remark = "数据更新同步(新)")
|
||||||
|
public ReturnT<String> dataUpdateNewJob(String paramStr) throws Exception {
|
||||||
|
log.info("getPersonContactJob execute start ..................");
|
||||||
|
SalesforceParam param = new SalesforceParam();
|
||||||
|
try {
|
||||||
|
if (StringUtils.isNotBlank(paramStr)) {
|
||||||
|
param = JSON.parseObject(paramStr, SalesforceParam.class);
|
||||||
|
}
|
||||||
|
} catch (Throwable throwable) {
|
||||||
|
return new ReturnT<>(500, "参数解析失败!");
|
||||||
|
}
|
||||||
|
// 参数转换
|
||||||
|
param.setBeginCreateDate(param.getBeginDate());
|
||||||
|
param.setEndCreateDate(param.getEndDate());
|
||||||
|
return dataImportNewService.immigrationUpdateNew(param);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -307,7 +307,7 @@ public class DataImportNewServiceImpl implements DataImportNewService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Update入口
|
* 数据更新Update入口
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public ReturnT<String> immigrationUpdateNew(SalesforceParam param) throws Exception {
|
public ReturnT<String> immigrationUpdateNew(SalesforceParam param) throws Exception {
|
||||||
@ -340,10 +340,10 @@ public class DataImportNewServiceImpl implements DataImportNewService {
|
|||||||
* 组装【单表】【存量】Update参数
|
* 组装【单表】【存量】Update参数
|
||||||
*/
|
*/
|
||||||
public ReturnT<String> updateSfDataNew(SalesforceParam param, List<Future<?>> futures) throws Exception {
|
public ReturnT<String> updateSfDataNew(SalesforceParam param, List<Future<?>> futures) throws Exception {
|
||||||
List<String> apis;
|
List<String> apis = DataUtil.toIdList(param.getApi());
|
||||||
|
|
||||||
String beginDateStr = null;
|
String beginDateStr = null;
|
||||||
String endDateStr = null;
|
String endDateStr = null;
|
||||||
apis = DataUtil.toIdList(param.getApi());
|
|
||||||
if (param.getBeginCreateDate() != null && param.getEndCreateDate() != null){
|
if (param.getBeginCreateDate() != null && param.getEndCreateDate() != null){
|
||||||
Date beginDate = param.getBeginCreateDate();
|
Date beginDate = param.getBeginCreateDate();
|
||||||
Date endDate = param.getEndCreateDate();
|
Date endDate = param.getEndCreateDate();
|
||||||
@ -359,7 +359,10 @@ public class DataImportNewServiceImpl implements DataImportNewService {
|
|||||||
List<DataObject> list = dataObjectService.list(qw);
|
List<DataObject> list = dataObjectService.list(qw);
|
||||||
if (CollectionUtils.isNotEmpty(list)) {
|
if (CollectionUtils.isNotEmpty(list)) {
|
||||||
String apiNames = list.stream().map(DataObject::getName).collect(Collectors.joining());
|
String apiNames = list.stream().map(DataObject::getName).collect(Collectors.joining());
|
||||||
return new ReturnT<>(500, "api:" + apiNames + " is locked");
|
String message = "api:" + apiNames + " is locked";
|
||||||
|
String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s", apiNames, com.alibaba.fastjson2.JSON.toJSONString(param, DataDumpParam.getFilter()), message);
|
||||||
|
EmailUtil.send("DataDump ERROR", format);
|
||||||
|
return new ReturnT<>(500, message);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -401,7 +404,7 @@ public class DataImportNewServiceImpl implements DataImportNewService {
|
|||||||
}
|
}
|
||||||
// 等待当前所有线程执行完成
|
// 等待当前所有线程执行完成
|
||||||
salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
|
salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
|
||||||
update.setDataWork(0);
|
update.setNeedUpdate(false);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw e;
|
throw e;
|
||||||
} finally {
|
} finally {
|
||||||
@ -444,7 +447,7 @@ public class DataImportNewServiceImpl implements DataImportNewService {
|
|||||||
List<SalesforceParam> salesforceParams = null;
|
List<SalesforceParam> salesforceParams = null;
|
||||||
QueryWrapper<DataBatch> dbQw = new QueryWrapper<>();
|
QueryWrapper<DataBatch> dbQw = new QueryWrapper<>();
|
||||||
dbQw.eq("name", api);
|
dbQw.eq("name", api);
|
||||||
dbQw.ge("sync_start_date",dataObject.getLastUpdateDate());
|
dbQw.gt("sync_end_date",dataObject.getLastUpdateDate());
|
||||||
List<DataBatch> dataBatches = dataBatchService.list(dbQw);
|
List<DataBatch> dataBatches = dataBatchService.list(dbQw);
|
||||||
AtomicInteger batch = new AtomicInteger(1);
|
AtomicInteger batch = new AtomicInteger(1);
|
||||||
if (CollectionUtils.isNotEmpty(dataBatches)) {
|
if (CollectionUtils.isNotEmpty(dataBatches)) {
|
||||||
@ -472,11 +475,12 @@ public class DataImportNewServiceImpl implements DataImportNewService {
|
|||||||
}
|
}
|
||||||
// 等待当前所有线程执行完成
|
// 等待当前所有线程执行完成
|
||||||
salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
|
salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
|
||||||
update.setDataWork(0);
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw e;
|
throw e;
|
||||||
} finally {
|
} finally {
|
||||||
if (isFull) {
|
if (isFull) {
|
||||||
|
update.setNeedUpdate(false);
|
||||||
update.setName(api);
|
update.setName(api);
|
||||||
update.setDataLock(0);
|
update.setDataLock(0);
|
||||||
dataObjectService.updateById(update);
|
dataObjectService.updateById(update);
|
||||||
@ -491,12 +495,37 @@ public class DataImportNewServiceImpl implements DataImportNewService {
|
|||||||
*/
|
*/
|
||||||
private void autoUpdateSfDataNew(SalesforceParam param, List<Future<?>> futures) throws Exception {
|
private void autoUpdateSfDataNew(SalesforceParam param, List<Future<?>> futures) throws Exception {
|
||||||
|
|
||||||
|
// 全量的时候 检测是否有自动任务锁住的表
|
||||||
|
boolean isFull = CollectionUtils.isEmpty(param.getIds());
|
||||||
|
if (isFull) {
|
||||||
|
QueryWrapper<DataObject> qw = new QueryWrapper<>();
|
||||||
|
qw.eq("data_lock", 1);
|
||||||
|
List<DataObject> list = dataObjectService.list(qw);
|
||||||
|
if (CollectionUtils.isNotEmpty(list)) {
|
||||||
|
String apiNames = list.stream().map(DataObject::getName).collect(Collectors.joining());
|
||||||
|
String message = "api:" + apiNames + " is locked";
|
||||||
|
String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s", apiNames, com.alibaba.fastjson2.JSON.toJSONString(param, DataDumpParam.getFilter()), message);
|
||||||
|
EmailUtil.send("DataDump ERROR", format);
|
||||||
|
return ;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
String beginDateStr = null;
|
||||||
|
String endDateStr = null;
|
||||||
|
if (param.getBeginCreateDate() != null && param.getEndCreateDate() != null){
|
||||||
|
Date beginDate = param.getBeginCreateDate();
|
||||||
|
Date endDate = param.getEndCreateDate();
|
||||||
|
beginDateStr = DateUtil.format(beginDate, "yyyy-MM-dd HH:mm:ss");
|
||||||
|
endDateStr = DateUtil.format(endDate, "yyyy-MM-dd HH:mm:ss");
|
||||||
|
}
|
||||||
|
|
||||||
QueryWrapper<DataObject> qw = new QueryWrapper<>();
|
QueryWrapper<DataObject> qw = new QueryWrapper<>();
|
||||||
qw.eq("need_update", 1)
|
qw.eq("need_update", 1)
|
||||||
.orderByAsc("data_index")
|
.orderByAsc("data_index")
|
||||||
.last(" limit 10");
|
.last(" limit 10");
|
||||||
|
|
||||||
PartnerConnection partnerConnection = salesforceTargetConnect.createConnect();
|
PartnerConnection partnerConnection = salesforceTargetConnect.createConnect();
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
List<DataObject> dataObjects = dataObjectService.list(qw);
|
List<DataObject> dataObjects = dataObjectService.list(qw);
|
||||||
//判断dataObjects是否为空
|
//判断dataObjects是否为空
|
||||||
@ -514,99 +543,19 @@ public class DataImportNewServiceImpl implements DataImportNewService {
|
|||||||
TimeUnit.MILLISECONDS.sleep(1);
|
TimeUnit.MILLISECONDS.sleep(1);
|
||||||
try {
|
try {
|
||||||
String api = dataObject.getName();
|
String api = dataObject.getName();
|
||||||
boolean needUpdate = dataObject.getNeedUpdate();
|
|
||||||
List<SalesforceParam> salesforceParams = null;
|
List<SalesforceParam> salesforceParams = null;
|
||||||
QueryWrapper<DataBatch> dbQw = new QueryWrapper<>();
|
QueryWrapper<DataBatch> dbQw = new QueryWrapper<>();
|
||||||
dbQw.eq("name", api);
|
dbQw.eq("name", api);
|
||||||
|
if (StringUtils.isNotEmpty(beginDateStr) && StringUtils.isNotEmpty(endDateStr)) {
|
||||||
|
dbQw.eq("sync_start_date", beginDateStr); // 等于开始时间
|
||||||
|
dbQw.eq("sync_end_date", endDateStr); // 等于结束时间
|
||||||
|
}
|
||||||
List<DataBatch> list = dataBatchService.list(dbQw);
|
List<DataBatch> list = dataBatchService.list(dbQw);
|
||||||
AtomicInteger batch = new AtomicInteger(1);
|
AtomicInteger batch = new AtomicInteger(1);
|
||||||
if (CollectionUtils.isNotEmpty(list)) {
|
if (CollectionUtils.isNotEmpty(list)) {
|
||||||
salesforceParams = list.stream().map(t -> {
|
salesforceParams = list.stream().map(t -> {
|
||||||
SalesforceParam salesforceParam = param.clone();
|
SalesforceParam salesforceParam = param.clone();
|
||||||
salesforceParam.setApi(t.getName());
|
salesforceParam.setApi(t.getName());
|
||||||
if (needUpdate) {
|
|
||||||
salesforceParam.setType(2);
|
|
||||||
salesforceParam.setBeginModifyDate(dataObject.getLastUpdateDate());
|
|
||||||
}
|
|
||||||
salesforceParam.setBeginCreateDate(t.getSyncStartDate());
|
|
||||||
salesforceParam.setEndCreateDate(t.getSyncEndDate());
|
|
||||||
salesforceParam.setBatch(batch.getAndIncrement());
|
|
||||||
return salesforceParam;
|
|
||||||
}).collect(Collectors.toList());
|
|
||||||
}
|
|
||||||
|
|
||||||
// 手动任务优先执行
|
|
||||||
for (SalesforceParam salesforceParam : salesforceParams) {
|
|
||||||
Future<?> future = salesforceExecutor.execute(() -> {
|
|
||||||
try {
|
|
||||||
// autoUpdateSfData(salesforceParam, partnerConnection);
|
|
||||||
} catch (Throwable throwable) {
|
|
||||||
log.error("salesforceExecutor error", throwable);
|
|
||||||
throw new RuntimeException(throwable);
|
|
||||||
}
|
|
||||||
}, salesforceParam.getBatch(), 0);
|
|
||||||
futures.add(future);
|
|
||||||
}
|
|
||||||
// 等待当前所有线程执行完成
|
|
||||||
salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
|
|
||||||
update.setDataWork(0);
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw e;
|
|
||||||
} finally {
|
|
||||||
if (dataObject != null) {
|
|
||||||
update.setDataLock(0);
|
|
||||||
dataObjectService.updateById(update);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// 等待当前所有线程执行完成
|
|
||||||
salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
|
|
||||||
futures.clear();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 组装【多表】【增量】Update参数
|
|
||||||
*/
|
|
||||||
private void autoUpdateIncrementalSfDataNew(SalesforceParam param, List<Future<?>> futures) throws Exception {
|
|
||||||
QueryWrapper<DataObject> qw = new QueryWrapper<>();
|
|
||||||
qw.eq("data_work", 1)
|
|
||||||
.eq("data_lock", 0)
|
|
||||||
.orderByAsc("data_index")
|
|
||||||
.last(" limit 10");
|
|
||||||
|
|
||||||
PartnerConnection partnerConnection = salesforceTargetConnect.createConnect();
|
|
||||||
while (true) {
|
|
||||||
List<DataObject> dataObjects = dataObjectService.list(qw);
|
|
||||||
//判断dataObjects是否为空
|
|
||||||
if (CollectionUtils.isEmpty(dataObjects)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (DataObject dataObject : dataObjects) {
|
|
||||||
TimeUnit.MILLISECONDS.sleep(1);
|
|
||||||
|
|
||||||
DataObject update = new DataObject();
|
|
||||||
update.setName(dataObject.getName());
|
|
||||||
update.setDataLock(1);
|
|
||||||
dataObjectService.updateById(update);
|
|
||||||
TimeUnit.MILLISECONDS.sleep(1);
|
|
||||||
try {
|
|
||||||
String api = dataObject.getName();
|
|
||||||
boolean needUpdate = dataObject.getNeedUpdate();
|
|
||||||
List<SalesforceParam> salesforceParams = null;
|
|
||||||
QueryWrapper<DataBatch> dbQw = new QueryWrapper<>();
|
|
||||||
dbQw.eq("name", api);
|
|
||||||
List<DataBatch> list = dataBatchService.list(dbQw);
|
|
||||||
AtomicInteger batch = new AtomicInteger(1);
|
|
||||||
if (CollectionUtils.isNotEmpty(list)) {
|
|
||||||
salesforceParams = list.stream().map(t -> {
|
|
||||||
SalesforceParam salesforceParam = param.clone();
|
|
||||||
salesforceParam.setApi(t.getName());
|
|
||||||
if (needUpdate) {
|
|
||||||
salesforceParam.setType(2);
|
|
||||||
salesforceParam.setBeginModifyDate(dataObject.getLastUpdateDate());
|
|
||||||
}
|
|
||||||
salesforceParam.setBeginCreateDate(t.getSyncStartDate());
|
salesforceParam.setBeginCreateDate(t.getSyncStartDate());
|
||||||
salesforceParam.setEndCreateDate(t.getSyncEndDate());
|
salesforceParam.setEndCreateDate(t.getSyncEndDate());
|
||||||
salesforceParam.setBatch(batch.getAndIncrement());
|
salesforceParam.setBatch(batch.getAndIncrement());
|
||||||
@ -628,10 +577,97 @@ public class DataImportNewServiceImpl implements DataImportNewService {
|
|||||||
}
|
}
|
||||||
// 等待当前所有线程执行完成
|
// 等待当前所有线程执行完成
|
||||||
salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
|
salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
|
||||||
update.setDataWork(0);
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw e;
|
throw e;
|
||||||
} finally {
|
} finally {
|
||||||
|
update.setNeedUpdate(false);
|
||||||
|
update.setDataLock(0);
|
||||||
|
dataObjectService.updateById(update);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// 等待当前所有线程执行完成
|
||||||
|
salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
|
||||||
|
futures.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 组装【多表】【增量】Update参数
|
||||||
|
*/
|
||||||
|
private void autoUpdateIncrementalSfDataNew(SalesforceParam param, List<Future<?>> futures) throws Exception {
|
||||||
|
|
||||||
|
// 全量的时候 检测是否有自动任务锁住的表
|
||||||
|
boolean isFull = CollectionUtils.isEmpty(param.getIds());
|
||||||
|
if (isFull) {
|
||||||
|
QueryWrapper<DataObject> qw = new QueryWrapper<>();
|
||||||
|
qw.eq("data_lock", 1);
|
||||||
|
List<DataObject> list = dataObjectService.list(qw);
|
||||||
|
if (CollectionUtils.isNotEmpty(list)) {
|
||||||
|
String apiNames = list.stream().map(DataObject::getName).collect(Collectors.joining());
|
||||||
|
String message = "api:" + apiNames + " is locked";
|
||||||
|
String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s", apiNames, com.alibaba.fastjson2.JSON.toJSONString(param, DataDumpParam.getFilter()), message);
|
||||||
|
EmailUtil.send("DataDump ERROR", format);
|
||||||
|
return ;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
QueryWrapper<DataObject> qw = new QueryWrapper<>();
|
||||||
|
qw.eq("need_update", 1)
|
||||||
|
.orderByAsc("data_index")
|
||||||
|
.last(" limit 10");
|
||||||
|
|
||||||
|
PartnerConnection partnerConnection = salesforceTargetConnect.createConnect();
|
||||||
|
while (true) {
|
||||||
|
List<DataObject> dataObjects = dataObjectService.list(qw);
|
||||||
|
//判断dataObjects是否为空
|
||||||
|
if (CollectionUtils.isEmpty(dataObjects)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (DataObject dataObject : dataObjects) {
|
||||||
|
TimeUnit.MILLISECONDS.sleep(1);
|
||||||
|
DataObject update = new DataObject();
|
||||||
|
update.setName(dataObject.getName());
|
||||||
|
update.setDataLock(1);
|
||||||
|
dataObjectService.updateById(update);
|
||||||
|
TimeUnit.MILLISECONDS.sleep(1);
|
||||||
|
try {
|
||||||
|
String api = dataObject.getName();
|
||||||
|
List<SalesforceParam> salesforceParams = null;
|
||||||
|
QueryWrapper<DataBatch> dbQw = new QueryWrapper<>();
|
||||||
|
dbQw.eq("name", api);
|
||||||
|
dbQw.gt("sync_end_date",dataObject.getLastUpdateDate());
|
||||||
|
List<DataBatch> list = dataBatchService.list(dbQw);
|
||||||
|
AtomicInteger batch = new AtomicInteger(1);
|
||||||
|
if (CollectionUtils.isNotEmpty(list)) {
|
||||||
|
salesforceParams = list.stream().map(t -> {
|
||||||
|
SalesforceParam salesforceParam = param.clone();
|
||||||
|
salesforceParam.setApi(t.getName());
|
||||||
|
salesforceParam.setBeginCreateDate(t.getSyncStartDate());
|
||||||
|
salesforceParam.setEndCreateDate(t.getSyncEndDate());
|
||||||
|
salesforceParam.setBatch(batch.getAndIncrement());
|
||||||
|
return salesforceParam;
|
||||||
|
}).collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
|
// 手动任务优先执行
|
||||||
|
for (SalesforceParam salesforceParam : salesforceParams) {
|
||||||
|
Future<?> future = salesforceExecutor.execute(() -> {
|
||||||
|
try {
|
||||||
|
UpdateSfDataNew(salesforceParam, partnerConnection);
|
||||||
|
} catch (Throwable throwable) {
|
||||||
|
log.error("salesforceExecutor error", throwable);
|
||||||
|
throw new RuntimeException(throwable);
|
||||||
|
}
|
||||||
|
}, salesforceParam.getBatch(), 0);
|
||||||
|
futures.add(future);
|
||||||
|
}
|
||||||
|
// 等待当前所有线程执行完成
|
||||||
|
salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw e;
|
||||||
|
} finally {
|
||||||
|
update.setNeedUpdate(false);
|
||||||
update.setDataLock(0);
|
update.setDataLock(0);
|
||||||
dataObjectService.updateById(update);
|
dataObjectService.updateById(update);
|
||||||
}
|
}
|
||||||
@ -812,6 +848,7 @@ public class DataImportNewServiceImpl implements DataImportNewService {
|
|||||||
map.put(dataField.getField(),String.valueOf(value));
|
map.put(dataField.getField(),String.valueOf(value));
|
||||||
System.out.println(dataField.getField() + ": " + (value != null ? value.toString() : "null"));
|
System.out.println(dataField.getField() + ": " + (value != null ? value.toString() : "null"));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
System.out.println(dataField.getField() + ": [权限不足或字段不存在]");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
map.put("old_owner_id__c",String.valueOf(account.getField("old_owner_id__c")));
|
map.put("old_owner_id__c",String.valueOf(account.getField("old_owner_id__c")));
|
||||||
|
@ -427,171 +427,6 @@ public class DataImportServiceImpl implements DataImportService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
private void autoCreatedNewId(SalesforceParam param, PartnerConnection partnerConnection) throws Exception {
|
|
||||||
String api = param.getApi();
|
|
||||||
QueryWrapper<DataField> dbQw = new QueryWrapper<>();
|
|
||||||
dbQw.eq("api", api);
|
|
||||||
List<DataField> list = dataFieldService.list(dbQw);
|
|
||||||
TimeUnit.MILLISECONDS.sleep(1);
|
|
||||||
|
|
||||||
param.setApi(api);
|
|
||||||
Date beginDate = param.getBeginCreateDate();
|
|
||||||
Date endDate = param.getEndCreateDate();
|
|
||||||
String beginDateStr = DateUtil.format(beginDate, "yyyy-MM-dd HH:mm:ss");
|
|
||||||
String endDateStr = DateUtil.format(endDate, "yyyy-MM-dd HH:mm:ss");
|
|
||||||
Integer count = 0;
|
|
||||||
//表内数据总量
|
|
||||||
if (api.contains("Share")){
|
|
||||||
count = customMapper.countBySQL(api, "where RowCause = 'Manual' and new_id is null and LastModifiedDate >= '" + beginDateStr + "' and LastModifiedDate < '" + endDateStr + "'");
|
|
||||||
}else {
|
|
||||||
count = customMapper.countBySQL(api, "where new_id is null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "'");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (count == 0) {
|
|
||||||
log.error("无数据同步 api:{}", api);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
//批量插入200一次
|
|
||||||
int page = count % 200 == 0 ? count / 200 : (count / 200) + 1;
|
|
||||||
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
|
|
||||||
DataObject update = new DataObject();
|
|
||||||
update.setName(api);
|
|
||||||
for (int i = 0; i < page; i++) {
|
|
||||||
List<Map<String, Object>> data = null;
|
|
||||||
if (api.contains("Share")){
|
|
||||||
data = customMapper.list("*", api, "RowCause = 'Manual' and new_id is null and LastModifiedDate >= '" + beginDateStr + "' and LastModifiedDate < '" + endDateStr + "' limit 200");
|
|
||||||
}else {
|
|
||||||
data = customMapper.list("*", api, "new_id is null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "' limit 200");
|
|
||||||
}
|
|
||||||
|
|
||||||
int size = data.size();
|
|
||||||
SObject[] accounts = new SObject[size];
|
|
||||||
String[] ids = new String[size];
|
|
||||||
for (int j = 0; j < size; j++) {
|
|
||||||
SObject account = new SObject();
|
|
||||||
account.setType(api);
|
|
||||||
//找出sf对象必填字段,并且给默认值
|
|
||||||
for (DataField dataField : list) {
|
|
||||||
if ("OwnerId".equals(dataField.getField()) || "Owner_Type".equals(dataField.getField())
|
|
||||||
|| "Id".equals(dataField.getField())){
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
//用完放下面
|
|
||||||
if (dataField.getIsCreateable() && !dataField.getIsNillable() && !dataField.getIsDefaultedOnCreate()) {
|
|
||||||
if ("reference".equals(dataField.getSfType())){
|
|
||||||
String reference = dataField.getReferenceTo();
|
|
||||||
if ("Group,User".equals(reference)) {
|
|
||||||
reference = "User";
|
|
||||||
}
|
|
||||||
// List<Map<String, Object>> referenceMap = customMapper.list("new_id", reference, "new_id is not null limit 1");
|
|
||||||
//share表处理
|
|
||||||
List<Map<String, Object>> referenceMap = customMapper.list("new_id", reference, "Id = '"+data.get(j).get(dataField.getField()).toString()+"' and new_id is not null limit 1");
|
|
||||||
|
|
||||||
if (referenceMap.size() == 0){
|
|
||||||
QueryWrapper<DataObject> maxIndex = new QueryWrapper<>();
|
|
||||||
maxIndex.select("IFNULL(max(data_index),0) as data_index");
|
|
||||||
maxIndex.ne("name", api);
|
|
||||||
Map<String, Object> map = dataObjectService.getMap(maxIndex);
|
|
||||||
|
|
||||||
//如果必填lookup字段没有值,跳过
|
|
||||||
update.setDataIndex(Integer.parseInt(map.get("data_index").toString()+1));
|
|
||||||
dataObjectService.updateById(update);
|
|
||||||
return;
|
|
||||||
}else{
|
|
||||||
account.setField(dataField.getField(), referenceMap.get(0).get("new_id"));
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// if ("picklist".equals(dataField.getSfType())){
|
|
||||||
// List<Map<String, Object>> pickList = customMapper.list("value", "data_picklist", "api = '"+api+"' and field = '"+dataField.getField()+"' limit 1");
|
|
||||||
// account.setField(dataField.getField(), pickList.get(0).get("value"));
|
|
||||||
// continue;
|
|
||||||
// }
|
|
||||||
// account.setField(dataField.getField(), DataUtil.fieldTypeToSf(dataField));
|
|
||||||
if ("picklist".equals(dataField.getSfType())){
|
|
||||||
account.setField(dataField.getField(), data.get(j).get(dataField.getField()));
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
accounts[j] = account;
|
|
||||||
//object类型转Date类型
|
|
||||||
// Date date;
|
|
||||||
// //date转Calendar类型
|
|
||||||
// Calendar calendar = Calendar.getInstance();
|
|
||||||
// try {
|
|
||||||
// date = sdf.parse(String.valueOf(data.get(j - 1).get("CreatedDate")));
|
|
||||||
// }catch (ParseException e){
|
|
||||||
// //解决当时间秒为0时,转换秒精度丢失问题
|
|
||||||
// date = sdf.parse(data.get(j - 1).get("CreatedDate")+":00");
|
|
||||||
// }
|
|
||||||
// calendar.setTime(date);
|
|
||||||
// account.setField("CreatedDate", calendar);
|
|
||||||
// Map<String, Object> CreatedByIdMap = customMapper.getById("new_id", "User", data.get(j-1).get("CreatedById").toString());
|
|
||||||
// account.setField("CreatedById", CreatedByIdMap.get("new_id"));
|
|
||||||
|
|
||||||
ids[j] = data.get(j).get("Id").toString();
|
|
||||||
accounts[j] = account;
|
|
||||||
if (i*200+j == count){
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
SaveResult[] saveResults = partnerConnection.create(accounts);
|
|
||||||
int index = 0;
|
|
||||||
for (SaveResult saveResult : saveResults) {
|
|
||||||
if (saveResult.getSuccess()) {
|
|
||||||
List<Map<String, Object>> maps = new ArrayList<>();
|
|
||||||
Map<String, Object> m = new HashMap<>();
|
|
||||||
m.put("key", "new_id");
|
|
||||||
m.put("value", saveResult.getId());
|
|
||||||
maps.add(m);
|
|
||||||
customMapper.updateById(api, maps, ids[index]);
|
|
||||||
index++;
|
|
||||||
} else {
|
|
||||||
log.error("-------------saveResults: {}", JSON.toJSONString(saveResult));
|
|
||||||
ReturnT.FAIL.setCode(500);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
TimeUnit.MILLISECONDS.sleep(1);
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("autoCreatedNewId error api:{}", api, e);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
SalesforceParam countParam = new SalesforceParam();
|
|
||||||
countParam.setApi(api);
|
|
||||||
countParam.setBeginCreateDate(beginDate);
|
|
||||||
countParam.setEndCreateDate(DateUtils.addSeconds(endDate, -1));
|
|
||||||
// 存在isDeleted 只查询IsDeleted为false的
|
|
||||||
if (dataFieldService.hasDeleted(countParam.getApi())) {
|
|
||||||
countParam.setIsDeleted(false);
|
|
||||||
} else {
|
|
||||||
// 不存在 过滤
|
|
||||||
countParam.setIsDeleted(null);
|
|
||||||
}
|
|
||||||
// sf count
|
|
||||||
Integer sfNum = commonService.countSfNum(partnerConnection, countParam);
|
|
||||||
|
|
||||||
UpdateWrapper<DataBatchHistory> updateQw = new UpdateWrapper<>();
|
|
||||||
updateQw.eq("name", api)
|
|
||||||
.eq("sync_start_date", beginDate)
|
|
||||||
.eq("sync_end_date", DateUtils.addSeconds(endDate, -1))
|
|
||||||
.set("target_sf_num", sfNum);
|
|
||||||
dataBatchHistoryService.update(updateQw);
|
|
||||||
|
|
||||||
UpdateWrapper<DataBatch> updateQw2 = new UpdateWrapper<>();
|
|
||||||
updateQw2.eq("name", api)
|
|
||||||
.eq("sync_start_date", beginDate)
|
|
||||||
.eq("sync_end_date", endDate)
|
|
||||||
.set("sf_add_num", sfNum);
|
|
||||||
dataBatchService.update(updateQw2);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ReturnT<String> immigrationUpdate(SalesforceParam param) throws Exception {
|
public ReturnT<String> immigrationUpdate(SalesforceParam param) throws Exception {
|
||||||
List<Future<?>> futures = Lists.newArrayList();
|
List<Future<?>> futures = Lists.newArrayList();
|
||||||
|
Loading…
Reference in New Issue
Block a user