package com.celnet.datadump.service.impl; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.celnet.datadump.config.SalesforceConnect; import com.celnet.datadump.config.SalesforceExecutor; import com.celnet.datadump.entity.DataObject; import com.celnet.datadump.global.Const; import com.celnet.datadump.param.DataCheckDeletedParam; import com.celnet.datadump.param.SalesforceParam; import com.celnet.datadump.mapper.CustomMapper; import com.celnet.datadump.service.CommonService; import com.celnet.datadump.service.DataCheckDeletedService; import com.celnet.datadump.service.DataFieldService; import com.celnet.datadump.service.DataObjectService; import com.celnet.datadump.util.DataUtil; import com.celnet.datadump.util.SqlUtil; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.sforce.soap.partner.DescribeSObjectResult; import com.sforce.soap.partner.PartnerConnection; import com.sforce.soap.partner.Field; import com.sforce.soap.partner.QueryResult; import com.sforce.soap.partner.sobject.SObject; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.log.XxlJobLogger; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.commons.lang3.time.DateUtils; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.*; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** * @author Red * @description * @date 2022/12/06 */ @Slf4j @Component public class DataCheckDeletedJobServiceImpl implements DataCheckDeletedService { @Autowired private SalesforceConnect salesforceConnect; @Autowired private CustomMapper customMapper; @Autowired private SalesforceExecutor salesforceExecutor; @Autowired private CommonService commonService; @Autowired private DataObjectService dataObjectService; @Autowired private DataFieldService dataFieldService; /** * 检测Data 逻辑删除的数据 更新回db * * @param param 参数 * @return returnT */ @Override public ReturnT checkDeletedData(DataCheckDeletedParam param) throws Throwable { List> futures = Lists.newArrayList(); try { // 指定sf api List apis; if (StringUtils.isNotBlank(param.getApi())) { apis = DataUtil.toIdList(param.getApi()); } else { // 查询object所有表 按index 从小到大排序 null在最后 List list = dataObjectService.list(); list.sort(Comparator.comparing(DataObject::getDataIndex, Comparator.nullsLast(Integer::compareTo))); apis = list.stream().map(DataObject::getName).collect(Collectors.toList()); } PartnerConnection connect = salesforceConnect.createConnect(); for (String api : apis) { futures.add(salesforceExecutor.execute(() -> { try { updateDeletedData(param, api, connect); } catch (Throwable throwable) { log.error("checkDeletedData error api:{}",api, throwable); } }, 1, 2)); } // 等待当前所有线程执行完成 salesforceExecutor.waitForFutures(futures.toArray(new Future[]{})); } catch (Throwable throwable) { salesforceExecutor.remove(futures.toArray(new Future[]{})); throw throwable; } return ReturnT.SUCCESS; } private void updateDeletedData(DataCheckDeletedParam dataCheckDeletedParam, String api, PartnerConnection connect) throws Throwable { JSONArray objects = null; // 不存在isDeleted 过滤 if (!dataFieldService.hasDeleted(api)) { return; } // 如果不存在表结构 跳过该任务 if (!checkApi(api)) { return; } SalesforceParam param = new SalesforceParam(); BeanUtils.copyProperties(dataCheckDeletedParam, param); DataObject dataObject = dataObjectService.getById(api); // 更新字段值不为空 按更新字段里的字段校验 if (StringUtils.isNotBlank(dataObject.getUpdateField())) { param.setUpdateField(dataObject.getUpdateField()); } Field[] dsrFields = null; // 获取sf字段 { List fields = Lists.newArrayList(); DescribeSObjectResult dsr = connect.describeSObject(api); dsrFields = dsr.getFields(); // 逻辑删除 只需要查id和是否删除即可 fields.add("Id"); fields.add("IsDeleted"); fields.add(param.getUpdateField()); param.setSelect(StringUtils.join(fields, ",")); } param.setApi(api); // 获取数据库字段进行比对 List fields = customMapper.getFields(api).stream().map(String::toUpperCase).collect(Collectors.toList()); String sql; Map map = Maps.newHashMap(); Date lastCreatedDate = null; String maxId = null; param.setIsDeleted(true); param.setBeginModifyDate(DateUtils.addDays(new Date(), -1 * Math.abs(dataCheckDeletedParam.getBeforeDay()))); int count = 0, existCount = 0, failCount = 0; while (true) { try { // 获取创建时间 param.setTimestamp(lastCreatedDate); // 判断是否存在要排除的id param.setMaxId(maxId); map.put("param", param); sql = SqlUtil.showSql("com.celnet.datadump.mapper.SalesforceMapper.listByModifyTime", map); log.info("query sql: {}", sql); XxlJobLogger.log("query sql: {}", sql); // 使用queryAll查询才能删除数据 QueryResult queryResult = connect.queryAll(sql); if (ObjectUtils.isEmpty(queryResult) || ObjectUtils.isEmpty(queryResult.getRecords())) { break; } SObject[] records = queryResult.getRecords(); objects = DataUtil.toJsonArray(records, dsrFields); // 获取最大修改时间和等于该修改时间的数据id { Date maxDate = objects.getJSONObject(objects.size() - 1).getDate(param.getUpdateField()); maxId = ((JSONObject)objects.get(objects.size()-1)).getString(Const.ID); lastCreatedDate = maxDate; } // 只做更新 不存在的数据不插入 Integer existNum = commonService.saveOrUpdate(api, fields, records, objects, false); existCount += existNum; count += records.length; TimeUnit.MILLISECONDS.sleep(1); String format = DateFormatUtils.format(lastCreatedDate, "yyyy-MM-dd HH:mm:ss"); log.info("check Deleted Data api: {}, deleted data count: {}, exist data count: {}, timestamp: {}", api, count, existCount, format); XxlJobLogger.log("check Deleted Data api: {}, deleted data count: {}, exist data count: {}, timestamp: {}", api, count, existCount, format); failCount = 0; objects = null; } catch (InterruptedException e) { throw e; } catch (Throwable throwable) { failCount++; log.error("checkDeletedData error api:{}, data:{}", api, JSON.toJSONString(objects), throwable); if (failCount > Const.MAX_FAIL_COUNT) { throwable.addSuppressed(new Exception("checkDeletedData error data:" + JSON.toJSONString(objects))); throw throwable; } TimeUnit.MINUTES.sleep(1); } } } /** * 检测表是否存在 存在返回true 不存在返回false * * @param apiName 表名 */ private Boolean checkApi(String apiName) { // 加个锁 避免重复执行创建api String api = customMapper.checkTable(apiName); return StringUtils.isNotBlank(api); } }