Compare commits

..

26 Commits

Author SHA1 Message Date
b0f37d3936 【feat】 优化User相关的引用 2025-07-16 19:13:36 +08:00
8ad59527e5 【feat】 问题优化 2025-07-16 19:01:53 +08:00
b22ab35f42 【fix】 调整打印SF交互信息 2025-07-16 14:33:33 +08:00
4bc2b8075a 【feat】 增加全表Bulk Inset ,全表Bulk Update 2025-07-16 12:05:17 +08:00
d920ec792f 【feat】增量任务优化,每次生成一条batch 2025-07-16 11:24:43 +08:00
ee157b323e 【fix】 多态字段统一处理 2025-07-15 17:06:47 +08:00
f35c4e44ff 【feat】 新增文件下载(新)异步下载任务 2025-07-15 15:53:29 +08:00
6ec1b3f206 【feat】 新增Bulk Api一次性插入数据任务 2025-07-15 12:03:17 +08:00
37d8d94004 【feat】对象多态字段处理 2025-07-15 11:21:08 +08:00
b353084aca 【fix】修复同步数据时,选择最后一个批次的条件 2025-07-11 15:41:48 +08:00
6680beb43a 【feat】增加org配置校验任务 2025-07-09 16:04:55 +08:00
2253747562 【feat】不能写入Createdate,createdbyId字段的标准对象Insert时不写入 2025-07-07 11:18:10 +08:00
27eb434539 【feat】支持SF底层对象Update数据时,无需写入Old ID和Old Owner ID 2025-07-07 10:52:50 +08:00
1e91bbb67b 【feat】增加判断对象是否可创建字段逻辑 2025-07-04 11:19:19 +08:00
87f1863d01 【feat】优化Attachment上传 2025-07-04 11:01:27 +08:00
1ebbd67a73 【feat】增对TaskRelation和EventRelation。新增处理 2025-07-04 11:01:03 +08:00
006ceba211 【feat】增加org配置连接检测 2025-07-02 15:51:44 +08:00
8ab83d71e5 【feat】更新数据时,判断对象是否写入old_Id 2025-07-02 15:12:53 +08:00
c05010b9ad 【feat】新增Task与Event关于WhoId和WhatId的处理逻辑 2025-07-01 15:23:10 +08:00
690a6da050 【feat】保留Document上传下载代码 2025-07-01 14:53:23 +08:00
7ccfb5ec45 【fix】修复文件上传异常 2025-06-26 14:22:44 +08:00
823f810702 【feat】优化文件上传异常信息提示
(cherry picked from commit ba34877b02)
2025-06-25 11:22:45 +08:00
b8a754aefd 【fix】xxl-job数据库连接配置useSSL为false 2025-06-25 11:07:23 +08:00
da2ae51a90 【feat】优化更新数据时的逻辑 2025-06-24 14:09:37 +08:00
bc9ed880ff 【feat】优化文件地址配置失效提示方式 2025-06-24 10:52:55 +08:00
723992f663 【feat】优化代码data_index值增形式,bulk同步数据date数据处理方式 2025-06-24 10:52:11 +08:00
34 changed files with 2188 additions and 332 deletions

View File

@ -5,7 +5,7 @@
#dbUsername="root"
#dbPassword="celnet@2025.bln"
#携科
dbUrl="jdbc:mysql://127.0.0.1:3306/data-dump-xxl-job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai"
dbUrl="jdbc:mysql://127.0.0.1:3306/data-dump-xxl-job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&serverTimezone=Asia/Shanghai"
dbUsername="root"
dbPassword="Celnet2025.QY"
#其它

View File

@ -1,24 +1,37 @@
package com.celnet.datadump.controller;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.celnet.datadump.entity.DataObject;
import com.celnet.datadump.entity.OrgConfig;
import com.celnet.datadump.global.Result;
import com.celnet.datadump.param.DumpFileParam;
import com.celnet.datadump.service.DataObjectService;
import com.celnet.datadump.service.FileManagerService;
import com.celnet.datadump.service.FileService;
import com.celnet.datadump.util.DataUtil;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;
@RestController
@Api(value = "fileManager", tags = "文件管理")
@RequestMapping("/fileManager")
@Slf4j
public class FileManagerController {
@Autowired
private FileManagerService fileManagerService;
@Autowired
private FileService fileService;
@Autowired
private DataObjectService dataObjectService;
@GetMapping("/documentPage")
@ApiOperation("document page")
@ -45,4 +58,64 @@ public class FileManagerController {
public Result uploadImg(@RequestParam("token") String token) {
return Result.success(fileManagerService.uploadImg(token));
}
/**
 * Attachment download endpoint: for each object API listed in the request,
 * looks up its blob field and triggers a file dump.
 *
 * @param paramStr JSON-encoded DumpFileParam; blank means defaults
 * @return xxl-job style result; 500 on unparsable JSON or missing api param
 * @throws Exception propagated from the file service
 */
@PostMapping("/dumpFileLocal")
@ApiOperation("附件下载")
public ReturnT<String> dumpFileLocal(String paramStr) throws Exception {
    log.info("dumpFileLocal execute start ..................");
    DumpFileParam param = new DumpFileParam();
    try {
        if (StringUtils.isNotBlank(paramStr)) {
            param = JSON.parseObject(paramStr, DumpFileParam.class);
        }
    } catch (Throwable throwable) {
        return new ReturnT<>(500, "参数解析失败!");
    }
    if (StringUtils.isBlank(param.getApi())) {
        return new ReturnT<>(500, "api参数缺失");
    }
    for (String api : DataUtil.toIdList(param.getApi())) {
        DataObject dataObject = dataObjectService.getById(api);
        // BUG FIX: guard against unknown api names (getById may return null)
        if (dataObject == null) {
            log.error("api:{} not found", api);
            XxlJobLogger.log("api:{} not found", api);
            continue;
        }
        String blobField = dataObject.getBlobField();
        if (StringUtils.isBlank(blobField)) {
            log.error("api:{} does not have blob field", api);
            XxlJobLogger.log("api:{} does not have blob field", api);
            // BUG FIX: original fell through and called dumpFile with a blank blob field
            continue;
        }
        fileService.dumpFile(api, blobField, param.getSingleThread());
    }
    return ReturnT.SUCCESS;
}
/**
 * Attachment upload endpoint: for each object API listed in the request,
 * dispatches to the object-specific upload routine (Attachment / Document /
 * generic).
 *
 * @param paramStr JSON-encoded DumpFileParam; blank means defaults
 * @return xxl-job style result; 500 on unparsable JSON or missing api param
 * @throws Exception propagated from the file service
 */
@PostMapping("/uploadFileLocal")
@ApiOperation("附件上传")
public ReturnT<String> uploadFileLocal(String paramStr) throws Exception {
    // BUG FIX: log message said "uploadFileJob" (copy-paste from the job class)
    log.info("uploadFileLocal execute start ..................");
    DumpFileParam param = new DumpFileParam();
    try {
        if (StringUtils.isNotBlank(paramStr)) {
            param = JSON.parseObject(paramStr, DumpFileParam.class);
        }
    } catch (Throwable throwable) {
        return new ReturnT<>(500, "参数解析失败!");
    }
    if (StringUtils.isBlank(param.getApi())) {
        return new ReturnT<>(500, "api参数缺失");
    }
    for (String api : DataUtil.toIdList(param.getApi())) {
        DataObject dataObject = dataObjectService.getById(api);
        // BUG FIX: guard against unknown api names (getById may return null)
        if (dataObject == null) {
            log.error("api:{} not found", api);
            XxlJobLogger.log("api:{} not found", api);
            continue;
        }
        String blobField = dataObject.getBlobField();
        if (StringUtils.isBlank(blobField)) {
            log.error("api:{} does not have blob field", api);
            XxlJobLogger.log("api:{} does not have blob field", api);
            // BUG FIX: original fell through and uploaded with a blank blob field
            continue;
        }
        // Attachment and Document use dedicated upload paths; everything else is generic.
        if ("Attachment".equals(api)) {
            fileService.uploadFileToAttachment(api, blobField, param.getSingleThread());
        } else if ("Document".equals(api)) {
            fileService.uploadFileToDocument(api, blobField, param.getSingleThread());
        } else {
            fileService.uploadFile(api, blobField, param.getSingleThread());
        }
    }
    return ReturnT.SUCCESS;
}
}

View File

@ -299,4 +299,56 @@ public class JobController {
param.setEndCreateDate(param.getEndDate());
return dataImportNewService.immigrationUpdateNew(param);
}
/**
 * Data import job: migrates data into the target org via
 * dataImportService.immigration.
 * NOTE(review): the Swagger annotation and log remark below say "获取new_id",
 * and the original Javadoc said "数据更新同步" — both look copy-pasted from
 * neighbouring endpoints; confirm the intended description before changing
 * user-visible strings.
 *
 * @param paramStr JSON-encoded SalesforceParam; blank means default params
 * @return xxl-job style result; 500 when the JSON cannot be parsed
 * @throws Exception propagated from the import service
 * @author kris
 */
@PostMapping("/dataImportJob")
@ApiOperation("获取new_id")
@LogServiceAnnotation(operateType = OperateTypeConstant.TYPE_UPDATE, remark = "获取new_id")
public ReturnT<String> dataImportJob(String paramStr) throws Exception {
    log.info("dataImportJob execute start ..................");
    SalesforceParam param = new SalesforceParam();
    try {
        if (StringUtils.isNotBlank(paramStr)) {
            param = JSON.parseObject(paramStr, SalesforceParam.class);
        }
    } catch (Throwable throwable) {
        return new ReturnT<>(500, "参数解析失败!");
    }
    // Parameter conversion: copy the generic begin/end dates into the create-date window
    param.setBeginCreateDate(param.getBeginDate());
    param.setEndCreateDate(param.getEndDate());
    return dataImportService.immigration(param);
}
/**
 * Data update sync: pushes updates to the target org via
 * dataImportNewService.immigrationUpdateNew.
 *
 * @param paramStr JSON-encoded SalesforceParam; blank means default params
 * @return xxl-job style result; 500 when the JSON cannot be parsed
 * @throws Exception propagated from the update service
 * @author kris
 */
@PostMapping("/dataUpdateJob")
@ApiOperation("更新目标org数据")
@LogServiceAnnotation(operateType = OperateTypeConstant.TYPE_UPDATE, remark = "更新目标org数据")
public ReturnT<String> dataUpdateJob(String paramStr) throws Exception {
    log.info("dataUpdateJob execute start ..................");
    SalesforceParam param;
    if (StringUtils.isNotBlank(paramStr)) {
        try {
            param = JSON.parseObject(paramStr, SalesforceParam.class);
        } catch (Throwable t) {
            return new ReturnT<>(500, "参数解析失败!");
        }
    } else {
        param = new SalesforceParam();
    }
    // Map the generic begin/end dates onto the create-date window expected downstream
    param.setBeginCreateDate(param.getBeginDate());
    param.setEndCreateDate(param.getEndDate());
    return dataImportNewService.immigrationUpdateNew(param);
}
}

View File

@ -1,15 +1,21 @@
package com.celnet.datadump.controller;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.celnet.datadump.annotation.LogServiceAnnotation;
import com.celnet.datadump.constant.OperateTypeConstant;
import com.celnet.datadump.entity.OrgConfig;
import com.celnet.datadump.entity.SystemConfig;
import com.celnet.datadump.global.Result;
import com.celnet.datadump.param.SalesforceParam;
import com.celnet.datadump.service.OrgConfigService;
import com.celnet.datadump.service.SystemConfigService;
import com.xxl.job.core.biz.model.ReturnT;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@ -53,5 +59,14 @@ public class OrgConfigController {
public Result delete(@RequestBody OrgConfig orgConfig) {
return Result.success(orgConfigService.removeById(orgConfig));
}
/**
 * Validates the org configuration (connection check) via the org-config service.
 *
 * @param paramStr unused request body (kept for endpoint signature consistency)
 * @return SUCCESS when verification completes without throwing
 * @throws Exception propagated from verifyOrgConfig
 */
@PostMapping("/getOrgConfigJob")
@ApiOperation("校验org配置")
// BUG FIX: remark said "数据更新同步(新)" (copy-paste); aligned with the @ApiOperation above
@LogServiceAnnotation(operateType = OperateTypeConstant.TYPE_SELECT, remark = "校验org配置")
public ReturnT<String> getOrgConfigJob(String paramStr) throws Exception {
    orgConfigService.verifyOrgConfig();
    return ReturnT.SUCCESS;
}
}

View File

@ -127,7 +127,7 @@ public class DataBatch implements Serializable, Cloneable {
@TableField(value = "created_date", fill = FieldFill.INSERT)
@ExcelIgnore
@ApiModelProperty(value = "创建时间")
private LocalDateTime createdDate;
private Date createdDate;
/**
* 最后更新时间
@ -135,7 +135,7 @@ public class DataBatch implements Serializable, Cloneable {
@TableField(value = "last_modified_date", fill = FieldFill.INSERT_UPDATE)
@ExcelIgnore
@ApiModelProperty(value = "最后更新时间")
private LocalDateTime lastModifiedDate;
private Date lastModifiedDate;
/**
* sf新增数据量

View File

@ -42,6 +42,13 @@ public class DataObject implements Serializable {
@ApiModelProperty(value = "对象名称")
private String label;
/**
 * 对象ID前缀 (object key prefix) — original comment said "对象名称", copy-pasted from the label field above
 */
@TableField("keyPrefix")
@ApiModelProperty(value = "对象ID前缀")
private String keyPrefix;
/**
* 是否启用
*/

View File

@ -0,0 +1,75 @@
package com.celnet.datadump.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
/**
 * Mapping-info table entity (link_config): records a polymorphic reference
 * field on a source object and the companion column that stores the referenced
 * object's type.
 *
 * @author Kris
 * @date 2025/07/10
 */
@Getter
@Setter
@TableName("link_config")
@ApiModel(value = "映射信息表")
public class LinkConfig implements Serializable {

    private static final long serialVersionUID = 1L;

    // Auto-increment primary key
    @TableId(value = "id", type = IdType.AUTO)
    @ApiModelProperty(value = "id")
    private Integer id;

    // Object API name
    @TableField("api")
    @ApiModelProperty(value = "对象api")
    private String api;

    // Object label (display name)
    @TableField("label")
    @ApiModelProperty(value = "对象标签")
    private String label;

    // Polymorphic ID field name on the object
    @TableField("field")
    @ApiModelProperty(value = "多态ID字段")
    private String field;

    // Mapped companion field name (e.g. "...Id" -> "..._Type")
    @TableField("link_field")
    @ApiModelProperty(value = "映射字段")
    private String linkField;

    // Whether the mapped field has been created — presumably set by the
    // field-creation job; TODO confirm against writer
    @TableField("is_create")
    @ApiModelProperty(value = "是否创建")
    private Boolean isCreate;

    // Whether the mapping has been applied — TODO confirm against writer
    @TableField("is_link")
    @ApiModelProperty(value = "是否映射")
    private Boolean isLink;
}

View File

@ -0,0 +1,46 @@
package com.celnet.datadump.entity;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
/**
 * Metadata-object table entity (metaclass_config): a catalog of metadata
 * object names with labels and remarks, keyed by name.
 *
 * @author Kris
 * @date 2025/07/10
 */
@Getter
@Setter
@TableName("metaclass_config")
@ApiModel(value = "元数据对象表")
public class MetaclassConfig implements Serializable {

    private static final long serialVersionUID = 1L;

    // Primary key: metadata object name
    @TableId("name")
    @ApiModelProperty(value = "名称")
    private String name;

    // Display label
    @TableField("label")
    @ApiModelProperty(value = "标签")
    private String label;

    // Free-form remark
    @TableField("remark")
    @ApiModelProperty(value = "备注")
    private String remark;
}

View File

@ -35,6 +35,10 @@ public class SystemConfigCode {
* 批次类型 //
*/
public static final String BATCH_TYPE = "BATCH_TYPE";
/**
* 增量批次类型 //
*/
public static final String INCREMENT_BATCH_TYPE = "INCREMENT_BATCH_TYPE";
public static final String BATCH_TYPE_WEEK = "WEEK";
public static final String BATCH_TYPE_MONTH = "MONTH";

View File

@ -147,4 +147,113 @@ public class DataDumpNewJob {
}
/**
 * Creates link-type (polymorphic reference) fields via the common service.
 * NOTE(review): the handler name is "createLinkTypeFieldJob" while the method
 * is createLinkTypeJob — harmless, but confirm which name schedulers use.
 *
 * @param paramStr JSON-encoded SalesforceParam; blank means default params
 * @return result of commonService.createLinkTypeField; 500 on unparsable JSON
 * @throws Exception propagated from the common service
 */
@XxlJob("createLinkTypeFieldJob")
public ReturnT<String> createLinkTypeJob(String paramStr) throws Exception {
    log.info("createLinkTypeFieldJob execute start ..................");
    SalesforceParam param = new SalesforceParam();
    try {
        if (StringUtils.isNotBlank(paramStr)) {
            param = JSON.parseObject(paramStr, SalesforceParam.class);
        }
    } catch (Throwable throwable) {
        return new ReturnT<>(500, "参数解析失败!");
    }
    return commonService.createLinkTypeField(param);
}
/**
 * Updates link-type (polymorphic reference) information via the common
 * service.
 *
 * @param paramStr JSON-encoded SalesforceParam; blank means default params
 * @return result of commonService.updateLinkType; 500 on unparsable JSON
 * @throws Exception propagated from the common service
 */
@XxlJob("updateLinkTypeJob")
public ReturnT<String> updateLinkTypeJob(String paramStr) throws Exception {
    log.info("updateLinkTypeJob execute start ..................");
    SalesforceParam param;
    if (StringUtils.isNotBlank(paramStr)) {
        try {
            param = JSON.parseObject(paramStr, SalesforceParam.class);
        } catch (Throwable t) {
            return new ReturnT<>(500, "参数解析失败!");
        }
    } else {
        param = new SalesforceParam();
    }
    return commonService.updateLinkType(param);
}
/**
 * One-shot bulk insert job: inserts data in a single batch via the batch
 * import service.
 *
 * @param paramStr JSON-encoded SalesforceParam; blank means default params
 * @return result of dataImportBatchService.insertSingleBatch; 500 on unparsable JSON
 * @throws Exception propagated from the batch import service
 * @author kris
 */
@XxlJob("insertSingleBatchJob")
public ReturnT<String> insertSingleBatchJob(String paramStr) throws Exception {
    log.info("insertSingleBatchJob execute start ..................");
    SalesforceParam param = new SalesforceParam();
    try {
        if (StringUtils.isNotBlank(paramStr)) {
            param = JSON.parseObject(paramStr, SalesforceParam.class);
        }
    } catch (Throwable throwable) {
        return new ReturnT<>(500, "参数解析失败!");
    }
    // Parameter conversion: copy the generic begin/end dates into the create-date window
    param.setBeginCreateDate(param.getBeginDate());
    param.setEndCreateDate(param.getEndDate());
    return dataImportBatchService.insertSingleBatch(param);
}
/**
 * File download job (new variant) delegating to the new import service.
 *
 * @param paramStr JSON-encoded SalesforceParam; blank means default params
 * @return result of dataImportNewService.dumpFileNew; 500 on unparsable JSON
 * @throws Exception propagated from the import service
 * @author kris
 */
@XxlJob("dumpFileNewJob")
public ReturnT<String> dumpFileNewJob(String paramStr) throws Exception {
    log.info("dumpFileNewJob execute start ..................");
    SalesforceParam param = new SalesforceParam();
    try {
        if (StringUtils.isNotBlank(paramStr)) {
            param = JSON.parseObject(paramStr, SalesforceParam.class);
        }
    } catch (Throwable throwable) {
        return new ReturnT<>(500, "参数解析失败!");
    }
    // Parameter conversion: copy the generic begin/end dates into the create-date window
    param.setBeginCreateDate(param.getBeginDate());
    param.setEndCreateDate(param.getEndDate());
    return dataImportNewService.dumpFileNew(param);
}
/**
 * Incremental dump job (new variant): forces param type to 2 (incremental,
 * as opposed to 1 = full/存量) and delegates to commonService.incrementNew.
 *
 * @param paramStr 参数json — JSON-encoded SalesforceParam; blank means defaults
 * @return result of commonService.incrementNew; 500 on unparsable JSON
 */
@XxlJob("dataDumpIncrementNewJob")
public ReturnT<String> dataDumpIncrementNewJob(String paramStr) throws Exception {
    log.info("dataDumpIncrementNewJob execute start ..................");
    SalesforceParam param = new SalesforceParam();
    try {
        if (StringUtils.isNotBlank(paramStr)) {
            param = JSON.parseObject(paramStr, SalesforceParam.class);
        }
    } catch (Throwable throwable) {
        return new ReturnT<>(500, "参数解析失败!");
    }
    // type 2 marks this run as incremental (增量)
    param.setType(2);
    return commonService.incrementNew(param);
}
}

View File

@ -122,6 +122,8 @@ public class DumpFileJob {
}
if ("Attachment".equals(api)){
fileService.uploadFileToAttachment(api, blobField, param.getSingleThread());
}else if("Document".equals(api)){
fileService.uploadFileToDocument(api, blobField, param.getSingleThread());
}else {
fileService.uploadFile(api, blobField, param.getSingleThread());
}

View File

@ -0,0 +1,26 @@
package com.celnet.datadump.job;
import com.celnet.datadump.service.OrgConfigService;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * XXL-JOB component wrapping org-configuration verification.
 */
@Slf4j
@Component
public class OrgConfigJob {

    @Autowired
    private OrgConfigService orgConfigService;

    /**
     * Scheduled handler "getOrgConfigJob": validates the org configuration.
     *
     * @param paramStr unused job parameter (required by the handler signature)
     * @return SUCCESS when verification completes without throwing
     * @throws Exception propagated from verifyOrgConfig
     */
    @XxlJob("getOrgConfigJob")
    public ReturnT<String> getOrgConfigJob(String paramStr) throws Exception {
        log.info("getOrgConfigJob execute start ..................");
        orgConfigService.verifyOrgConfig();
        return ReturnT.SUCCESS;
    }
}

View File

@ -57,6 +57,12 @@ public interface CustomMapper {
*/
public void createTable(@Param("tableName") String tableName, @Param("tableComment") String tableComment, @Param("maps") List<Map<String, Object>> maps, @Param("index") List<Map<String, Object>> index);
/**
* 创建字段
* @param tableName
*/
public int createField(@Param("tableName") String tableName, @Param("fieldName") String fieldName);
/**
* 更新方法
*

View File

@ -0,0 +1,16 @@
package com.celnet.datadump.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.celnet.datadump.entity.LinkConfig;
import org.apache.ibatis.annotations.Mapper;
/**
 * <p>
 * MyBatis-Plus mapper for {@link LinkConfig} (table link_config);
 * inherits standard CRUD from BaseMapper.
 * </p>
 */
@Mapper
public interface LinkConfigMapper extends BaseMapper<LinkConfig> {
}

View File

@ -0,0 +1,11 @@
package com.celnet.datadump.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.celnet.datadump.entity.MetaclassConfig;
import org.apache.ibatis.annotations.Mapper;
// MyBatis-Plus mapper for MetaclassConfig (table metaclass_config); inherits standard CRUD from BaseMapper
@Mapper
public interface MetaclassConfigMapper extends BaseMapper<MetaclassConfig> {
}

View File

@ -17,6 +17,8 @@ public interface CommonService {
ReturnT<String> increment(SalesforceParam param) throws Exception;
ReturnT<String> incrementNew(SalesforceParam param) throws Exception;
ReturnT<String> dump(SalesforceParam param) throws Exception;
Integer countSfNum(PartnerConnection connect, SalesforceParam param) throws Exception;
@ -57,4 +59,21 @@ public interface CommonService {
String getDocumentId(PartnerConnection partnerConnection, String contentVersionId) throws Exception;
ReturnT<String> getChatter(SalesforceParam param) throws Exception;
/**
* 创建关联字段
* @param param 参数
* @return ReturnT
* @throws Exception exception
*/
ReturnT<String> createLinkTypeField(SalesforceParam param) throws Exception;
/**
* 更新关联类型
* @param param 参数
* @return ReturnT
* @throws Exception exception
*/
ReturnT<String> updateLinkType(SalesforceParam param) throws Exception;
}

View File

@ -10,4 +10,6 @@ public interface DataImportBatchService {
ReturnT<String> immigrationUpdateBatch(SalesforceParam param) throws Exception;
ReturnT<String> insertSingleBatch(SalesforceParam param) throws Exception;
}

View File

@ -34,4 +34,6 @@ public interface DataImportNewService {
*/
ReturnT<String> uploadDocumentLinkJob(String paramStr) throws Exception;
ReturnT<String> dumpFileNew(SalesforceParam param) throws Exception;
}

View File

@ -35,4 +35,7 @@ public interface FileService {
void uploadFile(String api, String field, Boolean singleThread);
void uploadFileToAttachment(String api, String field, Boolean singleThread);
void uploadFileToDocument(String api, String field, Boolean singleThread);
}

View File

@ -0,0 +1,10 @@
package com.celnet.datadump.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.celnet.datadump.entity.DataLog;
import com.celnet.datadump.entity.LinkConfig;
// Service layer over link_config; inherits CRUD operations from MyBatis-Plus IService
public interface LinkConfigService extends IService<LinkConfig> {
}

View File

@ -0,0 +1,9 @@
package com.celnet.datadump.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.celnet.datadump.entity.MetaclassConfig;
// Service layer over metaclass_config; inherits CRUD operations from MyBatis-Plus IService
public interface MetaclassConfigService extends IService<MetaclassConfig> {
}

View File

@ -9,4 +9,6 @@ import com.celnet.datadump.entity.SystemConfig;
*/
public interface OrgConfigService extends IService<OrgConfig> {
void verifyOrgConfig();
}

View File

@ -3,9 +3,7 @@ package com.celnet.datadump.service.impl;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.celnet.datadump.config.SalesforceConnect;
import com.celnet.datadump.config.SalesforceExecutor;
import com.celnet.datadump.config.SalesforceTargetConnect;
@ -22,7 +20,6 @@ import com.celnet.datadump.util.EmailUtil;
import com.celnet.datadump.util.SqlUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.sforce.async.BulkConnection;
import com.sforce.soap.partner.*;
import com.sforce.soap.partner.sobject.SObject;
import com.xxl.job.core.biz.model.ReturnT;
@ -81,6 +78,10 @@ public class CommonServiceImpl implements CommonService {
private DataReportDetailService dataReportDetailService;
@Autowired
private SalesforceTargetConnect salesforceTargetConnect;
@Autowired
private MetaclassConfigService metaclassConfigService;
@Autowired
private LinkConfigService linkConfigService;
@Override
public ReturnT<String> increment(SalesforceParam param) throws Exception {
@ -192,6 +193,66 @@ public class CommonServiceImpl implements CommonService {
}
}
@Override
public ReturnT<String> incrementNew(SalesforceParam param) throws Exception {
    // Select objects to sync: optionally restricted to the comma-separated APIs
    // in param, always limited to objects flagged need_update with a known
    // last_update_date (first-time objects must go through a full dump first).
    QueryWrapper<DataObject> qw = new QueryWrapper<>();
    if (StringUtils.isNotBlank(param.getApi())) {
        List<String> apis = DataUtil.toIdList(param.getApi());
        qw.in("name", apis);
    }
    qw.eq("need_update", true)
            .isNotNull("last_update_date");
    List<DataObject> list = dataObjectService.list(qw);
    if (CollectionUtils.isEmpty(list)) {
        return new ReturnT<>(500, ("" + param.getApi() + "不存在或未开启更新"));
    }
    List<Future<?>> futures = Lists.newArrayList();
    try {
        // One report row covers this whole incremental run
        DataReport dataReport = new DataReport();
        dataReport.setType(TypeCode.INCREMENT);
        dataReport.setApis(list.stream().map(DataObject::getName).collect(Collectors.joining(",")));
        dataReportService.save(dataReport);
        for (DataObject dataObject : list) {
            // One executor task per object; each dumps changes since the
            // object's last_update_date and then advances that watermark.
            Future<?> future = salesforceExecutor.execute(() -> {
                try {
                    // Watermark captured BEFORE the dump so changes made while
                    // dumping are picked up by the next run
                    Date updateTime = new Date();
                    SalesforceParam salesforceParam = new SalesforceParam();
                    salesforceParam.setApi(dataObject.getName());
                    salesforceParam.setBeginModifyDate(dataObject.getLastUpdateDate());
                    salesforceParam.setType(2);
                    // If the object declares update fields, validate against those
                    if (StringUtils.isNotBlank(dataObject.getUpdateField())) {
                        salesforceParam.setUpdateField(dataObject.getUpdateField());
                    }
                    dumpDataNew(salesforceParam, dataReport, dataObject);
                    dataObject.setLastUpdateDate(updateTime);
                    dataObjectService.updateById(dataObject);
                } catch (Throwable throwable) {
                    log.error("salesforceExecutor error", throwable);
                    throw new RuntimeException(throwable);
                }
            }, 0, 0);
            futures.add(future);
        }
        // Wait for all per-object tasks to finish
        salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
        // Objects with a blob field also get their files dumped (single-threaded)
        list.stream().filter(t -> StringUtils.isNotBlank(t.getBlobField())).forEach(t -> {
            try {
                fileService.dumpFile(t.getName(), t.getBlobField(), true);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        return ReturnT.SUCCESS;
    } catch (Throwable throwable) {
        // Cancel outstanding tasks before propagating
        salesforceExecutor.remove(futures.toArray(new Future<?>[]{}));
        throw throwable;
    }
}
@Override
public ReturnT<String> dump(SalesforceParam param) throws Exception {
List<Future<?>> futures = Lists.newArrayList();
@ -305,10 +366,7 @@ public class CommonServiceImpl implements CommonService {
}
// 等待当前所有线程执行完成
salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
// 附件表 跑一遍dumpFile
if (StringUtils.isNotBlank(dataObject.getBlobField())) {
fileService.dumpFile(dataObject.getName(), dataObject.getBlobField(), true);
}
update.setDataWork(0);
} catch (Throwable e) {
String message = e.getMessage();
@ -316,10 +374,8 @@ public class CommonServiceImpl implements CommonService {
EmailUtil.send("DataDump ERROR", format);
throw new RuntimeException(e);
} finally {
if (dataObject != null) {
update.setDataLock(0);
dataObjectService.updateById(update);
}
update.setDataLock(0);
dataObjectService.updateById(update);
}
}
// 等待当前所有线程执行完成
@ -401,11 +457,7 @@ public class CommonServiceImpl implements CommonService {
// 等待当前所有线程执行完成
salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
// 附件表 跑一遍dumpFile
DataObject one = dataObjectService.getById(api);
if (StringUtils.isNotBlank(one.getBlobField())) {
fileService.dumpFile(one.getName(), one.getBlobField(), true);
}
update.setDataWork(0);
} catch (Throwable e) {
log.error("manualDump error", e);
@ -493,6 +545,48 @@ public class CommonServiceImpl implements CommonService {
return connect;
}
/**
 * 数据传输主体 — core transfer for one object in an incremental run: counts
 * the Salesforce-side rows, pulls all matching data, and records a batch
 * entry. On failure, sends an alert email and rethrows.
 *
 * @param param      参数 — per-object sync parameters (api, date window, type)
 * @param dataReport report row for the whole run, updated by getAllSfData
 * @param dataObject the object being synced (supplies name/label/watermark)
 * @throws Throwable any failure after logging and emailing the error
 */
private void dumpDataNew(SalesforceParam param, DataReport dataReport, DataObject dataObject) throws Throwable {
    String api = param.getApi();
    PartnerConnection connect;
    try {
        // History row for this object's sync window
        DataBatchHistory dataBatchHistory = new DataBatchHistory();
        dataBatchHistory.setName(api);
        dataBatchHistory.setStartDate(new Date());
        dataBatchHistory.setSyncStartDate(param.getBeginCreateDate());
        if (param.getEndCreateDate() != null) {
            // End boundary is exclusive: subtract one second for the recorded window
            dataBatchHistory.setSyncEndDate(DateUtils.addSeconds(param.getEndCreateDate(), -1));
        }
        dataBatchHistory.setBatch(param.getBatch());
        connect = salesforceConnect.createConnect();
        // Objects with an IsDeleted column: only fetch non-deleted rows;
        // otherwise drop the filter entirely
        if (dataFieldService.hasDeleted(param.getApi())) {
            param.setIsDeleted(false);
        } else {
            param.setIsDeleted(null);
        }
        dataBatchHistory.setSfNum(countSfNum(connect, param));
        getAllSfData(param, connect, dataReport);
        insertDataBatch(param, dataBatchHistory, dataObject);
    } catch (Throwable throwable) {
        log.error("dataDumpJob error api:{}", api, throwable);
        // type 1 = full (存量), otherwise incremental (增量); used in the alert mail subject line
        String type = param.getType() == 1 ? "存量" : "增量";
        String format = String.format("%s数据迁移 error, api name: %s, \nparam: %s, \ncause:\n%s", type, api, JSON.toJSONString(param, DataDumpParam.getFilter()), throwable);
        EmailUtil.send("DataDump ERROR", format);
        throw throwable;
    }
}
/**
* 任务拆分
*
@ -579,9 +673,7 @@ public class CommonServiceImpl implements CommonService {
}
fields.add(field.getName());
}
if ("Attachment".equals(api) || "FeedItem".equals(api)) {
fields.add("Parent.type");
}
DataObject dataObject = dataObjectService.getById(api);
if (dataObject != null && StringUtils.isNotBlank(dataObject.getExtraField())) {
fields.addAll(Arrays.asList(StringUtils.split(dataObject.getExtraField().replaceAll(StringUtils.SPACE, StringUtils.EMPTY), ",")));
@ -692,6 +784,37 @@ public class CommonServiceImpl implements CommonService {
log.info("dataDumpJob done api:{}, count:{}", dataBatchHistory.getName(), num);
}
/**
 * 执行增量任务新增一个批次 — finalizes an incremental run for one object:
 * closes out the history row (db count, sync status, cost) and saves a new
 * DataBatch covering the window from the object's previous watermark to now.
 *
 * @param param            参数 — used to count rows currently in the local DB
 * @param dataBatchHistory 执行记录 — history row started in dumpDataNew
 * @param dataObject       object being synced (name/label/last watermark)
 */
private void insertDataBatch(SalesforceParam param, DataBatchHistory dataBatchHistory, DataObject dataObject) {
    dataBatchHistory.setEndDate(new Date());
    Integer num = customMapper.count(param);
    dataBatchHistory.setDbNum(num);
    // Sync status 1 = local count matches the Salesforce count, 0 = mismatch
    if (dataBatchHistory.getSfNum() != null) {
        dataBatchHistory.setSyncStatus(dataBatchHistory.getSfNum().equals(dataBatchHistory.getDbNum()) ? 1 : 0);
    }
    dataBatchHistory.setCost(DataUtil.calTime(dataBatchHistory.getEndDate(), dataBatchHistory.getStartDate()));
    // New batch row spanning [previous watermark, now]
    DataBatch dataBatch = new DataBatch();
    dataBatch.setName(dataObject.getName());
    dataBatch.setLabel(dataObject.getLabel());
    dataBatch.setFirstSfNum(dataBatchHistory.getSfNum());
    dataBatch.setFirstDbNum(dataBatchHistory.getDbNum());
    dataBatch.setFirstSyncDate(dataBatchHistory.getStartDate());
    dataBatch.setSyncStatus(dataBatchHistory.getSyncStatus());
    dataBatch.setSyncStartDate(dataObject.getLastUpdateDate());
    dataBatch.setSyncEndDate(new Date());
    dataBatchService.save(dataBatch);
    log.info("count db num: {}", num);
    XxlJobLogger.log("count db num: {}", num);
    dataBatchHistoryService.save(dataBatchHistory);
    log.info("dataDumpJob done api:{}, count:{}", dataBatchHistory.getName(), num);
}
/**
* 获取需同步sf数据数量
*
@ -756,26 +879,7 @@ public class CommonServiceImpl implements CommonService {
maps.add(paramMap);
maps.add(paramMap2);
}
// Task和Event
// if ("Task".equals(api) || "Event".equals(api)){
// Map<String, Object> paramwhoMap = Maps.newHashMap();
// paramwhoMap.put("key", "WhoId_Type__c");
// paramwhoMap.put("value", jsonObject.get("Who_Type"));
// maps.add(paramwhoMap);
// Map<String, Object> paramwhatMap = Maps.newHashMap();
// paramwhoMap.put("key", "WhatId_Type__c");
// paramwhoMap.put("value", jsonObject.get("What_Type"));
// maps.add(paramwhoMap);
// }
//附件关联表 插入更新时给关联对象赋值
// if ("ContentDocumentLink".equals(api)) {
// String linkedEntity_Type = records[i].getChild("LinkedEntity").getChild("Type").getValue().toString();
// Map<String, Object> paramMap = Maps.newHashMap();
// paramMap.put("key", "linkedEntity_Type");
// paramMap.put("value", linkedEntity_Type);
// maps.add(paramMap);
// }
if (existsIds.contains(id)) {
customMapper.updateById(api, maps, id);
} else {
@ -811,8 +915,14 @@ public class CommonServiceImpl implements CommonService {
// 加个锁 避免重复执行创建api
reentrantLock.lock();
log.info("check api:{}", apiName);
QueryWrapper<MetaclassConfig> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("name",apiName);
long count = metaclassConfigService.count(queryWrapper);
try {
boolean hasCreatedDate = false;
PartnerConnection connection = salesforceConnect.createConnect();
DataObject dataObject = dataObjectService.getById(apiName);
Date now = new Date();
@ -822,8 +932,6 @@ public class CommonServiceImpl implements CommonService {
List<Map<String, Object>> list = Lists.newArrayList();
DescribeSObjectResult dsr = connection.describeSObject(apiName);
String label = dsr.getLabel();
boolean isCustomObject = dsr.isCustom(); // 自定义对象才支持新增字段
boolean isUpdateable = dsr.isUpdateable(); // 对象本身是否可修改
List<DataField> fieldList = Lists.newArrayList();
List<String> fields = Lists.newArrayList();
String blobField = null;
@ -859,7 +967,17 @@ public class CommonServiceImpl implements CommonService {
if (field.getReferenceTo().length <= 3) {
join = StringUtils.join(field.getReferenceTo(), ",");
} else {
log.warn("referenceTo too long set null, api:{}, field:{}, reference to:{}", apiName, field.getName(), StringUtils.join(field.getReferenceTo(), ","));
log.info("referenceTo too long set null, api:{}, field:{}, reference to:{}", apiName, field.getName(), StringUtils.join(field.getReferenceTo(), ","));
}
if (field.getReferenceTo().length > 1){
if (field.getName().contains("Id") && !"OwnerId".equals(field.getName())){
LinkConfig linkConfig = new LinkConfig();
linkConfig.setApi(apiName);
linkConfig.setLabel(label);
linkConfig.setField(field.getName());
linkConfig.setLinkField(StringUtils.replace(field.getName(), "Id", "_Type"));
linkConfigService.save(linkConfig);
}
}
// picklist保存到picklist表
if ("picklist".equalsIgnoreCase(sfType)) {
@ -934,44 +1052,6 @@ public class CommonServiceImpl implements CommonService {
map.put("name", "new_id");
list.add(map);
// if ("Task".equals(apiName) || "Event".equals(apiName)){
// Map<String, Object> LinkedMap = Maps.newHashMap();
// LinkedMap.put("type", "varchar(18)");
// LinkedMap.put("comment", "whatId关联对象");
// LinkedMap.put("name", "WhatId_Type__c");
// list.add(LinkedMap);
// Map<String, Object> LinkedMap1 = Maps.newHashMap();
// LinkedMap1.put("type", "varchar(18)");
// LinkedMap1.put("comment", "whoId关联对象");
// LinkedMap1.put("name", "WhoId_Type__c");
// list.add(LinkedMap1);
// }
if ("ContentDocumentLink".equals(apiName)){
//文档关联表新增关联对象字段
Map<String, Object> LinkedMap = Maps.newHashMap();
LinkedMap.put("type", "varchar(255)");
LinkedMap.put("comment", "关联对象");
LinkedMap.put("name", "LinkedEntity_Type");
list.add(LinkedMap);
}
if ("Attachment".equals(apiName) || "FeedComment".equals(apiName)
|| "FeedItem".equals(apiName)){
//文档关联表新增关联对象字段
Map<String, Object> LinkedMap = Maps.newHashMap();
LinkedMap.put("type", "varchar(255)");
LinkedMap.put("comment", "关联对象");
LinkedMap.put("name", "Parent_Type");
list.add(LinkedMap);
DataField dataField = new DataField();
dataField.setApi(apiName);
dataField.setField("Parent.Type");
dataField.setName("关联对象");
fieldList.add(dataField);
}
customMapper.createTable(apiName, label, list, index);
// 生成字段映射
QueryWrapper<DataField> delfieldQw = new QueryWrapper<>();
@ -1021,9 +1101,10 @@ public class CommonServiceImpl implements CommonService {
update.setName(apiName);
update.setLastUpdateDate(endCreateDate);
update.setBlobField(blobField);
if(!isCustomObject && !isUpdateable){
if (count>0){
update.setIsEditable(false);
}
update.setKeyPrefix(dsr.getKeyPrefix());
dataObjectService.saveOrUpdate(update);
}
} finally {
@ -1097,7 +1178,19 @@ public class CommonServiceImpl implements CommonService {
* @param fields 字段名称list
*/
private static void fileExtraFieldBuild(String apiName, List<Map<String, Object>> list, List<DataField> fieldList, List<String> fields) {
{
if ("Document".equals(apiName)){
DataField dataField = new DataField();
dataField.setApi(apiName);
dataField.setField("url");
dataField.setName("文件路径");
fieldList.add(dataField);
fields.add("url");
Map<String, Object> map = Maps.newHashMap();
map.put("type", "text");
map.put("comment", "文件路径");
map.put("name", "localUrl");
list.add(map);
}else {
DataField dataField = new DataField();
dataField.setApi(apiName);
dataField.setField("url");
@ -1206,7 +1299,7 @@ public class CommonServiceImpl implements CommonService {
for (int j = 0; j < saveResults.length; j++) {
if (!saveResults[j].getSuccess()) {
String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s", api, JSON.toJSONString(DataDumpParam.getFilter()), com.alibaba.fastjson.JSON.toJSONString(saveResults[j]));
EmailUtil.send("DataDump ContentDocumentLink ERROR", format);
log.error(format);
} else {
List<Map<String, Object>> dList = new ArrayList<>();
Map<String, Object> linkMap = new HashMap<>();
@ -1262,6 +1355,90 @@ public class CommonServiceImpl implements CommonService {
return null;
}
/**
 * Creates the polymorphic-link helper column for every {@code LinkConfig} row that is
 * flagged as a link (is_link=1) but whose column has not been created yet (is_create=0).
 * When {@code param.getApi()} is blank all known objects are processed, otherwise only
 * the comma-separated api list supplied by the caller.
 *
 * @param param job parameter; only {@code api} is consulted here
 * @return always {@link ReturnT#SUCCESS}; failures are logged, not propagated
 */
@Override
public ReturnT<String> createLinkTypeField(SalesforceParam param) throws Exception {
    try {
        List<String> apis;
        if (StringUtils.isBlank(param.getApi())) {
            apis = dataObjectService.list().stream().map(DataObject::getName).collect(Collectors.toList());
        } else {
            apis = DataUtil.toIdList(param.getApi());
        }
        // Only configs that are marked as polymorphic links and whose column is still missing.
        QueryWrapper<LinkConfig> wrapper = new QueryWrapper<>();
        wrapper.eq("is_link", 1).eq("is_create", 0);
        if (!apis.isEmpty()) {
            wrapper.in("api", apis);
        }
        List<LinkConfig> list = linkConfigService.list(wrapper);
        for (LinkConfig config : list) {
            // Skip objects whose local table has not been created yet, DDL would fail otherwise.
            if (StringUtils.isBlank(customMapper.checkTable(config.getApi()))) {
                log.info("表api{} 不存在!", config.getApi());
                continue;
            }
            log.info("表api{} 未创建字段:{},现在创建!", config.getApi(), config.getLinkField());
            //创建字段
            customMapper.createField(config.getApi(), config.getLinkField());
            config.setIsCreate(true);
            linkConfigService.updateById(config);
        }
    } catch (Exception e) {
        // Log the full stack trace; logging only e.getMessage() loses the failure context
        // (and NPEs have a null message).
        log.error("createLinkTypeField error", e);
    }
    return ReturnT.SUCCESS;
}
/**
 * Back-fills the polymorphic-type column (link field) for every created link config.
 * For each data row the referenced object's api name is resolved from the first three
 * characters of the stored Salesforce id (the key prefix) and written into the link column.
 *
 * @param param job parameter; only {@code api} is consulted here
 * @return always {@link ReturnT#SUCCESS}; failures are logged, not propagated
 */
@Override
public ReturnT<String> updateLinkType(SalesforceParam param) throws Exception {
    try {
        List<String> apis;
        if (StringUtils.isBlank(param.getApi())) {
            apis = dataObjectService.list().stream().map(DataObject::getName).collect(Collectors.toList());
        } else {
            apis = DataUtil.toIdList(param.getApi());
        }
        // Only configs whose link column already exists (is_create=1).
        QueryWrapper<LinkConfig> wrapper = new QueryWrapper<>();
        wrapper.eq("is_link", "1").eq("is_create", 1);
        if (!apis.isEmpty()) {
            wrapper.in("api", apis);
        }
        // keyPrefix (first 3 chars of an SF id) -> object api name, used to resolve reference targets.
        Map<String, String> resultMap = dataObjectService.list().stream()
                .collect(Collectors.toMap(
                        DataObject::getKeyPrefix,
                        DataObject::getName,
                        (existing, replacement) -> replacement));
        List<LinkConfig> list = linkConfigService.list(wrapper);
        for (LinkConfig config : list) {
            //若是该对象已经存在数据则遍历写入值
            Integer dbNum = customMapper.countBySQL(config.getApi(), null);
            log.info("表api{} 存在" + dbNum + "条数据!赋值当前表多态类型字段:{}", config.getApi(), config.getLinkField());
            if (dbNum > 0) {
                // Page through the table 2000 rows at a time.
                int page = dbNum % 2000 == 0 ? dbNum / 2000 : (dbNum / 2000) + 1;
                for (int i = 0; i < page; i++) {
                    List<Map<String, Object>> mapList = customMapper.list("*", config.getApi(), "1=1 order by Id limit " + i * 2000 + ",2000");
                    for (Map<String, Object> row : mapList) {
                        Object fieldValue = row.get(config.getField());
                        // Guard: ids shorter than the 3-char key prefix cannot be resolved.
                        if (fieldValue == null || fieldValue.toString().length() < 3) {
                            continue;
                        }
                        String type = resultMap.get(fieldValue.toString().substring(0, 3));
                        Map<String, Object> paramMap = Maps.newHashMap();
                        paramMap.put("key", config.getLinkField());
                        paramMap.put("value", type);
                        // Build a fresh parameter list per row. The previous implementation
                        // accumulated paramMaps across rows, so every update after the first
                        // re-applied stale values from earlier rows.
                        List<Map<String, Object>> updateMapList = new ArrayList<>();
                        updateMapList.add(paramMap);
                        Object id = row.get("Id") != null ? row.get("Id") : row.get("id");
                        customMapper.updateById(config.getApi(), updateMapList, String.valueOf(id));
                    }
                }
            }
        }
    } catch (Exception e) {
        // Keep the stack trace; message-only logging loses the failure context.
        log.error("updateLinkType error", e);
    }
    return ReturnT.SUCCESS;
}
public Date getLastDay(String batchType,Date endDate,Date startDate){
switch (batchType) {

View File

@ -39,6 +39,8 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
@ -74,6 +76,8 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
@Autowired
private DataBatchHistoryService dataBatchHistoryService;
@Autowired
private LinkConfigService linkConfigService;
/**
* Insert入口
@ -88,6 +92,8 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
if (result != null) {
return result;
}
}else {
autoImmigrationBatch(param, futures);
}
return ReturnT.SUCCESS;
} catch (Exception exception) {
@ -98,7 +104,7 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
}
/**
* 组装执行参数
* 单表组装Insert执行参数
*/
public ReturnT<String> manualImmigrationBatch(SalesforceParam param, List<Future<?>> futures) throws Exception {
List<String> apis;
@ -156,7 +162,7 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
for (SalesforceParam salesforceParam : salesforceParams) {
Future<?> future = salesforceExecutor.execute(() -> {
try {
manualCreatedNewIdBatch(salesforceParam, bulkConnection);
createdNewIdBatch(salesforceParam, bulkConnection);
} catch (Throwable throwable) {
log.error("salesforceExecutor error", throwable);
throw new RuntimeException(throwable);
@ -183,10 +189,97 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
return null;
}
/**
* 多表组装Insert执行参数
*/
public void autoImmigrationBatch(SalesforceParam param, List<Future<?>> futures) throws Exception {
    // 全量的时候 检测是否有自动任务锁住的表
    boolean isFull = CollectionUtils.isEmpty(param.getIds());
    if (isFull) {
        QueryWrapper<DataObject> qw = new QueryWrapper<>();
        qw.eq("data_lock", 1);
        List<DataObject> list = dataObjectService.list(qw);
        if (CollectionUtils.isNotEmpty(list)) {
            // Join with a comma so multiple api names stay readable in the alert mail.
            String apiNames = list.stream().map(DataObject::getName).collect(Collectors.joining(","));
            String message = "api:" + apiNames + " is locked";
            String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s", apiNames, com.alibaba.fastjson2.JSON.toJSONString(param, DataDumpParam.getFilter()), message);
            EmailUtil.send("DataDump ERROR", format);
            return;
        }
    }
    BulkConnection bulkConnection = salesforceTargetConnect.createBulkConnect();
    // Pick up to 10 pending objects (data_work=1) per round, lowest data_index first.
    QueryWrapper<DataObject> qw = new QueryWrapper<>();
    qw.eq("data_work", 1)
            .orderByAsc("data_index")
            .last(" limit 10");
    while (true) {
        List<DataObject> dataObjects = dataObjectService.list(qw);
        //判断dataObjects是否为空
        if (CollectionUtils.isEmpty(dataObjects)) {
            return;
        }
        for (DataObject object : dataObjects) {
            TimeUnit.MILLISECONDS.sleep(1);
            try {
                // Lock the object row while it is being processed.
                object.setDataLock(1);
                dataObjectService.updateById(object);
                QueryWrapper<DataBatch> dbQw = new QueryWrapper<>();
                dbQw.eq("name", object.getName());
                List<DataBatch> list = dataBatchService.list(dbQw);
                AtomicInteger batch = new AtomicInteger(1);
                // Default to an empty list: the previous code left this null when the
                // object had no batches and then NPE'd in the for-loop below.
                List<SalesforceParam> salesforceParams = new ArrayList<>();
                if (CollectionUtils.isNotEmpty(list)) {
                    salesforceParams = list.stream().map(t -> {
                        SalesforceParam salesforceParam = param.clone();
                        salesforceParam.setApi(t.getName());
                        salesforceParam.setBeginCreateDate(t.getSyncStartDate());
                        salesforceParam.setEndCreateDate(t.getSyncEndDate());
                        salesforceParam.setBatch(batch.getAndIncrement());
                        return salesforceParam;
                    }).collect(Collectors.toList());
                }
                // 手动任务优先执行
                for (SalesforceParam salesforceParam : salesforceParams) {
                    Future<?> future = salesforceExecutor.execute(() -> {
                        try {
                            createdNewIdBatch(salesforceParam, bulkConnection);
                        } catch (Throwable throwable) {
                            log.error("salesforceExecutor error", throwable);
                            throw new RuntimeException(throwable);
                        }
                    }, salesforceParam.getBatch(), 1);
                    futures.add(future);
                }
                // 等待当前所有线程执行完成
                salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
                // Mark as done; persisted by the updateById in the finally block below.
                object.setDataWork(0);
            } catch (InterruptedException e) {
                throw e;
            } catch (Throwable e) {
                log.error("autoImmigrationBatch error", e);
                throw new RuntimeException(e);
            } finally {
                // NOTE(review): when isFull is false the data_work/data_lock changes are never
                // persisted, so the while(true) requeries the same rows — confirm intended.
                if (isFull) {
                    object.setDataLock(0);
                    dataObjectService.updateById(object);
                }
            }
        }
    }
}
/**
* 执行数据Insert
*/
public void manualCreatedNewIdBatch(SalesforceParam param, BulkConnection bulkConnection) throws Exception {
public void createdNewIdBatch(SalesforceParam param, BulkConnection bulkConnection) throws Exception {
String api = param.getApi();
QueryWrapper<DataField> dbQw = new QueryWrapper<>();
dbQw.eq("api", api);
@ -216,6 +309,20 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
log.info("执行api:{}, 执行page:{}, 执行size:{}", api, i+1, size);
List<JSONObject> insertList = new ArrayList<>();
//查询当前对象多态字段映射
QueryWrapper<LinkConfig> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("api",api).eq("is_create",1).eq("is_link",true);
List<LinkConfig> configs = linkConfigService.list(queryWrapper);
Map<String, String> fieldMap = new HashMap<>();
if (!configs.isEmpty()) {
fieldMap = configs.stream()
.collect(Collectors.toMap(
LinkConfig::getField, // Key提取器
LinkConfig::getLinkField, // Value提取器
(oldVal, newVal) -> newVal // 解决重复Key冲突保留新值
));
}
//判断引用对象是否存在new_id
DataObject update = new DataObject();
update.setName(api);
@ -234,12 +341,36 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
|| "Id".equals(dataField.getField())){
continue;
}
if (dataField.getIsCreateable() && !dataField.getIsNillable() && !dataField.getIsDefaultedOnCreate()) {
if ("CreatedDate".equals(dataField.getField()) && dataField.getIsCreateable()){
// 转换为UTC时间并格式化
LocalDateTime localDateTime = LocalDateTime.parse(String.valueOf(data.get(j - 1).get("CreatedDate")), inputFormatter);
ZonedDateTime utcDateTime = localDateTime.atZone(ZoneId.of("UTC")).minusHours(8) ;
String convertedTime = utcDateTime.format(outputFormatter);
account.put("CreatedDate", convertedTime);
continue;
}
if ("CreatedById".equals(dataField.getField()) && dataField.getIsCreateable()){
Map<String, Object> CreatedByIdMap = customMapper.getById("new_id", "User", data.get(j-1).get("CreatedById").toString());
if(CreatedByIdMap.get("new_id") != null && StringUtils.isNotEmpty(CreatedByIdMap.get("new_id").toString())){
account.put("CreatedById", CreatedByIdMap.get("new_id"));
}
continue;
}
if (dataField.getIsCreateable() !=null && dataField.getIsCreateable() && !dataField.getIsNillable() && !dataField.getIsDefaultedOnCreate()) {
if ("reference".equals(dataField.getSfType())){
//引用类型
String reference = dataField.getReferenceTo();
if (reference == null){
reference = data.get(j-1).get("Parent_Type").toString();
//引用类型字段
String linkfield = fieldMap.get(dataField.getField());
if (StringUtils.isNotBlank(linkfield)){
reference = data.get(j-1).get(linkfield)!=null?data.get(j-1).get(linkfield).toString():null;
}
if (reference == null){
continue;
}
log.info("----------" + dataField.getField() + "的引用类型 ------------" + reference);
List<Map<String, Object>> referenceMap = customMapper.list("new_id", reference, "new_id is not null limit 1");
if (referenceMap.isEmpty()){
QueryWrapper<DataObject> maxIndex = new QueryWrapper<>();
@ -264,20 +395,7 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
account.put(dataField.getField(), pickList.get(0).get("value"));
continue;
}
account.put(dataField.getField(), DataUtil.fieldTypeToSf(dataField));
}
// 转换为UTC时间并格式化
LocalDateTime localDateTime = LocalDateTime.parse(String.valueOf(data.get(j - 1).get("CreatedDate")), inputFormatter);
ZonedDateTime utcDateTime = localDateTime.atZone(ZoneId.of("UTC")).minusHours(8) ;
String convertedTime = utcDateTime.format(outputFormatter);
account.put("CreatedDate", convertedTime);
Map<String, Object> CreatedByIdMap = customMapper.getById("new_id", "User", data.get(j-1).get("CreatedById").toString());
if(CreatedByIdMap.get("new_id") != null && StringUtils.isNotEmpty(CreatedByIdMap.get("new_id").toString())){
account.put("CreatedById", CreatedByIdMap.get("new_id"));
account.put(dataField.getField(), DataUtil.fieldTypeBulkToSf(dataField));
}
}
@ -304,6 +422,8 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
BulkUtil.closeJob(bulkConnection, salesforceInsertJob.getId());
new File(fullPath).delete();
} catch (Exception e) {
log.error("manualCreatedNewId error api:{}", api, e);
throw e;
@ -360,7 +480,7 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
index ++;
log.info("Created Success row with id " + id);
} else if (!insertStatus) {
log.info("Created Fail with error: " + error);
log.error("Created Fail with error: " + error);
}
}
}
@ -380,6 +500,8 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
if (result != null) {
return result;
}
}else {
autoUpdateSfDataBatch(param, futures);
}
return ReturnT.SUCCESS;
} catch (InterruptedException e) {
@ -392,7 +514,7 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
}
/**
* 组装执行参数
* 单表组装Update执行参数
*/
public ReturnT<String> updateSfDataBatch(SalesforceParam param, List<Future<?>> futures) throws Exception {
List<String> apis;
@ -426,7 +548,7 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
BulkConnection bulkConnection = salesforceTargetConnect.createBulkConnect();
for (String api : apis) {
DataObject update = new DataObject();
DataObject update = dataObjectService.getById(api);
try {
TimeUnit.MILLISECONDS.sleep(1);
List<SalesforceParam> salesforceParams = null;
@ -453,7 +575,7 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
for (SalesforceParam salesforceParam : salesforceParams) {
Future<?> future = salesforceExecutor.execute(() -> {
try {
manualUpdateSfDataBatch(salesforceParam, bulkConnection);
manualUpdateSfDataBatch(salesforceParam, bulkConnection,update);
} catch (Throwable throwable) {
log.error("salesforceExecutor error", throwable);
throw new RuntimeException(throwable);
@ -477,10 +599,92 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
return null;
}
/**
* 多表组装Update执行参数
*/
public void autoUpdateSfDataBatch(SalesforceParam param, List<Future<?>> futures) throws Exception {
    // 全量的时候 检测是否有自动任务锁住的表
    boolean isFull = CollectionUtils.isEmpty(param.getIds());
    if (isFull) {
        QueryWrapper<DataObject> qw = new QueryWrapper<>();
        qw.eq("data_lock", 1);
        List<DataObject> list = dataObjectService.list(qw);
        if (CollectionUtils.isNotEmpty(list)) {
            // Join with a comma so multiple api names stay readable in the alert mail.
            String apiNames = list.stream().map(DataObject::getName).collect(Collectors.joining(","));
            String message = "api:" + apiNames + " is locked";
            String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s", apiNames, com.alibaba.fastjson2.JSON.toJSONString(param, DataDumpParam.getFilter()), message);
            EmailUtil.send("DataDump ERROR", format);
            return;
        }
    }
    BulkConnection bulkConnection = salesforceTargetConnect.createBulkConnect();
    // Pick up to 10 objects pending update (data_work=1, need_update=1), lowest data_index first.
    QueryWrapper<DataObject> qw = new QueryWrapper<>();
    qw.eq("data_work", 1)
            .eq("need_update", 1)
            .orderByAsc("data_index")
            .last(" limit 10");
    while (true) {
        List<DataObject> dataObjects = dataObjectService.list(qw);
        //判断dataObjects是否为空
        if (CollectionUtils.isEmpty(dataObjects)) {
            return;
        }
        for (DataObject object : dataObjects) {
            try {
                TimeUnit.MILLISECONDS.sleep(1);
                QueryWrapper<DataBatch> dbQw = new QueryWrapper<>();
                dbQw.eq("name", object.getName());
                List<DataBatch> list = dataBatchService.list(dbQw);
                AtomicInteger batch = new AtomicInteger(1);
                // Default to an empty list: the previous code left this null when the
                // object had no batches and then NPE'd in the for-loop below.
                List<SalesforceParam> salesforceParams = new ArrayList<>();
                if (CollectionUtils.isNotEmpty(list)) {
                    salesforceParams = list.stream().map(t -> {
                        SalesforceParam salesforceParam = param.clone();
                        salesforceParam.setApi(t.getName());
                        salesforceParam.setBeginCreateDate(t.getSyncStartDate());
                        salesforceParam.setEndCreateDate(t.getSyncEndDate());
                        salesforceParam.setBatch(batch.getAndIncrement());
                        return salesforceParam;
                    }).collect(Collectors.toList());
                }
                // 手动任务优先执行
                for (SalesforceParam salesforceParam : salesforceParams) {
                    Future<?> future = salesforceExecutor.execute(() -> {
                        try {
                            manualUpdateSfDataBatch(salesforceParam, bulkConnection, object);
                        } catch (Throwable throwable) {
                            log.error("salesforceExecutor error", throwable);
                            throw new RuntimeException(throwable);
                        }
                    }, salesforceParam.getBatch(), 1);
                    futures.add(future);
                }
                // 等待当前所有线程执行完成
                salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
            } catch (Exception e) {
                throw e;
            } finally {
                // Clear the processed flags so the object is not picked up again.
                if (isFull) {
                    object.setNeedUpdate(false);
                    object.setDataLock(0);
                    dataObjectService.updateById(object);
                }
            }
        }
    }
}
/**
* 执行数据Update
*/
private void manualUpdateSfDataBatch(SalesforceParam param, BulkConnection bulkConnection) throws Exception {
private void manualUpdateSfDataBatch(SalesforceParam param, BulkConnection bulkConnection,DataObject dataObject) throws Exception {
String api = param.getApi();
QueryWrapper<DataField> dbQw = new QueryWrapper<>();
@ -500,9 +704,20 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
return;
}
//判断引用对象是否存在new_id
DataObject update = new DataObject();
update.setName(api);
//查询当前对象多态字段映射
QueryWrapper<LinkConfig> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("api",api).eq("is_create",1).eq("is_link",true);
List<LinkConfig> configs = linkConfigService.list(queryWrapper);
Map<String, String> fieldMap = new HashMap<>();
if (!configs.isEmpty()) {
fieldMap = configs.stream()
.collect(Collectors.toMap(
LinkConfig::getField, // Key提取器
LinkConfig::getLinkField, // Value提取器
(oldVal, newVal) -> newVal // 解决重复Key冲突保留新值
));
}
// 总更新数
int sfNum = 0;
@ -520,24 +735,36 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
for (DataField dataField : list) {
String field = dataField.getField();
String reference_to = dataField.getReferenceTo();
String value = String.valueOf(map.get(field));
//根据旧sfid查找引用对象新sfid
if (field.equals("Id")) {
account.put("Id",String.valueOf(map.get("new_id")));
} else if (!DataUtil.isUpdate(field) || (dataField.getIsCreateable() != null && !dataField.getIsCreateable())) {
continue;
} else if ( "WhoId".equals(field) ||"WhatId".equals(field)){
continue;
} else if (StringUtils.isNotBlank(reference_to) && !"data_picklist".equals(reference_to)) {
if ( StringUtils.isNotEmpty(String.valueOf(map.get(field))) && !"OwnerId".equals(field)
&& !"Owner_Type".equals(field)) {
if (!"null".equals(value) && StringUtils.isNotEmpty(value)) {
//引用类型字段
String linkfield = fieldMap.get(dataField.getField());
if (StringUtils.isNotBlank(linkfield)){
reference_to = map.get(linkfield)!=null?map.get(linkfield).toString():null;
}
if (reference_to == null){
continue;
}
log.info("----------" + dataField.getField() + "的引用类型 ------------" + reference_to);
//判断reference_to内是否包含User字符串
if (reference_to.contains("User")) {
if (reference_to.contains(",User") || reference_to.contains("User,")) {
reference_to = "User";
}
Map<String, Object> m = customMapper.getById("new_id", reference_to, String.valueOf(map.get(field)));
Map<String, Object> m = customMapper.getById("new_id", reference_to, value);
if (m != null && !m.isEmpty()) {
account.put(field, m.get("new_id"));
}else {
String message = "对象类型:" + api + "的数据:"+ map.get("Id") +"的引用对象:" + dataField.getReferenceTo() + "的数据:"+ map.get(field) +"不存在!";
String message = "对象类型:" + api + "的数据:"+ map.get("Id") +"的引用对象:" + reference_to + "的数据:"+ map.get(field) +"不存在!";
EmailUtil.send("DataDump ERROR", message);
log.info(message);
return;
@ -551,9 +778,11 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
}
}
}
account.put("old_owner_id__c", map.get("OwnerId"));
account.put("old_sfdc_id__c", map.get("Id"));
if (dataObject.getIsEditable()){
account.put("old_owner_id__c", map.get("OwnerId"));
account.put("old_sfdc_id__c", map.get("Id"));
}
updateList.add(account);
}
@ -572,6 +801,7 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
BulkUtil.closeJob(bulkConnection, salesforceInsertJob.getId());
new File(fullPath).delete();
} catch (Throwable e) {
log.info(e.getMessage());
throw e;
@ -621,13 +851,357 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
index ++;
log.info("Update Success row with id " + id);
} else {
log.info("Update Fail with error: " + error);
log.error("Update Fail with error: " + error);
}
}
}
return index;
}
/**
* 一次性写入Insert入口
*/
@Override
public ReturnT<String> insertSingleBatch(SalesforceParam param) throws Exception {
    List<Future<?>> futures = Lists.newArrayList();
    try {
        // No api supplied -> automatic multi-table mode.
        if (StringUtils.isBlank(param.getApi())) {
            autoInsertSingle(param, futures);
            return ReturnT.SUCCESS;
        }
        // Manual mode: a non-null result (e.g. a lock error) is returned as-is.
        ReturnT<String> manualResult = manualInsertSingle(param, futures);
        return manualResult != null ? manualResult : ReturnT.SUCCESS;
    } catch (Exception exception) {
        // Cancel any queued work before propagating the failure.
        salesforceExecutor.remove(futures.toArray(new Future<?>[]{}));
        log.error("insertSingle error", exception);
        throw exception;
    }
}
/**
* 单表 组装一次性写入参数
*/
public ReturnT<String> manualInsertSingle(SalesforceParam param, List<Future<?>> futures) throws Exception {
    // Parse the comma-separated api list supplied by the caller.
    List<String> apis = DataUtil.toIdList(param.getApi());
    String join = StringUtils.join(apis, ",");
    log.info("insertSingle apis: {}", join);
    XxlJobLogger.log("insertSingle apis: {}", join);
    TimeUnit.MILLISECONDS.sleep(1);
    // 全量的时候 检测是否有自动任务锁住的表
    boolean isFull = CollectionUtils.isEmpty(param.getIds());
    if (isFull) {
        QueryWrapper<DataObject> qw = new QueryWrapper<>();
        qw.eq("data_lock", 1).in("name", apis);
        List<DataObject> list = dataObjectService.list(qw);
        if (CollectionUtils.isNotEmpty(list)) {
            // Join with a comma so multiple api names stay readable in the error message.
            String apiNames = list.stream().map(DataObject::getName).collect(Collectors.joining(","));
            return new ReturnT<>(500, "api:" + apiNames + " is locked");
        }
    }
    BulkConnection bulkConnection = salesforceTargetConnect.createBulkConnect();
    for (String api : apis) {
        DataObject update = new DataObject();
        TimeUnit.MILLISECONDS.sleep(1);
        try {
            // Lock the object row while it is being processed.
            update.setName(api);
            update.setDataLock(1);
            dataObjectService.updateById(update);
            QueryWrapper<DataBatch> dbQw = new QueryWrapper<>();
            dbQw.eq("name", api);
            List<DataBatch> list = dataBatchService.list(dbQw);
            AtomicInteger batch = new AtomicInteger(1);
            // Default to an empty list: the previous code left this null when the
            // object had no batches and then NPE'd in the for-loop below.
            List<SalesforceParam> salesforceParams = new ArrayList<>();
            if (CollectionUtils.isNotEmpty(list)) {
                salesforceParams = list.stream().map(t -> {
                    SalesforceParam salesforceParam = param.clone();
                    salesforceParam.setApi(t.getName());
                    salesforceParam.setBeginCreateDate(t.getSyncStartDate());
                    salesforceParam.setEndCreateDate(t.getSyncEndDate());
                    salesforceParam.setBatch(batch.getAndIncrement());
                    return salesforceParam;
                }).collect(Collectors.toList());
            }
            // 手动任务优先执行
            for (SalesforceParam salesforceParam : salesforceParams) {
                Future<?> future = salesforceExecutor.execute(() -> {
                    try {
                        insertSingleData(salesforceParam, bulkConnection);
                    } catch (Throwable throwable) {
                        log.error("salesforceExecutor error", throwable);
                        throw new RuntimeException(throwable);
                    }
                }, salesforceParam.getBatch(), 1);
                futures.add(future);
            }
            // 等待当前所有线程执行完成
            salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
            update.setDataWork(0);
        } catch (InterruptedException e) {
            throw e;
        } catch (Throwable e) {
            log.error("insertSingle error", e);
            throw new RuntimeException(e);
        } finally {
            // Release the lock (full run only; persists dataWork=0 as well when set above).
            if (isFull) {
                update.setName(api);
                update.setDataLock(0);
                dataObjectService.updateById(update);
            }
        }
    }
    return null;
}
/**
* 多表组装一次性写入参数
*/
public void autoInsertSingle(SalesforceParam param, List<Future<?>> futures) throws Exception {
    // Pick up to 10 unlocked pending objects per round, lowest data_index first.
    QueryWrapper<DataObject> qw = new QueryWrapper<>();
    qw.eq("data_work", 1)
            .eq("data_lock", 0)
            .orderByAsc("data_index")
            .last(" limit 10");
    BulkConnection bulkConnection = salesforceTargetConnect.createBulkConnect();
    while (true) {
        List<DataObject> dataObjects = dataObjectService.list(qw);
        if (CollectionUtils.isEmpty(dataObjects)) {
            break;
        }
        for (DataObject dataObject : dataObjects) {
            DataObject update = new DataObject();
            log.info("insertSingle api: {}", dataObject.getName());
            TimeUnit.MILLISECONDS.sleep(1);
            try {
                String api = dataObject.getName();
                // Lock the object row while it is being processed.
                update.setName(dataObject.getName());
                update.setDataLock(1);
                dataObjectService.updateById(update);
                QueryWrapper<DataBatch> dbQw = new QueryWrapper<>();
                dbQw.eq("name", api);
                List<DataBatch> list = dataBatchService.list(dbQw);
                AtomicInteger batch = new AtomicInteger(1);
                // Default to an empty list: the previous code left this null when the
                // object had no batches and then NPE'd in the for-loop below.
                List<SalesforceParam> salesforceParams = new ArrayList<>();
                if (CollectionUtils.isNotEmpty(list)) {
                    salesforceParams = list.stream().map(t -> {
                        SalesforceParam salesforceParam = param.clone();
                        salesforceParam.setApi(t.getName());
                        salesforceParam.setBeginCreateDate(t.getSyncStartDate());
                        salesforceParam.setEndCreateDate(t.getSyncEndDate());
                        salesforceParam.setBatch(batch.getAndIncrement());
                        return salesforceParam;
                    }).collect(Collectors.toList());
                }
                // 手动任务优先执行
                for (SalesforceParam salesforceParam : salesforceParams) {
                    Future<?> future = salesforceExecutor.execute(() -> {
                        try {
                            insertSingleData(salesforceParam, bulkConnection);
                        } catch (Throwable throwable) {
                            log.error("salesforceExecutor error", throwable);
                            throw new RuntimeException(throwable);
                        }
                    }, salesforceParam.getBatch(), 0);
                    futures.add(future);
                }
                // 等待当前所有线程执行完成
                salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
                // Mark as done; persisted by the updateById in the finally block below.
                update.setDataWork(0);
            } catch (InterruptedException e) {
                throw e;
            } catch (Throwable e) {
                log.error("autoInsertSingle error", e);
                throw new RuntimeException(e);
            } finally {
                // Always release the lock for this object.
                update.setDataLock(0);
                dataObjectService.updateById(update);
            }
        }
        // 等待当前所有线程执行完成
        salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
        futures.clear();
    }
}
/**
* 执行一次性Insert数据
*/
private void insertSingleData(SalesforceParam param, BulkConnection bulkConnection) throws Exception {
    String api = param.getApi();
    QueryWrapper<DataField> dbQw = new QueryWrapper<>();
    dbQw.eq("api", api);
    List<DataField> list = dataFieldService.list(dbQw);
    TimeUnit.MILLISECONDS.sleep(1);
    Date beginDate = param.getBeginCreateDate();
    Date endDate = param.getEndCreateDate();
    String beginDateStr = DateUtil.format(beginDate, "yyyy-MM-dd HH:mm:ss");
    String endDateStr = DateUtil.format(endDate, "yyyy-MM-dd HH:mm:ss");
    //表内数据总量 (rows not yet migrated, i.e. new_id is null, inside the batch window)
    Integer count = customMapper.countBySQL(api, "where new_id is null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "'");
    // Informational message, not an error (matches DataImportNewServiceImpl).
    log.info("总Insert数据 count:{}-开始时间:{}-结束时间:{}-api:{}", count, beginDateStr, endDateStr, api);
    if (count == 0) {
        return;
    }
    //查询当前对象多态字段映射 — loop-invariant, hoisted out of the page loop.
    QueryWrapper<LinkConfig> queryWrapper = new QueryWrapper<>();
    queryWrapper.eq("api", api).eq("is_create", 1).eq("is_link", true);
    List<LinkConfig> configs = linkConfigService.list(queryWrapper);
    // field name -> link-type column name (resolves polymorphic reference targets per row).
    Map<String, String> fieldMap = new HashMap<>();
    if (!configs.isEmpty()) {
        fieldMap = configs.stream()
                .collect(Collectors.toMap(
                        LinkConfig::getField,
                        LinkConfig::getLinkField,
                        (oldVal, newVal) -> newVal));
    }
    // Input/output time formats; DateTimeFormatter is thread-safe and reusable.
    DateTimeFormatter inputFormatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME;
    DateTimeFormatter outputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSxx");
    //批量插入10000一次
    int page = count % 10000 == 0 ? count / 10000 : (count / 10000) + 1;
    //总插入数
    int sfNum = 0;
    for (int i = 0; i < page; i++) {
        // NOTE(review): each page re-queries "new_id is null" without an offset — this relies on
        // checkInsertResults writing back new_id so processed rows drop out of the filter; confirm.
        List<JSONObject> data = customMapper.listJsonObject("*", api, "new_id is null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "' limit 10000");
        int size = data.size();
        log.info("执行api:{}, 执行page:{}, 执行size:{}", api, i + 1, size);
        List<JSONObject> insertList = new ArrayList<>();
        //更新对象的new_id: local ids, by position, for the result write-back.
        String[] ids = new String[size];
        for (int j = 1; j <= size; j++) {
            JSONObject row = data.get(j - 1);
            JSONObject account = new JSONObject();
            for (DataField dataField : list) {
                if ("Owner_Type".equals(dataField.getField()) || "Id".equals(dataField.getField())) {
                    continue;
                }
                if ("CreatedDate".equals(dataField.getField()) && dataField.getIsCreateable()) {
                    // 转换为UTC时间并格式化 (local time minus 8 hours -> UTC)
                    LocalDateTime localDateTime = LocalDateTime.parse(String.valueOf(row.get("CreatedDate")), inputFormatter);
                    ZonedDateTime utcDateTime = localDateTime.atZone(ZoneId.of("UTC")).minusHours(8);
                    account.put("CreatedDate", utcDateTime.format(outputFormatter));
                    continue;
                }
                if ("CreatedById".equals(dataField.getField()) && dataField.getIsCreateable()) {
                    // Guard against a null CreatedById (previously NPE'd on .toString()).
                    if (row.get("CreatedById") != null) {
                        Map<String, Object> createdByIdMap = customMapper.getById("new_id", "User", row.get("CreatedById").toString());
                        if (createdByIdMap.get("new_id") != null && StringUtils.isNotEmpty(createdByIdMap.get("new_id").toString())) {
                            account.put("CreatedById", createdByIdMap.get("new_id"));
                        }
                    }
                    continue;
                }
                if (dataField.getIsCreateable() != null && dataField.getIsCreateable()) {
                    if ("reference".equals(dataField.getSfType())) {
                        //引用类型
                        String referenceTo = dataField.getReferenceTo();
                        //引用类型字段: polymorphic references read the real target from the link column.
                        String linkField = fieldMap.get(dataField.getField());
                        if (StringUtils.isNotBlank(linkField)) {
                            referenceTo = row.get(linkField) != null ? row.get(linkField).toString() : null;
                        }
                        if (referenceTo == null) {
                            continue;
                        }
                        // Multi-target references that include User are resolved against User.
                        if (referenceTo.contains(",User") || referenceTo.contains("User,")) {
                            referenceTo = "User";
                        }
                        Object refValue = row.get(dataField.getField());
                        // Skip empty reference values (previously NPE'd on .toString()).
                        if (refValue == null) {
                            continue;
                        }
                        Map<String, Object> m = customMapper.getById("new_id", referenceTo, refValue.toString());
                        if (m != null && !m.isEmpty()) {
                            account.put(dataField.getField(), m.get("new_id"));
                        } else {
                            // Referenced row has not been migrated yet: alert and abort this api.
                            String message = "对象类型:" + api + "的数据:" + row.get("Id") + "的引用对象:" + referenceTo + "的数据:" + refValue + "不存在!";
                            EmailUtil.send("DataDump ERROR", message);
                            log.info(message);
                            return;
                        }
                    } else {
                        if (row.get(dataField.getField()) != null && StringUtils.isNotBlank(dataField.getSfType())) {
                            account.put(dataField.getField(), DataUtil.localDataToSfData(dataField.getSfType(), row.get(dataField.getField()).toString()));
                        } else {
                            account.put(dataField.getField(), row.get(dataField.getField()));
                        }
                    }
                }
            }
            ids[j - 1] = row.get("Id").toString();
            insertList.add(account);
            // Stop once the overall row count is reached on the last page.
            if (i * 10000 + j == count) {
                break;
            }
        }
        try {
            //写入csv文件 and submit one Bulk API insert job per page.
            String fullPath = CsvConverterUtil.writeToCsv(insertList, UUID.randomUUID().toString());
            JobInfo salesforceInsertJob = BulkUtil.createJob(bulkConnection, api, OperationEnum.insert);
            List<BatchInfo> batchInfos = BulkUtil.createBatchesFromCSVFile(bulkConnection, salesforceInsertJob, fullPath);
            BulkUtil.awaitCompletion(bulkConnection, salesforceInsertJob, batchInfos);
            sfNum = sfNum + checkInsertResults(bulkConnection, salesforceInsertJob, batchInfos, api, ids);
            BulkUtil.closeJob(bulkConnection, salesforceInsertJob.getId());
            new File(fullPath).delete();
        } catch (Exception e) {
            log.error("insertSingleData error api:{}", api, e);
            throw e;
        }
    }
    // Write the inserted-row totals back to the batch bookkeeping tables.
    UpdateWrapper<DataBatchHistory> updateQw = new UpdateWrapper<>();
    updateQw.eq("name", api)
            .eq("sync_start_date", beginDate)
            .eq("sync_end_date", DateUtils.addSeconds(endDate, -1))
            .set("target_sf_num", sfNum);
    dataBatchHistoryService.update(updateQw);
    UpdateWrapper<DataBatch> updateQw2 = new UpdateWrapper<>();
    updateQw2.eq("name", api)
            .eq("sync_start_date", beginDate)
            .eq("sync_end_date", endDate)
            .set("sf_add_num", sfNum);
    dataBatchService.update(updateQw2);
}
}

View File

@ -8,10 +8,8 @@ import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.celnet.datadump.config.SalesforceConnect;
import com.celnet.datadump.config.SalesforceExecutor;
import com.celnet.datadump.config.SalesforceTargetConnect;
import com.celnet.datadump.entity.DataBatch;
import com.celnet.datadump.entity.DataBatchHistory;
import com.celnet.datadump.entity.DataField;
import com.celnet.datadump.entity.DataObject;
import com.celnet.datadump.entity.*;
import com.celnet.datadump.enums.FileType;
import com.celnet.datadump.global.Const;
import com.celnet.datadump.global.SystemConfigCode;
import com.celnet.datadump.mapper.CustomMapper;
@ -20,6 +18,8 @@ import com.celnet.datadump.param.SalesforceParam;
import com.celnet.datadump.service.*;
import com.celnet.datadump.util.DataUtil;
import com.celnet.datadump.util.EmailUtil;
import com.celnet.datadump.util.HttpUtil;
import com.celnet.datadump.util.OssUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.sforce.soap.partner.*;
@ -28,6 +28,7 @@ import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.log.XxlJobLogger;
import com.xxl.job.core.util.DateUtil;
import lombok.extern.slf4j.Slf4j;
import okhttp3.Response;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.time.DateUtils;
import org.apache.commons.lang3.ObjectUtils;
@ -35,6 +36,12 @@ import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@ -72,6 +79,8 @@ public class DataImportNewServiceImpl implements DataImportNewService {
@Autowired
private CommonService commonService;
@Autowired
private LinkConfigService linkConfigService;
/**
* Get返写个人客户联系人入口
@ -191,7 +200,7 @@ public class DataImportNewServiceImpl implements DataImportNewService {
//表内数据总量
Integer count = customMapper.countBySQL(api, "where new_id is null and IsPersonAccount = 1 and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "'");
log.error("总Insert数据 count:{}-开始时间:{}-结束时间:{}-api:{}", count, beginDateStr, endDateStr, api);
log.info("总Insert数据 count:{}-开始时间:{}-结束时间:{}-api:{}", count, beginDateStr, endDateStr, api);
if (count == 0) {
return;
}
@ -368,7 +377,7 @@ public class DataImportNewServiceImpl implements DataImportNewService {
PartnerConnection partnerConnection = salesforceTargetConnect.createConnect();
for (String api : apis) {
DataObject update = new DataObject();
DataObject update = dataObjectService.getById(api);
try {
List<SalesforceParam> salesforceParams = null;
QueryWrapper<DataBatch> dbQw = new QueryWrapper<>();
@ -394,7 +403,7 @@ public class DataImportNewServiceImpl implements DataImportNewService {
for (SalesforceParam salesforceParam : salesforceParams) {
Future<?> future = salesforceExecutor.execute(() -> {
try {
UpdateSfDataNew(salesforceParam, partnerConnection);
UpdateSfDataNew(salesforceParam, partnerConnection,update);
} catch (Throwable throwable) {
log.error("salesforceExecutor error", throwable);
throw new RuntimeException(throwable);
@ -415,7 +424,7 @@ public class DataImportNewServiceImpl implements DataImportNewService {
}
}
}
return null;
return ReturnT.SUCCESS;
}
/**
@ -438,7 +447,7 @@ public class DataImportNewServiceImpl implements DataImportNewService {
PartnerConnection partnerConnection = salesforceTargetConnect.createConnect();
for (String api : apis) {
DataObject update = new DataObject();
DataObject update = dataObjectService.getById(api);
try {
QueryWrapper<DataObject> qw = new QueryWrapper<>();
qw.eq("name", api);
@ -447,17 +456,19 @@ public class DataImportNewServiceImpl implements DataImportNewService {
List<SalesforceParam> salesforceParams = null;
QueryWrapper<DataBatch> dbQw = new QueryWrapper<>();
dbQw.eq("name", api);
dbQw.gt("sync_end_date",dataObject.getLastUpdateDate());
List<DataBatch> dataBatches = dataBatchService.list(dbQw);
List<DataBatch> list = dataBatchService.list(dbQw);
AtomicInteger batch = new AtomicInteger(1);
if (CollectionUtils.isNotEmpty(dataBatches)) {
salesforceParams = dataBatches.stream().map(t -> {
SalesforceParam salesforceParam = param.clone();
salesforceParam.setApi(t.getName());
salesforceParam.setBeginCreateDate(t.getSyncStartDate());
salesforceParam.setEndCreateDate(t.getSyncEndDate());
salesforceParam.setBatch(batch.getAndIncrement());
return salesforceParam;
if (CollectionUtils.isNotEmpty(list)) {
salesforceParams = list.stream().map(t -> {
if(t.getSyncEndDate().toInstant().isAfter(t.getLastModifiedDate().toInstant())){
SalesforceParam salesforceParam = param.clone();
salesforceParam.setApi(t.getName());
salesforceParam.setBeginCreateDate(t.getSyncStartDate());
salesforceParam.setEndCreateDate(t.getSyncEndDate());
salesforceParam.setBatch(batch.getAndIncrement());
return salesforceParam;
}
return null;
}).collect(Collectors.toList());
}
@ -465,7 +476,7 @@ public class DataImportNewServiceImpl implements DataImportNewService {
for (SalesforceParam salesforceParam : salesforceParams) {
Future<?> future = salesforceExecutor.execute(() -> {
try {
UpdateSfDataNew(salesforceParam, partnerConnection);
UpdateSfDataNew(salesforceParam, partnerConnection,update);
} catch (Throwable throwable) {
log.error("salesforceExecutor error", throwable);
throw new RuntimeException(throwable);
@ -487,7 +498,7 @@ public class DataImportNewServiceImpl implements DataImportNewService {
}
}
}
return null;
return ReturnT.SUCCESS;
}
/**
@ -567,7 +578,7 @@ public class DataImportNewServiceImpl implements DataImportNewService {
for (SalesforceParam salesforceParam : salesforceParams) {
Future<?> future = salesforceExecutor.execute(() -> {
try {
UpdateSfDataNew(salesforceParam, partnerConnection);
UpdateSfDataNew(salesforceParam, partnerConnection,dataObject);
} catch (Throwable throwable) {
log.error("salesforceExecutor error", throwable);
throw new RuntimeException(throwable);
@ -636,17 +647,19 @@ public class DataImportNewServiceImpl implements DataImportNewService {
List<SalesforceParam> salesforceParams = null;
QueryWrapper<DataBatch> dbQw = new QueryWrapper<>();
dbQw.eq("name", api);
dbQw.gt("sync_end_date",dataObject.getLastUpdateDate());
List<DataBatch> list = dataBatchService.list(dbQw);
AtomicInteger batch = new AtomicInteger(1);
if (CollectionUtils.isNotEmpty(list)) {
salesforceParams = list.stream().map(t -> {
SalesforceParam salesforceParam = param.clone();
salesforceParam.setApi(t.getName());
salesforceParam.setBeginCreateDate(t.getSyncStartDate());
salesforceParam.setEndCreateDate(t.getSyncEndDate());
salesforceParam.setBatch(batch.getAndIncrement());
return salesforceParam;
if(t.getSyncEndDate().toInstant().isAfter(t.getLastModifiedDate().toInstant())){
SalesforceParam salesforceParam = param.clone();
salesforceParam.setApi(t.getName());
salesforceParam.setBeginCreateDate(t.getSyncStartDate());
salesforceParam.setEndCreateDate(t.getSyncEndDate());
salesforceParam.setBatch(batch.getAndIncrement());
return salesforceParam;
}
return null;
}).collect(Collectors.toList());
}
@ -654,7 +667,7 @@ public class DataImportNewServiceImpl implements DataImportNewService {
for (SalesforceParam salesforceParam : salesforceParams) {
Future<?> future = salesforceExecutor.execute(() -> {
try {
UpdateSfDataNew(salesforceParam, partnerConnection);
UpdateSfDataNew(salesforceParam, partnerConnection,dataObject);
} catch (Throwable throwable) {
log.error("salesforceExecutor error", throwable);
throw new RuntimeException(throwable);
@ -681,7 +694,7 @@ public class DataImportNewServiceImpl implements DataImportNewService {
/**
* 执行Update更新数据
*/
private void UpdateSfDataNew(SalesforceParam param, PartnerConnection partnerConnection) throws Exception {
private void UpdateSfDataNew(SalesforceParam param, PartnerConnection partnerConnection,DataObject dataObject) throws Exception {
Map<String, Object> infoFlag = customMapper.list("code,value","system_config","code ='"+SystemConfigCode.INFO_FLAG+"'").get(0);
String api = param.getApi();
@ -717,15 +730,26 @@ public class DataImportNewServiceImpl implements DataImportNewService {
}
//表内数据总量
Integer count = customMapper.countBySQL(api, sql);
log.error("总Update数据 count:{}-开始时间:{}-结束时间:{}-api:{}", count, beginDateStr, endDateStr, api);
log.info("总Update数据 count:{}-开始时间:{}-结束时间:{}-api:{}", count, beginDateStr, endDateStr, api);
if(count == 0){
return;
}
//判断引用对象是否存在new_id
DataObject update = new DataObject();
update.setName(api);
//查询当前对象多态字段映射
QueryWrapper<LinkConfig> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("api",api).eq("is_create",1).eq("is_link",true);
List<LinkConfig> configs = linkConfigService.list(queryWrapper);
Map<String, String> fieldMap = new HashMap<>();
if (!configs.isEmpty()) {
fieldMap = configs.stream()
.collect(Collectors.toMap(
LinkConfig::getField, // Key提取器
LinkConfig::getLinkField, // Value提取器
(oldVal, newVal) -> newVal // 解决重复Key冲突保留新值
));
}
int targetCount = 0;
//批量插入200一次
@ -734,7 +758,8 @@ public class DataImportNewServiceImpl implements DataImportNewService {
List<Map<String, Object>> mapList = customMapper.list("*", api, sql2+ i * 200 + ",200");
SObject[] accounts = new SObject[mapList.size()];
int j = 0;
for (Map<String, Object> map : mapList) {
try {
for (Map<String, Object> map : mapList) {
SObject account = new SObject();
account.setType(api);
//给对象赋值
@ -750,17 +775,25 @@ public class DataImportNewServiceImpl implements DataImportNewService {
continue;
} else if (StringUtils.isNotBlank(reference_to) && !"data_picklist".equals(reference_to)) {
if (!"null".equals(value) && StringUtils.isNotEmpty(value) && (!"OwnerId".equals(field)
&& !"Owner_Type".equals(field))) {
if (!"null".equals(value) && StringUtils.isNotEmpty(value)) {
//引用类型字段
String linkfield = fieldMap.get(dataField.getField());
if (StringUtils.isNotBlank(linkfield)){
reference_to = map.get(linkfield)!=null?map.get(linkfield).toString():null;
}
if (reference_to == null){
continue;
}
log.info("----------" + dataField.getField() + "的引用类型 ------------" + reference_to);
//判断reference_to内是否包含User字符串
if (reference_to.contains("User")) {
if (reference_to.contains(",User") || reference_to.contains("User,")) {
reference_to = "User";
}
Map<String, Object> m = customMapper.getById("new_id", reference_to, value);
if (m != null && !m.isEmpty()) {
account.setField(field, m.get("new_id"));
}else {
String message = "对象类型:" + api + "的数据:"+ map.get("Id") +"的引用对象:" + dataField.getReferenceTo() + "的数据:"+ map.get(field) +"不存在!";
String message = "对象类型:" + api + "的数据:"+ map.get("Id") +"的引用对象:" + reference_to + "的数据:"+ map.get(field) +"不存在!";
EmailUtil.send("DataDump ERROR", message);
log.info(message);
return;
@ -774,12 +807,13 @@ public class DataImportNewServiceImpl implements DataImportNewService {
}
}
}
account.setField("old_owner_id__c", map.get("OwnerId"));
account.setField("old_sfdc_id__c", map.get("Id"));
if (dataObject.getIsEditable()){
account.setField("old_owner_id__c", map.get("OwnerId"));
account.setField("old_sfdc_id__c", map.get("Id"));
}
accounts[j++] = account;
}
try {
if (infoFlag != null && "1".equals(infoFlag.get("value"))){
printlnAccountsDetails(accounts,list);
}
@ -822,6 +856,9 @@ public class DataImportNewServiceImpl implements DataImportNewService {
for (int i = 0; i < accounts.length; i++) {
SObject account = accounts[i];
System.out.println("--- 对象数据[" + i + "] ---");
if (account == null){
continue;
}
// 获取对象所有字段名
for (DataField dataField : list) {
try {
@ -831,8 +868,8 @@ public class DataImportNewServiceImpl implements DataImportNewService {
System.out.println(dataField.getField() + ": [权限不足或字段不存在]");
}
}
System.out.println("old_owner_id__c: " + (account.getField("old_owner_id__c") != null ? account.getField("old_owner_id__c").toString() : "null"));
System.out.println("old_sfdc_id__c: " + (account.getField("old_sfdc_id__c") != null ? account.getField("old_sfdc_id__c").toString() : "null"));
System.out.println("old_owner_id__c: " + account.getField("old_owner_id__c"));
System.out.println("old_sfdc_id__c: " + account.getField("old_sfdc_id__c"));
}
}
@ -844,12 +881,15 @@ public class DataImportNewServiceImpl implements DataImportNewService {
HashMap<String, String> map = new HashMap<>();
for (int i = 0; i < accounts.length; i++) {
SObject account = accounts[i];
if (errorId.equals(account.getId()) || errorId.equals(account.getField("Id"))){
if (account == null){
continue;
}
if (errorId != null && (errorId.equals(account.getId()) || errorId.equals(account.getField("Id")))){
for (DataField dataField : list) {
try {
Object value = account.getField(dataField.getField());
map.put(dataField.getField(),String.valueOf(value));
System.out.println(dataField.getField() + ": " + (value != null ? value.toString() : "null"));
System.out.println(dataField.getField() + ": " + value);
} catch (Exception e) {
System.out.println(dataField.getField() + ": [权限不足或字段不存在]");
}
@ -868,12 +908,23 @@ public class DataImportNewServiceImpl implements DataImportNewService {
public ReturnT<String> dumpDocumentLinkJob(String paramStr) throws Exception {
String api = "ContentDocumentLink";
PartnerConnection partnerConnection = salesforceConnect.createConnect();
List<Map<String, Object>> list = customMapper.list("Id", "ContentDocument", "new_id is not null");
DescribeSObjectResult dsr = partnerConnection.describeSObject(api);
List<String> fields = customMapper.getFields(api).stream().map(String::toUpperCase).collect(Collectors.toList());
Field[] dsrFields = dsr.getFields();
try {
if (list != null && !list.isEmpty()) {
Integer count = customMapper.countBySQL("ContentDocument", "where new_id is not null");
log.info("ContentDocument Total size" + count);
int page = count%200 == 0 ? count/200 : (count/200) + 1;
int totalSize = 0;
for (int i = 0; i < page; i++) {
List<Map<String, Object>> list = customMapper.list("Id", "ContentDocument", "new_id is not null order by Id limit " + i * 200 + ",200");
if (list.isEmpty()){
break;
}
totalSize = totalSize +list.size();
log.info("dumpContentDocumentLink By ContentDocument Now size" + totalSize);
DescribeSObjectResult dsr = partnerConnection.describeSObject(api);
List<String> fields = customMapper.getFields(api).stream().map(String::toUpperCase).collect(Collectors.toList());
Field[] dsrFields = dsr.getFields();
try {
for (Map<String, Object> map : list) {
String contentDocumentId = (String) map.get("Id");
String sql = "SELECT Id, LinkedEntityId, LinkedEntity.Type, ContentDocumentId, Visibility, ShareType, SystemModstamp, IsDeleted FROM ContentDocumentLink where ContentDocumentId = '" + contentDocumentId + "'";
@ -883,17 +934,13 @@ public class DataImportNewServiceImpl implements DataImportNewService {
objects = DataUtil.toJsonArray(records, dsrFields);
commonService.saveOrUpdate(api, fields, records, objects, true);
}
} catch (Throwable e) {
log.error("dumpDocumentLinkJob error message:{}", e.getMessage());
return ReturnT.FAIL;
}
} catch (Exception e) {
log.error("getDocumentLink error api:{}, data:{}", api, com.alibaba.fastjson2.JSON.toJSONString(list), e);
return ReturnT.FAIL;
} catch (Throwable e) {
log.error("getDocumentLink error api:{}, data:{}", api, com.alibaba.fastjson2.JSON.toJSONString(list), e);
TimeUnit.MINUTES.sleep(1);
return ReturnT.FAIL;
}
return null;
log.info("dumpDocumentLink Success !!! ");
return ReturnT.SUCCESS;
}
/**
@ -911,7 +958,7 @@ public class DataImportNewServiceImpl implements DataImportNewService {
//批量插入200一次
int page = count % 200 == 0 ? count / 200 : (count / 200) + 1;
for (int i = 0; i < page; i++) {
List<Map<String, Object>> linkList = customMapper.list("Id,LinkedEntityId,ContentDocumentId,LinkedEntity_Type,ShareType,Visibility", api, "ShareType = 'V' and new_id = '0' order by Id asc limit 200");
List<Map<String, Object>> linkList = customMapper.list("Id,LinkedEntityId,ContentDocumentId,LinkedEntity_Type,ShareType,Visibility", api, "ShareType = 'V' and new_id = '0' order by Id limit " + i * 200 + ",200");
SObject[] accounts = new SObject[linkList.size()];
String[] ids = new String[linkList.size()];
int index = 0;
@ -946,8 +993,8 @@ public class DataImportNewServiceImpl implements DataImportNewService {
SaveResult[] saveResults = connection.create(accounts);
for (int j = 0; j < saveResults.length; j++) {
if (!saveResults[j].getSuccess()) {
String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s", api, com.alibaba.fastjson2.JSON.toJSONString(DataDumpParam.getFilter()), com.alibaba.fastjson.JSON.toJSONString(saveResults[j]));
EmailUtil.send("DataDump ContentDocumentLink ERROR", format);
String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s", api, com.alibaba.fastjson2.JSON.toJSONString(DataDumpParam.getFilter()), JSON.toJSONString(saveResults[j]));
log.error(format);
} else {
List<Map<String, Object>> dList = new ArrayList<>();
Map<String, Object> linkMap = new HashMap<>();
@ -958,17 +1005,286 @@ public class DataImportNewServiceImpl implements DataImportNewService {
}
}
} catch (Exception e) {
log.error("getDocumentLink error api:{}, data:{}", api, com.alibaba.fastjson2.JSON.toJSONString(accounts), e);
EmailUtil.send("-------测试-----------", com.alibaba.fastjson2.JSON.toJSONString(accounts));
log.error("uploadDocumentLinkJob error message:{}", e.getMessage());
throw new RuntimeException(e);
}
}
}
} catch (Exception e) {
log.error("getDocumentLink error api:{}, data:{}", api, com.alibaba.fastjson2.JSON.toJSONString(list), e);
log.error("uploadDocumentLink error api:{}, data:{}", api, com.alibaba.fastjson2.JSON.toJSONString(list), e);
return ReturnT.FAIL;
}
return null;
log.info("uploadDocumentLink Success !!! ");
return ReturnT.SUCCESS;
}
@Override
public ReturnT<String> dumpFileNew(SalesforceParam param) throws Exception {
    String downloadUrl = null;
    // Resolve the objects that carry a binary (blob) field; an explicit api
    // list on the param narrows the scope, otherwise every blob object is taken.
    QueryWrapper<DataObject> wrapper = new QueryWrapper<>();
    if (StringUtils.isNotBlank(param.getApi())) {
        wrapper.in("name", DataUtil.toIdList(param.getApi()));
    } else {
        wrapper.isNotNull("blob_field");
    }
    List<DataObject> objectList = dataObjectService.list(wrapper);
    if (objectList.isEmpty()) {
        log.info("没有对象存在文件二进制字段!不进行文件下载");
        // Nothing to download — return instead of falling through to the
        // download-url check (the original continued and could FAIL for no work).
        return ReturnT.SUCCESS;
    }
    // The download base url is stored in org_config under FILE_DOWNLOAD_URL.
    List<Map<String, Object>> poll = customMapper.list("code,value", "org_config", null);
    for (Map<String, Object> map1 : poll) {
        if ("FILE_DOWNLOAD_URL".equals(map1.get("code"))) {
            downloadUrl = (String) map1.get("value");
        }
    }
    if (StringUtils.isEmpty(downloadUrl)) {
        EmailUtil.send("DumpFile ERROR", "文件下载失败!下载地址未配置");
        return ReturnT.FAIL;
    }
    PartnerConnection connect = salesforceConnect.createConnect();
    List<Future<?>> futures = Lists.newArrayList();
    for (DataObject dataObject : objectList) {
        DataObject update = new DataObject();
        log.info("dump file api:{}, field:{}", dataObject.getName(), dataObject.getBlobField());
        try {
            String api = dataObject.getName();
            update.setName(dataObject.getName());
            // One SalesforceParam per recorded batch window for this object.
            // Initialised to an empty list so an object without DataBatch rows
            // is skipped instead of causing a NullPointerException below
            // (the original left salesforceParams null in that case).
            List<SalesforceParam> salesforceParams = Lists.newArrayList();
            QueryWrapper<DataBatch> dbQw = new QueryWrapper<>();
            dbQw.eq("name", api);
            List<DataBatch> list = dataBatchService.list(dbQw);
            AtomicInteger batch = new AtomicInteger(1);
            if (CollectionUtils.isNotEmpty(list)) {
                salesforceParams = list.stream().map(t -> {
                    SalesforceParam salesforceParam = param.clone();
                    salesforceParam.setApi(t.getName());
                    salesforceParam.setBeginCreateDate(t.getSyncStartDate());
                    salesforceParam.setEndCreateDate(t.getSyncEndDate());
                    salesforceParam.setBatch(batch.getAndIncrement());
                    return salesforceParam;
                }).collect(Collectors.toList());
            }
            // 手动任务优先执行 — submit one download task per batch window.
            for (SalesforceParam salesforceParam : salesforceParams) {
                String finalDownloadUrl = downloadUrl;
                Future<?> future = salesforceExecutor.execute(() -> {
                    try {
                        saveFile(salesforceParam, connect, finalDownloadUrl, dataObject.getName(), dataObject.getExtraField());
                    } catch (Throwable throwable) {
                        log.error("salesforceExecutor error", throwable);
                        throw new RuntimeException(throwable);
                    }
                }, salesforceParam.getBatch(), 0);
                futures.add(future);
            }
            // Wait for all tasks of this object before moving on.
            salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
            update.setDataWork(0);
        } catch (InterruptedException e) {
            throw e;
        } catch (Throwable e) {
            throw new RuntimeException(e);
        } finally {
            // Always release the object lock, even on failure.
            update.setDataLock(0);
            dataObjectService.updateById(update);
        }
    }
    // Wait for any remaining tasks, then clear the tracking list.
    salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
    futures.clear();
    return ReturnT.SUCCESS;
}
/**
 * Download the binary attachments of one object for one batch window
 * (param.beginCreateDate .. param.endCreateDate) and mark each row dumped.
 *
 * Rows are fetched 10 at a time where is_dump = false; each file is streamed
 * either to OSS or to the local server depending on Const.FILE_TYPE. A row
 * that fails more than Const.MAX_FAIL_COUNT times is marked is_dump = 2 and
 * reset to 0 in the finally block so a later run can retry it.
 *
 * @param param       batch window (begin/end create date)
 * @param Connection  source-org partner connection (supplies the session token)
 * @param downloadUrl base url configured under FILE_DOWNLOAD_URL
 * @param api         object (table) name
 * @param field       blob field name used in the REST download path
 */
private void saveFile(SalesforceParam param, PartnerConnection Connection ,String downloadUrl, String api, String field) {
    String extraSql = "";
    if (dataFieldService.hasDeleted(api)) {
        extraSql += "AND IsDeleted = false ";
    }
    if (Const.FILE_TYPE == FileType.SERVER) {
        // Ensure the per-object directory exists on the server before writing.
        File excel = new File(Const.SERVER_FILE_PATH + "/" + api);
        if (!excel.exists()) {
            boolean mkdir = excel.mkdir();
        }
    }
    Date beginDate = param.getBeginCreateDate();
    Date endDate = param.getEndCreateDate();
    String beginDateStr = DateUtil.format(beginDate, "yyyy-MM-dd HH:mm:ss");
    String endDateStr = DateUtil.format(endDate, "yyyy-MM-dd HH:mm:ss");
    String token = Connection.getSessionHeader().getSessionId();
    try {
        String name = getName(api);
        log.info("api:{},field:{},开始时间:{},结束时间:{}", api, field, beginDateStr, endDateStr);
        Map<String, String> headers = Maps.newHashMap();
        headers.put("Authorization", "Bearer " + token);
        headers.put("connection", "keep-alive");
        long num = 0;
        while (true) {
            // Fetch the next 10 rows whose file has not been dumped yet.
            // BUG FIX: extraSql was previously concatenated INSIDE the closing
            // quote of the endDateStr literal, corrupting both the date filter
            // and the IsDeleted filter; it now follows the closed literal.
            List<Map<String, Object>> list = customMapper.list("Id, " + name, api, " is_dump = false and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "' " + extraSql + "limit 10");
            if (CollectionUtils.isEmpty(list)) {
                break;
            }
            for (Map<String, Object> map : list) {
                String id = null;
                // Column updates to apply once the file is stored.
                List<Map<String, Object>> maps = Lists.newArrayList();
                boolean isDump = true;
                int failCount = 0;
                while (true) {
                    try {
                        id = (String) map.get("Id");
                        String fileName = (String) map.get(name);
                        log.info("------------文件名:" + id + "_" + fileName);
                        // Only download when the row actually has a file name.
                        if (StringUtils.isNotBlank(fileName)) {
                            String filePath = api + "/" + id + "_" + fileName;
                            // Build the REST download url for this record/field.
                            String url = downloadUrl + String.format(Const.SF_FILE_URL, api, id, field);
                            Response response = HttpUtil.doGet(url, null, headers);
                            if (response.body() != null) {
                                InputStream inputStream = response.body().byteStream();
                                switch (Const.FILE_TYPE) {
                                    case OSS:
                                        // Stream straight into OSS storage.
                                        OssUtil.upload(inputStream, filePath);
                                        break;
                                    case SERVER:
                                        dumpToServer(headers, id, filePath, url, response, inputStream);
                                        break;
                                    default:
                                        log.error("id: {}, no mapping dump type", id);
                                }
                                Map<String, Object> paramMap = Maps.newHashMap();
                                // Document rows store the path in localUrl, others in url.
                                if ("Document".equals(api)) {
                                    paramMap.put("key", "localUrl");
                                } else {
                                    paramMap.put("key", "url");
                                }
                                paramMap.put("value", filePath);
                                maps.add(paramMap);
                            }
                        }
                        Map<String, Object> paramMap = Maps.newHashMap();
                        paramMap.put("key", "is_dump");
                        paramMap.put("value", isDump);
                        maps.add(paramMap);
                        customMapper.updateById(api, maps, id);
                        TimeUnit.MILLISECONDS.sleep(1);
                        break;
                    } catch (Throwable throwable) {
                        log.error("dump file error, id: {}", id, throwable);
                        failCount++;
                        if (Const.MAX_FAIL_COUNT < failCount) {
                            // Too many failures: flag the row is_dump = 2 so the
                            // outer query stops re-selecting it this run.
                            {
                                Map<String, Object> paramMap = Maps.newHashMap();
                                paramMap.put("key", "is_dump");
                                paramMap.put("value", 2);
                                maps.add(paramMap);
                            }
                            customMapper.updateById(api, maps, id);
                            break;
                        }
                        try {
                            // Back off before retrying the same record.
                            TimeUnit.SECONDS.sleep(30);
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                    }
                }
            }
            num += list.size();
            log.info("dump file count api:{}, field:{}, num:{}", api, field, num);
        }
        log.info("dump file success api:{}, field:{}, num:{}", api, field, num);
    } catch (Throwable throwable) {
        log.error("dump file error", throwable);
    } finally {
        // Reset is_dump = 2 rows back to 0 so the next run retries them.
        List<Map<String, Object>> maps = Lists.newArrayList();
        Map<String, Object> paramMap = Maps.newHashMap();
        paramMap.put("key", "is_dump");
        paramMap.put("value", 0);
        maps.add(paramMap);
        customMapper.update(maps, api, "is_dump = 2");
    }
}
/**
 * Resolve the column that stores the file name for the given object.
 *
 * @param api object (table) name
 * @return "PathOnClient" for ContentVersion, otherwise "Name"
 */
private static String getName(String api) {
    // ContentVersion keeps its file name in PathOnClient; every other
    // attachment object uses the standard Name column.
    return Const.CONTENT_VERSION.equalsIgnoreCase(api) ? "PathOnClient" : "Name";
}
/**
 * Stream a download response to a file on the local server, resuming from the
 * last written offset if the connection drops mid-transfer.
 *
 * @param headers     request headers (carry the Salesforce session token)
 * @param id          record id, used only for log messages
 * @param filePath    path relative to Const.SERVER_FILE_PATH
 * @param url         download url, used to reconnect after a stream failure
 * @param response    initial HTTP response whose body is being read
 * @param inputStream body stream of {@code response}
 * @throws IOException if the first attempt fails before any byte is written,
 *                     or a write/reconnect ultimately fails
 */
private void dumpToServer(Map<String, String> headers, String id, String filePath, String url, Response response, InputStream inputStream) throws IOException {
    String path = Const.SERVER_FILE_PATH + "/" + filePath;
    log.info("--------文件名称:"+path);
    long offset = 0L;
    // BUG FIX: try-with-resources guarantees the file handle is closed even
    // when the first attempt throws with offset <= 0 and the exception is
    // rethrown — the original leaked the RandomAccessFile on that path.
    try (RandomAccessFile accessFile = new RandomAccessFile(path, "rw")) {
        while (true) {
            try {
                // Copy the body to disk in 8 KiB chunks.
                byte[] buf = new byte[8192];
                int len = 0;
                if (offset > 0) {
                    // Resuming: skip the bytes already persisted and append.
                    inputStream.skip(offset);
                    accessFile.seek(offset);
                }
                while ((len = inputStream.read(buf)) != -1) {
                    accessFile.write(buf, 0, len);
                    offset += len;
                }
                break;
            } catch (Exception e) {
                if (offset <= 0) {
                    // Nothing was written yet — surface the failure to the caller.
                    throw e;
                }
                log.warn("file dump to server EOF ERROR try to reconnect");
                response.close();
                response = HttpUtil.doGet(url, null, headers);
                assert response.body() != null;
                inputStream = response.body().byteStream();
                log.warn("reconnect success, id:{} skip {}", id, offset);
            }
        }
    }
}
}

View File

@ -75,7 +75,7 @@ public class DataImportServiceImpl implements DataImportService {
private CommonService commonService;
@Autowired
private SystemConfigService systemConfigService;
private LinkConfigService linkConfigService;
@Override
@ -200,6 +200,7 @@ public class DataImportServiceImpl implements DataImportService {
}
for (DataObject dataObject : dataObjects) {
DataObject update = new DataObject();
log.info("autoImmigration api: {}", dataObject.getName());
TimeUnit.MILLISECONDS.sleep(1);
try {
String api = dataObject.getName();
@ -243,10 +244,8 @@ public class DataImportServiceImpl implements DataImportService {
} catch (Throwable e) {
throw new RuntimeException(e);
} finally {
if (dataObject != null) {
update.setDataLock(0);
dataObjectService.updateById(update);
}
update.setDataLock(0);
dataObjectService.updateById(update);
}
}
// 等待当前所有线程执行完成
@ -275,102 +274,109 @@ public class DataImportServiceImpl implements DataImportService {
return;
}
//查询当前对象多态字段映射
QueryWrapper<LinkConfig> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("api",api).eq("is_create",1).eq("is_link",true);
List<LinkConfig> configs = linkConfigService.list(queryWrapper);
Map<String, String> fieldMap = new HashMap<>();
if (!configs.isEmpty()) {
fieldMap = configs.stream()
.collect(Collectors.toMap(
LinkConfig::getField, // Key提取器
LinkConfig::getLinkField, // Value提取器
(oldVal, newVal) -> newVal // 解决重复Key冲突保留新值
));
}
//批量插入200一次
int page = count%200 == 0 ? count/200 : (count/200) + 1;
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
DataObject update = new DataObject();
update.setName(api);
for (int i = 0; i < page; i++) {
List<Map<String, Object>> data = customMapper.list("*", api, "new_id is null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "' limit 200");
int size = data.size();
log.info("执行api:{}, 执行page:{}, 执行size:{}", api, i+1, size);
SObject[] accounts = new SObject[size];
String[] ids = new String[size];
for (int j = 1; j <= size; j++) {
SObject account = new SObject();
account.setType(api);
//找出sf对象必填字段并且给默认值
for (DataField dataField : list) {
if ("OwnerId".equals(dataField.getField()) || "Owner_Type".equals(dataField.getField())
|| "Id".equals(dataField.getField())){
continue;
}
if (dataField.getIsCreateable() && !dataField.getIsNillable() && !dataField.getIsDefaultedOnCreate()) {
if ("reference".equals(dataField.getSfType())){
log.info("----------" + dataField.getField() + " ------------" + dataField.getSfType());
String reference = dataField.getReferenceTo();
if (reference == null){
reference = data.get(j-1).get("Parent_Type").toString();
}
List<Map<String, Object>> referenceMap = customMapper.list("new_id", reference, "new_id is not null limit 1");
if (referenceMap.isEmpty()){
QueryWrapper<DataObject> maxIndex = new QueryWrapper<>();
maxIndex.select("IFNULL(max(data_index),0) as data_index");
maxIndex.ne("name", api);
Map<String, Object> map = dataObjectService.getMap(maxIndex);
//如果必填lookup字段没有值跳过
update.setDataIndex(Integer.parseInt(map.get("data_index").toString()) + 1);
dataObjectService.updateById(update);
return;
}else{
account.setField(dataField.getField(), referenceMap.get(0).get("new_id"));
continue;
}
}
if ("picklist".equals(dataField.getSfType())){
List<Map<String, Object>> pickList = customMapper.list("value", "data_picklist", "api = '"+api+"' and field = '"+dataField.getField()+"' limit 1");
account.setField(dataField.getField(), pickList.get(0).get("value"));
try {
List<Map<String, Object>> data = customMapper.list("*", api, "new_id is null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "' limit 200");
int size = data.size();
log.info("执行api:{}, 执行page:{}, 执行size:{}", api, i+1, size);
SObject[] accounts = new SObject[size];
String[] ids = new String[size];
for (int j = 1; j <= size; j++) {
SObject account = new SObject();
account.setType(api);
//找出sf对象必填字段并且给默认值
for (DataField dataField : list) {
if ("OwnerId".equals(dataField.getField()) || "Owner_Type".equals(dataField.getField())
|| "Id".equals(dataField.getField())){
continue;
}
account.setField(dataField.getField(), DataUtil.fieldTypeToSf(dataField));
}
}
if (!api.equals("Product2")){
//object类型转Date类型
Date date;
try {
date = sdf.parse(String.valueOf(data.get(j - 1).get("CreatedDate")));
}catch (ParseException e){
//解决当时间秒为0时转换秒精度丢失问题
date = sdf.parse(data.get(j - 1).get("CreatedDate")+":00");
}
Calendar calendar = Calendar.getInstance();
calendar.setTime(date);
account.setField("CreatedDate", calendar);
Map<String, Object> CreatedByIdMap = customMapper.getById("new_id", "User", data.get(j-1).get("CreatedById").toString());
if(CreatedByIdMap.get("new_id") != null && StringUtils.isNotEmpty(CreatedByIdMap.get("new_id").toString())){
account.setField("CreatedById", CreatedByIdMap.get("new_id"));
}
}
if (api.equals("Task")){
account.setField("TaskSubtype",data.get(j - 1).get("TaskSubtype"));
}
if ("CreatedDate".equals(dataField.getField()) && dataField.getIsCreateable()){
//object类型转Date类型
Date date;
try {
date = sdf.parse(String.valueOf(data.get(j - 1).get("CreatedDate")));
}catch (ParseException e){
//解决当时间秒为0时转换秒精度丢失问题
date = sdf.parse(data.get(j - 1).get("CreatedDate")+":00");
}
Calendar calendar = Calendar.getInstance();
calendar.setTime(date);
account.setField("CreatedDate", calendar);
continue;
}
if ("CreatedById".equals(dataField.getField()) && dataField.getIsCreateable()){
Map<String, Object> CreatedByIdMap = customMapper.getById("new_id", "User", data.get(j-1).get("CreatedById").toString());
if(CreatedByIdMap.get("new_id") != null && StringUtils.isNotEmpty(CreatedByIdMap.get("new_id").toString())){
account.setField("CreatedById", CreatedByIdMap.get("new_id"));
}
continue;
}
if (dataField.getIsCreateable() != null && dataField.getIsCreateable() && !dataField.getIsNillable() && !dataField.getIsDefaultedOnCreate()) {
if ("reference".equals(dataField.getSfType())){
//引用类型
String reference = dataField.getReferenceTo();
//引用类型字段
String linkfield = fieldMap.get(dataField.getField());
if (api.equals("Event")){
account.setField("EventSubtype", String.valueOf(data.get(j - 1).get("EventSubtype")));
// account.setField("IsRecurrence", String.valueOf(data.get(j - 1).get("IsRecurrence")));
}
// if (api.equals("Account")){
// Map<String, Object> referenceMap = customMapper.list("new_id","RecordType", "new_id is not null and id = '"+ data.get(j - 1).get("RecordTypeId")+"' limit 1").get(0);
// account.setField("RecordTypeId", referenceMap.get("new_id") );
// }
if (api.equals("vlink__Wechat_User__c")){
List<Map<String, Object>> maps = customMapper.list("new_id", "vlink__Wechat_Account__c", "new_id is not null and id = '" + data.get(j - 1).get("vlink__Wechat_Account__c") + "' limit 1");
if (!maps.isEmpty()){
Map<String, Object> referenceMap = maps.get(0);
account.setField("vlink__Wechat_Account__c", referenceMap.get("new_id"));
if (StringUtils.isNotBlank(linkfield)){
reference = data.get(j-1).get(linkfield)!=null?data.get(j-1).get(linkfield).toString():null;
}
if (reference == null){
continue;
}
log.info("----------" + dataField.getField() + "的引用类型 ------------" + reference);
List<Map<String, Object>> referenceMap = customMapper.list("new_id", reference, "new_id is not null limit 1");
if (referenceMap.isEmpty()){
QueryWrapper<DataObject> maxIndex = new QueryWrapper<>();
maxIndex.select("IFNULL(max(data_index),0) as data_index");
maxIndex.ne("name", api);
Map<String, Object> map = dataObjectService.getMap(maxIndex);
//如果必填lookup字段没有值跳过
update.setDataIndex(Integer.parseInt(map.get("data_index").toString()) + 1);
dataObjectService.updateById(update);
return;
}else{
account.setField(dataField.getField(), referenceMap.get(0).get("new_id"));
continue;
}
}
if ("picklist".equals(dataField.getSfType())){
List<Map<String, Object>> pickList = customMapper.list("value", "data_picklist", "api = '"+api+"' and field = '"+dataField.getField()+"' limit 1");
account.setField(dataField.getField(), pickList.get(0).get("value"));
continue;
}
account.setField(dataField.getField(), DataUtil.fieldTypeToSf(dataField));
}
}
ids[j-1] = data.get(j-1).get("Id").toString();
accounts[j-1] = account;
if (i*200+j == count){
break;
}
}
ids[j-1] = data.get(j-1).get("Id").toString();
accounts[j-1] = account;
if (i*200+j == count){
break;
}
}
try {
if (infoFlag != null && "1".equals(infoFlag.get("value"))){
printAccountsDetails(accounts, list);
}
@ -392,8 +398,7 @@ public class DataImportServiceImpl implements DataImportService {
}
TimeUnit.MILLISECONDS.sleep(1);
} catch (Exception e) {
log.error("manualCreatedNewId error api:{}", api, e);
throw e;
log.error("manualCreatedNewId error api:{},错误信息:{}", api, e.getMessage());
}
}
SalesforceParam countParam = new SalesforceParam();
@ -650,10 +655,9 @@ public class DataImportServiceImpl implements DataImportService {
} else if (!DataUtil.isUpdate(field) || (dataField.getIsCreateable() != null && !dataField.getIsCreateable())) {
continue;
} else if (StringUtils.isNotBlank(reference_to) && !"data_picklist".equals(reference_to)) {
if (!"null".equals(map.get(field)) && null != map.get(field) && !"OwnerId".equals(field)
|| !"Owner_Type".equals(field)) {
if (!"null".equals(map.get(field)) && null != map.get(field) ) {
//判断reference_to内是否包含User字符串
if (reference_to.contains("User")) {
if (reference_to.contains(",User") || reference_to.contains("User,")) {
reference_to = "User";
}
Map<String, Object> m = customMapper.getById("new_id", reference_to, String.valueOf(map.get(field)));
@ -665,16 +669,7 @@ public class DataImportServiceImpl implements DataImportService {
if (map.get(field) != null && StringUtils.isNotBlank(dataField.getSfType())) {
account.setField(field, DataUtil.localDataToSfData(dataField.getSfType(), String.valueOf(map.get(field))));
}else {
if (api.equals("Account")){
if ("1".equals(map.get("IsPersonAccount")) && field.equals("Name")){
continue;
}else if("0".equals(map.get("IsPersonAccount")) && field.equals("LastName")){
continue;
}
} else {
account.setField(field, map.get(field));
}
account.setField(field, map.get(field));
}
}
}
@ -726,18 +721,19 @@ public class DataImportServiceImpl implements DataImportService {
for (int i = 0; i < accounts.length; i++) {
SObject account = accounts[i];
System.out.println("--- Account[" + i + "] ---");
if (account == null){
return;
}
// 获取对象所有字段名
for (DataField dataField : list) {
try {
Object value = account.getField(dataField.getField());
System.out.println(dataField.getField() + ": " + (value != null ? value.toString() : "null"));
System.out.println(dataField.getField() + ": " + account.getField(dataField.getField()));
} catch (Exception e) {
System.out.println(dataField.getField() + ": [权限不足或字段不存在]");
}
}
System.out.println("old_owner_id__c: " + (account.getField("old_owner_id__c") != null ? account.getField("old_owner_id__c").toString() : "null"));
System.out.println("old_sfdc_id__c: " + (account.getField("old_sfdc_id__c") != null ? account.getField("old_sfdc_id__c").toString() : "null"));
System.out.println("old_owner_id__c: " + account.getField("old_owner_id__c"));
System.out.println("old_sfdc_id__c: " + account.getField("old_sfdc_id__c"));
}
}
@ -748,18 +744,21 @@ public class DataImportServiceImpl implements DataImportService {
HashMap<String, String> map = new HashMap<>();
SObject account = accounts[i];
System.out.println("--- Account[" + i + "] ---");
if (account == null){
return null;
}
// 获取对象所有字段名
for (DataField dataField : list) {
try {
Object value = account.getField(dataField.getField());
map.put(dataField.getField(),String.valueOf(value));
System.out.println(dataField.getField() + ": " + (value != null ? value.toString() : "null"));
System.out.println(dataField.getField() + ": " + value);
} catch (Exception e) {
System.out.println(dataField.getField() + ": [权限不足或字段不存在]");
}
}
System.out.println("old_owner_id__c: " + (account.getField("old_owner_id__c") != null ? account.getField("old_owner_id__c").toString() : "null"));
System.out.println("old_sfdc_id__c: " + (account.getField("old_sfdc_id__c") != null ? account.getField("old_sfdc_id__c").toString() : "null"));
System.out.println("old_owner_id__c: " + account.getField("old_owner_id__c"));
System.out.println("old_sfdc_id__c: " + account.getField("old_sfdc_id__c"));
map.put("old_owner_id__c",String.valueOf(account.getField("old_owner_id__c")));
map.put("old_sfdc_id__c",String.valueOf(account.getField("old_sfdc_id__c")));
arrayList.add(map);
@ -827,10 +826,9 @@ public class DataImportServiceImpl implements DataImportService {
} else if (!DataUtil.isUpdate(field) || (dataField.getIsCreateable() != null && !dataField.getIsCreateable())) {
continue;
} else if (StringUtils.isNotBlank(reference_to) && !"data_picklist".equals(reference_to)) {
if (!"null".equals(map.get(field)) && null != map.get(field) && !"OwnerId".equals(field)
|| !"Owner_Type".equals(field)) {
if (!"null".equals(map.get(field)) && null != map.get(field)) {
//判断reference_to内是否包含User字符串
if (reference_to.contains("User")) {
if (reference_to.contains(",User") || reference_to.contains("User,")) {
reference_to = "User";
}

View File

@ -171,7 +171,7 @@ public class FileManagerServiceImpl implements FileManagerService {
downloadUrl = (String) map1.get("value");
}
}
if (StringUtils.isNotBlank(downloadUrl)) {
if (StringUtils.isBlank(downloadUrl)) {
EmailUtil.send("DumpFile ERROR", "文件下载失败!下载地址未配置");
return;
}

View File

@ -141,7 +141,7 @@ public class FileServiceImpl implements FileService {
downloadUrl = (String) map1.get("value");
}
}
if (StringUtils.isNotBlank(downloadUrl)) {
if (StringUtils.isEmpty(downloadUrl)) {
EmailUtil.send("DumpFile ERROR", "文件下载失败!下载地址未配置");
return;
}
@ -171,6 +171,7 @@ public class FileServiceImpl implements FileService {
// 获取未存储的附件id
List<Map<String, Object>> list = customMapper.list("Id, " + name, api, extraSql + " AND is_dump = false limit 1000");
if (CollectionUtils.isEmpty(list)) {
log.info("无需要下载文件数据!!!");
break;
}
String finalName = name;
@ -207,7 +208,11 @@ public class FileServiceImpl implements FileService {
log.error("id: {}, no mapping dump type", id);
}
Map<String, Object> paramMap = Maps.newHashMap();
paramMap.put("key", "url");
if ("Document".equals(api)){
paramMap.put("key", "localUrl");
}else {
paramMap.put("key", "url");
}
paramMap.put("value", filePath);
maps.add(paramMap);
}
@ -522,8 +527,9 @@ public class FileServiceImpl implements FileService {
HttpEntity he = response.getEntity();
if (he != null) {
respContent = EntityUtils.toString(he, "UTF-8");
String newId = JSONObject.parseObject(respContent).get("id").toString();
String newId = String.valueOf(JSONObject.parseObject(respContent).get("id"));
if (StringUtils.isBlank(newId)) {
log.error("文件上传错误,返回实体信息:" + respContent);
break;
}
Map<String, Object> paramMap = Maps.newHashMap();
@ -669,7 +675,7 @@ public class FileServiceImpl implements FileService {
log.info("文件不存在");
break;
}
String fileName = file.getName();
String fileName = (String) map.get("Name");
// dataObject查询
QueryWrapper<DataObject> qw = new QueryWrapper<>();
@ -769,4 +775,145 @@ public class FileServiceImpl implements FileService {
customMapper.update(maps, api, "is_upload = 2");
}
}
/**
 * Uploads locally dumped Document files back to Salesforce through the
 * multipart REST upload endpoint configured in org_config (FILE_UPLOAD_URL).
 * <p>
 * Rows of {@code api} with {@code is_upload = 0} are processed one by one; on
 * success the returned Salesforce id is written to {@code new_id} and
 * {@code is_upload} is set, after {@link Const#MAX_FAIL_COUNT} failures the row
 * is marked {@code is_upload = 2} and reset to 0 in the finally block so a
 * later run can retry it.
 *
 * @param api          table / object API name whose files are uploaded
 * @param field        field name, used for logging only in this method
 * @param singleThread unused here; kept for interface compatibility
 */
@Override
public void uploadFileToDocument(String api, String field, Boolean singleThread) {
    // Resolve the upload base url from org_config (code = FILE_UPLOAD_URL).
    String uploadUrl = null;
    List<Map<String, Object>> poll = customMapper.list("code,value", "org_config", null);
    for (Map<String, Object> map1 : poll) {
        if ("FILE_UPLOAD_URL".equals(map1.get("code"))) {
            uploadUrl = (String) map1.get("value");
        }
    }
    if (StringUtils.isBlank(uploadUrl)) {
        EmailUtil.send("UploadFile ERROR", "文件上传失败!上传地址未配置");
        return;
    }
    log.info("upload file api:{}, field:{}", api, field);
    PartnerConnection connect = salesforceTargetConnect.createConnect();
    String token = connect.getSessionHeader().getSessionId();
    List<Future<?>> futures = Lists.newArrayList();
    String extraSql = "";
    if (dataFieldService.hasDeleted(api)) {
        // NOTE(review): extraSql is built but never applied to the query below — confirm intent.
        extraSql += "AND IsDeleted = false ";
    }
    if (Const.FILE_TYPE == FileType.SERVER) {
        // fail fast when the configured server-side file root is missing
        File excel = new File(Const.SERVER_FILE_PATH + "/" + api);
        if (!excel.exists()) {
            throw new RuntimeException("找不到文件路径");
        }
    }
    try {
        // fetch rows whose file has not been uploaded yet
        List<Map<String, Object>> list = customMapper.list("Id, Name, localUrl, Description", api, "is_upload = 0");
        for (Map<String, Object> map : list) {
            uploadSingleDocument(api, uploadUrl, token, map);
        }
        salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
        log.info("upload file success api:{}, field:{}", api, field);
    } catch (Throwable throwable) {
        log.error("upload file error", throwable);
        salesforceExecutor.remove(futures.toArray(new Future<?>[]{}));
    } finally {
        // reset rows that exhausted their retries (is_upload = 2) back to 0 for a later run
        List<Map<String, Object>> maps = Lists.newArrayList();
        Map<String, Object> paramMap = Maps.newHashMap();
        paramMap.put("key", "is_upload");
        paramMap.put("value", 0);
        maps.add(paramMap);
        customMapper.update(maps, api, "is_upload = 2");
    }
}

/**
 * Uploads one row's local file and records the result, retrying with a 30s
 * back-off until {@link Const#MAX_FAIL_COUNT} is exceeded.
 */
private void uploadSingleDocument(String api, String uploadUrl, String token, Map<String, Object> map) {
    String id = (String) map.get("Id");
    List<Map<String, Object>> maps = Lists.newArrayList();
    boolean isUpload = true;
    int failCount = 0;
    while (true) {
        try {
            String url_fileName = (String) map.get("localUrl");
            // only rows with a recorded local path carry a file to upload
            if (StringUtils.isNotBlank(url_fileName)) {
                String filePath = Const.SERVER_FILE_PATH + "/" + url_fileName;
                File file = new File(filePath);
                if (!file.exists()) {
                    log.info("文件不存在");
                    break;
                }
                String fileName = file.getName();
                String url = uploadUrl + String.format(Const.SF_UPLOAD_FILE_URL, api);
                HttpPost httpPost = new HttpPost(url);
                httpPost.setHeader("Authorization", "Bearer " + token);
                httpPost.setHeader("connection", "keep-alive");
                JSONObject credentialsJsonParam = new JSONObject();
                credentialsJsonParam.put("Name", fileName);
                MultipartEntityBuilder builder = MultipartEntityBuilder.create();
                builder.addTextBody("data", credentialsJsonParam.toJSONString(), ContentType.APPLICATION_JSON);
                builder.addBinaryBody("Body", new FileInputStream(file), ContentType.APPLICATION_OCTET_STREAM, fileName);
                httpPost.setEntity(builder.build());
                // try-with-resources: the original leaked the client on every
                // attempt and only closed the response on the failure path
                try (CloseableHttpClient httpClient = HttpClients.createDefault();
                     CloseableHttpResponse response = httpClient.execute(httpPost)) {
                    if (response.getStatusLine() != null && response.getStatusLine().getStatusCode() < 400) {
                        HttpEntity he = response.getEntity();
                        if (he != null) {
                            String respContent = EntityUtils.toString(he, "UTF-8");
                            // String.valueOf avoids the NPE the original hit when the
                            // response carried no "id" field (same fix as the Attachment path)
                            String newId = String.valueOf(JSONObject.parseObject(respContent).get("id"));
                            if (StringUtils.isBlank(newId) || "null".equals(newId)) {
                                log.error("文件上传错误,返回实体信息:" + respContent);
                                break;
                            }
                            Map<String, Object> paramMap = Maps.newHashMap();
                            paramMap.put("key", "new_id");
                            paramMap.put("value", newId);
                            maps.add(paramMap);
                        }
                    } else {
                        throw new RuntimeException("Document文件上传失败地址或参数异常");
                    }
                }
            }
            // mark the row uploaded and persist any collected updates
            Map<String, Object> paramMap = Maps.newHashMap();
            paramMap.put("key", "is_upload");
            paramMap.put("value", isUpload);
            maps.add(paramMap);
            customMapper.updateById(api, maps, id);
            TimeUnit.MILLISECONDS.sleep(1);
            break;
        } catch (Exception throwable) {
            log.error("upload file error, id: {}", id, throwable);
            failCount++;
            if (Const.MAX_FAIL_COUNT < failCount) {
                // give up on this row: flag it for the finally-block reset
                Map<String, Object> paramMap = Maps.newHashMap();
                paramMap.put("key", "is_upload");
                paramMap.put("value", 2);
                maps.add(paramMap);
                customMapper.updateById(api, maps, id);
                break;
            }
            try {
                // back off before retrying the same row
                TimeUnit.SECONDS.sleep(30);
            } catch (InterruptedException e) {
                // restore the interrupt flag instead of swallowing it
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }
}
}

View File

@ -0,0 +1,16 @@
package com.celnet.datadump.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.celnet.datadump.entity.DataLog;
import com.celnet.datadump.entity.LinkConfig;
import com.celnet.datadump.mapper.DataLogMapper;
import com.celnet.datadump.mapper.LinkConfigMapper;
import com.celnet.datadump.service.DataLogService;
import com.celnet.datadump.service.LinkConfigService;
import org.springframework.stereotype.Service;
/**
 * Link configuration service implementation.
 * <p>
 * Empty MyBatis-Plus CRUD shell: all create/read/update/delete operations for
 * {@link LinkConfig} come from {@link ServiceImpl}; no custom logic yet.
 * NOTE(review): the DataLog* imports appear unused — candidates for cleanup.
 */
@Service
public class LinkConfigServiceImpl extends ServiceImpl<LinkConfigMapper, LinkConfig> implements LinkConfigService {
}

View File

@ -0,0 +1,13 @@
package com.celnet.datadump.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.celnet.datadump.entity.MetaclassConfig;
import com.celnet.datadump.mapper.MetaclassConfigMapper;
import com.celnet.datadump.service.MetaclassConfigService;
import org.springframework.stereotype.Service;
/**
 * Metaclass configuration service implementation.
 * <p>
 * Empty MyBatis-Plus CRUD shell: all persistence operations for
 * {@link MetaclassConfig} are inherited from {@link ServiceImpl};
 * no custom logic yet.
 */
@Service
public class MetaclassConfigServiceImpl extends ServiceImpl<MetaclassConfigMapper, MetaclassConfig> implements MetaclassConfigService {
}

View File

@ -1,18 +1,123 @@
package com.celnet.datadump.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.celnet.datadump.config.SalesforceConnect;
import com.celnet.datadump.config.SalesforceTargetConnect;
import com.celnet.datadump.entity.OrgConfig;
import com.celnet.datadump.entity.SystemConfig;
import com.celnet.datadump.mapper.CustomMapper;
import com.celnet.datadump.mapper.OrgConfigMapper;
import com.celnet.datadump.mapper.SystemConfigMapper;
import com.celnet.datadump.service.OrgConfigService;
import com.celnet.datadump.service.SystemConfigService;
import com.celnet.datadump.util.EmailUtil;
import com.sforce.soap.partner.PartnerConnection;
import com.sforce.ws.ConnectionException;
import com.sforce.ws.ConnectorConfig;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Org configuration service implementation.
 * <p>
 * Verifies that the source and target Salesforce org credentials stored in the
 * {@code org_config} table can each establish a Partner API login, and reports
 * the outcome by email.
 * NOTE(review): this class calls {@code log.*} — confirm an {@code @Slf4j}
 * annotation (or explicit logger field) is present in the repository version.
 */
@Service
public class OrgConfigServiceImpl extends ServiceImpl<OrgConfigMapper, OrgConfig> implements OrgConfigService {

    @Resource
    private CustomMapper customerMapper;

    /**
     * Checks connectivity of both orgs. A failure of either side sends an
     * error email; only when both succeed is a success email sent.
     */
    @Override
    public void verifyOrgConfig() {
        String sourceOrgUrl = configValue("SOURCE_ORG_URL");
        String targetOrgUrl = configValue("TARGET_ORG_URL");
        // verify both sides with one shared routine instead of two duplicated blocks
        boolean sourceOk = tryConnect("SOURCE_ORG_USERNAME", "SOURCE_ORG_PASSWORD", sourceOrgUrl,
                "源ORG连接配置错误,\n地址" + sourceOrgUrl);
        boolean targetOk = tryConnect("TARGET_ORG_USERNAME", "TARGET_ORG_PASSWORD", targetOrgUrl,
                "目标ORG连接配置错误,\n地址" + targetOrgUrl);
        if (sourceOk && targetOk) {
            String message = "源ORG与目标ORG连接配置成功";
            String format = String.format("ORG连接成功, \ncause:\n%s , \n源ORG地址:\n%s , \n目标ORG地址:\n%s",
                    message, sourceOrgUrl, targetOrgUrl);
            // fixed: the success notification previously reused the "DataDump ERROR" subject
            EmailUtil.send("DataDump INFO", format);
        }
    }

    /**
     * Reads a single value from org_config by its code.
     *
     * @param code config code, e.g. SOURCE_ORG_URL
     * @return the configured value, or null when the code is absent
     */
    private String configValue(String code) {
        List<Map<String, Object>> poll = customerMapper.list("code,value", "org_config", null);
        for (Map<String, Object> row : poll) {
            if (code.equals(row.get("code"))) {
                Object value = row.get("value");
                return value == null ? null : value.toString();
            }
        }
        return null;
    }

    /**
     * Attempts a Partner API login with the given credential codes.
     *
     * @param userCode    org_config code holding the username
     * @param passCode    org_config code holding the password
     * @param url         login endpoint url (auth and service endpoint)
     * @param failMessage human-readable message used in the failure email
     * @return true when the login round-trip succeeds
     */
    private boolean tryConnect(String userCode, String passCode, String url, String failMessage) {
        try {
            ConnectorConfig config = new ConnectorConfig();
            config.setUsername(configValue(userCode));
            config.setPassword(configValue(passCode));
            config.setAuthEndpoint(url);
            config.setServiceEndpoint(url);
            config.setConnectionTimeout(60 * 60 * 1000);
            config.setReadTimeout(60 * 60 * 1000);
            PartnerConnection connection = new PartnerConnection(config);
            // getUserInfo() forces a login round-trip; the value itself is not needed
            connection.getUserInfo().getOrganizationId();
            return true;
        } catch (ConnectionException e) {
            String format = String.format("ORG连接异常, \ncause:\n%s", failMessage);
            EmailUtil.send("DataDump ERROR", format);
            log.error("exception message", e);
            return false;
        }
    }
}

View File

@ -1,5 +1,6 @@
package com.celnet.datadump.util;
import cn.hutool.core.lang.UUID;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.celnet.datadump.entity.DataField;
@ -30,6 +31,10 @@ import javax.annotation.PostConstruct;
import java.math.BigDecimal;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
@ -388,16 +393,33 @@ public class DataUtil {
case "datetime":
return date;
default:
return "初始"+System.currentTimeMillis();
return "初始"+ UUID.randomUUID();
}
}
/**
 * Produces a placeholder value matching the given field's Salesforce type,
 * used to seed required fields when inserting rows through the Bulk API.
 *
 * @param dataField field metadata whose sfType selects the placeholder
 * @return a type-appropriate dummy value; unknown types get a unique text marker
 * @throws ParseException declared for interface compatibility; not thrown here
 */
public static Object fieldTypeBulkToSf(DataField dataField) throws ParseException {
    String sfType = dataField.getSfType();
    if ("int".equals(sfType)) {
        return 1;
    }
    if ("double".equals(sfType) || "currency".equals(sfType) || "percent".equals(sfType)) {
        return new BigDecimal("1");
    }
    if ("boolean".equals(sfType)) {
        return true;
    }
    if ("email".equals(sfType)) {
        return "kris.dong@163.com";
    }
    if ("date".equals(sfType) || "datetime".equals(sfType)) {
        return "2020-01-01T00:00:00+00:00";
    }
    // any other type falls back to a unique text placeholder
    return "初始" + UUID.randomUUID();
}
public static Object localBulkDataToSfData(String fieldType, String data) throws ParseException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
SimpleDateFormat sd = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
Date date;
//date转Calendar类型
Calendar calendar = Calendar.getInstance();
switch (fieldType) {
case "int":
return Integer.parseInt(data);
@ -410,14 +432,9 @@ public class DataUtil {
case "date":
return data+"T08:00:00Z";
case "datetime":
try {
date = sd.parse(data);
}catch (ParseException e){
//解决当时间秒为0时转换秒精度丢失问题
date = sd.parse(data+":00");
}
calendar.setTime(date);
return calendar;
LocalDateTime localDateTime = LocalDateTime.parse(data, DateTimeFormatter.ISO_LOCAL_DATE_TIME);
ZonedDateTime zonedDateTime = localDateTime.atZone(ZoneId.systemDefault());
return zonedDateTime.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
case "time":
return adjustHour(data);
default:

View File

@ -3,9 +3,9 @@ spring:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
# 携科
url: jdbc:mysql://127.0.0.1:3306/xieke?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai
url: jdbc:mysql://127.0.0.1:3306/xieke?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai
username: root
password: Celnet2025.QY
password: root
# cook
# url: jdbc:mysql://127.0.0.1:3306/cook_1?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai
# username: root
@ -22,7 +22,7 @@ spring:
sf:
# 附件下载url
file-download-url: https://d2000000079c7eaa.lightning.force.com
file-upload-url: https://steco-process.lightning.sfcrmapps.cn
file-upload-url: https://steco-process.my.sfcrmproducts.cn
#线程数
executor-size: 5
list:

View File

@ -28,6 +28,10 @@
) COMMENT = '${tableComment}';
</update>
<update id="createField">
ALTER TABLE `${tableName}` ADD COLUMN `${fieldName}` varchar(255)
</update>
<select id="getIds" resultType="String">
SELECT
id