@ -303,21 +303,24 @@ public class DataImportNewServiceImpl implements DataImportNewService {
}
/ * *
* 增量 Update入口
* Update入口
* /
@Override
public ReturnT < String > immigration Increment Update( SalesforceParam param ) throws Exception {
public ReturnT < String > immigration UpdateNew ( SalesforceParam param ) throws Exception {
List < Future < ? > > futures = Lists . newArrayList ( ) ;
try {
if ( StringUtils . isNotBlank ( param . getApi ( ) ) ) {
/ / 手动任务
ReturnT < String > result = updateIncrementSfData ( param , futures ) ;
if ( result ! = null ) {
return result ;
if ( 1 = = param . getType ( ) ) {
return updateSfDataNew ( param , futures ) ;
} else {
return updateIncrementalSfDataNew( param , futures ) ;
}
} else {
/ / 自动更新任务
/ / autoUpdateSfData ( param , futures ) ;
if ( 1 = = param . getType ( ) ) {
autoUpdateSfDataNew ( param , futures ) ;
} else {
autoUpdateIncrementalSfDataNew ( param , futures ) ;
}
}
return ReturnT . SUCCESS ;
} catch ( InterruptedException e ) {
@ -328,11 +331,10 @@ public class DataImportNewServiceImpl implements DataImportNewService {
throw throwable ;
}
}
/ * *
* 组装 增量 Update参数
* 组装 【 单表 】 【 存量 】 Update参数
* /
public ReturnT < String > update Increment SfData( SalesforceParam param , List < Future < ? > > futures ) throws Exception {
public ReturnT < String > update SfDataNew ( SalesforceParam param , List < Future < ? > > futures ) throws Exception {
List < String > apis ;
String beginDateStr = null ;
String endDateStr = null ;
@ -343,9 +345,7 @@ public class DataImportNewServiceImpl implements DataImportNewService {
beginDateStr = DateUtil . format ( beginDate , " yyyy-MM-dd HH:mm:ss " ) ;
endDateStr = DateUtil . format ( endDate , " yyyy-MM-dd HH:mm:ss " ) ;
}
String join = StringUtils . join ( apis , " , " ) ;
log . info ( " immigration apis: {} " , join ) ;
XxlJobLogger . log ( " immigration apis: {} " , join ) ;
/ / 全量的时候 检测是否有自动任务锁住的表
boolean isFull = CollectionUtils . isEmpty ( param . getIds ( ) ) ;
if ( isFull ) {
@ -362,7 +362,6 @@ public class DataImportNewServiceImpl implements DataImportNewService {
for ( String api : apis ) {
DataObject update = new DataObject ( ) ;
try {
TimeUnit . MILLISECONDS . sleep ( 1 ) ;
List < SalesforceParam > salesforceParams = null ;
QueryWrapper < DataBatch > dbQw = new QueryWrapper < > ( ) ;
dbQw . eq ( " name " , api ) ;
@ -387,7 +386,7 @@ public class DataImportNewServiceImpl implements DataImportNewService {
for ( SalesforceParam salesforceParam : salesforceParams ) {
Future < ? > future = salesforceExecutor . execute ( ( ) - > {
try {
manualIncrement UpdateSfData( salesforceParam , partnerConnection ) ;
UpdateSfDataNew ( salesforceParam , partnerConnection ) ;
} catch ( Throwable throwable ) {
log . error ( " salesforceExecutor error " , throwable ) ;
throw new RuntimeException ( throwable ) ;
@ -412,24 +411,241 @@ public class DataImportNewServiceImpl implements DataImportNewService {
}
/ * *
* 执行增量Update数据
* 组装【 单表 】 【 增量 】 Update参数
* /
private void manualIncrementUpdateSfData ( SalesforceParam param , PartnerConnection partnerConnection ) throws Exception {
public ReturnT < String > updateIncrementalSfDataNew ( SalesforceParam param , List < Future < ? > > futures ) throws Exception {
List < String > apis ;
String beginDateStr = null ;
String endDateStr = null ;
apis = DataUtil . toIdList ( param . getApi ( ) ) ;
if ( param . getBeginCreateDate ( ) ! = null & & param . getEndCreateDate ( ) ! = null ) {
Date beginDate = param . getBeginCreateDate ( ) ;
Date endDate = param . getEndCreateDate ( ) ;
beginDateStr = DateUtil . format ( beginDate , " yyyy-MM-dd HH:mm:ss " ) ;
endDateStr = DateUtil . format ( endDate , " yyyy-MM-dd HH:mm:ss " ) ;
}
/ / 全量的时候 检测是否有自动任务锁住的表
boolean isFull = CollectionUtils . isEmpty ( param . getIds ( ) ) ;
if ( isFull ) {
QueryWrapper < DataObject > qw = new QueryWrapper < > ( ) ;
qw . eq ( " data_lock " , 1 ) . in ( " name " , apis ) ;
List < DataObject > list = dataObjectService . list ( qw ) ;
if ( CollectionUtils . isNotEmpty ( list ) ) {
String apiNames = list . stream ( ) . map ( DataObject : : getName ) . collect ( Collectors . joining ( ) ) ;
return new ReturnT < > ( 500 , " api: " + apiNames + " is locked " ) ;
}
}
PartnerConnection partnerConnection = salesforceTargetConnect . createConnect ( ) ;
for ( String api : apis ) {
DataObject update = new DataObject ( ) ;
try {
List < SalesforceParam > salesforceParams = null ;
QueryWrapper < DataBatch > dbQw = new QueryWrapper < > ( ) ;
dbQw . eq ( " name " , api ) ;
if ( StringUtils . isNotEmpty ( beginDateStr ) & & StringUtils . isNotEmpty ( endDateStr ) ) {
dbQw . eq ( " sync_start_date " , beginDateStr ) ; / / 等于开始时间
dbQw . eq ( " sync_end_date " , endDateStr ) ; / / 等于结束时间
}
List < DataBatch > list = dataBatchService . list ( dbQw ) ;
AtomicInteger batch = new AtomicInteger ( 1 ) ;
if ( CollectionUtils . isNotEmpty ( list ) ) {
salesforceParams = list . stream ( ) . map ( t - > {
SalesforceParam salesforceParam = param . clone ( ) ;
salesforceParam . setApi ( t . getName ( ) ) ;
salesforceParam . setBeginCreateDate ( t . getSyncStartDate ( ) ) ;
salesforceParam . setEndCreateDate ( t . getSyncEndDate ( ) ) ;
salesforceParam . setBatch ( batch . getAndIncrement ( ) ) ;
return salesforceParam ;
} ) . collect ( Collectors . toList ( ) ) ;
}
/ / 手动任务优先执行
for ( SalesforceParam salesforceParam : salesforceParams ) {
Future < ? > future = salesforceExecutor . execute ( ( ) - > {
try {
UpdateSfDataNew ( salesforceParam , partnerConnection ) ;
} catch ( Throwable throwable ) {
log . error ( " salesforceExecutor error " , throwable ) ;
throw new RuntimeException ( throwable ) ;
}
} , salesforceParam . getBatch ( ) , 1 ) ;
futures . add ( future ) ;
}
/ / 等待当前所有线程执行完成
salesforceExecutor . waitForFutures ( futures . toArray ( new Future < ? > [ ] { } ) ) ;
update . setDataWork ( 0 ) ;
} catch ( Exception e ) {
throw e ;
} finally {
if ( isFull ) {
update . setName ( api ) ;
update . setDataLock ( 0 ) ;
dataObjectService . updateById ( update ) ;
}
}
}
return null ;
}
/ * *
* 组装 【 多表 】 【 存量 】 Update参数
* /
private void autoUpdateSfDataNew ( SalesforceParam param , List < Future < ? > > futures ) throws Exception {
QueryWrapper < DataObject > qw = new QueryWrapper < > ( ) ;
qw . eq ( " need_update " , 1 )
. eq ( " data_lock " , 0 )
. orderByAsc ( " data_index " )
. last ( " limit 10 " ) ;
PartnerConnection partnerConnection = salesforceTargetConnect . createConnect ( ) ;
while ( true ) {
List < DataObject > dataObjects = dataObjectService . list ( qw ) ;
/ / 判断dataObjects是否为空
if ( CollectionUtils . isEmpty ( dataObjects ) ) {
return ;
}
for ( DataObject dataObject : dataObjects ) {
DataObject update = new DataObject ( ) ;
update . setName ( dataObject . getName ( ) ) ;
update . setDataLock ( 1 ) ;
dataObjectService . updateById ( update ) ;
try {
SalesforceParam salesforceParam = param . clone ( ) ;
salesforceParam . setApi ( dataObject . getName ( ) ) ;
salesforceParam . setBeginModifyDate ( dataObject . getLastUpdateDate ( ) ) ;
Future < ? > future = salesforceExecutor . execute ( ( ) - > {
try {
UpdateSfDataNew ( salesforceParam , partnerConnection ) ;
} catch ( Throwable throwable ) {
log . error ( " salesforceExecutor error " , throwable ) ;
throw new RuntimeException ( throwable ) ;
}
} , salesforceParam . getBatch ( ) , 0 ) ;
futures . add ( future ) ;
/ / 等待当前所有线程执行完成
salesforceExecutor . waitForFutures ( futures . toArray ( new Future < ? > [ ] { } ) ) ;
update . setDataWork ( 0 ) ;
} catch ( Exception e ) {
throw e ;
} finally {
if ( dataObject ! = null ) {
update . setDataLock ( 0 ) ;
dataObjectService . updateById ( update ) ;
}
}
}
/ / 等待当前所有线程执行完成
salesforceExecutor . waitForFutures ( futures . toArray ( new Future < ? > [ ] { } ) ) ;
futures . clear ( ) ;
}
}
/ * *
* 组装 【 多表 】 【 增量 】 Update参数
* /
private void autoUpdateIncrementalSfDataNew ( SalesforceParam param , List < Future < ? > > futures ) throws Exception {
QueryWrapper < DataObject > qw = new QueryWrapper < > ( ) ;
qw . eq ( " need_update " , 1 )
. eq ( " data_lock " , 0 )
. orderByAsc ( " data_index " )
. last ( " limit 10 " ) ;
PartnerConnection partnerConnection = salesforceTargetConnect . createConnect ( ) ;
while ( true ) {
List < DataObject > dataObjects = dataObjectService . list ( qw ) ;
/ / 判断dataObjects是否为空
if ( CollectionUtils . isEmpty ( dataObjects ) ) {
return ;
}
for ( DataObject dataObject : dataObjects ) {
DataObject update = new DataObject ( ) ;
update . setName ( dataObject . getName ( ) ) ;
update . setDataLock ( 1 ) ;
dataObjectService . updateById ( update ) ;
try {
SalesforceParam salesforceParam = param . clone ( ) ;
salesforceParam . setApi ( dataObject . getName ( ) ) ;
salesforceParam . setBeginModifyDate ( dataObject . getLastUpdateDate ( ) ) ;
Future < ? > future = salesforceExecutor . execute ( ( ) - > {
try {
UpdateSfDataNew ( salesforceParam , partnerConnection ) ;
} catch ( Throwable throwable ) {
log . error ( " salesforceExecutor error " , throwable ) ;
throw new RuntimeException ( throwable ) ;
}
} , salesforceParam . getBatch ( ) , 0 ) ;
futures . add ( future ) ;
/ / 等待当前所有线程执行完成
salesforceExecutor . waitForFutures ( futures . toArray ( new Future < ? > [ ] { } ) ) ;
update . setDataWork ( 0 ) ;
} catch ( Exception e ) {
throw e ;
} finally {
if ( dataObject ! = null ) {
update . setDataLock ( 0 ) ;
dataObjectService . updateById ( update ) ;
}
}
}
/ / 等待当前所有线程执行完成
salesforceExecutor . waitForFutures ( futures . toArray ( new Future < ? > [ ] { } ) ) ;
futures . clear ( ) ;
}
}
/ * *
* 执行Update更新数据
* /
private void UpdateSfDataNew ( SalesforceParam param , PartnerConnection partnerConnection ) throws Exception {
Map < String , Object > infoFlag = customMapper . list ( " code,value " , " system_config " , " code =' " + SystemConfigCode . INFO_FLAG + " ' " ) . get ( 0 ) ;
String api = param . getApi ( ) ;
TimeUnit . MILLISECONDS . sleep ( 1 ) ;
QueryWrapper < DataField > dbQw = new QueryWrapper < > ( ) ;
dbQw . eq ( " api " , api ) ;
List < DataField > list = dataFieldService . list ( dbQw ) ;
TimeUnit . MILLISECONDS . sleep ( 1 ) ;
Date beginDate = param . getBeginCreateDate ( ) ;
Date endDate = param . getEndCreateDate ( ) ;
String sql = " " ;
String sql2 = " " ;
String beginDateStr = DateUtil . format ( beginDate , " yyyy-MM-dd HH:mm:ss " ) ;
String endDateStr = DateUtil . format ( endDate , " yyyy-MM-dd HH:mm:ss " ) ;
if ( 1 = = param . getType ( ) ) {
if ( api . contains ( " Share " ) ) {
sql = " where RowCause = 'Manual' and new_id is not null and CreatedDate >= ' " + beginDateStr + " ' and CreatedDate < ' " + endDateStr + " ' " ;
sql2 = " RowCause = 'Manual' and new_id is not null and CreatedDate >= ' " + beginDateStr + " ' and CreatedDate < ' " + endDateStr + " ' order by Id asc limit " ;
} else {
sql = " where new_id is not null and CreatedDate >= ' " + beginDateStr + " ' and CreatedDate < ' " + endDateStr + " ' " ;
sql2 = " new_id is not null and CreatedDate >= ' " + beginDateStr + " ' and CreatedDate < ' " + endDateStr + " ' order by Id asc limit " ;
}
} else {
if ( api . contains ( " Share " ) ) {
sql = " where RowCause = 'Manual' and new_id is not null and LastModifiedDate >= ' " + beginDateStr + " ' " ;
sql2 = " RowCause = 'Manual' and new_id is not null and LastModifiedDate >= ' " + beginDateStr + " ' order by Id asc limit " ;
} else {
sql = " where new_id is not null and LastModifiedDate >= ' " + beginDateStr + " ' " ;
sql2 = " new_id is not null and LastModifiedDate >= ' " + beginDateStr + " ' order by Id asc limit " ;
}
}
/ / 表内数据总量
Integer count = customMapper . countBySQL ( api , " where new_id is not null and CreatedDate >= ' " + beginDateStr + " ' and CreatedDate < ' " + endDateStr + " ' " ) ;
Integer count = customMapper . countBySQL ( api , sql ) ;
log . error ( " 总Update数据 count:{}; -开始时间:{}; -结束时间:{}; -api:{}; " , count , beginDateStr , endDateStr , api ) ;
if ( count = = 0 ) {
@ -440,7 +656,7 @@ public class DataImportNewServiceImpl implements DataImportNewService {
/ / 批量插入200一次
int page = count % 200 = = 0 ? count / 200 : ( count / 200 ) + 1 ;
for ( int i = 0 ; i < page ; i + + ) {
List < Map < String , Object > > mapList = customMapper . list ( " * " , api , " new_id is not null and CreatedDate >= ' " + beginDateStr + " ' and CreatedDate < ' " + endDateStr + " ' order by Id asc limit " + i * 200 + " ,200 " ) ;
List < Map < String , Object > > mapList = customMapper . list ( " * " , api , sql2 + i * 200 + " ,200 " ) ;
SObject [ ] accounts = new SObject [ mapList . size ( ) ] ;
int j = 0 ;
for ( Map < String , Object > map : mapList ) {
@ -478,7 +694,6 @@ public class DataImportNewServiceImpl implements DataImportNewService {
} else if ( " 0 " . equals ( map . get ( " IsPersonAccount " ) ) & & field . equals ( " LastName " ) ) {
continue ;
}
} else {
account . setField ( field , map . get ( field ) ) ;
}
@ -487,19 +702,19 @@ public class DataImportNewServiceImpl implements DataImportNewService {
}
account . setField ( " old_owner_id__c " , map . get ( " OwnerId " ) ) ;
account . setField ( " old_sfdc_id__c " , map . get ( " Id " ) ) ;
accounts [ j + + ] = account ;
}
List < Map < String , String > > listMap = new ArrayList < > ( ) ;
try {
if ( infoFlag ! = null & & " 1 " . equals ( infoFlag . get ( " value " ) ) ) {
listMap = retur nAccountsDetails( accounts , list ) ;
printl nAccountsDetails( accounts , list ) ;
}
SaveResult [ ] saveResults = partnerConnection . update ( accounts ) ;
for ( SaveResult saveResult : saveResults ) {
if ( ! saveResult . getSuccess ( ) ) {
Map < String , String > map = returnErrorAccountsDetails ( accounts , list , saveResult . getId ( ) ) ;
log . info ( " -------------saveResults: {} " , JSON . toJSONString ( saveResult ) ) ;
String format = String . format ( " 数据 导入 error, api name: %s, \ nparam: %s, \ ncause: \ n%s, \ n数据实体类: \ n%s " , api , com . alibaba . fastjson2 . JSON . toJSONString ( param , DataDumpParam . getFilter ( ) ) , JSON . toJSONString ( saveResult ) , JSON . toJSONString ( listM ap) ) ;
String format = String . format ( " 数据 更新 error, api name: %s, \ nparam: %s, \ ncause: \ n%s, \ n数据实体类: \ n%s " , api , com . alibaba . fastjson2 . JSON . toJSONString ( param , DataDumpParam . getFilter ( ) ) , JSON . toJSONString ( saveResult ) , JSON . toJSONString ( m ap) ) ;
EmailUtil . send ( " DataDump ERROR " , format ) ;
return ;
} else {
@ -507,7 +722,6 @@ public class DataImportNewServiceImpl implements DataImportNewService {
}
}
log . info ( " sf return saveResults------ " + JSONArray . toJSONString ( saveResults ) ) ;
} catch ( Throwable e ) {
throw e ;
}
@ -525,23 +739,19 @@ public class DataImportNewServiceImpl implements DataImportNewService {
. eq ( " sync_end_date " , endDate )
. set ( " sf_update_num " , targetCount ) ;
dataBatchService . update ( updateQw2 ) ;
}
/ * *
* 打印SF交互数据明细
* /
public List < Map < String , String > > returnAccountsDetails ( SObject [ ] accounts , List < DataField > list ) {
ArrayList < Map < String , String > > arrayList = new ArrayList < > ( ) ;
public void printlnAccountsDetails ( SObject [ ] accounts , List < DataField > list ) {
for ( int i = 0 ; i < accounts . length ; i + + ) {
HashMap < String , String > map = new HashMap < > ( ) ;
SObject account = accounts [ i ] ;
System . out . println ( " --- Account [" + i + " ] --- " ) ;
System . out . println ( " --- 对象数据 [" + i + " ] --- " ) ;
/ / 获取对象所有字段名
for ( DataField dataField : list ) {
try {
Object value = account . getField ( dataField . getField ( ) ) ;
map . put ( dataField . getField ( ) , String . valueOf ( value ) ) ;
System . out . println ( dataField . getField ( ) + " : " + ( value ! = null ? value . toString ( ) : " null " ) ) ;
} catch ( Exception e ) {
System . out . println ( dataField . getField ( ) + " : [权限不足或字段不存在] " ) ;
@ -549,12 +759,31 @@ public class DataImportNewServiceImpl implements DataImportNewService {
}
System . out . println ( " old_owner_id__c: " + ( account . getField ( " old_owner_id__c " ) ! = null ? account . getField ( " old_owner_id__c " ) . toString ( ) : " null " ) ) ;
System . out . println ( " old_sfdc_id__c: " + ( account . getField ( " old_sfdc_id__c " ) ! = null ? account . getField ( " old_sfdc_id__c " ) . toString ( ) : " null " ) ) ;
}
}
/ * *
* 返回SF交互数据错误明细
* /
public Map < String , String > returnErrorAccountsDetails ( SObject [ ] accounts , List < DataField > list , String errorId ) {
HashMap < String , String > map = new HashMap < > ( ) ;
for ( int i = 0 ; i < accounts . length ; i + + ) {
SObject account = accounts [ i ] ;
if ( errorId . equals ( account . getId ( ) ) | | errorId . equals ( account . getField ( " Id " ) ) ) {
for ( DataField dataField : list ) {
try {
Object value = account . getField ( dataField . getField ( ) ) ;
map . put ( dataField . getField ( ) , String . valueOf ( value ) ) ;
System . out . println ( dataField . getField ( ) + " : " + ( value ! = null ? value . toString ( ) : " null " ) ) ;
} catch ( Exception e ) {
}
}
map . put ( " old_owner_id__c " , String . valueOf ( account . getField ( " old_owner_id__c " ) ) ) ;
map . put ( " old_sfdc_id__c " , String . valueOf ( account . getField ( " old_sfdc_id__c " ) ) ) ;
arrayList . add ( map ) ;
}
return arrayList ;
}
return map ;
}
}