Compare commits
No commits in common. "d920ec792f68d2da67a78be5ef260bf85b6a7e4f" and "f35c4e44ffc1ba5e5607b4cc58513f9aae067e6f" have entirely different histories.
d920ec792f
...
f35c4e44ff
@ -35,10 +35,6 @@ public class SystemConfigCode {
|
|||||||
* 批次类型 周/月/年
|
* 批次类型 周/月/年
|
||||||
*/
|
*/
|
||||||
public static final String BATCH_TYPE = "BATCH_TYPE";
|
public static final String BATCH_TYPE = "BATCH_TYPE";
|
||||||
/**
|
|
||||||
* 增量批次类型 周/月/年
|
|
||||||
*/
|
|
||||||
public static final String INCREMENT_BATCH_TYPE = "INCREMENT_BATCH_TYPE";
|
|
||||||
|
|
||||||
public static final String BATCH_TYPE_WEEK = "WEEK";
|
public static final String BATCH_TYPE_WEEK = "WEEK";
|
||||||
public static final String BATCH_TYPE_MONTH = "MONTH";
|
public static final String BATCH_TYPE_MONTH = "MONTH";
|
||||||
|
@ -235,25 +235,4 @@ public class DataDumpNewJob {
|
|||||||
param.setEndCreateDate(param.getEndDate());
|
param.setEndCreateDate(param.getEndDate());
|
||||||
return dataImportNewService.dumpFileNew(param);
|
return dataImportNewService.dumpFileNew(param);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* 增量任务(新)
|
|
||||||
*
|
|
||||||
* @param paramStr 参数json
|
|
||||||
* @return result
|
|
||||||
*/
|
|
||||||
@XxlJob("dataDumpIncrementNewJob")
|
|
||||||
public ReturnT<String> dataDumpIncrementNewJob(String paramStr) throws Exception {
|
|
||||||
log.info("dataDumpIncrementNewJob execute start ..................");
|
|
||||||
SalesforceParam param = new SalesforceParam();
|
|
||||||
try {
|
|
||||||
if (StringUtils.isNotBlank(paramStr)) {
|
|
||||||
param = JSON.parseObject(paramStr, SalesforceParam.class);
|
|
||||||
}
|
|
||||||
} catch (Throwable throwable) {
|
|
||||||
return new ReturnT<>(500, "参数解析失败!");
|
|
||||||
}
|
|
||||||
param.setType(2);
|
|
||||||
return commonService.incrementNew(param);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -17,8 +17,6 @@ public interface CommonService {
|
|||||||
|
|
||||||
ReturnT<String> increment(SalesforceParam param) throws Exception;
|
ReturnT<String> increment(SalesforceParam param) throws Exception;
|
||||||
|
|
||||||
ReturnT<String> incrementNew(SalesforceParam param) throws Exception;
|
|
||||||
|
|
||||||
ReturnT<String> dump(SalesforceParam param) throws Exception;
|
ReturnT<String> dump(SalesforceParam param) throws Exception;
|
||||||
|
|
||||||
Integer countSfNum(PartnerConnection connect, SalesforceParam param) throws Exception;
|
Integer countSfNum(PartnerConnection connect, SalesforceParam param) throws Exception;
|
||||||
|
@ -3,7 +3,9 @@ package com.celnet.datadump.service.impl;
|
|||||||
import com.alibaba.fastjson2.JSON;
|
import com.alibaba.fastjson2.JSON;
|
||||||
import com.alibaba.fastjson2.JSONArray;
|
import com.alibaba.fastjson2.JSONArray;
|
||||||
import com.alibaba.fastjson2.JSONObject;
|
import com.alibaba.fastjson2.JSONObject;
|
||||||
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
||||||
|
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
||||||
import com.celnet.datadump.config.SalesforceConnect;
|
import com.celnet.datadump.config.SalesforceConnect;
|
||||||
import com.celnet.datadump.config.SalesforceExecutor;
|
import com.celnet.datadump.config.SalesforceExecutor;
|
||||||
import com.celnet.datadump.config.SalesforceTargetConnect;
|
import com.celnet.datadump.config.SalesforceTargetConnect;
|
||||||
@ -20,6 +22,7 @@ import com.celnet.datadump.util.EmailUtil;
|
|||||||
import com.celnet.datadump.util.SqlUtil;
|
import com.celnet.datadump.util.SqlUtil;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
|
import com.sforce.async.BulkConnection;
|
||||||
import com.sforce.soap.partner.*;
|
import com.sforce.soap.partner.*;
|
||||||
import com.sforce.soap.partner.sobject.SObject;
|
import com.sforce.soap.partner.sobject.SObject;
|
||||||
import com.xxl.job.core.biz.model.ReturnT;
|
import com.xxl.job.core.biz.model.ReturnT;
|
||||||
@ -193,66 +196,6 @@ public class CommonServiceImpl implements CommonService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public ReturnT<String> incrementNew(SalesforceParam param) throws Exception {
|
|
||||||
QueryWrapper<DataObject> qw = new QueryWrapper<>();
|
|
||||||
if (StringUtils.isNotBlank(param.getApi())) {
|
|
||||||
List<String> apis = DataUtil.toIdList(param.getApi());
|
|
||||||
qw.in("name", apis);
|
|
||||||
}
|
|
||||||
qw.eq("need_update", true)
|
|
||||||
.isNotNull("last_update_date");
|
|
||||||
List<DataObject> list = dataObjectService.list(qw);
|
|
||||||
if (CollectionUtils.isEmpty(list)) {
|
|
||||||
return new ReturnT<>(500, ("表" + param.getApi() + "不存在或未开启更新"));
|
|
||||||
}
|
|
||||||
List<Future<?>> futures = Lists.newArrayList();
|
|
||||||
try {
|
|
||||||
DataReport dataReport = new DataReport();
|
|
||||||
dataReport.setType(TypeCode.INCREMENT);
|
|
||||||
dataReport.setApis(list.stream().map(DataObject::getName).collect(Collectors.joining(",")));
|
|
||||||
dataReportService.save(dataReport);
|
|
||||||
for (DataObject dataObject : list) {
|
|
||||||
Future<?> future = salesforceExecutor.execute(() -> {
|
|
||||||
try {
|
|
||||||
Date updateTime = new Date();
|
|
||||||
SalesforceParam salesforceParam = new SalesforceParam();
|
|
||||||
salesforceParam.setApi(dataObject.getName());
|
|
||||||
salesforceParam.setBeginModifyDate(dataObject.getLastUpdateDate());
|
|
||||||
salesforceParam.setType(2);
|
|
||||||
// 更新字段值不为空 按更新字段里的字段校验
|
|
||||||
if (StringUtils.isNotBlank(dataObject.getUpdateField())) {
|
|
||||||
salesforceParam.setUpdateField(dataObject.getUpdateField());
|
|
||||||
}
|
|
||||||
dumpDataNew(salesforceParam, dataReport,dataObject);
|
|
||||||
|
|
||||||
dataObject.setLastUpdateDate(updateTime);
|
|
||||||
dataObjectService.updateById(dataObject);
|
|
||||||
|
|
||||||
} catch (Throwable throwable) {
|
|
||||||
log.error("salesforceExecutor error", throwable);
|
|
||||||
throw new RuntimeException(throwable);
|
|
||||||
}
|
|
||||||
}, 0, 0);
|
|
||||||
futures.add(future);
|
|
||||||
}
|
|
||||||
// 等待当前所有线程执行完成
|
|
||||||
salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
|
|
||||||
// 存在附件的开始dump
|
|
||||||
list.stream().filter(t -> StringUtils.isNotBlank(t.getBlobField())).forEach(t -> {
|
|
||||||
try {
|
|
||||||
fileService.dumpFile(t.getName(), t.getBlobField(), true);
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
return ReturnT.SUCCESS;
|
|
||||||
} catch (Throwable throwable) {
|
|
||||||
salesforceExecutor.remove(futures.toArray(new Future<?>[]{}));
|
|
||||||
throw throwable;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ReturnT<String> dump(SalesforceParam param) throws Exception {
|
public ReturnT<String> dump(SalesforceParam param) throws Exception {
|
||||||
List<Future<?>> futures = Lists.newArrayList();
|
List<Future<?>> futures = Lists.newArrayList();
|
||||||
@ -554,48 +497,6 @@ public class CommonServiceImpl implements CommonService {
|
|||||||
return connect;
|
return connect;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* 数据传输主体(新)
|
|
||||||
*
|
|
||||||
* @param param 参数
|
|
||||||
*/
|
|
||||||
private void dumpDataNew(SalesforceParam param, DataReport dataReport , DataObject dataObject) throws Throwable {
|
|
||||||
String api = param.getApi();
|
|
||||||
PartnerConnection connect ;
|
|
||||||
try {
|
|
||||||
DataBatchHistory dataBatchHistory = new DataBatchHistory();
|
|
||||||
dataBatchHistory.setName(api);
|
|
||||||
dataBatchHistory.setStartDate(new Date());
|
|
||||||
dataBatchHistory.setSyncStartDate(param.getBeginCreateDate());
|
|
||||||
if (param.getEndCreateDate() != null) {
|
|
||||||
dataBatchHistory.setSyncEndDate(DateUtils.addSeconds(param.getEndCreateDate(), -1));
|
|
||||||
}
|
|
||||||
dataBatchHistory.setBatch(param.getBatch());
|
|
||||||
|
|
||||||
connect = salesforceConnect.createConnect();
|
|
||||||
// 存在isDeleted 只查询IsDeleted为false的
|
|
||||||
if (dataFieldService.hasDeleted(param.getApi())) {
|
|
||||||
param.setIsDeleted(false);
|
|
||||||
} else {
|
|
||||||
// 不存在 过滤
|
|
||||||
param.setIsDeleted(null);
|
|
||||||
}
|
|
||||||
|
|
||||||
dataBatchHistory.setSfNum(countSfNum(connect, param));
|
|
||||||
|
|
||||||
getAllSfData(param, connect, dataReport);
|
|
||||||
|
|
||||||
insertDataBatch(param, dataBatchHistory, dataObject);
|
|
||||||
|
|
||||||
} catch (Throwable throwable) {
|
|
||||||
log.error("dataDumpJob error api:{}", api, throwable);
|
|
||||||
String type = param.getType() == 1 ? "存量" : "增量";
|
|
||||||
String format = String.format("%s数据迁移 error, api name: %s, \nparam: %s, \ncause:\n%s", type, api, JSON.toJSONString(param, DataDumpParam.getFilter()), throwable);
|
|
||||||
EmailUtil.send("DataDump ERROR", format);
|
|
||||||
throw throwable;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 任务拆分
|
* 任务拆分
|
||||||
*
|
*
|
||||||
@ -682,7 +583,17 @@ public class CommonServiceImpl implements CommonService {
|
|||||||
}
|
}
|
||||||
fields.add(field.getName());
|
fields.add(field.getName());
|
||||||
}
|
}
|
||||||
|
if ("Attachment".equals(api) || "FeedItem".equals(api)
|
||||||
|
|| "FeedComment".equals(api) || "Note".equals(api)) {
|
||||||
|
fields.add("Parent.type");
|
||||||
|
}
|
||||||
|
if ("Task".equals(api) || "Event".equals(api)) {
|
||||||
|
fields.add("Who.type");
|
||||||
|
fields.add("What.type");
|
||||||
|
}
|
||||||
|
if ("TaskRelation".equals(api) || "EventRelation".equals(api)) {
|
||||||
|
fields.add("Relation.type");
|
||||||
|
}
|
||||||
DataObject dataObject = dataObjectService.getById(api);
|
DataObject dataObject = dataObjectService.getById(api);
|
||||||
if (dataObject != null && StringUtils.isNotBlank(dataObject.getExtraField())) {
|
if (dataObject != null && StringUtils.isNotBlank(dataObject.getExtraField())) {
|
||||||
fields.addAll(Arrays.asList(StringUtils.split(dataObject.getExtraField().replaceAll(StringUtils.SPACE, StringUtils.EMPTY), ",")));
|
fields.addAll(Arrays.asList(StringUtils.split(dataObject.getExtraField().replaceAll(StringUtils.SPACE, StringUtils.EMPTY), ",")));
|
||||||
@ -793,37 +704,6 @@ public class CommonServiceImpl implements CommonService {
|
|||||||
log.info("dataDumpJob done api:{}, count:{}", dataBatchHistory.getName(), num);
|
log.info("dataDumpJob done api:{}, count:{}", dataBatchHistory.getName(), num);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* 执行增量任务,新增一个批次
|
|
||||||
* @param param 参数
|
|
||||||
* @param dataBatchHistory 执行记录
|
|
||||||
*/
|
|
||||||
private void insertDataBatch(SalesforceParam param, DataBatchHistory dataBatchHistory, DataObject dataObject) {
|
|
||||||
|
|
||||||
dataBatchHistory.setEndDate(new Date());
|
|
||||||
Integer num = customMapper.count(param);
|
|
||||||
dataBatchHistory.setDbNum(num);
|
|
||||||
if (dataBatchHistory.getSfNum() != null) {
|
|
||||||
dataBatchHistory.setSyncStatus(dataBatchHistory.getSfNum().equals(dataBatchHistory.getDbNum()) ? 1 : 0);
|
|
||||||
}
|
|
||||||
dataBatchHistory.setCost(DataUtil.calTime(dataBatchHistory.getEndDate(), dataBatchHistory.getStartDate()));
|
|
||||||
DataBatch dataBatch = new DataBatch();
|
|
||||||
dataBatch.setName(dataObject.getName());
|
|
||||||
dataBatch.setLabel(dataObject.getLabel());
|
|
||||||
dataBatch.setFirstSfNum(dataBatchHistory.getSfNum());
|
|
||||||
dataBatch.setFirstDbNum(dataBatchHistory.getDbNum());
|
|
||||||
dataBatch.setFirstSyncDate(dataBatchHistory.getStartDate());
|
|
||||||
dataBatch.setSyncStatus(dataBatchHistory.getSyncStatus());
|
|
||||||
dataBatch.setSyncStartDate(dataObject.getLastUpdateDate());
|
|
||||||
dataBatch.setSyncEndDate(new Date());
|
|
||||||
dataBatchService.save(dataBatch);
|
|
||||||
|
|
||||||
log.info("count db num: {}", num);
|
|
||||||
XxlJobLogger.log("count db num: {}", num);
|
|
||||||
dataBatchHistoryService.save(dataBatchHistory);
|
|
||||||
log.info("dataDumpJob done api:{}, count:{}", dataBatchHistory.getName(), num);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 获取需同步sf数据数量
|
* 获取需同步sf数据数量
|
||||||
*
|
*
|
||||||
|
Loading…
Reference in New Issue
Block a user