diff --git a/src/main/java/com/celnet/datadump/global/SystemConfigCode.java b/src/main/java/com/celnet/datadump/global/SystemConfigCode.java index ad3fc76..790955a 100644 --- a/src/main/java/com/celnet/datadump/global/SystemConfigCode.java +++ b/src/main/java/com/celnet/datadump/global/SystemConfigCode.java @@ -35,6 +35,10 @@ public class SystemConfigCode { * 批次类型 周/月/年 */ public static final String BATCH_TYPE = "BATCH_TYPE"; + /** + * 增量批次类型 周/月/年 + */ + public static final String INCREMENT_BATCH_TYPE = "INCREMENT_BATCH_TYPE"; public static final String BATCH_TYPE_WEEK = "WEEK"; public static final String BATCH_TYPE_MONTH = "MONTH"; diff --git a/src/main/java/com/celnet/datadump/job/DataDumpNewJob.java b/src/main/java/com/celnet/datadump/job/DataDumpNewJob.java index 9a3e34a..b446aa4 100644 --- a/src/main/java/com/celnet/datadump/job/DataDumpNewJob.java +++ b/src/main/java/com/celnet/datadump/job/DataDumpNewJob.java @@ -235,4 +235,25 @@ public class DataDumpNewJob { param.setEndCreateDate(param.getEndDate()); return dataImportNewService.dumpFileNew(param); } + + /** + * 增量任务(新) + * + * @param paramStr 参数json + * @return result + */ + @XxlJob("dataDumpIncrementNewJob") + public ReturnT dataDumpIncrementNewJob(String paramStr) throws Exception { + log.info("dataDumpIncrementNewJob execute start .................."); + SalesforceParam param = new SalesforceParam(); + try { + if (StringUtils.isNotBlank(paramStr)) { + param = JSON.parseObject(paramStr, SalesforceParam.class); + } + } catch (Throwable throwable) { + return new ReturnT<>(500, "参数解析失败!"); + } + param.setType(2); + return commonService.incrementNew(param); + } } diff --git a/src/main/java/com/celnet/datadump/service/CommonService.java b/src/main/java/com/celnet/datadump/service/CommonService.java index 54706e4..be1c7bf 100644 --- a/src/main/java/com/celnet/datadump/service/CommonService.java +++ b/src/main/java/com/celnet/datadump/service/CommonService.java @@ -17,6 +17,8 @@ public interface CommonService { ReturnT increment(SalesforceParam param) throws Exception; + ReturnT incrementNew(SalesforceParam param) throws Exception; + ReturnT dump(SalesforceParam param) throws Exception; Integer countSfNum(PartnerConnection connect, SalesforceParam param) throws Exception; diff --git a/src/main/java/com/celnet/datadump/service/impl/CommonServiceImpl.java b/src/main/java/com/celnet/datadump/service/impl/CommonServiceImpl.java index aaa8614..ba3b6f0 100644 --- a/src/main/java/com/celnet/datadump/service/impl/CommonServiceImpl.java +++ b/src/main/java/com/celnet/datadump/service/impl/CommonServiceImpl.java @@ -3,9 +3,7 @@ package com.celnet.datadump.service.impl; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; -import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.celnet.datadump.config.SalesforceConnect; import com.celnet.datadump.config.SalesforceExecutor; import com.celnet.datadump.config.SalesforceTargetConnect; @@ -22,7 +20,6 @@ import com.celnet.datadump.util.EmailUtil; import com.celnet.datadump.util.SqlUtil; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.sforce.async.BulkConnection; import com.sforce.soap.partner.*; import com.sforce.soap.partner.sobject.SObject; import com.xxl.job.core.biz.model.ReturnT; @@ -196,6 +193,66 @@ public class CommonServiceImpl implements CommonService { } } + @Override + public ReturnT incrementNew(SalesforceParam param) throws Exception { + QueryWrapper qw = new QueryWrapper<>(); + if (StringUtils.isNotBlank(param.getApi())) { + List apis = DataUtil.toIdList(param.getApi()); + qw.in("name", apis); + } + qw.eq("need_update", true) + .isNotNull("last_update_date"); + List list = dataObjectService.list(qw); + if (CollectionUtils.isEmpty(list)) { + return new ReturnT<>(500, ("表" + param.getApi() + "不存在或未开启更新")); + } + List> futures = Lists.newArrayList(); + try { + DataReport dataReport = new DataReport(); + dataReport.setType(TypeCode.INCREMENT); + dataReport.setApis(list.stream().map(DataObject::getName).collect(Collectors.joining(","))); + dataReportService.save(dataReport); + for (DataObject dataObject : list) { + Future future = salesforceExecutor.execute(() -> { + try { + Date updateTime = new Date(); + SalesforceParam salesforceParam = new SalesforceParam(); + salesforceParam.setApi(dataObject.getName()); + salesforceParam.setBeginModifyDate(dataObject.getLastUpdateDate()); + salesforceParam.setType(2); + // 更新字段值不为空 按更新字段里的字段校验 + if (StringUtils.isNotBlank(dataObject.getUpdateField())) { + salesforceParam.setUpdateField(dataObject.getUpdateField()); + } + dumpDataNew(salesforceParam, dataReport,dataObject); + + dataObject.setLastUpdateDate(updateTime); + dataObjectService.updateById(dataObject); + + } catch (Throwable throwable) { + log.error("salesforceExecutor error", throwable); + throw new RuntimeException(throwable); + } + }, 0, 0); + futures.add(future); + } + // 等待当前所有线程执行完成 + salesforceExecutor.waitForFutures(futures.toArray(new Future[]{})); + // 存在附件的开始dump + list.stream().filter(t -> StringUtils.isNotBlank(t.getBlobField())).forEach(t -> { + try { + fileService.dumpFile(t.getName(), t.getBlobField(), true); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + return ReturnT.SUCCESS; + } catch (Throwable throwable) { + salesforceExecutor.remove(futures.toArray(new Future[]{})); + throw throwable; + } + } + @Override public ReturnT dump(SalesforceParam param) throws Exception { List> futures = Lists.newArrayList(); @@ -497,6 +554,48 @@ public class CommonServiceImpl implements CommonService { return connect; } + /** + * 数据传输主体(新) + * + * @param param 参数 + */ + private void dumpDataNew(SalesforceParam param, DataReport dataReport , DataObject dataObject) throws Throwable { + String api = param.getApi(); + PartnerConnection connect ; + try { + DataBatchHistory dataBatchHistory = new DataBatchHistory(); + dataBatchHistory.setName(api); + dataBatchHistory.setStartDate(new Date()); + dataBatchHistory.setSyncStartDate(param.getBeginCreateDate()); + if (param.getEndCreateDate() != null) { + dataBatchHistory.setSyncEndDate(DateUtils.addSeconds(param.getEndCreateDate(), -1)); + } + dataBatchHistory.setBatch(param.getBatch()); + + connect = salesforceConnect.createConnect(); + // 存在isDeleted 只查询IsDeleted为false的 + if (dataFieldService.hasDeleted(param.getApi())) { + param.setIsDeleted(false); + } else { + // 不存在 过滤 + param.setIsDeleted(null); + } + + dataBatchHistory.setSfNum(countSfNum(connect, param)); + + getAllSfData(param, connect, dataReport); + + insertDataBatch(param, dataBatchHistory, dataObject); + + } catch (Throwable throwable) { + log.error("dataDumpJob error api:{}", api, throwable); + String type = param.getType() == 1 ? "存量" : "增量"; + String format = String.format("%s数据迁移 error, api name: %s, \nparam: %s, \ncause:\n%s", type, api, JSON.toJSONString(param, DataDumpParam.getFilter()), throwable); + EmailUtil.send("DataDump ERROR", format); + throw throwable; + } + } + /** * 任务拆分 * @@ -694,6 +793,37 @@ public class CommonServiceImpl implements CommonService { log.info("dataDumpJob done api:{}, count:{}", dataBatchHistory.getName(), num); } + /** + * 执行增量任务,新增一个批次 + * @param param 参数 + * @param dataBatchHistory 执行记录 + */ + private void insertDataBatch(SalesforceParam param, DataBatchHistory dataBatchHistory, DataObject dataObject) { + + dataBatchHistory.setEndDate(new Date()); + Integer num = customMapper.count(param); + dataBatchHistory.setDbNum(num); + if (dataBatchHistory.getSfNum() != null) { + dataBatchHistory.setSyncStatus(dataBatchHistory.getSfNum().equals(dataBatchHistory.getDbNum()) ? 1 : 0); + } + dataBatchHistory.setCost(DataUtil.calTime(dataBatchHistory.getEndDate(), dataBatchHistory.getStartDate())); + DataBatch dataBatch = new DataBatch(); + dataBatch.setName(dataObject.getName()); + dataBatch.setLabel(dataObject.getLabel()); + dataBatch.setFirstSfNum(dataBatchHistory.getSfNum()); + dataBatch.setFirstDbNum(dataBatchHistory.getDbNum()); + dataBatch.setFirstSyncDate(dataBatchHistory.getStartDate()); + dataBatch.setSyncStatus(dataBatchHistory.getSyncStatus()); + dataBatch.setSyncStartDate(dataObject.getLastUpdateDate()); + dataBatch.setSyncEndDate(new Date()); + dataBatchService.save(dataBatch); + + log.info("count db num: {}", num); + XxlJobLogger.log("count db num: {}", num); + dataBatchHistoryService.save(dataBatchHistory); + log.info("dataDumpJob done api:{}, count:{}", dataBatchHistory.getName(), num); + } + /** * 获取需同步sf数据数量 *