Compare commits


No commits in common. "2ef79ab15942739af9759750d5a6ca070fe56c9e" and "5ea0ff490b1705aae7fe05e0889336bdb14fe1e3" have entirely different histories.

2 changed files with 21 additions and 59 deletions

DataImportBatchServiceImpl.java

@@ -9,7 +9,10 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
 import com.celnet.datadump.config.SalesforceExecutor;
 import com.celnet.datadump.config.SalesforceTargetConnect;
-import com.celnet.datadump.entity.*;
+import com.celnet.datadump.entity.DataBatch;
+import com.celnet.datadump.entity.DataBatchHistory;
+import com.celnet.datadump.entity.DataField;
+import com.celnet.datadump.entity.DataObject;
 import com.celnet.datadump.global.SystemConfigCode;
 import com.celnet.datadump.mapper.CustomMapper;
 import com.celnet.datadump.param.DataDumpParam;
@@ -27,7 +30,6 @@ import com.sforce.soap.partner.sobject.SObject;
 import com.xxl.job.core.biz.model.ReturnT;
 import com.xxl.job.core.log.XxlJobLogger;
 import com.xxl.job.core.util.DateUtil;
-import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.time.DateUtils;
@@ -71,9 +73,6 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
     @Autowired
     private DataBatchHistoryService dataBatchHistoryService;
 
-    @Autowired
-    private DataLogService dataLogService;
-
     /**
      * Insert entry point
@@ -200,13 +199,13 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
         if (count == 0) {
             return;
         }
-        // Bulk insert 10000 at a time
-        int page = count%10000 == 0 ? count/10000 : (count/10000) + 1;
+        // Bulk insert 2000 at a time
+        int page = count%2000 == 0 ? count/2000 : (count/2000) + 1;
         // Total number of rows inserted
         int sfNum = 0;
         for (int i = 0; i < page; i++) {
-            List<JSONObject> data = customMapper.listJsonObject("*", api, "new_id is null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "' limit 10000");
+            List<JSONObject> data = customMapper.listJsonObject("*", api, "new_id is null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "' limit 2000");
             int size = data.size();
             log.info("执行api:{}, 执行page:{}, 执行size:{}", api, i+1, size);
@@ -262,7 +261,7 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
             // Convert to UTC and format
             LocalDateTime localDateTime = LocalDateTime.parse(String.valueOf(data.get(j - 1).get("CreatedDate")), inputFormatter);
-            ZonedDateTime utcDateTime = localDateTime.atZone(ZoneId.of("UTC")).minusHours(8);
+            ZonedDateTime utcDateTime = localDateTime.atZone(ZoneId.of("UTC"));
 
             String convertedTime = utcDateTime.format(outputFormatter);
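Dropping .minusHours(8) changes what CreatedDate is assumed to hold: the old code read the stored value as UTC+8 and shifted it back to UTC, while the new code takes it as already UTC. A small sketch of the difference (the formatter patterns are assumptions, since inputFormatter and outputFormatter are defined outside this hunk):

    import java.time.LocalDateTime;
    import java.time.ZoneId;
    import java.time.format.DateTimeFormatter;

    public class UtcShiftDemo {
        public static void main(String[] args) {
            DateTimeFormatter in = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");       // assumed pattern
            DateTimeFormatter out = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'"); // assumed pattern
            LocalDateTime t = LocalDateTime.parse("2024-01-01 08:00:00", in);
            // Old behavior: read the value as UTC+8 and shift back to UTC
            System.out.println(t.atZone(ZoneId.of("UTC")).minusHours(8).format(out)); // 2024-01-01T00:00:00Z
            // New behavior: read the value as already UTC, no shift
            System.out.println(t.atZone(ZoneId.of("UTC")).format(out));               // 2024-01-01T08:00:00Z
        }
    }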
@@ -276,7 +275,7 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
             ids[j-1] = data.get(j-1).get("Id").toString();
             insertList.add(account);
-            if (i*10000+j == count){
+            if (i*2000+j == count){
                 break;
             }
         }
@@ -338,11 +337,11 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
             for (int i = 0; i < resultCols; i++) {
                 resultInfo.put(resultHeader.get(i), row.get(i));
             }
-            boolean insertStatus = Boolean.valueOf(resultInfo.get("Success"));
+            boolean success = Boolean.valueOf(resultInfo.get("Success"));
             boolean created = Boolean.valueOf(resultInfo.get("Created"));
             String id = resultInfo.get("Id");
             String error = resultInfo.get("Error");
-            if (insertStatus && created) {
+            if (success && created) {
                 List<Map<String, Object>> maps = new ArrayList<>();
                 Map<String, Object> m = new HashMap<>();
                 m.put("key", "new_id");
@@ -351,15 +350,7 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
                 customMapper.updateById(api, maps, ids[index]);
                 index ++;
                 log.info("Created row with id " + id);
-            } else if (!insertStatus) {
-                DataLog dataLog = new DataLog();
-                dataLog.setRequestData("BulkInsertapi:" + api);
-                dataLog.setEmailFlag(false);
-                dataLog.setEndTime(new Date());
-                dataLog.setStartTime(new Date());
-                dataLog.setRequestType("Insert");
-                dataLog.setErrorMessage(error);
-                dataLogService.save(dataLog);
+            } else if (!success) {
                 log.info("Failed with error: " + error);
             }
         }
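For context, Salesforce Bulk API batch results come back as CSV rows with Id/Success/Created/Error columns, which is what the resultHeader/resultInfo mapping above walks. A hedged sketch of the same pattern using the WSC async classes implied by the signatures in this file (the method name and error handling are my own):

    import com.sforce.async.BatchInfo;
    import com.sforce.async.BulkConnection;
    import com.sforce.async.CSVReader;
    import com.sforce.async.JobInfo;
    import java.io.InputStream;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class BulkResultSketch {
        // Count rows the Bulk API reports as successfully created in one batch
        static int countCreated(BulkConnection conn, JobInfo job, BatchInfo batch) throws Exception {
            InputStream stream = conn.getBatchResultStream(job.getId(), batch.getId());
            CSVReader reader = new CSVReader(stream);
            List<String> header = reader.nextRecord(); // Id, Success, Created, Error
            int created = 0;
            for (List<String> row = reader.nextRecord(); row != null; row = reader.nextRecord()) {
                Map<String, String> rec = new HashMap<>();
                for (int i = 0; i < header.size(); i++) {
                    rec.put(header.get(i), row.get(i));
                }
                if (Boolean.valueOf(rec.get("Success")) && Boolean.valueOf(rec.get("Created"))) {
                    created++;
                }
            }
            return created;
        }
    }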
@@ -497,10 +488,10 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
         }
         // Total number of rows updated
         int sfNum = 0;
-        // Bulk update 10000 at a time
-        int page = count%10000 == 0 ? count/10000 : (count/10000) + 1;
+        // Bulk update 2000 at a time
+        int page = count%2000 == 0 ? count/2000 : (count/2000) + 1;
         for (int i = 0; i < page; i++) {
-            List<Map<String, Object>> mapList = customMapper.list("*", api, "new_id is not null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "' order by Id asc limit " + i * 10000 + ",10000");
+            List<Map<String, Object>> mapList = customMapper.list("*", api, "new_id is not null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "' order by Id asc limit " + i * 2000 + ",2000");
 
             List<JSONObject> updateList = new ArrayList<>();
@@ -555,7 +546,7 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
                 BulkUtil.awaitCompletion(bulkConnection, salesforceInsertJob, batchInfos);
-                sfNum = sfNum + checkUpdateResults(bulkConnection, salesforceInsertJob, batchInfos, api);
+                sfNum = sfNum + checkUpdateResults(bulkConnection, salesforceInsertJob, batchInfos);
             } catch (Throwable e) {
                 log.info(e.getMessage());
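BulkUtil.awaitCompletion is a project helper not shown in this diff; with the WSC async API, such a wait is typically a poll over getBatchInfoList until every batch reaches a terminal state. A sketch under that assumption:

    import com.sforce.async.AsyncApiException;
    import com.sforce.async.BatchInfo;
    import com.sforce.async.BatchStateEnum;
    import com.sforce.async.BulkConnection;
    import com.sforce.async.JobInfo;
    import java.util.List;

    public class AwaitCompletionSketch {
        // Poll until every batch in the job is Completed or Failed
        static void awaitCompletion(BulkConnection conn, JobInfo job, List<BatchInfo> batches)
                throws AsyncApiException, InterruptedException {
            long sleepMs = 1000L;
            int incomplete = batches.size();
            while (incomplete > 0) {
                Thread.sleep(sleepMs);
                sleepMs = Math.min(sleepMs * 2, 30_000L); // back off, capped at 30s
                incomplete = 0;
                for (BatchInfo b : conn.getBatchInfoList(job.getId()).getBatchInfo()) {
                    BatchStateEnum state = b.getState();
                    if (state != BatchStateEnum.Completed && state != BatchStateEnum.Failed) {
                        incomplete++;
                    }
                }
            }
        }
    }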
@@ -583,7 +574,7 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
      * Read and record update results
      */
     public int checkUpdateResults(BulkConnection connection, JobInfo job,
-            List<BatchInfo> batchInfoList, String api)
+            List<BatchInfo> batchInfoList)
             throws AsyncApiException, IOException {
         int index = 0;
         // batchInfoList was populated when batches were created and submitted
@@ -599,21 +590,13 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
             for (int i = 0; i < resultCols; i++) {
                 resultInfo.put(resultHeader.get(i), row.get(i));
             }
-            boolean updateStatus = Boolean.valueOf(resultInfo.get("Success"));
+            boolean success = Boolean.valueOf(resultInfo.get("Success"));
             String id = resultInfo.get("Id");
             String error = resultInfo.get("Error");
-            if (updateStatus) {
+            if (success) {
                 index ++;
                 log.info("Update row with id " + id);
-            } else {
-                DataLog dataLog = new DataLog();
-                dataLog.setRequestData("BulkUpdateapi:" + api);
-                dataLog.setEmailFlag(false);
-                dataLog.setEndTime(new Date());
-                dataLog.setStartTime(new Date());
-                dataLog.setRequestType("Update");
-                dataLog.setErrorMessage(error);
-                dataLogService.save(dataLog);
+            } else if (!success) {
                 log.info("Failed with error: " + error);
             }
         }

DataUtil.java

@@ -85,12 +85,10 @@ public class DataUtil {
                 break;
             case "date":
             case "datetime":
+            case "time":
             case "boolean":
                 result = type;
                 break;
-            case "time":
-                result = "time(3)";
-                break;
             case "long":
             case "int":
                 result = "int(" + length + ")";
@@ -422,30 +420,11 @@ public class DataUtil {
                 }
                 calendar.setTime(date);
                 return calendar;
-            case "time":
-                return adjustHour(data);
             default:
                 return data;
         }
     }
 
-    public static String adjustHour(String timeStr) {
-        // Extract the hour portion and convert it to an integer
-        int hour = Integer.parseInt(timeStr.substring(0, 2));
-        // Subtract 8 hours, handling day rollover
-        int adjustedHour = hour - 8;
-        if (adjustedHour < 0) {
-            adjustedHour += 24; // a negative result wraps to the previous day
-        }
-        // Format as a two-digit, zero-padded string
-        String newHour = String.format("%02d", adjustedHour);
-        // Append the rest of the original string (minutes, seconds, milliseconds)
-        return newHour + timeStr.substring(2);
-    }
-
     public static boolean isUpdate(String field){
         switch (field) {
             case "LastModifiedDate":