【feat】增量任务优化,每次生成一条batch

This commit is contained in:
Kris 2025-07-16 11:24:43 +08:00
parent ee157b323e
commit d920ec792f
4 changed files with 160 additions and 3 deletions

View File

@ -35,6 +35,10 @@ public class SystemConfigCode {
* 批次类型 //
*/
public static final String BATCH_TYPE = "BATCH_TYPE";
/**
* 增量批次类型 //
*/
public static final String INCREMENT_BATCH_TYPE = "INCREMENT_BATCH_TYPE";
public static final String BATCH_TYPE_WEEK = "WEEK";
public static final String BATCH_TYPE_MONTH = "MONTH";

View File

@ -235,4 +235,25 @@ public class DataDumpNewJob {
param.setEndCreateDate(param.getEndDate());
return dataImportNewService.dumpFileNew(param);
}
/**
* 增量任务
*
* @param paramStr 参数json
* @return result
*/
@XxlJob("dataDumpIncrementNewJob")
public ReturnT<String> dataDumpIncrementNewJob(String paramStr) throws Exception {
log.info("dataDumpIncrementNewJob execute start ..................");
SalesforceParam param = new SalesforceParam();
try {
if (StringUtils.isNotBlank(paramStr)) {
param = JSON.parseObject(paramStr, SalesforceParam.class);
}
} catch (Throwable throwable) {
return new ReturnT<>(500, "参数解析失败!");
}
param.setType(2);
return commonService.incrementNew(param);
}
}

View File

@ -17,6 +17,8 @@ public interface CommonService {
ReturnT<String> increment(SalesforceParam param) throws Exception;
ReturnT<String> incrementNew(SalesforceParam param) throws Exception;
ReturnT<String> dump(SalesforceParam param) throws Exception;
Integer countSfNum(PartnerConnection connect, SalesforceParam param) throws Exception;

View File

@ -3,9 +3,7 @@ package com.celnet.datadump.service.impl;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.celnet.datadump.config.SalesforceConnect;
import com.celnet.datadump.config.SalesforceExecutor;
import com.celnet.datadump.config.SalesforceTargetConnect;
@ -22,7 +20,6 @@ import com.celnet.datadump.util.EmailUtil;
import com.celnet.datadump.util.SqlUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.sforce.async.BulkConnection;
import com.sforce.soap.partner.*;
import com.sforce.soap.partner.sobject.SObject;
import com.xxl.job.core.biz.model.ReturnT;
@ -196,6 +193,66 @@ public class CommonServiceImpl implements CommonService {
}
}
@Override
public ReturnT<String> incrementNew(SalesforceParam param) throws Exception {
QueryWrapper<DataObject> qw = new QueryWrapper<>();
if (StringUtils.isNotBlank(param.getApi())) {
List<String> apis = DataUtil.toIdList(param.getApi());
qw.in("name", apis);
}
qw.eq("need_update", true)
.isNotNull("last_update_date");
List<DataObject> list = dataObjectService.list(qw);
if (CollectionUtils.isEmpty(list)) {
return new ReturnT<>(500, ("" + param.getApi() + "不存在或未开启更新"));
}
List<Future<?>> futures = Lists.newArrayList();
try {
DataReport dataReport = new DataReport();
dataReport.setType(TypeCode.INCREMENT);
dataReport.setApis(list.stream().map(DataObject::getName).collect(Collectors.joining(",")));
dataReportService.save(dataReport);
for (DataObject dataObject : list) {
Future<?> future = salesforceExecutor.execute(() -> {
try {
Date updateTime = new Date();
SalesforceParam salesforceParam = new SalesforceParam();
salesforceParam.setApi(dataObject.getName());
salesforceParam.setBeginModifyDate(dataObject.getLastUpdateDate());
salesforceParam.setType(2);
// 更新字段值不为空 按更新字段里的字段校验
if (StringUtils.isNotBlank(dataObject.getUpdateField())) {
salesforceParam.setUpdateField(dataObject.getUpdateField());
}
dumpDataNew(salesforceParam, dataReport,dataObject);
dataObject.setLastUpdateDate(updateTime);
dataObjectService.updateById(dataObject);
} catch (Throwable throwable) {
log.error("salesforceExecutor error", throwable);
throw new RuntimeException(throwable);
}
}, 0, 0);
futures.add(future);
}
// 等待当前所有线程执行完成
salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
// 存在附件的开始dump
list.stream().filter(t -> StringUtils.isNotBlank(t.getBlobField())).forEach(t -> {
try {
fileService.dumpFile(t.getName(), t.getBlobField(), true);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
return ReturnT.SUCCESS;
} catch (Throwable throwable) {
salesforceExecutor.remove(futures.toArray(new Future<?>[]{}));
throw throwable;
}
}
@Override
public ReturnT<String> dump(SalesforceParam param) throws Exception {
List<Future<?>> futures = Lists.newArrayList();
@ -497,6 +554,48 @@ public class CommonServiceImpl implements CommonService {
return connect;
}
/**
* 数据传输主体
*
* @param param 参数
*/
private void dumpDataNew(SalesforceParam param, DataReport dataReport , DataObject dataObject) throws Throwable {
String api = param.getApi();
PartnerConnection connect ;
try {
DataBatchHistory dataBatchHistory = new DataBatchHistory();
dataBatchHistory.setName(api);
dataBatchHistory.setStartDate(new Date());
dataBatchHistory.setSyncStartDate(param.getBeginCreateDate());
if (param.getEndCreateDate() != null) {
dataBatchHistory.setSyncEndDate(DateUtils.addSeconds(param.getEndCreateDate(), -1));
}
dataBatchHistory.setBatch(param.getBatch());
connect = salesforceConnect.createConnect();
// 存在isDeleted 只查询IsDeleted为false的
if (dataFieldService.hasDeleted(param.getApi())) {
param.setIsDeleted(false);
} else {
// 不存在 过滤
param.setIsDeleted(null);
}
dataBatchHistory.setSfNum(countSfNum(connect, param));
getAllSfData(param, connect, dataReport);
insertDataBatch(param, dataBatchHistory, dataObject);
} catch (Throwable throwable) {
log.error("dataDumpJob error api:{}", api, throwable);
String type = param.getType() == 1 ? "存量" : "增量";
String format = String.format("%s数据迁移 error, api name: %s, \nparam: %s, \ncause:\n%s", type, api, JSON.toJSONString(param, DataDumpParam.getFilter()), throwable);
EmailUtil.send("DataDump ERROR", format);
throw throwable;
}
}
/**
* 任务拆分
*
@ -694,6 +793,37 @@ public class CommonServiceImpl implements CommonService {
log.info("dataDumpJob done api:{}, count:{}", dataBatchHistory.getName(), num);
}
/**
* 执行增量任务新增一个批次
* @param param 参数
* @param dataBatchHistory 执行记录
*/
private void insertDataBatch(SalesforceParam param, DataBatchHistory dataBatchHistory, DataObject dataObject) {
dataBatchHistory.setEndDate(new Date());
Integer num = customMapper.count(param);
dataBatchHistory.setDbNum(num);
if (dataBatchHistory.getSfNum() != null) {
dataBatchHistory.setSyncStatus(dataBatchHistory.getSfNum().equals(dataBatchHistory.getDbNum()) ? 1 : 0);
}
dataBatchHistory.setCost(DataUtil.calTime(dataBatchHistory.getEndDate(), dataBatchHistory.getStartDate()));
DataBatch dataBatch = new DataBatch();
dataBatch.setName(dataObject.getName());
dataBatch.setLabel(dataObject.getLabel());
dataBatch.setFirstSfNum(dataBatchHistory.getSfNum());
dataBatch.setFirstDbNum(dataBatchHistory.getDbNum());
dataBatch.setFirstSyncDate(dataBatchHistory.getStartDate());
dataBatch.setSyncStatus(dataBatchHistory.getSyncStatus());
dataBatch.setSyncStartDate(dataObject.getLastUpdateDate());
dataBatch.setSyncEndDate(new Date());
dataBatchService.save(dataBatch);
log.info("count db num: {}", num);
XxlJobLogger.log("count db num: {}", num);
dataBatchHistoryService.save(dataBatchHistory);
log.info("dataDumpJob done api:{}, count:{}", dataBatchHistory.getName(), num);
}
/**
* 获取需同步sf数据数量
*