@ -3,9 +3,7 @@ package com.celnet.datadump.service.impl;
import com.alibaba.fastjson2.JSON ;
import com.alibaba.fastjson2.JSONArray ;
import com.alibaba.fastjson2.JSONObject ;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper ;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper ;
import com.baomidou.mybatisplus.core.toolkit.Wrappers ;
import com.celnet.datadump.config.SalesforceConnect ;
import com.celnet.datadump.config.SalesforceExecutor ;
import com.celnet.datadump.config.SalesforceTargetConnect ;
@ -22,7 +20,6 @@ import com.celnet.datadump.util.EmailUtil;
import com.celnet.datadump.util.SqlUtil ;
import com.google.common.collect.Lists ;
import com.google.common.collect.Maps ;
import com.sforce.async.BulkConnection ;
import com.sforce.soap.partner.* ;
import com.sforce.soap.partner.sobject.SObject ;
import com.xxl.job.core.biz.model.ReturnT ;
@ -196,6 +193,66 @@ public class CommonServiceImpl implements CommonService {
}
}
@Override
public ReturnT < String > incrementNew ( SalesforceParam param ) throws Exception {
QueryWrapper < DataObject > qw = new QueryWrapper < > ( ) ;
if ( StringUtils . isNotBlank ( param . getApi ( ) ) ) {
List < String > apis = DataUtil . toIdList ( param . getApi ( ) ) ;
qw . in ( " name " , apis ) ;
}
qw . eq ( " need_update " , true )
. isNotNull ( " last_update_date " ) ;
List < DataObject > list = dataObjectService . list ( qw ) ;
if ( CollectionUtils . isEmpty ( list ) ) {
return new ReturnT < > ( 500 , ( " 表 " + param . getApi ( ) + " 不存在或未开启更新 " ) ) ;
}
List < Future < ? > > futures = Lists . newArrayList ( ) ;
try {
DataReport dataReport = new DataReport ( ) ;
dataReport . setType ( TypeCode . INCREMENT ) ;
dataReport . setApis ( list . stream ( ) . map ( DataObject : : getName ) . collect ( Collectors . joining ( " , " ) ) ) ;
dataReportService . save ( dataReport ) ;
for ( DataObject dataObject : list ) {
Future < ? > future = salesforceExecutor . execute ( ( ) - > {
try {
Date updateTime = new Date ( ) ;
SalesforceParam salesforceParam = new SalesforceParam ( ) ;
salesforceParam . setApi ( dataObject . getName ( ) ) ;
salesforceParam . setBeginModifyDate ( dataObject . getLastUpdateDate ( ) ) ;
salesforceParam . setType ( 2 ) ;
/ / 更新字段值不为空 按更新字段里的字段校验
if ( StringUtils . isNotBlank ( dataObject . getUpdateField ( ) ) ) {
salesforceParam . setUpdateField ( dataObject . getUpdateField ( ) ) ;
}
dumpDataNew ( salesforceParam , dataReport , dataObject ) ;
dataObject . setLastUpdateDate ( updateTime ) ;
dataObjectService . updateById ( dataObject ) ;
} catch ( Throwable throwable ) {
log . error ( " salesforceExecutor error " , throwable ) ;
throw new RuntimeException ( throwable ) ;
}
} , 0 , 0 ) ;
futures . add ( future ) ;
}
/ / 等待当前所有线程执行完成
salesforceExecutor . waitForFutures ( futures . toArray ( new Future < ? > [ ] { } ) ) ;
/ / 存在附件的开始dump
list . stream ( ) . filter ( t - > StringUtils . isNotBlank ( t . getBlobField ( ) ) ) . forEach ( t - > {
try {
fileService . dumpFile ( t . getName ( ) , t . getBlobField ( ) , true ) ;
} catch ( Exception e ) {
throw new RuntimeException ( e ) ;
}
} ) ;
return ReturnT . SUCCESS ;
} catch ( Throwable throwable ) {
salesforceExecutor . remove ( futures . toArray ( new Future < ? > [ ] { } ) ) ;
throw throwable ;
}
}
@Override
public ReturnT < String > dump ( SalesforceParam param ) throws Exception {
List < Future < ? > > futures = Lists . newArrayList ( ) ;
@ -497,6 +554,48 @@ public class CommonServiceImpl implements CommonService {
return connect ;
}
/ * *
* 数据传输主体 ( 新 )
*
* @param param 参数
* /
private void dumpDataNew ( SalesforceParam param , DataReport dataReport , DataObject dataObject ) throws Throwable {
String api = param . getApi ( ) ;
PartnerConnection connect ;
try {
DataBatchHistory dataBatchHistory = new DataBatchHistory ( ) ;
dataBatchHistory . setName ( api ) ;
dataBatchHistory . setStartDate ( new Date ( ) ) ;
dataBatchHistory . setSyncStartDate ( param . getBeginCreateDate ( ) ) ;
if ( param . getEndCreateDate ( ) ! = null ) {
dataBatchHistory . setSyncEndDate ( DateUtils . addSeconds ( param . getEndCreateDate ( ) , - 1 ) ) ;
}
dataBatchHistory . setBatch ( param . getBatch ( ) ) ;
connect = salesforceConnect . createConnect ( ) ;
/ / 存在isDeleted 只查询IsDeleted为false的
if ( dataFieldService . hasDeleted ( param . getApi ( ) ) ) {
param . setIsDeleted ( false ) ;
} else {
/ / 不存在 过滤
param . setIsDeleted ( null ) ;
}
dataBatchHistory . setSfNum ( countSfNum ( connect , param ) ) ;
getAllSfData ( param , connect , dataReport ) ;
insertDataBatch ( param , dataBatchHistory , dataObject ) ;
} catch ( Throwable throwable ) {
log . error ( " dataDumpJob error api:{} " , api , throwable ) ;
String type = param . getType ( ) = = 1 ? " 存量 " : " 增量 " ;
String format = String . format ( " %s数据迁移 error, api name: %s, \ nparam: %s, \ ncause: \ n%s " , type , api , JSON . toJSONString ( param , DataDumpParam . getFilter ( ) ) , throwable ) ;
EmailUtil . send ( " DataDump ERROR " , format ) ;
throw throwable ;
}
}
/ * *
* 任务拆分
*
@ -583,17 +682,7 @@ public class CommonServiceImpl implements CommonService {
}
fields . add ( field . getName ( ) ) ;
}
if ( " Attachment " . equals ( api ) | | " FeedItem " . equals ( api )
| | " FeedComment " . equals ( api ) | | " Note " . equals ( api ) ) {
fields . add ( " Parent.type " ) ;
}
if ( " Task " . equals ( api ) | | " Event " . equals ( api ) ) {
fields . add ( " Who.type " ) ;
fields . add ( " What.type " ) ;
}
if ( " TaskRelation " . equals ( api ) | | " EventRelation " . equals ( api ) ) {
fields . add ( " Relation.type " ) ;
}
DataObject dataObject = dataObjectService . getById ( api ) ;
if ( dataObject ! = null & & StringUtils . isNotBlank ( dataObject . getExtraField ( ) ) ) {
fields . addAll ( Arrays . asList ( StringUtils . split ( dataObject . getExtraField ( ) . replaceAll ( StringUtils . SPACE , StringUtils . EMPTY ) , " , " ) ) ) ;
@ -704,6 +793,37 @@ public class CommonServiceImpl implements CommonService {
log . info ( " dataDumpJob done api:{}, count:{} " , dataBatchHistory . getName ( ) , num ) ;
}
/ * *
* 执行增量任务 , 新增一个批次
* @param param 参数
* @param dataBatchHistory 执行记录
* /
private void insertDataBatch ( SalesforceParam param , DataBatchHistory dataBatchHistory , DataObject dataObject ) {
dataBatchHistory . setEndDate ( new Date ( ) ) ;
Integer num = customMapper . count ( param ) ;
dataBatchHistory . setDbNum ( num ) ;
if ( dataBatchHistory . getSfNum ( ) ! = null ) {
dataBatchHistory . setSyncStatus ( dataBatchHistory . getSfNum ( ) . equals ( dataBatchHistory . getDbNum ( ) ) ? 1 : 0 ) ;
}
dataBatchHistory . setCost ( DataUtil . calTime ( dataBatchHistory . getEndDate ( ) , dataBatchHistory . getStartDate ( ) ) ) ;
DataBatch dataBatch = new DataBatch ( ) ;
dataBatch . setName ( dataObject . getName ( ) ) ;
dataBatch . setLabel ( dataObject . getLabel ( ) ) ;
dataBatch . setFirstSfNum ( dataBatchHistory . getSfNum ( ) ) ;
dataBatch . setFirstDbNum ( dataBatchHistory . getDbNum ( ) ) ;
dataBatch . setFirstSyncDate ( dataBatchHistory . getStartDate ( ) ) ;
dataBatch . setSyncStatus ( dataBatchHistory . getSyncStatus ( ) ) ;
dataBatch . setSyncStartDate ( dataObject . getLastUpdateDate ( ) ) ;
dataBatch . setSyncEndDate ( new Date ( ) ) ;
dataBatchService . save ( dataBatch ) ;
log . info ( " count db num: {} " , num ) ;
XxlJobLogger . log ( " count db num: {} " , num ) ;
dataBatchHistoryService . save ( dataBatchHistory ) ;
log . info ( " dataDumpJob done api:{}, count:{} " , dataBatchHistory . getName ( ) , num ) ;
}
/ * *
* 获取需同步sf数据数量
*