【feat】 新增文件下载(新)异步下载任务

This commit is contained in:
Kris 2025-07-15 15:53:29 +08:00
parent 6ec1b3f206
commit f35c4e44ff
3 changed files with 305 additions and 1 deletions

View File

@ -195,7 +195,7 @@ public class DataDumpNewJob {
* @throws Exception * @throws Exception
*/ */
@XxlJob("insertSingleBatchJob") @XxlJob("insertSingleBatchJob")
public ReturnT<String> insertSingleJob(String paramStr) throws Exception { public ReturnT<String> insertSingleBatchJob(String paramStr) throws Exception {
log.info("insertSingleBatchJob execute start .................."); log.info("insertSingleBatchJob execute start ..................");
SalesforceParam param = new SalesforceParam(); SalesforceParam param = new SalesforceParam();
try { try {
@ -210,4 +210,29 @@ public class DataDumpNewJob {
param.setEndCreateDate(param.getEndDate()); param.setEndCreateDate(param.getEndDate());
return dataImportBatchService.insertSingleBatch(param); return dataImportBatchService.insertSingleBatch(param);
} }
/**
* 文件下载
* @param paramStr
* @author kris
* @return
* @throws Exception
*/
@XxlJob("dumpFileNewJob")
public ReturnT<String> dumpFileNewJob(String paramStr) throws Exception {
log.info("dumpFileNewJob execute start ..................");
SalesforceParam param = new SalesforceParam();
try {
if (StringUtils.isNotBlank(paramStr)) {
param = JSON.parseObject(paramStr, SalesforceParam.class);
}
} catch (Throwable throwable) {
return new ReturnT<>(500, "参数解析失败!");
}
// 参数转换
param.setBeginCreateDate(param.getBeginDate());
param.setEndCreateDate(param.getEndDate());
return dataImportNewService.dumpFileNew(param);
}
} }

View File

@ -34,4 +34,6 @@ public interface DataImportNewService {
*/ */
ReturnT<String> uploadDocumentLinkJob(String paramStr) throws Exception; ReturnT<String> uploadDocumentLinkJob(String paramStr) throws Exception;
ReturnT<String> dumpFileNew(SalesforceParam param) throws Exception;
} }

View File

@ -9,6 +9,7 @@ import com.celnet.datadump.config.SalesforceConnect;
import com.celnet.datadump.config.SalesforceExecutor; import com.celnet.datadump.config.SalesforceExecutor;
import com.celnet.datadump.config.SalesforceTargetConnect; import com.celnet.datadump.config.SalesforceTargetConnect;
import com.celnet.datadump.entity.*; import com.celnet.datadump.entity.*;
import com.celnet.datadump.enums.FileType;
import com.celnet.datadump.global.Const; import com.celnet.datadump.global.Const;
import com.celnet.datadump.global.SystemConfigCode; import com.celnet.datadump.global.SystemConfigCode;
import com.celnet.datadump.mapper.CustomMapper; import com.celnet.datadump.mapper.CustomMapper;
@ -17,6 +18,8 @@ import com.celnet.datadump.param.SalesforceParam;
import com.celnet.datadump.service.*; import com.celnet.datadump.service.*;
import com.celnet.datadump.util.DataUtil; import com.celnet.datadump.util.DataUtil;
import com.celnet.datadump.util.EmailUtil; import com.celnet.datadump.util.EmailUtil;
import com.celnet.datadump.util.HttpUtil;
import com.celnet.datadump.util.OssUtil;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.sforce.soap.partner.*; import com.sforce.soap.partner.*;
@ -25,6 +28,7 @@ import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.log.XxlJobLogger; import com.xxl.job.core.log.XxlJobLogger;
import com.xxl.job.core.util.DateUtil; import com.xxl.job.core.util.DateUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import okhttp3.Response;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.time.DateUtils; import org.apache.commons.lang.time.DateUtils;
import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.ObjectUtils;
@ -32,6 +36,10 @@ import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.text.ParseException; import java.text.ParseException;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.*; import java.util.*;
@ -1004,4 +1012,273 @@ public class DataImportNewServiceImpl implements DataImportNewService {
return ReturnT.SUCCESS; return ReturnT.SUCCESS;
} }
@Override
public ReturnT<String> dumpFileNew(SalesforceParam param) throws Exception {
String downloadUrl = null;
QueryWrapper<DataObject> wrapper = new QueryWrapper<>();
if (StringUtils.isNotBlank(param.getApi())) {
wrapper.in("name",DataUtil.toIdList(param.getApi()));
}else {
wrapper.isNotNull("blob_field");
}
List<DataObject> objectList = dataObjectService.list(wrapper);
if (objectList.isEmpty()){
log.info("没有对象存在文件二进制字段!不进行文件下载");
}
List<Map<String, Object>> poll = customMapper.list("code,value","org_config",null);
for (Map<String, Object> map1 : poll) {
if ("FILE_DOWNLOAD_URL".equals(map1.get("code"))) {
downloadUrl = (String) map1.get("value");
}
}
if (StringUtils.isEmpty(downloadUrl)) {
EmailUtil.send("DumpFile ERROR", "文件下载失败!下载地址未配置");
return ReturnT.FAIL;
}
PartnerConnection connect = salesforceConnect.createConnect();
List<Future<?>> futures = Lists.newArrayList();
for (DataObject dataObject : objectList) {
DataObject update = new DataObject();
log.info("dump file api:{}, field:{}", dataObject.getName(), dataObject.getBlobField());
try {
String api = dataObject.getName();
update.setName(dataObject.getName());
List<SalesforceParam> salesforceParams = null;
QueryWrapper<DataBatch> dbQw = new QueryWrapper<>();
dbQw.eq("name", api);
List<DataBatch> list = dataBatchService.list(dbQw);
AtomicInteger batch = new AtomicInteger(1);
if (CollectionUtils.isNotEmpty(list)) {
salesforceParams = list.stream().map(t -> {
SalesforceParam salesforceParam = param.clone();
salesforceParam.setApi(t.getName());
salesforceParam.setBeginCreateDate(t.getSyncStartDate());
salesforceParam.setEndCreateDate(t.getSyncEndDate());
salesforceParam.setBatch(batch.getAndIncrement());
return salesforceParam;
}).collect(Collectors.toList());
}
// 手动任务优先执行
for (SalesforceParam salesforceParam : salesforceParams) {
String finalDownloadUrl = downloadUrl;
Future<?> future = salesforceExecutor.execute(() -> {
try {
saveFile(salesforceParam, connect, finalDownloadUrl,dataObject.getName(),dataObject.getExtraField());
} catch (Throwable throwable) {
log.error("salesforceExecutor error", throwable);
throw new RuntimeException(throwable);
}
}, salesforceParam.getBatch(), 0);
futures.add(future);
}
// 等待当前所有线程执行完成
salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
update.setDataWork(0);
} catch (InterruptedException e) {
throw e;
} catch (Throwable e) {
throw new RuntimeException(e);
} finally {
update.setDataLock(0);
dataObjectService.updateById(update);
}
}
// 等待当前所有线程执行完成
salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
futures.clear();
return ReturnT.SUCCESS;
}
/**
* 下载文件
*/
private void saveFile(SalesforceParam param, PartnerConnection Connection ,String downloadUrl, String api, String field) {
String extraSql = "";
if (dataFieldService.hasDeleted(api)) {
extraSql += "AND IsDeleted = false ";
}
if (Const.FILE_TYPE == FileType.SERVER) {
// 检测路径是否存在 不存在则创建
File excel = new File(Const.SERVER_FILE_PATH + "/" + api);
if (!excel.exists()) {
boolean mkdir = excel.mkdir();
}
}
Date beginDate = param.getBeginCreateDate();
Date endDate = param.getEndCreateDate();
String beginDateStr = DateUtil.format(beginDate, "yyyy-MM-dd HH:mm:ss");
String endDateStr = DateUtil.format(endDate, "yyyy-MM-dd HH:mm:ss");
String token = Connection.getSessionHeader().getSessionId();
try {
String name = getName(api);
log.info("api:{},field:{},开始时间:{},结束时间:{}", api, field,beginDateStr,endDateStr);
Map<String, String> headers = Maps.newHashMap();
headers.put("Authorization", "Bearer " + token);
headers.put("connection", "keep-alive");
long num = 0;
while (true) {
// 获取未存储的附件id
List<Map<String, Object>> list = customMapper.list("Id, " + name, api, " is_dump = false and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + extraSql + "' limit 10");
if (CollectionUtils.isEmpty(list)) {
break;
}
for (Map<String, Object> map : list) {
String id = null;
// 上传完毕 更新附件信息
List<Map<String, Object>> maps = Lists.newArrayList();
boolean isDump = true;
int failCount = 0;
while (true) {
try {
id = (String) map.get("Id");
String fileName = (String) map.get(name);
log.info("------------文件名:" + id + "_" + fileName);
// 判断路径是否为空
if (StringUtils.isNotBlank(fileName)) {
String filePath = api + "/" + id + "_" + fileName;
// 拼接url
String url = downloadUrl + String.format(Const.SF_FILE_URL, api, id, field);
Response response = HttpUtil.doGet(url, null, headers);
if (response.body() != null) {
InputStream inputStream = response.body().byteStream();
switch (Const.FILE_TYPE) {
case OSS:
// 上传到oss
OssUtil.upload(inputStream, filePath);
break;
case SERVER:
dumpToServer(headers, id, filePath, url, response, inputStream);
break;
default:
log.error("id: {}, no mapping dump type", id);
}
Map<String, Object> paramMap = Maps.newHashMap();
if ("Document".equals(api)) {
paramMap.put("key", "localUrl");
} else {
paramMap.put("key", "url");
}
paramMap.put("value", filePath);
maps.add(paramMap);
}
}
Map<String, Object> paramMap = Maps.newHashMap();
paramMap.put("key", "is_dump");
paramMap.put("value", isDump);
maps.add(paramMap);
customMapper.updateById(api, maps, id);
TimeUnit.MILLISECONDS.sleep(1);
break;
} catch (Throwable throwable) {
log.error("dump file error, id: {}", id, throwable);
failCount++;
if (Const.MAX_FAIL_COUNT < failCount) {
{
Map<String, Object> paramMap = Maps.newHashMap();
paramMap.put("key", "is_dump");
paramMap.put("value", 2);
maps.add(paramMap);
}
customMapper.updateById(api, maps, id);
break;
}
try {
TimeUnit.SECONDS.sleep(30);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
num += list.size();
log.info("dump file count api:{}, field:{}, num:{}", api, field, num);
}
log.info("dump file success api:{}, field:{}, num:{}", api, field, num);
} catch (Throwable throwable) {
log.error("dump file error", throwable);
} finally {
// 把is_dump为2的重置为0
List<Map<String, Object>> maps = Lists.newArrayList();
Map<String, Object> paramMap = Maps.newHashMap();
paramMap.put("key", "is_dump");
paramMap.put("value", 0);
maps.add(paramMap);
customMapper.update(maps, api, "is_dump = 2");
}
}
/**
* 获取附件名称
*
* @param api 表明
* @return name
*/
private static String getName(String api) {
// 默认name
String name = "Name";
// contentVersion使用PathOnClient
if (Const.CONTENT_VERSION.equalsIgnoreCase(api)) {
name = "PathOnClient";
}
return name;
}
/**
* 下载文件到服务器
*
* @param headers 请求头
* @param id id
* @param filePath 文件路径
* @param url 链接
* @param response 响应
* @param inputStream
* @throws IOException exception
*/
private void dumpToServer(Map<String, String> headers, String id, String filePath, String url, Response response, InputStream inputStream) throws IOException {
String path = Const.SERVER_FILE_PATH + "/" + filePath;
log.info("--------文件名称:"+path);
long offset = 0L;
RandomAccessFile accessFile = new RandomAccessFile(path, "rw");
while (true) {
try {
// 保存到本地
byte[] buf = new byte[8192];
int len = 0;
if (offset > 0) {
inputStream.skip(offset);
accessFile.seek(offset);
}
while ((len = inputStream.read(buf)) != -1) {
accessFile.write(buf, 0, len);
offset += len;
}
break;
} catch (Exception e) {
if (offset <= 0) {
throw e;
}
log.warn("file dump to server EOF ERROR try to reconnect");
response.close();
response = HttpUtil.doGet(url, null, headers);
assert response.body() != null;
inputStream = response.body().byteStream();
log.warn("reconnect success, id:{} skip {}", id, offset);
}
}
accessFile.close();
}
} }