diff --git a/src/main/java/com/celnet/datadump/job/DataDumpNewJob.java b/src/main/java/com/celnet/datadump/job/DataDumpNewJob.java index 3cb245b..7ad48db 100644 --- a/src/main/java/com/celnet/datadump/job/DataDumpNewJob.java +++ b/src/main/java/com/celnet/datadump/job/DataDumpNewJob.java @@ -187,5 +187,27 @@ public class DataDumpNewJob { return commonService.updateLinkType(param); } - + /** + * 一次性插入数据 + * @param paramStr + * @author kris + * @return + * @throws Exception + */ + @XxlJob("insertSingleBatchJob") + public ReturnT insertSingleJob(String paramStr) throws Exception { + log.info("insertSingleBatchJob execute start .................."); + SalesforceParam param = new SalesforceParam(); + try { + if (StringUtils.isNotBlank(paramStr)) { + param = JSON.parseObject(paramStr, SalesforceParam.class); + } + } catch (Throwable throwable) { + return new ReturnT<>(500, "参数解析失败!"); + } + // 参数转换 + param.setBeginCreateDate(param.getBeginDate()); + param.setEndCreateDate(param.getEndDate()); + return dataImportBatchService.insertSingleBatch(param); + } } diff --git a/src/main/java/com/celnet/datadump/service/DataImportBatchService.java b/src/main/java/com/celnet/datadump/service/DataImportBatchService.java index f739519..9857a34 100644 --- a/src/main/java/com/celnet/datadump/service/DataImportBatchService.java +++ b/src/main/java/com/celnet/datadump/service/DataImportBatchService.java @@ -10,4 +10,6 @@ public interface DataImportBatchService { ReturnT immigrationUpdateBatch(SalesforceParam param) throws Exception; + ReturnT insertSingleBatch(SalesforceParam param) throws Exception; + } diff --git a/src/main/java/com/celnet/datadump/service/impl/DataImportBatchServiceImpl.java b/src/main/java/com/celnet/datadump/service/impl/DataImportBatchServiceImpl.java index bbe7c4d..dcddbc0 100644 --- a/src/main/java/com/celnet/datadump/service/impl/DataImportBatchServiceImpl.java +++ b/src/main/java/com/celnet/datadump/service/impl/DataImportBatchServiceImpl.java @@ -40,6 +40,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; import java.text.ParseException; +import java.text.SimpleDateFormat; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.ZonedDateTime; @@ -684,6 +685,350 @@ public class DataImportBatchServiceImpl implements DataImportBatchService { return index; } + /** + * 一次新写入Insert入口 + */ + @Override + public ReturnT insertSingleBatch(SalesforceParam param) throws Exception { + List> futures = Lists.newArrayList(); + try { + if (StringUtils.isNotBlank(param.getApi())) { + // 手动任务 + ReturnT result = manualInsertSingle(param, futures); + if (result != null) { + return result; + } + } else { + // 自动任务 + autoInsertSingle(param, futures); + } + return ReturnT.SUCCESS; + } catch (Exception exception) { + salesforceExecutor.remove(futures.toArray(new Future[]{})); + log.error("insertSingle error", exception); + throw exception; + } + } + + /** + * 【单表】 组装一次性写入参数 + */ + public ReturnT manualInsertSingle(SalesforceParam param, List> futures) throws Exception { + List apis; + apis = DataUtil.toIdList(param.getApi()); + String join = StringUtils.join(apis, ","); + log.info("insertSingle apis: {}", join); + XxlJobLogger.log("insertSingle apis: {}", join); + + TimeUnit.MILLISECONDS.sleep(1); + + // 全量的时候 检测是否有自动任务锁住的表 + boolean isFull = CollectionUtils.isEmpty(param.getIds()); + if (isFull) { + QueryWrapper qw = new QueryWrapper<>(); + qw.eq("data_lock", 1).in("name", apis); + List list = dataObjectService.list(qw); + if (CollectionUtils.isNotEmpty(list)) { + String apiNames = list.stream().map(DataObject::getName).collect(Collectors.joining()); + return new ReturnT<>(500, "api:" + apiNames + " is locked"); + } + } + + BulkConnection bulkConnection = salesforceTargetConnect.createBulkConnect(); + for (String api : apis) { + DataObject update = new DataObject(); + TimeUnit.MILLISECONDS.sleep(1); + try { + List salesforceParams = null; + + update.setName(api); + update.setDataLock(1); + dataObjectService.updateById(update); + + QueryWrapper dbQw = new QueryWrapper<>(); + dbQw.eq("name", api); + List list = dataBatchService.list(dbQw); + AtomicInteger batch = new AtomicInteger(1); + + if (CollectionUtils.isNotEmpty(list)) { + salesforceParams = list.stream().map(t -> { + SalesforceParam salesforceParam = param.clone(); + salesforceParam.setApi(t.getName()); + salesforceParam.setBeginCreateDate(t.getSyncStartDate()); + salesforceParam.setEndCreateDate(t.getSyncEndDate()); + salesforceParam.setBatch(batch.getAndIncrement()); + return salesforceParam; + }).collect(Collectors.toList()); + } + + // 手动任务优先执行 + for (SalesforceParam salesforceParam : salesforceParams) { + Future future = salesforceExecutor.execute(() -> { + try { + insertSingleData(salesforceParam, bulkConnection); + } catch (Throwable throwable) { + log.error("salesforceExecutor error", throwable); + throw new RuntimeException(throwable); + } + }, salesforceParam.getBatch(), 1); + futures.add(future); + } + // 等待当前所有线程执行完成 + salesforceExecutor.waitForFutures(futures.toArray(new Future[]{})); + update.setDataWork(0); + } catch (InterruptedException e) { + throw e; + } catch (Throwable e) { + log.error("insertSingle error", e); + throw new RuntimeException(e); + } finally { + if (isFull) { + update.setName(api); + update.setDataLock(0); + dataObjectService.updateById(update); + } + } + } + return null; + } + + /** + * 【多表】 组装一次新写入参数 + */ + public void autoInsertSingle(SalesforceParam param, List> futures) throws Exception { + QueryWrapper qw = new QueryWrapper<>(); + qw.eq("data_work", 1) + .eq("data_lock", 0) + .orderByAsc("data_index") + .last(" limit 10"); + + BulkConnection bulkConnection = salesforceTargetConnect.createBulkConnect(); + while (true) { + List dataObjects = dataObjectService.list(qw); + if (CollectionUtils.isEmpty(dataObjects)) { + break; + } + for (DataObject dataObject : dataObjects) { + DataObject update = new DataObject(); + log.info("insertSingle api: {}", dataObject.getName()); + TimeUnit.MILLISECONDS.sleep(1); + try { + String api = dataObject.getName(); + update.setName(dataObject.getName()); + update.setDataLock(1); + dataObjectService.updateById(update); + + List salesforceParams = null; + QueryWrapper dbQw = new QueryWrapper<>(); + dbQw.eq("name", api); + List list = dataBatchService.list(dbQw); + AtomicInteger batch = new AtomicInteger(1); + if (CollectionUtils.isNotEmpty(list)) { + salesforceParams = list.stream().map(t -> { + SalesforceParam salesforceParam = param.clone(); + salesforceParam.setApi(t.getName()); + salesforceParam.setBeginCreateDate(t.getSyncStartDate()); + salesforceParam.setEndCreateDate(t.getSyncEndDate()); + salesforceParam.setBatch(batch.getAndIncrement()); + return salesforceParam; + }).collect(Collectors.toList()); + } + + // 手动任务优先执行 + for (SalesforceParam salesforceParam : salesforceParams) { + Future future = salesforceExecutor.execute(() -> { + try { + insertSingleData(salesforceParam, bulkConnection); + } catch (Throwable throwable) { + log.error("salesforceExecutor error", throwable); + throw new RuntimeException(throwable); + } + }, salesforceParam.getBatch(), 0); + futures.add(future); + } + // 等待当前所有线程执行完成 + salesforceExecutor.waitForFutures(futures.toArray(new Future[]{})); + update.setDataWork(0); + } catch (InterruptedException e) { + throw e; + } catch (Throwable e) { + throw new RuntimeException(e); + } finally { + update.setDataLock(0); + dataObjectService.updateById(update); + } + } + // 等待当前所有线程执行完成 + salesforceExecutor.waitForFutures(futures.toArray(new Future[]{})); + futures.clear(); + } + } + + /** + * 执行一次性Insert数据 + */ + private void insertSingleData(SalesforceParam param, BulkConnection bulkConnection) throws Exception { + String api = param.getApi(); + QueryWrapper dbQw = new QueryWrapper<>(); + dbQw.eq("api", api); + List list = dataFieldService.list(dbQw); + TimeUnit.MILLISECONDS.sleep(1); + + Date beginDate = param.getBeginCreateDate(); + Date endDate = param.getEndCreateDate(); + String beginDateStr = DateUtil.format(beginDate, "yyyy-MM-dd HH:mm:ss"); + String endDateStr = DateUtil.format(endDate, "yyyy-MM-dd HH:mm:ss"); + + //表内数据总量 + Integer count = customMapper.countBySQL(api, "where new_id is null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "'"); + log.error("总Insert数据 count:{};-开始时间:{};-结束时间:{};-api:{};", count, beginDateStr, endDateStr, api); + if (count == 0) { + return; + } + //批量插入10000一次 + int page = count%10000 == 0 ? count/10000 : (count/10000) + 1; + //总插入数 + int sfNum = 0; + for (int i = 0; i < page; i++) { + + List data = customMapper.listJsonObject("*", api, "new_id is null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "' limit 10000"); + int size = data.size(); + + log.info("执行api:{}, 执行page:{}, 执行size:{}", api, i+1, size); + List insertList = new ArrayList<>(); + + //查询当前对象多态字段映射 + QueryWrapper queryWrapper = new QueryWrapper<>(); + queryWrapper.eq("api",api).eq("is_create",1).eq("is_link",true); + List configs = linkConfigService.list(queryWrapper); + Map fieldMap = new HashMap<>(); + if (!configs.isEmpty()) { + fieldMap = configs.stream() + .collect(Collectors.toMap( + LinkConfig::getField, // Key提取器 + LinkConfig::getLinkField, // Value提取器 + (oldVal, newVal) -> newVal // 解决重复Key冲突(保留新值) + )); + } + + //判断引用对象是否存在new_id + DataObject update = new DataObject(); + update.setName(api); + + //更新对象的new_id + String[] ids = new String[size]; + + // 定义输入/输出格式 + DateTimeFormatter inputFormatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME; + DateTimeFormatter outputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSxx"); + + for (int j = 1; j <= size; j++) { + JSONObject account = new JSONObject(); + for (DataField dataField : list) { + + if ("Owner_Type".equals(dataField.getField()) || "Id".equals(dataField.getField())){ + continue; + } + if ("CreatedDate".equals(dataField.getField()) && dataField.getIsCreateable()){ + // 转换为UTC时间并格式化 + LocalDateTime localDateTime = LocalDateTime.parse(String.valueOf(data.get(j - 1).get("CreatedDate")), inputFormatter); + ZonedDateTime utcDateTime = localDateTime.atZone(ZoneId.of("UTC")).minusHours(8) ; + String convertedTime = utcDateTime.format(outputFormatter); + account.put("CreatedDate", convertedTime); + continue; + } + if ("CreatedById".equals(dataField.getField()) && dataField.getIsCreateable()){ + Map CreatedByIdMap = customMapper.getById("new_id", "User", data.get(j-1).get("CreatedById").toString()); + if(CreatedByIdMap.get("new_id") != null && StringUtils.isNotEmpty(CreatedByIdMap.get("new_id").toString())){ + account.put("CreatedById", CreatedByIdMap.get("new_id")); + } + continue; + } + if (dataField.getIsCreateable() !=null && dataField.getIsCreateable()) { + if ("reference".equals(dataField.getSfType())){ + //引用类型 + String reference_to = dataField.getReferenceTo(); + //引用类型字段 + String linkfield = fieldMap.get(dataField.getApi()); + + if (StringUtils.isNotBlank(linkfield)){ + reference_to = data.get(j-1).get(linkfield)!=null?data.get(j-1).get(linkfield).toString():null; + } + + if (reference_to == null){ + continue; + } + + if (reference_to.contains(",User") || reference_to.contains("User,")) { + reference_to = "User"; + } + Map m = customMapper.getById("new_id", reference_to, data.get(j - 1).get(dataField.getField()).toString()); + if (m != null && !m.isEmpty()) { + account.put(dataField.getField(), m.get("new_id")); + }else { + String message = "对象类型:" + api + "的数据:"+ data.get(j - 1).get("Id") +"的引用对象:" + reference_to + "的数据:"+ data.get(j - 1).get(dataField.getField()) +"不存在!"; + EmailUtil.send("DataDump ERROR", message); + log.info(message); + return; + } + }else { + if (data.get(j - 1).get(dataField.getField()) != null && StringUtils.isNotBlank(dataField.getSfType())) { + account.put(dataField.getField(), DataUtil.localDataToSfData(dataField.getSfType(), data.get(j - 1).get(dataField.getField()).toString())); + }else { + account.put(dataField.getField(), data.get(j - 1).get(dataField.getField()) ); + } + } + } + + } + + ids[j-1] = data.get(j-1).get("Id").toString(); + insertList.add(account); + if (i*10000+j == count){ + break; + } + } + + try { + + //写入csv文件 + String fullPath = CsvConverterUtil.writeToCsv(insertList, UUID.randomUUID().toString()); + + JobInfo salesforceInsertJob = BulkUtil.createJob(bulkConnection, api, OperationEnum.insert); + + List batchInfos = BulkUtil.createBatchesFromCSVFile(bulkConnection, salesforceInsertJob, fullPath); + + BulkUtil.awaitCompletion(bulkConnection, salesforceInsertJob, batchInfos); + + sfNum = sfNum + checkInsertResults(bulkConnection, salesforceInsertJob, batchInfos, api, ids); + + BulkUtil.closeJob(bulkConnection, salesforceInsertJob.getId()); + + new File(fullPath).delete(); + + } catch (Exception e) { + log.error("manualCreatedNewId error api:{}", api, e); + throw e; + } + } + + UpdateWrapper updateQw = new UpdateWrapper<>(); + updateQw.eq("name", api) + .eq("sync_start_date", beginDate) + .eq("sync_end_date", DateUtils.addSeconds(endDate, -1)) + .set("target_sf_num", sfNum); + dataBatchHistoryService.update(updateQw); + + UpdateWrapper updateQw2 = new UpdateWrapper<>(); + updateQw2.eq("name", api) + .eq("sync_start_date", beginDate) + .eq("sync_end_date", endDate) + .set("sf_add_num", sfNum); + dataBatchService.update(updateQw2); + + } + + } diff --git a/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java b/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java index 495b7a9..7adca34 100644 --- a/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java +++ b/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java @@ -32,6 +32,8 @@ import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.text.ParseException; +import java.text.SimpleDateFormat; import java.util.*; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit;