【feat】增加邮件发送错误日志
This commit is contained in:
parent
996e977aa3
commit
51b4ebde80
@ -3,6 +3,7 @@ package com.celnet.datadump.config;
|
||||
import com.alibaba.fastjson.JSONArray;
|
||||
import com.celnet.datadump.mapper.CustomMapper;
|
||||
import com.celnet.datadump.util.BulkUtil;
|
||||
import com.celnet.datadump.util.EmailUtil;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.sforce.async.BulkConnection;
|
||||
import com.sforce.soap.partner.PartnerConnection;
|
||||
@ -63,6 +64,9 @@ public class SalesforceConnect {
|
||||
config.setReadTimeout(60 * 60 * 1000);
|
||||
return new PartnerConnection(config);
|
||||
} catch (ConnectionException e) {
|
||||
String message = "源ORG连接配置错误!";
|
||||
String format = String.format("ORG连接异常!, \ncause:\n%s", message);
|
||||
EmailUtil.send("DataDump ERROR", format);
|
||||
log.error("exception message", e);
|
||||
return null;
|
||||
}
|
||||
@ -103,6 +107,9 @@ public class SalesforceConnect {
|
||||
// config.setSessionId(connection.getSessionHeader().getSessionId());
|
||||
return BulkUtil.getBulkConnection(map.get("username"),map.get("password"),map.get("url"));
|
||||
} catch (Exception e) {
|
||||
String message = "源ORG连接配置错误!";
|
||||
String format = String.format("ORG连接异常!, \ncause:\n%s", message);
|
||||
EmailUtil.send("DataDump ERROR", format);
|
||||
log.error("exception message", e);
|
||||
return null;
|
||||
}
|
||||
|
@ -1,7 +1,9 @@
|
||||
package com.celnet.datadump.config;
|
||||
|
||||
import com.celnet.datadump.mapper.CustomMapper;
|
||||
import com.celnet.datadump.param.DataDumpParam;
|
||||
import com.celnet.datadump.util.BulkUtil;
|
||||
import com.celnet.datadump.util.EmailUtil;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.sforce.async.BulkConnection;
|
||||
import com.sforce.soap.partner.PartnerConnection;
|
||||
@ -58,6 +60,9 @@ public class SalesforceTargetConnect {
|
||||
String orgId = connection.getUserInfo().getOrganizationId();
|
||||
return connection;
|
||||
} catch (ConnectionException e) {
|
||||
String message = "目标ORG连接配置错误!";
|
||||
String format = String.format("ORG连接异常!, \ncause:\n%s", message);
|
||||
EmailUtil.send("DataDump ERROR", format);
|
||||
log.error("exception message", e);
|
||||
return null;
|
||||
}
|
||||
@ -99,6 +104,9 @@ public class SalesforceTargetConnect {
|
||||
// config.setSessionId(connection.getSessionHeader().getSessionId());
|
||||
return BulkUtil.getBulkConnection(map.get("username"),map.get("password"),map.get("url"));
|
||||
} catch (Exception e) {
|
||||
String message = "目标ORG连接配置错误!";
|
||||
String format = String.format("ORG连接异常!, \ncause:\n%s", message);
|
||||
EmailUtil.send("DataDump ERROR", format);
|
||||
log.error("exception message", e);
|
||||
return null;
|
||||
}
|
||||
|
@ -140,8 +140,8 @@ public class DataDumpNewJob {
|
||||
* @return result
|
||||
*/
|
||||
@XxlJob("uploadDocumentLinkJob")
|
||||
public ReturnT<String> pullDocumentLinkJob(String paramStr) throws Exception{
|
||||
log.info("pullDocumentLinkJob execute start ..................");
|
||||
public ReturnT<String> uploadDocumentLinkJob(String paramStr) throws Exception{
|
||||
log.info("uploadDocumentLinkJob execute start ..................");
|
||||
|
||||
return dataImportNewService.uploadDocumentLinkJob(paramStr);
|
||||
}
|
||||
|
@ -35,6 +35,7 @@ import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
@ -73,9 +74,6 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
|
||||
@Autowired
|
||||
private DataBatchHistoryService dataBatchHistoryService;
|
||||
|
||||
@Autowired
|
||||
private DataLogService dataLogService;
|
||||
|
||||
|
||||
/**
|
||||
* Insert入口
|
||||
@ -119,7 +117,11 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
|
||||
List<DataObject> list = dataObjectService.list(qw);
|
||||
if (CollectionUtils.isNotEmpty(list)) {
|
||||
String apiNames = list.stream().map(DataObject::getName).collect(Collectors.joining());
|
||||
return new ReturnT<>(500, "api:" + apiNames + " is locked");
|
||||
String message = "api:" + apiNames + " is locked";
|
||||
log.info(message);
|
||||
String format = String.format("数据Insert error, api name: %s, \nparam: %s, \ncause:\n%s", apiNames, com.alibaba.fastjson2.JSON.toJSONString(param, DataDumpParam.getFilter()), message);
|
||||
EmailUtil.send("DataDump ERROR", format);
|
||||
return new ReturnT<>(500, message);
|
||||
}
|
||||
}
|
||||
|
||||
@ -247,6 +249,10 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
|
||||
//如果必填lookup字段没有值,跳过
|
||||
update.setDataIndex(Integer.parseInt(map.get("data_index").toString()+1));
|
||||
dataObjectService.updateById(update);
|
||||
String message = "api:" + api + "的引用对象:" + reference + "不存在数据!";
|
||||
String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s", api, com.alibaba.fastjson2.JSON.toJSONString(param, DataDumpParam.getFilter()), message);
|
||||
EmailUtil.send("DataDump ERROR", format);
|
||||
log.info(message);
|
||||
return;
|
||||
}else{
|
||||
account.put(dataField.getField(), referenceMap.get(0).get("new_id"));
|
||||
@ -298,8 +304,7 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
|
||||
|
||||
sfNum = sfNum + checkInsertResults(bulkConnection, salesforceInsertJob, batchInfos, api, ids);
|
||||
|
||||
// Files.delete(Paths.get(fullPath));
|
||||
|
||||
new File(fullPath).delete();
|
||||
} catch (Exception e) {
|
||||
log.error("manualCreatedNewId error api:{}", api, e);
|
||||
throw e;
|
||||
@ -354,16 +359,9 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
|
||||
maps.add(m);
|
||||
customMapper.updateById(api, maps, ids[index]);
|
||||
index ++;
|
||||
log.info("Created row with id " + id);
|
||||
log.info("Created Success row with id " + id);
|
||||
} else if (!insertStatus) {
|
||||
// DataLog dataLog = new DataLog();
|
||||
// dataLog.setRequestData("BulkInsert,api:" + api);
|
||||
// dataLog.setEndTime(new Date());
|
||||
// dataLog.setStartTime(new Date());
|
||||
// dataLog.setRequestType("Insert");
|
||||
// dataLog.setMessage(error);
|
||||
// dataLogService.save(dataLog);
|
||||
log.info("Failed with error: " + error);
|
||||
log.info("Created Fail with error: " + error);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -419,7 +417,11 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
|
||||
List<DataObject> list = dataObjectService.list(qw);
|
||||
if (CollectionUtils.isNotEmpty(list)) {
|
||||
String apiNames = list.stream().map(DataObject::getName).collect(Collectors.joining());
|
||||
return new ReturnT<>(500, "api:" + apiNames + " is locked");
|
||||
String message = "api:" + apiNames + " is locked";
|
||||
log.info(message);
|
||||
String format = String.format("数据Update error, api name: %s, \nparam: %s, \ncause:\n%s", apiNames, com.alibaba.fastjson2.JSON.toJSONString(param, DataDumpParam.getFilter()), message);
|
||||
EmailUtil.send("DataDump ERROR", format);
|
||||
return new ReturnT<>(500, message);
|
||||
}
|
||||
}
|
||||
|
||||
@ -462,11 +464,11 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
|
||||
}
|
||||
// 等待当前所有线程执行完成
|
||||
salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
|
||||
update.setDataWork(0);
|
||||
} catch (Exception e) {
|
||||
throw e;
|
||||
} finally {
|
||||
if (isFull) {
|
||||
update.setNeedUpdate(false);
|
||||
update.setName(api);
|
||||
update.setDataLock(0);
|
||||
dataObjectService.updateById(update);
|
||||
@ -498,6 +500,11 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
|
||||
if(count == 0){
|
||||
return;
|
||||
}
|
||||
|
||||
//判断引用对象是否存在new_id
|
||||
DataObject update = new DataObject();
|
||||
update.setName(api);
|
||||
|
||||
// 总更新数
|
||||
int sfNum = 0;
|
||||
// 批量更新10000一次
|
||||
@ -530,6 +537,18 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
|
||||
Map<String, Object> m = customMapper.getById("new_id", reference_to, String.valueOf(map.get(field)));
|
||||
if (m != null && !m.isEmpty()) {
|
||||
account.put(field, m.get("new_id"));
|
||||
}else {
|
||||
QueryWrapper<DataObject> maxIndex = new QueryWrapper<>();
|
||||
maxIndex.select("IFNULL(max(data_index),0) as data_index");
|
||||
maxIndex.ne("name", api);
|
||||
Map<String, Object> mapTo = dataObjectService.getMap(maxIndex);
|
||||
//如果必填lookup字段没有值,跳过
|
||||
update.setDataIndex(Integer.parseInt(mapTo.get("data_index").toString()+1));
|
||||
dataObjectService.updateById(update);
|
||||
String message = "对象类型:" + api + "的数据:"+ m.get("Id") +"的引用对象:" + dataField.getReferenceTo() + "的数据:"+ map.get(field) +"不存在!"; String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s", api, com.alibaba.fastjson2.JSON.toJSONString(param, DataDumpParam.getFilter()), message);
|
||||
EmailUtil.send("DataDump ERROR", format);
|
||||
log.info(message);
|
||||
return;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@ -560,8 +579,7 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
|
||||
|
||||
sfNum = sfNum + checkUpdateResults(bulkConnection, salesforceInsertJob, batchInfos,api);
|
||||
|
||||
// Files.delete(Paths.get(fullPath));
|
||||
|
||||
new File(fullPath).delete();
|
||||
} catch (Throwable e) {
|
||||
log.info(e.getMessage());
|
||||
throw e;
|
||||
@ -609,16 +627,9 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
|
||||
String error = resultInfo.get("Error");
|
||||
if (updateStatus) {
|
||||
index ++;
|
||||
log.info("Update row with id " + id);
|
||||
log.info("Update Success row with id " + id);
|
||||
} else {
|
||||
// DataLog dataLog = new DataLog();
|
||||
// dataLog.setRequestData("BulkUpdate,api:" + api);
|
||||
// dataLog.setEndTime(new Date());
|
||||
// dataLog.setStartTime(new Date());
|
||||
// dataLog.setRequestType("Update");
|
||||
// dataLog.setMessage(error);
|
||||
// dataLogService.save(dataLog);
|
||||
log.info("Failed with error: " + error);
|
||||
log.info("Update Fail with error: " + error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -723,6 +723,10 @@ public class DataImportNewServiceImpl implements DataImportNewService {
|
||||
return;
|
||||
}
|
||||
|
||||
//判断引用对象是否存在new_id
|
||||
DataObject update = new DataObject();
|
||||
update.setName(api);
|
||||
|
||||
int targetCount = 0;
|
||||
//批量插入200一次
|
||||
int page = count%200 == 0 ? count/200 : (count/200) + 1;
|
||||
@ -753,6 +757,18 @@ public class DataImportNewServiceImpl implements DataImportNewService {
|
||||
Map<String, Object> m = customMapper.getById("new_id", reference_to, String.valueOf(map.get(field)));
|
||||
if (m != null && !m.isEmpty()) {
|
||||
account.setField(field, m.get("new_id"));
|
||||
}else {
|
||||
QueryWrapper<DataObject> maxIndex = new QueryWrapper<>();
|
||||
maxIndex.select("IFNULL(max(data_index),0) as data_index");
|
||||
maxIndex.ne("name", api);
|
||||
Map<String, Object> mapTo = dataObjectService.getMap(maxIndex);
|
||||
//如果必填lookup字段没有值,跳过
|
||||
update.setDataIndex(Integer.parseInt(mapTo.get("data_index").toString()) +1);
|
||||
dataObjectService.updateById(update);
|
||||
String message = "对象类型:" + api + "的数据:"+ m.get("Id") +"的引用对象:" + dataField.getReferenceTo() + "的数据:"+ map.get(field) +"不存在!"; String format = String.format("数据导入 error, api name: %s, \nparam: %s, \ncause:\n%s", api, com.alibaba.fastjson2.JSON.toJSONString(param, DataDumpParam.getFilter()), message);
|
||||
EmailUtil.send("DataDump ERROR", format);
|
||||
log.info(message);
|
||||
return;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
@ -352,10 +352,10 @@ public class DataImportServiceImpl implements DataImportService {
|
||||
account.setField("EventSubtype", String.valueOf(data.get(j - 1).get("EventSubtype")));
|
||||
// account.setField("IsRecurrence", String.valueOf(data.get(j - 1).get("IsRecurrence")));
|
||||
}
|
||||
if (api.equals("Account")){
|
||||
Map<String, Object> referenceMap = customMapper.list("new_id","RecordType", "new_id is not null and id = '"+ data.get(j - 1).get("RecordTypeId")+"' limit 1").get(0);
|
||||
account.setField("RecordTypeId", referenceMap.get("new_id") );
|
||||
}
|
||||
// if (api.equals("Account")){
|
||||
// Map<String, Object> referenceMap = customMapper.list("new_id","RecordType", "new_id is not null and id = '"+ data.get(j - 1).get("RecordTypeId")+"' limit 1").get(0);
|
||||
// account.setField("RecordTypeId", referenceMap.get("new_id") );
|
||||
// }
|
||||
if (api.equals("vlink__Wechat_User__c")){
|
||||
List<Map<String, Object>> maps = customMapper.list("new_id", "vlink__Wechat_Account__c", "new_id is not null and id = '" + data.get(j - 1).get("vlink__Wechat_Account__c") + "' limit 1");
|
||||
if (!maps.isEmpty()){
|
||||
|
@ -38,7 +38,6 @@ public class DataDumpConnetTests {
|
||||
@Test
|
||||
public void createConnect() throws Exception {
|
||||
try {
|
||||
List<Map<String, Object>> poll = customerMapper.list("code,value", "org_config", null);
|
||||
// //遍历poll,找出code值为TARGET_ORG_URL,TARGET_ORG_USERNAME,TARGET_ORG_PASSWORD的value值
|
||||
Map<String, String> map = new HashMap<>();
|
||||
// Map<String, Object> map = new HashMap<>();
|
||||
@ -57,9 +56,9 @@ public class DataDumpConnetTests {
|
||||
// }
|
||||
// }
|
||||
//遍历poll,找出code值为TARGET_ORG_URL,TARGET_ORG_USERNAME,TARGET_ORG_PASSWORD的value值
|
||||
map.put("url", "https://cookchina--sandbox.sandbox.my.sfcrmproducts.cn/services/Soap/u/56.0");
|
||||
map.put("username", "cong.chen@cookmedicalasia.com.sandbox");
|
||||
map.put("password", "cook202504");
|
||||
map.put("url", "https://steco-process.my.sfcrmproducts.cn/services/Soap/u/56.0");
|
||||
map.put("username", "binxu@steco-process.com");
|
||||
map.put("password", "AAM0902!");
|
||||
String username = map.get("username").toString();
|
||||
ConnectorConfig config = new ConnectorConfig();
|
||||
config.setUsername(username);
|
||||
|
Loading…
Reference in New Issue
Block a user