【feat】新增获取DocumentLink和推送DocumentLink

This commit is contained in:
Kris 2025-06-17 11:18:14 +08:00
parent 2b8c281b1f
commit 1318cfca60
4 changed files with 289 additions and 105 deletions

View File

@ -1,6 +1,7 @@
package com.celnet.datadump.job;
import com.alibaba.fastjson.JSON;
import com.celnet.datadump.config.SalesforceConnect;
import com.celnet.datadump.param.SalesforceParam;
import com.celnet.datadump.service.CommonService;
import com.celnet.datadump.service.DataImportBatchService;
@ -123,5 +124,27 @@ public class DataDumpNewJob {
return dataImportNewService.immigrationUpdateNew(param);
}
/**
* 拉取文件关联表
* @return result
*/
@XxlJob("dumpDocumentLinkJob")
public ReturnT<String> dumpDocumentLinkJob(String paramStr) throws Exception{
log.info("dumpDocumentLinkJob execute start ..................");
return dataImportNewService.dumpDocumentLinkJob(paramStr);
}
/**
* 推送文件关联表
* @return result
*/
@XxlJob("getDocumentLinkJob")
public ReturnT<String> pullDocumentLinkJob(String paramStr) throws Exception{
log.info("pullDocumentLinkJob execute start ..................");
return dataImportNewService.pullDocumentLinkJob(paramStr);
}
}

View File

@ -18,4 +18,20 @@ public interface DataImportNewService {
*/
ReturnT<String> immigrationUpdateNew(SalesforceParam param) throws Exception;
/**
* 获取documentLink对象
* @return
* @throws Exception
*/
ReturnT<String> dumpDocumentLinkJob(String paramStr) throws Exception;
/**
* 推送documentLink对象
* @return
* @throws Exception
*/
ReturnT<String> pullDocumentLinkJob(String paramStr) throws Exception;
}

View File

@ -1146,58 +1146,58 @@ public class CommonServiceImpl implements CommonService {
//批量插入200一次
int page = count % 200 == 0 ? count / 200 : (count / 200) + 1;
for (int i = 0; i < page; i++) {
List<Map<String, Object>> linkList = customMapper.list("Id,LinkedEntityId,ContentDocumentId,LinkedEntity_Type,ShareType,Visibility", api, "ShareType = 'V' and new_id = '0' order by Id asc limit 200");
SObject[] accounts = new SObject[linkList.size()];
String[] ids = new String[linkList.size()];
int index = 0;
for (Map<String, Object> map : linkList) {
String linkedEntityId = (String) map.get("LinkedEntityId");
String id = (String) map.get("Id");
String contentDocumentId = (String) map.get("ContentDocumentId");
String linkedEntityType = (String) map.get("LinkedEntity_Type");
String shareType = (String) map.get("ShareType");
String Visibility = (String) map.get("Visibility");
List<Map<String, Object>> linkList = customMapper.list("Id,LinkedEntityId,ContentDocumentId,LinkedEntity_Type,ShareType,Visibility", api, "ShareType = 'V' and new_id = '0' order by Id asc limit 200");
SObject[] accounts = new SObject[linkList.size()];
String[] ids = new String[linkList.size()];
int index = 0;
for (Map<String, Object> map : linkList) {
String linkedEntityId = (String) map.get("LinkedEntityId");
String id = (String) map.get("Id");
String contentDocumentId = (String) map.get("ContentDocumentId");
String linkedEntityType = (String) map.get("LinkedEntity_Type");
String shareType = (String) map.get("ShareType");
String Visibility = (String) map.get("Visibility");
// dataObject查询
QueryWrapper<DataObject> qw = new QueryWrapper<>();
qw.eq("name", linkedEntityType);
List<DataObject> objects = dataObjectService.list(qw);
if (!objects.isEmpty()) {
Map<String, Object> dMap = customMapper.getById("new_id", "ContentDocument", contentDocumentId);
Map<String, Object> lMap = customMapper.getById("new_id", linkedEntityType, linkedEntityId);
// dataObject查询
QueryWrapper<DataObject> qw = new QueryWrapper<>();
qw.eq("name", linkedEntityType);
List<DataObject> objects = dataObjectService.list(qw);
if (!objects.isEmpty()) {
Map<String, Object> dMap = customMapper.getById("new_id", "ContentDocument", contentDocumentId);
Map<String, Object> lMap = customMapper.getById("new_id", linkedEntityType, linkedEntityId);
SObject account = new SObject();
account.setType(api);
account.setField("ContentDocumentId", dMap.get("new_id").toString());
account.setField("LinkedEntityId", lMap.get("new_id").toString());
account.setField("ShareType", shareType);
account.setField("Visibility", Visibility);
ids[index] = id;
accounts[index] = account;
index++;
SObject account = new SObject();
account.setType(api);
account.setField("ContentDocumentId", dMap.get("new_id").toString());
account.setField("LinkedEntityId", lMap.get("new_id").toString());
account.setField("ShareType", shareType);
account.setField("Visibility", Visibility);
ids[index] = id;
accounts[index] = account;
index++;
}
}
try {
SaveResult[] saveResults = connection.create(accounts);
for (int j = 0; j < saveResults.length; j++) {
if (!saveResults[j].getSuccess()) {
String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s", api, JSON.toJSONString(DataDumpParam.getFilter()), com.alibaba.fastjson.JSON.toJSONString(saveResults[j]));
EmailUtil.send("DataDump ContentDocumentLink ERROR", format);
} else {
List<Map<String, Object>> dList = new ArrayList<>();
Map<String, Object> linkMap = new HashMap<>();
linkMap.put("key", "new_id");
linkMap.put("value", saveResults[j].getId());
dList.add(linkMap);
customMapper.updateById("ContentDocumentLink", dList, ids[j]);
}
}
try {
SaveResult[] saveResults = connection.create(accounts);
for (int j = 0; j < saveResults.length; j++) {
if (!saveResults[j].getSuccess()) {
String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s", api, JSON.toJSONString(DataDumpParam.getFilter()), com.alibaba.fastjson.JSON.toJSONString(saveResults[j]));
EmailUtil.send("DataDump ContentDocumentLink ERROR", format);
} else {
List<Map<String, Object>> dList = new ArrayList<>();
Map<String, Object> linkMap = new HashMap<>();
linkMap.put("key", "new_id");
linkMap.put("value", saveResults[j].getId());
dList.add(linkMap);
customMapper.updateById("ContentDocumentLink", dList, ids[j]);
}
}
} catch (Exception e) {
log.error("getDocumentLink error api:{}, data:{}", api, JSON.toJSONString(accounts), e);
EmailUtil.send("-------测试-----------", JSON.toJSONString(accounts));
throw new RuntimeException(e);
}
}
} catch (Exception e) {
log.error("getDocumentLink error api:{}, data:{}", api, JSON.toJSONString(accounts), e);
EmailUtil.send("-------测试-----------", JSON.toJSONString(accounts));
throw new RuntimeException(e);
}
}
}
} catch (Exception e) {
log.error("getDocumentLink error api:{}, data:{}", api, JSON.toJSONString(list), e);

View File

@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.celnet.datadump.config.SalesforceConnect;
import com.celnet.datadump.config.SalesforceExecutor;
import com.celnet.datadump.config.SalesforceTargetConnect;
import com.celnet.datadump.entity.DataBatch;
@ -47,9 +48,15 @@ public class DataImportNewServiceImpl implements DataImportNewService {
@Autowired
private SalesforceTargetConnect salesforceTargetConnect;
@Autowired
private DataBatchHistoryService dataBatchHistoryService;
@Autowired
private SalesforceExecutor salesforceExecutor;
@Autowired
private SalesforceConnect salesforceConnect;
@Autowired
private DataObjectService dataObjectService;
@ -62,15 +69,12 @@ public class DataImportNewServiceImpl implements DataImportNewService {
@Autowired
private CustomMapper customMapper;
@Autowired
private DataBatchHistoryService dataBatchHistoryService;
@Autowired
private CommonService commonService;
/**
* Get入口
* Get返写个人客户联系人入口
*/
@Override
public ReturnT<String> getPersonContact(SalesforceParam param) throws Exception {
@ -331,6 +335,7 @@ public class DataImportNewServiceImpl implements DataImportNewService {
throw throwable;
}
}
/**
* 组装单表存量Update参数
*/
@ -414,16 +419,7 @@ public class DataImportNewServiceImpl implements DataImportNewService {
* 组装单表增量Update参数
*/
public ReturnT<String> updateIncrementalSfDataNew(SalesforceParam param, List<Future<?>> futures) throws Exception {
List<String> apis;
String beginDateStr = null;
String endDateStr = null;
apis = DataUtil.toIdList(param.getApi());
if (param.getBeginCreateDate() != null && param.getEndCreateDate() != null){
Date beginDate = param.getBeginCreateDate();
Date endDate = param.getEndCreateDate();
beginDateStr = DateUtil.format(beginDate, "yyyy-MM-dd HH:mm:ss");
endDateStr = DateUtil.format(endDate, "yyyy-MM-dd HH:mm:ss");
}
List<String> apis = DataUtil.toIdList(param.getApi());
// 全量的时候 检测是否有自动任务锁住的表
boolean isFull = CollectionUtils.isEmpty(param.getIds());
@ -441,17 +437,18 @@ public class DataImportNewServiceImpl implements DataImportNewService {
for (String api : apis) {
DataObject update = new DataObject();
try {
QueryWrapper<DataObject> qw = new QueryWrapper<>();
qw.eq("name", api);
DataObject dataObject = dataObjectService.getOne(qw);
List<SalesforceParam> salesforceParams = null;
QueryWrapper<DataBatch> dbQw = new QueryWrapper<>();
dbQw.eq("name", api);
if (StringUtils.isNotEmpty(beginDateStr) && StringUtils.isNotEmpty(endDateStr)) {
dbQw.eq("sync_start_date", beginDateStr); // 等于开始时间
dbQw.eq("sync_end_date", endDateStr); // 等于结束时间
}
List<DataBatch> list = dataBatchService.list(dbQw);
dbQw.ge("sync_start_date",dataObject.getLastUpdateDate());
List<DataBatch> dataBatches = dataBatchService.list(dbQw);
AtomicInteger batch = new AtomicInteger(1);
if (CollectionUtils.isNotEmpty(list)) {
salesforceParams = list.stream().map(t -> {
if (CollectionUtils.isNotEmpty(dataBatches)) {
salesforceParams = dataBatches.stream().map(t -> {
SalesforceParam salesforceParam = param.clone();
salesforceParam.setApi(t.getName());
salesforceParam.setBeginCreateDate(t.getSyncStartDate());
@ -493,15 +490,14 @@ public class DataImportNewServiceImpl implements DataImportNewService {
* 组装多表存量Update参数
*/
private void autoUpdateSfDataNew(SalesforceParam param, List<Future<?>> futures) throws Exception {
QueryWrapper<DataObject> qw = new QueryWrapper<>();
qw.eq("need_update", 1)
.eq("data_lock", 0)
.orderByAsc("data_index")
.last(" limit 10");
PartnerConnection partnerConnection = salesforceTargetConnect.createConnect();
while (true) {
List<DataObject> dataObjects = dataObjectService.list(qw);
//判断dataObjects是否为空
if (CollectionUtils.isEmpty(dataObjects)) {
@ -509,27 +505,48 @@ public class DataImportNewServiceImpl implements DataImportNewService {
}
for (DataObject dataObject : dataObjects) {
TimeUnit.MILLISECONDS.sleep(1);
DataObject update = new DataObject();
update.setName(dataObject.getName());
update.setDataLock(1);
dataObjectService.updateById(update);
TimeUnit.MILLISECONDS.sleep(1);
try {
String api = dataObject.getName();
boolean needUpdate = dataObject.getNeedUpdate();
List<SalesforceParam> salesforceParams = null;
QueryWrapper<DataBatch> dbQw = new QueryWrapper<>();
dbQw.eq("name", api);
List<DataBatch> list = dataBatchService.list(dbQw);
AtomicInteger batch = new AtomicInteger(1);
if (CollectionUtils.isNotEmpty(list)) {
salesforceParams = list.stream().map(t -> {
SalesforceParam salesforceParam = param.clone();
salesforceParam.setApi(t.getName());
if (needUpdate) {
salesforceParam.setType(2);
salesforceParam.setBeginModifyDate(dataObject.getLastUpdateDate());
}
salesforceParam.setBeginCreateDate(t.getSyncStartDate());
salesforceParam.setEndCreateDate(t.getSyncEndDate());
salesforceParam.setBatch(batch.getAndIncrement());
return salesforceParam;
}).collect(Collectors.toList());
}
SalesforceParam salesforceParam = param.clone();
salesforceParam.setApi(dataObject.getName());
salesforceParam.setBeginModifyDate(dataObject.getLastUpdateDate());
Future<?> future = salesforceExecutor.execute(() -> {
try {
UpdateSfDataNew(salesforceParam, partnerConnection);
} catch (Throwable throwable) {
log.error("salesforceExecutor error", throwable);
throw new RuntimeException(throwable);
}
}, salesforceParam.getBatch(), 0);
futures.add(future);
// 手动任务优先执行
for (SalesforceParam salesforceParam : salesforceParams) {
Future<?> future = salesforceExecutor.execute(() -> {
try {
// autoUpdateSfData(salesforceParam, partnerConnection);
} catch (Throwable throwable) {
log.error("salesforceExecutor error", throwable);
throw new RuntimeException(throwable);
}
}, salesforceParam.getBatch(), 0);
futures.add(future);
}
// 等待当前所有线程执行完成
salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
update.setDataWork(0);
@ -553,14 +570,13 @@ public class DataImportNewServiceImpl implements DataImportNewService {
*/
private void autoUpdateIncrementalSfDataNew(SalesforceParam param, List<Future<?>> futures) throws Exception {
QueryWrapper<DataObject> qw = new QueryWrapper<>();
qw.eq("need_update", 1)
qw.eq("data_work", 1)
.eq("data_lock", 0)
.orderByAsc("data_index")
.last(" limit 10");
PartnerConnection partnerConnection = salesforceTargetConnect.createConnect();
while (true) {
List<DataObject> dataObjects = dataObjectService.list(qw);
//判断dataObjects是否为空
if (CollectionUtils.isEmpty(dataObjects)) {
@ -568,37 +584,56 @@ public class DataImportNewServiceImpl implements DataImportNewService {
}
for (DataObject dataObject : dataObjects) {
TimeUnit.MILLISECONDS.sleep(1);
DataObject update = new DataObject();
update.setName(dataObject.getName());
update.setDataLock(1);
dataObjectService.updateById(update);
TimeUnit.MILLISECONDS.sleep(1);
try {
String api = dataObject.getName();
boolean needUpdate = dataObject.getNeedUpdate();
List<SalesforceParam> salesforceParams = null;
QueryWrapper<DataBatch> dbQw = new QueryWrapper<>();
dbQw.eq("name", api);
List<DataBatch> list = dataBatchService.list(dbQw);
AtomicInteger batch = new AtomicInteger(1);
if (CollectionUtils.isNotEmpty(list)) {
salesforceParams = list.stream().map(t -> {
SalesforceParam salesforceParam = param.clone();
salesforceParam.setApi(t.getName());
if (needUpdate) {
salesforceParam.setType(2);
salesforceParam.setBeginModifyDate(dataObject.getLastUpdateDate());
}
salesforceParam.setBeginCreateDate(t.getSyncStartDate());
salesforceParam.setEndCreateDate(t.getSyncEndDate());
salesforceParam.setBatch(batch.getAndIncrement());
return salesforceParam;
}).collect(Collectors.toList());
}
SalesforceParam salesforceParam = param.clone();
salesforceParam.setApi(dataObject.getName());
salesforceParam.setBeginModifyDate(dataObject.getLastUpdateDate());
Future<?> future = salesforceExecutor.execute(() -> {
try {
UpdateSfDataNew(salesforceParam, partnerConnection);
} catch (Throwable throwable) {
log.error("salesforceExecutor error", throwable);
throw new RuntimeException(throwable);
}
}, salesforceParam.getBatch(), 0);
futures.add(future);
// 手动任务优先执行
for (SalesforceParam salesforceParam : salesforceParams) {
Future<?> future = salesforceExecutor.execute(() -> {
try {
UpdateSfDataNew(salesforceParam, partnerConnection);
} catch (Throwable throwable) {
log.error("salesforceExecutor error", throwable);
throw new RuntimeException(throwable);
}
}, salesforceParam.getBatch(), 0);
futures.add(future);
}
// 等待当前所有线程执行完成
salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
update.setDataWork(0);
} catch (Exception e) {
throw e;
} finally {
if (dataObject != null) {
update.setDataLock(0);
dataObjectService.updateById(update);
}
update.setDataLock(0);
dataObjectService.updateById(update);
}
}
// 等待当前所有线程执行完成
@ -786,4 +821,114 @@ public class DataImportNewServiceImpl implements DataImportNewService {
return map;
}
/**
* 获取DocumentLink
*/
@Override
public ReturnT<String> dumpDocumentLinkJob(String paramStr) throws Exception {
String api = "ContentDocumentLink";
PartnerConnection partnerConnection = salesforceConnect.createConnect();
List<Map<String, Object>> list = customMapper.list("Id", "ContentDocument", "new_id is not null");
DescribeSObjectResult dsr = partnerConnection.describeSObject(api);
List<String> fields = customMapper.getFields(api).stream().map(String::toUpperCase).collect(Collectors.toList());
Field[] dsrFields = dsr.getFields();
try {
if (list != null && !list.isEmpty()) {
for (Map<String, Object> map : list) {
String contentDocumentId = (String) map.get("Id");
String sql = "SELECT Id, LinkedEntityId, LinkedEntity.Type, ContentDocumentId, Visibility, ShareType, SystemModstamp, IsDeleted FROM ContentDocumentLink where ContentDocumentId = '" + contentDocumentId + "'";
com.alibaba.fastjson2.JSONArray objects = null;
QueryResult queryResult = partnerConnection.queryAll(sql);
SObject[] records = queryResult.getRecords();
objects = DataUtil.toJsonArray(records, dsrFields);
commonService.saveOrUpdate(api, fields, records, objects, true);
}
}
} catch (Exception e) {
log.error("getDocumentLink error api:{}, data:{}", api, com.alibaba.fastjson2.JSON.toJSONString(list), e);
return ReturnT.FAIL;
} catch (Throwable e) {
log.error("getDocumentLink error api:{}, data:{}", api, com.alibaba.fastjson2.JSON.toJSONString(list), e);
TimeUnit.MINUTES.sleep(1);
return ReturnT.FAIL;
}
return null;
}
/**
* 推送DocumentLink
*/
@Override
public ReturnT<String> pullDocumentLinkJob(String paramStr) throws Exception {
String api = "ContentDocumentLink";
PartnerConnection connection = salesforceTargetConnect.createConnect();
List<Map<String, Object>> list = customMapper.list("Id", "ContentDocument", "new_id is not null");
try {
if (list != null && !list.isEmpty()) {
//表内数据总量
Integer count = customMapper.countBySQL(api, "where ShareType = 'V' and new_id = '0'");
//批量插入200一次
int page = count % 200 == 0 ? count / 200 : (count / 200) + 1;
for (int i = 0; i < page; i++) {
List<Map<String, Object>> linkList = customMapper.list("Id,LinkedEntityId,ContentDocumentId,LinkedEntity_Type,ShareType,Visibility", api, "ShareType = 'V' and new_id = '0' order by Id asc limit 200");
SObject[] accounts = new SObject[linkList.size()];
String[] ids = new String[linkList.size()];
int index = 0;
for (Map<String, Object> map : linkList) {
String linkedEntityId = (String) map.get("LinkedEntityId");
String id = (String) map.get("Id");
String contentDocumentId = (String) map.get("ContentDocumentId");
String linkedEntityType = (String) map.get("LinkedEntity_Type");
String shareType = (String) map.get("ShareType");
String Visibility = (String) map.get("Visibility");
// dataObject查询
QueryWrapper<DataObject> qw = new QueryWrapper<>();
qw.eq("name", linkedEntityType);
List<DataObject> objects = dataObjectService.list(qw);
if (!objects.isEmpty()) {
Map<String, Object> dMap = customMapper.getById("new_id", "ContentDocument", contentDocumentId);
Map<String, Object> lMap = customMapper.getById("new_id", linkedEntityType, linkedEntityId);
SObject account = new SObject();
account.setType(api);
account.setField("ContentDocumentId", dMap.get("new_id").toString());
account.setField("LinkedEntityId", lMap.get("new_id").toString());
account.setField("ShareType", shareType);
account.setField("Visibility", Visibility);
ids[index] = id;
accounts[index] = account;
index++;
}
}
try {
SaveResult[] saveResults = connection.create(accounts);
for (int j = 0; j < saveResults.length; j++) {
if (!saveResults[j].getSuccess()) {
String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s", api, com.alibaba.fastjson2.JSON.toJSONString(DataDumpParam.getFilter()), com.alibaba.fastjson.JSON.toJSONString(saveResults[j]));
EmailUtil.send("DataDump ContentDocumentLink ERROR", format);
} else {
List<Map<String, Object>> dList = new ArrayList<>();
Map<String, Object> linkMap = new HashMap<>();
linkMap.put("key", "new_id");
linkMap.put("value", saveResults[j].getId());
dList.add(linkMap);
customMapper.updateById("ContentDocumentLink", dList, ids[j]);
}
}
} catch (Exception e) {
log.error("getDocumentLink error api:{}, data:{}", api, com.alibaba.fastjson2.JSON.toJSONString(accounts), e);
EmailUtil.send("-------测试-----------", com.alibaba.fastjson2.JSON.toJSONString(accounts));
throw new RuntimeException(e);
}
}
}
} catch (Exception e) {
log.error("getDocumentLink error api:{}, data:{}", api, com.alibaba.fastjson2.JSON.toJSONString(list), e);
return ReturnT.FAIL;
}
return null;
}
}