diff --git a/src/main/java/com/celnet/datadump/job/DataDumpNewJob.java b/src/main/java/com/celnet/datadump/job/DataDumpNewJob.java index 90de31a..29b211f 100644 --- a/src/main/java/com/celnet/datadump/job/DataDumpNewJob.java +++ b/src/main/java/com/celnet/datadump/job/DataDumpNewJob.java @@ -1,6 +1,7 @@ package com.celnet.datadump.job; import com.alibaba.fastjson.JSON; +import com.celnet.datadump.config.SalesforceConnect; import com.celnet.datadump.param.SalesforceParam; import com.celnet.datadump.service.CommonService; import com.celnet.datadump.service.DataImportBatchService; @@ -123,5 +124,27 @@ public class DataDumpNewJob { return dataImportNewService.immigrationUpdateNew(param); } + /** + * 拉取文件关联表 + * @return result + */ + @XxlJob("dumpDocumentLinkJob") + public ReturnT dumpDocumentLinkJob(String paramStr) throws Exception{ + log.info("dumpDocumentLinkJob execute start .................."); + + return dataImportNewService.dumpDocumentLinkJob(paramStr); + } + + /** + * 推送文件关联表 + * @return result + */ + @XxlJob("getDocumentLinkJob") + public ReturnT pullDocumentLinkJob(String paramStr) throws Exception{ + log.info("pullDocumentLinkJob execute start .................."); + + return dataImportNewService.pullDocumentLinkJob(paramStr); + } + } diff --git a/src/main/java/com/celnet/datadump/service/DataImportNewService.java b/src/main/java/com/celnet/datadump/service/DataImportNewService.java index 0ef4b25..412801c 100644 --- a/src/main/java/com/celnet/datadump/service/DataImportNewService.java +++ b/src/main/java/com/celnet/datadump/service/DataImportNewService.java @@ -18,4 +18,20 @@ public interface DataImportNewService { */ ReturnT immigrationUpdateNew(SalesforceParam param) throws Exception; + + /** + * 获取documentLink对象 + * @return + * @throws Exception + */ + ReturnT dumpDocumentLinkJob(String paramStr) throws Exception; + + + /** + * 推送documentLink对象 + * @return + * @throws Exception + */ + ReturnT pullDocumentLinkJob(String paramStr) throws Exception; + } diff --git a/src/main/java/com/celnet/datadump/service/impl/CommonServiceImpl.java b/src/main/java/com/celnet/datadump/service/impl/CommonServiceImpl.java index d26feb4..f1ec2f2 100644 --- a/src/main/java/com/celnet/datadump/service/impl/CommonServiceImpl.java +++ b/src/main/java/com/celnet/datadump/service/impl/CommonServiceImpl.java @@ -1146,58 +1146,58 @@ public class CommonServiceImpl implements CommonService { //批量插入200一次 int page = count % 200 == 0 ? count / 200 : (count / 200) + 1; for (int i = 0; i < page; i++) { - List> linkList = customMapper.list("Id,LinkedEntityId,ContentDocumentId,LinkedEntity_Type,ShareType,Visibility", api, "ShareType = 'V' and new_id = '0' order by Id asc limit 200"); - SObject[] accounts = new SObject[linkList.size()]; - String[] ids = new String[linkList.size()]; - int index = 0; - for (Map map : linkList) { - String linkedEntityId = (String) map.get("LinkedEntityId"); - String id = (String) map.get("Id"); - String contentDocumentId = (String) map.get("ContentDocumentId"); - String linkedEntityType = (String) map.get("LinkedEntity_Type"); - String shareType = (String) map.get("ShareType"); - String Visibility = (String) map.get("Visibility"); + List> linkList = customMapper.list("Id,LinkedEntityId,ContentDocumentId,LinkedEntity_Type,ShareType,Visibility", api, "ShareType = 'V' and new_id = '0' order by Id asc limit 200"); + SObject[] accounts = new SObject[linkList.size()]; + String[] ids = new String[linkList.size()]; + int index = 0; + for (Map map : linkList) { + String linkedEntityId = (String) map.get("LinkedEntityId"); + String id = (String) map.get("Id"); + String contentDocumentId = (String) map.get("ContentDocumentId"); + String linkedEntityType = (String) map.get("LinkedEntity_Type"); + String shareType = (String) map.get("ShareType"); + String Visibility = (String) map.get("Visibility"); - // dataObject查询 - QueryWrapper qw = new QueryWrapper<>(); - qw.eq("name", linkedEntityType); - List objects = dataObjectService.list(qw); - if (!objects.isEmpty()) { - Map dMap = customMapper.getById("new_id", "ContentDocument", contentDocumentId); - Map lMap = customMapper.getById("new_id", linkedEntityType, linkedEntityId); + // dataObject查询 + QueryWrapper qw = new QueryWrapper<>(); + qw.eq("name", linkedEntityType); + List objects = dataObjectService.list(qw); + if (!objects.isEmpty()) { + Map dMap = customMapper.getById("new_id", "ContentDocument", contentDocumentId); + Map lMap = customMapper.getById("new_id", linkedEntityType, linkedEntityId); - SObject account = new SObject(); - account.setType(api); - account.setField("ContentDocumentId", dMap.get("new_id").toString()); - account.setField("LinkedEntityId", lMap.get("new_id").toString()); - account.setField("ShareType", shareType); - account.setField("Visibility", Visibility); - ids[index] = id; - accounts[index] = account; - index++; + SObject account = new SObject(); + account.setType(api); + account.setField("ContentDocumentId", dMap.get("new_id").toString()); + account.setField("LinkedEntityId", lMap.get("new_id").toString()); + account.setField("ShareType", shareType); + account.setField("Visibility", Visibility); + ids[index] = id; + accounts[index] = account; + index++; + } + } + try { + SaveResult[] saveResults = connection.create(accounts); + for (int j = 0; j < saveResults.length; j++) { + if (!saveResults[j].getSuccess()) { + String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s", api, JSON.toJSONString(DataDumpParam.getFilter()), com.alibaba.fastjson.JSON.toJSONString(saveResults[j])); + EmailUtil.send("DataDump ContentDocumentLink ERROR", format); + } else { + List> dList = new ArrayList<>(); + Map linkMap = new HashMap<>(); + linkMap.put("key", "new_id"); + linkMap.put("value", saveResults[j].getId()); + dList.add(linkMap); + customMapper.updateById("ContentDocumentLink", dList, ids[j]); } } - try { - SaveResult[] saveResults = connection.create(accounts); - for (int j = 0; j < saveResults.length; j++) { - if (!saveResults[j].getSuccess()) { - String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s", api, JSON.toJSONString(DataDumpParam.getFilter()), com.alibaba.fastjson.JSON.toJSONString(saveResults[j])); - EmailUtil.send("DataDump ContentDocumentLink ERROR", format); - } else { - List> dList = new ArrayList<>(); - Map linkMap = new HashMap<>(); - linkMap.put("key", "new_id"); - linkMap.put("value", saveResults[j].getId()); - dList.add(linkMap); - customMapper.updateById("ContentDocumentLink", dList, ids[j]); - } - } - } catch (Exception e) { - log.error("getDocumentLink error api:{}, data:{}", api, JSON.toJSONString(accounts), e); - EmailUtil.send("-------测试-----------", JSON.toJSONString(accounts)); - throw new RuntimeException(e); - } - } + } catch (Exception e) { + log.error("getDocumentLink error api:{}, data:{}", api, JSON.toJSONString(accounts), e); + EmailUtil.send("-------测试-----------", JSON.toJSONString(accounts)); + throw new RuntimeException(e); + } + } } } catch (Exception e) { log.error("getDocumentLink error api:{}, data:{}", api, JSON.toJSONString(list), e); diff --git a/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java b/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java index b87819d..c736471 100644 --- a/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java +++ b/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java @@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; +import com.celnet.datadump.config.SalesforceConnect; import com.celnet.datadump.config.SalesforceExecutor; import com.celnet.datadump.config.SalesforceTargetConnect; import com.celnet.datadump.entity.DataBatch; @@ -47,9 +48,15 @@ public class DataImportNewServiceImpl implements DataImportNewService { @Autowired private SalesforceTargetConnect salesforceTargetConnect; + @Autowired + private DataBatchHistoryService dataBatchHistoryService; + @Autowired private SalesforceExecutor salesforceExecutor; + @Autowired + private SalesforceConnect salesforceConnect; + @Autowired private DataObjectService dataObjectService; @@ -62,15 +69,12 @@ public class DataImportNewServiceImpl implements DataImportNewService { @Autowired private CustomMapper customMapper; - @Autowired - private DataBatchHistoryService dataBatchHistoryService; - @Autowired private CommonService commonService; /** - * Get入口 + * Get返写个人客户联系人入口 */ @Override public ReturnT getPersonContact(SalesforceParam param) throws Exception { @@ -331,6 +335,7 @@ public class DataImportNewServiceImpl implements DataImportNewService { throw throwable; } } + /** * 组装【单表】【存量】Update参数 */ @@ -414,16 +419,7 @@ public class DataImportNewServiceImpl implements DataImportNewService { * 组装【单表】【增量】Update参数 */ public ReturnT updateIncrementalSfDataNew(SalesforceParam param, List> futures) throws Exception { - List apis; - String beginDateStr = null; - String endDateStr = null; - apis = DataUtil.toIdList(param.getApi()); - if (param.getBeginCreateDate() != null && param.getEndCreateDate() != null){ - Date beginDate = param.getBeginCreateDate(); - Date endDate = param.getEndCreateDate(); - beginDateStr = DateUtil.format(beginDate, "yyyy-MM-dd HH:mm:ss"); - endDateStr = DateUtil.format(endDate, "yyyy-MM-dd HH:mm:ss"); - } + List apis = DataUtil.toIdList(param.getApi()); // 全量的时候 检测是否有自动任务锁住的表 boolean isFull = CollectionUtils.isEmpty(param.getIds()); @@ -441,17 +437,18 @@ public class DataImportNewServiceImpl implements DataImportNewService { for (String api : apis) { DataObject update = new DataObject(); try { + QueryWrapper qw = new QueryWrapper<>(); + qw.eq("name", api); + DataObject dataObject = dataObjectService.getOne(qw); + List salesforceParams = null; QueryWrapper dbQw = new QueryWrapper<>(); dbQw.eq("name", api); - if (StringUtils.isNotEmpty(beginDateStr) && StringUtils.isNotEmpty(endDateStr)) { - dbQw.eq("sync_start_date", beginDateStr); // 等于开始时间 - dbQw.eq("sync_end_date", endDateStr); // 等于结束时间 - } - List list = dataBatchService.list(dbQw); + dbQw.ge("sync_start_date",dataObject.getLastUpdateDate()); + List dataBatches = dataBatchService.list(dbQw); AtomicInteger batch = new AtomicInteger(1); - if (CollectionUtils.isNotEmpty(list)) { - salesforceParams = list.stream().map(t -> { + if (CollectionUtils.isNotEmpty(dataBatches)) { + salesforceParams = dataBatches.stream().map(t -> { SalesforceParam salesforceParam = param.clone(); salesforceParam.setApi(t.getName()); salesforceParam.setBeginCreateDate(t.getSyncStartDate()); @@ -493,15 +490,14 @@ public class DataImportNewServiceImpl implements DataImportNewService { * 组装【多表】【存量】Update参数 */ private void autoUpdateSfDataNew(SalesforceParam param, List> futures) throws Exception { + QueryWrapper qw = new QueryWrapper<>(); qw.eq("need_update", 1) - .eq("data_lock", 0) .orderByAsc("data_index") .last(" limit 10"); PartnerConnection partnerConnection = salesforceTargetConnect.createConnect(); while (true) { - List dataObjects = dataObjectService.list(qw); //判断dataObjects是否为空 if (CollectionUtils.isEmpty(dataObjects)) { @@ -509,27 +505,48 @@ public class DataImportNewServiceImpl implements DataImportNewService { } for (DataObject dataObject : dataObjects) { + TimeUnit.MILLISECONDS.sleep(1); DataObject update = new DataObject(); update.setName(dataObject.getName()); update.setDataLock(1); dataObjectService.updateById(update); - + TimeUnit.MILLISECONDS.sleep(1); try { + String api = dataObject.getName(); + boolean needUpdate = dataObject.getNeedUpdate(); + List salesforceParams = null; + QueryWrapper dbQw = new QueryWrapper<>(); + dbQw.eq("name", api); + List list = dataBatchService.list(dbQw); + AtomicInteger batch = new AtomicInteger(1); + if (CollectionUtils.isNotEmpty(list)) { + salesforceParams = list.stream().map(t -> { + SalesforceParam salesforceParam = param.clone(); + salesforceParam.setApi(t.getName()); + if (needUpdate) { + salesforceParam.setType(2); + salesforceParam.setBeginModifyDate(dataObject.getLastUpdateDate()); + } + salesforceParam.setBeginCreateDate(t.getSyncStartDate()); + salesforceParam.setEndCreateDate(t.getSyncEndDate()); + salesforceParam.setBatch(batch.getAndIncrement()); + return salesforceParam; + }).collect(Collectors.toList()); + } - SalesforceParam salesforceParam = param.clone(); - salesforceParam.setApi(dataObject.getName()); - salesforceParam.setBeginModifyDate(dataObject.getLastUpdateDate()); - - Future future = salesforceExecutor.execute(() -> { - try { - UpdateSfDataNew(salesforceParam, partnerConnection); - } catch (Throwable throwable) { - log.error("salesforceExecutor error", throwable); - throw new RuntimeException(throwable); - } - }, salesforceParam.getBatch(), 0); - futures.add(future); + // 手动任务优先执行 + for (SalesforceParam salesforceParam : salesforceParams) { + Future future = salesforceExecutor.execute(() -> { + try { +// autoUpdateSfData(salesforceParam, partnerConnection); + } catch (Throwable throwable) { + log.error("salesforceExecutor error", throwable); + throw new RuntimeException(throwable); + } + }, salesforceParam.getBatch(), 0); + futures.add(future); + } // 等待当前所有线程执行完成 salesforceExecutor.waitForFutures(futures.toArray(new Future[]{})); update.setDataWork(0); @@ -553,14 +570,13 @@ public class DataImportNewServiceImpl implements DataImportNewService { */ private void autoUpdateIncrementalSfDataNew(SalesforceParam param, List> futures) throws Exception { QueryWrapper qw = new QueryWrapper<>(); - qw.eq("need_update", 1) + qw.eq("data_work", 1) .eq("data_lock", 0) .orderByAsc("data_index") .last(" limit 10"); PartnerConnection partnerConnection = salesforceTargetConnect.createConnect(); while (true) { - List dataObjects = dataObjectService.list(qw); //判断dataObjects是否为空 if (CollectionUtils.isEmpty(dataObjects)) { @@ -568,37 +584,56 @@ public class DataImportNewServiceImpl implements DataImportNewService { } for (DataObject dataObject : dataObjects) { + TimeUnit.MILLISECONDS.sleep(1); DataObject update = new DataObject(); update.setName(dataObject.getName()); update.setDataLock(1); dataObjectService.updateById(update); - + TimeUnit.MILLISECONDS.sleep(1); try { + String api = dataObject.getName(); + boolean needUpdate = dataObject.getNeedUpdate(); + List salesforceParams = null; + QueryWrapper dbQw = new QueryWrapper<>(); + dbQw.eq("name", api); + List list = dataBatchService.list(dbQw); + AtomicInteger batch = new AtomicInteger(1); + if (CollectionUtils.isNotEmpty(list)) { + salesforceParams = list.stream().map(t -> { + SalesforceParam salesforceParam = param.clone(); + salesforceParam.setApi(t.getName()); + if (needUpdate) { + salesforceParam.setType(2); + salesforceParam.setBeginModifyDate(dataObject.getLastUpdateDate()); + } + salesforceParam.setBeginCreateDate(t.getSyncStartDate()); + salesforceParam.setEndCreateDate(t.getSyncEndDate()); + salesforceParam.setBatch(batch.getAndIncrement()); + return salesforceParam; + }).collect(Collectors.toList()); + } - SalesforceParam salesforceParam = param.clone(); - salesforceParam.setApi(dataObject.getName()); - salesforceParam.setBeginModifyDate(dataObject.getLastUpdateDate()); - - Future future = salesforceExecutor.execute(() -> { - try { - UpdateSfDataNew(salesforceParam, partnerConnection); - } catch (Throwable throwable) { - log.error("salesforceExecutor error", throwable); - throw new RuntimeException(throwable); - } - }, salesforceParam.getBatch(), 0); - futures.add(future); + // 手动任务优先执行 + for (SalesforceParam salesforceParam : salesforceParams) { + Future future = salesforceExecutor.execute(() -> { + try { + UpdateSfDataNew(salesforceParam, partnerConnection); + } catch (Throwable throwable) { + log.error("salesforceExecutor error", throwable); + throw new RuntimeException(throwable); + } + }, salesforceParam.getBatch(), 0); + futures.add(future); + } // 等待当前所有线程执行完成 salesforceExecutor.waitForFutures(futures.toArray(new Future[]{})); update.setDataWork(0); } catch (Exception e) { throw e; } finally { - if (dataObject != null) { - update.setDataLock(0); - dataObjectService.updateById(update); - } + update.setDataLock(0); + dataObjectService.updateById(update); } } // 等待当前所有线程执行完成 @@ -786,4 +821,114 @@ public class DataImportNewServiceImpl implements DataImportNewService { return map; } + /** + * 获取DocumentLink + */ + @Override + public ReturnT dumpDocumentLinkJob(String paramStr) throws Exception { + String api = "ContentDocumentLink"; + PartnerConnection partnerConnection = salesforceConnect.createConnect(); + List> list = customMapper.list("Id", "ContentDocument", "new_id is not null"); + DescribeSObjectResult dsr = partnerConnection.describeSObject(api); + List fields = customMapper.getFields(api).stream().map(String::toUpperCase).collect(Collectors.toList()); + Field[] dsrFields = dsr.getFields(); + try { + if (list != null && !list.isEmpty()) { + for (Map map : list) { + String contentDocumentId = (String) map.get("Id"); + String sql = "SELECT Id, LinkedEntityId, LinkedEntity.Type, ContentDocumentId, Visibility, ShareType, SystemModstamp, IsDeleted FROM ContentDocumentLink where ContentDocumentId = '" + contentDocumentId + "'"; + com.alibaba.fastjson2.JSONArray objects = null; + QueryResult queryResult = partnerConnection.queryAll(sql); + SObject[] records = queryResult.getRecords(); + objects = DataUtil.toJsonArray(records, dsrFields); + commonService.saveOrUpdate(api, fields, records, objects, true); + } + + } + } catch (Exception e) { + log.error("getDocumentLink error api:{}, data:{}", api, com.alibaba.fastjson2.JSON.toJSONString(list), e); + return ReturnT.FAIL; + } catch (Throwable e) { + log.error("getDocumentLink error api:{}, data:{}", api, com.alibaba.fastjson2.JSON.toJSONString(list), e); + TimeUnit.MINUTES.sleep(1); + return ReturnT.FAIL; + } + return null; + } + + /** + * 推送DocumentLink + */ + @Override + public ReturnT pullDocumentLinkJob(String paramStr) throws Exception { + String api = "ContentDocumentLink"; + PartnerConnection connection = salesforceTargetConnect.createConnect(); + List> list = customMapper.list("Id", "ContentDocument", "new_id is not null"); + try { + if (list != null && !list.isEmpty()) { + //表内数据总量 + Integer count = customMapper.countBySQL(api, "where ShareType = 'V' and new_id = '0'"); + //批量插入200一次 + int page = count % 200 == 0 ? count / 200 : (count / 200) + 1; + for (int i = 0; i < page; i++) { + List> linkList = customMapper.list("Id,LinkedEntityId,ContentDocumentId,LinkedEntity_Type,ShareType,Visibility", api, "ShareType = 'V' and new_id = '0' order by Id asc limit 200"); + SObject[] accounts = new SObject[linkList.size()]; + String[] ids = new String[linkList.size()]; + int index = 0; + for (Map map : linkList) { + String linkedEntityId = (String) map.get("LinkedEntityId"); + String id = (String) map.get("Id"); + String contentDocumentId = (String) map.get("ContentDocumentId"); + String linkedEntityType = (String) map.get("LinkedEntity_Type"); + String shareType = (String) map.get("ShareType"); + String Visibility = (String) map.get("Visibility"); + + // dataObject查询 + QueryWrapper qw = new QueryWrapper<>(); + qw.eq("name", linkedEntityType); + List objects = dataObjectService.list(qw); + if (!objects.isEmpty()) { + Map dMap = customMapper.getById("new_id", "ContentDocument", contentDocumentId); + Map lMap = customMapper.getById("new_id", linkedEntityType, linkedEntityId); + + SObject account = new SObject(); + account.setType(api); + account.setField("ContentDocumentId", dMap.get("new_id").toString()); + account.setField("LinkedEntityId", lMap.get("new_id").toString()); + account.setField("ShareType", shareType); + account.setField("Visibility", Visibility); + ids[index] = id; + accounts[index] = account; + index++; + } + } + try { + SaveResult[] saveResults = connection.create(accounts); + for (int j = 0; j < saveResults.length; j++) { + if (!saveResults[j].getSuccess()) { + String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s", api, com.alibaba.fastjson2.JSON.toJSONString(DataDumpParam.getFilter()), com.alibaba.fastjson.JSON.toJSONString(saveResults[j])); + EmailUtil.send("DataDump ContentDocumentLink ERROR", format); + } else { + List> dList = new ArrayList<>(); + Map linkMap = new HashMap<>(); + linkMap.put("key", "new_id"); + linkMap.put("value", saveResults[j].getId()); + dList.add(linkMap); + customMapper.updateById("ContentDocumentLink", dList, ids[j]); + } + } + } catch (Exception e) { + log.error("getDocumentLink error api:{}, data:{}", api, com.alibaba.fastjson2.JSON.toJSONString(accounts), e); + EmailUtil.send("-------测试-----------", com.alibaba.fastjson2.JSON.toJSONString(accounts)); + throw new RuntimeException(e); + } + } + } + } catch (Exception e) { + log.error("getDocumentLink error api:{}, data:{}", api, com.alibaba.fastjson2.JSON.toJSONString(list), e); + return ReturnT.FAIL; + } + return null; + } + }