From 2b49cd4e00461d1c585f146b84ecddb93eee6964 Mon Sep 17 00:00:00 2001 From: Kris <2893855659@qq.com> Date: Tue, 17 Jun 2025 15:02:23 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90feat=E3=80=91=20=E5=A2=9E=E9=87=8F?= =?UTF-8?q?=E5=90=8C=E6=AD=A5=E4=BB=BB=E5=8A=A1=E6=96=B0=E5=A2=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../datadump/controller/JobController.java | 26 +++ .../impl/DataImportNewServiceImpl.java | 221 ++++++++++-------- .../service/impl/DataImportServiceImpl.java | 165 ------------- 3 files changed, 155 insertions(+), 257 deletions(-) diff --git a/src/main/java/com/celnet/datadump/controller/JobController.java b/src/main/java/com/celnet/datadump/controller/JobController.java index 0c45f42..cd44147 100644 --- a/src/main/java/com/celnet/datadump/controller/JobController.java +++ b/src/main/java/com/celnet/datadump/controller/JobController.java @@ -273,4 +273,30 @@ public class JobController { param.setEndCreateDate(param.getEndDate()); return dataImportNewService.getPersonContact(param); } + + /** + * 数据更新同步(新) + * @param paramStr + * @author kris + * @return + * @throws Exception + */ + @PostMapping("/dataUpdateNewJob") + @ApiOperation("数据更新同步(新)") + @LogServiceAnnotation(operateType = OperateTypeConstant.TYPE_UPDATE, remark = "数据更新同步(新)") + public ReturnT dataUpdateNewJob(String paramStr) throws Exception { + log.info("getPersonContactJob execute start .................."); + SalesforceParam param = new SalesforceParam(); + try { + if (StringUtils.isNotBlank(paramStr)) { + param = JSON.parseObject(paramStr, SalesforceParam.class); + } + } catch (Throwable throwable) { + return new ReturnT<>(500, "参数解析失败!"); + } + // 参数转换 + param.setBeginCreateDate(param.getBeginDate()); + param.setEndCreateDate(param.getEndDate()); + return dataImportNewService.immigrationUpdateNew(param); + } } diff --git a/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java b/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java index c736471..1935c34 100644 --- a/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java +++ b/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java @@ -307,7 +307,7 @@ public class DataImportNewServiceImpl implements DataImportNewService { } /** - * Update入口 + * 数据更新Update入口 */ @Override public ReturnT immigrationUpdateNew(SalesforceParam param) throws Exception { @@ -340,10 +340,10 @@ public class DataImportNewServiceImpl implements DataImportNewService { * 组装【单表】【存量】Update参数 */ public ReturnT updateSfDataNew(SalesforceParam param, List> futures) throws Exception { - List apis; + List apis = DataUtil.toIdList(param.getApi()); + String beginDateStr = null; String endDateStr = null; - apis = DataUtil.toIdList(param.getApi()); if (param.getBeginCreateDate() != null && param.getEndCreateDate() != null){ Date beginDate = param.getBeginCreateDate(); Date endDate = param.getEndCreateDate(); @@ -359,7 +359,10 @@ public class DataImportNewServiceImpl implements DataImportNewService { List list = dataObjectService.list(qw); if (CollectionUtils.isNotEmpty(list)) { String apiNames = list.stream().map(DataObject::getName).collect(Collectors.joining()); - return new ReturnT<>(500, "api:" + apiNames + " is locked"); + String message = "api:" + apiNames + " is locked"; + String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s", apiNames, com.alibaba.fastjson2.JSON.toJSONString(param, DataDumpParam.getFilter()), message); + EmailUtil.send("DataDump ERROR", format); + return new ReturnT<>(500, message); } } @@ -401,7 +404,7 @@ public class DataImportNewServiceImpl implements DataImportNewService { } // 等待当前所有线程执行完成 salesforceExecutor.waitForFutures(futures.toArray(new Future[]{})); - update.setDataWork(0); + update.setNeedUpdate(false); } catch (Exception e) { throw e; } finally { @@ -444,7 +447,7 @@ public class DataImportNewServiceImpl implements DataImportNewService { List salesforceParams = null; QueryWrapper dbQw = new QueryWrapper<>(); dbQw.eq("name", api); - dbQw.ge("sync_start_date",dataObject.getLastUpdateDate()); + dbQw.gt("sync_end_date",dataObject.getLastUpdateDate()); List dataBatches = dataBatchService.list(dbQw); AtomicInteger batch = new AtomicInteger(1); if (CollectionUtils.isNotEmpty(dataBatches)) { @@ -472,11 +475,12 @@ public class DataImportNewServiceImpl implements DataImportNewService { } // 等待当前所有线程执行完成 salesforceExecutor.waitForFutures(futures.toArray(new Future[]{})); - update.setDataWork(0); + } catch (Exception e) { throw e; } finally { if (isFull) { + update.setNeedUpdate(false); update.setName(api); update.setDataLock(0); dataObjectService.updateById(update); @@ -491,12 +495,37 @@ public class DataImportNewServiceImpl implements DataImportNewService { */ private void autoUpdateSfDataNew(SalesforceParam param, List> futures) throws Exception { + // 全量的时候 检测是否有自动任务锁住的表 + boolean isFull = CollectionUtils.isEmpty(param.getIds()); + if (isFull) { + QueryWrapper qw = new QueryWrapper<>(); + qw.eq("data_lock", 1); + List list = dataObjectService.list(qw); + if (CollectionUtils.isNotEmpty(list)) { + String apiNames = list.stream().map(DataObject::getName).collect(Collectors.joining()); + String message = "api:" + apiNames + " is locked"; + String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s", apiNames, com.alibaba.fastjson2.JSON.toJSONString(param, DataDumpParam.getFilter()), message); + EmailUtil.send("DataDump ERROR", format); + return ; + } + } + + String beginDateStr = null; + String endDateStr = null; + if (param.getBeginCreateDate() != null && param.getEndCreateDate() != null){ + Date beginDate = param.getBeginCreateDate(); + Date endDate = param.getEndCreateDate(); + beginDateStr = DateUtil.format(beginDate, "yyyy-MM-dd HH:mm:ss"); + endDateStr = DateUtil.format(endDate, "yyyy-MM-dd HH:mm:ss"); + } + QueryWrapper qw = new QueryWrapper<>(); qw.eq("need_update", 1) .orderByAsc("data_index") .last(" limit 10"); PartnerConnection partnerConnection = salesforceTargetConnect.createConnect(); + while (true) { List dataObjects = dataObjectService.list(qw); //判断dataObjects是否为空 @@ -514,99 +543,19 @@ public class DataImportNewServiceImpl implements DataImportNewService { TimeUnit.MILLISECONDS.sleep(1); try { String api = dataObject.getName(); - boolean needUpdate = dataObject.getNeedUpdate(); List salesforceParams = null; QueryWrapper dbQw = new QueryWrapper<>(); dbQw.eq("name", api); + if (StringUtils.isNotEmpty(beginDateStr) && StringUtils.isNotEmpty(endDateStr)) { + dbQw.eq("sync_start_date", beginDateStr); // 等于开始时间 + dbQw.eq("sync_end_date", endDateStr); // 等于结束时间 + } List list = dataBatchService.list(dbQw); AtomicInteger batch = new AtomicInteger(1); if (CollectionUtils.isNotEmpty(list)) { salesforceParams = list.stream().map(t -> { SalesforceParam salesforceParam = param.clone(); salesforceParam.setApi(t.getName()); - if (needUpdate) { - salesforceParam.setType(2); - salesforceParam.setBeginModifyDate(dataObject.getLastUpdateDate()); - } - salesforceParam.setBeginCreateDate(t.getSyncStartDate()); - salesforceParam.setEndCreateDate(t.getSyncEndDate()); - salesforceParam.setBatch(batch.getAndIncrement()); - return salesforceParam; - }).collect(Collectors.toList()); - } - - // 手动任务优先执行 - for (SalesforceParam salesforceParam : salesforceParams) { - Future future = salesforceExecutor.execute(() -> { - try { -// autoUpdateSfData(salesforceParam, partnerConnection); - } catch (Throwable throwable) { - log.error("salesforceExecutor error", throwable); - throw new RuntimeException(throwable); - } - }, salesforceParam.getBatch(), 0); - futures.add(future); - } - // 等待当前所有线程执行完成 - salesforceExecutor.waitForFutures(futures.toArray(new Future[]{})); - update.setDataWork(0); - } catch (Exception e) { - throw e; - } finally { - if (dataObject != null) { - update.setDataLock(0); - dataObjectService.updateById(update); - } - } - } - // 等待当前所有线程执行完成 - salesforceExecutor.waitForFutures(futures.toArray(new Future[]{})); - futures.clear(); - } - } - - /** - * 组装【多表】【增量】Update参数 - */ - private void autoUpdateIncrementalSfDataNew(SalesforceParam param, List> futures) throws Exception { - QueryWrapper qw = new QueryWrapper<>(); - qw.eq("data_work", 1) - .eq("data_lock", 0) - .orderByAsc("data_index") - .last(" limit 10"); - - PartnerConnection partnerConnection = salesforceTargetConnect.createConnect(); - while (true) { - List dataObjects = dataObjectService.list(qw); - //判断dataObjects是否为空 - if (CollectionUtils.isEmpty(dataObjects)) { - return; - } - - for (DataObject dataObject : dataObjects) { - TimeUnit.MILLISECONDS.sleep(1); - - DataObject update = new DataObject(); - update.setName(dataObject.getName()); - update.setDataLock(1); - dataObjectService.updateById(update); - TimeUnit.MILLISECONDS.sleep(1); - try { - String api = dataObject.getName(); - boolean needUpdate = dataObject.getNeedUpdate(); - List salesforceParams = null; - QueryWrapper dbQw = new QueryWrapper<>(); - dbQw.eq("name", api); - List list = dataBatchService.list(dbQw); - AtomicInteger batch = new AtomicInteger(1); - if (CollectionUtils.isNotEmpty(list)) { - salesforceParams = list.stream().map(t -> { - SalesforceParam salesforceParam = param.clone(); - salesforceParam.setApi(t.getName()); - if (needUpdate) { - salesforceParam.setType(2); - salesforceParam.setBeginModifyDate(dataObject.getLastUpdateDate()); - } salesforceParam.setBeginCreateDate(t.getSyncStartDate()); salesforceParam.setEndCreateDate(t.getSyncEndDate()); salesforceParam.setBatch(batch.getAndIncrement()); @@ -628,10 +577,97 @@ public class DataImportNewServiceImpl implements DataImportNewService { } // 等待当前所有线程执行完成 salesforceExecutor.waitForFutures(futures.toArray(new Future[]{})); - update.setDataWork(0); } catch (Exception e) { throw e; } finally { + update.setNeedUpdate(false); + update.setDataLock(0); + dataObjectService.updateById(update); + } + } + // 等待当前所有线程执行完成 + salesforceExecutor.waitForFutures(futures.toArray(new Future[]{})); + futures.clear(); + } + } + + /** + * 组装【多表】【增量】Update参数 + */ + private void autoUpdateIncrementalSfDataNew(SalesforceParam param, List> futures) throws Exception { + + // 全量的时候 检测是否有自动任务锁住的表 + boolean isFull = CollectionUtils.isEmpty(param.getIds()); + if (isFull) { + QueryWrapper qw = new QueryWrapper<>(); + qw.eq("data_lock", 1); + List list = dataObjectService.list(qw); + if (CollectionUtils.isNotEmpty(list)) { + String apiNames = list.stream().map(DataObject::getName).collect(Collectors.joining()); + String message = "api:" + apiNames + " is locked"; + String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s", apiNames, com.alibaba.fastjson2.JSON.toJSONString(param, DataDumpParam.getFilter()), message); + EmailUtil.send("DataDump ERROR", format); + return ; + } + } + + QueryWrapper qw = new QueryWrapper<>(); + qw.eq("need_update", 1) + .orderByAsc("data_index") + .last(" limit 10"); + + PartnerConnection partnerConnection = salesforceTargetConnect.createConnect(); + while (true) { + List dataObjects = dataObjectService.list(qw); + //判断dataObjects是否为空 + if (CollectionUtils.isEmpty(dataObjects)) { + return; + } + + for (DataObject dataObject : dataObjects) { + TimeUnit.MILLISECONDS.sleep(1); + DataObject update = new DataObject(); + update.setName(dataObject.getName()); + update.setDataLock(1); + dataObjectService.updateById(update); + TimeUnit.MILLISECONDS.sleep(1); + try { + String api = dataObject.getName(); + List salesforceParams = null; + QueryWrapper dbQw = new QueryWrapper<>(); + dbQw.eq("name", api); + dbQw.gt("sync_end_date",dataObject.getLastUpdateDate()); + List list = dataBatchService.list(dbQw); + AtomicInteger batch = new AtomicInteger(1); + if (CollectionUtils.isNotEmpty(list)) { + salesforceParams = list.stream().map(t -> { + SalesforceParam salesforceParam = param.clone(); + salesforceParam.setApi(t.getName()); + salesforceParam.setBeginCreateDate(t.getSyncStartDate()); + salesforceParam.setEndCreateDate(t.getSyncEndDate()); + salesforceParam.setBatch(batch.getAndIncrement()); + return salesforceParam; + }).collect(Collectors.toList()); + } + + // 手动任务优先执行 + for (SalesforceParam salesforceParam : salesforceParams) { + Future future = salesforceExecutor.execute(() -> { + try { + UpdateSfDataNew(salesforceParam, partnerConnection); + } catch (Throwable throwable) { + log.error("salesforceExecutor error", throwable); + throw new RuntimeException(throwable); + } + }, salesforceParam.getBatch(), 0); + futures.add(future); + } + // 等待当前所有线程执行完成 + salesforceExecutor.waitForFutures(futures.toArray(new Future[]{})); + } catch (Exception e) { + throw e; + } finally { + update.setNeedUpdate(false); update.setDataLock(0); dataObjectService.updateById(update); } @@ -812,6 +848,7 @@ public class DataImportNewServiceImpl implements DataImportNewService { map.put(dataField.getField(),String.valueOf(value)); System.out.println(dataField.getField() + ": " + (value != null ? value.toString() : "null")); } catch (Exception e) { + System.out.println(dataField.getField() + ": [权限不足或字段不存在]"); } } map.put("old_owner_id__c",String.valueOf(account.getField("old_owner_id__c"))); diff --git a/src/main/java/com/celnet/datadump/service/impl/DataImportServiceImpl.java b/src/main/java/com/celnet/datadump/service/impl/DataImportServiceImpl.java index 04ceb8c..f3e7055 100644 --- a/src/main/java/com/celnet/datadump/service/impl/DataImportServiceImpl.java +++ b/src/main/java/com/celnet/datadump/service/impl/DataImportServiceImpl.java @@ -427,171 +427,6 @@ public class DataImportServiceImpl implements DataImportService { } - - - private void autoCreatedNewId(SalesforceParam param, PartnerConnection partnerConnection) throws Exception { - String api = param.getApi(); - QueryWrapper dbQw = new QueryWrapper<>(); - dbQw.eq("api", api); - List list = dataFieldService.list(dbQw); - TimeUnit.MILLISECONDS.sleep(1); - - param.setApi(api); - Date beginDate = param.getBeginCreateDate(); - Date endDate = param.getEndCreateDate(); - String beginDateStr = DateUtil.format(beginDate, "yyyy-MM-dd HH:mm:ss"); - String endDateStr = DateUtil.format(endDate, "yyyy-MM-dd HH:mm:ss"); - Integer count = 0; - //表内数据总量 - if (api.contains("Share")){ - count = customMapper.countBySQL(api, "where RowCause = 'Manual' and new_id is null and LastModifiedDate >= '" + beginDateStr + "' and LastModifiedDate < '" + endDateStr + "'"); - }else { - count = customMapper.countBySQL(api, "where new_id is null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "'"); - } - - if (count == 0) { - log.error("无数据同步 api:{}", api); - return; - } - - //批量插入200一次 - int page = count % 200 == 0 ? count / 200 : (count / 200) + 1; - SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"); - DataObject update = new DataObject(); - update.setName(api); - for (int i = 0; i < page; i++) { - List> data = null; - if (api.contains("Share")){ - data = customMapper.list("*", api, "RowCause = 'Manual' and new_id is null and LastModifiedDate >= '" + beginDateStr + "' and LastModifiedDate < '" + endDateStr + "' limit 200"); - }else { - data = customMapper.list("*", api, "new_id is null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "' limit 200"); - } - - int size = data.size(); - SObject[] accounts = new SObject[size]; - String[] ids = new String[size]; - for (int j = 0; j < size; j++) { - SObject account = new SObject(); - account.setType(api); - //找出sf对象必填字段,并且给默认值 - for (DataField dataField : list) { - if ("OwnerId".equals(dataField.getField()) || "Owner_Type".equals(dataField.getField()) - || "Id".equals(dataField.getField())){ - continue; - } - //用完放下面 - if (dataField.getIsCreateable() && !dataField.getIsNillable() && !dataField.getIsDefaultedOnCreate()) { - if ("reference".equals(dataField.getSfType())){ - String reference = dataField.getReferenceTo(); - if ("Group,User".equals(reference)) { - reference = "User"; - } -// List> referenceMap = customMapper.list("new_id", reference, "new_id is not null limit 1"); - //share表处理 - List> referenceMap = customMapper.list("new_id", reference, "Id = '"+data.get(j).get(dataField.getField()).toString()+"' and new_id is not null limit 1"); - - if (referenceMap.size() == 0){ - QueryWrapper maxIndex = new QueryWrapper<>(); - maxIndex.select("IFNULL(max(data_index),0) as data_index"); - maxIndex.ne("name", api); - Map map = dataObjectService.getMap(maxIndex); - - //如果必填lookup字段没有值,跳过 - update.setDataIndex(Integer.parseInt(map.get("data_index").toString()+1)); - dataObjectService.updateById(update); - return; - }else{ - account.setField(dataField.getField(), referenceMap.get(0).get("new_id")); - continue; - } - } -// if ("picklist".equals(dataField.getSfType())){ -// List> pickList = customMapper.list("value", "data_picklist", "api = '"+api+"' and field = '"+dataField.getField()+"' limit 1"); -// account.setField(dataField.getField(), pickList.get(0).get("value")); -// continue; -// } -// account.setField(dataField.getField(), DataUtil.fieldTypeToSf(dataField)); - if ("picklist".equals(dataField.getSfType())){ - account.setField(dataField.getField(), data.get(j).get(dataField.getField())); - continue; - } - } - } - accounts[j] = account; - //object类型转Date类型 -// Date date; -// //date转Calendar类型 -// Calendar calendar = Calendar.getInstance(); -// try { -// date = sdf.parse(String.valueOf(data.get(j - 1).get("CreatedDate"))); -// }catch (ParseException e){ -// //解决当时间秒为0时,转换秒精度丢失问题 -// date = sdf.parse(data.get(j - 1).get("CreatedDate")+":00"); -// } -// calendar.setTime(date); -// account.setField("CreatedDate", calendar); -// Map CreatedByIdMap = customMapper.getById("new_id", "User", data.get(j-1).get("CreatedById").toString()); -// account.setField("CreatedById", CreatedByIdMap.get("new_id")); - - ids[j] = data.get(j).get("Id").toString(); - accounts[j] = account; - if (i*200+j == count){ - break; - } - } - try { - SaveResult[] saveResults = partnerConnection.create(accounts); - int index = 0; - for (SaveResult saveResult : saveResults) { - if (saveResult.getSuccess()) { - List> maps = new ArrayList<>(); - Map m = new HashMap<>(); - m.put("key", "new_id"); - m.put("value", saveResult.getId()); - maps.add(m); - customMapper.updateById(api, maps, ids[index]); - index++; - } else { - log.error("-------------saveResults: {}", JSON.toJSONString(saveResult)); - ReturnT.FAIL.setCode(500); - } - } - TimeUnit.MILLISECONDS.sleep(1); - } catch (Exception e) { - log.error("autoCreatedNewId error api:{}", api, e); - throw e; - } - - } - SalesforceParam countParam = new SalesforceParam(); - countParam.setApi(api); - countParam.setBeginCreateDate(beginDate); - countParam.setEndCreateDate(DateUtils.addSeconds(endDate, -1)); - // 存在isDeleted 只查询IsDeleted为false的 - if (dataFieldService.hasDeleted(countParam.getApi())) { - countParam.setIsDeleted(false); - } else { - // 不存在 过滤 - countParam.setIsDeleted(null); - } - // sf count - Integer sfNum = commonService.countSfNum(partnerConnection, countParam); - - UpdateWrapper updateQw = new UpdateWrapper<>(); - updateQw.eq("name", api) - .eq("sync_start_date", beginDate) - .eq("sync_end_date", DateUtils.addSeconds(endDate, -1)) - .set("target_sf_num", sfNum); - dataBatchHistoryService.update(updateQw); - - UpdateWrapper updateQw2 = new UpdateWrapper<>(); - updateQw2.eq("name", api) - .eq("sync_start_date", beginDate) - .eq("sync_end_date", endDate) - .set("sf_add_num", sfNum); - dataBatchService.update(updateQw2); - } - @Override public ReturnT immigrationUpdate(SalesforceParam param) throws Exception { List> futures = Lists.newArrayList();