【feat】 文件上传任务优化

(cherry picked from commit 53cc4ac124)
This commit is contained in:
Kris 2025-07-25 11:37:02 +08:00
parent 7893f8fdb3
commit 6b2daf35ca
3 changed files with 382 additions and 43 deletions

View File

@ -272,11 +272,7 @@ public class DataDumpNewJob {
log.error("api:{} does not have blob field", api); log.error("api:{} does not have blob field", api);
XxlJobLogger.log("api:{} does not have blob field", api); XxlJobLogger.log("api:{} does not have blob field", api);
} }
if ("Attachment".equals(api)){ return dataImportNewService.uploadFileNew(param,dataObject);
dataImportNewService.uploadFileToAttachmentNew(param,dataObject);
}else {
dataImportNewService.uploadFileNew(param,dataObject);
}
} }
return ReturnT.SUCCESS; return ReturnT.SUCCESS;
} }

View File

@ -37,9 +37,7 @@ public interface DataImportNewService {
ReturnT<String> dumpFileNew(SalesforceParam param) throws Exception; ReturnT<String> dumpFileNew(SalesforceParam param) throws Exception;
void uploadFileToAttachmentNew(SalesforceParam param, DataObject dataObject) throws Exception; ReturnT<String> uploadFileNew(SalesforceParam param, DataObject dataObject) throws Exception;
void uploadFileNew(SalesforceParam param, DataObject dataObject) throws Exception;
} }

View File

@ -33,13 +33,18 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.time.DateUtils; import org.apache.commons.lang.time.DateUtils;
import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.mime.MultipartEntityBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.io.File; import java.io.*;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.text.ParseException; import java.text.ParseException;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.*; import java.util.*;
@ -1116,19 +1121,6 @@ public class DataImportNewServiceImpl implements DataImportNewService {
return ReturnT.SUCCESS; return ReturnT.SUCCESS;
} }
@Override
public void uploadFileToAttachmentNew(SalesforceParam param, DataObject dataObject) throws Exception {
}
@Override
public void uploadFileNew(SalesforceParam param, DataObject dataObject) throws Exception {
}
/** /**
* 下载文件 * 下载文件
@ -1200,10 +1192,8 @@ public class DataImportNewServiceImpl implements DataImportNewService {
case OSS: case OSS:
// 上传到oss // 上传到oss
OssUtil.upload(inputStream, filePath); OssUtil.upload(inputStream, filePath);
break;
case SERVER: case SERVER:
dumpToServer(headers, id, filePath, url, response, inputStream); dumpToServer(headers, id, filePath, url, response, inputStream);
break;
default: default:
log.error("id: {}, no mapping dump type", id); log.error("id: {}, no mapping dump type", id);
} }
@ -1239,24 +1229,8 @@ public class DataImportNewServiceImpl implements DataImportNewService {
} }
} }
} }
} }
/**
* 获取附件名称
*
* @param api 表明
* @return name
*/
private static String getName(String api) {
// 默认name
String name = "Name";
// contentVersion使用PathOnClient
if (Const.CONTENT_VERSION.equalsIgnoreCase(api)) {
name = "PathOnClient";
}
return name;
}
/** /**
* 下载文件到服务器 * 下载文件到服务器
@ -1303,4 +1277,375 @@ public class DataImportNewServiceImpl implements DataImportNewService {
accessFile.close(); accessFile.close();
} }
@Override
public ReturnT<String> uploadFileNew(SalesforceParam param, DataObject dataObject) throws Exception {
String uploadUrl = null;
List<Map<String, Object>> poll = customMapper.list("code,value","org_config",null);
for (Map<String, Object> map1 : poll) {
if ("FILE_UPLOAD_URL".equals(map1.get("code"))) {
uploadUrl = (String) map1.get("value");
}
}
if (StringUtils.isBlank(uploadUrl)) {
EmailUtil.send("UploadFile ERROR", "文件上传失败!上传地址未配置");
return ReturnT.FAIL;
}
if (StringUtils.isEmpty(dataObject.getBlobField())){
log.info("没有对象存在文件二进制字段!不进行文件上传");
}
PartnerConnection connect = salesforceConnect.createConnect();
List<Future<?>> futures = Lists.newArrayList();
DataObject update = new DataObject();
log.info("dump file api:{}, field:{}", dataObject.getName(), dataObject.getBlobField());
try {
String api = dataObject.getName();
update.setName(dataObject.getName());
List<SalesforceParam> salesforceParams = null;
QueryWrapper<DataBatch> dbQw = new QueryWrapper<>();
dbQw.eq("name", api);
List<DataBatch> list = dataBatchService.list(dbQw);
AtomicInteger batch = new AtomicInteger(1);
if (CollectionUtils.isNotEmpty(list)) {
salesforceParams = list.stream().map(t -> {
SalesforceParam salesforceParam = param.clone();
salesforceParam.setApi(t.getName());
salesforceParam.setBeginCreateDate(t.getSyncStartDate());
salesforceParam.setEndCreateDate(t.getSyncEndDate());
salesforceParam.setBatch(batch.getAndIncrement());
return salesforceParam;
}).collect(Collectors.toList());
}
// 手动任务优先执行
for (SalesforceParam salesforceParam : salesforceParams) {
String finalDownloadUrl = uploadUrl;
Future<?> future = salesforceExecutor.execute(() -> {
try {
if ("Attachment".equals(dataObject.getName())){
uploadAttachment(salesforceParam, connect, finalDownloadUrl,dataObject.getName());
}else {
uploadFile(salesforceParam, connect, finalDownloadUrl,dataObject.getName());
}
} catch (Throwable throwable) {
log.error("salesforceExecutor error", throwable);
throw new RuntimeException(throwable);
}
}, salesforceParam.getBatch(), 0);
futures.add(future);
}
// 等待当前所有线程执行完成
salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
update.setDataWork(0);
} catch (InterruptedException e) {
salesforceExecutor.remove(futures.toArray(new Future<?>[]{}));
throw e;
} catch (Throwable e) {
throw new RuntimeException(e);
} finally {
update.setDataLock(0);
dataObjectService.updateById(update);
}
// 等待当前所有线程执行完成
salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
futures.clear();
return ReturnT.SUCCESS;
}
/**
* 上传Attachment
*/
private void uploadAttachment(SalesforceParam param, PartnerConnection connection ,String uploadUrl, String api) throws Exception {
String extraSql = "";
if (dataFieldService.hasDeleted(api)) {
extraSql += " AND IsDeleted = false ";
}
Date beginDate = param.getBeginCreateDate();
Date endDate = param.getEndCreateDate();
String beginDateStr = DateUtil.format(beginDate, "yyyy-MM-dd HH:mm:ss");
String endDateStr = DateUtil.format(endDate, "yyyy-MM-dd HH:mm:ss");
String token = connection.getSessionHeader().getSessionId();
String name = "Name";
if (Const.CONTENT_VERSION.equalsIgnoreCase(api)) {
name = "Title,FileExtension";
}
Map<String, String> headers = Maps.newHashMap();
headers.put("Authorization", "Bearer " + token);
headers.put("connection", "keep-alive");
Integer count = customMapper.countBySQL(api, "where is_dump = 1 and is_upload = 0 and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr +"'"+ extraSql );
log.error("总文件数 count:{}-开始时间:{}-结束时间:{}-api:{}", count, beginDateStr, endDateStr, api);
if (count == 0) {
return;
}
int page = count%200 == 0 ? count/200 : (count/200) + 1;
//总插入数
for (int i = 0; i < page; i++) {
// 获取未存储的附件id
List<Map<String, Object>> list = customMapper.list("Id, " + name, api, " is_dump = 1 and is_upload = 0 and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr +"'"+ extraSql + "limit " + i * 200 + ",200");
for (Map<String, Object> map : list) {
String id = null;
// 上传完毕 更新附件信息
List<Map<String, Object>> maps = Lists.newArrayList();
boolean isUpload = true;
CloseableHttpResponse response = null;
String respContent = null;
try {
id = (String) map.get("Id");
String url_fileName = (String) map.get("url");
String parentId = (String) map.get("ParentId");
String parentType = (String) map.get("Parent_type");
// 判断路径是否为空
if (StringUtils.isNotBlank(url_fileName)) {
String filePath = Const.SERVER_FILE_PATH + "/" + url_fileName;
File file = new File(filePath);
boolean exists = file.exists();
if (!exists) {
log.info("文件不存在");
}
String fileName = (String) map.get("Name");
// dataObject查询
QueryWrapper<DataObject> qw = new QueryWrapper<>();
qw.eq("name", parentType);
List<DataObject> objects = dataObjectService.list(qw);
if (objects.isEmpty()) {
log.info("关联对象不存在: {}", parentType);
}
Map<String, Object> lMap = customMapper.getById("new_id",parentType, parentId);
// 拼接url
String url = uploadUrl + String.format(Const.SF_UPLOAD_FILE_URL, api);
HttpPost httpPost = new HttpPost(url);
httpPost.setHeader("Authorization", "Bearer " + token);
httpPost.setHeader("connection", "keep-alive");
com.alibaba.fastjson.JSONObject credentialsJsonParam = new com.alibaba.fastjson.JSONObject();
credentialsJsonParam.put("parentId", lMap.get("new_id"));
credentialsJsonParam.put("Name", fileName);
MultipartEntityBuilder builder = MultipartEntityBuilder.create();
builder.addTextBody("data", credentialsJsonParam.toJSONString(), ContentType.APPLICATION_JSON);
builder.addBinaryBody("Body", new FileInputStream(file), ContentType.APPLICATION_OCTET_STREAM, fileName);
HttpEntity entity = builder.build();
httpPost.setEntity(entity);
CloseableHttpClient httpClient = HttpClients.createDefault();
response = httpClient.execute(httpPost);
if (response != null && response.getStatusLine() != null && response.getStatusLine().getStatusCode() < 400) {
HttpEntity he = response.getEntity();
if (he != null) {
respContent = EntityUtils.toString(he, "UTF-8");
String newId = com.alibaba.fastjson.JSONObject.parseObject(respContent).get("id").toString();
Map<String, Object> paramMap = Maps.newHashMap();
paramMap.put("key", "new_id");
paramMap.put("value", newId);
maps.add(paramMap);
}
} else {
throw new RuntimeException();
}
}
Map<String, Object> paramMap = Maps.newHashMap();
paramMap.put("key", "is_upload");
paramMap.put("value", isUpload);
maps.add(paramMap);
customMapper.updateById(api, maps, id);
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException interruptedException){
return;
} catch (Exception e) {
if (response != null) {
try {
response.close();
} catch (IOException re) {
log.error("exception message", re);
}
}
log.error("文件上传失败!, id: {}, 错误信息:{}", id ,e.getMessage());
Map<String, Object> paramMap = Maps.newHashMap();
paramMap.put("key", "is_upload");
paramMap.put("value", 2);
maps.add(paramMap);
customMapper.updateById(api, maps, id);
EmailUtil.send("File Upload ERROR", "文件上传失败!, id: "+ id + ",错误信息:" + e.getMessage());
}
}
}
}
/**
* 上传文件
*/
private void uploadFile(SalesforceParam param, PartnerConnection connection ,String uploadUrl, String api) throws Exception {
String extraSql = "";
if (dataFieldService.hasDeleted(api)) {
extraSql += " AND IsDeleted = false ";
}
Date beginDate = param.getBeginCreateDate();
Date endDate = param.getEndCreateDate();
String beginDateStr = DateUtil.format(beginDate, "yyyy-MM-dd HH:mm:ss");
String endDateStr = DateUtil.format(endDate, "yyyy-MM-dd HH:mm:ss");
String token = connection.getSessionHeader().getSessionId();
String name = "Name";
if (Const.CONTENT_VERSION.equalsIgnoreCase(api)) {
name = "Title,FileExtension";
}
Map<String, String> headers = Maps.newHashMap();
headers.put("Authorization", "Bearer " + token);
headers.put("connection", "keep-alive");
Integer count = customMapper.countBySQL("ContentDocument", "where new_id is null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr +"'"+ extraSql );
log.error("总文件数 count:{}-开始时间:{}-结束时间:{}-api:{}", count, beginDateStr, endDateStr, api);
if (count == 0) {
return;
}
int page = count%200 == 0 ? count/200 : (count/200) + 1;
//总插入数
for (int i = 0; i < page; i++) {
// 获取未存储的附件id
List<Map<String, Object>> documentList = customMapper.list("Id, " + name, "ContentDocument", " new_id is null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr +"'"+ extraSql + "limit " + i * 200 + ",200");
for (Map<String, Object> documentMap : documentList) {
String documentId = (String) documentMap.get("Id");
// 获取未存储的附件id
List<Map<String, Object>> list = customMapper.list("Id, url, PathOnClient, Title, ContentDocumentId, VersionNumber", api, "ContentDocumentId = '" + documentId + "' and new_id is null ORDER BY VersionNumber ASC");
if (CollectionUtils.isEmpty(list)) {
continue;
}
String finalUploadUrl = uploadUrl;
String newDocumentId = null;
for (Map<String, Object> map : list) {
String id = null;
// 上传完毕 更新附件信息
List<Map<String, Object>> maps = Lists.newArrayList();
boolean isUpload = true;
int failCount = 0;
CloseableHttpResponse response = null;
String respContent = null;
try {
id = (String) map.get("Id");
String url_fileName = (String) map.get("url");
String fileName = (String) map.get("PathOnClient");
String title = (String) map.get("Title");
String oldDocumentId = (String) map.get("ContentDocumentId");
Integer versionNumber = Integer.valueOf(map.get("VersionNumber").toString());
// 判断路径是否为空
if (StringUtils.isNotBlank(url_fileName)) {
String filePath = Const.SERVER_FILE_PATH + "/" + url_fileName;
File file = new File(filePath);
boolean exists = file.exists();
if (!exists) {
log.info("文件不存在");
break;
}
// 拼接url
String url = finalUploadUrl + String.format(Const.SF_UPLOAD_FILE_URL, api);
HttpPost httpPost = new HttpPost(url);
httpPost.setHeader("Authorization", "Bearer " + token);
httpPost.setHeader("connection", "keep-alive");
com.alibaba.fastjson.JSONObject credentialsJsonParam = new com.alibaba.fastjson.JSONObject();
credentialsJsonParam.put("title", title);
credentialsJsonParam.put("pathOnClient", fileName);
if (newDocumentId != null) {
credentialsJsonParam.put("ContentDocumentId", newDocumentId);
}
MultipartEntityBuilder builder = MultipartEntityBuilder.create();
builder.addTextBody("entity_content", credentialsJsonParam.toJSONString(), ContentType.APPLICATION_JSON);
builder.addBinaryBody("VersionData", new FileInputStream(file), ContentType.APPLICATION_OCTET_STREAM, fileName);
HttpEntity entity = builder.build();
httpPost.setEntity(entity);
CloseableHttpClient httpClient = HttpClients.createDefault();
response = httpClient.execute(httpPost);
if (response != null && response.getStatusLine() != null && response.getStatusLine().getStatusCode() < 400) {
HttpEntity he = response.getEntity();
if (he != null) {
respContent = EntityUtils.toString(he, "UTF-8");
String newId = String.valueOf(com.alibaba.fastjson.JSONObject.parseObject(respContent).get("id"));
if (StringUtils.isBlank(newId)) {
log.error("文件上传错误,返回实体信息:" + respContent);
break;
}
Map<String, Object> paramMap = Maps.newHashMap();
paramMap.put("key", "new_id");
paramMap.put("value", newId);
maps.add(paramMap);
if (versionNumber == 1) {
// 更新documentId
String dId = commonService.getDocumentId(connection, newId);
List<Map<String, Object>> dList = new ArrayList<>();
Map<String, Object> dMap = Maps.newHashMap();
dMap.put("key", "new_id");
dMap.put("value", dId);
dList.add(dMap);
customMapper.updateById("ContentDocument", dList, oldDocumentId);
newDocumentId = dId;
}
}else {
break;
}
} else {
throw new RuntimeException();
}
}
Map<String, Object> paramMap = Maps.newHashMap();
paramMap.put("key", "is_upload");
paramMap.put("value", isUpload);
maps.add(paramMap);
customMapper.updateById(api, maps, id);
TimeUnit.MILLISECONDS.sleep(1);
break;
} catch (Exception e) {
if (response != null) {
try {
response.close();
} catch (IOException re) {
log.error("exception message", re);
}
}
log.error("文件上传失败!, id: {}, 错误信息:{}", id ,e.getMessage());
Map<String, Object> paramMap = Maps.newHashMap();
paramMap.put("key", "is_upload");
paramMap.put("value", 2);
maps.add(paramMap);
customMapper.updateById(api, maps, id);
EmailUtil.send("File Upload ERROR", "文件上传失败!, id: "+ id + ",错误信息:" + e.getMessage());
}
}
}
}
}
} }