diff --git a/src/main/java/com/celnet/datadump/service/impl/DataImportBatchServiceImpl.java b/src/main/java/com/celnet/datadump/service/impl/DataImportBatchServiceImpl.java index dcddbc0..91903c8 100644 --- a/src/main/java/com/celnet/datadump/service/impl/DataImportBatchServiceImpl.java +++ b/src/main/java/com/celnet/datadump/service/impl/DataImportBatchServiceImpl.java @@ -92,6 +92,8 @@ public class DataImportBatchServiceImpl implements DataImportBatchService { if (result != null) { return result; } + }else { + autoImmigrationBatch(param, futures); } return ReturnT.SUCCESS; } catch (Exception exception) { @@ -102,7 +104,7 @@ public class DataImportBatchServiceImpl implements DataImportBatchService { } /** - * 组装执行参数 + * 【单表】组装Insert执行参数 */ public ReturnT manualImmigrationBatch(SalesforceParam param, List> futures) throws Exception { List apis; @@ -160,7 +162,7 @@ public class DataImportBatchServiceImpl implements DataImportBatchService { for (SalesforceParam salesforceParam : salesforceParams) { Future future = salesforceExecutor.execute(() -> { try { - manualCreatedNewIdBatch(salesforceParam, bulkConnection); + createdNewIdBatch(salesforceParam, bulkConnection); } catch (Throwable throwable) { log.error("salesforceExecutor error", throwable); throw new RuntimeException(throwable); @@ -187,10 +189,97 @@ public class DataImportBatchServiceImpl implements DataImportBatchService { return null; } + /** + * 【多表】组装Insert执行参数 + */ + public void autoImmigrationBatch(SalesforceParam param, List> futures) throws Exception { + + // 全量的时候 检测是否有自动任务锁住的表 + boolean isFull = CollectionUtils.isEmpty(param.getIds()); + if (isFull) { + QueryWrapper qw = new QueryWrapper<>(); + qw.eq("data_lock", 1); + List list = dataObjectService.list(qw); + if (CollectionUtils.isNotEmpty(list)) { + String apiNames = list.stream().map(DataObject::getName).collect(Collectors.joining()); + String message = "api:" + apiNames + " is locked"; + String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s", apiNames, com.alibaba.fastjson2.JSON.toJSONString(param, DataDumpParam.getFilter()), message); + EmailUtil.send("DataDump ERROR", format); + return ; + } + } + + BulkConnection bulkConnection = salesforceTargetConnect.createBulkConnect(); + + QueryWrapper qw = new QueryWrapper<>(); + qw.eq("data_work", 1) + .orderByAsc("data_index") + .last(" limit 10"); + + while (true) { + List dataObjects = dataObjectService.list(qw); + //判断dataObjects是否为空 + if (CollectionUtils.isEmpty(dataObjects)) { + return; + } + + for (DataObject object : dataObjects) { + TimeUnit.MILLISECONDS.sleep(1); + try { + List salesforceParams = null; + object.setDataLock(1); + dataObjectService.updateById(object); + + QueryWrapper dbQw = new QueryWrapper<>(); + dbQw.eq("name", object.getName()); + List list = dataBatchService.list(dbQw); + AtomicInteger batch = new AtomicInteger(1); + + if (CollectionUtils.isNotEmpty(list)) { + salesforceParams = list.stream().map(t -> { + SalesforceParam salesforceParam = param.clone(); + salesforceParam.setApi(t.getName()); + salesforceParam.setBeginCreateDate(t.getSyncStartDate()); + salesforceParam.setEndCreateDate(t.getSyncEndDate()); + salesforceParam.setBatch(batch.getAndIncrement()); + return salesforceParam; + }).collect(Collectors.toList()); + } + + // 手动任务优先执行 + for (SalesforceParam salesforceParam : salesforceParams) { + Future future = salesforceExecutor.execute(() -> { + try { + createdNewIdBatch(salesforceParam, bulkConnection); + } catch (Throwable throwable) { + log.error("salesforceExecutor error", throwable); + throw new RuntimeException(throwable); + } + }, salesforceParam.getBatch(), 1); + futures.add(future); + } + // 等待当前所有线程执行完成 + salesforceExecutor.waitForFutures(futures.toArray(new Future[]{})); + object.setDataWork(0); + } catch (InterruptedException e) { + throw e; + } catch (Throwable e) { + log.error("manualImmigration error", e); + throw new RuntimeException(e); + } finally { + if (isFull) { + object.setDataLock(0); + dataObjectService.updateById(object); + } + } + } + } + } + /** * 执行数据Insert */ - public void manualCreatedNewIdBatch(SalesforceParam param, BulkConnection bulkConnection) throws Exception { + public void createdNewIdBatch(SalesforceParam param, BulkConnection bulkConnection) throws Exception { String api = param.getApi(); QueryWrapper dbQw = new QueryWrapper<>(); dbQw.eq("api", api); @@ -411,6 +500,8 @@ public class DataImportBatchServiceImpl implements DataImportBatchService { if (result != null) { return result; } + }else { + autoUpdateSfDataBatch(param, futures); } return ReturnT.SUCCESS; } catch (InterruptedException e) { @@ -423,7 +514,7 @@ public class DataImportBatchServiceImpl implements DataImportBatchService { } /** - * 组装执行参数 + * 【单彪】组装Update执行参数 */ public ReturnT updateSfDataBatch(SalesforceParam param, List> futures) throws Exception { List apis; @@ -508,6 +599,88 @@ public class DataImportBatchServiceImpl implements DataImportBatchService { return null; } + /** + * 【多表】组装Update执行参数 + */ + public void autoUpdateSfDataBatch(SalesforceParam param, List> futures) throws Exception { + // 全量的时候 检测是否有自动任务锁住的表 + boolean isFull = CollectionUtils.isEmpty(param.getIds()); + if (isFull) { + QueryWrapper qw = new QueryWrapper<>(); + qw.eq("data_lock", 1); + List list = dataObjectService.list(qw); + if (CollectionUtils.isNotEmpty(list)) { + String apiNames = list.stream().map(DataObject::getName).collect(Collectors.joining()); + String message = "api:" + apiNames + " is locked"; + String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s", apiNames, com.alibaba.fastjson2.JSON.toJSONString(param, DataDumpParam.getFilter()), message); + EmailUtil.send("DataDump ERROR", format); + return ; + } + } + + BulkConnection bulkConnection = salesforceTargetConnect.createBulkConnect(); + + QueryWrapper qw = new QueryWrapper<>(); + qw.eq("data_work", 1) + .eq("need_update",1) + .orderByAsc("data_index") + .last(" limit 10"); + + while (true) { + List dataObjects = dataObjectService.list(qw); + //判断dataObjects是否为空 + if (CollectionUtils.isEmpty(dataObjects)) { + return; + } + + for (DataObject object : dataObjects) { + + try { + TimeUnit.MILLISECONDS.sleep(1); + List salesforceParams = null; + QueryWrapper dbQw = new QueryWrapper<>(); + dbQw.eq("name", object.getName()); + List list = dataBatchService.list(dbQw); + AtomicInteger batch = new AtomicInteger(1); + if (CollectionUtils.isNotEmpty(list)) { + salesforceParams = list.stream().map(t -> { + SalesforceParam salesforceParam = param.clone(); + salesforceParam.setApi(t.getName()); + salesforceParam.setBeginCreateDate(t.getSyncStartDate()); + salesforceParam.setEndCreateDate(t.getSyncEndDate()); + salesforceParam.setBatch(batch.getAndIncrement()); + return salesforceParam; + }).collect(Collectors.toList()); + } + + // 手动任务优先执行 + for (SalesforceParam salesforceParam : salesforceParams) { + Future future = salesforceExecutor.execute(() -> { + try { + manualUpdateSfDataBatch(salesforceParam, bulkConnection,object); + } catch (Throwable throwable) { + log.error("salesforceExecutor error", throwable); + throw new RuntimeException(throwable); + } + }, salesforceParam.getBatch(), 1); + futures.add(future); + } + // 等待当前所有线程执行完成 + salesforceExecutor.waitForFutures(futures.toArray(new Future[]{})); + } catch (Exception e) { + throw e; + } finally { + if (isFull) { + object.setNeedUpdate(false); + object.setDataLock(0); + dataObjectService.updateById(object); + } + } + } + } + + } + /** * 执行数据Update */