diff --git a/src/main/java/com/celnet/datadump/job/DataDumpNewJob.java b/src/main/java/com/celnet/datadump/job/DataDumpNewJob.java index 1ac57bb..ff38dd9 100644 --- a/src/main/java/com/celnet/datadump/job/DataDumpNewJob.java +++ b/src/main/java/com/celnet/datadump/job/DataDumpNewJob.java @@ -272,11 +272,7 @@ public class DataDumpNewJob { log.error("api:{} does not have blob field", api); XxlJobLogger.log("api:{} does not have blob field", api); } - if ("Attachment".equals(api)){ - dataImportNewService.uploadFileToAttachmentNew(param,dataObject); - }else { - dataImportNewService.uploadFileNew(param,dataObject); - } + return dataImportNewService.uploadFileNew(param,dataObject); } return ReturnT.SUCCESS; } diff --git a/src/main/java/com/celnet/datadump/service/DataImportNewService.java b/src/main/java/com/celnet/datadump/service/DataImportNewService.java index 45c3e06..bbf75fd 100644 --- a/src/main/java/com/celnet/datadump/service/DataImportNewService.java +++ b/src/main/java/com/celnet/datadump/service/DataImportNewService.java @@ -37,9 +37,7 @@ public interface DataImportNewService { ReturnT dumpFileNew(SalesforceParam param) throws Exception; - void uploadFileToAttachmentNew(SalesforceParam param, DataObject dataObject) throws Exception; - - void uploadFileNew(SalesforceParam param, DataObject dataObject) throws Exception; + ReturnT uploadFileNew(SalesforceParam param, DataObject dataObject) throws Exception; } diff --git a/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java b/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java index 12881bb..d5c6317 100644 --- a/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java +++ b/src/main/java/com/celnet/datadump/service/impl/DataImportNewServiceImpl.java @@ -33,13 +33,18 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.time.DateUtils; import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.mime.MultipartEntityBuilder; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.RandomAccessFile; +import java.io.*; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.*; @@ -1116,19 +1121,6 @@ public class DataImportNewServiceImpl implements DataImportNewService { return ReturnT.SUCCESS; } - @Override - public void uploadFileToAttachmentNew(SalesforceParam param, DataObject dataObject) throws Exception { - - - } - - - @Override - public void uploadFileNew(SalesforceParam param, DataObject dataObject) throws Exception { - - - } - /** * 下载文件 @@ -1200,10 +1192,8 @@ public class DataImportNewServiceImpl implements DataImportNewService { case OSS: // 上传到oss OssUtil.upload(inputStream, filePath); - break; case SERVER: dumpToServer(headers, id, filePath, url, response, inputStream); - break; default: log.error("id: {}, no mapping dump type", id); } @@ -1239,24 +1229,8 @@ public class DataImportNewServiceImpl implements DataImportNewService { } } } - } - /** - * 获取附件名称 - * - * @param api 表明 - * @return name - */ - private static String getName(String api) { - // 默认name - String name = "Name"; - // contentVersion使用PathOnClient - if (Const.CONTENT_VERSION.equalsIgnoreCase(api)) { - name = "PathOnClient"; - } - return name; - } /** * 下载文件到服务器 @@ -1303,4 +1277,375 @@ public class DataImportNewServiceImpl implements DataImportNewService { accessFile.close(); } + + @Override + public ReturnT uploadFileNew(SalesforceParam param, DataObject dataObject) throws Exception { + String uploadUrl = null; + List> poll = customMapper.list("code,value","org_config",null); + for (Map map1 : poll) { + if ("FILE_UPLOAD_URL".equals(map1.get("code"))) { + uploadUrl = (String) map1.get("value"); + } + } + if (StringUtils.isBlank(uploadUrl)) { + EmailUtil.send("UploadFile ERROR", "文件上传失败!上传地址未配置"); + return ReturnT.FAIL; + } + + if (StringUtils.isEmpty(dataObject.getBlobField())){ + log.info("没有对象存在文件二进制字段!不进行文件上传"); + } + + PartnerConnection connect = salesforceConnect.createConnect(); + + List> futures = Lists.newArrayList(); + + DataObject update = new DataObject(); + + log.info("dump file api:{}, field:{}", dataObject.getName(), dataObject.getBlobField()); + + try { + String api = dataObject.getName(); + update.setName(dataObject.getName()); + List salesforceParams = null; + QueryWrapper dbQw = new QueryWrapper<>(); + dbQw.eq("name", api); + List list = dataBatchService.list(dbQw); + AtomicInteger batch = new AtomicInteger(1); + if (CollectionUtils.isNotEmpty(list)) { + salesforceParams = list.stream().map(t -> { + SalesforceParam salesforceParam = param.clone(); + salesforceParam.setApi(t.getName()); + salesforceParam.setBeginCreateDate(t.getSyncStartDate()); + salesforceParam.setEndCreateDate(t.getSyncEndDate()); + salesforceParam.setBatch(batch.getAndIncrement()); + return salesforceParam; + }).collect(Collectors.toList()); + } + + + // 手动任务优先执行 + for (SalesforceParam salesforceParam : salesforceParams) { + String finalDownloadUrl = uploadUrl; + Future future = salesforceExecutor.execute(() -> { + try { + if ("Attachment".equals(dataObject.getName())){ + uploadAttachment(salesforceParam, connect, finalDownloadUrl,dataObject.getName()); + }else { + uploadFile(salesforceParam, connect, finalDownloadUrl,dataObject.getName()); + } + } catch (Throwable throwable) { + log.error("salesforceExecutor error", throwable); + throw new RuntimeException(throwable); + } + }, salesforceParam.getBatch(), 0); + futures.add(future); + } + // 等待当前所有线程执行完成 + salesforceExecutor.waitForFutures(futures.toArray(new Future[]{})); + update.setDataWork(0); + } catch (InterruptedException e) { + salesforceExecutor.remove(futures.toArray(new Future[]{})); + throw e; + } catch (Throwable e) { + throw new RuntimeException(e); + } finally { + update.setDataLock(0); + dataObjectService.updateById(update); + } + // 等待当前所有线程执行完成 + salesforceExecutor.waitForFutures(futures.toArray(new Future[]{})); + futures.clear(); + return ReturnT.SUCCESS; + } + + /** + * 上传Attachment + */ + private void uploadAttachment(SalesforceParam param, PartnerConnection connection ,String uploadUrl, String api) throws Exception { + + String extraSql = ""; + if (dataFieldService.hasDeleted(api)) { + extraSql += " AND IsDeleted = false "; + } + + Date beginDate = param.getBeginCreateDate(); + Date endDate = param.getEndCreateDate(); + String beginDateStr = DateUtil.format(beginDate, "yyyy-MM-dd HH:mm:ss"); + String endDateStr = DateUtil.format(endDate, "yyyy-MM-dd HH:mm:ss"); + + String token = connection.getSessionHeader().getSessionId(); + + String name = "Name"; + if (Const.CONTENT_VERSION.equalsIgnoreCase(api)) { + name = "Title,FileExtension"; + } + + Map headers = Maps.newHashMap(); + headers.put("Authorization", "Bearer " + token); + headers.put("connection", "keep-alive"); + + Integer count = customMapper.countBySQL(api, "where is_dump = 1 and is_upload = 0 and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr +"'"+ extraSql ); + + log.error("总文件数 count:{};-开始时间:{};-结束时间:{};-api:{};", count, beginDateStr, endDateStr, api); + + if (count == 0) { + return; + } + + int page = count%200 == 0 ? count/200 : (count/200) + 1; + //总插入数 + for (int i = 0; i < page; i++) { + + // 获取未存储的附件id + List> list = customMapper.list("Id, " + name, api, " is_dump = 1 and is_upload = 0 and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr +"'"+ extraSql + "limit " + i * 200 + ",200"); + + for (Map map : list) { + String id = null; + // 上传完毕 更新附件信息 + List> maps = Lists.newArrayList(); + boolean isUpload = true; + CloseableHttpResponse response = null; + String respContent = null; + try { + id = (String) map.get("Id"); + String url_fileName = (String) map.get("url"); + String parentId = (String) map.get("ParentId"); + String parentType = (String) map.get("Parent_type"); + // 判断路径是否为空 + if (StringUtils.isNotBlank(url_fileName)) { + String filePath = Const.SERVER_FILE_PATH + "/" + url_fileName; + File file = new File(filePath); + boolean exists = file.exists(); + if (!exists) { + log.info("文件不存在"); + } + String fileName = (String) map.get("Name"); + + // dataObject查询 + QueryWrapper qw = new QueryWrapper<>(); + qw.eq("name", parentType); + List objects = dataObjectService.list(qw); + if (objects.isEmpty()) { + log.info("关联对象不存在: {}", parentType); + } + + Map lMap = customMapper.getById("new_id",parentType, parentId); + + // 拼接url + String url = uploadUrl + String.format(Const.SF_UPLOAD_FILE_URL, api); + HttpPost httpPost = new HttpPost(url); + httpPost.setHeader("Authorization", "Bearer " + token); + httpPost.setHeader("connection", "keep-alive"); + + com.alibaba.fastjson.JSONObject credentialsJsonParam = new com.alibaba.fastjson.JSONObject(); + credentialsJsonParam.put("parentId", lMap.get("new_id")); + credentialsJsonParam.put("Name", fileName); + + MultipartEntityBuilder builder = MultipartEntityBuilder.create(); + builder.addTextBody("data", credentialsJsonParam.toJSONString(), ContentType.APPLICATION_JSON); + builder.addBinaryBody("Body", new FileInputStream(file), ContentType.APPLICATION_OCTET_STREAM, fileName); + HttpEntity entity = builder.build(); + httpPost.setEntity(entity); + + CloseableHttpClient httpClient = HttpClients.createDefault(); + response = httpClient.execute(httpPost); + if (response != null && response.getStatusLine() != null && response.getStatusLine().getStatusCode() < 400) { + HttpEntity he = response.getEntity(); + if (he != null) { + respContent = EntityUtils.toString(he, "UTF-8"); + String newId = com.alibaba.fastjson.JSONObject.parseObject(respContent).get("id").toString(); + Map paramMap = Maps.newHashMap(); + paramMap.put("key", "new_id"); + paramMap.put("value", newId); + maps.add(paramMap); + } + } else { + throw new RuntimeException(); + } + } + Map paramMap = Maps.newHashMap(); + paramMap.put("key", "is_upload"); + paramMap.put("value", isUpload); + maps.add(paramMap); + customMapper.updateById(api, maps, id); + TimeUnit.MILLISECONDS.sleep(1); + } catch (InterruptedException interruptedException){ + return; + } catch (Exception e) { + if (response != null) { + try { + response.close(); + } catch (IOException re) { + log.error("exception message", re); + } + } + log.error("文件上传失败!, id: {}, 错误信息:{}", id ,e.getMessage()); + Map paramMap = Maps.newHashMap(); + paramMap.put("key", "is_upload"); + paramMap.put("value", 2); + maps.add(paramMap); + customMapper.updateById(api, maps, id); + EmailUtil.send("File Upload ERROR", "文件上传失败!, id: "+ id + ",错误信息:" + e.getMessage()); + } + } + } + } + + /** + * 上传文件 + */ + private void uploadFile(SalesforceParam param, PartnerConnection connection ,String uploadUrl, String api) throws Exception { + + String extraSql = ""; + if (dataFieldService.hasDeleted(api)) { + extraSql += " AND IsDeleted = false "; + } + + Date beginDate = param.getBeginCreateDate(); + Date endDate = param.getEndCreateDate(); + String beginDateStr = DateUtil.format(beginDate, "yyyy-MM-dd HH:mm:ss"); + String endDateStr = DateUtil.format(endDate, "yyyy-MM-dd HH:mm:ss"); + + String token = connection.getSessionHeader().getSessionId(); + + String name = "Name"; + if (Const.CONTENT_VERSION.equalsIgnoreCase(api)) { + name = "Title,FileExtension"; + } + + Map headers = Maps.newHashMap(); + headers.put("Authorization", "Bearer " + token); + headers.put("connection", "keep-alive"); + + Integer count = customMapper.countBySQL("ContentDocument", "where new_id is null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr +"'"+ extraSql ); + + log.error("总文件数 count:{};-开始时间:{};-结束时间:{};-api:{};", count, beginDateStr, endDateStr, api); + + if (count == 0) { + return; + } + + int page = count%200 == 0 ? count/200 : (count/200) + 1; + //总插入数 + for (int i = 0; i < page; i++) { + + // 获取未存储的附件id + List> documentList = customMapper.list("Id, " + name, "ContentDocument", " new_id is null and CreatedDate >= '" + beginDateStr + "' and CreatedDate < '" + endDateStr +"'"+ extraSql + "limit " + i * 200 + ",200"); + + for (Map documentMap : documentList) { + + String documentId = (String) documentMap.get("Id"); + // 获取未存储的附件id + List> list = customMapper.list("Id, url, PathOnClient, Title, ContentDocumentId, VersionNumber", api, "ContentDocumentId = '" + documentId + "' and new_id is null ORDER BY VersionNumber ASC"); + if (CollectionUtils.isEmpty(list)) { + continue; + } + String finalUploadUrl = uploadUrl; + String newDocumentId = null; + for (Map map : list) { + String id = null; + // 上传完毕 更新附件信息 + List> maps = Lists.newArrayList(); + boolean isUpload = true; + int failCount = 0; + CloseableHttpResponse response = null; + String respContent = null; + try { + id = (String) map.get("Id"); + String url_fileName = (String) map.get("url"); + String fileName = (String) map.get("PathOnClient"); + String title = (String) map.get("Title"); + String oldDocumentId = (String) map.get("ContentDocumentId"); + Integer versionNumber = Integer.valueOf(map.get("VersionNumber").toString()); + // 判断路径是否为空 + if (StringUtils.isNotBlank(url_fileName)) { + String filePath = Const.SERVER_FILE_PATH + "/" + url_fileName; + File file = new File(filePath); + boolean exists = file.exists(); + if (!exists) { + log.info("文件不存在"); + break; + } + // 拼接url + String url = finalUploadUrl + String.format(Const.SF_UPLOAD_FILE_URL, api); + HttpPost httpPost = new HttpPost(url); + httpPost.setHeader("Authorization", "Bearer " + token); + httpPost.setHeader("connection", "keep-alive"); + + com.alibaba.fastjson.JSONObject credentialsJsonParam = new com.alibaba.fastjson.JSONObject(); + credentialsJsonParam.put("title", title); + credentialsJsonParam.put("pathOnClient", fileName); + if (newDocumentId != null) { + credentialsJsonParam.put("ContentDocumentId", newDocumentId); + } + MultipartEntityBuilder builder = MultipartEntityBuilder.create(); + builder.addTextBody("entity_content", credentialsJsonParam.toJSONString(), ContentType.APPLICATION_JSON); + builder.addBinaryBody("VersionData", new FileInputStream(file), ContentType.APPLICATION_OCTET_STREAM, fileName); + HttpEntity entity = builder.build(); + httpPost.setEntity(entity); + + CloseableHttpClient httpClient = HttpClients.createDefault(); + response = httpClient.execute(httpPost); + if (response != null && response.getStatusLine() != null && response.getStatusLine().getStatusCode() < 400) { + HttpEntity he = response.getEntity(); + if (he != null) { + respContent = EntityUtils.toString(he, "UTF-8"); + String newId = String.valueOf(com.alibaba.fastjson.JSONObject.parseObject(respContent).get("id")); + if (StringUtils.isBlank(newId)) { + log.error("文件上传错误,返回实体信息:" + respContent); + break; + } + Map paramMap = Maps.newHashMap(); + paramMap.put("key", "new_id"); + paramMap.put("value", newId); + maps.add(paramMap); + + if (versionNumber == 1) { + // 更新documentId + String dId = commonService.getDocumentId(connection, newId); + List> dList = new ArrayList<>(); + Map dMap = Maps.newHashMap(); + dMap.put("key", "new_id"); + dMap.put("value", dId); + dList.add(dMap); + customMapper.updateById("ContentDocument", dList, oldDocumentId); + + newDocumentId = dId; + } + }else { + break; + } + } else { + throw new RuntimeException(); + } + } + Map paramMap = Maps.newHashMap(); + paramMap.put("key", "is_upload"); + paramMap.put("value", isUpload); + maps.add(paramMap); + customMapper.updateById(api, maps, id); + TimeUnit.MILLISECONDS.sleep(1); + break; + } catch (Exception e) { + if (response != null) { + try { + response.close(); + } catch (IOException re) { + log.error("exception message", re); + } + } + log.error("文件上传失败!, id: {}, 错误信息:{}", id ,e.getMessage()); + Map paramMap = Maps.newHashMap(); + paramMap.put("key", "is_upload"); + paramMap.put("value", 2); + maps.add(paramMap); + customMapper.updateById(api, maps, id); + EmailUtil.send("File Upload ERROR", "文件上传失败!, id: "+ id + ",错误信息:" + e.getMessage()); + } + } + } + } + } + }