【fix】 优化文件批量下载

This commit is contained in:
Kris 2025-07-22 14:03:41 +08:00
parent 1e346e19ac
commit 15e755ff4b
3 changed files with 94 additions and 111 deletions

View File

@ -238,14 +238,6 @@ public class CommonServiceImpl implements CommonService {
} }
// 等待当前所有线程执行完成 // 等待当前所有线程执行完成
salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{})); salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
// 存在附件的开始dump
list.stream().filter(t -> StringUtils.isNotBlank(t.getBlobField())).forEach(t -> {
try {
fileService.dumpFile(t.getName(), t.getBlobField(), true);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
return ReturnT.SUCCESS; return ReturnT.SUCCESS;
} catch (Throwable throwable) { } catch (Throwable throwable) {
salesforceExecutor.remove(futures.toArray(new Future<?>[]{})); salesforceExecutor.remove(futures.toArray(new Future<?>[]{}));

View File

@ -1035,6 +1035,7 @@ public class DataImportNewServiceImpl implements DataImportNewService {
} }
List<Map<String, Object>> poll = customMapper.list("code,value","org_config",null); List<Map<String, Object>> poll = customMapper.list("code,value","org_config",null);
for (Map<String, Object> map1 : poll) { for (Map<String, Object> map1 : poll) {
if ("FILE_DOWNLOAD_URL".equals(map1.get("code"))) { if ("FILE_DOWNLOAD_URL".equals(map1.get("code"))) {
downloadUrl = (String) map1.get("value"); downloadUrl = (String) map1.get("value");
@ -1072,12 +1073,23 @@ public class DataImportNewServiceImpl implements DataImportNewService {
}).collect(Collectors.toList()); }).collect(Collectors.toList());
} }
if (Const.FILE_TYPE == FileType.SERVER) {
// 检测路径是否存在 不存在则创建
File excel = new File(Const.SERVER_FILE_PATH + "/" + api);
if (!excel.exists()) {
boolean mkdir = excel.mkdir();
if (!mkdir) {
log.info("创建文件存储目录失败!");
}
}
}
// 手动任务优先执行 // 手动任务优先执行
for (SalesforceParam salesforceParam : salesforceParams) { for (SalesforceParam salesforceParam : salesforceParams) {
String finalDownloadUrl = downloadUrl; String finalDownloadUrl = downloadUrl;
Future<?> future = salesforceExecutor.execute(() -> { Future<?> future = salesforceExecutor.execute(() -> {
try { try {
saveFile(salesforceParam, connect, finalDownloadUrl,dataObject.getName(),dataObject.getExtraField()); saveFile(salesforceParam, connect, finalDownloadUrl,dataObject.getName(),dataObject.getBlobField());
} catch (Throwable throwable) { } catch (Throwable throwable) {
log.error("salesforceExecutor error", throwable); log.error("salesforceExecutor error", throwable);
throw new RuntimeException(throwable); throw new RuntimeException(throwable);
@ -1089,6 +1101,7 @@ public class DataImportNewServiceImpl implements DataImportNewService {
salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{})); salesforceExecutor.waitForFutures(futures.toArray(new Future<?>[]{}));
update.setDataWork(0); update.setDataWork(0);
} catch (InterruptedException e) { } catch (InterruptedException e) {
salesforceExecutor.remove(futures.toArray(new Future<?>[]{}));
throw e; throw e;
} catch (Throwable e) { } catch (Throwable e) {
throw new RuntimeException(e); throw new RuntimeException(e);
@ -1106,18 +1119,13 @@ public class DataImportNewServiceImpl implements DataImportNewService {
/** /**
* 下载文件 * 下载文件
*/ */
private void saveFile(SalesforceParam param, PartnerConnection Connection ,String downloadUrl, String api, String field) { private void saveFile(SalesforceParam param, PartnerConnection Connection ,String downloadUrl, String api, String field) throws Exception {
String extraSql = ""; String extraSql = "";
if (dataFieldService.hasDeleted(api)) { if (dataFieldService.hasDeleted(api)) {
extraSql += "AND IsDeleted = false "; extraSql += " AND IsDeleted = false ";
}
if (Const.FILE_TYPE == FileType.SERVER) {
// 检测路径是否存在 不存在则创建
File excel = new File(Const.SERVER_FILE_PATH + "/" + api);
if (!excel.exists()) {
boolean mkdir = excel.mkdir();
}
} }
Date beginDate = param.getBeginCreateDate(); Date beginDate = param.getBeginCreateDate();
Date endDate = param.getEndCreateDate(); Date endDate = param.getEndCreateDate();
String beginDateStr = DateUtil.format(beginDate, "yyyy-MM-dd HH:mm:ss"); String beginDateStr = DateUtil.format(beginDate, "yyyy-MM-dd HH:mm:ss");
@ -1125,103 +1133,89 @@ public class DataImportNewServiceImpl implements DataImportNewService {
String token = Connection.getSessionHeader().getSessionId(); String token = Connection.getSessionHeader().getSessionId();
try { String name = getName(api);
String name = getName(api); log.info("api:{},field:{},开始时间:{},结束时间:{}", api, field,beginDateStr,endDateStr);
log.info("api:{},field:{},开始时间:{},结束时间:{}", api, field,beginDateStr,endDateStr);
Map<String, String> headers = Maps.newHashMap(); Map<String, String> headers = Maps.newHashMap();
headers.put("Authorization", "Bearer " + token); headers.put("Authorization", "Bearer " + token);
headers.put("connection", "keep-alive"); headers.put("connection", "keep-alive");
long num = 0;
while (true) { Integer count = customMapper.countBySQL(api, "where is_dump = 0 and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr +"'"+ extraSql );
// 获取未存储的附件id
List<Map<String, Object>> list = customMapper.list("Id, " + name, api, " is_dump = false and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + extraSql + "' limit 10"); log.error("总文件数 count:{}-开始时间:{}-结束时间:{}-api:{}", count, beginDateStr, endDateStr, api);
if (CollectionUtils.isEmpty(list)) {
break; if (count == 0) {
} return;
for (Map<String, Object> map : list) { }
String id = null;
// 上传完毕 更新附件信息 int page = count%200 == 0 ? count/200 : (count/200) + 1;
List<Map<String, Object>> maps = Lists.newArrayList(); //总插入数
boolean isDump = true; for (int i = 0; i < page; i++) {
int failCount = 0;
while (true) { // 获取未存储的附件id
try { List<Map<String, Object>> list = customMapper.list("Id, " + name, api, " is_dump = 0 and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr +"'"+ extraSql + "limit " + i * 200 + ",200");
id = (String) map.get("Id");
String fileName = (String) map.get(name); for (Map<String, Object> map : list) {
log.info("------------文件名:" + id + "_" + fileName); String id = null;
// 判断路径是否为空 // 上传完毕 更新附件信息
if (StringUtils.isNotBlank(fileName)) { List<Map<String, Object>> maps = Lists.newArrayList();
String filePath = api + "/" + id + "_" + fileName; try {
// 拼接url id = (String) map.get("Id");
String url = downloadUrl + String.format(Const.SF_FILE_URL, api, id, field); String fileName = (String) map.get(name);
Response response = HttpUtil.doGet(url, null, headers); log.info("------------文件名:" + id + "_" + fileName);
if (response.body() != null) { // 判断路径是否为空
InputStream inputStream = response.body().byteStream(); if (StringUtils.isNotBlank(fileName)) {
switch (Const.FILE_TYPE) { String filePath = api + "/" + id + "_" + fileName;
case OSS: // 拼接url
// 上传到oss String url = downloadUrl + String.format(Const.SF_FILE_URL, api, id, field);
OssUtil.upload(inputStream, filePath);
break; log.info("文件下载请求地址:{}",url);
case SERVER: Response response = HttpUtil.doGet(url, null, headers);
dumpToServer(headers, id, filePath, url, response, inputStream); if (response.body() != null && response.code() == 200) {
break; InputStream inputStream = response.body().byteStream();
default: log.info("文件下载返回状态码:{},返回信息:{}", response.code(),response.message());
log.error("id: {}, no mapping dump type", id); switch (Const.FILE_TYPE) {
} case OSS:
Map<String, Object> paramMap = Maps.newHashMap(); // 上传到oss
if ("Document".equals(api)) { OssUtil.upload(inputStream, filePath);
paramMap.put("key", "localUrl"); break;
} else { case SERVER:
paramMap.put("key", "url"); dumpToServer(headers, id, filePath, url, response, inputStream);
} break;
paramMap.put("value", filePath); default:
maps.add(paramMap); log.error("id: {}, no mapping dump type", id);
}
} }
Map<String, Object> paramMap = Maps.newHashMap(); Map<String, Object> paramMap = Maps.newHashMap();
paramMap.put("key", "is_dump"); if ("Document".equals(api)) {
paramMap.put("value", isDump); paramMap.put("key", "localUrl");
} else {
paramMap.put("key", "url");
}
paramMap.put("value", filePath);
maps.add(paramMap); maps.add(paramMap);
customMapper.updateById(api, maps, id); }else {
TimeUnit.MILLISECONDS.sleep(1); log.error("文件下载失败!, id: "+ id + ",返回体信息:" + response.message());
break; EmailUtil.send("File Dump ERROR", "文件下载失败!, id: "+ id + ",返回体信息:" + response.message());
} catch (Throwable throwable) {
log.error("dump file error, id: {}", id, throwable);
failCount++;
if (Const.MAX_FAIL_COUNT < failCount) {
{
Map<String, Object> paramMap = Maps.newHashMap();
paramMap.put("key", "is_dump");
paramMap.put("value", 2);
maps.add(paramMap);
}
customMapper.updateById(api, maps, id);
break;
}
try {
TimeUnit.SECONDS.sleep(30);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} }
} }
Map<String, Object> paramMap = Maps.newHashMap();
paramMap.put("key", "is_dump");
paramMap.put("value", true);
maps.add(paramMap);
customMapper.updateById(api, maps, id);
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException interruptedException){
return;
} catch (Exception e) {
log.error("文件下载失败!, id: {}, 错误信息:{}", id ,e.getMessage());
Map<String, Object> paramMap = Maps.newHashMap();
paramMap.put("key", "is_dump");
paramMap.put("value", 2);
maps.add(paramMap);
customMapper.updateById(api, maps, id);
EmailUtil.send("File Dump ERROR", "文件下载失败!, id: "+ id + ",错误信息:" + e.getMessage());
} }
num += list.size();
log.info("dump file count api:{}, field:{}, num:{}", api, field, num);
} }
log.info("dump file success api:{}, field:{}, num:{}", api, field, num);
} catch (Throwable throwable) {
log.error("dump file error", throwable);
} finally {
// 把is_dump为2的重置为0
List<Map<String, Object>> maps = Lists.newArrayList();
Map<String, Object> paramMap = Maps.newHashMap();
paramMap.put("key", "is_dump");
paramMap.put("value", 0);
maps.add(paramMap);
customMapper.update(maps, api, "is_dump = 2");
} }
} }

View File

@ -90,10 +90,7 @@
AND id IN <foreach item="id" collection="param.ids" open="(" separator="," close=")">#{id}</foreach> AND id IN <foreach item="id" collection="param.ids" open="(" separator="," close=")">#{id}</foreach>
</if> </if>
<if test="param.beginModifyDate != null"> <if test="param.beginModifyDate != null">
AND LastModifiedDate >= #{param.beginModifyDate} AND SystemModstamp >= #{param.beginModifyDate}
</if>
<if test="param.beginModifyDate != null">
AND LastModifiedDate >= #{param.beginModifyDate}
</if> </if>
<if test="param.isDeleted != null"> <if test="param.isDeleted != null">
AND IsDeleted = #{param.isDeleted} AND IsDeleted = #{param.isDeleted}