Compare commits
No commits in common. "2ef79ab15942739af9759750d5a6ca070fe56c9e" and "5ea0ff490b1705aae7fe05e0889336bdb14fe1e3" have entirely different histories.
2ef79ab159
...
5ea0ff490b
@ -9,7 +9,10 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
|||||||
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
|
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
|
||||||
import com.celnet.datadump.config.SalesforceExecutor;
|
import com.celnet.datadump.config.SalesforceExecutor;
|
||||||
import com.celnet.datadump.config.SalesforceTargetConnect;
|
import com.celnet.datadump.config.SalesforceTargetConnect;
|
||||||
import com.celnet.datadump.entity.*;
|
import com.celnet.datadump.entity.DataBatch;
|
||||||
|
import com.celnet.datadump.entity.DataBatchHistory;
|
||||||
|
import com.celnet.datadump.entity.DataField;
|
||||||
|
import com.celnet.datadump.entity.DataObject;
|
||||||
import com.celnet.datadump.global.SystemConfigCode;
|
import com.celnet.datadump.global.SystemConfigCode;
|
||||||
import com.celnet.datadump.mapper.CustomMapper;
|
import com.celnet.datadump.mapper.CustomMapper;
|
||||||
import com.celnet.datadump.param.DataDumpParam;
|
import com.celnet.datadump.param.DataDumpParam;
|
||||||
@ -27,7 +30,6 @@ import com.sforce.soap.partner.sobject.SObject;
|
|||||||
import com.xxl.job.core.biz.model.ReturnT;
|
import com.xxl.job.core.biz.model.ReturnT;
|
||||||
import com.xxl.job.core.log.XxlJobLogger;
|
import com.xxl.job.core.log.XxlJobLogger;
|
||||||
import com.xxl.job.core.util.DateUtil;
|
import com.xxl.job.core.util.DateUtil;
|
||||||
import lombok.Data;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.commons.collections.CollectionUtils;
|
import org.apache.commons.collections.CollectionUtils;
|
||||||
import org.apache.commons.lang.time.DateUtils;
|
import org.apache.commons.lang.time.DateUtils;
|
||||||
@ -71,9 +73,6 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private DataBatchHistoryService dataBatchHistoryService;
|
private DataBatchHistoryService dataBatchHistoryService;
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private DataLogService dataLogService;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Insert入口
|
* Insert入口
|
||||||
@ -200,13 +199,13 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
|
|||||||
if (count == 0) {
|
if (count == 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
//批量插入10000一次
|
//批量插入2000一次
|
||||||
int page = count%10000 == 0 ? count/10000 : (count/10000) + 1;
|
int page = count%2000 == 0 ? count/2000 : (count/2000) + 1;
|
||||||
//总插入数
|
//总插入数
|
||||||
int sfNum = 0;
|
int sfNum = 0;
|
||||||
for (int i = 0; i < page; i++) {
|
for (int i = 0; i < page; i++) {
|
||||||
|
|
||||||
List<JSONObject> data = customMapper.listJsonObject("*", api, "new_id is null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "' limit 10000");
|
List<JSONObject> data = customMapper.listJsonObject("*", api, "new_id is null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "' limit 2000");
|
||||||
int size = data.size();
|
int size = data.size();
|
||||||
|
|
||||||
log.info("执行api:{}, 执行page:{}, 执行size:{}", api, i+1, size);
|
log.info("执行api:{}, 执行page:{}, 执行size:{}", api, i+1, size);
|
||||||
@ -262,7 +261,7 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
|
|||||||
// 转换为UTC时间并格式化
|
// 转换为UTC时间并格式化
|
||||||
LocalDateTime localDateTime = LocalDateTime.parse(String.valueOf(data.get(j - 1).get("CreatedDate")), inputFormatter);
|
LocalDateTime localDateTime = LocalDateTime.parse(String.valueOf(data.get(j - 1).get("CreatedDate")), inputFormatter);
|
||||||
|
|
||||||
ZonedDateTime utcDateTime = localDateTime.atZone(ZoneId.of("UTC")).minusHours(8) ;
|
ZonedDateTime utcDateTime = localDateTime.atZone(ZoneId.of("UTC"));
|
||||||
|
|
||||||
String convertedTime = utcDateTime.format(outputFormatter);
|
String convertedTime = utcDateTime.format(outputFormatter);
|
||||||
|
|
||||||
@ -276,7 +275,7 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
|
|||||||
|
|
||||||
ids[j-1] = data.get(j-1).get("Id").toString();
|
ids[j-1] = data.get(j-1).get("Id").toString();
|
||||||
insertList.add(account);
|
insertList.add(account);
|
||||||
if (i*10000+j == count){
|
if (i*2000+j == count){
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -338,11 +337,11 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
|
|||||||
for (int i = 0; i < resultCols; i++) {
|
for (int i = 0; i < resultCols; i++) {
|
||||||
resultInfo.put(resultHeader.get(i), row.get(i));
|
resultInfo.put(resultHeader.get(i), row.get(i));
|
||||||
}
|
}
|
||||||
boolean insertStatus = Boolean.valueOf(resultInfo.get("Success"));
|
boolean success = Boolean.valueOf(resultInfo.get("Success"));
|
||||||
boolean created = Boolean.valueOf(resultInfo.get("Created"));
|
boolean created = Boolean.valueOf(resultInfo.get("Created"));
|
||||||
String id = resultInfo.get("Id");
|
String id = resultInfo.get("Id");
|
||||||
String error = resultInfo.get("Error");
|
String error = resultInfo.get("Error");
|
||||||
if (insertStatus && created) {
|
if (success && created) {
|
||||||
List<Map<String, Object>> maps = new ArrayList<>();
|
List<Map<String, Object>> maps = new ArrayList<>();
|
||||||
Map<String, Object> m = new HashMap<>();
|
Map<String, Object> m = new HashMap<>();
|
||||||
m.put("key", "new_id");
|
m.put("key", "new_id");
|
||||||
@ -351,15 +350,7 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
|
|||||||
customMapper.updateById(api, maps, ids[index]);
|
customMapper.updateById(api, maps, ids[index]);
|
||||||
index ++;
|
index ++;
|
||||||
log.info("Created row with id " + id);
|
log.info("Created row with id " + id);
|
||||||
} else if (!insertStatus) {
|
} else if (!success) {
|
||||||
DataLog dataLog = new DataLog();
|
|
||||||
dataLog.setRequestData("BulkInsert,api:" + api);
|
|
||||||
dataLog.setEmailFlag(false);
|
|
||||||
dataLog.setEndTime(new Date());
|
|
||||||
dataLog.setStartTime(new Date());
|
|
||||||
dataLog.setRequestType("Insert");
|
|
||||||
dataLog.setErrorMessage(error);
|
|
||||||
dataLogService.save(dataLog);
|
|
||||||
log.info("Failed with error: " + error);
|
log.info("Failed with error: " + error);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -497,10 +488,10 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
|
|||||||
}
|
}
|
||||||
// 总更新数
|
// 总更新数
|
||||||
int sfNum = 0;
|
int sfNum = 0;
|
||||||
// 批量更新10000一次
|
// 批量更新2000一次
|
||||||
int page = count%10000 == 0 ? count/10000 : (count/10000) + 1;
|
int page = count%2000 == 0 ? count/2000 : (count/2000) + 1;
|
||||||
for (int i = 0; i < page; i++) {
|
for (int i = 0; i < page; i++) {
|
||||||
List<Map<String, Object>> mapList = customMapper.list("*", api, "new_id is not null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "' order by Id asc limit " + i * 10000 + ",10000");
|
List<Map<String, Object>> mapList = customMapper.list("*", api, "new_id is not null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + "' order by Id asc limit " + i * 2000 + ",2000");
|
||||||
|
|
||||||
List<JSONObject> updateList = new ArrayList<>();
|
List<JSONObject> updateList = new ArrayList<>();
|
||||||
|
|
||||||
@ -555,7 +546,7 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
|
|||||||
|
|
||||||
BulkUtil.awaitCompletion(bulkConnection, salesforceInsertJob, batchInfos);
|
BulkUtil.awaitCompletion(bulkConnection, salesforceInsertJob, batchInfos);
|
||||||
|
|
||||||
sfNum = sfNum + checkUpdateResults(bulkConnection, salesforceInsertJob, batchInfos,api);
|
sfNum = sfNum + checkUpdateResults(bulkConnection, salesforceInsertJob, batchInfos);
|
||||||
|
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
log.info(e.getMessage());
|
log.info(e.getMessage());
|
||||||
@ -583,7 +574,7 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
|
|||||||
* 读写update结果
|
* 读写update结果
|
||||||
*/
|
*/
|
||||||
public int checkUpdateResults(BulkConnection connection, JobInfo job,
|
public int checkUpdateResults(BulkConnection connection, JobInfo job,
|
||||||
List<BatchInfo> batchInfoList,String api)
|
List<BatchInfo> batchInfoList)
|
||||||
throws AsyncApiException, IOException {
|
throws AsyncApiException, IOException {
|
||||||
int index = 0;
|
int index = 0;
|
||||||
// batchInfoList was populated when batches were created and submitted
|
// batchInfoList was populated when batches were created and submitted
|
||||||
@ -599,21 +590,13 @@ public class DataImportBatchServiceImpl implements DataImportBatchService {
|
|||||||
for (int i = 0; i < resultCols; i++) {
|
for (int i = 0; i < resultCols; i++) {
|
||||||
resultInfo.put(resultHeader.get(i), row.get(i));
|
resultInfo.put(resultHeader.get(i), row.get(i));
|
||||||
}
|
}
|
||||||
boolean updateStatus = Boolean.valueOf(resultInfo.get("Success"));
|
boolean success = Boolean.valueOf(resultInfo.get("Success"));
|
||||||
String id = resultInfo.get("Id");
|
String id = resultInfo.get("Id");
|
||||||
String error = resultInfo.get("Error");
|
String error = resultInfo.get("Error");
|
||||||
if (updateStatus) {
|
if (success) {
|
||||||
index ++;
|
index ++;
|
||||||
log.info("Update row with id " + id);
|
log.info("Update row with id " + id);
|
||||||
} else {
|
} else if (!success) {
|
||||||
DataLog dataLog = new DataLog();
|
|
||||||
dataLog.setRequestData("BulkUpdate,api:" + api);
|
|
||||||
dataLog.setEmailFlag(false);
|
|
||||||
dataLog.setEndTime(new Date());
|
|
||||||
dataLog.setStartTime(new Date());
|
|
||||||
dataLog.setRequestType("Update");
|
|
||||||
dataLog.setErrorMessage(error);
|
|
||||||
dataLogService.save(dataLog);
|
|
||||||
log.info("Failed with error: " + error);
|
log.info("Failed with error: " + error);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -85,12 +85,10 @@ public class DataUtil {
|
|||||||
break;
|
break;
|
||||||
case "date":
|
case "date":
|
||||||
case "datetime":
|
case "datetime":
|
||||||
|
case "time":
|
||||||
case "boolean":
|
case "boolean":
|
||||||
result = type;
|
result = type;
|
||||||
break;
|
break;
|
||||||
case "time":
|
|
||||||
result = "time(3)";
|
|
||||||
break;
|
|
||||||
case "long":
|
case "long":
|
||||||
case "int":
|
case "int":
|
||||||
result = "int(" + length + ")";
|
result = "int(" + length + ")";
|
||||||
@ -422,30 +420,11 @@ public class DataUtil {
|
|||||||
}
|
}
|
||||||
calendar.setTime(date);
|
calendar.setTime(date);
|
||||||
return calendar;
|
return calendar;
|
||||||
case "time":
|
|
||||||
return adjustHour(data);
|
|
||||||
default:
|
default:
|
||||||
return data;
|
return data;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String adjustHour(String timeStr) {
|
|
||||||
// 提取小时部分并转换为整数
|
|
||||||
int hour = Integer.parseInt(timeStr.substring(0, 2));
|
|
||||||
|
|
||||||
// 减去8小时并处理跨天逻辑
|
|
||||||
int adjustedHour = hour - 8;
|
|
||||||
if (adjustedHour < 0) {
|
|
||||||
adjustedHour += 24; // 处理负数情况(跨天)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 格式化为两位字符串(自动补零)
|
|
||||||
String newHour = String.format("%02d", adjustedHour);
|
|
||||||
|
|
||||||
// 拼接原始字符串的剩余部分(分钟+秒+毫秒)
|
|
||||||
return newHour + timeStr.substring(2);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static boolean isUpdate(String field){
|
public static boolean isUpdate(String field){
|
||||||
switch (field) {
|
switch (field) {
|
||||||
case "LastModifiedDate":
|
case "LastModifiedDate":
|
||||||
|
Loading…
Reference in New Issue
Block a user