2025-03-28 17:38:34 +08:00
package com.celnet.datadump.service.impl ;
import com.alibaba.fastjson2.JSON ;
import com.alibaba.fastjson2.JSONArray ;
import com.alibaba.fastjson2.JSONObject ;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper ;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper ;
import com.baomidou.mybatisplus.core.toolkit.Wrappers ;
import com.celnet.datadump.config.SalesforceConnect ;
import com.celnet.datadump.config.SalesforceExecutor ;
import com.celnet.datadump.config.SalesforceTargetConnect ;
import com.celnet.datadump.entity.* ;
import com.celnet.datadump.global.Const ;
import com.celnet.datadump.global.TypeCode ;
import com.celnet.datadump.mapper.CustomMapper ;
import com.celnet.datadump.param.DataDumpParam ;
import com.celnet.datadump.param.DataDumpSpecialParam ;
import com.celnet.datadump.param.SalesforceParam ;
import com.celnet.datadump.service.* ;
import com.celnet.datadump.util.DataUtil ;
import com.celnet.datadump.util.EmailUtil ;
import com.celnet.datadump.util.SqlUtil ;
import com.google.common.collect.Lists ;
import com.google.common.collect.Maps ;
2025-06-13 10:15:03 +08:00
import com.sforce.async.BulkConnection ;
2025-03-28 17:38:34 +08:00
import com.sforce.soap.partner.* ;
import com.sforce.soap.partner.sobject.SObject ;
import com.xxl.job.core.biz.model.ReturnT ;
import com.xxl.job.core.log.XxlJobLogger ;
import lombok.extern.slf4j.Slf4j ;
import org.apache.commons.collections.CollectionUtils ;
import org.apache.commons.lang3.ArrayUtils ;
import org.apache.commons.lang3.ObjectUtils ;
import org.apache.commons.lang3.StringUtils ;
import org.apache.commons.lang3.time.DateFormatUtils ;
import org.apache.commons.lang3.time.DateUtils ;
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.stereotype.Service ;
import java.util.* ;
import java.util.concurrent.Future ;
import java.util.concurrent.TimeUnit ;
import java.util.concurrent.atomic.AtomicInteger ;
import java.util.concurrent.locks.ReentrantLock ;
import java.util.stream.Collectors ;
import static com.celnet.datadump.global.SystemConfigCode.* ;
/**
 * @author Red
 * @description
 * @date 2022/09/23
 */
@Service
@Slf4j
public class CommonServiceImpl implements CommonService {
@Autowired
private SalesforceConnect salesforceConnect ;
@Autowired
private CustomMapper customMapper ;
@Autowired
private SalesforceExecutor salesforceExecutor ;
@Autowired
private DataBatchHistoryService dataBatchHistoryService ;
@Autowired
private DataBatchService dataBatchService ;
@Autowired
private DataObjectService dataObjectService ;
@Autowired
private DataFieldService dataFieldService ;
@Autowired
private FileService fileService ;
@Autowired
private DataPicklistService dataPicklistService ;
@Autowired
private DataDumpSpecialService dataDumpSpecialService ;
@Autowired
private DataReportService dataReportService ;
@Autowired
private DataReportDetailService dataReportDetailService ;
@Autowired
private SalesforceTargetConnect salesforceTargetConnect ;
2025-07-04 11:19:19 +08:00
@Autowired
private MetaclassConfigService metaclassConfigService ;
2025-03-28 17:38:34 +08:00
@Override
public ReturnT < String > increment ( SalesforceParam param ) throws Exception {
QueryWrapper < DataObject > qw = new QueryWrapper < > ( ) ;
if ( StringUtils . isNotBlank ( param . getApi ( ) ) ) {
List < String > apis = DataUtil . toIdList ( param . getApi ( ) ) ;
qw . in ( " name " , apis ) ;
}
qw . eq ( " need_update " , true )
. isNotNull ( " last_update_date " ) ;
List < DataObject > list = dataObjectService . list ( qw ) ;
if ( CollectionUtils . isEmpty ( list ) ) {
return new ReturnT < > ( 500 , ( " 表 " + param . getApi ( ) + " 不存在或未开启更新 " ) ) ;
}
List < Future < ? > > futures = Lists . newArrayList ( ) ;
try {
DataReport dataReport = new DataReport ( ) ;
dataReport . setType ( TypeCode . INCREMENT ) ;
dataReport . setApis ( list . stream ( ) . map ( DataObject : : getName ) . collect ( Collectors . joining ( " , " ) ) ) ;
dataReportService . save ( dataReport ) ;
for ( DataObject dataObject : list ) {
Future < ? > future = salesforceExecutor . execute ( ( ) - > {
try {
Date updateTime = new Date ( ) ;
SalesforceParam salesforceParam = new SalesforceParam ( ) ;
salesforceParam . setApi ( dataObject . getName ( ) ) ;
salesforceParam . setBeginModifyDate ( dataObject . getLastUpdateDate ( ) ) ;
salesforceParam . setType ( 2 ) ;
// 更新字段值不为空 按更新字段里的字段校验
if ( StringUtils . isNotBlank ( dataObject . getUpdateField ( ) ) ) {
salesforceParam . setUpdateField ( dataObject . getUpdateField ( ) ) ;
}
PartnerConnection connect = dumpData ( salesforceParam , dataReport ) ;
dataObject . setLastUpdateDate ( updateTime ) ;
dataObjectService . updateById ( dataObject ) ;
// 增量batch更新
QueryWrapper < DataBatch > queryWrapper = new QueryWrapper < > ( ) ;
queryWrapper . eq ( " name " , dataObject . getName ( ) ) . orderByDesc ( " sync_start_date " ) . last ( " limit 1 " ) ;
DataBatch dataBatch = dataBatchService . getOne ( queryWrapper ) ;
if ( dataBatch ! = null ) {
if ( dataBatch . getSyncEndDate ( ) . before ( updateTime ) ) {
DataBatch update = dataBatch . clone ( ) ;
update . setSyncEndDate ( updateTime ) ;
int failCount = 0 ;
while ( true ) {
try {
SalesforceParam countParam = new SalesforceParam ( ) ;
countParam . setApi ( dataBatch . getName ( ) ) ;
countParam . setBeginCreateDate ( dataBatch . getSyncStartDate ( ) ) ;
countParam . setEndCreateDate ( dataBatch . getSyncEndDate ( ) ) ;
// 存在isDeleted 只查询IsDeleted为false的
if ( dataFieldService . hasDeleted ( countParam . getApi ( ) ) ) {
countParam . setIsDeleted ( false ) ;
} else {
// 不存在 过滤
countParam . setIsDeleted ( null ) ;
}
// sf count
Integer sfNum = countSfNum ( connect , countParam ) ;
update . setSfNum ( sfNum ) ;
// db count
Integer dbNum = customMapper . count ( countParam ) ;
update . setDbNum ( dbNum ) ;
update . setSyncStatus ( dbNum . equals ( sfNum ) ? 1 : 0 ) ;
update . setFirstDbNum ( dbNum ) ;
update . setFirstSyncDate ( updateTime ) ;
update . setFirstSfNum ( sfNum ) ;
QueryWrapper < DataBatch > updateQw = new QueryWrapper < > ( ) ;
updateQw . eq ( " name " , dataBatch . getName ( ) )
. eq ( " sync_start_date " , dataBatch . getSyncStartDate ( ) )
. eq ( " sync_end_date " , dataBatch . getSyncEndDate ( ) ) ;
dataBatchService . update ( update , updateQw ) ;
failCount = 0 ;
break ;
} catch ( InterruptedException e ) {
throw e ;
} catch ( Throwable throwable ) {
failCount + + ;
log . error ( " verify error " , throwable ) ;
if ( failCount > Const . MAX_FAIL_COUNT ) {
throw throwable ;
}
TimeUnit . MINUTES . sleep ( 1 ) ;
}
}
}
}
} catch ( Throwable throwable ) {
log . error ( " salesforceExecutor error " , throwable ) ;
throw new RuntimeException ( throwable ) ;
}
} , 0 , 0 ) ;
futures . add ( future ) ;
}
// 等待当前所有线程执行完成
salesforceExecutor . waitForFutures ( futures . toArray ( new Future < ? > [ ] { } ) ) ;
// 存在附件的开始dump
list . stream ( ) . filter ( t - > StringUtils . isNotBlank ( t . getBlobField ( ) ) ) . forEach ( t - > {
try {
fileService . dumpFile ( t . getName ( ) , t . getBlobField ( ) , true ) ;
} catch ( Exception e ) {
throw new RuntimeException ( e ) ;
}
} ) ;
return ReturnT . SUCCESS ;
} catch ( Throwable throwable ) {
salesforceExecutor . remove ( futures . toArray ( new Future < ? > [ ] { } ) ) ;
throw throwable ;
}
}
@Override
public ReturnT < String > dump ( SalesforceParam param ) throws Exception {
List < Future < ? > > futures = Lists . newArrayList ( ) ;
try {
if ( StringUtils . isNotBlank ( param . getApi ( ) ) ) {
// 手动任务
ReturnT < String > result = manualDump ( param , futures ) ;
if ( result ! = null ) {
return result ;
}
} else {
// 自动任务
autoDump ( param , futures ) ;
}
return ReturnT . SUCCESS ;
} catch ( Throwable throwable ) {
salesforceExecutor . remove ( futures . toArray ( new Future < ? > [ ] { } ) ) ;
log . error ( " dump error " , throwable ) ;
throw throwable ;
}
}
/ * *
* 自动dump
* @param param 参数
* @param futures futures
* /
private void autoDump ( SalesforceParam param , List < Future < ? > > futures ) throws InterruptedException {
QueryWrapper < DataObject > qw = new QueryWrapper < > ( ) ;
qw . eq ( " data_work " , 1 )
. eq ( " data_lock " , 0 )
. orderByAsc ( " data_index " )
. last ( " limit 10 " ) ;
while ( true ) {
List < DataObject > dataObjects = dataObjectService . list ( qw ) ;
if ( CollectionUtils . isEmpty ( dataObjects ) ) {
break ;
}
for ( DataObject dataObject : dataObjects ) {
DataObject update = new DataObject ( ) ;
TimeUnit . MILLISECONDS . sleep ( 1 ) ;
2025-06-19 11:35:20 +08:00
String api = dataObject . getName ( ) ;
2025-03-28 17:38:34 +08:00
try {
log . info ( " dump apis: {} " , api ) ;
XxlJobLogger . log ( " dump apis: {} " , api ) ;
// 检测表是否存在 不存在创建
checkApi ( api , true ) ;
// 检测是否有创建时间 没有创建时间 按特殊表处理 直接根据id来获取
if ( ! hasCreatedDate ( api ) ) {
Future < ? > future = salesforceExecutor . execute ( ( ) - > {
try {
DataDumpSpecialParam dataDumpSpecialParam = new DataDumpSpecialParam ( ) ;
dataDumpSpecialParam . setApi ( api ) ;
update . setName ( dataObject . getName ( ) ) ;
update . setDataLock ( 1 ) ;
dataObjectService . updateById ( update ) ;
param . setApi ( api ) ;
salesforceExecutor . waitForFutures ( dataDumpSpecialService . getData ( dataDumpSpecialParam , salesforceConnect . createConnect ( ) ) ) ;
update . setDataWork ( 0 ) ;
} catch ( Throwable e ) {
throw new RuntimeException ( e ) ;
} finally {
if ( StringUtils . isNotBlank ( update . getName ( ) ) ) {
update . setDataLock ( 0 ) ;
dataObjectService . updateById ( update ) ;
}
}
} , 0 , 0 ) ;
futures . add ( future ) ;
continue ;
}
update . setName ( dataObject . getName ( ) ) ;
update . setDataLock ( 1 ) ;
dataObjectService . updateById ( update ) ;
param . setApi ( api ) ;
List < SalesforceParam > salesforceParams ;
QueryWrapper < DataBatch > dbQw = new QueryWrapper < > ( ) ;
dbQw . eq ( " name " , api ) ;
List < DataBatch > list = dataBatchService . list ( dbQw ) ;
AtomicInteger batch = new AtomicInteger ( 1 ) ;
if ( CollectionUtils . isEmpty ( list ) ) {
// 没有批次
salesforceParams = DataUtil . splitTask ( param ) ;
} else {
// 有批次 先过滤首次执行过的
salesforceParams = list . stream ( ) . filter ( t - > t . getFirstSyncDate ( ) = = null )
. map ( t - > {
SalesforceParam salesforceParam = param . clone ( ) ;
salesforceParam . setApi ( t . getName ( ) ) ;
salesforceParam . setBeginCreateDate ( t . getSyncStartDate ( ) ) ;
salesforceParam . setEndCreateDate ( t . getSyncEndDate ( ) ) ;
salesforceParam . setBatch ( batch . getAndIncrement ( ) ) ;
return salesforceParam ;
} ) . collect ( Collectors . toList ( ) ) ;
}
// 批次都执行过了 重新同步
if ( CollectionUtils . isEmpty ( salesforceParams ) ) {
salesforceParams = DataUtil . splitTask ( param ) ;
}
for ( SalesforceParam salesforceParam : salesforceParams ) {
Future < ? > future = salesforceExecutor . execute ( ( ) - > {
try {
dumpData ( salesforceParam ) ;
} catch ( Throwable throwable ) {
log . error ( " salesforceExecutor error " , throwable ) ;
throw new RuntimeException ( throwable ) ;
}
} , salesforceParam . getBatch ( ) , 0 ) ;
futures . add ( future ) ;
}
// 等待当前所有线程执行完成
salesforceExecutor . waitForFutures ( futures . toArray ( new Future < ? > [ ] { } ) ) ;
// 附件表 跑一遍dumpFile
if ( StringUtils . isNotBlank ( dataObject . getBlobField ( ) ) ) {
fileService . dumpFile ( dataObject . getName ( ) , dataObject . getBlobField ( ) , true ) ;
}
2025-06-23 11:25:31 +08:00
update . setDataWork ( 0 ) ;
2025-03-28 17:38:34 +08:00
} catch ( Throwable e ) {
2025-06-19 11:35:20 +08:00
String message = e . getMessage ( ) ;
String format = String . format ( " 获取表数据 error, api name: %s, \ nparam: %s, \ ncause: \ n%s " , api , com . alibaba . fastjson2 . JSON . toJSONString ( param , DataDumpParam . getFilter ( ) ) , message ) ;
EmailUtil . send ( " DataDump ERROR " , format ) ;
2025-03-28 17:38:34 +08:00
throw new RuntimeException ( e ) ;
} finally {
2025-06-23 11:25:31 +08:00
if ( dataObject ! = null ) {
update . setDataLock ( 0 ) ;
dataObjectService . updateById ( update ) ;
}
2025-03-28 17:38:34 +08:00
}
}
// 等待当前所有线程执行完成
salesforceExecutor . waitForFutures ( futures . toArray ( new Future < ? > [ ] { } ) ) ;
futures . clear ( ) ;
}
}
private ReturnT < String > manualDump ( SalesforceParam param , List < Future < ? > > futures ) throws InterruptedException {
List < String > apis ;
apis = DataUtil . toIdList ( param . getApi ( ) ) ;
String join = StringUtils . join ( apis , " , " ) ;
log . info ( " dump apis: {} " , join ) ;
XxlJobLogger . log ( " dump apis: {} " , join ) ;
// 全量的时候 检测是否有自动任务锁住的表
boolean isFull = param . getBeginDate ( ) = = null & & param . getEndDate ( ) = = null & & CollectionUtils . isEmpty ( param . getIds ( ) ) ;
if ( isFull ) {
QueryWrapper < DataObject > qw = new QueryWrapper < > ( ) ;
qw . eq ( " data_lock " , 1 ) . in ( " name " , apis ) ;
List < DataObject > list = dataObjectService . list ( qw ) ;
if ( CollectionUtils . isNotEmpty ( list ) ) {
String apiNames = list . stream ( ) . map ( DataObject : : getName ) . collect ( Collectors . joining ( ) ) ;
return new ReturnT < > ( 500 , " api: " + apiNames + " is locked " ) ;
}
}
// 根据参数获取sql
for ( String api : apis ) {
DataObject update = new DataObject ( ) ;
TimeUnit . MILLISECONDS . sleep ( 1 ) ;
try {
checkApi ( api , true ) ;
if ( ! hasCreatedDate ( api ) ) {
DataDumpSpecialParam dataDumpSpecialParam = new DataDumpSpecialParam ( ) ;
dataDumpSpecialParam . setApi ( api ) ;
Future < ? > future = dataDumpSpecialService . getData ( dataDumpSpecialParam , salesforceConnect . createConnect ( ) ) ;
// 等待当前所有线程执行完成
salesforceExecutor . waitForFutures ( future ) ;
continue ;
}
param . setApi ( api ) ;
List < SalesforceParam > salesforceParams = null ;
if ( isFull ) {
update . setName ( api ) ;
update . setDataLock ( 1 ) ;
dataObjectService . updateById ( update ) ;
QueryWrapper < DataBatch > dbQw = new QueryWrapper < > ( ) ;
dbQw . eq ( " name " , api )
. isNull ( " first_sync_date " ) ;
List < DataBatch > list = dataBatchService . list ( dbQw ) ;
AtomicInteger batch = new AtomicInteger ( 1 ) ;
if ( CollectionUtils . isNotEmpty ( list ) ) {
salesforceParams = list . stream ( ) . map ( t - > {
SalesforceParam salesforceParam = param . clone ( ) ;
salesforceParam . setApi ( t . getName ( ) ) ;
salesforceParam . setBeginCreateDate ( t . getSyncStartDate ( ) ) ;
salesforceParam . setEndCreateDate ( t . getSyncEndDate ( ) ) ;
salesforceParam . setBatch ( batch . getAndIncrement ( ) ) ;
return salesforceParam ;
} ) . collect ( Collectors . toList ( ) ) ;
} else {
salesforceParams = DataUtil . splitTask ( param ) ;
}
} else {
salesforceParams = DataUtil . splitTask ( param ) ;
}
// 手动任务优先执行
for ( SalesforceParam salesforceParam : salesforceParams ) {
Future < ? > future = salesforceExecutor . execute ( ( ) - > {
try {
dumpData ( salesforceParam ) ;
} catch ( Throwable throwable ) {
log . error ( " salesforceExecutor error " , throwable ) ;
throw new RuntimeException ( throwable ) ;
}
} , salesforceParam . getBatch ( ) , 1 ) ;
futures . add ( future ) ;
}
// 等待当前所有线程执行完成
salesforceExecutor . waitForFutures ( futures . toArray ( new Future < ? > [ ] { } ) ) ;
// 附件表 跑一遍dumpFile
DataObject one = dataObjectService . getById ( api ) ;
if ( StringUtils . isNotBlank ( one . getBlobField ( ) ) ) {
fileService . dumpFile ( one . getName ( ) , one . getBlobField ( ) , true ) ;
}
update . setDataWork ( 0 ) ;
} catch ( Throwable e ) {
log . error ( " manualDump error " , e ) ;
throw new RuntimeException ( e ) ;
} finally {
if ( isFull ) {
update . setName ( api ) ;
update . setDataLock ( 0 ) ;
dataObjectService . updateById ( update ) ;
}
}
}
return null ;
}
/ * *
* 数据传输主体
*
* @param param 参数
* /
private PartnerConnection dumpData ( SalesforceParam param ) throws Throwable {
return dumpData ( param , null ) ;
}
/ * *
* 数据传输主体
*
* @param param 参数
* /
private PartnerConnection dumpData ( SalesforceParam param , DataReport dataReport ) throws Throwable {
String api = param . getApi ( ) ;
PartnerConnection connect = null ;
try {
DataBatchHistory dataBatchHistory = new DataBatchHistory ( ) ;
dataBatchHistory . setName ( api ) ;
dataBatchHistory . setStartDate ( new Date ( ) ) ;
dataBatchHistory . setSyncStartDate ( param . getBeginCreateDate ( ) ) ;
if ( param . getEndCreateDate ( ) ! = null ) {
dataBatchHistory . setSyncEndDate ( DateUtils . addSeconds ( param . getEndCreateDate ( ) , - 1 ) ) ;
}
dataBatchHistory . setBatch ( param . getBatch ( ) ) ;
if ( param . getBeginCreateDate ( ) ! = null & & param . getEndCreateDate ( ) ! = null ) {
log . info ( " NO.{} dump {}, date: {} ~ {} start " , param . getBatch ( ) ,
param . getApi ( ) ,
DateFormatUtils . format ( param . getBeginCreateDate ( ) , " yyyy-MM-dd HH:mm:ss " ) ,
DateFormatUtils . format ( param . getEndCreateDate ( ) , " yyyy-MM-dd HH:mm:ss " ) ) ;
}
connect = salesforceConnect . createConnect ( ) ;
// 存在isDeleted 只查询IsDeleted为false的
if ( dataFieldService . hasDeleted ( param . getApi ( ) ) ) {
param . setIsDeleted ( false ) ;
} else {
// 不存在 过滤
param . setIsDeleted ( null ) ;
}
// 若count数量过多 可能导致超时出不来结果 对该任务做进一步拆分
int failCount = 0 ;
boolean isSuccess = false ;
while ( failCount < = Const . MAX_FAIL_COUNT ) {
try {
dataBatchHistory . setSfNum ( countSfNum ( connect , param ) ) ;
isSuccess = true ;
break ;
} catch ( Throwable throwable ) {
failCount + + ;
}
}
// 不成功 做任务拆分
if ( ! isSuccess ) {
if ( splitTask ( param ) ) {
return connect ;
}
}
getAllSfData ( param , connect , dataReport ) ;
updateDataBatch ( param , dataBatchHistory ) ;
} catch ( Throwable throwable ) {
log . error ( " dataDumpJob error api:{} " , api , throwable ) ;
String type = param . getType ( ) = = 1 ? " 存量 " : " 增量 " ;
String format = String . format ( " %s数据迁移 error, api name: %s, \ nparam: %s, \ ncause: \ n%s " , type , api , JSON . toJSONString ( param , DataDumpParam . getFilter ( ) ) , throwable ) ;
EmailUtil . send ( " DataDump ERROR " , format ) ;
throw throwable ;
}
return connect ;
}
/ * *
* 任务拆分
*
* @param param 任务参数
* @return true成功 false失败
* /
private boolean splitTask ( SalesforceParam param ) {
List < Future < ? > > futures = Lists . newArrayList ( ) ;
try {
QueryWrapper < DataBatch > qw = new QueryWrapper < > ( ) ;
qw . eq ( " name " , param . getApi ( ) )
. eq ( " sync_start_date " , param . getBeginCreateDate ( ) )
. eq ( " sync_end_date " , param . getEndCreateDate ( ) ) ;
DataBatch dataBatch = dataBatchService . getOne ( qw ) ;
// 如果已经是同一天了 则抛出错误 不继续执行
if ( DateUtils . isSameDay ( dataBatch . getSyncEndDate ( ) , dataBatch . getSyncStartDate ( ) ) ) {
throw new Exception ( " can't count sf num " ) ;
}
DataBatch dataBatch1 = dataBatch . clone ( ) ;
DataBatch dataBatch2 = dataBatch . clone ( ) ;
Date midDate = DateUtils . addMilliseconds ( dataBatch . getSyncStartDate ( ) ,
( int ) ( dataBatch . getSyncEndDate ( ) . getTime ( ) - dataBatch . getSyncStartDate ( ) . getTime ( ) ) ) ;
Date now = new Date ( ) ;
dataBatch1 . setSyncEndDate ( midDate ) ;
dataBatch2 . setSyncStartDate ( midDate ) ;
dataBatchService . remove ( qw ) ;
dataBatchService . save ( dataBatch1 ) ;
dataBatchService . save ( dataBatch2 ) ;
SalesforceParam param1 = param . clone ( ) ;
SalesforceParam param2 = param . clone ( ) ;
param1 . setEndCreateDate ( midDate ) ;
Future < ? > future1 = salesforceExecutor . execute ( ( ) - > {
try {
dumpData ( param1 ) ;
} catch ( Throwable throwable ) {
log . error ( " salesforceExecutor error " , throwable ) ;
throw new RuntimeException ( throwable ) ;
}
} , param1 . getBatch ( ) , 0 ) ;
param2 . setBeginCreateDate ( midDate ) ;
Future < ? > future2 = salesforceExecutor . execute ( ( ) - > {
try {
dumpData ( param2 ) ;
} catch ( Throwable throwable ) {
log . error ( " salesforceExecutor error " , throwable ) ;
throw new RuntimeException ( throwable ) ;
}
} , param2 . getBatch ( ) , 0 ) ;
futures = Lists . newArrayList ( future1 , future2 ) ;
// 等待当前所有线程执行完成
salesforceExecutor . waitForFutures ( futures . toArray ( new Future < ? > [ ] { } ) ) ;
return true ;
} catch ( Throwable throwable ) {
salesforceExecutor . remove ( futures . toArray ( new Future < ? > [ ] { } ) ) ;
}
return false ;
}
/ * *
* 遍历获取所有sf数据并处理
*
* @param param 参数
* @param connect sf connect
* /
private void getAllSfData ( SalesforceParam param , PartnerConnection connect , DataReport dataReport ) throws Throwable {
JSONArray objects = null ;
String api = param . getApi ( ) ;
Map < String , Object > map = Maps . newHashMap ( ) ;
String dateName = param . getType ( ) = = 1 ? Const . CREATED_DATE : param . getUpdateField ( ) ;
int count = 0 ;
Date lastCreatedDate = null ;
String maxId = null ;
Field [ ] dsrFields = null ;
// 获取sf字段
{
List < String > fields = Lists . newArrayList ( ) ;
DescribeSObjectResult dsr = connect . describeSObject ( api ) ;
dsrFields = dsr . getFields ( ) ;
for ( Field field : dsrFields ) {
// 不查询文件
if ( " base64 " . equalsIgnoreCase ( field . getType ( ) . toString ( ) ) ) {
continue ;
}
fields . add ( field . getName ( ) ) ;
}
if ( " Attachment " . equals ( api ) | | " FeedItem " . equals ( api ) ) {
fields . add ( " Parent.type " ) ;
}
2025-07-01 15:23:10 +08:00
if ( " Task " . equals ( api ) | | " Event " . equals ( api ) ) {
fields . add ( " Who.type " ) ;
fields . add ( " What.type " ) ;
}
2025-07-04 11:01:03 +08:00
if ( " TaskRelation " . equals ( api ) | | " EventRelation " . equals ( api ) ) {
fields . add ( " Relation.type " ) ;
}
2025-03-28 17:38:34 +08:00
DataObject dataObject = dataObjectService . getById ( api ) ;
if ( dataObject ! = null & & StringUtils . isNotBlank ( dataObject . getExtraField ( ) ) ) {
fields . addAll ( Arrays . asList ( StringUtils . split ( dataObject . getExtraField ( ) . replaceAll ( StringUtils . SPACE , StringUtils . EMPTY ) , " , " ) ) ) ;
}
param . setSelect ( StringUtils . join ( fields , " , " ) ) ;
}
// 获取数据库字段进行比对
List < String > fields = customMapper . getFields ( api ) . stream ( ) . map ( String : : toUpperCase ) . collect ( Collectors . toList ( ) ) ;
int failCount = 0 ;
while ( true ) {
try {
// 获取创建时间
param . setTimestamp ( lastCreatedDate ) ;
// 判断是否存在要排除的id
param . setMaxId ( maxId ) ;
map . put ( " param " , param ) ;
String sql ;
if ( param . getType ( ) = = 1 ) {
// type 1 按创建时间纬度
sql = SqlUtil . showSql ( " com.celnet.datadump.mapper.SalesforceMapper.list " , map ) ;
} else {
// type 2 按修改时间纬度
sql = SqlUtil . showSql ( " com.celnet.datadump.mapper.SalesforceMapper.listByModifyTime " , map ) ;
}
log . info ( " query sql: {} " , sql ) ;
XxlJobLogger . log ( " query sql: {} " , sql ) ;
QueryResult queryResult = connect . queryAll ( sql ) ;
if ( ObjectUtils . isEmpty ( queryResult ) | | ObjectUtils . isEmpty ( queryResult . getRecords ( ) ) ) {
break ;
}
SObject [ ] records = queryResult . getRecords ( ) ;
objects = DataUtil . toJsonArray ( records , dsrFields ) ;
// 获取最大修改时间和等于该修改时间的数据id
{
Date maxDate = objects . getJSONObject ( objects . size ( ) - 1 ) . getDate ( dateName ) ;
maxId = objects . stream ( )
. map ( t - > ( JSONObject ) t )
. filter ( t - > maxDate . equals ( t . getDate ( dateName ) ) )
. map ( t - > t . getString ( Const . ID ) )
. max ( String : : compareTo ) . get ( ) ;
lastCreatedDate = maxDate ;
}
// 存储更新
saveOrUpdate ( api , fields , records , objects , true ) ;
count + = records . length ;
TimeUnit . MILLISECONDS . sleep ( 1 ) ;
String format = DateFormatUtils . format ( lastCreatedDate , " yyyy-MM-dd HH:mm:ss " ) ;
log . info ( " dump success count: {}, timestamp: {} " , count , format ) ;
XxlJobLogger . log ( " dump success count: {}, timestamp: {} " , count , format ) ;
failCount = 0 ;
objects = null ;
} catch ( InterruptedException e ) {
throw e ;
} catch ( Throwable throwable ) {
failCount + + ;
log . error ( " dataDumpJob error api:{}, data:{} " , api , JSON . toJSONString ( objects ) , throwable ) ;
if ( failCount > Const . MAX_FAIL_COUNT ) {
throwable . addSuppressed ( new Exception ( " dataDump error data: " + JSON . toJSONString ( objects ) ) ) ;
throw throwable ;
}
TimeUnit . MINUTES . sleep ( 1 ) ;
}
}
if ( ObjectUtils . isNotEmpty ( dataReport ) ) {
DataReportDetail dataReportDetail = new DataReportDetail ( ) ;
dataReportDetail . setApi ( param . getApi ( ) ) ;
dataReportDetail . setReportId ( dataReport . getId ( ) ) ;
dataReportDetail . setNum ( count ) ;
dataReportDetailService . save ( dataReportDetail ) ;
}
}
/ * *
* 批次存在 且首次时间为0或者为空 赋值 、 保存执行记录
*
* @param param 参数
* @param dataBatchHistory 执行记录
* /
private void updateDataBatch ( SalesforceParam param , DataBatchHistory dataBatchHistory ) {
if ( dataBatchHistory = = null ) {
return ;
}
dataBatchHistory . setEndDate ( new Date ( ) ) ;
Integer num = customMapper . count ( param ) ;
dataBatchHistory . setDbNum ( num ) ;
if ( dataBatchHistory . getSfNum ( ) ! = null ) {
dataBatchHistory . setSyncStatus ( dataBatchHistory . getSfNum ( ) . equals ( dataBatchHistory . getDbNum ( ) ) ? 1 : 0 ) ;
}
dataBatchHistory . setCost ( DataUtil . calTime ( dataBatchHistory . getEndDate ( ) , dataBatchHistory . getStartDate ( ) ) ) ;
QueryWrapper < DataBatch > qw = new QueryWrapper < > ( ) ;
qw . eq ( " name " , param . getApi ( ) )
. eq ( " sync_start_date " , param . getBeginCreateDate ( ) )
. eq ( " sync_end_date " , param . getEndCreateDate ( ) )
. isNull ( " first_sync_date " ) ;
DataBatch dataBatch = dataBatchService . getOne ( qw ) ;
if ( dataBatch = = null ) {
return ;
}
dataBatch . setFirstSfNum ( dataBatchHistory . getSfNum ( ) ) ;
dataBatch . setFirstDbNum ( dataBatchHistory . getDbNum ( ) ) ;
dataBatch . setFirstSyncDate ( dataBatchHistory . getStartDate ( ) ) ;
dataBatch . setSyncStatus ( dataBatchHistory . getSyncStatus ( ) ) ;
dataBatchService . update ( dataBatch , qw ) ;
log . info ( " count db num: {} " , num ) ;
XxlJobLogger . log ( " count db num: {} " , num ) ;
dataBatchHistoryService . save ( dataBatchHistory ) ;
log . info ( " dataDumpJob done api:{}, count:{} " , dataBatchHistory . getName ( ) , num ) ;
}
/ * *
* 获取需同步sf数据数量
*
* @param connect connect
* @param param 参数
* @return sf统计数量
* /
@Override
public Integer countSfNum ( PartnerConnection connect , SalesforceParam param ) throws Exception {
Map < String , Object > map = Maps . newHashMap ( ) ;
map . put ( " param " , param ) ;
String sql = SqlUtil . showSql ( " com.celnet.datadump.mapper.SalesforceMapper.count " , map ) ;
log . info ( " count sql: {} " , sql ) ;
XxlJobLogger . log ( " count sql: {} " , sql ) ;
QueryResult queryResult = connect . queryAll ( sql ) ;
SObject record = queryResult . getRecords ( ) [ 0 ] ;
Integer num = ( Integer ) record . getField ( " num " ) ;
log . info ( " count sf num: {} " , num ) ;
XxlJobLogger . log ( " count sf num: {} " , num ) ;
return num ;
}
/ * *
* 插入或更新数据
*
* @param api 表名
* @param fields 字段
* @param records 数据list
* @param objects 数据 json格式
* @param insert 不存在是否插入
* /
@Override
public Integer saveOrUpdate ( String api , List < String > fields , SObject [ ] records , JSONArray objects , boolean insert ) throws Throwable {
// 根据id查询数据库 取出已存在数据的id
List < String > ids = Arrays . stream ( records ) . map ( SObject : : getId ) . collect ( Collectors . toList ( ) ) ;
DataObject one = dataObjectService . getById ( api ) ;
List < String > existsIds = customMapper . getIds ( api , ids ) ;
for ( int i = 0 ; i < objects . size ( ) ; i + + ) {
JSONObject jsonObject = objects . getJSONObject ( i ) ;
try {
Set < String > keys = jsonObject . keySet ( ) ;
// update
String id = jsonObject . getString ( Const . ID ) ;
List < Map < String , Object > > maps = Lists . newArrayList ( ) ;
for ( String key : keys ) {
if ( fields . stream ( ) . anyMatch ( key : : equalsIgnoreCase ) ) {
Map < String , Object > paramMap = Maps . newHashMap ( ) ;
paramMap . put ( " key " , key ) ;
paramMap . put ( " value " , jsonObject . get ( key ) ) ;
maps . add ( paramMap ) ;
}
}
// 附件表 插入更新时把is_dump置为false
if ( StringUtils . isNotBlank ( one . getBlobField ( ) ) ) {
Map < String , Object > paramMap = Maps . newHashMap ( ) ;
paramMap . put ( " key " , " is_dump " ) ;
paramMap . put ( " value " , false ) ;
Map < String , Object > paramMap2 = Maps . newHashMap ( ) ;
paramMap2 . put ( " key " , " is_upload " ) ;
paramMap2 . put ( " value " , false ) ;
maps . add ( paramMap ) ;
maps . add ( paramMap2 ) ;
}
2025-06-23 11:25:31 +08:00
// Task和Event
2025-07-01 15:23:10 +08:00
if ( " Task " . equals ( api ) | | " Event " . equals ( api ) ) {
Map < String , Object > paramwhoMap = Maps . newHashMap ( ) ;
paramwhoMap . put ( " key " , " WhoId_Type__c " ) ;
paramwhoMap . put ( " value " , jsonObject . get ( " Who_Type " ) ) ;
maps . add ( paramwhoMap ) ;
Map < String , Object > paramwhatMap = Maps . newHashMap ( ) ;
paramwhoMap . put ( " key " , " WhatId_Type__c " ) ;
paramwhoMap . put ( " value " , jsonObject . get ( " What_Type " ) ) ;
maps . add ( paramwhoMap ) ;
}
2025-03-28 17:38:34 +08:00
//附件关联表 插入更新时给关联对象赋值
// if ("ContentDocumentLink".equals(api)) {
// String linkedEntity_Type = records[i].getChild("LinkedEntity").getChild("Type").getValue().toString();
// Map<String, Object> paramMap = Maps.newHashMap();
// paramMap.put("key", "linkedEntity_Type");
// paramMap.put("value", linkedEntity_Type);
// maps.add(paramMap);
// }
if ( existsIds . contains ( id ) ) {
customMapper . updateById ( api , maps , id ) ;
} else {
if ( insert ) {
customMapper . save ( api , maps ) ;
}
}
TimeUnit . MILLISECONDS . sleep ( 1 ) ;
} catch ( InterruptedException e ) {
throw e ;
} catch ( Throwable throwable ) {
if ( throwable . toString ( ) . contains ( " interrupt " ) ) {
log . error ( " may interrupt error: " , throwable ) ;
throw new InterruptedException ( ) ;
}
throwable . addSuppressed ( new Exception ( " saveOrUpdate error data: " + JSON . toJSONString ( jsonObject ) ) ) ;
throw throwable ;
}
}
return existsIds . size ( ) ;
}
// Lock taken by checkApi(...) so that concurrent callers do not run the API/table creation twice.
private final ReentrantLock reentrantLock = new ReentrantLock ( ) ;
/ * *
* 检测表是否存在 不存在则创建
*
* @param apiName 表名
* @param createBatch sf 连接
* /
@Override
public void checkApi ( String apiName , Boolean createBatch ) throws Exception {
// 加个锁 避免重复执行创建api
reentrantLock . lock ( ) ;
log . info ( " check api:{} " , apiName ) ;
2025-07-04 11:19:19 +08:00
QueryWrapper < MetaclassConfig > queryWrapper = new QueryWrapper < > ( ) ;
queryWrapper . eq ( " name " , apiName ) ;
long count = metaclassConfigService . count ( queryWrapper ) ;
2025-03-28 17:38:34 +08:00
try {
boolean hasCreatedDate = false ;
2025-07-04 11:01:03 +08:00
2025-03-28 17:38:34 +08:00
PartnerConnection connection = salesforceConnect . createConnect ( ) ;
DataObject dataObject = dataObjectService . getById ( apiName ) ;
Date now = new Date ( ) ;
if ( StringUtils . isBlank ( customMapper . checkTable ( apiName ) ) ) {
log . info ( " api:{} doesn't exist create " , apiName ) ;
// 构建字段
List < Map < String , Object > > list = Lists . newArrayList ( ) ;
DescribeSObjectResult dsr = connection . describeSObject ( apiName ) ;
String label = dsr . getLabel ( ) ;
List < DataField > fieldList = Lists . newArrayList ( ) ;
List < String > fields = Lists . newArrayList ( ) ;
String blobField = null ;
List < DataPicklist > dataPicklists = Lists . newArrayList ( ) ;
for ( Field field : dsr . getFields ( ) ) {
// 过滤字段
if ( Const . TABLE_FILTERS . contains ( field . getName ( ) ) ) {
continue ;
}
if ( Const . CREATED_DATE . equalsIgnoreCase ( field . getName ( ) ) ) {
hasCreatedDate = true ;
}
Map < String , Object > map = Maps . newHashMap ( ) ;
String sfType = field . getType ( ) . toString ( ) ;
if ( " base64 " . equalsIgnoreCase ( sfType ) ) {
blobField = field . getName ( ) ;
}
map . put ( " type " , DataUtil . fieldTypeToMysql ( field ) ) ;
// 英文' 转换为 \'
map . put ( " comment " , field . getLabel ( ) . replaceAll ( " ' " , " \\ \\ ' " ) ) ;
map . put ( " name " , field . getName ( ) ) ;
list . add ( map ) ;
DataField dataField = new DataField ( ) ;
dataField . setApi ( apiName ) ;
dataField . setSfType ( sfType ) ;
dataField . setField ( field . getName ( ) ) ;
dataField . setName ( field . getLabel ( ) ) ;
dataField . setIsCreateable ( field . getCreateable ( ) ) ;
dataField . setIsNillable ( field . getNillable ( ) ) ;
dataField . setIsDefaultedOnCreate ( field . getDefaultedOnCreate ( ) ) ;
String join = null ;
// 会有非常多映射关系的字段在 这里置空不管
if ( field . getReferenceTo ( ) . length < = 3 ) {
join = StringUtils . join ( field . getReferenceTo ( ) , " , " ) ;
} else {
log . warn ( " referenceTo too long set null, api:{}, field:{}, reference to:{} " , apiName , field . getName ( ) , StringUtils . join ( field . getReferenceTo ( ) , " , " ) ) ;
}
// picklist保存到picklist表
if ( " picklist " . equalsIgnoreCase ( sfType ) ) {
join = " data_picklist " ;
PicklistEntry [ ] picklistValues = field . getPicklistValues ( ) ;
if ( ArrayUtils . isNotEmpty ( picklistValues ) ) {
for ( PicklistEntry picklistValue : picklistValues ) {
DataPicklist dataPicklist = new DataPicklist ( ) ;
dataPicklist . setApi ( apiName ) ;
dataPicklist . setField ( field . getName ( ) ) ;
dataPicklist . setLabel ( picklistValue . getLabel ( ) ) ;
dataPicklist . setValue ( picklistValue . getValue ( ) ) ;
dataPicklists . add ( dataPicklist ) ;
}
}
}
dataField . setReferenceTo ( join ) ;
fieldList . add ( dataField ) ;
fields . add ( field . getName ( ) ) ;
}
// 存在ownerId字段
String extraField = " " ;
if ( dataObject ! = null & & StringUtils . isNotBlank ( dataObject . getExtraField ( ) ) ) {
extraField = dataObject . getExtraField ( ) ;
}
if ( fields . stream ( ) . anyMatch ( Const . OWNER_ID : : equalsIgnoreCase ) ) {
if ( StringUtils . isNotBlank ( extraField ) ) {
extraField + = " ,Owner.Type " ;
} else {
extraField = " Owner.Type " ;
}
}
// 是否存在额外字段 配置上表
if ( StringUtils . isNotBlank ( extraField ) ) {
Set < String > extras = Arrays . stream ( StringUtils . split ( extraField . replaceAll ( StringUtils . SPACE , StringUtils . EMPTY ) , " , " ) ) . collect ( Collectors . toSet ( ) ) ;
for ( String extra : extras ) {
String fieldName = extra . replaceAll ( " \\ . " , " _ " ) ;
DataField dataField = new DataField ( ) ;
dataField . setApi ( apiName ) ;
dataField . setField ( fieldName ) ;
dataField . setName ( extra ) ;
fieldList . add ( dataField ) ;
fields . add ( fieldName ) ;
Map < String , Object > map = Maps . newHashMap ( ) ;
map . put ( " type " , " varchar(255) " ) ;
map . put ( " comment " , extra ) ;
map . put ( " name " , fieldName ) ;
list . add ( map ) ;
}
}
// 存在附件的表 作特殊处理 构建两个字段
if ( StringUtils . isNotBlank ( blobField ) ) {
fileExtraFieldBuild ( apiName , list , fieldList , fields ) ;
}
// 构建索引
List < Map < String , Object > > index = Lists . newArrayList ( ) ;
for ( String tableIndex : Const . TABLE_INDEX ) {
if ( ! fields . contains ( tableIndex ) ) {
continue ;
}
Map < String , Object > createDateMap = Maps . newHashMap ( ) ;
createDateMap . put ( " field " , tableIndex ) ;
createDateMap . put ( " name " , String . format ( " IDX_%s_%s " , apiName , tableIndex ) ) ;
index . add ( createDateMap ) ;
}
log . info ( " api {} not exist, create.. " , apiName ) ;
//新增一个用来存储新sfid的字段
Map < String , Object > map = Maps . newHashMap ( ) ;
map . put ( " type " , " varchar(255) " ) ;
map . put ( " comment " , " 新sfid " ) ;
map . put ( " name " , " new_id " ) ;
list . add ( map ) ;
2025-07-01 15:23:10 +08:00
if ( " Task " . equals ( apiName ) | | " Event " . equals ( apiName ) ) {
Map < String , Object > LinkedMap = Maps . newHashMap ( ) ;
LinkedMap . put ( " type " , " varchar(18) " ) ;
LinkedMap . put ( " comment " , " whatId关联对象 " ) ;
LinkedMap . put ( " name " , " WhatId_Type__c " ) ;
list . add ( LinkedMap ) ;
Map < String , Object > LinkedMap1 = Maps . newHashMap ( ) ;
LinkedMap1 . put ( " type " , " varchar(18) " ) ;
LinkedMap1 . put ( " comment " , " whoId关联对象 " ) ;
LinkedMap1 . put ( " name " , " WhoId_Type__c " ) ;
list . add ( LinkedMap1 ) ;
}
2025-06-10 11:31:20 +08:00
2025-07-04 11:01:03 +08:00
if ( " TaskRelation " . equals ( apiName ) | | " EventRelation " . equals ( apiName ) ) {
Map < String , Object > LinkedMap = Maps . newHashMap ( ) ;
LinkedMap . put ( " type " , " varchar(18) " ) ;
LinkedMap . put ( " comment " , " relationId关联对象 " ) ;
LinkedMap . put ( " name " , " Relation_Type__c " ) ;
list . add ( LinkedMap ) ;
}
2025-03-28 17:38:34 +08:00
if ( " ContentDocumentLink " . equals ( apiName ) ) {
//文档关联表新增关联对象字段
Map < String , Object > LinkedMap = Maps . newHashMap ( ) ;
LinkedMap . put ( " type " , " varchar(255) " ) ;
LinkedMap . put ( " comment " , " 关联对象 " ) ;
LinkedMap . put ( " name " , " LinkedEntity_Type " ) ;
list . add ( LinkedMap ) ;
}
if ( " Attachment " . equals ( apiName ) | | " FeedComment " . equals ( apiName )
| | " FeedItem " . equals ( apiName ) ) {
//文档关联表新增关联对象字段
Map < String , Object > LinkedMap = Maps . newHashMap ( ) ;
LinkedMap . put ( " type " , " varchar(255) " ) ;
LinkedMap . put ( " comment " , " 关联对象 " ) ;
LinkedMap . put ( " name " , " Parent_Type " ) ;
list . add ( LinkedMap ) ;
DataField dataField = new DataField ( ) ;
dataField . setApi ( apiName ) ;
dataField . setField ( " Parent.Type " ) ;
dataField . setName ( " 关联对象 " ) ;
fieldList . add ( dataField ) ;
}
customMapper . createTable ( apiName , label , list , index ) ;
// 生成字段映射
QueryWrapper < DataField > delfieldQw = new QueryWrapper < > ( ) ;
delfieldQw . eq ( " api " , apiName ) ;
dataFieldService . remove ( delfieldQw ) ;
dataFieldService . saveBatch ( fieldList ) ;
// 生成picklist
QueryWrapper < DataPicklist > deldQw = new QueryWrapper < > ( ) ;
deldQw . eq ( " api " , apiName ) ;
dataPicklistService . remove ( deldQw ) ;
dataPicklistService . saveBatch ( dataPicklists ) ;
// 生成batch
Date startDate = DataUtil . DEFAULT_BEGIN_DATE ;
// 取当天零点 作为最大时间
Date endCreateDate = DateUtils . parseDate ( DateFormatUtils . format ( now , " yyyy-MM-dd " ) , " yyyy-MM-dd " ) ;
Date lastDay = null ;
QueryWrapper < DataBatch > delQw = new QueryWrapper < > ( ) ;
delQw . eq ( " name " , apiName ) ;
dataBatchService . remove ( delQw ) ;
if ( createBatch & & Const . BATCH_FILTERS . stream ( ) . noneMatch ( t - > apiName . indexOf ( t ) > 0 ) ) {
// 匹配的不生成
DataBatch one = new DataBatch ( ) ;
one . setFirstDbNum ( 0 ) ;
one . setFirstSfNum ( 0 ) ;
one . setDbNum ( 0 ) ;
one . setSfNum ( 0 ) ;
one . setName ( apiName ) ;
one . setLabel ( label ) ;
if ( hasCreatedDate ) {
List < Map < String , Object > > bachList = customMapper . list ( " code,value " , " system_config " , " code =' " + BATCH_TYPE + " ' " ) ;
String batchType = bachList . get ( 0 ) . get ( " value " ) . toString ( ) ;
do {
lastDay = getLastDay ( batchType , endCreateDate , startDate ) ;
DataBatch dataBatch = one . clone ( ) ;
dataBatch . setSyncStartDate ( startDate ) ;
dataBatch . setSyncEndDate ( lastDay ) ;
dataBatchService . save ( dataBatch ) ;
startDate = lastDay ;
} while ( lastDay . compareTo ( endCreateDate ) < 0 ) ;
} else {
// 没创建时间 开始结束时间为空
dataBatchService . save ( one ) ;
}
}
DataObject update = new DataObject ( ) ;
update . setLabel ( label ) ;
update . setName ( apiName ) ;
update . setLastUpdateDate ( endCreateDate ) ;
update . setBlobField ( blobField ) ;
2025-07-04 11:19:19 +08:00
if ( count > 0 ) {
update . setIsEditable ( false ) ;
}
2025-03-28 17:38:34 +08:00
dataObjectService . saveOrUpdate ( update ) ;
}
} finally {
reentrantLock . unlock ( ) ;
}
}
@Override
public ReturnT < String > createApi ( SalesforceParam param ) throws Exception {
List < String > apis ;
if ( StringUtils . isBlank ( param . getApi ( ) ) ) {
apis = dataObjectService . list ( ) . stream ( ) . map ( DataObject : : getName ) . collect ( Collectors . toList ( ) ) ;
} else {
apis = DataUtil . toIdList ( param . getApi ( ) ) ;
}
log . info ( " 打印所有待同步表: " + apis . toString ( ) ) ;
for ( String api : apis ) {
2025-06-19 11:35:20 +08:00
try {
checkApi ( api , true ) ;
} catch ( Exception e ) {
String message = e . getMessage ( ) ;
String format = String . format ( " 创建表结构 error, api name: %s, \ nparam: %s, \ ncause: \ n%s " , api , com . alibaba . fastjson2 . JSON . toJSONString ( param , DataDumpParam . getFilter ( ) ) , message ) ;
EmailUtil . send ( " DataDump ERROR " , format ) ;
}
2025-03-28 17:38:34 +08:00
}
return ReturnT . SUCCESS ;
}
/**
 * Stores one {@code DataObject} row (name and label) for every sObject listed by the global
 * describe call on the connection created by {@code salesforceConnect}.
 *
 * @return always {@code ReturnT.SUCCESS}
 * @throws Exception if the connection, the describe call or the batch save fails
 */
@Override
public ReturnT<String> getAllApi() throws Exception {
    PartnerConnection describeConnection = salesforceConnect.createConnect();
    DescribeGlobalResult globalDescribe = describeConnection.describeGlobal();
    DescribeGlobalSObjectResult[] describedObjects = globalDescribe.getSobjects();

    List<DataObject> rows = new ArrayList<>();
    for (int i = 0; i < describedObjects.length; i++) {
        DescribeGlobalSObjectResult described = describedObjects[i];
        DataObject row = new DataObject();
        row.setName(described.getName());
        row.setLabel(described.getLabel());
        rows.add(row);
    }

    dataObjectService.saveBatch(rows);
    return ReturnT.SUCCESS;
}
/ * *
* 是否存在创建时间字段
*
* @param api api名称
* @return true存在创建时间字段 false不存在
* /
private Boolean hasCreatedDate ( String api ) {
QueryWrapper < DataField > qw = new QueryWrapper < > ( ) ;
qw . eq ( " api " , api ) . eq ( " field " , Const . CREATED_DATE ) . last ( " limit 1 " ) ;
DataField one = dataFieldService . getOne ( qw ) ;
return one ! = null ;
}
/ * *
* 存在附件的表 特殊构建
*
* @param apiName 表名称
* @param list 构建表字段的list
* @param fieldList 构建字段表的list
* @param fields 字段名称list
* /
private static void fileExtraFieldBuild ( String apiName , List < Map < String , Object > > list , List < DataField > fieldList , List < String > fields ) {
2025-07-01 14:53:23 +08:00
if ( " Document " . equals ( apiName ) ) {
DataField dataField = new DataField ( ) ;
dataField . setApi ( apiName ) ;
dataField . setField ( " url " ) ;
dataField . setName ( " 文件路径 " ) ;
fieldList . add ( dataField ) ;
fields . add ( " url " ) ;
Map < String , Object > map = Maps . newHashMap ( ) ;
map . put ( " type " , " text " ) ;
map . put ( " comment " , " 文件路径 " ) ;
map . put ( " name " , " localUrl " ) ;
list . add ( map ) ;
} else {
2025-03-28 17:38:34 +08:00
DataField dataField = new DataField ( ) ;
dataField . setApi ( apiName ) ;
dataField . setField ( " url " ) ;
dataField . setName ( " 文件路径 " ) ;
fieldList . add ( dataField ) ;
fields . add ( " url " ) ;
Map < String , Object > map = Maps . newHashMap ( ) ;
map . put ( " type " , " text " ) ;
map . put ( " comment " , " 文件路径 " ) ;
map . put ( " name " , " url " ) ;
list . add ( map ) ;
}
{
DataField dataField = new DataField ( ) ;
dataField . setApi ( apiName ) ;
dataField . setField ( " is_dump " ) ;
dataField . setName ( " 是否存储 " ) ;
fieldList . add ( dataField ) ;
fields . add ( " is_dump " ) ;
Map < String , Object > map = Maps . newHashMap ( ) ;
map . put ( " type " , " tinyint(1) DEFAULT 0 " ) ;
map . put ( " comment " , " 是否存储 " ) ;
map . put ( " name " , " is_dump " ) ;
list . add ( map ) ;
}
{
DataField dataField = new DataField ( ) ;
dataField . setApi ( apiName ) ;
dataField . setField ( " is_upload " ) ;
dataField . setName ( " 是否上传 " ) ;
fieldList . add ( dataField ) ;
fields . add ( " is_upload " ) ;
Map < String , Object > map = Maps . newHashMap ( ) ;
map . put ( " type " , " tinyint(1) DEFAULT 0 " ) ;
map . put ( " comment " , " 是否上传 " ) ;
map . put ( " name " , " is_upload " ) ;
list . add ( map ) ;
}
}
@Override
public ReturnT < String > getDocumentLink ( String paramStr ) throws Exception {
String api = " ContentDocumentLink " ;
2025-06-04 10:59:23 +08:00
PartnerConnection partnerConnection = salesforceConnect . createConnect ( ) ;
2025-03-28 17:38:34 +08:00
PartnerConnection connection = salesforceTargetConnect . createConnect ( ) ;
List < Map < String , Object > > list = customMapper . list ( " Id " , " ContentDocument " , " new_id is not null " ) ;
2025-06-04 10:59:23 +08:00
DescribeSObjectResult dsr = partnerConnection . describeSObject ( api ) ;
List < String > fields = customMapper . getFields ( api ) . stream ( ) . map ( String : : toUpperCase ) . collect ( Collectors . toList ( ) ) ;
Field [ ] dsrFields = dsr . getFields ( ) ;
2025-03-28 17:38:34 +08:00
try {
if ( list ! = null & & list . size ( ) > 0 ) {
2025-06-04 10:59:23 +08:00
for ( Map < String , Object > map : list ) {
String contentDocumentId = ( String ) map . get ( " Id " ) ;
String sql = " SELECT Id, LinkedEntityId, LinkedEntity.Type, ContentDocumentId, Visibility, ShareType, SystemModstamp, IsDeleted FROM ContentDocumentLink where ContentDocumentId = ' " + contentDocumentId + " ' " ;
JSONArray objects = null ;
try {
QueryResult queryResult = partnerConnection . queryAll ( sql ) ;
SObject [ ] records = queryResult . getRecords ( ) ;
objects = DataUtil . toJsonArray ( records , dsrFields ) ;
saveOrUpdate ( api , fields , records , objects , true ) ;
} catch ( Throwable e ) {
log . error ( " getDocumentLink error api:{}, data:{} " , api , JSON . toJSONString ( objects ) , e ) ;
TimeUnit . MINUTES . sleep ( 1 ) ;
return ReturnT . FAIL ;
}
}
2025-03-28 17:38:34 +08:00
//表内数据总量
Integer count = customMapper . countBySQL ( api , " where ShareType = 'V' and new_id = '0' " ) ;
//批量插入200一次
int page = count % 200 = = 0 ? count / 200 : ( count / 200 ) + 1 ;
for ( int i = 0 ; i < page ; i + + ) {
2025-06-17 11:18:14 +08:00
List < Map < String , Object > > linkList = customMapper . list ( " Id,LinkedEntityId,ContentDocumentId,LinkedEntity_Type,ShareType,Visibility " , api , " ShareType = 'V' and new_id = '0' order by Id asc limit 200 " ) ;
SObject [ ] accounts = new SObject [ linkList . size ( ) ] ;
String [ ] ids = new String [ linkList . size ( ) ] ;
int index = 0 ;
for ( Map < String , Object > map : linkList ) {
String linkedEntityId = ( String ) map . get ( " LinkedEntityId " ) ;
String id = ( String ) map . get ( " Id " ) ;
String contentDocumentId = ( String ) map . get ( " ContentDocumentId " ) ;
String linkedEntityType = ( String ) map . get ( " LinkedEntity_Type " ) ;
String shareType = ( String ) map . get ( " ShareType " ) ;
String Visibility = ( String ) map . get ( " Visibility " ) ;
2025-03-28 17:38:34 +08:00
2025-06-17 11:18:14 +08:00
// dataObject查询
QueryWrapper < DataObject > qw = new QueryWrapper < > ( ) ;
qw . eq ( " name " , linkedEntityType ) ;
List < DataObject > objects = dataObjectService . list ( qw ) ;
if ( ! objects . isEmpty ( ) ) {
Map < String , Object > dMap = customMapper . getById ( " new_id " , " ContentDocument " , contentDocumentId ) ;
Map < String , Object > lMap = customMapper . getById ( " new_id " , linkedEntityType , linkedEntityId ) ;
2025-03-28 17:38:34 +08:00
2025-06-17 11:18:14 +08:00
SObject account = new SObject ( ) ;
account . setType ( api ) ;
account . setField ( " ContentDocumentId " , dMap . get ( " new_id " ) . toString ( ) ) ;
account . setField ( " LinkedEntityId " , lMap . get ( " new_id " ) . toString ( ) ) ;
account . setField ( " ShareType " , shareType ) ;
account . setField ( " Visibility " , Visibility ) ;
ids [ index ] = id ;
accounts [ index ] = account ;
index + + ;
2025-03-28 17:38:34 +08:00
}
2025-06-17 11:18:14 +08:00
}
try {
SaveResult [ ] saveResults = connection . create ( accounts ) ;
for ( int j = 0 ; j < saveResults . length ; j + + ) {
if ( ! saveResults [ j ] . getSuccess ( ) ) {
String format = String . format ( " 数据导入 error, api name: %s, \ nparam: %s, \ ncause: \ n%s " , api , JSON . toJSONString ( DataDumpParam . getFilter ( ) ) , com . alibaba . fastjson . JSON . toJSONString ( saveResults [ j ] ) ) ;
2025-07-01 14:53:23 +08:00
log . error ( format ) ;
2025-06-17 11:18:14 +08:00
} else {
List < Map < String , Object > > dList = new ArrayList < > ( ) ;
Map < String , Object > linkMap = new HashMap < > ( ) ;
linkMap . put ( " key " , " new_id " ) ;
linkMap . put ( " value " , saveResults [ j ] . getId ( ) ) ;
dList . add ( linkMap ) ;
customMapper . updateById ( " ContentDocumentLink " , dList , ids [ j ] ) ;
2025-03-28 17:38:34 +08:00
}
}
2025-06-17 11:18:14 +08:00
} catch ( Exception e ) {
log . error ( " getDocumentLink error api:{}, data:{} " , api , JSON . toJSONString ( accounts ) , e ) ;
EmailUtil . send ( " -------测试----------- " , JSON . toJSONString ( accounts ) ) ;
throw new RuntimeException ( e ) ;
}
}
2025-03-28 17:38:34 +08:00
}
} catch ( Exception e ) {
log . error ( " getDocumentLink error api:{}, data:{} " , api , JSON . toJSONString ( list ) , e ) ;
return ReturnT . FAIL ;
}
return null ;
}
@Override
public String getDocumentId ( PartnerConnection partnerConnection , String contentVersionId ) throws Exception {
QueryResult queryResult = partnerConnection . queryAll ( " SELECT Id, ContentDocumentId, IsDeleted FROM ContentVersion where Id = " + " ' " + contentVersionId + " ' " ) ;
DescribeSObjectResult dsr = partnerConnection . describeSObject ( " ContentVersion " ) ;
Field [ ] dsrFields = dsr . getFields ( ) ;
SObject [ ] records = queryResult . getRecords ( ) ;
JSONArray objects = DataUtil . toJsonArray ( records , dsrFields ) ;
if ( objects ! = null & & objects . size ( ) > 0 ) {
JSONObject jsonObject = objects . getJSONObject ( 0 ) ;
return jsonObject . getString ( " ContentDocumentId " ) ;
}
return null ;
}
/**
 * Removes FeedItem / FeedComment data whose parent type has no local table.
 *
 * <p>The distinct {@code Parent_Type} values of the FeedItem table are read; for each value for
 * which {@code customMapper.checkTable} returns a blank result, {@code customMapper.delete} is
 * called for FeedItem and FeedComment with that type.
 *
 * @param param not read by this method
 * @return {@code ReturnT.SUCCESS} when the scan completes
 * @throws Exception if a mapper call fails
 */
@Override
public ReturnT<String> getChatter(SalesforceParam param) throws Exception {
    List<Map<String, Object>> feedList = customMapper.list(" Parent_Type ", " FeedItem ", " 1=1 GROUP BY Parent_Type ");
    for (Map<String, Object> map : feedList) {
        // GROUP BY also yields a group for NULL parent types; such a row may arrive as a null map
        // or a null value, so skip it instead of failing on toString().
        Object rawParentType = map == null ? null : map.get(" Parent_Type ");
        if (rawParentType == null) {
            continue;
        }
        String parentType = rawParentType.toString();
        // A blank checkTable result means the parent table does not exist locally.
        if (StringUtils.isBlank(customMapper.checkTable(parentType))) {
            List<String> types = new ArrayList<>();
            types.add(parentType);
            customMapper.delete(" FeedItem ", types);
            customMapper.delete(" FeedComment ", types);
        }
    }
    // Report success explicitly, like createApi/getAllApi, instead of returning null.
    return ReturnT.SUCCESS;
}
/**
 * Picks the end date of the next batch window according to the configured batch type.
 *
 * @param batchType one of the {@code BATCH_TYPE_*} values; any other value selects
 *                  {@code endDate}; must not be {@code null}
 * @param endDate   upper bound passed to the {@code DataUtil} helper, and the fallback result
 * @param startDate start of the window passed to the {@code DataUtil} helper
 * @return the value computed by the matching {@code DataUtil} helper, or {@code endDate} for an
 *         unrecognised batch type
 */
public Date getLastDay(String batchType, Date endDate, Date startDate) {
    // Calling equals on batchType keeps the NullPointerException a null batch type raises.
    if (batchType.equals(BATCH_TYPE_WEEK)) {
        return DataUtil.getWeekLastDay(endDate, startDate);
    }
    if (batchType.equals(BATCH_TYPE_MONTH)) {
        return DataUtil.getMonthLastDay(endDate, startDate);
    }
    if (batchType.equals(BATCH_TYPE_YEAR)) {
        return DataUtil.getYearLastDay(endDate, startDate);
    }
    return endDate;
}
}