diff --git a/files/start.sh b/files/start.sh index 18abc41..c86780c 100644 --- a/files/start.sh +++ b/files/start.sh @@ -1,11 +1,11 @@ # 程序配置 # xxl job数据库链接 #cook -#dbUrl="jdbc:mysql://127.0.0.1:3306/data-dump-xxl-job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai" +#dbUrl="jdbc:mysql://127.0.0.1:3306/data-dump-xxl-job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&serverTimezone=Asia/Shanghai" #dbUsername="root" #dbPassword="celnet@2025.bln" #携科 -dbUrl="jdbc:mysql://127.0.0.1:3306/data-dump-xxl-job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&serverTimezone=Asia/Shanghai" +dbUrl="jdbc:mysql://127.0.0.1:3306/data-dump-xxl-job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai&useSSL=false" dbUsername="root" dbPassword="Celnet2025.QY" #其它 diff --git a/src/main/java/com/celnet/datadump/controller/FileManagerController.java b/src/main/java/com/celnet/datadump/controller/FileManagerController.java index 441c6f5..56b4fdd 100644 --- a/src/main/java/com/celnet/datadump/controller/FileManagerController.java +++ b/src/main/java/com/celnet/datadump/controller/FileManagerController.java @@ -110,8 +110,6 @@ public class FileManagerController { } if ("Attachment".equals(api)){ fileService.uploadFileToAttachment(api, blobField, param.getSingleThread()); - }else if("Document".equals(api)){ - fileService.uploadFileToDocument(api, blobField, param.getSingleThread()); }else { fileService.uploadFile(api, blobField, param.getSingleThread()); } diff --git a/src/main/java/com/celnet/datadump/job/DataDumpNewJob.java b/src/main/java/com/celnet/datadump/job/DataDumpNewJob.java index b446aa4..1ac57bb 100644 --- a/src/main/java/com/celnet/datadump/job/DataDumpNewJob.java +++ b/src/main/java/com/celnet/datadump/job/DataDumpNewJob.java @@ -2,12 +2,16 @@ package com.celnet.datadump.job; import com.alibaba.fastjson.JSON; import com.celnet.datadump.config.SalesforceConnect; +import com.celnet.datadump.entity.DataObject; import com.celnet.datadump.param.SalesforceParam; import com.celnet.datadump.service.CommonService; import com.celnet.datadump.service.DataImportBatchService; import com.celnet.datadump.service.DataImportNewService; +import com.celnet.datadump.service.DataObjectService; +import com.celnet.datadump.util.DataUtil; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.handler.annotation.XxlJob; +import com.xxl.job.core.log.XxlJobLogger; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -31,6 +35,9 @@ public class DataDumpNewJob { @Autowired private DataImportBatchService dataImportBatchService; + @Autowired + private DataObjectService dataObjectService; + /** * bulk批量大数据生成newSFID @@ -213,7 +220,7 @@ public class DataDumpNewJob { /** - * 文件下载 + * 文件下载(新) * @param paramStr * @author kris * @return @@ -236,6 +243,44 @@ public class DataDumpNewJob { return dataImportNewService.dumpFileNew(param); } + /** + * 文件上传(新) + * @param paramStr + * @author kris + * @return + * @throws Exception + */ + @XxlJob("uploadFileNewJob") + public ReturnT uploadFileNewJob(String paramStr) throws Exception { + log.info("uploadFileNewJob execute start .................."); + SalesforceParam param = new SalesforceParam(); + try { + if (StringUtils.isNotBlank(paramStr)) { + param = JSON.parseObject(paramStr, SalesforceParam.class); + } + } catch (Throwable throwable) { + return new ReturnT<>(500, "参数解析失败!"); + } + // 参数转换 + param.setBeginCreateDate(param.getBeginDate()); + param.setEndCreateDate(param.getEndDate()); + + for (String api : DataUtil.toIdList(param.getApi())) { + DataObject dataObject = dataObjectService.getById(api); + String blobField = dataObject.getBlobField(); + if (StringUtils.isBlank(blobField)) { + log.error("api:{} does not have blob field", api); + XxlJobLogger.log("api:{} does not have blob field", api); + } + if ("Attachment".equals(api)){ + dataImportNewService.uploadFileToAttachmentNew(param,dataObject); + }else { + dataImportNewService.uploadFileNew(param,dataObject); + } + } + return ReturnT.SUCCESS; + } + /** * 增量任务(新) * diff --git a/src/main/java/com/celnet/datadump/job/DumpFileJob.java b/src/main/java/com/celnet/datadump/job/DumpFileJob.java index bb86d01..df4c5d8 100644 --- a/src/main/java/com/celnet/datadump/job/DumpFileJob.java +++ b/src/main/java/com/celnet/datadump/job/DumpFileJob.java @@ -122,8 +122,6 @@ public class DumpFileJob { } if ("Attachment".equals(api)){ fileService.uploadFileToAttachment(api, blobField, param.getSingleThread()); - }else if("Document".equals(api)){ - fileService.uploadFileToDocument(api, blobField, param.getSingleThread()); }else { fileService.uploadFile(api, blobField, param.getSingleThread()); } diff --git a/src/main/java/com/celnet/datadump/service/DataImportNewService.java b/src/main/java/com/celnet/datadump/service/DataImportNewService.java index 42a4a98..45c3e06 100644 --- a/src/main/java/com/celnet/datadump/service/DataImportNewService.java +++ b/src/main/java/com/celnet/datadump/service/DataImportNewService.java @@ -1,5 +1,6 @@ package com.celnet.datadump.service; +import com.celnet.datadump.entity.DataObject; import com.celnet.datadump.param.SalesforceParam; import com.xxl.job.core.biz.model.ReturnT; @@ -36,4 +37,9 @@ public interface DataImportNewService { ReturnT dumpFileNew(SalesforceParam param) throws Exception; + void uploadFileToAttachmentNew(SalesforceParam param, DataObject dataObject) throws Exception; + + void uploadFileNew(SalesforceParam param, DataObject dataObject) throws Exception; + + } diff --git a/src/main/java/com/celnet/datadump/service/FileService.java b/src/main/java/com/celnet/datadump/service/FileService.java index 431320e..12ccfb4 100644 --- a/src/main/java/com/celnet/datadump/service/FileService.java +++ b/src/main/java/com/celnet/datadump/service/FileService.java @@ -36,6 +36,4 @@ public interface FileService { void uploadFileToAttachment(String api, String field, Boolean singleThread); - void uploadFileToDocument(String api, String field, Boolean singleThread); - } diff --git a/src/main/java/com/celnet/datadump/service/impl/DataImportBatchServiceImpl.java b/src/main/java/com/celnet/datadump/service/impl/DataImportBatchServiceImpl.java index ee9f825..35cf59d 100644 --- a/src/main/java/com/celnet/datadump/service/impl/DataImportBatchServiceImpl.java +++ b/src/main/java/com/celnet/datadump/service/impl/DataImportBatchServiceImpl.java @@ -370,7 +370,7 @@ public class DataImportBatchServiceImpl implements DataImportBatchService { if (reference == null){ continue; } - log.info("----------" + dataField.getField() + "的引用类型 ------------" + reference); +// log.info("----------" + dataField.getField() + "的引用类型 ------------" + reference); List> referenceMap = customMapper.list("new_id", reference, "new_id is not null limit 1"); if (referenceMap.isEmpty()){ QueryWrapper maxIndex = new QueryWrapper<>(); @@ -755,7 +755,7 @@ public class DataImportBatchServiceImpl implements DataImportBatchService { if (reference_to == null){ continue; } - log.info("----------" + dataField.getField() + "的引用类型 ------------" + reference_to); +// log.info("----------" + dataField.getField() + "的引用类型 ------------" + reference_to); //判断reference_to内是否包含User字符串 if (reference_to.contains(",User") || reference_to.contains("User,")) { reference_to = "User"; diff --git a/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java b/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java index d49d7d5..12881bb 100644 --- a/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java +++ b/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java @@ -784,7 +784,7 @@ public class DataImportNewServiceImpl implements DataImportNewService { if (reference_to == null){ continue; } - log.info("----------" + dataField.getField() + "的引用类型 ------------" + reference_to); +// log.info("----------" + dataField.getField() + "的引用类型 ------------" + reference_to); //判断reference_to内是否包含User字符串 if (reference_to.contains(",User") || reference_to.contains("User,")) { reference_to = "User"; @@ -1116,6 +1116,20 @@ public class DataImportNewServiceImpl implements DataImportNewService { return ReturnT.SUCCESS; } + @Override + public void uploadFileToAttachmentNew(SalesforceParam param, DataObject dataObject) throws Exception { + + + } + + + @Override + public void uploadFileNew(SalesforceParam param, DataObject dataObject) throws Exception { + + + } + + /** * 下载文件 */ @@ -1133,7 +1147,10 @@ public class DataImportNewServiceImpl implements DataImportNewService { String token = Connection.getSessionHeader().getSessionId(); - String name = getName(api); + String name = "Name"; + if (Const.CONTENT_VERSION.equalsIgnoreCase(api)) { + name = "Title,FileExtension"; + } log.info("api:{},field:{},开始时间:{},结束时间:{}", api, field,beginDateStr,endDateStr); Map headers = Maps.newHashMap(); @@ -1160,8 +1177,13 @@ public class DataImportNewServiceImpl implements DataImportNewService { // 上传完毕 更新附件信息 List> maps = Lists.newArrayList(); try { + String fileName; id = (String) map.get("Id"); - String fileName = (String) map.get(name); + if (Const.CONTENT_VERSION.equalsIgnoreCase(api)) { + fileName = map.get("Title") + "." + map.get("FileExtension"); + }else { + fileName = (String) map.get("Name"); + } log.info("------------文件名:" + id + "_" + fileName); // 判断路径是否为空 if (StringUtils.isNotBlank(fileName)) { @@ -1200,7 +1222,7 @@ public class DataImportNewServiceImpl implements DataImportNewService { } Map paramMap = Maps.newHashMap(); paramMap.put("key", "is_dump"); - paramMap.put("value", true); + paramMap.put("value", 1); maps.add(paramMap); customMapper.updateById(api, maps, id); TimeUnit.MILLISECONDS.sleep(1); diff --git a/src/main/java/com/celnet/datadump/service/impl/FileServiceImpl.java b/src/main/java/com/celnet/datadump/service/impl/FileServiceImpl.java index 3f88ad0..e591e2c 100644 --- a/src/main/java/com/celnet/datadump/service/impl/FileServiceImpl.java +++ b/src/main/java/com/celnet/datadump/service/impl/FileServiceImpl.java @@ -93,7 +93,7 @@ public class FileServiceImpl implements FileService { extraSqlTmp += "AND Id > '" + maxId + "'"; } // 获取未存储的附件id - List> list = customMapper.list("Id, url", api, extraSqlTmp + " AND is_dump = true order by id asc limit 10"); + List> list = customMapper.list("Id, url", api, extraSqlTmp + " AND is_dump = 1 order by id asc limit 10"); if (CollectionUtils.isEmpty(list)) { break; } @@ -162,19 +162,21 @@ public class FileServiceImpl implements FileService { } } try { - String name = getName(api); + String name = "Name"; + if (Const.CONTENT_VERSION.equalsIgnoreCase(api)) { + name = "Title,FileExtension"; + } Map headers = Maps.newHashMap(); headers.put("Authorization", "Bearer " + token); headers.put("connection", "keep-alive"); long num = 0; while (true) { // 获取未存储的附件id - List> list = customMapper.list("Id, " + name, api, extraSql + " AND is_dump = false limit 1000"); + List> list = customMapper.list("Id, " + name, api, extraSql + " AND is_dump = 0 limit 1000"); if (CollectionUtils.isEmpty(list)) { log.info("无需要下载文件数据!!!"); break; } - String finalName = name; for (Map map : list) { String finalDownloadUrl = downloadUrl; Future future = salesforceExecutor.execute(() -> { @@ -185,8 +187,13 @@ public class FileServiceImpl implements FileService { int failCount = 0; while (true) { try { + String fileName; id = (String) map.get("Id"); - String fileName = (String) map.get(finalName); + if (Const.CONTENT_VERSION.equalsIgnoreCase(api)) { + fileName = map.get("Title") + "." + map.get("FileExtension"); + }else { + fileName = (String) map.get("Name"); + } log.info("------------文件名:"+id + "_" + fileName); // 判断路径是否为空 if (StringUtils.isNotBlank(fileName)) { @@ -430,7 +437,7 @@ public class FileServiceImpl implements FileService { String name = "Name"; // contentVersion使用PathOnClient if (Const.CONTENT_VERSION.equalsIgnoreCase(api)) { - name = "PathOnClient"; + name = "Title,FileExtension"; } return name; } @@ -776,144 +783,4 @@ public class FileServiceImpl implements FileService { } } - @Override - public void uploadFileToDocument(String api, String field, Boolean singleThread) { - String uploadUrl = null; - List> poll = customMapper.list("code,value","org_config",null); - for (Map map1 : poll) { - if ("FILE_UPLOAD_URL".equals(map1.get("code"))) { - uploadUrl = (String) map1.get("value"); - } - } - if (StringUtils.isBlank(uploadUrl)) { - EmailUtil.send("UploadFile ERROR", "文件上传失败!上传地址未配置"); - return; - } - log.info("upload file api:{}, field:{}", api, field); - PartnerConnection connect = salesforceTargetConnect.createConnect(); - String token = connect.getSessionHeader().getSessionId(); - - List> futures = Lists.newArrayList(); - String extraSql = ""; - if (dataFieldService.hasDeleted(api)) { - extraSql += "AND IsDeleted = false "; - } - if (Const.FILE_TYPE == FileType.SERVER) { - // 检测路径是否存在 - File excel = new File(Const.SERVER_FILE_PATH + "/" + api); - if (!excel.exists()) { - throw new RuntimeException("找不到文件路径"); - } - } - try { - // 获取未存储的附件id - List> list = customMapper.list("Id, Name, localUrl, Description", api, "is_upload = 0"); - for (Map map : list) { - String finalUploadUrl = uploadUrl; - String id = null; - // 上传完毕 更新附件信息 - List> maps = Lists.newArrayList(); - boolean isUpload = true; - int failCount = 0; - CloseableHttpResponse response = null; - String respContent = null; - while (true) { - try { - id = (String) map.get("Id"); - String url_fileName = (String) map.get("localUrl"); - - // 判断路径是否为空 - if (StringUtils.isNotBlank(url_fileName)) { - String filePath = Const.SERVER_FILE_PATH + "/" + url_fileName; - File file = new File(filePath); - boolean exists = file.exists(); - if (!exists) { - log.info("文件不存在"); - break; - } - String fileName = file.getName(); - - // 拼接url - String url = finalUploadUrl + String.format(Const.SF_UPLOAD_FILE_URL, api); - HttpPost httpPost = new HttpPost(url); - httpPost.setHeader("Authorization", "Bearer " + token); - httpPost.setHeader("connection", "keep-alive"); - - JSONObject credentialsJsonParam = new JSONObject(); - credentialsJsonParam.put("Name", fileName); - - MultipartEntityBuilder builder = MultipartEntityBuilder.create(); - builder.addTextBody("data", credentialsJsonParam.toJSONString(), ContentType.APPLICATION_JSON); - builder.addBinaryBody("Body", new FileInputStream(file), ContentType.APPLICATION_OCTET_STREAM, fileName); - HttpEntity entity = builder.build(); - httpPost.setEntity(entity); - - CloseableHttpClient httpClient = HttpClients.createDefault(); - response = httpClient.execute(httpPost); - if (response != null && response.getStatusLine() != null && response.getStatusLine().getStatusCode() < 400) { - HttpEntity he = response.getEntity(); - if (he != null) { - respContent = EntityUtils.toString(he, "UTF-8"); - String newId = JSONObject.parseObject(respContent).get("id").toString(); - Map paramMap = Maps.newHashMap(); - paramMap.put("key", "new_id"); - paramMap.put("value", newId); - maps.add(paramMap); - } - } else { - throw new RuntimeException("Document文件上传失败!地址或参数异常!"); - } - } - Map paramMap = Maps.newHashMap(); - paramMap.put("key", "is_upload"); - paramMap.put("value", isUpload); - maps.add(paramMap); - customMapper.updateById(api, maps, id); - TimeUnit.MILLISECONDS.sleep(1); - break; - } catch (Exception throwable) { - log.error("upload file error, id: {}", id, throwable); - failCount++; - if (Const.MAX_FAIL_COUNT < failCount) { - { - Map paramMap = Maps.newHashMap(); - paramMap.put("key", "is_upload"); - paramMap.put("value", 2); - maps.add(paramMap); - } - customMapper.updateById(api, maps, id); - break; - } - if (response != null) { - try { - response.close(); - } catch (IOException e) { - log.error("exception message", e); - } - } - try { - TimeUnit.SECONDS.sleep(30); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - } - } - - salesforceExecutor.waitForFutures(futures.toArray(new Future[]{})); - - log.info("upload file success api:{}, field:{}", api, field); - } catch (Throwable throwable) { - log.error("upload file error", throwable); - salesforceExecutor.remove(futures.toArray(new Future[]{})); - } finally { - // 把is_upload为2的重置为0 - List> maps = Lists.newArrayList(); - Map paramMap = Maps.newHashMap(); - paramMap.put("key", "is_upload"); - paramMap.put("value", 0); - maps.add(paramMap); - customMapper.update(maps, api, "is_upload = 2"); - } - } } diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index 4ef8522..01834f6 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -3,13 +3,13 @@ spring: type: com.zaxxer.hikari.HikariDataSource driver-class-name: com.mysql.cj.jdbc.Driver # 携科 - url: jdbc:mysql://127.0.0.1:3306/xieke?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai + # url: jdbc:mysql://127.0.0.1:3306/xieke?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai + # username: root + # password: root + # cook + url: jdbc:mysql://127.0.0.1:3306/longten?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai username: root password: Celnet2025.QY - # cook -# url: jdbc:mysql://127.0.0.1:3306/cook_1?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai -# username: root -# password: celnet@2025.bln mail: host: smtp.163.com diff --git a/src/main/resources/application-prod.yml b/src/main/resources/application-prod.yml index de5eac2..72d22a3 100644 --- a/src/main/resources/application-prod.yml +++ b/src/main/resources/application-prod.yml @@ -3,13 +3,13 @@ spring: type: com.zaxxer.hikari.HikariDataSource driver-class-name: com.mysql.cj.jdbc.Driver # 携科 - url: jdbc:mysql://127.0.0.1:3306/xieke?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai - username: root - password: root - # cook -# url: jdbc:mysql://127.0.0.1:3306/cook_1?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai +# url: jdbc:mysql://127.0.0.1:3306/xieke?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai # username: root -# password: celnet@2025.bln +# password: root + # cook + url: jdbc:mysql://127.0.0.1:3306/longten?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai + username: root + password: Celnet2025.QY mail: host: smtp.mxhichina.com port: 465 diff --git a/src/main/resources/application-test.yml b/src/main/resources/application-test.yml index 2e0aeae..84002a0 100644 --- a/src/main/resources/application-test.yml +++ b/src/main/resources/application-test.yml @@ -3,13 +3,13 @@ spring: type: com.zaxxer.hikari.HikariDataSource driver-class-name: com.mysql.cj.jdbc.Driver # 携科 - url: jdbc:mysql://127.0.0.1:3306/xieke?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai + # url: jdbc:mysql://127.0.0.1:3306/xieke?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai + # username: root + # password: root + # cook + url: jdbc:mysql://127.0.0.1:3306/longten?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai username: root password: Celnet2025.QY - # cook -# url: jdbc:mysql://127.0.0.1:3306/cook_1?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai -# username: root -# password: celnet@2025.bln mail: host: smtp.163.com