From 15e755ff4b2b464f85c0c61de1cae0f3767e91de Mon Sep 17 00:00:00 2001 From: Kris <2893855659@qq.com> Date: Tue, 22 Jul 2025 14:03:41 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90fix=E3=80=91=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E6=96=87=E4=BB=B6=E6=89=B9=E9=87=8F=E4=B8=8B=E8=BD=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/impl/CommonServiceImpl.java | 8 - .../impl/DataImportNewServiceImpl.java | 192 +++++++++--------- .../resources/mapper/SalesforceMapper.xml | 5 +- 3 files changed, 94 insertions(+), 111 deletions(-) diff --git a/src/main/java/com/celnet/datadump/service/impl/CommonServiceImpl.java b/src/main/java/com/celnet/datadump/service/impl/CommonServiceImpl.java index 8ff0d19..07c99a8 100644 --- a/src/main/java/com/celnet/datadump/service/impl/CommonServiceImpl.java +++ b/src/main/java/com/celnet/datadump/service/impl/CommonServiceImpl.java @@ -238,14 +238,6 @@ public class CommonServiceImpl implements CommonService { } // 等待当前所有线程执行完成 salesforceExecutor.waitForFutures(futures.toArray(new Future[]{})); - // 存在附件的开始dump - list.stream().filter(t -> StringUtils.isNotBlank(t.getBlobField())).forEach(t -> { - try { - fileService.dumpFile(t.getName(), t.getBlobField(), true); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); return ReturnT.SUCCESS; } catch (Throwable throwable) { salesforceExecutor.remove(futures.toArray(new Future[]{})); diff --git a/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java b/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java index c48408e..d49d7d5 100644 --- a/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java +++ b/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java @@ -1035,6 +1035,7 @@ public class DataImportNewServiceImpl implements DataImportNewService { } List> poll = customMapper.list("code,value","org_config",null); + for (Map map1 : poll) { if ("FILE_DOWNLOAD_URL".equals(map1.get("code"))) { downloadUrl = (String) map1.get("value"); @@ -1072,12 +1073,23 @@ public class DataImportNewServiceImpl implements DataImportNewService { }).collect(Collectors.toList()); } + if (Const.FILE_TYPE == FileType.SERVER) { + // 检测路径是否存在 不存在则创建 + File excel = new File(Const.SERVER_FILE_PATH + "/" + api); + if (!excel.exists()) { + boolean mkdir = excel.mkdir(); + if (!mkdir) { + log.info("创建文件存储目录失败!"); + } + } + } + // 手动任务优先执行 for (SalesforceParam salesforceParam : salesforceParams) { String finalDownloadUrl = downloadUrl; Future future = salesforceExecutor.execute(() -> { try { - saveFile(salesforceParam, connect, finalDownloadUrl,dataObject.getName(),dataObject.getExtraField()); + saveFile(salesforceParam, connect, finalDownloadUrl,dataObject.getName(),dataObject.getBlobField()); } catch (Throwable throwable) { log.error("salesforceExecutor error", throwable); throw new RuntimeException(throwable); @@ -1089,6 +1101,7 @@ public class DataImportNewServiceImpl implements DataImportNewService { salesforceExecutor.waitForFutures(futures.toArray(new Future[]{})); update.setDataWork(0); } catch (InterruptedException e) { + salesforceExecutor.remove(futures.toArray(new Future[]{})); throw e; } catch (Throwable e) { throw new RuntimeException(e); @@ -1106,18 +1119,13 @@ public class DataImportNewServiceImpl implements DataImportNewService { /** * 下载文件 */ - private void saveFile(SalesforceParam param, PartnerConnection Connection ,String downloadUrl, String api, String field) { + private void saveFile(SalesforceParam param, PartnerConnection Connection ,String downloadUrl, String api, String field) throws Exception { + String extraSql = ""; if (dataFieldService.hasDeleted(api)) { - extraSql += "AND IsDeleted = false "; - } - if (Const.FILE_TYPE == FileType.SERVER) { - // 检测路径是否存在 不存在则创建 - File excel = new File(Const.SERVER_FILE_PATH + "/" + api); - if (!excel.exists()) { - boolean mkdir = excel.mkdir(); - } + extraSql += " AND IsDeleted = false "; } + Date beginDate = param.getBeginCreateDate(); Date endDate = param.getEndCreateDate(); String beginDateStr = DateUtil.format(beginDate, "yyyy-MM-dd HH:mm:ss"); @@ -1125,103 +1133,89 @@ public class DataImportNewServiceImpl implements DataImportNewService { String token = Connection.getSessionHeader().getSessionId(); - try { - String name = getName(api); - log.info("api:{},field:{},开始时间:{},结束时间:{}", api, field,beginDateStr,endDateStr); + String name = getName(api); + log.info("api:{},field:{},开始时间:{},结束时间:{}", api, field,beginDateStr,endDateStr); - Map headers = Maps.newHashMap(); - headers.put("Authorization", "Bearer " + token); - headers.put("connection", "keep-alive"); - long num = 0; - while (true) { - // 获取未存储的附件id - List> list = customMapper.list("Id, " + name, api, " is_dump = false and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + extraSql + "' limit 10"); - if (CollectionUtils.isEmpty(list)) { - break; - } - for (Map map : list) { - String id = null; - // 上传完毕 更新附件信息 - List> maps = Lists.newArrayList(); - boolean isDump = true; - int failCount = 0; - while (true) { - try { - id = (String) map.get("Id"); - String fileName = (String) map.get(name); - log.info("------------文件名:" + id + "_" + fileName); - // 判断路径是否为空 - if (StringUtils.isNotBlank(fileName)) { - String filePath = api + "/" + id + "_" + fileName; - // 拼接url - String url = downloadUrl + String.format(Const.SF_FILE_URL, api, id, field); - Response response = HttpUtil.doGet(url, null, headers); - if (response.body() != null) { - InputStream inputStream = response.body().byteStream(); - switch (Const.FILE_TYPE) { - case OSS: - // 上传到oss - OssUtil.upload(inputStream, filePath); - break; - case SERVER: - dumpToServer(headers, id, filePath, url, response, inputStream); - break; - default: - log.error("id: {}, no mapping dump type", id); - } - Map paramMap = Maps.newHashMap(); - if ("Document".equals(api)) { - paramMap.put("key", "localUrl"); - } else { - paramMap.put("key", "url"); - } - paramMap.put("value", filePath); - maps.add(paramMap); - } + Map headers = Maps.newHashMap(); + headers.put("Authorization", "Bearer " + token); + headers.put("connection", "keep-alive"); + + Integer count = customMapper.countBySQL(api, "where is_dump = 0 and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr +"'"+ extraSql ); + + log.error("总文件数 count:{};-开始时间:{};-结束时间:{};-api:{};", count, beginDateStr, endDateStr, api); + + if (count == 0) { + return; + } + + int page = count%200 == 0 ? count/200 : (count/200) + 1; + //总插入数 + for (int i = 0; i < page; i++) { + + // 获取未存储的附件id + List> list = customMapper.list("Id, " + name, api, " is_dump = 0 and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr +"'"+ extraSql + "limit " + i * 200 + ",200"); + + for (Map map : list) { + String id = null; + // 上传完毕 更新附件信息 + List> maps = Lists.newArrayList(); + try { + id = (String) map.get("Id"); + String fileName = (String) map.get(name); + log.info("------------文件名:" + id + "_" + fileName); + // 判断路径是否为空 + if (StringUtils.isNotBlank(fileName)) { + String filePath = api + "/" + id + "_" + fileName; + // 拼接url + String url = downloadUrl + String.format(Const.SF_FILE_URL, api, id, field); + + log.info("文件下载请求地址:{}",url); + Response response = HttpUtil.doGet(url, null, headers); + if (response.body() != null && response.code() == 200) { + InputStream inputStream = response.body().byteStream(); + log.info("文件下载返回状态码:{},返回信息:{}", response.code(),response.message()); + switch (Const.FILE_TYPE) { + case OSS: + // 上传到oss + OssUtil.upload(inputStream, filePath); + break; + case SERVER: + dumpToServer(headers, id, filePath, url, response, inputStream); + break; + default: + log.error("id: {}, no mapping dump type", id); } Map paramMap = Maps.newHashMap(); - paramMap.put("key", "is_dump"); - paramMap.put("value", isDump); + if ("Document".equals(api)) { + paramMap.put("key", "localUrl"); + } else { + paramMap.put("key", "url"); + } + paramMap.put("value", filePath); maps.add(paramMap); - customMapper.updateById(api, maps, id); - TimeUnit.MILLISECONDS.sleep(1); - break; - } catch (Throwable throwable) { - log.error("dump file error, id: {}", id, throwable); - failCount++; - if (Const.MAX_FAIL_COUNT < failCount) { - { - Map paramMap = Maps.newHashMap(); - paramMap.put("key", "is_dump"); - paramMap.put("value", 2); - maps.add(paramMap); - } - customMapper.updateById(api, maps, id); - break; - } - try { - TimeUnit.SECONDS.sleep(30); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + }else { + log.error("文件下载失败!, id: "+ id + ",返回体信息:" + response.message()); + EmailUtil.send("File Dump ERROR", "文件下载失败!, id: "+ id + ",返回体信息:" + response.message()); } } + Map paramMap = Maps.newHashMap(); + paramMap.put("key", "is_dump"); + paramMap.put("value", true); + maps.add(paramMap); + customMapper.updateById(api, maps, id); + TimeUnit.MILLISECONDS.sleep(1); + } catch (InterruptedException interruptedException){ + return; + } catch (Exception e) { + log.error("文件下载失败!, id: {}, 错误信息:{}", id ,e.getMessage()); + Map paramMap = Maps.newHashMap(); + paramMap.put("key", "is_dump"); + paramMap.put("value", 2); + maps.add(paramMap); + customMapper.updateById(api, maps, id); + EmailUtil.send("File Dump ERROR", "文件下载失败!, id: "+ id + ",错误信息:" + e.getMessage()); } - num += list.size(); - log.info("dump file count api:{}, field:{}, num:{}", api, field, num); } - log.info("dump file success api:{}, field:{}, num:{}", api, field, num); - - } catch (Throwable throwable) { - log.error("dump file error", throwable); - } finally { - // 把is_dump为2的重置为0 - List> maps = Lists.newArrayList(); - Map paramMap = Maps.newHashMap(); - paramMap.put("key", "is_dump"); - paramMap.put("value", 0); - maps.add(paramMap); - customMapper.update(maps, api, "is_dump = 2"); } } diff --git a/src/main/resources/mapper/SalesforceMapper.xml b/src/main/resources/mapper/SalesforceMapper.xml index bf653be..8fba755 100644 --- a/src/main/resources/mapper/SalesforceMapper.xml +++ b/src/main/resources/mapper/SalesforceMapper.xml @@ -90,10 +90,7 @@ AND id IN #{id} - AND LastModifiedDate >= #{param.beginModifyDate} - - - AND LastModifiedDate >= #{param.beginModifyDate} + AND SystemModstamp >= #{param.beginModifyDate} AND IsDeleted = #{param.isDeleted}