diff --git a/src/main/java/com/celnet/datadump/job/DataDumpNewJob.java b/src/main/java/com/celnet/datadump/job/DataDumpNewJob.java index a4fc0ca..90de31a 100644 --- a/src/main/java/com/celnet/datadump/job/DataDumpNewJob.java +++ b/src/main/java/com/celnet/datadump/job/DataDumpNewJob.java @@ -104,11 +104,11 @@ public class DataDumpNewJob { } /** - * 增量数据更新 + * 数据更新 */ - @XxlJob("dataIncrementUpdateJob") - public ReturnT dataIncrementUpdateJob(String paramStr) throws Exception { - log.info("dataUpdateJob execute start .................."); + @XxlJob("dataUpdateNewJob") + public ReturnT dataUpdateNewJob(String paramStr) throws Exception { + log.info("dataUpdateNewJob execute start .................."); SalesforceParam param = new SalesforceParam(); try { if (StringUtils.isNotBlank(paramStr)) { @@ -120,7 +120,7 @@ public class DataDumpNewJob { param.setBeginCreateDate(param.getBeginDate()); param.setEndCreateDate(param.getEndDate()); - return dataImportNewService.immigrationIncrementUpdate(param); + return dataImportNewService.immigrationUpdateNew(param); } diff --git a/src/main/java/com/celnet/datadump/service/DataImportNewService.java b/src/main/java/com/celnet/datadump/service/DataImportNewService.java index 6a3938e..0ef4b25 100644 --- a/src/main/java/com/celnet/datadump/service/DataImportNewService.java +++ b/src/main/java/com/celnet/datadump/service/DataImportNewService.java @@ -14,8 +14,8 @@ public interface DataImportNewService { ReturnT getPersonContact(SalesforceParam param) throws Exception; /** - * 增量数据更新 + * 数据更新 */ - ReturnT immigrationIncrementUpdate(SalesforceParam param) throws Exception; + ReturnT immigrationUpdateNew(SalesforceParam param) throws Exception; } diff --git a/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java b/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java index fe678c7..b87819d 100644 --- a/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java +++ b/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java @@ -303,21 +303,24 @@ public class DataImportNewServiceImpl implements DataImportNewService { } /** - * 增量Update入口 + * Update入口 */ @Override - public ReturnT immigrationIncrementUpdate(SalesforceParam param) throws Exception { + public ReturnT immigrationUpdateNew(SalesforceParam param) throws Exception { List> futures = Lists.newArrayList(); try { if (StringUtils.isNotBlank(param.getApi())) { - // 手动任务 - ReturnT result = updateIncrementSfData(param, futures); - if (result != null) { - return result; + if (1 == param.getType()){ + return updateSfDataNew(param, futures); + }else { + return updateIncrementalSfDataNew(param, futures); } } else { - // 自动更新任务 -// autoUpdateSfData(param, futures); + if (1 == param.getType()){ + autoUpdateSfDataNew(param, futures); + }else { + autoUpdateIncrementalSfDataNew(param, futures); + } } return ReturnT.SUCCESS; } catch (InterruptedException e) { @@ -328,11 +331,10 @@ public class DataImportNewServiceImpl implements DataImportNewService { throw throwable; } } - /** - * 组装增量Update参数 + * 组装【单表】【存量】Update参数 */ - public ReturnT updateIncrementSfData(SalesforceParam param, List> futures) throws Exception { + public ReturnT updateSfDataNew(SalesforceParam param, List> futures) throws Exception { List apis; String beginDateStr = null; String endDateStr = null; @@ -343,9 +345,7 @@ public class DataImportNewServiceImpl implements DataImportNewService { beginDateStr = DateUtil.format(beginDate, "yyyy-MM-dd HH:mm:ss"); endDateStr = DateUtil.format(endDate, "yyyy-MM-dd HH:mm:ss"); } - String join = StringUtils.join(apis, ","); - log.info("immigration apis: {}", join); - XxlJobLogger.log("immigration apis: {}", join); + // 全量的时候 检测是否有自动任务锁住的表 boolean isFull = CollectionUtils.isEmpty(param.getIds()); if (isFull) { @@ -362,7 +362,6 @@ public class DataImportNewServiceImpl implements DataImportNewService { for (String api : apis) { DataObject update = new DataObject(); try { - TimeUnit.MILLISECONDS.sleep(1); List salesforceParams = null; QueryWrapper dbQw = new QueryWrapper<>(); dbQw.eq("name", api); @@ -387,7 +386,7 @@ public class DataImportNewServiceImpl implements DataImportNewService { for (SalesforceParam salesforceParam : salesforceParams) { Future future = salesforceExecutor.execute(() -> { try { - manualIncrementUpdateSfData(salesforceParam, partnerConnection); + UpdateSfDataNew(salesforceParam, partnerConnection); } catch (Throwable throwable) { log.error("salesforceExecutor error", throwable); throw new RuntimeException(throwable); @@ -412,24 +411,241 @@ public class DataImportNewServiceImpl implements DataImportNewService { } /** - * 执行增量Update数据 + * 组装【单表】【增量】Update参数 */ - private void manualIncrementUpdateSfData(SalesforceParam param, PartnerConnection partnerConnection) throws Exception { - Map infoFlag = customMapper.list("code,value","system_config","code ='"+ SystemConfigCode.INFO_FLAG+"'").get(0); + public ReturnT updateIncrementalSfDataNew(SalesforceParam param, List> futures) throws Exception { + List apis; + String beginDateStr = null; + String endDateStr = null; + apis = DataUtil.toIdList(param.getApi()); + if (param.getBeginCreateDate() != null && param.getEndCreateDate() != null){ + Date beginDate = param.getBeginCreateDate(); + Date endDate = param.getEndCreateDate(); + beginDateStr = DateUtil.format(beginDate, "yyyy-MM-dd HH:mm:ss"); + endDateStr = DateUtil.format(endDate, "yyyy-MM-dd HH:mm:ss"); + } + + // 全量的时候 检测是否有自动任务锁住的表 + boolean isFull = CollectionUtils.isEmpty(param.getIds()); + if (isFull) { + QueryWrapper qw = new QueryWrapper<>(); + qw.eq("data_lock", 1).in("name", apis); + List list = dataObjectService.list(qw); + if (CollectionUtils.isNotEmpty(list)) { + String apiNames = list.stream().map(DataObject::getName).collect(Collectors.joining()); + return new ReturnT<>(500, "api:" + apiNames + " is locked"); + } + } + + PartnerConnection partnerConnection = salesforceTargetConnect.createConnect(); + for (String api : apis) { + DataObject update = new DataObject(); + try { + List salesforceParams = null; + QueryWrapper dbQw = new QueryWrapper<>(); + dbQw.eq("name", api); + if (StringUtils.isNotEmpty(beginDateStr) && StringUtils.isNotEmpty(endDateStr)) { + dbQw.eq("sync_start_date", beginDateStr); // 等于开始时间 + dbQw.eq("sync_end_date", endDateStr); // 等于结束时间 + } + List list = dataBatchService.list(dbQw); + AtomicInteger batch = new AtomicInteger(1); + if (CollectionUtils.isNotEmpty(list)) { + salesforceParams = list.stream().map(t -> { + SalesforceParam salesforceParam = param.clone(); + salesforceParam.setApi(t.getName()); + salesforceParam.setBeginCreateDate(t.getSyncStartDate()); + salesforceParam.setEndCreateDate(t.getSyncEndDate()); + salesforceParam.setBatch(batch.getAndIncrement()); + return salesforceParam; + }).collect(Collectors.toList()); + } + + // 手动任务优先执行 + for (SalesforceParam salesforceParam : salesforceParams) { + Future future = salesforceExecutor.execute(() -> { + try { + UpdateSfDataNew(salesforceParam, partnerConnection); + } catch (Throwable throwable) { + log.error("salesforceExecutor error", throwable); + throw new RuntimeException(throwable); + } + }, salesforceParam.getBatch(), 1); + futures.add(future); + } + // 等待当前所有线程执行完成 + salesforceExecutor.waitForFutures(futures.toArray(new Future[]{})); + update.setDataWork(0); + } catch (Exception e) { + throw e; + } finally { + if (isFull) { + update.setName(api); + update.setDataLock(0); + dataObjectService.updateById(update); + } + } + } + return null; + } + + /** + * 组装【多表】【存量】Update参数 + */ + private void autoUpdateSfDataNew(SalesforceParam param, List> futures) throws Exception { + QueryWrapper qw = new QueryWrapper<>(); + qw.eq("need_update", 1) + .eq("data_lock", 0) + .orderByAsc("data_index") + .last(" limit 10"); + + PartnerConnection partnerConnection = salesforceTargetConnect.createConnect(); + while (true) { + + List dataObjects = dataObjectService.list(qw); + //判断dataObjects是否为空 + if (CollectionUtils.isEmpty(dataObjects)) { + return; + } + + for (DataObject dataObject : dataObjects) { + + DataObject update = new DataObject(); + update.setName(dataObject.getName()); + update.setDataLock(1); + dataObjectService.updateById(update); + + try { + + SalesforceParam salesforceParam = param.clone(); + salesforceParam.setApi(dataObject.getName()); + salesforceParam.setBeginModifyDate(dataObject.getLastUpdateDate()); + + Future future = salesforceExecutor.execute(() -> { + try { + UpdateSfDataNew(salesforceParam, partnerConnection); + } catch (Throwable throwable) { + log.error("salesforceExecutor error", throwable); + throw new RuntimeException(throwable); + } + }, salesforceParam.getBatch(), 0); + futures.add(future); + // 等待当前所有线程执行完成 + salesforceExecutor.waitForFutures(futures.toArray(new Future[]{})); + update.setDataWork(0); + } catch (Exception e) { + throw e; + } finally { + if (dataObject != null) { + update.setDataLock(0); + dataObjectService.updateById(update); + } + } + } + // 等待当前所有线程执行完成 + salesforceExecutor.waitForFutures(futures.toArray(new Future[]{})); + futures.clear(); + } + } + + /** + * 组装【多表】【增量】Update参数 + */ + private void autoUpdateIncrementalSfDataNew(SalesforceParam param, List> futures) throws Exception { + QueryWrapper qw = new QueryWrapper<>(); + qw.eq("need_update", 1) + .eq("data_lock", 0) + .orderByAsc("data_index") + .last(" limit 10"); + + PartnerConnection partnerConnection = salesforceTargetConnect.createConnect(); + while (true) { + + List dataObjects = dataObjectService.list(qw); + //判断dataObjects是否为空 + if (CollectionUtils.isEmpty(dataObjects)) { + return; + } + + for (DataObject dataObject : dataObjects) { + + DataObject update = new DataObject(); + update.setName(dataObject.getName()); + update.setDataLock(1); + dataObjectService.updateById(update); + + try { + + SalesforceParam salesforceParam = param.clone(); + salesforceParam.setApi(dataObject.getName()); + salesforceParam.setBeginModifyDate(dataObject.getLastUpdateDate()); + + Future future = salesforceExecutor.execute(() -> { + try { + UpdateSfDataNew(salesforceParam, partnerConnection); + } catch (Throwable throwable) { + log.error("salesforceExecutor error", throwable); + throw new RuntimeException(throwable); + } + }, salesforceParam.getBatch(), 0); + futures.add(future); + // 等待当前所有线程执行完成 + salesforceExecutor.waitForFutures(futures.toArray(new Future[]{})); + update.setDataWork(0); + } catch (Exception e) { + throw e; + } finally { + if (dataObject != null) { + update.setDataLock(0); + dataObjectService.updateById(update); + } + } + } + // 等待当前所有线程执行完成 + salesforceExecutor.waitForFutures(futures.toArray(new Future[]{})); + futures.clear(); + } + } + + /** + * 执行Update更新数据 + */ + private void UpdateSfDataNew(SalesforceParam param, PartnerConnection partnerConnection) throws Exception { + Map infoFlag = customMapper.list("code,value","system_config","code ='"+SystemConfigCode.INFO_FLAG+"'").get(0); + String api = param.getApi(); + TimeUnit.MILLISECONDS.sleep(1); QueryWrapper dbQw = new QueryWrapper<>(); dbQw.eq("api", api); List list = dataFieldService.list(dbQw); - TimeUnit.MILLISECONDS.sleep(1); Date beginDate = param.getBeginCreateDate(); Date endDate = param.getEndCreateDate(); + String sql = ""; + String sql2 = ""; + String beginDateStr = DateUtil.format(beginDate, "yyyy-MM-dd HH:mm:ss"); String endDateStr = DateUtil.format(endDate, "yyyy-MM-dd HH:mm:ss"); + if (1 == param.getType()) { + if (api.contains("Share")){ + sql = "where RowCause = 'Manual' and new_id is not null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "'"; + sql2 = "RowCause = 'Manual' and new_id is not null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "' order by Id asc limit "; + }else { + sql = "where new_id is not null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "'"; + sql2 = "new_id is not null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "' order by Id asc limit "; + } + }else { + if (api.contains("Share")){ + sql = "where RowCause = 'Manual' and new_id is not null and LastModifiedDate >= '" + beginDateStr + "' "; + sql2 = "RowCause = 'Manual' and new_id is not null and LastModifiedDate >= '" + beginDateStr + "' order by Id asc limit "; + }else { + sql = "where new_id is not null and LastModifiedDate >= '" + beginDateStr + "' "; + sql2 = "new_id is not null and LastModifiedDate >= '" + beginDateStr + "' order by Id asc limit "; + } + } //表内数据总量 - Integer count = customMapper.countBySQL(api, "where new_id is not null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "'"); - + Integer count = customMapper.countBySQL(api, sql); log.error("总Update数据 count:{};-开始时间:{};-结束时间:{};-api:{};", count, beginDateStr, endDateStr, api); if(count == 0){ @@ -440,7 +656,7 @@ public class DataImportNewServiceImpl implements DataImportNewService { //批量插入200一次 int page = count%200 == 0 ? count/200 : (count/200) + 1; for (int i = 0; i < page; i++) { - List> mapList = customMapper.list("*", api, "new_id is not null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "' order by Id asc limit " + i * 200 + ",200"); + List> mapList = customMapper.list("*", api, sql2+ i * 200 + ",200"); SObject[] accounts = new SObject[mapList.size()]; int j = 0; for (Map map : mapList) { @@ -478,7 +694,6 @@ public class DataImportNewServiceImpl implements DataImportNewService { }else if("0".equals(map.get("IsPersonAccount")) && field.equals("LastName")){ continue; } - } else { account.setField(field, map.get(field)); } @@ -487,19 +702,19 @@ public class DataImportNewServiceImpl implements DataImportNewService { } account.setField("old_owner_id__c", map.get("OwnerId")); account.setField("old_sfdc_id__c", map.get("Id")); - accounts[j++] = account; } - List> listMap = new ArrayList<>(); + try { if (infoFlag != null && "1".equals(infoFlag.get("value"))){ - listMap = returnAccountsDetails(accounts,list); + printlnAccountsDetails(accounts,list); } SaveResult[] saveResults = partnerConnection.update(accounts); for (SaveResult saveResult : saveResults) { if (!saveResult.getSuccess()) { + Map map = returnErrorAccountsDetails(accounts, list, saveResult.getId()); log.info("-------------saveResults: {}", JSON.toJSONString(saveResult)); - String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s, \n数据实体类:\n%s", api, com.alibaba.fastjson2.JSON.toJSONString(param, DataDumpParam.getFilter()), JSON.toJSONString(saveResult),JSON.toJSONString(listMap)); + String format = String.format("数据更新 error, api name: %s, \nparam: %s, \ncause:\n%s, \n数据实体类:\n%s", api, com.alibaba.fastjson2.JSON.toJSONString(param, DataDumpParam.getFilter()), JSON.toJSONString(saveResult),JSON.toJSONString(map)); EmailUtil.send("DataDump ERROR", format); return; }else { @@ -507,7 +722,6 @@ public class DataImportNewServiceImpl implements DataImportNewService { } } log.info("sf return saveResults------" + JSONArray.toJSONString(saveResults)); - } catch (Throwable e) { throw e; } @@ -525,36 +739,51 @@ public class DataImportNewServiceImpl implements DataImportNewService { .eq("sync_end_date", endDate) .set("sf_update_num", targetCount); dataBatchService.update(updateQw2); - } /** * 打印SF交互数据明细 */ - public List> returnAccountsDetails(SObject[] accounts,List list) { - ArrayList> arrayList = new ArrayList<>(); - for (int i = 0; i < accounts.length; i++) { - HashMap map = new HashMap<>(); + public void printlnAccountsDetails(SObject[] accounts,List list) { + for (int i = 0; i < accounts.length; i++) { SObject account = accounts[i]; - System.out.println("--- Account[" + i + "] ---"); + System.out.println("--- 对象数据[" + i + "] ---"); // 获取对象所有字段名 for (DataField dataField : list) { try { Object value = account.getField(dataField.getField()); - map.put(dataField.getField(),String.valueOf(value)); - System.out.println(dataField.getField() + ": " + (value != null ? value.toString() : "null")); + System.out.println(dataField.getField() + ": " + (value != null ? value.toString() : "null")); } catch (Exception e) { - System.out.println(dataField.getField() + ": [权限不足或字段不存在]"); + System.out.println(dataField.getField() + ": [权限不足或字段不存在]"); } } - System.out.println("old_owner_id__c: " + (account.getField("old_owner_id__c") != null ? account.getField("old_owner_id__c").toString() : "null")); - System.out.println("old_sfdc_id__c: " + (account.getField("old_sfdc_id__c") != null ? account.getField("old_sfdc_id__c").toString() : "null")); - map.put("old_owner_id__c",String.valueOf(account.getField("old_owner_id__c"))); - map.put("old_sfdc_id__c",String.valueOf(account.getField("old_sfdc_id__c"))); - arrayList.add(map); + System.out.println("old_owner_id__c: " + (account.getField("old_owner_id__c") != null ? account.getField("old_owner_id__c").toString() : "null")); + System.out.println("old_sfdc_id__c: " + (account.getField("old_sfdc_id__c") != null ? account.getField("old_sfdc_id__c").toString() : "null")); + } - return arrayList; } + /** + * 返回SF交互数据错误明细 + */ + public Map returnErrorAccountsDetails(SObject[] accounts,List list,String errorId) { + HashMap map = new HashMap<>(); + for (int i = 0; i < accounts.length; i++) { + SObject account = accounts[i]; + if (errorId.equals(account.getId()) || errorId.equals(account.getField("Id"))){ + for (DataField dataField : list) { + try { + Object value = account.getField(dataField.getField()); + map.put(dataField.getField(),String.valueOf(value)); + System.out.println(dataField.getField() + ": " + (value != null ? value.toString() : "null")); + } catch (Exception e) { + } + } + map.put("old_owner_id__c",String.valueOf(account.getField("old_owner_id__c"))); + map.put("old_sfdc_id__c",String.valueOf(account.getField("old_sfdc_id__c"))); + } + } + return map; + } }