package com.template.services.impl; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.template.common.utils.CommonUtil; import com.template.common.utils.DBUtil; import com.template.common.utils.QuartzJobUtils; import com.template.mapper.SmartDataSourceMapper; import com.template.mapper.SmartDataTaskMapper; import com.template.model.pojo.SmartDataSourceJobParams; import com.template.model.pojo.SmartDataTask; import com.template.model.pojo.SmartDepartment; import com.template.model.result.PageUtils; import com.template.services.SmartDataTaskService; import org.quartz.CronExpression; import org.quartz.Scheduler; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeParseException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; /** *

* 数据源任务 服务实现类 *

* * @author ceshi * @since 2023-12-05 */ @Service public class SmartDataTaskServiceImpl extends ServiceImpl implements SmartDataTaskService { @Autowired private SmartDataTaskMapper smartDataTaskMapper; @Autowired private SmartDataSourceMapper smartDataSourceMapper; @Autowired private Scheduler scheduler; // 添加任务1-任务基本信息 @Override public Map insertSmartDataTask1(SmartDataTask smartDataTask) { // 检测参数,还有是否存在重复记录 // 任务属性 if (smartDataTask.getTkName() == null) { return CommonUtil.getReturnMap("1", "【任务名称】不能为空!"); } // 只能包含字母、数字、下划线和中文,且长度为5-32位 if (!CommonUtil.checkStrByRegx("^[\\w\\u4e00-\\u9fa5]{4,32}$", smartDataTask.getTkName())) { return CommonUtil.getReturnMap("1", "【任务名称】只能包含字母、数字、下划线和中文,且长度为4-32位!"); } QueryWrapper queryWrapper = new QueryWrapper<>(); queryWrapper.eq(smartDataTask.getTkName() != null, "tk_name", smartDataTask.getTkName()); SmartDataTask sdt = smartDataTaskMapper.selectOne(queryWrapper); if (sdt != null) { return CommonUtil.getReturnMap("1", "任务名有重名!"); } if (smartDataTask.getTkDtId() == null) { return CommonUtil.getReturnMap("1", "【部门id】不能为空!"); } int numOfDepartment = smartDataTaskMapper.isHaveDepartmentById(smartDataTask.getTkDtId()); if (numOfDepartment == 0) { return CommonUtil.getReturnMap("1", "【部门】不存在!"); } if (smartDataTask.getTkSyncPolicy() == null) { return CommonUtil.getReturnMap("1", "【同步策略】不能为空!"); } // 来源库设置 if (smartDataTask.getTkDsIdSource() == null) { return CommonUtil.getReturnMap("1", "【来源数据源id】不能为空!"); } SmartDataSourceJobParams dsSource = smartDataSourceMapper.getDataSourceInfo(smartDataTask.getTkDsIdSource()); if (dsSource == null) { return CommonUtil.getReturnMap("1", "选择的【来源数据源】不存在!"); } if (smartDataTask.getTkExchangeType() == null) { return CommonUtil.getReturnMap("1", "【交换方式】不能为空!"); } if (smartDataTask.getTkSql() == null) { return CommonUtil.getReturnMap("1", "【自定义SQL语句】不能为空!"); } // 目的库设置 if (smartDataTask.getTkDsIdDestination() == null) { return CommonUtil.getReturnMap("1", "【目标数据源id】不能为空!"); } SmartDataSourceJobParams dsDestination = smartDataSourceMapper.getDataSourceInfo(smartDataTask.getTkDsIdDestination()); if (dsDestination == null) { return CommonUtil.getReturnMap("1", "选择的【目标数据源】不存在!"); } if (smartDataTask.getTkDestTable() == null) { return CommonUtil.getReturnMap("1", "【目标数据表】不能为空!"); } // 高级设置 if (smartDataTask.getTkExchangeServer() == null) { return CommonUtil.getReturnMap("1", "【交换服务器】不能为空!"); } if (smartDataTask.getTkExchangeServer() == 1) { if (smartDataTask.getTkExchangeServerId() == null) { return CommonUtil.getReturnMap("1", "【指定服务器id】不能为空!"); } } if (smartDataTask.getTkOptCfgAutoManual() == null) { return CommonUtil.getReturnMap("1", "【运行参数配置】不能为空!"); } if (smartDataTask.getTkOptCfgAutoManual() == 1) { if (smartDataTask.getTkOptCfgRsNum() == null) { return CommonUtil.getReturnMap("1", "【运行参数配置:记录数】不能为空!"); } if (smartDataTask.getTkOptCfgRsNum() > 100) { return CommonUtil.getReturnMap("1", "【运行参数配置:记录数】不能大于100!"); } if (smartDataTask.getTkOptCfgThreadsNum() == null) { return CommonUtil.getReturnMap("1", "【运行参数配置:线程数】不能为空!"); } if (smartDataTask.getTkOptCfgThreadsNum() > 10) { return CommonUtil.getReturnMap("1", "【运行参数配置:线程数】不能大于10!"); } } if (smartDataTask.getTkRsIncorrectData() == null) { return CommonUtil.getReturnMap("1", "【是否记录错误数据】不能为空!"); } if (smartDataTask.getTkDsSourceCharset() == null) { return CommonUtil.getReturnMap("1", "【来源数据源字符集】不能为空!"); } if (!smartDataTask.getTkDsSourceCharset().equals("UTF8") && !smartDataTask.getTkDsSourceCharset().equals("GBK")) { return CommonUtil.getReturnMap("1", "【来源数据源字符集】只能为UTF8或GBK!"); } if (smartDataTask.getTkDsDestinationCharset() == null) { return CommonUtil.getReturnMap("1", "【目标数据源字符集】不能为空!"); } if (!smartDataTask.getTkDsDestinationCharset().equals("UTF8") && !smartDataTask.getTkDsDestinationCharset().equals("GBK")) { return CommonUtil.getReturnMap("1", "【目标数据源字符集】只能为UTF8或GBK!"); } queryWrapper.eq(smartDataTask.getTkDtId() != null, "tk_dt_id", smartDataTask.getTkDtId()); queryWrapper.eq(smartDataTask.getTkDsIdSource() != null, "tk_ds_id_source", smartDataTask.getTkDsIdSource()); queryWrapper.eq(smartDataTask.getTkSyncPolicy() != null, "tk_sync_policy", smartDataTask.getTkSyncPolicy()); queryWrapper.eq(smartDataTask.getTkExchangeType() != null, "tk_exchange_type", smartDataTask.getTkExchangeType()); // 交换方式:0自定义SQL语句,1数据视图,2数据表,如果是1或2,tkSql传视图或表格名称即可 if (smartDataTask.getTkExchangeType() == 0) { queryWrapper.eq(StringUtils.hasText(smartDataTask.getTkSql()), "tk_sql", smartDataTask.getTkSql()); } else { String sql = "SELECT * FROM " + smartDataTask.getTkSql().substring(0, smartDataTask.getTkSql().indexOf("[")); queryWrapper.eq(StringUtils.hasText(smartDataTask.getTkSql()), "tk_sql", sql); smartDataTask.setTkSql(sql); } queryWrapper.eq(smartDataTask.getTkDsIdDestination() != null, "tk_ds_id_destination", smartDataTask.getTkDsIdDestination()); queryWrapper.eq(smartDataTask.getTkDestTable() != null, "tk_dest_table", smartDataTask.getTkDestTable()); queryWrapper.eq(smartDataTask.getTkExchangeServer() != null, "tk_exchange_server", smartDataTask.getTkExchangeServer()); queryWrapper.eq(smartDataTask.getTkExchangeServer() != null && smartDataTask.getTkExchangeServerId() != null, "tk_exchange_server_id", smartDataTask.getTkExchangeServerId()); queryWrapper.eq(smartDataTask.getTkOptCfgAutoManual() != null, "tk_opt_cfg_auto_manual", smartDataTask.getTkOptCfgAutoManual()); queryWrapper.eq(smartDataTask.getTkOptCfgAutoManual() != null && smartDataTask.getTkOptCfgRsNum() != null, "tk_opt_cfg_rs_num", smartDataTask.getTkOptCfgRsNum()); queryWrapper.eq(smartDataTask.getTkOptCfgAutoManual() != null && smartDataTask.getTkOptCfgThreadsNum() != null, "tk_opt_cfg_threads_num", smartDataTask.getTkOptCfgThreadsNum()); queryWrapper.eq(smartDataTask.getTkRsIncorrectData() != null, "tk_rs_incorrect_data", smartDataTask.getTkRsIncorrectData()); queryWrapper.eq(smartDataTask.getTkDsSourceCharset() != null, "tk_ds_source_charset", smartDataTask.getTkDsSourceCharset()); queryWrapper.eq(StringUtils.hasText(smartDataTask.getTkDsDestinationCharset()), "tk_ds_destination_charset", smartDataTask.getTkDsDestinationCharset()); sdt = smartDataTaskMapper.selectOne(queryWrapper); if (sdt != null) { return CommonUtil.getReturnMap("1", "有重复记录!"); } // 插入记录,插入成功后获取记录的id返回 int result = smartDataTaskMapper.insert(smartDataTask); if (result > 0) { // 获取数据源对应的表、视图、Sql对应的结构 return CommonUtil.getReturnMap("0", this.getMetaData(smartDataTask, dsSource, dsDestination, "添加")); } else { return CommonUtil.getReturnMap("1", "【添加任务-任务基本信息】添加失败!"); } } private Map getMetaData(SmartDataTask smartDataTask, SmartDataSourceJobParams dsSource, SmartDataSourceJobParams dsDestination, String action) { // 源连接 DBUtil dsSourceDbUtil = new DBUtil(dsSource.getDsUrl(), dsSource.getDsUser(), dsSource.getDsPassword(), dsSource.getDsClsDriver(), "UTF8"); Map dsSourceMap = dsSourceDbUtil.getConnection(); if (dsSourceMap.get("code") == "1") { return CommonUtil.getReturnMap("1", "【来源数据源】连接失败!"); } Connection dsSourceConn = (Connection) dsSourceMap.get("msg"); // 解析字段名称、字段类型、字段大小等 Map metaDataBySql = dsSourceDbUtil.getMetaDataBySql(dsSourceConn, smartDataTask.getTkSql()); if (metaDataBySql.get("code") == "1") { return CommonUtil.getReturnMap("1", metaDataBySql.get("msg")); } List> listSource = (List>) metaDataBySql.get("msg"); // 目标连接 DBUtil dbDestinationUtil = new DBUtil(dsDestination.getDsUrl(), dsDestination.getDsUser(), dsDestination.getDsPassword(), dsDestination.getDsClsDriver(), "UTF8"); Map dsDestinationMap = dbDestinationUtil.getConnection(); if (dsDestinationMap.get("code") == "1") { return CommonUtil.getReturnMap("1", "【目标数据源】连接失败!"); } Connection dsDestinationConn = (Connection) dsDestinationMap.get("msg"); // 目标数据表 Map metaDataByTable = dbDestinationUtil.getMetaDataByTable(dsDestinationConn, smartDataTask.getTkDestTable().substring(0, smartDataTask.getTkDestTable().indexOf("["))); if (metaDataByTable.get("code") == "1") { return CommonUtil.getReturnMap("1", metaDataByTable.get("msg")); } List> listDestination = (List>) metaDataByTable.get("msg"); // 组合返回数据 Map returnMap = new HashMap<>(); returnMap.put("tkId", smartDataTask.getTkId()); returnMap.put("msg", "【" + action + "任务-任务基本信息】" + action + "成功!"); returnMap.put("dsSourceMetaData", listSource); returnMap.put("dsDestinationMetaData", listDestination); return returnMap; } // 添加任务2-字段配置 @Override public Map insertSmartDataTask2(JSONObject requestData) { return this.insertOrUpdate2(requestData, "添加"); } private Map insertOrUpdate2(JSONObject requestData, String action) { // 任务ID if (requestData.containsKey("tkId")) { // tkId 存在 int tkId = requestData.getIntValue("tkId"); if (tkId <= 0) { return CommonUtil.getReturnMap("1", "任务id错误!"); } // 字段对应关系 if (requestData.containsKey("colRelationship")) { JSONArray colRelationshipArray = requestData.getJSONArray("colRelationship"); if (colRelationshipArray.size() > 0) { String colRelationship = colRelationshipArray.toString(); // 检测是否有修改 if (action.equals("编辑")) { SmartDataTask smartDataTask = smartDataTaskMapper.selectColRelationship(tkId); if (JSONArray.parseArray(smartDataTask.getTkColRelationship()).equals(colRelationshipArray)) { return CommonUtil.getReturnMap("1", "数据未修改,请修改后再提交!"); } } // 检测数据类型和字段长度是否符合要求 for (int i = 0; i < colRelationshipArray.size(); i++) { JSONObject colRelationshipObj = colRelationshipArray.getJSONObject(i); String colSource = colRelationshipObj.getString("colSource"); String colSourceType = colRelationshipObj.getString("colSourceType"); Integer colSourceSize = colRelationshipObj.getInteger("colSourceSize"); String colDestination = colRelationshipObj.getString("colDestination"); String colDestinationType = colRelationshipObj.getString("colDestinationType"); Integer colDestinationSize = colRelationshipObj.getInteger("colDestinationSize"); // 判断字段类型是否一致 if (!colSourceType.equals(colDestinationType)) { return CommonUtil.getReturnMap("1", "来源字段类型【" + colSource + "】与 目标字段类型【" + colDestination + "】不一致!"); } // 判断目标字段长度是否小于来源字段长度 if (colSourceSize > colDestinationSize) { return CommonUtil.getReturnMap("1", "目标字段长度【" + colSource + "】小于 来源字段长度【" + colDestination + "】!"); } } // 存数据库中 int num = smartDataTaskMapper.insertColRelationship(tkId, colRelationship); if (num > 0) { return CommonUtil.getReturnMap("0", "【" + action + "任务-字段配置】" + action + "成功!"); } else { return CommonUtil.getReturnMap("1", "【" + action + "任务-字段配置】" + action + "失败!"); } } else { return CommonUtil.getReturnMap("1", "字段对应关系为空!"); } } else { return CommonUtil.getReturnMap("1", "字段对应关系为空!"); } } else { // tkId 不存在 return CommonUtil.getReturnMap("1", "任务ID为空!"); } } // 添加任务3-定时信息 @Override public Map insertSmartDataTask3(SmartDataTask smartDataTask) { return this.insertOrUpdate3(smartDataTask, "添加"); } private Map insertOrUpdate3(SmartDataTask smartDataTask, String action) { if (smartDataTask.getTkManualOrAuto() == null) { return CommonUtil.getReturnMap("1", "【手动或定时执行】不能为空!"); } if (smartDataTask.getTkManualOrAuto() == 0) { if (smartDataTask.getTkExeType() == null) { return CommonUtil.getReturnMap("1", "【执行方式】不能为空!"); } if (smartDataTask.getTkRepetTime() == null) { return CommonUtil.getReturnMap("1", "【重复时间】不能为空!"); } // 生成cron表达式 Map stringObjectMap = generateCron(smartDataTask.getTkExeType(), smartDataTask.getTkRepetTime()); if (stringObjectMap.get("code") == "1") { return stringObjectMap; } String cron = (String) stringObjectMap.get("msg"); if (CronExpression.isValidExpression(cron)) { smartDataTask.setTkCron(cron); } else { return CommonUtil.getReturnMap("1", "生成的【定时表达式】不正确!请联系管理员!" + cron); } } else { smartDataTask.setTkCron(""); smartDataTask.setTkExeType(-1); smartDataTask.setTkRepetTime(""); } // 检测是否有重复记录 if (action.equals("编辑")) { int numSDT = smartDataTaskMapper.isRepeatTask(smartDataTask); if (numSDT > 0) { return CommonUtil.getReturnMap("1", "数据未修改,请修改后再提交!"); } } // 入库 int num = smartDataTaskMapper.updateById(smartDataTask); if (num > 0) { return CommonUtil.getReturnMap("0", "【" + action + "任务-定时信息】" + action + "成功!"); } else { return CommonUtil.getReturnMap("1", "【" + action + "任务-定时信息】" + action + "失败!"); } } // 生成cron表达式 private Map generateCron(Integer exeType, String repetTime) { if (exeType == 0) { if (repetTime.endsWith("天")) { String n = CommonUtil.getNumberFromString(repetTime); if (n != null) { return CommonUtil.getReturnMap("0", "0 0 0 1/" + n + " * ?"); } else { return CommonUtil.getReturnMap("1", "【重复时间】格式错误,要求:x天"); } } else if (repetTime.endsWith("小时")) { String n = CommonUtil.getNumberFromString(repetTime); if (n != null) { return CommonUtil.getReturnMap("0", "0 0 0/" + n + " * * ?"); } else { return CommonUtil.getReturnMap("1", "【重复时间】格式错误,要求:x小时"); } } else if (repetTime.endsWith("分钟")) { String n = CommonUtil.getNumberFromString(repetTime); if (n != null) { return CommonUtil.getReturnMap("0", "0 0/" + n + " * * * ?"); } else { return CommonUtil.getReturnMap("1", "【重复时间】格式错误,要求:x分钟"); } } else { return CommonUtil.getReturnMap("1", "【重复时间】格式错误,要求:x天 或者 x小时 或者 x分钟"); } } else if (exeType == 1) { // 检查是否是格式:2023-12-19 09:40 DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm"); try { LocalDateTime dateTime = LocalDateTime.parse(repetTime, formatter); int year = dateTime.getYear(); int month = dateTime.getMonthValue(); int day = dateTime.getDayOfMonth(); int hour = dateTime.getHour(); int minute = dateTime.getMinute(); return CommonUtil.getReturnMap("0", String.format("0 %d %d %d %d ? %d", minute, hour, day, month, year)); } catch (DateTimeParseException e) { return CommonUtil.getReturnMap("1", "【重复时间】格式错误,要求:2023-12-19 09:40"); } } else if (exeType == 2) { // 检查是否是格式:09:40 if (isOnlyHourAndMinute(repetTime)) { String[] split = repetTime.split(":"); int hour = Integer.parseInt(split[0]); int minute = Integer.parseInt(split[1]); return CommonUtil.getReturnMap("0", String.format("0 %d %d 1/1 * ?", minute, hour)); } else { return CommonUtil.getReturnMap("1", "【重复时间】格式错误,要求:09:40"); } } else if (exeType == 3) { // 检查是否是格式:周日 09:40 String[] split = repetTime.split(" "); if (split.length != 2) { return CommonUtil.getReturnMap("1", "【重复时间】格式错误1,要求:周日 09:40"); } String weekday = ""; // MON, TUE, WED, THU, FRI, SAT, SUN if ("周一".equals(split[0])) weekday = "MON"; else if ("周二".equals(split[0])) weekday = "TUE"; else if ("周三".equals(split[0])) weekday = "WED"; else if ("周四".equals(split[0])) weekday = "THU"; else if ("周五".equals(split[0])) weekday = "FRI"; else if ("周六".equals(split[0])) weekday = "SAT"; else if ("周日".equals(split[0])) weekday = "SUN"; else CommonUtil.getReturnMap("1", "【重复时间】格式错误2,要求:周日 09:40"); if (isOnlyHourAndMinute(split[1])) { String[] hour_minute = split[1].split(":"); int hour = Integer.parseInt(hour_minute[0]); int minute = Integer.parseInt(hour_minute[1]); return CommonUtil.getReturnMap("0", String.format("0 %d %d ? * %s", minute, hour, weekday)); } else { return CommonUtil.getReturnMap("1", "【重复时间】格式错误3,要求:周日 09:40"); } } else if (exeType == 4) { // 检查是否是格式:19 09:40 String[] split = repetTime.split(" "); if (split.length != 2) { return CommonUtil.getReturnMap("1", "【重复时间】格式错误1,要求:19 09:40"); } int day = Integer.parseInt(split[0]); if (day > 30 || day < 1) CommonUtil.getReturnMap("1", "【重复时间】格式错误2,要求:19 09:40"); if (isOnlyHourAndMinute(split[1])) { String[] hour_minute = split[1].split(":"); int hour = Integer.parseInt(hour_minute[0]); int minute = Integer.parseInt(hour_minute[1]); return CommonUtil.getReturnMap("0", String.format("0 %d %d %d * ?", minute, hour, day)); } else { return CommonUtil.getReturnMap("1", "【重复时间】格式错误3,要求:19 09:40"); } } else { return CommonUtil.getReturnMap("1", "【执行方式】格式错误,要求:0间隔执行,1定点执行,2每天,3每周,4每月"); } } // 判断时分 private boolean isOnlyHourAndMinute(String time) { String regex = "^([01]?[0-9]|2[0-3]):[0-5][0-9]$"; Pattern pattern = Pattern.compile(regex); Matcher matcher = pattern.matcher(time); return matcher.matches(); } @Override public Map updateSmartDataTaskById1(SmartDataTask smartDataTask) { if (smartDataTask.getTkId() == null) { return CommonUtil.getReturnMap("1", "【任务id】不能为空!"); } // 任务属性 SmartDataTask sdc = smartDataTaskMapper.selectById(smartDataTask.getTkId()); if (sdc == null) { return CommonUtil.getReturnMap("1", "要修改的【任务】不存在!"); } if (smartDataTask.getTkName() == null) { return CommonUtil.getReturnMap("1", "【任务名称】不能为空!"); } // 只能包含字母、数字、下划线和中文,且长度为5-32位 if (!CommonUtil.checkStrByRegx("^[\\w\\u4e00-\\u9fa5]{4,32}$", smartDataTask.getTkName())) { return CommonUtil.getReturnMap("1", "【任务名称】只能包含字母、数字、下划线和中文,且长度为4-32位!"); } int numOfDataTask = smartDataTaskMapper.isRepeatTaskName(smartDataTask); if (numOfDataTask > 0) { return CommonUtil.getReturnMap("1", "任务名有重名!"); } if (smartDataTask.getTkDtId() == null) { return CommonUtil.getReturnMap("1", "【部门id】不能为空!"); } int numOfDepartment = smartDataTaskMapper.isHaveDepartmentById(smartDataTask.getTkDtId()); if (numOfDepartment == 0) { return CommonUtil.getReturnMap("1", "【部门】不存在!"); } if (smartDataTask.getTkSyncPolicy() == null) { return CommonUtil.getReturnMap("1", "【同步策略】不能为空!"); } // 来源库设置 if (smartDataTask.getTkDsIdSource() == null) { return CommonUtil.getReturnMap("1", "【来源数据源id】不能为空!"); } SmartDataSourceJobParams dsSource = smartDataSourceMapper.getDataSourceInfo(smartDataTask.getTkDsIdSource()); if (dsSource == null) { return CommonUtil.getReturnMap("1", "选择的【来源数据源】不存在!"); } if (smartDataTask.getTkExchangeType() == null) { return CommonUtil.getReturnMap("1", "【交换方式】不能为空!"); } if (smartDataTask.getTkSql() == null) { return CommonUtil.getReturnMap("1", "【自定义SQL语句】不能为空!"); } // 目的库设置 if (smartDataTask.getTkDsIdDestination() == null) { return CommonUtil.getReturnMap("1", "【目标数据源id】不能为空!"); } SmartDataSourceJobParams dsDestination = smartDataSourceMapper.getDataSourceInfo(smartDataTask.getTkDsIdDestination()); if (dsDestination == null) { return CommonUtil.getReturnMap("1", "选择的【目标数据源】不存在!"); } if (smartDataTask.getTkDestTable() == null) { return CommonUtil.getReturnMap("1", "【目标数据表】不能为空!"); } // 高级设置 if (smartDataTask.getTkExchangeServer() == null) { return CommonUtil.getReturnMap("1", "【交换服务器】不能为空!"); } if (smartDataTask.getTkExchangeServer() == 1) { if (smartDataTask.getTkExchangeServerId() == null) { return CommonUtil.getReturnMap("1", "【指定服务器id】不能为空!"); } } if (smartDataTask.getTkOptCfgAutoManual() == null) { return CommonUtil.getReturnMap("1", "【运行参数配置】不能为空!"); } if (smartDataTask.getTkOptCfgAutoManual() == 1) { if (smartDataTask.getTkOptCfgRsNum() == null) { return CommonUtil.getReturnMap("1", "【运行参数配置:记录数】不能为空!"); } if (smartDataTask.getTkOptCfgRsNum() > 100) { return CommonUtil.getReturnMap("1", "【运行参数配置:记录数】不能大于100!"); } if (smartDataTask.getTkOptCfgThreadsNum() == null) { return CommonUtil.getReturnMap("1", "【运行参数配置:线程数】不能为空!"); } if (smartDataTask.getTkOptCfgThreadsNum() > 10) { return CommonUtil.getReturnMap("1", "【运行参数配置:线程数】不能大于10!"); } } if (smartDataTask.getTkRsIncorrectData() == null) { return CommonUtil.getReturnMap("1", "【是否记录错误数据】不能为空!"); } if (smartDataTask.getTkDsSourceCharset() == null) { return CommonUtil.getReturnMap("1", "【来源数据源字符集】不能为空!"); } if (!smartDataTask.getTkDsSourceCharset().equals("UTF8") && !smartDataTask.getTkDsSourceCharset().equals("GBK")) { return CommonUtil.getReturnMap("1", "【来源数据源字符集】只能为UTF8或GBK!"); } if (smartDataTask.getTkDsDestinationCharset() == null) { return CommonUtil.getReturnMap("1", "【目标数据源字符集】不能为空!"); } if (!smartDataTask.getTkDsDestinationCharset().equals("UTF8") && !smartDataTask.getTkDsDestinationCharset().equals("GBK")) { return CommonUtil.getReturnMap("1", "【目标数据源字符集】只能为UTF8或GBK!"); } QueryWrapper queryWrapper = new QueryWrapper<>(); queryWrapper.eq(smartDataTask.getTkName() != null, "tk_name", smartDataTask.getTkName()); queryWrapper.eq(smartDataTask.getTkDtId() != null, "tk_dt_id", smartDataTask.getTkDtId()); queryWrapper.eq(smartDataTask.getTkDsIdSource() != null, "tk_ds_id_source", smartDataTask.getTkDsIdSource()); queryWrapper.eq(smartDataTask.getTkSyncPolicy() != null, "tk_sync_policy", smartDataTask.getTkSyncPolicy()); queryWrapper.eq(smartDataTask.getTkExchangeType() != null, "tk_exchange_type", smartDataTask.getTkExchangeType()); // 交换方式:0自定义SQL语句,1数据视图,2数据表,如果是1或2,tkSql传视图或表格名称即可 if (smartDataTask.getTkExchangeType() == 0) { queryWrapper.eq(StringUtils.hasText(smartDataTask.getTkSql()), "tk_sql", smartDataTask.getTkSql()); } else { String sql = "SELECT * FROM " + smartDataTask.getTkSql().substring(0, smartDataTask.getTkSql().indexOf("[")); queryWrapper.eq(StringUtils.hasText(smartDataTask.getTkSql()), "tk_sql", sql); smartDataTask.setTkSql(sql); } queryWrapper.eq(smartDataTask.getTkDsIdDestination() != null, "tk_ds_id_destination", smartDataTask.getTkDsIdDestination()); queryWrapper.eq(smartDataTask.getTkDestTable() != null, "tk_dest_table", smartDataTask.getTkDestTable()); queryWrapper.eq(smartDataTask.getTkExchangeServer() != null, "tk_exchange_server", smartDataTask.getTkExchangeServer()); queryWrapper.eq(smartDataTask.getTkExchangeServer() != null && smartDataTask.getTkExchangeServerId() != null, "tk_exchange_server_id", smartDataTask.getTkExchangeServerId()); queryWrapper.eq(smartDataTask.getTkOptCfgAutoManual() != null, "tk_opt_cfg_auto_manual", smartDataTask.getTkOptCfgAutoManual()); queryWrapper.eq(smartDataTask.getTkOptCfgAutoManual() != null && smartDataTask.getTkOptCfgRsNum() != null, "tk_opt_cfg_rs_num", smartDataTask.getTkOptCfgRsNum()); queryWrapper.eq(smartDataTask.getTkOptCfgAutoManual() != null && smartDataTask.getTkOptCfgThreadsNum() != null, "tk_opt_cfg_threads_num", smartDataTask.getTkOptCfgThreadsNum()); queryWrapper.eq(smartDataTask.getTkRsIncorrectData() != null, "tk_rs_incorrect_data", smartDataTask.getTkRsIncorrectData()); queryWrapper.eq(smartDataTask.getTkDsSourceCharset() != null, "tk_ds_source_charset", smartDataTask.getTkDsSourceCharset()); queryWrapper.eq(StringUtils.hasText(smartDataTask.getTkDsDestinationCharset()), "tk_ds_destination_charset", smartDataTask.getTkDsDestinationCharset()); SmartDataTask sdt = smartDataTaskMapper.selectOne(queryWrapper); if (sdt != null) { return CommonUtil.getReturnMap("1", "数据未修改,请修改后再提交!"); } int result = smartDataTaskMapper.updateById(smartDataTask); if (result > 0) { // 获取数据源对应的表、视图、Sql对应的结构 Map returnMap = this.getMetaData(smartDataTask, dsSource, dsDestination, "编辑"); if (returnMap.get("code") == null) { // 获取字段对应关系 String colRelationship = smartDataTaskMapper.getColRelationship(smartDataTask); JSONArray jsonArray = JSONArray.parseArray(colRelationship); returnMap.put("colRelationship", jsonArray); } return CommonUtil.getReturnMap("0", returnMap); } else { return CommonUtil.getReturnMap("1", "【编辑任务-任务基本信息】失败!"); } } @Override public Map updateSmartDataTaskById2(JSONObject requestData) { return this.insertOrUpdate2(requestData, "编辑"); } @Override public Map updateSmartDataTaskById3(SmartDataTask smartDataTask) { return this.insertOrUpdate3(smartDataTask, "编辑"); } // 判断之前状态是否启用 public Map updateSmartDataTaskActivation(SmartDataTask smartDataTask) { // 检测参数,还有是否存在重复记录 if (smartDataTask.getTkId() == null) { return CommonUtil.getReturnMap("1", "【任务id】不能为空!"); } SmartDataTask sdc = smartDataTaskMapper.selectById(smartDataTask.getTkId()); if (sdc == null) { return CommonUtil.getReturnMap("1", "要修改的【任务】不存在!"); } if (smartDataTask.getTkActivation() == null) { return CommonUtil.getReturnMap("1", "【是否启用】不能为空!"); } int result = smartDataTaskMapper.markTaskById(smartDataTask); if (result > 0) { return CommonUtil.getReturnMap(String.valueOf(result), "标注成功!"); } else { if (smartDataTask.getTkActivation() == 1) { return CommonUtil.getReturnMap("0", "标注失败,之前已是启用状态!"); } else { return CommonUtil.getReturnMap("1", "标注失败,之前已是启用状态!"); } } } @Override public PageUtils queryPageSmartDataTasks(int currentPage, int pageCount, SmartDataTask smartDataTask) { Page page = new Page<>(currentPage, pageCount); QueryWrapper queryWrapper = new QueryWrapper<>(); queryWrapper.like(smartDataTask.getTkName() != null, "tk_name", smartDataTask.getTkName()); queryWrapper.eq(smartDataTask.getTkDsIdSource() != null, "tk_ds_id_source", smartDataTask.getTkDsIdSource()); queryWrapper.eq(smartDataTask.getTkDsIdDestination() != null, "tk_ds_id_destination", smartDataTask.getTkDsIdDestination()); queryWrapper.eq(smartDataTask.getTkExchangeType() != null, "tk_exchange_type", smartDataTask.getTkExchangeType()); queryWrapper.eq(smartDataTask.getTkExeType() != null, "tk_exe_type", smartDataTask.getTkExeType()); queryWrapper.like(smartDataTask.getTkDestTable() != null, "tk_dest_table", smartDataTask.getTkDestTable()); queryWrapper.eq(smartDataTask.getTkActivation() != null, "tk_activation", smartDataTask.getTkActivation()); queryWrapper.orderByAsc("tk_deleted"); queryWrapper.orderByDesc("tk_update_time"); IPage result = smartDataTaskMapper.selectPage(page, queryWrapper); return new PageUtils<>(result); } @Override public Map deleteSmartDataTaskById(int id, int delMethod) { if (delMethod == 0) { // 逻辑删除 int num = smartDataTaskMapper.logicDeleteMarkTaskById(id); if (num > 0) { return CommonUtil.getReturnMap("0", "逻辑删除成功"); } return CommonUtil.getReturnMap("1", "逻辑删除失败"); } else if (delMethod == 1) { // 恢复逻辑删除 int num = smartDataTaskMapper.restoreLogicDeleteMarkTaskById(id); if (num > 0) { return CommonUtil.getReturnMap("0", "恢复逻辑删除成功"); } return CommonUtil.getReturnMap("1", "恢复逻辑删除失败"); } else if (delMethod == 9) { // 物理删除 int num = smartDataTaskMapper.physicsDeleteMarkTaskById(id); if (num > 0) { return CommonUtil.getReturnMap("0", "物理删除成功"); } return CommonUtil.getReturnMap("1", "物理删除失败"); } else { // 错误 return CommonUtil.getReturnMap("1", "删除方式错误"); } } @Override public SmartDataTask getSmartById(int id) { return smartDataTaskMapper.selectById(id); } // 校验任务名是否存在 public Map TaskNameValidator(SmartDataTask smartDataTask) { if (smartDataTask.getTkName() == null) { return CommonUtil.getReturnMap(String.valueOf(1), "任务名称为空!"); } QueryWrapper queryWrapper = new QueryWrapper<>(); queryWrapper.eq(smartDataTask.getTkName() != null, "tk_name", smartDataTask.getTkName()); SmartDataTask smartDataTask_return = smartDataTaskMapper.selectOne(queryWrapper); if (smartDataTask_return == null) { return CommonUtil.getReturnMap(String.valueOf(1), "【" + smartDataTask.getTkName() + "】没有找到!"); } else { return CommonUtil.getReturnMap(String.valueOf(0), smartDataTask_return); } } @Override public Map createJob(SmartDataTask smartDataTask) { // 校验任务名是否存在 Map tmp_map = TaskNameValidator(smartDataTask); if ("1".equals(tmp_map.get("code"))) { return tmp_map; } SmartDataTask smartDataTask_return = (SmartDataTask) tmp_map.get("msg"); if (smartDataTask_return.getTkCron() == null) { return CommonUtil.getReturnMap(String.valueOf(1), "任务调度cron表达式为空!该任务未设置【定时信息】!"); } // 来源数据源id Integer tkDsIdSource = smartDataTask_return.getTkDsIdSource(); // 目标数据源id Integer tkDsIdDestination = smartDataTask_return.getTkDsIdDestination(); // 根据id,获取数据源url、user、password、driver等 SmartDataSourceJobParams dsSourceInfo = smartDataSourceMapper.getDataSourceInfo(tkDsIdSource); dsSourceInfo.setExchangeType(smartDataTask_return.getTkExchangeType()); dsSourceInfo.setSourceSql(smartDataTask_return.getTkSql()); SmartDataSourceJobParams dsDestinationInfo = smartDataSourceMapper.getDataSourceInfo(tkDsIdDestination); dsDestinationInfo.setDestinationTable(smartDataTask_return.getTkDestTable()); Map returnMap = QuartzJobUtils.createScheduleJob(scheduler, smartDataTask_return, dsSourceInfo, dsDestinationInfo); if ("0".equals(returnMap.get("code"))) { smartDataTask.setTkId(smartDataTask_return.getTkId()); // 下次执行的时间 String nextExeTime = QuartzJobUtils.getNextExeTime(smartDataTask_return.getTkCron()); // 更新数据库中的下次执行时间 smartDataTask.setTkNextExeTime(nextExeTime); smartDataTask.setTkActivation(1); Map stringStringMap = updateSmartDataTaskActivation(smartDataTask); String msg; if ("0".equals(stringStringMap.get("code"))) { msg = (String) returnMap.get("msg") + stringStringMap.get("msg"); } else { msg = (String) returnMap.get("msg") + stringStringMap.get("msg"); } return CommonUtil.getReturnMap("0", msg); } else { return returnMap; } } @Override public Map pauseJob(SmartDataTask smartDataTask) { Map tmp_map = TaskNameValidator(smartDataTask); if ("1".equals(tmp_map.get("code"))) { return tmp_map; } SmartDataTask smartDataTask_return = (SmartDataTask) tmp_map.get("msg"); return QuartzJobUtils.pauseScheduleJob(scheduler, smartDataTask_return.getTkName()); } @Override public Map resumeJob(SmartDataTask smartDataTask) { Map tmp_map = TaskNameValidator(smartDataTask); if ("1".equals(tmp_map.get("code"))) { return tmp_map; } SmartDataTask smartDataTask_return = (SmartDataTask) tmp_map.get("msg"); return QuartzJobUtils.resumeScheduleJob(scheduler, smartDataTask_return.getTkName()); } @Override public Map updateJob(SmartDataTask smartDataTask) { Map tmp_map = TaskNameValidator(smartDataTask); if ("1".equals(tmp_map.get("code"))) { return tmp_map; } SmartDataTask smartDataTask_return = (SmartDataTask) tmp_map.get("msg"); return QuartzJobUtils.updateScheduleJob(scheduler, smartDataTask_return); } @Override public Map deleteJob(SmartDataTask smartDataTask) { Map tmp_map = TaskNameValidator(smartDataTask); if ("1".equals(tmp_map.get("code"))) { return tmp_map; } SmartDataTask smartDataTask_return = (SmartDataTask) tmp_map.get("msg"); Map returnMap = QuartzJobUtils.deleteScheduleJob(scheduler, smartDataTask_return.getTkName()); if ("0".equals(returnMap.get("code"))) { smartDataTask.setTkId(smartDataTask_return.getTkId()); // 更新数据库中的下次执行时间 smartDataTask.setTkNextExeTime(null); smartDataTask.setTkActivation(0); Map stringStringMap = updateSmartDataTaskActivation(smartDataTask); String msg; if ("0".equals(stringStringMap.get("code"))) { msg = (String) returnMap.get("msg") + stringStringMap.get("msg"); } else { msg = (String) returnMap.get("msg") + stringStringMap.get("msg"); } return CommonUtil.getReturnMap("0", msg); } else { return CommonUtil.getReturnMap("1", returnMap.get("msg")); } } @Override public Map runOnceJob(SmartDataTask smartDataTask) { Map tmp_map = TaskNameValidator(smartDataTask); if ("1".equals(tmp_map.get("code"))) { return tmp_map; } SmartDataTask smartDataTask_return = (SmartDataTask) tmp_map.get("msg"); return QuartzJobUtils.runOnce(scheduler, smartDataTask_return.getTkName()); } @Override public Map getDepart() { // 获取部门 List depart = smartDataSourceMapper.getDepart(); if (depart != null) { return CommonUtil.getReturnMap("0", depart); } else { return CommonUtil.getReturnMap("1", "部门为空"); } } @Override public Map getSyncPolicy() { // 获取同步策略: 0插入更新,1更新标记,2清空插入 Map syncPolicy1 = new HashMap<>(); Map syncPolicy2 = new HashMap<>(); Map syncPolicy3 = new HashMap<>(); syncPolicy1.put("name", "插入更新"); syncPolicy1.put("value", 0); syncPolicy2.put("name", "更新标记"); syncPolicy2.put("value", 1); syncPolicy3.put("name", "清空插入"); syncPolicy3.put("value", 2); List> list = new ArrayList<>(); list.add(syncPolicy1); list.add(syncPolicy2); list.add(syncPolicy3); return CommonUtil.getReturnMap("0", list); } @Override public Map getExchangeType() { // 获取同步策略: 0插入更新,1更新标记,2清空插入 Map syncPolicy1 = new HashMap<>(); Map syncPolicy2 = new HashMap<>(); Map syncPolicy3 = new HashMap<>(); syncPolicy1.put("name", "自定义SQL语句"); syncPolicy1.put("value", 0); syncPolicy2.put("name", "数据视图"); syncPolicy2.put("value", 1); syncPolicy3.put("name", "数据表"); syncPolicy3.put("value", 2); List> list = new ArrayList<>(); list.add(syncPolicy1); list.add(syncPolicy2); list.add(syncPolicy3); return CommonUtil.getReturnMap("0", list); } @Override public Map testSql(String json) { if (json == null) { return CommonUtil.getReturnMap("1", "json参数为空"); } int dsIdSource, exchangeType; String sql; try { JSONObject jsonObject = JSONObject.parseObject(json); dsIdSource = jsonObject.getInteger("dsIdSource"); exchangeType = jsonObject.getInteger("exchangeType"); sql = jsonObject.getString("sql"); } catch (Exception e) { return CommonUtil.getReturnMap("1", "请检查参数:【数据源id】、【交换方式】、【自定义sql语句】是否为空"); } // 获取数据源id对应的数据源 SmartDataSourceJobParams dataSourceInfo = smartDataSourceMapper.getDataSourceInfo(dsIdSource); // 只有sql语句的交换方式才需要检查 if (exchangeType == 0) { // sql语句 if (dataSourceInfo != null) { DBUtil dbUtil = new DBUtil(dataSourceInfo.getDsUrl(), dataSourceInfo.getDsUser(), dataSourceInfo.getDsPassword(), dataSourceInfo.getDsClsDriver(), "UTF8"); Map map_return = dbUtil.getConnection(); if (map_return.get("code") == "0") { Map metaDataBySql = dbUtil.getMetaDataBySql((Connection) map_return.get("msg"), sql); if (metaDataBySql.get("code") == "0") { return CommonUtil.getReturnMap("0", "SQL语句正确"); } else { return CommonUtil.getReturnMap("1", metaDataBySql.get("msg")); } } else { return CommonUtil.getReturnMap("1", map_return.get("msg")); } } else { return CommonUtil.getReturnMap("1", "来源数据源不存在"); } } else { return CommonUtil.getReturnMap("1", "只有【自定义SQL语句】的交换方式才需要测试sql"); } } @Override // 获取表 public Map getTables(String json) { // 查询数据库中的表 String sql = "SELECT TABLE_NAME tname,TABLE_COMMENT tcomment FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{db}';"; // 调用getTablesOrViews方法,传入json和sql,获取表 return getTablesOrViews(json, sql); } @Override //获取视图 public Map getViews(String json) { // 查询数据库中指定数据库的视图 String sql = "SELECT TABLE_NAME tname,TABLE_COMMENT tcomment FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{db}' AND TABLE_TYPE = 'VIEW';"; // 返回视图 return getTablesOrViews(json, sql); } // 获取表或视图信息 private Map getTablesOrViews(String json, String sql) { // 判断json是否为空 if (json == null) { // 如果为空,返回提示信息 return CommonUtil.getReturnMap("1", "json参数为空"); } int dsIdSource; try { // 将json字符串转换为JSONObject对象 JSONObject jsonObject = JSONObject.parseObject(json); // 从JSONObject对象中获取dsIdSource的值 dsIdSource = jsonObject.getInteger("dsIdSource"); } catch (Exception e) { // 如果获取失败,返回提示信息 return CommonUtil.getReturnMap("1", "请检查参数:【数据源id】是否为空"); } // 获取数据源信息 SmartDataSourceJobParams dataSourceInfo = smartDataSourceMapper.getDataSourceInfo(dsIdSource); if (dataSourceInfo != null) { // 创建DBUtil对象 DBUtil dbUtil = new DBUtil(dataSourceInfo.getDsUrl(), dataSourceInfo.getDsUser(), dataSourceInfo.getDsPassword(), dataSourceInfo.getDsClsDriver(), "UTF8"); // 获取连接 Map map_return = dbUtil.getConnection(); if (map_return.get("code") == "0") { // 获取连接 Connection conn = (Connection) map_return.get("msg"); // 获取数据库名 String db = dataSourceInfo.getDsUrl().substring(dataSourceInfo.getDsUrl().lastIndexOf("/") + 1); // 查询表或视图信息 sql = sql.replace("{db}", db); Map tableMetaData = dbUtil.query(conn, sql); if (tableMetaData.get("code") == "0") { Map map = (Map) tableMetaData.get("msg"); ResultSet rs = (ResultSet) map.get("rs"); PreparedStatement stmt = (PreparedStatement) map.get("stmt"); try { List list = new ArrayList<>(); while (rs.next()) { // 将表名和表注释添加到list中 list.add(rs.getString("tname") + " [" + rs.getString("tcomment") + "]"); } // 关闭结果集 dbUtil.closeResultSet(rs); // 关闭预处理语句 dbUtil.closeStatement(stmt); // 关闭连接 dbUtil.closeConnection(conn); // 返回表信息 return CommonUtil.getReturnMap("0", list); } catch (SQLException e) { // 如果出现异常,返回异常信息 return CommonUtil.getReturnMap("1", e.getMessage()); } } else { // 如果查询失败,返回查询失败信息 return CommonUtil.getReturnMap("1", tableMetaData.get("msg")); } } else { // 如果获取连接失败,返回获取连接失败信息 return CommonUtil.getReturnMap("1", map_return.get("msg")); } } else { // 如果数据源不存在,返回提示信息 return CommonUtil.getReturnMap("1", "数据源不存在"); } } }