From f35c4e44ffc1ba5e5607b4cc58513f9aae067e6f Mon Sep 17 00:00:00 2001 From: Kris <2893855659@qq.com> Date: Tue, 15 Jul 2025 15:53:29 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90feat=E3=80=91=20=E6=96=B0=E5=A2=9E?= =?UTF-8?q?=E6=96=87=E4=BB=B6=E4=B8=8B=E8=BD=BD=EF=BC=88=E6=96=B0=EF=BC=89?= =?UTF-8?q?=E5=BC=82=E6=AD=A5=E4=B8=8B=E8=BD=BD=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../celnet/datadump/job/DataDumpNewJob.java | 27 +- .../service/DataImportNewService.java | 2 + .../impl/DataImportNewServiceImpl.java | 277 ++++++++++++++++++ 3 files changed, 305 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/celnet/datadump/job/DataDumpNewJob.java b/src/main/java/com/celnet/datadump/job/DataDumpNewJob.java index 7ad48db..9a3e34a 100644 --- a/src/main/java/com/celnet/datadump/job/DataDumpNewJob.java +++ b/src/main/java/com/celnet/datadump/job/DataDumpNewJob.java @@ -195,7 +195,7 @@ public class DataDumpNewJob { * @throws Exception */ @XxlJob("insertSingleBatchJob") - public ReturnT insertSingleJob(String paramStr) throws Exception { + public ReturnT insertSingleBatchJob(String paramStr) throws Exception { log.info("insertSingleBatchJob execute start .................."); SalesforceParam param = new SalesforceParam(); try { @@ -210,4 +210,29 @@ public class DataDumpNewJob { param.setEndCreateDate(param.getEndDate()); return dataImportBatchService.insertSingleBatch(param); } + + + /** + * 文件下载 + * @param paramStr + * @author kris + * @return + * @throws Exception + */ + @XxlJob("dumpFileNewJob") + public ReturnT dumpFileNewJob(String paramStr) throws Exception { + log.info("dumpFileNewJob execute start .................."); + SalesforceParam param = new SalesforceParam(); + try { + if (StringUtils.isNotBlank(paramStr)) { + param = JSON.parseObject(paramStr, SalesforceParam.class); + } + } catch (Throwable throwable) { + return new ReturnT<>(500, "参数解析失败!"); + } + // 参数转换 + param.setBeginCreateDate(param.getBeginDate()); + param.setEndCreateDate(param.getEndDate()); + return dataImportNewService.dumpFileNew(param); + } } diff --git a/src/main/java/com/celnet/datadump/service/DataImportNewService.java b/src/main/java/com/celnet/datadump/service/DataImportNewService.java index f987e0a..42a4a98 100644 --- a/src/main/java/com/celnet/datadump/service/DataImportNewService.java +++ b/src/main/java/com/celnet/datadump/service/DataImportNewService.java @@ -34,4 +34,6 @@ public interface DataImportNewService { */ ReturnT uploadDocumentLinkJob(String paramStr) throws Exception; + ReturnT dumpFileNew(SalesforceParam param) throws Exception; + } diff --git a/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java b/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java index 7adca34..61d167d 100644 --- a/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java +++ b/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java @@ -9,6 +9,7 @@ import com.celnet.datadump.config.SalesforceConnect; import com.celnet.datadump.config.SalesforceExecutor; import com.celnet.datadump.config.SalesforceTargetConnect; import com.celnet.datadump.entity.*; +import com.celnet.datadump.enums.FileType; import com.celnet.datadump.global.Const; import com.celnet.datadump.global.SystemConfigCode; import com.celnet.datadump.mapper.CustomMapper; @@ -17,6 +18,8 @@ import com.celnet.datadump.param.SalesforceParam; import com.celnet.datadump.service.*; import com.celnet.datadump.util.DataUtil; import com.celnet.datadump.util.EmailUtil; +import com.celnet.datadump.util.HttpUtil; +import com.celnet.datadump.util.OssUtil; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.sforce.soap.partner.*; @@ -25,6 +28,7 @@ import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.log.XxlJobLogger; import com.xxl.job.core.util.DateUtil; import lombok.extern.slf4j.Slf4j; +import okhttp3.Response; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.time.DateUtils; import org.apache.commons.lang3.ObjectUtils; @@ -32,6 +36,10 @@ import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.RandomAccessFile; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.*; @@ -1004,4 +1012,273 @@ public class DataImportNewServiceImpl implements DataImportNewService { return ReturnT.SUCCESS; } + + @Override + public ReturnT dumpFileNew(SalesforceParam param) throws Exception { + String downloadUrl = null; + + QueryWrapper wrapper = new QueryWrapper<>(); + if (StringUtils.isNotBlank(param.getApi())) { + wrapper.in("name",DataUtil.toIdList(param.getApi())); + }else { + wrapper.isNotNull("blob_field"); + } + List objectList = dataObjectService.list(wrapper); + if (objectList.isEmpty()){ + log.info("没有对象存在文件二进制字段!不进行文件下载"); + } + + List> poll = customMapper.list("code,value","org_config",null); + for (Map map1 : poll) { + if ("FILE_DOWNLOAD_URL".equals(map1.get("code"))) { + downloadUrl = (String) map1.get("value"); + } + } + if (StringUtils.isEmpty(downloadUrl)) { + EmailUtil.send("DumpFile ERROR", "文件下载失败!下载地址未配置"); + return ReturnT.FAIL; + } + + PartnerConnection connect = salesforceConnect.createConnect(); + + List> futures = Lists.newArrayList(); + + for (DataObject dataObject : objectList) { + DataObject update = new DataObject(); + log.info("dump file api:{}, field:{}", dataObject.getName(), dataObject.getBlobField()); + + try { + String api = dataObject.getName(); + update.setName(dataObject.getName()); + List salesforceParams = null; + QueryWrapper dbQw = new QueryWrapper<>(); + dbQw.eq("name", api); + List list = dataBatchService.list(dbQw); + AtomicInteger batch = new AtomicInteger(1); + if (CollectionUtils.isNotEmpty(list)) { + salesforceParams = list.stream().map(t -> { + SalesforceParam salesforceParam = param.clone(); + salesforceParam.setApi(t.getName()); + salesforceParam.setBeginCreateDate(t.getSyncStartDate()); + salesforceParam.setEndCreateDate(t.getSyncEndDate()); + salesforceParam.setBatch(batch.getAndIncrement()); + return salesforceParam; + }).collect(Collectors.toList()); + } + + // 手动任务优先执行 + for (SalesforceParam salesforceParam : salesforceParams) { + String finalDownloadUrl = downloadUrl; + Future future = salesforceExecutor.execute(() -> { + try { + saveFile(salesforceParam, connect, finalDownloadUrl,dataObject.getName(),dataObject.getExtraField()); + } catch (Throwable throwable) { + log.error("salesforceExecutor error", throwable); + throw new RuntimeException(throwable); + } + }, salesforceParam.getBatch(), 0); + futures.add(future); + } + // 等待当前所有线程执行完成 + salesforceExecutor.waitForFutures(futures.toArray(new Future[]{})); + update.setDataWork(0); + } catch (InterruptedException e) { + throw e; + } catch (Throwable e) { + throw new RuntimeException(e); + } finally { + update.setDataLock(0); + dataObjectService.updateById(update); + } + } + // 等待当前所有线程执行完成 + salesforceExecutor.waitForFutures(futures.toArray(new Future[]{})); + futures.clear(); + return ReturnT.SUCCESS; + } + + /** + * 下载文件 + */ + private void saveFile(SalesforceParam param, PartnerConnection Connection ,String downloadUrl, String api, String field) { + String extraSql = ""; + if (dataFieldService.hasDeleted(api)) { + extraSql += "AND IsDeleted = false "; + } + if (Const.FILE_TYPE == FileType.SERVER) { + // 检测路径是否存在 不存在则创建 + File excel = new File(Const.SERVER_FILE_PATH + "/" + api); + if (!excel.exists()) { + boolean mkdir = excel.mkdir(); + } + } + Date beginDate = param.getBeginCreateDate(); + Date endDate = param.getEndCreateDate(); + String beginDateStr = DateUtil.format(beginDate, "yyyy-MM-dd HH:mm:ss"); + String endDateStr = DateUtil.format(endDate, "yyyy-MM-dd HH:mm:ss"); + + String token = Connection.getSessionHeader().getSessionId(); + + try { + String name = getName(api); + log.info("api:{},field:{},开始时间:{},结束时间:{}", api, field,beginDateStr,endDateStr); + + Map headers = Maps.newHashMap(); + headers.put("Authorization", "Bearer " + token); + headers.put("connection", "keep-alive"); + long num = 0; + while (true) { + // 获取未存储的附件id + List> list = customMapper.list("Id, " + name, api, " is_dump = false and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr + extraSql + "' limit 10"); + if (CollectionUtils.isEmpty(list)) { + break; + } + for (Map map : list) { + String id = null; + // 上传完毕 更新附件信息 + List> maps = Lists.newArrayList(); + boolean isDump = true; + int failCount = 0; + while (true) { + try { + id = (String) map.get("Id"); + String fileName = (String) map.get(name); + log.info("------------文件名:" + id + "_" + fileName); + // 判断路径是否为空 + if (StringUtils.isNotBlank(fileName)) { + String filePath = api + "/" + id + "_" + fileName; + // 拼接url + String url = downloadUrl + String.format(Const.SF_FILE_URL, api, id, field); + Response response = HttpUtil.doGet(url, null, headers); + if (response.body() != null) { + InputStream inputStream = response.body().byteStream(); + switch (Const.FILE_TYPE) { + case OSS: + // 上传到oss + OssUtil.upload(inputStream, filePath); + break; + case SERVER: + dumpToServer(headers, id, filePath, url, response, inputStream); + break; + default: + log.error("id: {}, no mapping dump type", id); + } + Map paramMap = Maps.newHashMap(); + if ("Document".equals(api)) { + paramMap.put("key", "localUrl"); + } else { + paramMap.put("key", "url"); + } + paramMap.put("value", filePath); + maps.add(paramMap); + } + } + Map paramMap = Maps.newHashMap(); + paramMap.put("key", "is_dump"); + paramMap.put("value", isDump); + maps.add(paramMap); + customMapper.updateById(api, maps, id); + TimeUnit.MILLISECONDS.sleep(1); + break; + } catch (Throwable throwable) { + log.error("dump file error, id: {}", id, throwable); + failCount++; + if (Const.MAX_FAIL_COUNT < failCount) { + { + Map paramMap = Maps.newHashMap(); + paramMap.put("key", "is_dump"); + paramMap.put("value", 2); + maps.add(paramMap); + } + customMapper.updateById(api, maps, id); + break; + } + try { + TimeUnit.SECONDS.sleep(30); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + } + num += list.size(); + log.info("dump file count api:{}, field:{}, num:{}", api, field, num); + } + log.info("dump file success api:{}, field:{}, num:{}", api, field, num); + + } catch (Throwable throwable) { + log.error("dump file error", throwable); + } finally { + // 把is_dump为2的重置为0 + List> maps = Lists.newArrayList(); + Map paramMap = Maps.newHashMap(); + paramMap.put("key", "is_dump"); + paramMap.put("value", 0); + maps.add(paramMap); + customMapper.update(maps, api, "is_dump = 2"); + } + + } + + /** + * 获取附件名称 + * + * @param api 表明 + * @return name + */ + private static String getName(String api) { + // 默认name + String name = "Name"; + // contentVersion使用PathOnClient + if (Const.CONTENT_VERSION.equalsIgnoreCase(api)) { + name = "PathOnClient"; + } + return name; + } + + /** + * 下载文件到服务器 + * + * @param headers 请求头 + * @param id id + * @param filePath 文件路径 + * @param url 链接 + * @param response 响应 + * @param inputStream 流 + * @throws IOException exception + */ + private void dumpToServer(Map headers, String id, String filePath, String url, Response response, InputStream inputStream) throws IOException { + String path = Const.SERVER_FILE_PATH + "/" + filePath; + log.info("--------文件名称:"+path); + long offset = 0L; + RandomAccessFile accessFile = new RandomAccessFile(path, "rw"); + while (true) { + try { + // 保存到本地 + byte[] buf = new byte[8192]; + int len = 0; + if (offset > 0) { + inputStream.skip(offset); + accessFile.seek(offset); + } + while ((len = inputStream.read(buf)) != -1) { + accessFile.write(buf, 0, len); + offset += len; + } + break; + } catch (Exception e) { + if (offset <= 0) { + throw e; + } + log.warn("file dump to server EOF ERROR try to reconnect"); + response.close(); + response = HttpUtil.doGet(url, null, headers); + assert response.body() != null; + inputStream = response.body().byteStream(); + log.warn("reconnect success, id:{} skip {}", id, offset); + } + } + accessFile.close(); + } + }