package com.celnet.datadump.service.impl; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.celnet.datadump.config.SalesforceConnect; import com.celnet.datadump.config.SalesforceExecutor; import com.celnet.datadump.config.SalesforceTargetConnect; import com.celnet.datadump.entity.*; import com.celnet.datadump.global.Const; import com.celnet.datadump.global.TypeCode; import com.celnet.datadump.mapper.CustomMapper; import com.celnet.datadump.param.DataDumpParam; import com.celnet.datadump.param.DataDumpSpecialParam; import com.celnet.datadump.param.SalesforceParam; import com.celnet.datadump.service.*; import com.celnet.datadump.util.DataUtil; import com.celnet.datadump.util.EmailUtil; import com.celnet.datadump.util.SqlUtil; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.sforce.async.BulkConnection; import com.sforce.soap.partner.*; import com.sforce.soap.partner.sobject.SObject; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.log.XxlJobLogger; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.commons.lang3.time.DateUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.*; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import static com.celnet.datadump.global.SystemConfigCode.*; /** * @author Red * @description * @date 2022/09/23 */ @Service @Slf4j public class CommonServiceImpl implements CommonService { @Autowired private SalesforceConnect salesforceConnect; @Autowired private CustomMapper customMapper; @Autowired private SalesforceExecutor salesforceExecutor; @Autowired private DataBatchHistoryService dataBatchHistoryService; @Autowired private DataBatchService dataBatchService; @Autowired private DataObjectService dataObjectService; @Autowired private DataFieldService dataFieldService; @Autowired private FileService fileService; @Autowired private DataPicklistService dataPicklistService; @Autowired private DataDumpSpecialService dataDumpSpecialService; @Autowired private DataReportService dataReportService; @Autowired private DataReportDetailService dataReportDetailService; @Autowired private SalesforceTargetConnect salesforceTargetConnect; @Override public ReturnT increment(SalesforceParam param) throws Exception { QueryWrapper qw = new QueryWrapper<>(); if (StringUtils.isNotBlank(param.getApi())) { List apis = DataUtil.toIdList(param.getApi()); qw.in("name", apis); } qw.eq("need_update", true) .isNotNull("last_update_date"); List list = dataObjectService.list(qw); if (CollectionUtils.isEmpty(list)) { return new ReturnT<>(500, ("表" + param.getApi() + "不存在或未开启更新")); } List> futures = Lists.newArrayList(); try { DataReport dataReport = new DataReport(); dataReport.setType(TypeCode.INCREMENT); dataReport.setApis(list.stream().map(DataObject::getName).collect(Collectors.joining(","))); dataReportService.save(dataReport); for (DataObject dataObject : list) { Future future = salesforceExecutor.execute(() -> { try { Date updateTime = new Date(); SalesforceParam salesforceParam = new SalesforceParam(); salesforceParam.setApi(dataObject.getName()); salesforceParam.setBeginModifyDate(dataObject.getLastUpdateDate()); salesforceParam.setType(2); // 更新字段值不为空 按更新字段里的字段校验 if (StringUtils.isNotBlank(dataObject.getUpdateField())) { salesforceParam.setUpdateField(dataObject.getUpdateField()); } PartnerConnection connect = dumpData(salesforceParam, dataReport); dataObject.setLastUpdateDate(updateTime); dataObjectService.updateById(dataObject); // 增量batch更新 QueryWrapper queryWrapper = new QueryWrapper<>(); queryWrapper.eq("name", dataObject.getName()).orderByDesc("sync_start_date").last("limit 1"); DataBatch dataBatch = dataBatchService.getOne(queryWrapper); if (dataBatch != null) { if (dataBatch.getSyncEndDate().before(updateTime)) { DataBatch update = dataBatch.clone(); update.setSyncEndDate(updateTime); int failCount = 0; while (true) { try { SalesforceParam countParam = new SalesforceParam(); countParam.setApi(dataBatch.getName()); countParam.setBeginCreateDate(dataBatch.getSyncStartDate()); countParam.setEndCreateDate(dataBatch.getSyncEndDate()); // 存在isDeleted 只查询IsDeleted为false的 if (dataFieldService.hasDeleted(countParam.getApi())) { countParam.setIsDeleted(false); } else { // 不存在 过滤 countParam.setIsDeleted(null); } // sf count Integer sfNum = countSfNum(connect, countParam); update.setSfNum(sfNum); // db count Integer dbNum = customMapper.count(countParam); update.setDbNum(dbNum); update.setSyncStatus(dbNum.equals(sfNum) ? 1 : 0); update.setFirstDbNum(dbNum); update.setFirstSyncDate(updateTime); update.setFirstSfNum(sfNum); QueryWrapper updateQw = new QueryWrapper<>(); updateQw.eq("name", dataBatch.getName()) .eq("sync_start_date", dataBatch.getSyncStartDate()) .eq("sync_end_date", dataBatch.getSyncEndDate()); dataBatchService.update(update, updateQw); failCount = 0; break; } catch (InterruptedException e) { throw e; } catch (Throwable throwable) { failCount++; log.error("verify error", throwable); if (failCount > Const.MAX_FAIL_COUNT) { throw throwable; } TimeUnit.MINUTES.sleep(1); } } } } } catch (Throwable throwable) { log.error("salesforceExecutor error", throwable); throw new RuntimeException(throwable); } }, 0, 0); futures.add(future); } // 等待当前所有线程执行完成 salesforceExecutor.waitForFutures(futures.toArray(new Future[]{})); // 存在附件的开始dump list.stream().filter(t -> StringUtils.isNotBlank(t.getBlobField())).forEach(t -> { try { fileService.dumpFile(t.getName(), t.getBlobField(), true); } catch (Exception e) { throw new RuntimeException(e); } }); return ReturnT.SUCCESS; } catch (Throwable throwable) { salesforceExecutor.remove(futures.toArray(new Future[]{})); throw throwable; } } @Override public ReturnT dump(SalesforceParam param) throws Exception { List> futures = Lists.newArrayList(); try { if (StringUtils.isNotBlank(param.getApi())) { // 手动任务 ReturnT result = manualDump(param, futures); if (result != null) { return result; } } else { // 自动任务 autoDump(param, futures); } return ReturnT.SUCCESS; } catch (Throwable throwable) { salesforceExecutor.remove(futures.toArray(new Future[]{})); log.error("dump error", throwable); throw throwable; } } /** * 自动dump * * @param param 参数 * @param futures futures */ private void autoDump(SalesforceParam param, List> futures) throws InterruptedException { QueryWrapper qw = new QueryWrapper<>(); qw.eq("data_work", 1) .eq("data_lock", 0) .orderByAsc("data_index") .last(" limit 10"); while (true) { List dataObjects = dataObjectService.list(qw); if (CollectionUtils.isEmpty(dataObjects)) { break; } for (DataObject dataObject : dataObjects) { DataObject update = new DataObject(); TimeUnit.MILLISECONDS.sleep(1); try { String api = dataObject.getName(); log.info("dump apis: {}", api); XxlJobLogger.log("dump apis: {}", api); // 检测表是否存在 不存在创建 checkApi(api, true); // 检测是否有创建时间 没有创建时间 按特殊表处理 直接根据id来获取 if (!hasCreatedDate(api)) { Future future = salesforceExecutor.execute(() -> { try { DataDumpSpecialParam dataDumpSpecialParam = new DataDumpSpecialParam(); dataDumpSpecialParam.setApi(api); update.setName(dataObject.getName()); update.setDataLock(1); dataObjectService.updateById(update); param.setApi(api); salesforceExecutor.waitForFutures(dataDumpSpecialService.getData(dataDumpSpecialParam, salesforceConnect.createConnect())); update.setDataWork(0); } catch (Throwable e) { throw new RuntimeException(e); } finally { if (StringUtils.isNotBlank(update.getName())) { update.setDataLock(0); dataObjectService.updateById(update); } } }, 0, 0); futures.add(future); continue; } update.setName(dataObject.getName()); update.setDataLock(1); dataObjectService.updateById(update); param.setApi(api); List salesforceParams; QueryWrapper dbQw = new QueryWrapper<>(); dbQw.eq("name", api); List list = dataBatchService.list(dbQw); AtomicInteger batch = new AtomicInteger(1); if (CollectionUtils.isEmpty(list)) { // 没有批次 salesforceParams = DataUtil.splitTask(param); } else { // 有批次 先过滤首次执行过的 salesforceParams = list.stream().filter(t -> t.getFirstSyncDate() == null) .map(t -> { SalesforceParam salesforceParam = param.clone(); salesforceParam.setApi(t.getName()); salesforceParam.setBeginCreateDate(t.getSyncStartDate()); salesforceParam.setEndCreateDate(t.getSyncEndDate()); salesforceParam.setBatch(batch.getAndIncrement()); return salesforceParam; }).collect(Collectors.toList()); } // 批次都执行过了 重新同步 if (CollectionUtils.isEmpty(salesforceParams)) { salesforceParams = DataUtil.splitTask(param); } for (SalesforceParam salesforceParam : salesforceParams) { Future future = salesforceExecutor.execute(() -> { try { dumpData(salesforceParam); } catch (Throwable throwable) { log.error("salesforceExecutor error", throwable); throw new RuntimeException(throwable); } }, salesforceParam.getBatch(), 0); futures.add(future); } // 等待当前所有线程执行完成 salesforceExecutor.waitForFutures(futures.toArray(new Future[]{})); // 附件表 跑一遍dumpFile if (StringUtils.isNotBlank(dataObject.getBlobField())) { fileService.dumpFile(dataObject.getName(), dataObject.getBlobField(), true); } update.setDataWork(0); } catch (Throwable e) { throw new RuntimeException(e); } finally { if (dataObject != null) { update.setDataLock(0); dataObjectService.updateById(update); } } } // 等待当前所有线程执行完成 salesforceExecutor.waitForFutures(futures.toArray(new Future[]{})); futures.clear(); } } private ReturnT manualDump(SalesforceParam param, List> futures) throws InterruptedException { List apis; apis = DataUtil.toIdList(param.getApi()); String join = StringUtils.join(apis, ","); log.info("dump apis: {}", join); XxlJobLogger.log("dump apis: {}", join); // 全量的时候 检测是否有自动任务锁住的表 boolean isFull = param.getBeginDate() == null && param.getEndDate() == null && CollectionUtils.isEmpty(param.getIds()); if (isFull) { QueryWrapper qw = new QueryWrapper<>(); qw.eq("data_lock", 1).in("name", apis); List list = dataObjectService.list(qw); if (CollectionUtils.isNotEmpty(list)) { String apiNames = list.stream().map(DataObject::getName).collect(Collectors.joining()); return new ReturnT<>(500, "api:" + apiNames + " is locked"); } } // 根据参数获取sql for (String api : apis) { DataObject update = new DataObject(); TimeUnit.MILLISECONDS.sleep(1); try { checkApi(api, true); if (!hasCreatedDate(api)) { DataDumpSpecialParam dataDumpSpecialParam = new DataDumpSpecialParam(); dataDumpSpecialParam.setApi(api); Future future = dataDumpSpecialService.getData(dataDumpSpecialParam, salesforceConnect.createConnect()); // 等待当前所有线程执行完成 salesforceExecutor.waitForFutures(future); continue; } param.setApi(api); List salesforceParams = null; if (isFull) { update.setName(api); update.setDataLock(1); dataObjectService.updateById(update); QueryWrapper dbQw = new QueryWrapper<>(); dbQw.eq("name", api) .isNull("first_sync_date"); List list = dataBatchService.list(dbQw); AtomicInteger batch = new AtomicInteger(1); if (CollectionUtils.isNotEmpty(list)) { salesforceParams = list.stream().map(t -> { SalesforceParam salesforceParam = param.clone(); salesforceParam.setApi(t.getName()); salesforceParam.setBeginCreateDate(t.getSyncStartDate()); salesforceParam.setEndCreateDate(t.getSyncEndDate()); salesforceParam.setBatch(batch.getAndIncrement()); return salesforceParam; }).collect(Collectors.toList()); } else { salesforceParams = DataUtil.splitTask(param); } } else { salesforceParams = DataUtil.splitTask(param); } // 手动任务优先执行 for (SalesforceParam salesforceParam : salesforceParams) { Future future = salesforceExecutor.execute(() -> { try { dumpData(salesforceParam); } catch (Throwable throwable) { log.error("salesforceExecutor error", throwable); throw new RuntimeException(throwable); } }, salesforceParam.getBatch(), 1); futures.add(future); } // 等待当前所有线程执行完成 salesforceExecutor.waitForFutures(futures.toArray(new Future[]{})); // 附件表 跑一遍dumpFile DataObject one = dataObjectService.getById(api); if (StringUtils.isNotBlank(one.getBlobField())) { fileService.dumpFile(one.getName(), one.getBlobField(), true); } update.setDataWork(0); } catch (Throwable e) { log.error("manualDump error", e); throw new RuntimeException(e); } finally { if (isFull) { update.setName(api); update.setDataLock(0); dataObjectService.updateById(update); } } } return null; } /** * 数据传输主体 * * @param param 参数 */ private PartnerConnection dumpData(SalesforceParam param) throws Throwable { return dumpData(param, null); } /** * 数据传输主体 * * @param param 参数 */ private PartnerConnection dumpData(SalesforceParam param, DataReport dataReport) throws Throwable { String api = param.getApi(); PartnerConnection connect = null; try { DataBatchHistory dataBatchHistory = new DataBatchHistory(); dataBatchHistory.setName(api); dataBatchHistory.setStartDate(new Date()); dataBatchHistory.setSyncStartDate(param.getBeginCreateDate()); if (param.getEndCreateDate() != null) { dataBatchHistory.setSyncEndDate(DateUtils.addSeconds(param.getEndCreateDate(), -1)); } dataBatchHistory.setBatch(param.getBatch()); if (param.getBeginCreateDate() != null && param.getEndCreateDate() != null) { log.info("NO.{} dump {}, date: {} ~ {} start", param.getBatch(), param.getApi(), DateFormatUtils.format(param.getBeginCreateDate(), "yyyy-MM-dd HH:mm:ss"), DateFormatUtils.format(param.getEndCreateDate(), "yyyy-MM-dd HH:mm:ss")); } connect = salesforceConnect.createConnect(); // 存在isDeleted 只查询IsDeleted为false的 if (dataFieldService.hasDeleted(param.getApi())) { param.setIsDeleted(false); } else { // 不存在 过滤 param.setIsDeleted(null); } // 若count数量过多 可能导致超时出不来结果 对该任务做进一步拆分 int failCount = 0; boolean isSuccess = false; while (failCount <= Const.MAX_FAIL_COUNT) { try { dataBatchHistory.setSfNum(countSfNum(connect, param)); isSuccess = true; break; } catch (Throwable throwable) { failCount++; } } // 不成功 做任务拆分 if (!isSuccess) { if (splitTask(param)) { return connect; } } getAllSfData(param, connect, dataReport); updateDataBatch(param, dataBatchHistory); } catch (Throwable throwable) { log.error("dataDumpJob error api:{}", api, throwable); String type = param.getType() == 1 ? "存量" : "增量"; String format = String.format("%s数据迁移 error, api name: %s, \nparam: %s, \ncause:\n%s", type, api, JSON.toJSONString(param, DataDumpParam.getFilter()), throwable); EmailUtil.send("DataDump ERROR", format); throw throwable; } return connect; } /** * 任务拆分 * * @param param 任务参数 * @return true成功 false失败 */ private boolean splitTask(SalesforceParam param) { List> futures = Lists.newArrayList(); try { QueryWrapper qw = new QueryWrapper<>(); qw.eq("name", param.getApi()) .eq("sync_start_date", param.getBeginCreateDate()) .eq("sync_end_date", param.getEndCreateDate()); DataBatch dataBatch = dataBatchService.getOne(qw); // 如果已经是同一天了 则抛出错误 不继续执行 if (DateUtils.isSameDay(dataBatch.getSyncEndDate(), dataBatch.getSyncStartDate())) { throw new Exception("can't count sf num"); } DataBatch dataBatch1 = dataBatch.clone(); DataBatch dataBatch2 = dataBatch.clone(); Date midDate = DateUtils.addMilliseconds(dataBatch.getSyncStartDate(), (int) (dataBatch.getSyncEndDate().getTime() - dataBatch.getSyncStartDate().getTime())); Date now = new Date(); dataBatch1.setSyncEndDate(midDate); dataBatch2.setSyncStartDate(midDate); dataBatchService.remove(qw); dataBatchService.save(dataBatch1); dataBatchService.save(dataBatch2); SalesforceParam param1 = param.clone(); SalesforceParam param2 = param.clone(); param1.setEndCreateDate(midDate); Future future1 = salesforceExecutor.execute(() -> { try { dumpData(param1); } catch (Throwable throwable) { log.error("salesforceExecutor error", throwable); throw new RuntimeException(throwable); } }, param1.getBatch(), 0); param2.setBeginCreateDate(midDate); Future future2 = salesforceExecutor.execute(() -> { try { dumpData(param2); } catch (Throwable throwable) { log.error("salesforceExecutor error", throwable); throw new RuntimeException(throwable); } }, param2.getBatch(), 0); futures = Lists.newArrayList(future1, future2); // 等待当前所有线程执行完成 salesforceExecutor.waitForFutures(futures.toArray(new Future[]{})); return true; } catch (Throwable throwable) { salesforceExecutor.remove(futures.toArray(new Future[]{})); } return false; } /** * 遍历获取所有sf数据并处理 * * @param param 参数 * @param connect sf connect */ private void getAllSfData(SalesforceParam param, PartnerConnection connect, DataReport dataReport) throws Throwable { JSONArray objects = null; String api = param.getApi(); Map map = Maps.newHashMap(); String dateName = param.getType() == 1 ? Const.CREATED_DATE : param.getUpdateField(); int count = 0; Date lastCreatedDate = null; String maxId = null; Field[] dsrFields = null; // 获取sf字段 { List fields = Lists.newArrayList(); DescribeSObjectResult dsr = connect.describeSObject(api); dsrFields = dsr.getFields(); for (Field field : dsrFields) { // 不查询文件 if ("base64".equalsIgnoreCase(field.getType().toString())) { continue; } fields.add(field.getName()); } if ("Attachment".equals(api) || "FeedItem".equals(api)) { fields.add("Parent.type"); } DataObject dataObject = dataObjectService.getById(api); if (dataObject != null && StringUtils.isNotBlank(dataObject.getExtraField())) { fields.addAll(Arrays.asList(StringUtils.split(dataObject.getExtraField().replaceAll(StringUtils.SPACE, StringUtils.EMPTY), ","))); } param.setSelect(StringUtils.join(fields, ",")); } // 获取数据库字段进行比对 List fields = customMapper.getFields(api).stream().map(String::toUpperCase).collect(Collectors.toList()); int failCount = 0; while (true) { try { // 获取创建时间 param.setTimestamp(lastCreatedDate); // 判断是否存在要排除的id param.setMaxId(maxId); map.put("param", param); String sql; if (param.getType() == 1) { // type 1 按创建时间纬度 sql = SqlUtil.showSql("com.celnet.datadump.mapper.SalesforceMapper.list", map); } else { // type 2 按修改时间纬度 sql = SqlUtil.showSql("com.celnet.datadump.mapper.SalesforceMapper.listByModifyTime", map); } log.info("query sql: {}", sql); XxlJobLogger.log("query sql: {}", sql); QueryResult queryResult = connect.queryAll(sql); if (ObjectUtils.isEmpty(queryResult) || ObjectUtils.isEmpty(queryResult.getRecords())) { break; } SObject[] records = queryResult.getRecords(); objects = DataUtil.toJsonArray(records, dsrFields); // 获取最大修改时间和等于该修改时间的数据id { Date maxDate = objects.getJSONObject(objects.size() - 1).getDate(dateName); maxId = objects.stream() .map(t -> (JSONObject) t) .filter(t -> maxDate.equals(t.getDate(dateName))) .map(t -> t.getString(Const.ID)) .max(String::compareTo).get(); lastCreatedDate = maxDate; } // 存储更新 saveOrUpdate(api, fields, records, objects, true); count += records.length; TimeUnit.MILLISECONDS.sleep(1); String format = DateFormatUtils.format(lastCreatedDate, "yyyy-MM-dd HH:mm:ss"); log.info("dump success count: {}, timestamp: {}", count, format); XxlJobLogger.log("dump success count: {}, timestamp: {}", count, format); failCount = 0; objects = null; } catch (InterruptedException e) { throw e; } catch (Throwable throwable) { failCount++; log.error("dataDumpJob error api:{}, data:{}", api, JSON.toJSONString(objects), throwable); if (failCount > Const.MAX_FAIL_COUNT) { throwable.addSuppressed(new Exception("dataDump error data:" + JSON.toJSONString(objects))); throw throwable; } TimeUnit.MINUTES.sleep(1); } } if (ObjectUtils.isNotEmpty(dataReport)) { DataReportDetail dataReportDetail = new DataReportDetail(); dataReportDetail.setApi(param.getApi()); dataReportDetail.setReportId(dataReport.getId()); dataReportDetail.setNum(count); dataReportDetailService.save(dataReportDetail); } } /** * 批次存在 且首次时间为0或者为空 赋值、保存执行记录 * * @param param 参数 * @param dataBatchHistory 执行记录 */ private void updateDataBatch(SalesforceParam param, DataBatchHistory dataBatchHistory) { if (dataBatchHistory == null) { return; } dataBatchHistory.setEndDate(new Date()); Integer num = customMapper.count(param); dataBatchHistory.setDbNum(num); if (dataBatchHistory.getSfNum() != null) { dataBatchHistory.setSyncStatus(dataBatchHistory.getSfNum().equals(dataBatchHistory.getDbNum()) ? 1 : 0); } dataBatchHistory.setCost(DataUtil.calTime(dataBatchHistory.getEndDate(), dataBatchHistory.getStartDate())); QueryWrapper qw = new QueryWrapper<>(); qw.eq("name", param.getApi()) .eq("sync_start_date", param.getBeginCreateDate()) .eq("sync_end_date", param.getEndCreateDate()) .isNull("first_sync_date"); DataBatch dataBatch = dataBatchService.getOne(qw); if (dataBatch == null) { return; } dataBatch.setFirstSfNum(dataBatchHistory.getSfNum()); dataBatch.setFirstDbNum(dataBatchHistory.getDbNum()); dataBatch.setFirstSyncDate(dataBatchHistory.getStartDate()); dataBatch.setSyncStatus(dataBatchHistory.getSyncStatus()); dataBatchService.update(dataBatch, qw); log.info("count db num: {}", num); XxlJobLogger.log("count db num: {}", num); dataBatchHistoryService.save(dataBatchHistory); log.info("dataDumpJob done api:{}, count:{}", dataBatchHistory.getName(), num); } /** * 获取需同步sf数据数量 * * @param connect connect * @param param 参数 * @return sf统计数量 */ @Override public Integer countSfNum(PartnerConnection connect, SalesforceParam param) throws Exception { Map map = Maps.newHashMap(); map.put("param", param); String sql = SqlUtil.showSql("com.celnet.datadump.mapper.SalesforceMapper.count", map); log.info("count sql: {}", sql); XxlJobLogger.log("count sql: {}", sql); QueryResult queryResult = connect.queryAll(sql); SObject record = queryResult.getRecords()[0]; Integer num = (Integer) record.getField("num"); log.info("count sf num: {}", num); XxlJobLogger.log("count sf num: {}", num); return num; } /** * 插入或更新数据 * * @param api 表名 * @param fields 字段 * @param records 数据list * @param objects 数据 json格式 * @param insert 不存在是否插入 */ @Override public Integer saveOrUpdate(String api, List fields, SObject[] records, JSONArray objects, boolean insert) throws Throwable { // 根据id查询数据库 取出已存在数据的id List ids = Arrays.stream(records).map(SObject::getId).collect(Collectors.toList()); DataObject one = dataObjectService.getById(api); List existsIds = customMapper.getIds(api, ids); for (int i = 0; i < objects.size(); i++) { JSONObject jsonObject = objects.getJSONObject(i); try { Set keys = jsonObject.keySet(); // update String id = jsonObject.getString(Const.ID); List> maps = Lists.newArrayList(); for (String key : keys) { if (fields.stream().anyMatch(key::equalsIgnoreCase)) { Map paramMap = Maps.newHashMap(); paramMap.put("key", key); paramMap.put("value", jsonObject.get(key)); maps.add(paramMap); } } // 附件表 插入更新时把is_dump置为false if (StringUtils.isNotBlank(one.getBlobField())) { Map paramMap = Maps.newHashMap(); paramMap.put("key", "is_dump"); paramMap.put("value", false); Map paramMap2 = Maps.newHashMap(); paramMap2.put("key", "is_upload"); paramMap2.put("value", false); maps.add(paramMap); maps.add(paramMap2); } //附件关联表 插入更新时给关联对象赋值 // if ("ContentDocumentLink".equals(api)) { // String linkedEntity_Type = records[i].getChild("LinkedEntity").getChild("Type").getValue().toString(); // Map paramMap = Maps.newHashMap(); // paramMap.put("key", "linkedEntity_Type"); // paramMap.put("value", linkedEntity_Type); // maps.add(paramMap); // } if (existsIds.contains(id)) { customMapper.updateById(api, maps, id); } else { if (insert) { customMapper.save(api, maps); } } TimeUnit.MILLISECONDS.sleep(1); } catch (InterruptedException e) { throw e; } catch (Throwable throwable) { if (throwable.toString().contains("interrupt")) { log.error("may interrupt error:", throwable); throw new InterruptedException(); } throwable.addSuppressed(new Exception("saveOrUpdate error data:" + JSON.toJSONString(jsonObject))); throw throwable; } } return existsIds.size(); } private final ReentrantLock reentrantLock = new ReentrantLock(); /** * 检测表是否存在 不存在则创建 * * @param apiName 表名 * @param createBatch sf 连接 */ @Override public void checkApi(String apiName, Boolean createBatch) throws Exception { // 加个锁 避免重复执行创建api reentrantLock.lock(); log.info("check api:{}", apiName); try { boolean hasCreatedDate = false; PartnerConnection connection = salesforceConnect.createConnect(); DataObject dataObject = dataObjectService.getById(apiName); Date now = new Date(); if (StringUtils.isBlank(customMapper.checkTable(apiName))) { log.info("api:{} doesn't exist create", apiName); // 构建字段 List> list = Lists.newArrayList(); DescribeSObjectResult dsr = connection.describeSObject(apiName); String label = dsr.getLabel(); List fieldList = Lists.newArrayList(); List fields = Lists.newArrayList(); String blobField = null; List dataPicklists = Lists.newArrayList(); for (Field field : dsr.getFields()) { // 过滤字段 if (Const.TABLE_FILTERS.contains(field.getName())) { continue; } if (Const.CREATED_DATE.equalsIgnoreCase(field.getName())) { hasCreatedDate = true; } Map map = Maps.newHashMap(); String sfType = field.getType().toString(); if ("base64".equalsIgnoreCase(sfType)) { blobField = field.getName(); } map.put("type", DataUtil.fieldTypeToMysql(field)); // 英文' 转换为 \' map.put("comment", field.getLabel().replaceAll("'", "\\\\'")); map.put("name", field.getName()); list.add(map); DataField dataField = new DataField(); dataField.setApi(apiName); dataField.setSfType(sfType); dataField.setField(field.getName()); dataField.setName(field.getLabel()); dataField.setIsCreateable(field.getCreateable()); dataField.setIsNillable(field.getNillable()); dataField.setIsDefaultedOnCreate(field.getDefaultedOnCreate()); String join = null; // 会有非常多映射关系的字段在 这里置空不管 if (field.getReferenceTo().length <= 3) { join = StringUtils.join(field.getReferenceTo(), ","); } else { log.warn("referenceTo too long set null, api:{}, field:{}, reference to:{}", apiName, field.getName(), StringUtils.join(field.getReferenceTo(), ",")); } // picklist保存到picklist表 if ("picklist".equalsIgnoreCase(sfType)) { join = "data_picklist"; PicklistEntry[] picklistValues = field.getPicklistValues(); if (ArrayUtils.isNotEmpty(picklistValues)) { for (PicklistEntry picklistValue : picklistValues) { DataPicklist dataPicklist = new DataPicklist(); dataPicklist.setApi(apiName); dataPicklist.setField(field.getName()); dataPicklist.setLabel(picklistValue.getLabel()); dataPicklist.setValue(picklistValue.getValue()); dataPicklists.add(dataPicklist); } } } dataField.setReferenceTo(join); fieldList.add(dataField); fields.add(field.getName()); } // 存在ownerId字段 String extraField = ""; if (dataObject != null && StringUtils.isNotBlank(dataObject.getExtraField())) { extraField = dataObject.getExtraField(); } if (fields.stream().anyMatch(Const.OWNER_ID::equalsIgnoreCase)) { if (StringUtils.isNotBlank(extraField)) { extraField += ",Owner.Type"; } else { extraField = "Owner.Type"; } } // 是否存在额外字段 配置上表 if (StringUtils.isNotBlank(extraField)) { Set extras = Arrays.stream(StringUtils.split(extraField.replaceAll(StringUtils.SPACE, StringUtils.EMPTY), ",")).collect(Collectors.toSet()); for (String extra : extras) { String fieldName = extra.replaceAll("\\.", "_"); DataField dataField = new DataField(); dataField.setApi(apiName); dataField.setField(fieldName); dataField.setName(extra); fieldList.add(dataField); fields.add(fieldName); Map map = Maps.newHashMap(); map.put("type", "varchar(255)"); map.put("comment", extra); map.put("name", fieldName); list.add(map); } } // 存在附件的表 作特殊处理 构建两个字段 if (StringUtils.isNotBlank(blobField)) { fileExtraFieldBuild(apiName, list, fieldList, fields); } // 构建索引 List> index = Lists.newArrayList(); for (String tableIndex : Const.TABLE_INDEX) { if (!fields.contains(tableIndex)) { continue; } Map createDateMap = Maps.newHashMap(); createDateMap.put("field", tableIndex); createDateMap.put("name", String.format("IDX_%s_%s", apiName, tableIndex)); index.add(createDateMap); } log.info("api {} not exist, create..", apiName); //新增一个用来存储新sfid的字段 Map map = Maps.newHashMap(); map.put("type", "varchar(255)"); map.put("comment", "新sfid"); map.put("name", "new_id"); list.add(map); if ("Task".equals(apiName) || "Event".equals(apiName)){ Map LinkedMap = Maps.newHashMap(); LinkedMap.put("type", "varchar(18)"); LinkedMap.put("comment", "whatTextId"); LinkedMap.put("name", "WhatId_Text__c"); list.add(LinkedMap); Map LinkedMap1 = Maps.newHashMap(); LinkedMap1.put("type", "varchar(18)"); LinkedMap1.put("comment", "whoTextId"); LinkedMap1.put("name", "WhoId_Text__c"); list.add(LinkedMap1); } if ("ContentDocumentLink".equals(apiName)){ //文档关联表新增关联对象字段 Map LinkedMap = Maps.newHashMap(); LinkedMap.put("type", "varchar(255)"); LinkedMap.put("comment", "关联对象"); LinkedMap.put("name", "LinkedEntity_Type"); list.add(LinkedMap); } if ("Attachment".equals(apiName) || "FeedComment".equals(apiName) || "FeedItem".equals(apiName)){ //文档关联表新增关联对象字段 Map LinkedMap = Maps.newHashMap(); LinkedMap.put("type", "varchar(255)"); LinkedMap.put("comment", "关联对象"); LinkedMap.put("name", "Parent_Type"); list.add(LinkedMap); DataField dataField = new DataField(); dataField.setApi(apiName); dataField.setField("Parent.Type"); dataField.setName("关联对象"); fieldList.add(dataField); } customMapper.createTable(apiName, label, list, index); // 生成字段映射 QueryWrapper delfieldQw = new QueryWrapper<>(); delfieldQw.eq("api", apiName); dataFieldService.remove(delfieldQw); dataFieldService.saveBatch(fieldList); // 生成picklist QueryWrapper deldQw = new QueryWrapper<>(); deldQw.eq("api", apiName); dataPicklistService.remove(deldQw); dataPicklistService.saveBatch(dataPicklists); // 生成batch Date startDate = DataUtil.DEFAULT_BEGIN_DATE; // 取当天零点 作为最大时间 Date endCreateDate = DateUtils.parseDate(DateFormatUtils.format(now, "yyyy-MM-dd"), "yyyy-MM-dd"); Date lastDay = null; QueryWrapper delQw = new QueryWrapper<>(); delQw.eq("name", apiName); dataBatchService.remove(delQw); if (createBatch && Const.BATCH_FILTERS.stream().noneMatch(t -> apiName.indexOf(t) > 0)) { // 匹配的不生成 DataBatch one = new DataBatch(); one.setFirstDbNum(0); one.setFirstSfNum(0); one.setDbNum(0); one.setSfNum(0); one.setName(apiName); one.setLabel(label); if (hasCreatedDate) { List> bachList = customMapper.list("code,value","system_config","code ='"+BATCH_TYPE+"'"); String batchType = bachList.get(0).get("value").toString(); do { lastDay = getLastDay(batchType, endCreateDate, startDate); DataBatch dataBatch = one.clone(); dataBatch.setSyncStartDate(startDate); dataBatch.setSyncEndDate(lastDay); dataBatchService.save(dataBatch); startDate = lastDay; } while (lastDay.compareTo(endCreateDate) < 0); } else { // 没创建时间 开始结束时间为空 dataBatchService.save(one); } } DataObject update = new DataObject(); update.setLabel(label); update.setName(apiName); update.setLastUpdateDate(endCreateDate); update.setBlobField(blobField); dataObjectService.saveOrUpdate(update); } } finally { reentrantLock.unlock(); } } @Override public ReturnT createApi(SalesforceParam param) throws Exception { List apis; if (StringUtils.isBlank(param.getApi())) { apis = dataObjectService.list().stream().map(DataObject::getName).collect(Collectors.toList()); } else { apis = DataUtil.toIdList(param.getApi()); } log.info("打印所有待同步表:" +apis.toString()); for (String api : apis) { checkApi(api, true); } return ReturnT.SUCCESS; } @Override public ReturnT getAllApi() throws Exception { PartnerConnection partnerConnection = salesforceConnect.createConnect(); DescribeGlobalResult result = partnerConnection.describeGlobal(); DescribeGlobalSObjectResult[] Sobjects = result.getSobjects(); List dataObjects = new ArrayList<>(); for(DescribeGlobalSObjectResult Sobject : Sobjects) { DataObject dataObject = new DataObject(); String SobjectName = Sobject.getName(); String SobjectLabel = Sobject.getLabel(); dataObject.setName(SobjectName); dataObject.setLabel(SobjectLabel); dataObjects.add(dataObject); } dataObjectService.saveBatch(dataObjects); return ReturnT.SUCCESS; } /** * 是否存在创建时间字段 * * @param api api名称 * @return true存在创建时间字段 false不存在 */ private Boolean hasCreatedDate(String api) { QueryWrapper qw = new QueryWrapper<>(); qw.eq("api", api).eq("field", Const.CREATED_DATE).last("limit 1"); DataField one = dataFieldService.getOne(qw); return one != null; } /** * 存在附件的表 特殊构建 * * @param apiName 表名称 * @param list 构建表字段的list * @param fieldList 构建字段表的list * @param fields 字段名称list */ private static void fileExtraFieldBuild(String apiName, List> list, List fieldList, List fields) { { DataField dataField = new DataField(); dataField.setApi(apiName); dataField.setField("url"); dataField.setName("文件路径"); fieldList.add(dataField); fields.add("url"); Map map = Maps.newHashMap(); map.put("type", "text"); map.put("comment", "文件路径"); map.put("name", "url"); list.add(map); } { DataField dataField = new DataField(); dataField.setApi(apiName); dataField.setField("is_dump"); dataField.setName("是否存储"); fieldList.add(dataField); fields.add("is_dump"); Map map = Maps.newHashMap(); map.put("type", "tinyint(1) DEFAULT 0"); map.put("comment", "是否存储"); map.put("name", "is_dump"); list.add(map); } { DataField dataField = new DataField(); dataField.setApi(apiName); dataField.setField("is_upload"); dataField.setName("是否上传"); fieldList.add(dataField); fields.add("is_upload"); Map map = Maps.newHashMap(); map.put("type", "tinyint(1) DEFAULT 0"); map.put("comment", "是否上传"); map.put("name", "is_upload"); list.add(map); } } @Override public ReturnT getDocumentLink(String paramStr) throws Exception { String api = "ContentDocumentLink"; PartnerConnection partnerConnection = salesforceConnect.createConnect(); PartnerConnection connection = salesforceTargetConnect.createConnect(); List> list = customMapper.list("Id", "ContentDocument", "new_id is not null"); DescribeSObjectResult dsr = partnerConnection.describeSObject(api); List fields = customMapper.getFields(api).stream().map(String::toUpperCase).collect(Collectors.toList()); Field[] dsrFields = dsr.getFields(); try { if (list != null && list.size() > 0) { for (Map map : list) { String contentDocumentId = (String) map.get("Id"); String sql = "SELECT Id, LinkedEntityId, LinkedEntity.Type, ContentDocumentId, Visibility, ShareType, SystemModstamp, IsDeleted FROM ContentDocumentLink where ContentDocumentId = '" + contentDocumentId + "'"; JSONArray objects = null; try { QueryResult queryResult = partnerConnection.queryAll(sql); SObject[] records = queryResult.getRecords(); objects = DataUtil.toJsonArray(records, dsrFields); saveOrUpdate(api, fields, records, objects, true); } catch (Throwable e) { log.error("getDocumentLink error api:{}, data:{}", api, JSON.toJSONString(objects), e); TimeUnit.MINUTES.sleep(1); return ReturnT.FAIL; } } //表内数据总量 Integer count = customMapper.countBySQL(api, "where ShareType = 'V' and new_id = '0'"); //批量插入200一次 int page = count % 200 == 0 ? count / 200 : (count / 200) + 1; for (int i = 0; i < page; i++) { List> linkList = customMapper.list("Id,LinkedEntityId,ContentDocumentId,LinkedEntity_Type,ShareType,Visibility", api, "ShareType = 'V' and new_id = '0' order by Id asc limit 200"); SObject[] accounts = new SObject[linkList.size()]; String[] ids = new String[linkList.size()]; int index = 0; for (Map map : linkList) { String linkedEntityId = (String) map.get("LinkedEntityId"); String id = (String) map.get("Id"); String contentDocumentId = (String) map.get("ContentDocumentId"); String linkedEntityType = (String) map.get("LinkedEntity_Type"); String shareType = (String) map.get("ShareType"); String Visibility = (String) map.get("Visibility"); // dataObject查询 QueryWrapper qw = new QueryWrapper<>(); qw.eq("name", linkedEntityType); List objects = dataObjectService.list(qw); if (!objects.isEmpty()) { Map dMap = customMapper.getById("new_id", "ContentDocument", contentDocumentId); Map lMap = customMapper.getById("new_id", linkedEntityType, linkedEntityId); SObject account = new SObject(); account.setType(api); account.setField("ContentDocumentId", dMap.get("new_id").toString()); account.setField("LinkedEntityId", lMap.get("new_id").toString()); account.setField("ShareType", shareType); account.setField("Visibility", Visibility); ids[index] = id; accounts[index] = account; index++; } } try { SaveResult[] saveResults = connection.create(accounts); for (int j = 0; j < saveResults.length; j++) { if (!saveResults[j].getSuccess()) { String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s", api, JSON.toJSONString(DataDumpParam.getFilter()), com.alibaba.fastjson.JSON.toJSONString(saveResults[j])); EmailUtil.send("DataDump ContentDocumentLink ERROR", format); } else { List> dList = new ArrayList<>(); Map linkMap = new HashMap<>(); linkMap.put("key", "new_id"); linkMap.put("value", saveResults[j].getId()); dList.add(linkMap); customMapper.updateById("ContentDocumentLink", dList, ids[j]); } } } catch (Exception e) { log.error("getDocumentLink error api:{}, data:{}", api, JSON.toJSONString(accounts), e); EmailUtil.send("-------测试-----------", JSON.toJSONString(accounts)); throw new RuntimeException(e); } } } } catch (Exception e) { log.error("getDocumentLink error api:{}, data:{}", api, JSON.toJSONString(list), e); return ReturnT.FAIL; } return null; } @Override public String getDocumentId(PartnerConnection partnerConnection, String contentVersionId) throws Exception{ QueryResult queryResult = partnerConnection.queryAll("SELECT Id, ContentDocumentId, IsDeleted FROM ContentVersion where Id = " + "'" + contentVersionId + "'"); DescribeSObjectResult dsr = partnerConnection.describeSObject("ContentVersion"); Field[] dsrFields = dsr.getFields(); SObject[] records = queryResult.getRecords(); JSONArray objects = DataUtil.toJsonArray(records, dsrFields); if (objects != null && objects.size() > 0) { JSONObject jsonObject = objects.getJSONObject(0); return jsonObject.getString("ContentDocumentId"); } return null; } @Override public ReturnT getChatter(SalesforceParam param) throws Exception { List> feedList = customMapper.list("Parent_Type", "FeedItem", "1=1 GROUP BY Parent_Type"); for (Map map : feedList){ String parentType = map.get("Parent_Type").toString(); List types = new ArrayList<>(); //检测表是否存在 if (StringUtils.isBlank(customMapper.checkTable(parentType))){ types.add(parentType); } if (types.size()>0){ customMapper.delete("FeedItem", types); customMapper.delete("FeedComment", types); } } return null; } public Date getLastDay(String batchType,Date endDate,Date startDate){ switch (batchType) { case BATCH_TYPE_WEEK: return DataUtil.getWeekLastDay(endDate, startDate); case BATCH_TYPE_MONTH: return DataUtil.getMonthLastDay(endDate, startDate); case BATCH_TYPE_YEAR: return DataUtil.getYearLastDay(endDate, startDate); default: return endDate; } } }