// Source: data-dump/src/main/java/com/celnet/datadump/service/impl/CommonServiceImpl.java
// (1291 lines, 61 KiB, Java)

package com.celnet.datadump.service.impl;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.celnet.datadump.config.SalesforceConnect;
import com.celnet.datadump.config.SalesforceExecutor;
import com.celnet.datadump.config.SalesforceTargetConnect;
import com.celnet.datadump.entity.*;
import com.celnet.datadump.global.Const;
import com.celnet.datadump.global.TypeCode;
import com.celnet.datadump.mapper.CustomMapper;
import com.celnet.datadump.param.DataDumpParam;
import com.celnet.datadump.param.DataDumpSpecialParam;
import com.celnet.datadump.param.SalesforceParam;
import com.celnet.datadump.service.*;
import com.celnet.datadump.util.DataUtil;
import com.celnet.datadump.util.EmailUtil;
import com.celnet.datadump.util.SqlUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.sforce.async.BulkConnection;
import com.sforce.soap.partner.*;
import com.sforce.soap.partner.sobject.SObject;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.log.XxlJobLogger;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import static com.celnet.datadump.global.SystemConfigCode.*;
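/**
* @author Red
* @description Core data dump service: incremental and full (manual/automatic) dumps of
* Salesforce objects into local tables, local table/batch bootstrap, and row upsert.
* @date 2022/09/23
*/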
@Service
@Slf4j
public class CommonServiceImpl implements CommonService {
// ---- Injected collaborators ----
// Creates PartnerConnection instances (see createConnect() usages below).
@Autowired
private SalesforceConnect salesforceConnect;
// Mapper used with runtime table names (count, getIds, save, updateById, createTable, ...).
@Autowired
private CustomMapper customMapper;
// Runs dump tasks and hands back Futures that can be waited on or removed.
@Autowired
private SalesforceExecutor salesforceExecutor;
// Persistence for DataBatchHistory rows (one per executed dump task).
@Autowired
private DataBatchHistoryService dataBatchHistoryService;
// Persistence for DataBatch rows (date-range batches per object).
@Autowired
private DataBatchService dataBatchService;
// Persistence for DataObject rows (per-object configuration and lock/work flags).
@Autowired
private DataObjectService dataObjectService;
// Persistence for DataField rows; also answers hasDeleted(api).
@Autowired
private DataFieldService dataFieldService;
// Dumps blob/file content for objects that declare a blob field.
@Autowired
private FileService fileService;
// Persistence for DataPicklist rows.
@Autowired
private DataPicklistService dataPicklistService;
// Fetches data for objects without a CreatedDate field.
@Autowired
private DataDumpSpecialService dataDumpSpecialService;
// Persistence for DataReport rows.
@Autowired
private DataReportService dataReportService;
// Persistence for DataReportDetail rows.
@Autowired
private DataReportDetailService dataReportDetailService;
// NOTE(review): not referenced in the visible part of this class — presumably used further down.
@Autowired
private SalesforceTargetConnect salesforceTargetConnect;
/**
 * Runs an incremental dump for every data object that has updates enabled.
 * <p>
 * Objects are selected by {@code need_update = true} with a non-null {@code last_update_date},
 * optionally narrowed to the API names listed in {@code param.getApi()}. Each object is dumped on
 * the Salesforce executor starting from its last update date, its most recent batch is then
 * re-verified, and objects that declare a blob field get a file dump once all tasks have finished.
 *
 * @param param job parameter; only {@code api} is read here
 * @return {@link ReturnT#SUCCESS}, or a 500 result when no matching object has updates enabled
 * @throws Exception when a dump task or the file dump fails; submitted tasks are removed first
 */
@Override
public ReturnT<String> increment(SalesforceParam param) throws Exception {
    QueryWrapper<DataObject> qw = new QueryWrapper<>();
    if (StringUtils.isNotBlank(param.getApi())) {
        List<String> apis = DataUtil.toIdList(param.getApi());
        qw.in("name", apis);
    }
    qw.eq("need_update", true)
            .isNotNull("last_update_date");
    List<DataObject> list = dataObjectService.list(qw);
    if (CollectionUtils.isEmpty(list)) {
        return new ReturnT<>(500, ("" + param.getApi() + "不存在或未开启更新"));
    }
    List<Future<?>> futures = Lists.newArrayList();
    try {
        DataReport dataReport = new DataReport();
        dataReport.setType(TypeCode.INCREMENT);
        dataReport.setApis(list.stream().map(DataObject::getName).collect(Collectors.joining(",")));
        dataReportService.save(dataReport);
        for (DataObject dataObject : list) {
            Future<?> future = salesforceExecutor.execute(() -> {
                try {
                    // Captured before the dump so rows modified while it runs are picked up next time.
                    Date updateTime = new Date();
                    SalesforceParam salesforceParam = new SalesforceParam();
                    salesforceParam.setApi(dataObject.getName());
                    salesforceParam.setBeginModifyDate(dataObject.getLastUpdateDate());
                    salesforceParam.setType(2);
                    // When an update field is configured, filter by that field.
                    if (StringUtils.isNotBlank(dataObject.getUpdateField())) {
                        salesforceParam.setUpdateField(dataObject.getUpdateField());
                    }
                    PartnerConnection connect = dumpData(salesforceParam, dataReport);
                    dataObject.setLastUpdateDate(updateTime);
                    dataObjectService.updateById(dataObject);
                    // Extend and re-verify the most recent batch.
                    refreshLatestBatchAfterIncrement(dataObject.getName(), connect, updateTime);
                } catch (Throwable throwable) {
                    log.error("salesforceExecutor error", throwable);
                    throw new RuntimeException(throwable);
                }
            }, 0, 0);
            futures.add(future);
        }
        // Wait for all submitted tasks to finish.
        salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
        // Objects with a blob field: dump their files.
        list.stream().filter(t -> StringUtils.isNotBlank(t.getBlobField())).forEach(t -> {
            try {
                fileService.dumpFile(t.getName(), t.getBlobField(), true);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        return ReturnT.SUCCESS;
    } catch (Throwable throwable) {
        salesforceExecutor.remove(futures.toArray(new Future<?>[]{}));
        throw throwable;
    }
}

/**
 * Extends the most recent batch of {@code api} up to {@code updateTime} and recounts it on both sides.
 * <p>
 * Nothing happens when the object has no batch, when the batch has no end date (the previous code
 * threw a NullPointerException in that case), or when the batch already ends at or after
 * {@code updateTime}. Counting is retried with a one-minute pause until it succeeds or the failure
 * count exceeds {@code Const.MAX_FAIL_COUNT}.
 *
 * @param api        object API name
 * @param connect    Salesforce connection used for the remote count
 * @param updateTime new end date of the batch
 * @throws Exception the last counting failure, or an InterruptedException raised while waiting
 */
private void refreshLatestBatchAfterIncrement(String api, PartnerConnection connect, Date updateTime) throws Exception {
    QueryWrapper<DataBatch> queryWrapper = new QueryWrapper<>();
    queryWrapper.eq("name", api).orderByDesc("sync_start_date").last("limit 1");
    DataBatch dataBatch = dataBatchService.getOne(queryWrapper);
    if (dataBatch == null || dataBatch.getSyncEndDate() == null
            || !dataBatch.getSyncEndDate().before(updateTime)) {
        return;
    }
    DataBatch update = dataBatch.clone();
    update.setSyncEndDate(updateTime);
    int failCount = 0;
    while (true) {
        try {
            SalesforceParam countParam = new SalesforceParam();
            countParam.setApi(dataBatch.getName());
            countParam.setBeginCreateDate(dataBatch.getSyncStartDate());
            countParam.setEndCreateDate(dataBatch.getSyncEndDate());
            // If the object has IsDeleted, count only rows with IsDeleted = false; otherwise no filter.
            if (dataFieldService.hasDeleted(countParam.getApi())) {
                countParam.setIsDeleted(false);
            } else {
                countParam.setIsDeleted(null);
            }
            // Salesforce-side count.
            Integer sfNum = countSfNum(connect, countParam);
            update.setSfNum(sfNum);
            // Database-side count.
            Integer dbNum = customMapper.count(countParam);
            update.setDbNum(dbNum);
            update.setSyncStatus(dbNum.equals(sfNum) ? 1 : 0);
            update.setFirstDbNum(dbNum);
            update.setFirstSyncDate(updateTime);
            update.setFirstSfNum(sfNum);
            // Match the row by its OLD date range; the new end date is carried by `update`.
            QueryWrapper<DataBatch> updateQw = new QueryWrapper<>();
            updateQw.eq("name", dataBatch.getName())
                    .eq("sync_start_date", dataBatch.getSyncStartDate())
                    .eq("sync_end_date", dataBatch.getSyncEndDate());
            dataBatchService.update(update, updateQw);
            return;
        } catch (InterruptedException e) {
            throw e;
        } catch (Throwable throwable) {
            failCount++;
            log.error("verify error", throwable);
            if (failCount > Const.MAX_FAIL_COUNT) {
                throw throwable;
            }
            TimeUnit.MINUTES.sleep(1);
        }
    }
}
/**
 * Entry point for a full dump. A non-blank {@code param.getApi()} selects the manual path,
 * otherwise the automatic path is taken.
 *
 * @param param job parameter
 * @return the result reported by the manual path when it returns one, otherwise {@link ReturnT#SUCCESS}
 * @throws Exception any failure from either path, after submitted tasks have been removed
 */
@Override
public ReturnT<String> dump(SalesforceParam param) throws Exception {
    List<Future<?>> submitted = Lists.newArrayList();
    try {
        if (StringUtils.isBlank(param.getApi())) {
            // Automatic job.
            autoDump(param, submitted);
            return ReturnT.SUCCESS;
        }
        // Manual job.
        ReturnT<String> manualResult = manualDump(param, submitted);
        if (manualResult == null) {
            return ReturnT.SUCCESS;
        }
        return manualResult;
    } catch (Throwable failure) {
        salesforceExecutor.remove(submitted.toArray(new Future<?>[]{}));
        log.error("dump error", failure);
        throw failure;
    }
}
/**
 * Automatic dump: repeatedly picks up to ten data objects with {@code data_work = 1} and
 * {@code data_lock = 0} (ordered by {@code data_index}) and dumps them until none are left.
 * <p>
 * Objects without a CreatedDate field are handed to {@code dataDumpSpecialService} in an
 * asynchronous task. That task keeps its own lock-state object, so the calling thread's
 * {@code finally} block can no longer unlock the row while the task is still running.
 * The shared {@code param} is only mutated on the calling thread.
 *
 * @param param   job parameter; its {@code api} is overwritten with the object being processed
 * @param futures collects the submitted tasks so the caller can remove them on failure
 * @throws InterruptedException when the calling thread is interrupted while pausing or waiting
 */
private void autoDump(SalesforceParam param, List<Future<?>> futures) throws InterruptedException {
    QueryWrapper<DataObject> qw = new QueryWrapper<>();
    qw.eq("data_work", 1)
            .eq("data_lock", 0)
            .orderByAsc("data_index")
            .last(" limit 10");
    while (true) {
        List<DataObject> dataObjects = dataObjectService.list(qw);
        if (CollectionUtils.isEmpty(dataObjects)) {
            break;
        }
        for (DataObject dataObject : dataObjects) {
            // Lock/work state written by THIS thread only.
            DataObject update = new DataObject();
            TimeUnit.MILLISECONDS.sleep(1);
            String api = dataObject.getName();
            try {
                log.info("dump apis: {}", api);
                XxlJobLogger.log("dump apis: {}", api);
                // Make sure the local table exists; create it if not.
                checkApi(api, true);
                // No CreatedDate: treat as a special table and fetch directly by id.
                if (!hasCreatedDate(api)) {
                    // Set on the calling thread; the worker must not touch the shared param.
                    param.setApi(api);
                    // Lock/work state owned by the asynchronous task.
                    DataObject specialUpdate = new DataObject();
                    Future<?> future = salesforceExecutor.execute(() -> {
                        try {
                            DataDumpSpecialParam dataDumpSpecialParam = new DataDumpSpecialParam();
                            dataDumpSpecialParam.setApi(api);
                            specialUpdate.setName(dataObject.getName());
                            specialUpdate.setDataLock(1);
                            dataObjectService.updateById(specialUpdate);
                            salesforceExecutor.waitForFutures(dataDumpSpecialService.getData(dataDumpSpecialParam, salesforceConnect.createConnect()));
                            specialUpdate.setDataWork(0);
                        } catch (Throwable e) {
                            throw new RuntimeException(e);
                        } finally {
                            if (StringUtils.isNotBlank(specialUpdate.getName())) {
                                specialUpdate.setDataLock(0);
                                dataObjectService.updateById(specialUpdate);
                            }
                        }
                    }, 0, 0);
                    futures.add(future);
                    continue;
                }
                update.setName(dataObject.getName());
                update.setDataLock(1);
                dataObjectService.updateById(update);
                param.setApi(api);
                List<SalesforceParam> salesforceParams;
                QueryWrapper<DataBatch> dbQw = new QueryWrapper<>();
                dbQw.eq("name", api);
                List<DataBatch> list = dataBatchService.list(dbQw);
                AtomicInteger batch = new AtomicInteger(1);
                if (CollectionUtils.isEmpty(list)) {
                    // No batches defined.
                    salesforceParams = DataUtil.splitTask(param);
                } else {
                    // Batches exist: keep only those that have not had their first run yet.
                    salesforceParams = list.stream().filter(t -> t.getFirstSyncDate() == null)
                            .map(t -> {
                                SalesforceParam salesforceParam = param.clone();
                                salesforceParam.setApi(t.getName());
                                salesforceParam.setBeginCreateDate(t.getSyncStartDate());
                                salesforceParam.setEndCreateDate(t.getSyncEndDate());
                                salesforceParam.setBatch(batch.getAndIncrement());
                                return salesforceParam;
                            }).collect(Collectors.toList());
                }
                // Every batch has already run once: synchronise again from scratch.
                if (CollectionUtils.isEmpty(salesforceParams)) {
                    salesforceParams = DataUtil.splitTask(param);
                }
                for (SalesforceParam salesforceParam : salesforceParams) {
                    Future<?> future = salesforceExecutor.execute(() -> {
                        try {
                            dumpData(salesforceParam);
                        } catch (Throwable throwable) {
                            log.error("salesforceExecutor error", throwable);
                            throw new RuntimeException(throwable);
                        }
                    }, salesforceParam.getBatch(), 0);
                    futures.add(future);
                }
                // Wait for all submitted tasks to finish.
                salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
                // Objects with a blob field: run dumpFile once.
                if (StringUtils.isNotBlank(dataObject.getBlobField())) {
                    fileService.dumpFile(dataObject.getName(), dataObject.getBlobField(), true);
                }
                update.setDataWork(0);
            } catch (Throwable e) {
                String message = e.getMessage();
                String format = String.format("获取表数据 error, api name: %s, \nparam: %s, \ncause:\n%s", api, com.alibaba.fastjson2.JSON.toJSONString(param, DataDumpParam.getFilter()), message);
                EmailUtil.send("DataDump ERROR", format);
                throw new RuntimeException(e);
            } finally {
                // Only release a lock this thread actually took (name is set right before locking).
                if (StringUtils.isNotBlank(update.getName())) {
                    update.setDataLock(0);
                    dataObjectService.updateById(update);
                }
            }
        }
        // Wait for all submitted tasks (including the special ones) before fetching the next page.
        salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
        futures.clear();
    }
}
/**
 * Manual dump of the API names listed in {@code param.getApi()}.
 * <p>
 * A run without begin/end date and without ids is a full run. It is refused while any of the
 * requested objects is locked, and it locks each object while it is being dumped.
 *
 * @param param   job parameter; its {@code api} is overwritten with the object being processed
 * @param futures collects the submitted tasks so the caller can remove them on failure
 * @return a 500 result naming the locked objects, or {@code null} when all objects were processed
 * @throws InterruptedException when the calling thread is interrupted while pausing
 */
private ReturnT<String> manualDump(SalesforceParam param, List<Future<?>> futures) throws InterruptedException {
    List<String> apis;
    apis = DataUtil.toIdList(param.getApi());
    String join = StringUtils.join(apis, ",");
    log.info("dump apis: {}", join);
    XxlJobLogger.log("dump apis: {}", join);
    // Full run: check whether the automatic job holds a lock on any requested table.
    boolean isFull = param.getBeginDate() == null && param.getEndDate() == null && CollectionUtils.isEmpty(param.getIds());
    if (isFull) {
        QueryWrapper<DataObject> qw = new QueryWrapper<>();
        qw.eq("data_lock", 1).in("name", apis);
        List<DataObject> list = dataObjectService.list(qw);
        if (CollectionUtils.isNotEmpty(list)) {
            // Comma-separated so individual names stay readable in the message.
            String apiNames = list.stream().map(DataObject::getName).collect(Collectors.joining(","));
            return new ReturnT<>(500, "api:" + apiNames + " is locked");
        }
    }
    for (String api : apis) {
        DataObject update = new DataObject();
        TimeUnit.MILLISECONDS.sleep(1);
        try {
            checkApi(api, true);
            if (!hasCreatedDate(api)) {
                DataDumpSpecialParam dataDumpSpecialParam = new DataDumpSpecialParam();
                dataDumpSpecialParam.setApi(api);
                Future<?> future = dataDumpSpecialService.getData(dataDumpSpecialParam, salesforceConnect.createConnect());
                // Wait for the special task to finish.
                salesforceExecutor.waitForFutures(future);
                continue;
            }
            param.setApi(api);
            List<SalesforceParam> salesforceParams = null;
            if (isFull) {
                update.setName(api);
                update.setDataLock(1);
                dataObjectService.updateById(update);
                QueryWrapper<DataBatch> dbQw = new QueryWrapper<>();
                dbQw.eq("name", api)
                        .isNull("first_sync_date");
                List<DataBatch> list = dataBatchService.list(dbQw);
                AtomicInteger batch = new AtomicInteger(1);
                if (CollectionUtils.isNotEmpty(list)) {
                    salesforceParams = list.stream().map(t -> {
                        SalesforceParam salesforceParam = param.clone();
                        salesforceParam.setApi(t.getName());
                        salesforceParam.setBeginCreateDate(t.getSyncStartDate());
                        salesforceParam.setEndCreateDate(t.getSyncEndDate());
                        salesforceParam.setBatch(batch.getAndIncrement());
                        return salesforceParam;
                    }).collect(Collectors.toList());
                } else {
                    salesforceParams = DataUtil.splitTask(param);
                }
            } else {
                salesforceParams = DataUtil.splitTask(param);
            }
            // Manual tasks are submitted with priority 1 (automatic tasks use 0).
            for (SalesforceParam salesforceParam : salesforceParams) {
                Future<?> future = salesforceExecutor.execute(() -> {
                    try {
                        dumpData(salesforceParam);
                    } catch (Throwable throwable) {
                        log.error("salesforceExecutor error", throwable);
                        throw new RuntimeException(throwable);
                    }
                }, salesforceParam.getBatch(), 1);
                futures.add(future);
            }
            // Wait for all submitted tasks to finish.
            salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
            // Objects with a blob field: run dumpFile once. The row may be missing, so guard it.
            DataObject one = dataObjectService.getById(api);
            if (one != null && StringUtils.isNotBlank(one.getBlobField())) {
                fileService.dumpFile(one.getName(), one.getBlobField(), true);
            }
            update.setDataWork(0);
        } catch (Throwable e) {
            log.error("manualDump error", e);
            throw new RuntimeException(e);
        } finally {
            if (isFull) {
                update.setName(api);
                update.setDataLock(0);
                dataObjectService.updateById(update);
            }
        }
    }
    return null;
}
/**
 * Dumps one task without attaching it to a data report.
 *
 * @param param task parameter
 * @return whatever the two-argument overload returns for a {@code null} report
 * @throws Throwable any failure raised by the two-argument overload
 */
private PartnerConnection dumpData(SalesforceParam param) throws Throwable {
    final DataReport withoutReport = null;
    return dumpData(param, withoutReport);
}
/**
 * Dumps one task: counts the matching Salesforce rows, pulls them into the local table and
 * records the batch result.
 * <p>
 * When the count keeps failing, the task is split into two smaller tasks. If the split
 * succeeds, this call returns without pulling data itself; otherwise the data is pulled
 * without a Salesforce count.
 *
 * @param param      task parameter; {@code isDeleted} is set here depending on the object
 * @param dataReport report to attach a detail row to, may be {@code null}
 * @return the connection created for this task
 * @throws Throwable the original failure, after an error mail has been sent
 */
private PartnerConnection dumpData(SalesforceParam param, DataReport dataReport) throws Throwable {
    String api = param.getApi();
    PartnerConnection connect = null;
    try {
        DataBatchHistory dataBatchHistory = new DataBatchHistory();
        dataBatchHistory.setName(api);
        dataBatchHistory.setStartDate(new Date());
        dataBatchHistory.setSyncStartDate(param.getBeginCreateDate());
        if (param.getEndCreateDate() != null) {
            dataBatchHistory.setSyncEndDate(DateUtils.addSeconds(param.getEndCreateDate(), -1));
        }
        dataBatchHistory.setBatch(param.getBatch());
        if (param.getBeginCreateDate() != null && param.getEndCreateDate() != null) {
            log.info("NO.{} dump {}, date: {} ~ {} start", param.getBatch(),
                    param.getApi(),
                    DateFormatUtils.format(param.getBeginCreateDate(), "yyyy-MM-dd HH:mm:ss"),
                    DateFormatUtils.format(param.getEndCreateDate(), "yyyy-MM-dd HH:mm:ss"));
        }
        connect = salesforceConnect.createConnect();
        // If the object has IsDeleted, query only rows with IsDeleted = false; otherwise no filter.
        if (dataFieldService.hasDeleted(param.getApi())) {
            param.setIsDeleted(false);
        } else {
            param.setIsDeleted(null);
        }
        // A count over too many rows may time out; in that case the task is split further.
        int failCount = 0;
        boolean isSuccess = false;
        while (failCount <= Const.MAX_FAIL_COUNT) {
            try {
                dataBatchHistory.setSfNum(countSfNum(connect, param));
                isSuccess = true;
                break;
            } catch (InterruptedException e) {
                // Same policy as the other retry loops in this class: never retry an interruption.
                throw e;
            } catch (Throwable throwable) {
                failCount++;
                // Previously swallowed silently; keep retrying but leave a trace.
                log.warn("count sf num failed, api:{}, attempt:{}", api, failCount, throwable);
            }
        }
        // Count never succeeded: try to split the task.
        if (!isSuccess) {
            if (splitTask(param)) {
                return connect;
            }
        }
        getAllSfData(param, connect, dataReport);
        updateDataBatch(param, dataBatchHistory);
    } catch (Throwable throwable) {
        log.error("dataDumpJob error api:{}", api, throwable);
        String type = param.getType() == 1 ? "存量" : "增量";
        String format = String.format("%s数据迁移 error, api name: %s, \nparam: %s, \ncause:\n%s", type, api, JSON.toJSONString(param, DataDumpParam.getFilter()), throwable);
        EmailUtil.send("DataDump ERROR", format);
        throw throwable;
    }
    return connect;
}
/**
 * Splits the batch matching {@code param}'s date range into two halves and dumps both.
 * <p>
 * The split point is the midpoint of the batch's date range. The previous code added the whole
 * range to the start date, which produced the end date, and truncated the range to {@code int},
 * which overflows after roughly 24 days. Batches that already lie within a single day are not
 * split.
 *
 * @param param task parameter identifying the batch by api, begin and end create date
 * @return {@code true} when both halves were dumped; {@code false} when the batch was not found,
 *         cannot be split any further, or a half failed
 */
private boolean splitTask(SalesforceParam param) {
    List<Future<?>> futures = Lists.newArrayList();
    try {
        QueryWrapper<DataBatch> qw = new QueryWrapper<>();
        qw.eq("name", param.getApi())
                .eq("sync_start_date", param.getBeginCreateDate())
                .eq("sync_end_date", param.getEndCreateDate());
        DataBatch dataBatch = dataBatchService.getOne(qw);
        if (dataBatch == null) {
            log.warn("split task skipped, batch not found, api:{}", param.getApi());
            return false;
        }
        // Already within one day: do not split any further.
        if (DateUtils.isSameDay(dataBatch.getSyncEndDate(), dataBatch.getSyncStartDate())) {
            log.warn("split task skipped, can't count sf num, api:{}", param.getApi());
            return false;
        }
        DataBatch dataBatch1 = dataBatch.clone();
        DataBatch dataBatch2 = dataBatch.clone();
        long startMillis = dataBatch.getSyncStartDate().getTime();
        long endMillis = dataBatch.getSyncEndDate().getTime();
        // Midpoint in long arithmetic: no int overflow, and written to avoid start + end overflow.
        Date midDate = new Date(startMillis + (endMillis - startMillis) / 2);
        dataBatch1.setSyncEndDate(midDate);
        dataBatch2.setSyncStartDate(midDate);
        dataBatchService.remove(qw);
        dataBatchService.save(dataBatch1);
        dataBatchService.save(dataBatch2);
        SalesforceParam param1 = param.clone();
        SalesforceParam param2 = param.clone();
        param1.setEndCreateDate(midDate);
        param2.setBeginCreateDate(midDate);
        // Track each task as soon as it is submitted so a later failure can still remove it.
        futures.add(salesforceExecutor.execute(() -> {
            try {
                dumpData(param1);
            } catch (Throwable throwable) {
                log.error("salesforceExecutor error", throwable);
                throw new RuntimeException(throwable);
            }
        }, param1.getBatch(), 0));
        futures.add(salesforceExecutor.execute(() -> {
            try {
                dumpData(param2);
            } catch (Throwable throwable) {
                log.error("salesforceExecutor error", throwable);
                throw new RuntimeException(throwable);
            }
        }, param2.getBatch(), 0));
        // Wait for both halves to finish.
        salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
        return true;
    } catch (Throwable throwable) {
        log.error("split task error, api:{}", param.getApi(), throwable);
        salesforceExecutor.remove(futures.toArray(new Future<?>[]{}));
        if (throwable instanceof InterruptedException) {
            // This method cannot rethrow; keep the interrupt visible to the caller instead.
            Thread.currentThread().interrupt();
        }
    }
    return false;
}
/**
 * Pages through all Salesforce rows matching {@code param} and stores them locally.
 * <p>
 * Each page is queried from the last seen date / id onwards. The date field is CreatedDate for
 * type 1 and {@code param.getUpdateField()} otherwise. The paging position is advanced only
 * after the page has been stored, so a failed page is retried instead of skipped. A failing
 * iteration is retried after one minute until the consecutive failure count exceeds
 * {@code Const.MAX_FAIL_COUNT}.
 *
 * @param param      task parameter; {@code select}, {@code timestamp} and {@code maxId} are set here
 * @param connect    Salesforce connection
 * @param dataReport when not empty, a detail row with the number of fetched rows is saved
 * @throws Throwable the last failure, with the offending page attached as a suppressed exception
 */
private void getAllSfData(SalesforceParam param, PartnerConnection connect, DataReport dataReport) throws Throwable {
    JSONArray objects = null;
    String api = param.getApi();
    Map<String, Object> map = Maps.newHashMap();
    String dateName = param.getType() == 1 ? Const.CREATED_DATE : param.getUpdateField();
    int count = 0;
    Date lastCreatedDate = null;
    String maxId = null;
    // Build the select list from the Salesforce describe result.
    List<String> selectFields = Lists.newArrayList();
    DescribeSObjectResult dsr = connect.describeSObject(api);
    Field[] dsrFields = dsr.getFields();
    for (Field field : dsrFields) {
        // File content is not queried here.
        if ("base64".equalsIgnoreCase(field.getType().toString())) {
            continue;
        }
        selectFields.add(field.getName());
    }
    if ("Attachment".equals(api) || "FeedItem".equals(api)) {
        selectFields.add("Parent.type");
    }
    DataObject dataObject = dataObjectService.getById(api);
    if (dataObject != null && StringUtils.isNotBlank(dataObject.getExtraField())) {
        selectFields.addAll(Arrays.asList(StringUtils.split(dataObject.getExtraField().replaceAll(StringUtils.SPACE, StringUtils.EMPTY), ",")));
    }
    param.setSelect(StringUtils.join(selectFields, ","));
    // Local table columns, upper-cased, used to decide which keys are stored.
    List<String> fields = customMapper.getFields(api).stream().map(String::toUpperCase).collect(Collectors.toList());
    int failCount = 0;
    while (true) {
        try {
            // Resume from the last stored date ...
            param.setTimestamp(lastCreatedDate);
            // ... and exclude ids already stored for that date.
            param.setMaxId(maxId);
            map.put("param", param);
            String sql;
            if (param.getType() == 1) {
                // Type 1: page by creation time.
                sql = SqlUtil.showSql("com.celnet.datadump.mapper.SalesforceMapper.list", map);
            } else {
                // Type 2: page by modification time.
                sql = SqlUtil.showSql("com.celnet.datadump.mapper.SalesforceMapper.listByModifyTime", map);
            }
            log.info("query sql: {}", sql);
            XxlJobLogger.log("query sql: {}", sql);
            QueryResult queryResult = connect.queryAll(sql);
            if (ObjectUtils.isEmpty(queryResult) || ObjectUtils.isEmpty(queryResult.getRecords())) {
                break;
            }
            SObject[] records = queryResult.getRecords();
            objects = DataUtil.toJsonArray(records, dsrFields);
            // Date of the last row of the page and the greatest id sharing that date.
            // The last row always matches the filter, so max() is never empty.
            Date pageMaxDate = objects.getJSONObject(objects.size() - 1).getDate(dateName);
            String pageMaxId = objects.stream()
                    .map(t -> (JSONObject) t)
                    .filter(t -> pageMaxDate.equals(t.getDate(dateName)))
                    .map(t -> t.getString(Const.ID))
                    .max(String::compareTo).get();
            // Store the page.
            saveOrUpdate(api, fields, records, objects, true);
            // Advance the paging position only now that the page is stored.
            lastCreatedDate = pageMaxDate;
            maxId = pageMaxId;
            count += records.length;
            TimeUnit.MILLISECONDS.sleep(1);
            String format = DateFormatUtils.format(lastCreatedDate, "yyyy-MM-dd HH:mm:ss");
            log.info("dump success count: {}, timestamp: {}", count, format);
            XxlJobLogger.log("dump success count: {}, timestamp: {}", count, format);
            failCount = 0;
            objects = null;
        } catch (InterruptedException e) {
            throw e;
        } catch (Throwable throwable) {
            failCount++;
            log.error("dataDumpJob error api:{}, data:{}", api, JSON.toJSONString(objects), throwable);
            if (failCount > Const.MAX_FAIL_COUNT) {
                throwable.addSuppressed(new Exception("dataDump error data:" + JSON.toJSONString(objects)));
                throw throwable;
            }
            TimeUnit.MINUTES.sleep(1);
        }
    }
    if (ObjectUtils.isNotEmpty(dataReport)) {
        DataReportDetail dataReportDetail = new DataReportDetail();
        dataReportDetail.setApi(param.getApi());
        dataReportDetail.setReportId(dataReport.getId());
        dataReportDetail.setNum(count);
        dataReportDetailService.save(dataReportDetail);
    }
}
/**
 * Completes the execution record and, when the matching batch has not had its first run yet,
 * stamps the batch with the first-run figures and saves the record.
 * <p>
 * The record is only saved when such a batch exists (same api, begin and end create date,
 * {@code first_sync_date} still null).
 *
 * @param param            task parameter
 * @param dataBatchHistory execution record; nothing happens when it is {@code null}
 */
private void updateDataBatch(SalesforceParam param, DataBatchHistory dataBatchHistory) {
    if (dataBatchHistory == null) {
        return;
    }
    dataBatchHistory.setEndDate(new Date());
    Integer localRows = customMapper.count(param);
    dataBatchHistory.setDbNum(localRows);
    if (dataBatchHistory.getSfNum() != null) {
        boolean countsMatch = dataBatchHistory.getSfNum().equals(dataBatchHistory.getDbNum());
        dataBatchHistory.setSyncStatus(countsMatch ? 1 : 0);
    }
    dataBatchHistory.setCost(DataUtil.calTime(dataBatchHistory.getEndDate(), dataBatchHistory.getStartDate()));

    QueryWrapper<DataBatch> firstRunQuery = new QueryWrapper<>();
    firstRunQuery.eq("name", param.getApi());
    firstRunQuery.eq("sync_start_date", param.getBeginCreateDate());
    firstRunQuery.eq("sync_end_date", param.getEndCreateDate());
    firstRunQuery.isNull("first_sync_date");
    DataBatch pendingBatch = dataBatchService.getOne(firstRunQuery);
    if (pendingBatch != null) {
        pendingBatch.setFirstSfNum(dataBatchHistory.getSfNum());
        pendingBatch.setFirstDbNum(dataBatchHistory.getDbNum());
        pendingBatch.setFirstSyncDate(dataBatchHistory.getStartDate());
        pendingBatch.setSyncStatus(dataBatchHistory.getSyncStatus());
        dataBatchService.update(pendingBatch, firstRunQuery);
        log.info("count db num: {}", localRows);
        XxlJobLogger.log("count db num: {}", localRows);
        dataBatchHistoryService.save(dataBatchHistory);
        log.info("dataDumpJob done api:{}, count:{}", dataBatchHistory.getName(), localRows);
    }
}
/**
 * Counts the Salesforce rows that match {@code param}.
 *
 * @param connect Salesforce connection
 * @param param   task parameter rendered into the count statement
 * @return the {@code num} field of the first record returned by the count query
 * @throws Exception when the query fails
 */
@Override
public Integer countSfNum(PartnerConnection connect, SalesforceParam param) throws Exception {
    Map<String, Object> sqlArgs = new HashMap<>();
    sqlArgs.put("param", param);
    final String countSql = SqlUtil.showSql("com.celnet.datadump.mapper.SalesforceMapper.count", sqlArgs);
    log.info("count sql: {}", countSql);
    XxlJobLogger.log("count sql: {}", countSql);

    SObject firstRow = connect.queryAll(countSql).getRecords()[0];
    Integer total = (Integer) firstRow.getField("num");
    log.info("count sf num: {}", total);
    XxlJobLogger.log("count sf num: {}", total);
    return total;
}
/**
 * Inserts or updates the given rows in the local table.
 * <p>
 * Only keys that match a local column name (case-insensitively) are written. For objects that
 * declare a blob field, {@code is_dump} and {@code is_upload} are reset to {@code false} on
 * every write. Rows whose id already exists locally are updated; other rows are inserted only
 * when {@code insert} is {@code true}.
 *
 * @param api     table name
 * @param fields  local column names
 * @param records Salesforce records, used to collect the ids
 * @param objects the same rows as JSON
 * @param insert  whether rows missing locally are inserted
 * @return number of ids from {@code records} that already existed locally before this call
 * @throws Throwable the row failure, with the offending row attached as a suppressed exception;
 *                   an InterruptedException when the failure looks like an interruption
 */
@Override
public Integer saveOrUpdate(String api, List<String> fields, SObject[] records, JSONArray objects, boolean insert) throws Throwable {
    // Look up which of the incoming ids already exist locally.
    List<String> ids = Arrays.stream(records).map(SObject::getId).collect(Collectors.toList());
    DataObject one = dataObjectService.getById(api);
    // The object row may be missing; treat that as "no blob field" instead of failing every row.
    boolean hasBlobField = one != null && StringUtils.isNotBlank(one.getBlobField());
    List<String> existsIds = customMapper.getIds(api, ids);
    // Hash lookup instead of a list scan per row.
    Set<String> existsIdSet = new HashSet<>(existsIds);
    for (int i = 0; i < objects.size(); i++) {
        JSONObject jsonObject = objects.getJSONObject(i);
        try {
            Set<String> keys = jsonObject.keySet();
            String id = jsonObject.getString(Const.ID);
            List<Map<String, Object>> maps = Lists.newArrayList();
            for (String key : keys) {
                if (fields.stream().anyMatch(key::equalsIgnoreCase)) {
                    Map<String, Object> paramMap = Maps.newHashMap();
                    paramMap.put("key", key);
                    paramMap.put("value", jsonObject.get(key));
                    maps.add(paramMap);
                }
            }
            // Blob tables: reset the dump/upload flags on every insert or update.
            if (hasBlobField) {
                Map<String, Object> paramMap = Maps.newHashMap();
                paramMap.put("key", "is_dump");
                paramMap.put("value", false);
                Map<String, Object> paramMap2 = Maps.newHashMap();
                paramMap2.put("key", "is_upload");
                paramMap2.put("value", false);
                maps.add(paramMap);
                maps.add(paramMap2);
            }
            if (existsIdSet.contains(id)) {
                customMapper.updateById(api, maps, id);
            } else {
                if (insert) {
                    customMapper.save(api, maps);
                }
            }
            TimeUnit.MILLISECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw e;
        } catch (Throwable throwable) {
            // Some layers wrap an interruption; detect it by text and surface it as one.
            if (throwable.toString().contains("interrupt")) {
                log.error("may interrupt error:", throwable);
                throw new InterruptedException();
            }
            throwable.addSuppressed(new Exception("saveOrUpdate error data:" + JSON.toJSONString(jsonObject)));
            throw throwable;
        }
    }
    return existsIds.size();
}
// Held for the whole of checkApi so concurrent callers do not create the same table twice.
private final ReentrantLock reentrantLock = new ReentrantLock();
/**
 * Makes sure the local table for a Salesforce object exists, creating it when it does not.
 * <p>
 * When the table is missing, the object is described in Salesforce and the following are
 * (re)created: the table with its indexes, the field mappings, the picklist values, the batches
 * (optional) and the data object row. Nothing is changed when the table already exists.
 * The whole method runs under a lock so the same table is not created twice.
 *
 * @param apiName     object API name, also used as the table name
 * @param createBatch whether batches are generated for a newly created table; {@code null}
 *                    is treated as {@code false}
 * @throws Exception when Salesforce or the database fails, or when the batch type is not configured
 */
@Override
public void checkApi(String apiName, Boolean createBatch) throws Exception {
    // Lock first and enter try immediately so the lock is always released.
    reentrantLock.lock();
    try {
        log.info("check api:{}", apiName);
        boolean hasCreatedDate = false;
        PartnerConnection connection = salesforceConnect.createConnect();
        DataObject dataObject = dataObjectService.getById(apiName);
        Date now = new Date();
        if (StringUtils.isBlank(customMapper.checkTable(apiName))) {
            log.info("api:{} doesn't exist create", apiName);
            // Column definitions for the new table.
            List<Map<String, Object>> list = Lists.newArrayList();
            DescribeSObjectResult dsr = connection.describeSObject(apiName);
            String label = dsr.getLabel();
            boolean isCustomObject = dsr.isCustom();
            boolean isUpdateable = dsr.isUpdateable();
            List<DataField> fieldList = Lists.newArrayList();
            List<String> fields = Lists.newArrayList();
            String blobField = null;
            List<DataPicklist> dataPicklists = Lists.newArrayList();
            for (Field field : dsr.getFields()) {
                // Skip filtered fields.
                if (Const.TABLE_FILTERS.contains(field.getName())) {
                    continue;
                }
                if (Const.CREATED_DATE.equalsIgnoreCase(field.getName())) {
                    hasCreatedDate = true;
                }
                Map<String, Object> map = Maps.newHashMap();
                String sfType = field.getType().toString();
                if ("base64".equalsIgnoreCase(sfType)) {
                    blobField = field.getName();
                }
                map.put("type", DataUtil.fieldTypeToMysql(field));
                // Escape single quotes in the label: ' becomes \'
                map.put("comment", field.getLabel().replaceAll("'", "\\\\'"));
                map.put("name", field.getName());
                list.add(map);
                DataField dataField = new DataField();
                dataField.setApi(apiName);
                dataField.setSfType(sfType);
                dataField.setField(field.getName());
                dataField.setName(field.getLabel());
                dataField.setIsCreateable(field.getCreateable());
                dataField.setIsNillable(field.getNillable());
                dataField.setIsDefaultedOnCreate(field.getDefaultedOnCreate());
                String join = null;
                // Fields referencing more than three objects are stored without a reference.
                if (field.getReferenceTo().length <= 3) {
                    join = StringUtils.join(field.getReferenceTo(), ",");
                } else {
                    log.warn("referenceTo too long set null, api:{}, field:{}, reference to:{}", apiName, field.getName(), StringUtils.join(field.getReferenceTo(), ","));
                }
                // Picklist values go to the picklist table.
                if ("picklist".equalsIgnoreCase(sfType)) {
                    join = "data_picklist";
                    PicklistEntry[] picklistValues = field.getPicklistValues();
                    if (ArrayUtils.isNotEmpty(picklistValues)) {
                        for (PicklistEntry picklistValue : picklistValues) {
                            DataPicklist dataPicklist = new DataPicklist();
                            dataPicklist.setApi(apiName);
                            dataPicklist.setField(field.getName());
                            dataPicklist.setLabel(picklistValue.getLabel());
                            dataPicklist.setValue(picklistValue.getValue());
                            dataPicklists.add(dataPicklist);
                        }
                    }
                }
                dataField.setReferenceTo(join);
                fieldList.add(dataField);
                fields.add(field.getName());
            }
            // Extra fields configured on the data object, plus Owner.Type when OwnerId exists.
            String extraField = "";
            if (dataObject != null && StringUtils.isNotBlank(dataObject.getExtraField())) {
                extraField = dataObject.getExtraField();
            }
            if (fields.stream().anyMatch(Const.OWNER_ID::equalsIgnoreCase)) {
                if (StringUtils.isNotBlank(extraField)) {
                    extraField += ",Owner.Type";
                } else {
                    extraField = "Owner.Type";
                }
            }
            // Each extra field becomes a varchar column named with '.' replaced by '_'.
            if (StringUtils.isNotBlank(extraField)) {
                Set<String> extras = Arrays.stream(StringUtils.split(extraField.replaceAll(StringUtils.SPACE, StringUtils.EMPTY), ",")).collect(Collectors.toSet());
                for (String extra : extras) {
                    String fieldName = extra.replaceAll("\\.", "_");
                    DataField dataField = new DataField();
                    dataField.setApi(apiName);
                    dataField.setField(fieldName);
                    dataField.setName(extra);
                    fieldList.add(dataField);
                    fields.add(fieldName);
                    Map<String, Object> map = Maps.newHashMap();
                    map.put("type", "varchar(255)");
                    map.put("comment", extra);
                    map.put("name", fieldName);
                    list.add(map);
                }
            }
            // Tables with a blob field get additional file-related columns.
            if (StringUtils.isNotBlank(blobField)) {
                fileExtraFieldBuild(apiName, list, fieldList, fields);
            }
            // Indexes for the configured index columns that exist on this table.
            List<Map<String, Object>> index = Lists.newArrayList();
            for (String tableIndex : Const.TABLE_INDEX) {
                if (!fields.contains(tableIndex)) {
                    continue;
                }
                Map<String, Object> createDateMap = Maps.newHashMap();
                createDateMap.put("field", tableIndex);
                createDateMap.put("name", String.format("IDX_%s_%s", apiName, tableIndex));
                index.add(createDateMap);
            }
            log.info("api {} not exist, create..", apiName);
            // Extra column that stores the new Salesforce id.
            Map<String, Object> map = Maps.newHashMap();
            map.put("type", "varchar(255)");
            map.put("comment", "新sfid");
            map.put("name", "new_id");
            list.add(map);
            if ("ContentDocumentLink".equals(apiName)) {
                // Document link table: extra column for the linked entity type.
                Map<String, Object> LinkedMap = Maps.newHashMap();
                LinkedMap.put("type", "varchar(255)");
                LinkedMap.put("comment", "关联对象");
                LinkedMap.put("name", "LinkedEntity_Type");
                list.add(LinkedMap);
            }
            if ("Attachment".equals(apiName) || "FeedComment".equals(apiName)
                    || "FeedItem".equals(apiName)) {
                // Extra column and field mapping for the parent type.
                Map<String, Object> LinkedMap = Maps.newHashMap();
                LinkedMap.put("type", "varchar(255)");
                LinkedMap.put("comment", "关联对象");
                LinkedMap.put("name", "Parent_Type");
                list.add(LinkedMap);
                DataField dataField = new DataField();
                dataField.setApi(apiName);
                dataField.setField("Parent.Type");
                dataField.setName("关联对象");
                fieldList.add(dataField);
            }
            customMapper.createTable(apiName, label, list, index);
            // Replace the field mappings.
            QueryWrapper<DataField> delfieldQw = new QueryWrapper<>();
            delfieldQw.eq("api", apiName);
            dataFieldService.remove(delfieldQw);
            dataFieldService.saveBatch(fieldList);
            // Replace the picklist values.
            QueryWrapper<DataPicklist> deldQw = new QueryWrapper<>();
            deldQw.eq("api", apiName);
            dataPicklistService.remove(deldQw);
            dataPicklistService.saveBatch(dataPicklists);
            // Replace the batches.
            Date startDate = DataUtil.DEFAULT_BEGIN_DATE;
            // Midnight of the current day is the upper bound.
            Date endCreateDate = DateUtils.parseDate(DateFormatUtils.format(now, "yyyy-MM-dd"), "yyyy-MM-dd");
            Date lastDay = null;
            QueryWrapper<DataBatch> delQw = new QueryWrapper<>();
            delQw.eq("name", apiName);
            dataBatchService.remove(delQw);
            // Boolean.TRUE.equals: a null flag means "no batches" instead of a NullPointerException.
            if (Boolean.TRUE.equals(createBatch) && Const.BATCH_FILTERS.stream().noneMatch(t -> apiName.indexOf(t) > 0)) {
                // Objects matching a batch filter get no batches.
                DataBatch one = new DataBatch();
                one.setFirstDbNum(0);
                one.setFirstSfNum(0);
                one.setDbNum(0);
                one.setSfNum(0);
                one.setName(apiName);
                one.setLabel(label);
                if (hasCreatedDate) {
                    List<Map<String, Object>> bachList = customMapper.list("code,value","system_config","code ='"+BATCH_TYPE+"'");
                    if (CollectionUtils.isEmpty(bachList) || bachList.get(0).get("value") == null) {
                        throw new IllegalStateException("system_config " + BATCH_TYPE + " is not configured");
                    }
                    String batchType = bachList.get(0).get("value").toString();
                    do {
                        lastDay = getLastDay(batchType, endCreateDate, startDate);
                        DataBatch dataBatch = one.clone();
                        dataBatch.setSyncStartDate(startDate);
                        dataBatch.setSyncEndDate(lastDay);
                        dataBatchService.save(dataBatch);
                        startDate = lastDay;
                    } while (lastDay.compareTo(endCreateDate) < 0);
                } else {
                    // No CreatedDate: a single batch without start and end date.
                    dataBatchService.save(one);
                }
            }
            DataObject update = new DataObject();
            update.setLabel(label);
            update.setName(apiName);
            update.setLastUpdateDate(endCreateDate);
            update.setBlobField(blobField);
            if (!isCustomObject && !isUpdateable) {
                update.setIsEditable(false);
            }
            dataObjectService.saveOrUpdate(update);
        }
    } finally {
        reentrantLock.unlock();
    }
}
/**
 * Runs {@code checkApi(api, true)} for every requested object.
 *
 * <p>When {@code param.getApi()} is blank, every name returned by
 * {@code dataObjectService.list()} is processed; otherwise the value is split
 * into names by {@code DataUtil.toIdList}.
 *
 * <p>A failure on one object does not stop the loop: it is logged with its
 * stack trace, reported by e-mail, and the next object is processed.
 *
 * @param param request parameters; only {@code getApi()} is read here, the whole
 *              object is serialized into the error e-mail
 * @return always {@link ReturnT#SUCCESS}
 * @throws Exception if resolving the list of objects fails
 */
@Override
public ReturnT<String> createApi(SalesforceParam param) throws Exception {
    List<String> apis;
    if (StringUtils.isBlank(param.getApi())) {
        apis = dataObjectService.list().stream().map(DataObject::getName).collect(Collectors.toList());
    } else {
        apis = DataUtil.toIdList(param.getApi());
    }
    log.info("打印所有待同步表:{}", apis);
    for (String api : apis) {
        try {
            checkApi(api, true);
        } catch (Exception e) {
            // Keep the full stack trace in the log; the e-mail below only carries a one-line cause.
            log.error("createApi error, api name: {}", api, e);
            // getMessage() is null for many exceptions (e.g. NullPointerException); fall back to
            // toString() so the e-mail at least names the exception type.
            String message = e.getMessage() != null ? e.getMessage() : e.toString();
            String format = String.format("创建表结构 error, api name: %s, \nparam: %s, \ncause:\n%s", api, com.alibaba.fastjson2.JSON.toJSONString(param, DataDumpParam.getFilter()), message);
            EmailUtil.send("DataDump ERROR", format);
        }
    }
    return ReturnT.SUCCESS;
}
/**
 * Reads the global describe result from the connection created by
 * {@code salesforceConnect} and stores one {@link DataObject} (name and label)
 * per described object via {@code dataObjectService.saveBatch}.
 *
 * @return always {@link ReturnT#SUCCESS}
 * @throws Exception if connecting, describing or saving fails
 */
@Override
public ReturnT<String> getAllApi() throws Exception {
    PartnerConnection sourceConnection = salesforceConnect.createConnect();
    DescribeGlobalSObjectResult[] describedObjects = sourceConnection.describeGlobal().getSobjects();
    List<DataObject> toSave = Arrays.stream(describedObjects)
            .map(described -> {
                DataObject entry = new DataObject();
                entry.setName(described.getName());
                entry.setLabel(described.getLabel());
                return entry;
            })
            .collect(Collectors.toCollection(ArrayList::new));
    dataObjectService.saveBatch(toSave);
    return ReturnT.SUCCESS;
}
/**
 * Tells whether a created-date field is registered for the given object.
 *
 * <p>Looks for at most one {@link DataField} row whose {@code api} column equals
 * {@code api} and whose {@code field} column equals {@code Const.CREATED_DATE}.
 *
 * @param api API name of the object
 * @return {@code true} if such a row exists, {@code false} otherwise
 */
private Boolean hasCreatedDate(String api) {
    QueryWrapper<DataField> createdDateQuery = new QueryWrapper<DataField>()
            .eq("api", api)
            .eq("field", Const.CREATED_DATE)
            .last("limit 1");
    return dataFieldService.getOne(createdDateQuery) != null;
}
/**
 * Appends the extra bookkeeping columns used for objects that carry a file.
 *
 * <p>Three entries are appended, in this order: {@code url}, {@code is_dump} and
 * {@code is_upload}. For each one a {@link DataField} is added to {@code fieldList},
 * the field name is added to {@code fields}, and a column definition map
 * ({@code type} / {@code comment} / {@code name}) is added to {@code list}.
 * For the {@code Document} object the column backing the {@code url} field is
 * named {@code localUrl}; for every other object it is named {@code url}.
 *
 * @param apiName   object (table) name
 * @param list      column definitions used to build the table
 * @param fieldList rows used to build the field-mapping table
 * @param fields    field-name list
 */
private static void fileExtraFieldBuild(String apiName, List<Map<String, Object>> list, List<DataField> fieldList, List<String> fields) {
    String urlColumn = "Document".equals(apiName) ? "localUrl" : "url";
    // Each row: { field name, description, column type, column name }.
    String[][] extraColumns = {
            {"url", "文件路径", "text", urlColumn},
            {"is_dump", "是否存储", "tinyint(1) DEFAULT 0", "is_dump"},
            {"is_upload", "是否上传", "tinyint(1) DEFAULT 0", "is_upload"},
    };
    for (String[] extra : extraColumns) {
        String fieldName = extra[0];
        String description = extra[1];

        DataField mapping = new DataField();
        mapping.setApi(apiName);
        mapping.setField(fieldName);
        mapping.setName(description);
        fieldList.add(mapping);

        fields.add(fieldName);

        Map<String, Object> column = Maps.newHashMap();
        column.put("type", extra[2]);
        column.put("comment", description);
        column.put("name", extra[3]);
        list.add(column);
    }
}
/**
 * Copies ContentDocumentLink records for already-migrated documents.
 *
 * <p>Step 1: for every local {@code ContentDocument} row whose {@code new_id} is
 * not null, the links of that document are queried through the connection created
 * by {@code salesforceConnect} and stored locally via {@code saveOrUpdate}.
 *
 * <p>Step 2: local link rows with {@code ShareType = 'V'} and {@code new_id = '0'}
 * are read in pages of 200. For each row whose linked entity type is present in
 * {@code dataObjectService}, a new link is created through the connection from
 * {@code salesforceTargetConnect} using the {@code new_id} of the document and of
 * the linked record; the id returned by the create call is written back to the
 * local row's {@code new_id}.
 *
 * <p>Rows whose document or linked record has no {@code new_id} yet are skipped
 * with a warning instead of aborting the whole run.
 *
 * @param paramStr not read by this method
 * @return {@link ReturnT#SUCCESS} when the run completes, {@link ReturnT#FAIL} when
 *         pulling links or creating them raises an error
 * @throws Exception if the connections, the describe call or the initial local
 *                   queries fail
 */
@Override
public ReturnT<String> getDocumentLink(String paramStr) throws Exception {
    String api = "ContentDocumentLink";
    PartnerConnection partnerConnection = salesforceConnect.createConnect();
    PartnerConnection connection = salesforceTargetConnect.createConnect();
    List<Map<String, Object>> list = customMapper.list("Id", "ContentDocument", "new_id is not null");
    DescribeSObjectResult dsr = partnerConnection.describeSObject(api);
    List<String> fields = customMapper.getFields(api).stream().map(String::toUpperCase).collect(Collectors.toList());
    Field[] dsrFields = dsr.getFields();
    try {
        if (list != null && list.size() > 0) {
            // Step 1: pull the links of every migrated document into the local table.
            for (Map<String, Object> map : list) {
                String contentDocumentId = (String) map.get("Id");
                String sql = "SELECT Id, LinkedEntityId, LinkedEntity.Type, ContentDocumentId, Visibility, ShareType, SystemModstamp, IsDeleted FROM ContentDocumentLink where ContentDocumentId = '" + contentDocumentId + "'";
                JSONArray objects = null;
                try {
                    QueryResult queryResult = partnerConnection.queryAll(sql);
                    SObject[] records = queryResult.getRecords();
                    objects = DataUtil.toJsonArray(records, dsrFields);
                    saveOrUpdate(api, fields, records, objects, true);
                } catch (Throwable e) {
                    log.error("getDocumentLink error api:{}, data:{}", api, JSON.toJSONString(objects), e);
                    TimeUnit.MINUTES.sleep(1);
                    return ReturnT.FAIL;
                }
            }
            // Number of local rows still waiting to be created on the target side.
            Integer count = customMapper.countBySQL(api, "where ShareType = 'V' and new_id = '0'");
            // Insert in batches of 200.
            int page = count % 200 == 0 ? count / 200 : (count / 200) + 1;
            // NOTE(review): every iteration re-reads the first 200 rows that still have new_id = '0'.
            // Rows that are skipped or fail below keep that value, so they are fetched again on the
            // next iteration and can crowd out rows further down — confirm whether this is acceptable.
            for (int i = 0; i < page; i++) {
                List<Map<String, Object>> linkList = customMapper.list("Id,LinkedEntityId,ContentDocumentId,LinkedEntity_Type,ShareType,Visibility", api, "ShareType = 'V' and new_id = '0' order by Id asc limit 200");
                // Only rows that were actually prepared go into these lists, so the array sent to
                // create() never contains null placeholders and results line up with sourceIds.
                List<SObject> toCreate = new ArrayList<>(linkList.size());
                List<String> sourceIds = new ArrayList<>(linkList.size());
                for (Map<String, Object> map : linkList) {
                    String linkedEntityId = (String) map.get("LinkedEntityId");
                    String id = (String) map.get("Id");
                    String contentDocumentId = (String) map.get("ContentDocumentId");
                    String linkedEntityType = (String) map.get("LinkedEntity_Type");
                    String shareType = (String) map.get("ShareType");
                    String visibility = (String) map.get("Visibility");
                    // Only handle links whose entity type is registered in data_object.
                    QueryWrapper<DataObject> qw = new QueryWrapper<>();
                    qw.eq("name", linkedEntityType);
                    List<DataObject> objects = dataObjectService.list(qw);
                    if (objects.isEmpty()) {
                        continue;
                    }
                    Map<String, Object> dMap = customMapper.getById("new_id", "ContentDocument", contentDocumentId);
                    Map<String, Object> lMap = customMapper.getById("new_id", linkedEntityType, linkedEntityId);
                    Object newDocumentId = dMap == null ? null : dMap.get("new_id");
                    Object newLinkedEntityId = lMap == null ? null : lMap.get("new_id");
                    if (newDocumentId == null || newLinkedEntityId == null) {
                        // Without both target ids the link cannot be created; skip this row rather
                        // than failing the whole run with a NullPointerException.
                        log.warn("getDocumentLink skip link id:{}, no new_id for ContentDocument:{} or {}:{}", id, contentDocumentId, linkedEntityType, linkedEntityId);
                        continue;
                    }
                    SObject account = new SObject();
                    account.setType(api);
                    account.setField("ContentDocumentId", newDocumentId.toString());
                    account.setField("LinkedEntityId", newLinkedEntityId.toString());
                    account.setField("ShareType", shareType);
                    account.setField("Visibility", visibility);
                    sourceIds.add(id);
                    toCreate.add(account);
                }
                if (toCreate.isEmpty()) {
                    // Nothing prepared in this page; do not issue an empty create call.
                    continue;
                }
                SObject[] accounts = toCreate.toArray(new SObject[0]);
                try {
                    SaveResult[] saveResults = connection.create(accounts);
                    for (int j = 0; j < saveResults.length; j++) {
                        if (!saveResults[j].getSuccess()) {
                            String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s", api, JSON.toJSONString(DataDumpParam.getFilter()), com.alibaba.fastjson.JSON.toJSONString(saveResults[j]));
                            log.error(format);
                        } else {
                            // Write the id of the created link back to the local row.
                            List<Map<String, Object>> dList = new ArrayList<>();
                            Map<String, Object> linkMap = new HashMap<>();
                            linkMap.put("key", "new_id");
                            linkMap.put("value", saveResults[j].getId());
                            dList.add(linkMap);
                            customMapper.updateById("ContentDocumentLink", dList, sourceIds.get(j));
                        }
                    }
                } catch (Exception e) {
                    log.error("getDocumentLink error api:{}, data:{}", api, JSON.toJSONString(accounts), e);
                    EmailUtil.send("-------测试-----------", JSON.toJSONString(accounts));
                    throw new RuntimeException(e);
                }
            }
        }
    } catch (Exception e) {
        log.error("getDocumentLink error api:{}, data:{}", api, JSON.toJSONString(list), e);
        return ReturnT.FAIL;
    }
    // Report success explicitly; failure paths above return ReturnT.FAIL.
    return ReturnT.SUCCESS;
}
/**
 * Looks up the ContentDocumentId of a ContentVersion.
 *
 * <p>Queries the ContentVersion with the given id through {@code partnerConnection}
 * (using {@code queryAll}), converts the records with {@code DataUtil.toJsonArray}
 * and reads {@code ContentDocumentId} from the first converted entry.
 *
 * @param partnerConnection connection used for the query and the describe call
 * @param contentVersionId  id of the ContentVersion to look up
 * @return the {@code ContentDocumentId} of the first entry, or {@code null} when the
 *         converted result is null or empty
 * @throws Exception if the query or the describe call fails
 */
@Override
public String getDocumentId(PartnerConnection partnerConnection, String contentVersionId) throws Exception {
    String soql = "SELECT Id, ContentDocumentId, IsDeleted FROM ContentVersion where Id = '" + contentVersionId + "'";
    QueryResult versionResult = partnerConnection.queryAll(soql);
    Field[] versionFields = partnerConnection.describeSObject("ContentVersion").getFields();
    JSONArray versions = DataUtil.toJsonArray(versionResult.getRecords(), versionFields);
    if (versions == null || versions.isEmpty()) {
        return null;
    }
    JSONObject firstVersion = versions.getJSONObject(0);
    return firstVersion.getString("ContentDocumentId");
}
/**
 * Cleans up chatter rows whose parent object has no local table.
 *
 * <p>Reads the distinct {@code Parent_Type} values of the local {@code FeedItem}
 * table. For every type for which {@code customMapper.checkTable} returns a blank
 * result, that type is passed to {@code customMapper.delete} for both
 * {@code FeedItem} and {@code FeedComment}.
 *
 * @param param not read by this method
 * @return {@link ReturnT#SUCCESS} once all parent types have been checked
 * @throws Exception if a mapper call fails
 */
@Override
public ReturnT<String> getChatter(SalesforceParam param) throws Exception {
    List<Map<String, Object>> feedList = customMapper.list("Parent_Type", "FeedItem", "1=1 GROUP BY Parent_Type");
    for (Map<String, Object> map : feedList) {
        // GROUP BY can yield a NULL group; depending on mapper settings that shows up as a null
        // row or a null value. Skip it instead of failing with a NullPointerException.
        Object parentTypeValue = map == null ? null : map.get("Parent_Type");
        if (parentTypeValue == null) {
            continue;
        }
        String parentType = parentTypeValue.toString();
        // A blank checkTable result is treated as "no local table for this parent type".
        if (StringUtils.isBlank(customMapper.checkTable(parentType))) {
            List<String> types = new ArrayList<>();
            types.add(parentType);
            customMapper.delete("FeedItem", types);
            customMapper.delete("FeedComment", types);
        }
    }
    return ReturnT.SUCCESS;
}
/**
 * Picks the end date of the next batch window according to the batch type.
 *
 * <p>Delegates to {@code DataUtil.getWeekLastDay}, {@code getMonthLastDay} or
 * {@code getYearLastDay} for the week / month / year batch types; any other
 * batch type yields {@code endDate} unchanged.
 *
 * @param batchType batch granularity; must not be null
 * @param endDate   upper bound passed through to the helper
 * @param startDate start of the window passed through to the helper
 * @return the value computed by the matching helper, or {@code endDate} for an
 *         unrecognized batch type
 */
public Date getLastDay(String batchType,Date endDate,Date startDate){
    if (batchType.equals(BATCH_TYPE_WEEK)) {
        return DataUtil.getWeekLastDay(endDate, startDate);
    }
    if (batchType.equals(BATCH_TYPE_MONTH)) {
        return DataUtil.getMonthLastDay(endDate, startDate);
    }
    if (batchType.equals(BATCH_TYPE_YEAR)) {
        return DataUtil.getYearLastDay(endDate, startDate);
    }
    return endDate;
}
}