package com.celnet.datadump.controller; import com.alibaba.fastjson.JSON; import com.celnet.datadump.annotation.LogServiceAnnotation; import com.celnet.datadump.entity.DataObject; import com.celnet.datadump.global.Result; import com.celnet.datadump.param.*; import com.celnet.datadump.service.*; import com.celnet.datadump.util.DataUtil; import com.celnet.datadump.constant.OperateTypeConstant; import com.xxl.job.core.biz.model.ReturnT; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @author Red * @description * @date 2023/03/09 */ @RestController @Api(value = "job", tags = "执行任务") @RequestMapping("/job") @Slf4j public class JobController { @Autowired private DataVerifyService dataVerifyService; @Autowired private FileService fileService; @Autowired private DataObjectService dataObjectService; @Autowired private CommonService commonService; @Autowired private DataCheckDeletedService dataCheckDeletedService; @Autowired private DataImportService dataImportService; @Autowired private DataImportBatchService dataImportBatchService; @Autowired private DataImportNewService dataImportNewService; @PostMapping("/fileTransform") @ApiOperation("附件解析") public Result transform(@RequestBody FileTransformParam param ) { ReturnT returnT = fileService.transform(param); if (returnT.getCode() == ReturnT.SUCCESS_CODE) { return Result.success(); } else { return Result.fail(returnT.getMsg()); } } @PostMapping("/dumpFile") @ApiOperation("附件迁移") public Result dumpFile(@RequestBody DumpFileParam param ) { if (StringUtils.isBlank(param.getApi())) { return Result.fail("api参数缺失!"); } for (String api : DataUtil.toIdList(param.getApi())) { DataObject dataObject = dataObjectService.getById(api); String blobField = dataObject.getBlobField(); if (StringUtils.isBlank(blobField)) { log.error("api:{} does not have blob field", api); } fileService.dumpFile(api, blobField, param.getSingleThread()); } return Result.success(); } @PostMapping("/dumpFileVerify") @ApiOperation("附件校验") public Result dumpFileVerify(@RequestBody DumpFileParam param ) { if (StringUtils.isBlank(param.getApi())) { return Result.fail("api参数缺失!"); } for (String api : DataUtil.toIdList(param.getApi())) { DataObject dataObject = dataObjectService.getById(api); String blobField = dataObject.getBlobField(); if (StringUtils.isBlank(blobField)) { log.error("api:{} does not have blob field", api); } fileService.verifyFile(api, blobField); } return Result.success(); } /** * 数据校验 * * @param param 参数 * @return result */ @PostMapping("/dataVerify") @ApiOperation("数据校验") public Result dataVerifyJob(@RequestBody DataVerifyParam param) throws Exception { if (param.getType() == null) { return Result.fail("参数缺失!"); } ReturnT returnT = dataVerifyService.verify(param); if (returnT.getCode() == ReturnT.SUCCESS_CODE) { return Result.success(); } else { return Result.fail(returnT.getMsg()); } } /** * 创建表结构 * * @param param 参数 * @return result */ @PostMapping("/createApi") @ApiOperation("创建表结构") public Result createApi(@RequestBody SalesforceParam param) throws Exception { if (param.getType() == null) { return Result.fail("参数缺失!"); } ReturnT returnT = commonService.createApi(param); if (returnT.getCode() == ReturnT.SUCCESS_CODE) { return Result.success(); } else { return Result.fail(returnT.getMsg()); } } /** * 存量任务 * * @param param 参数 * @return result */ @PostMapping("/dataDumpManual") @ApiOperation("存量任务") public Result dataDumpManual(@RequestBody SalesforceParam param) throws Exception { if (param.getType() == null) { return Result.fail("参数缺失!"); } param.setType(1); // 参数转换 param.setBeginCreateDate(param.getBeginDate()); param.setEndCreateDate(param.getEndDate()); ReturnT returnT = commonService.dump(param); if (returnT.getCode() == ReturnT.SUCCESS_CODE) { return Result.success(); } else { return Result.fail(returnT.getMsg()); } } /** * 增量任务 * * @param param 参数 * @return result */ @PostMapping("/dataDumpIncrement") @ApiOperation("增量任务") public Result dataDumpIncrement(@RequestBody SalesforceParam param) throws Exception { if (param.getType() == null) { return Result.fail("参数缺失!"); } param.setType(2); ReturnT returnT = commonService.increment(param); if (returnT.getCode() == ReturnT.SUCCESS_CODE) { return Result.success(); } else { return Result.fail(returnT.getMsg()); } } /** * 检测删除数据 * * @param param 参数 * @return result */ @PostMapping("/dataCheckDeleted") @ApiOperation("检测删除数据") public Result dataCheckDeleted(@RequestBody DataCheckDeletedParam param) throws Throwable { ReturnT returnT = dataCheckDeletedService.checkDeletedData(param); if (returnT.getCode() == ReturnT.SUCCESS_CODE) { return Result.success(); } else { return Result.fail(returnT.getMsg()); } } /** * bulk批量大数据生成newSFID * @param paramStr * @author kris * @return * @throws Exception */ @PostMapping("/dataImportBatchJob") @ApiOperation("生成newSFID(大数据量)") @LogServiceAnnotation(operateType = OperateTypeConstant.TYPE_INSERT, remark = "生成newSFID(大数据量)") public ReturnT dataImportBatchJob(String paramStr) throws Exception { log.info("dataImportBatchJob execute start .................."); SalesforceParam param = new SalesforceParam(); try { if (StringUtils.isNotBlank(paramStr)) { param = JSON.parseObject(paramStr, SalesforceParam.class); } } catch (Throwable throwable) { return new ReturnT<>(500, "参数解析失败!"); } param.setType(1); // 参数转换 param.setBeginCreateDate(param.getBeginDate()); param.setEndCreateDate(param.getEndDate()); return dataImportBatchService.immigrationBatch(param); } /** * bulk批量更新大数据量数据 * @param paramStr * @author kris * @return * @throws Exception */ @PostMapping("/dataUpdateBatchJob") @ApiOperation("更新数据(大数据量)") @LogServiceAnnotation(operateType = OperateTypeConstant.TYPE_UPDATE, remark = "更新数据(大数据量)") public ReturnT dataUpdateBatchJob(String paramStr) throws Exception { log.info("dataImportBatchJob execute start .................."); SalesforceParam param = new SalesforceParam(); try { if (StringUtils.isNotBlank(paramStr)) { param = JSON.parseObject(paramStr, SalesforceParam.class); } } catch (Throwable throwable) { return new ReturnT<>(500, "参数解析失败!"); } // 参数转换 param.setBeginCreateDate(param.getBeginDate()); param.setEndCreateDate(param.getEndDate()); return dataImportBatchService.immigrationUpdateBatch(param); } /** * 返写个人联系人ID * @param paramStr * @author kris * @return * @throws Exception */ @PostMapping("/getPersonContactJob") @ApiOperation("返写个人联系人ID") @LogServiceAnnotation(operateType = OperateTypeConstant.TYPE_UPDATE, remark = "返写个人联系人ID") public ReturnT getPersonContactJob(String paramStr) throws Exception { log.info("getPersonContactJob execute start .................."); SalesforceParam param = new SalesforceParam(); try { if (StringUtils.isNotBlank(paramStr)) { param = JSON.parseObject(paramStr, SalesforceParam.class); } } catch (Throwable throwable) { return new ReturnT<>(500, "参数解析失败!"); } // 参数转换 param.setBeginCreateDate(param.getBeginDate()); param.setEndCreateDate(param.getEndDate()); return dataImportNewService.getPersonContact(param); } /** * 数据更新同步(新) * @param paramStr * @author kris * @return * @throws Exception */ @PostMapping("/dataUpdateNewJob") @ApiOperation("数据更新同步(新)") @LogServiceAnnotation(operateType = OperateTypeConstant.TYPE_UPDATE, remark = "数据更新同步(新)") public ReturnT dataUpdateNewJob(String paramStr) throws Exception { log.info("getPersonContactJob execute start .................."); SalesforceParam param = new SalesforceParam(); try { if (StringUtils.isNotBlank(paramStr)) { param = JSON.parseObject(paramStr, SalesforceParam.class); } } catch (Throwable throwable) { return new ReturnT<>(500, "参数解析失败!"); } // 参数转换 param.setBeginCreateDate(param.getBeginDate()); param.setEndCreateDate(param.getEndDate()); return dataImportNewService.immigrationUpdateNew(param); } }