package com.template.services.impl;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.template.common.utils.CommonUtil;
import com.template.common.utils.DBUtil;
import com.template.common.utils.QuartzJobUtils;
import com.template.mapper.SmartDataSourceMapper;
import com.template.mapper.SmartDataTaskMapper;
import com.template.model.pojo.SmartDataSourceJobParams;
import com.template.model.pojo.SmartDataTask;
import com.template.model.pojo.SmartDataTaskErr;
import com.template.model.pojo.SmartDepartment;
import com.template.model.result.PageUtils;
import com.template.services.SmartDataTaskService;
import org.quartz.CronExpression;
import org.quartz.Scheduler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
 * Service implementation for data-source tasks.
 *
 * @author ceshi
 * @since 2023-12-05
 */
@Service
public class SmartDataTaskServiceImpl extends ServiceImpl implements SmartDataTaskService {
// Task mapper: used in this class for task lookups (selectOne), the department-existence count, and inserts.
@Autowired
private SmartDataTaskMapper smartDataTaskMapper;
// Data-source mapper: used in this class to load a data source's connection info by id.
@Autowired
private SmartDataSourceMapper smartDataSourceMapper;
// Quartz scheduler. NOTE(review): not referenced in the part of the class reviewed here; presumably used by job-management methods further down.
@Autowired
private Scheduler scheduler;
/**
 * Add task, step 1: validate the task's basic information and insert it.
 *
 * <p>The checks run in a fixed order and the first failing check decides the returned message:
 * task name (format, then uniqueness), department, sync policy, source data source, exchange
 * type, SQL/object name, destination data source and table, exchange server, run-parameter
 * limits, error-data flag, and both character sets. After validation a duplicate lookup is
 * performed, the row is inserted, and the source/destination metadata is resolved through
 * {@code getMetaData}.
 *
 * @param smartDataTask the task to validate and insert; when its exchange type is not 0 the
 *                      {@code tkSql} value is replaced by {@code "SELECT * FROM " + name}
 *                      before the insert, where {@code name} is the text before the first
 *                      {@code '['} (or the whole value when there is no bracket)
 * @return the map built by {@code CommonUtil.getReturnMap}: first argument {@code "1"} with an
 *         error message when a check or the insert fails, otherwise {@code "0"} with the map
 *         returned by {@code getMetaData}
 */
@Override
public Map insertSmartDataTask1(SmartDataTask smartDataTask) {
    // ---- Task attributes ----
    if (smartDataTask.getTkName() == null) {
        return CommonUtil.getReturnMap("1", "【任务名称】不能为空!");
    }
    // Letters, digits, underscore and CJK characters (U+4E00 to U+9FA5) only, 4 to 32 characters long.
    if (!CommonUtil.checkStrByRegx("^[\\w\\u4e00-\\u9fa5]{4,32}$", smartDataTask.getTkName())) {
        return CommonUtil.getReturnMap("1", "【任务名称】只能包含字母、数字、下划线和中文,且长度为4-32位!");
    }
    // The task name must not already be in use.
    QueryWrapper<SmartDataTask> queryWrapper = new QueryWrapper<>();
    queryWrapper.eq("tk_name", smartDataTask.getTkName());
    SmartDataTask sdt = smartDataTaskMapper.selectOne(queryWrapper);
    if (sdt != null) {
        return CommonUtil.getReturnMap("1", "任务名有重名!");
    }
    if (smartDataTask.getTkDtId() == null) {
        return CommonUtil.getReturnMap("1", "【部门id】不能为空!");
    }
    int numOfDepartment = smartDataTaskMapper.isHaveDepartmentById(smartDataTask.getTkDtId());
    if (numOfDepartment == 0) {
        return CommonUtil.getReturnMap("1", "【部门】不存在!");
    }
    if (smartDataTask.getTkSyncPolicy() == null) {
        return CommonUtil.getReturnMap("1", "【同步策略】不能为空!");
    }

    // ---- Source database settings ----
    if (smartDataTask.getTkDsIdSource() == null) {
        return CommonUtil.getReturnMap("1", "【来源数据源id】不能为空!");
    }
    SmartDataSourceJobParams dsSource = smartDataSourceMapper.getDataSourceInfo(smartDataTask.getTkDsIdSource());
    if (dsSource == null) {
        return CommonUtil.getReturnMap("1", "选择的【来源数据源】不存在!");
    }
    if (smartDataTask.getTkExchangeType() == null) {
        return CommonUtil.getReturnMap("1", "【交换方式】不能为空!");
    }
    if (smartDataTask.getTkSql() == null) {
        return CommonUtil.getReturnMap("1", "【自定义SQL语句】不能为空!");
    }

    // ---- Destination database settings ----
    if (smartDataTask.getTkDsIdDestination() == null) {
        return CommonUtil.getReturnMap("1", "【目标数据源id】不能为空!");
    }
    SmartDataSourceJobParams dsDestination = smartDataSourceMapper.getDataSourceInfo(smartDataTask.getTkDsIdDestination());
    if (dsDestination == null) {
        return CommonUtil.getReturnMap("1", "选择的【目标数据源】不存在!");
    }
    if (smartDataTask.getTkDestTable() == null) {
        return CommonUtil.getReturnMap("1", "【目标数据表】不能为空!");
    }

    // ---- Advanced settings ----
    if (smartDataTask.getTkExchangeServer() == null) {
        return CommonUtil.getReturnMap("1", "【交换服务器】不能为空!");
    }
    // A server id is only mandatory when exchange server mode is 1.
    if (smartDataTask.getTkExchangeServer() == 1 && smartDataTask.getTkExchangeServerId() == null) {
        return CommonUtil.getReturnMap("1", "【指定服务器id】不能为空!");
    }
    if (smartDataTask.getTkOptCfgAutoManual() == null) {
        return CommonUtil.getReturnMap("1", "【运行参数配置】不能为空!");
    }
    // Record and thread counts are only validated when the run-parameter mode is 1.
    if (smartDataTask.getTkOptCfgAutoManual() == 1) {
        if (smartDataTask.getTkOptCfgRsNum() == null) {
            return CommonUtil.getReturnMap("1", "【运行参数配置:记录数】不能为空!");
        }
        if (smartDataTask.getTkOptCfgRsNum() > 100) {
            return CommonUtil.getReturnMap("1", "【运行参数配置:记录数】不能大于100!");
        }
        if (smartDataTask.getTkOptCfgThreadsNum() == null) {
            return CommonUtil.getReturnMap("1", "【运行参数配置:线程数】不能为空!");
        }
        if (smartDataTask.getTkOptCfgThreadsNum() > 10) {
            return CommonUtil.getReturnMap("1", "【运行参数配置:线程数】不能大于10!");
        }
    }
    if (smartDataTask.getTkRsIncorrectData() == null) {
        return CommonUtil.getReturnMap("1", "【是否记录错误数据】不能为空!");
    }
    String sourceCharset = smartDataTask.getTkDsSourceCharset();
    if (sourceCharset == null) {
        return CommonUtil.getReturnMap("1", "【来源数据源字符集】不能为空!");
    }
    if (!"UTF8".equals(sourceCharset) && !"GBK".equals(sourceCharset)) {
        return CommonUtil.getReturnMap("1", "【来源数据源字符集】只能为UTF8或GBK!");
    }
    String destinationCharset = smartDataTask.getTkDsDestinationCharset();
    if (destinationCharset == null) {
        return CommonUtil.getReturnMap("1", "【目标数据源字符集】不能为空!");
    }
    if (!"UTF8".equals(destinationCharset) && !"GBK".equals(destinationCharset)) {
        return CommonUtil.getReturnMap("1", "【目标数据源字符集】只能为UTF8或GBK!");
    }

    // ---- Duplicate lookup ----
    // NOTE(review): this wrapper still carries the tk_name condition added for the name check
    // above, so this lookup can only match a row the name check would already have reported.
    // Confirm whether a name-independent duplicate check was intended before changing it.
    // Fields validated as non-null above are added unconditionally.
    queryWrapper.eq("tk_dt_id", smartDataTask.getTkDtId());
    queryWrapper.eq("tk_ds_id_source", smartDataTask.getTkDsIdSource());
    queryWrapper.eq("tk_sync_policy", smartDataTask.getTkSyncPolicy());
    queryWrapper.eq("tk_exchange_type", smartDataTask.getTkExchangeType());
    // Exchange type: 0 = custom SQL statement, 1 = data view, 2 = data table.
    // For 1 or 2, tkSql only needs to carry the view or table name.
    if (smartDataTask.getTkExchangeType() == 0) {
        queryWrapper.eq(StringUtils.hasText(smartDataTask.getTkSql()), "tk_sql", smartDataTask.getTkSql());
    } else {
        // The name is whatever precedes the first '['. A bare name without a bracket is valid
        // input and is used as-is; previously it caused a StringIndexOutOfBoundsException.
        String rawName = smartDataTask.getTkSql();
        int bracketAt = rawName.indexOf('[');
        String objectName = bracketAt >= 0 ? rawName.substring(0, bracketAt) : rawName;
        if (!StringUtils.hasText(objectName)) {
            // Reject an empty name rather than persisting "SELECT * FROM " with no target.
            return CommonUtil.getReturnMap("1", "【自定义SQL语句】不能为空!");
        }
        String sql = "SELECT * FROM " + objectName;
        queryWrapper.eq("tk_sql", sql);
        smartDataTask.setTkSql(sql);
    }
    queryWrapper.eq("tk_ds_id_destination", smartDataTask.getTkDsIdDestination());
    queryWrapper.eq("tk_dest_table", smartDataTask.getTkDestTable());
    queryWrapper.eq("tk_exchange_server", smartDataTask.getTkExchangeServer());
    // These three are optional depending on the modes checked above, so they stay conditional.
    queryWrapper.eq(smartDataTask.getTkExchangeServerId() != null, "tk_exchange_server_id", smartDataTask.getTkExchangeServerId());
    queryWrapper.eq("tk_opt_cfg_auto_manual", smartDataTask.getTkOptCfgAutoManual());
    queryWrapper.eq(smartDataTask.getTkOptCfgRsNum() != null, "tk_opt_cfg_rs_num", smartDataTask.getTkOptCfgRsNum());
    queryWrapper.eq(smartDataTask.getTkOptCfgThreadsNum() != null, "tk_opt_cfg_threads_num", smartDataTask.getTkOptCfgThreadsNum());
    queryWrapper.eq("tk_rs_incorrect_data", smartDataTask.getTkRsIncorrectData());
    queryWrapper.eq("tk_ds_source_charset", sourceCharset);
    queryWrapper.eq("tk_ds_destination_charset", destinationCharset);
    sdt = smartDataTaskMapper.selectOne(queryWrapper);
    if (sdt != null) {
        return CommonUtil.getReturnMap("1", "有重复记录!");
    }

    // ---- Insert, then resolve the structure of the source table/view/SQL ----
    int result = smartDataTaskMapper.insert(smartDataTask);
    if (result <= 0) {
        return CommonUtil.getReturnMap("1", "【添加任务-任务基本信息】添加失败!");
    }
    // NOTE(review): getMetaData can itself return a code "1" map (for example when the source
    // connection fails); it is wrapped under code "0" here. Confirm callers inspect the nested map.
    return CommonUtil.getReturnMap("0", this.getMetaData(smartDataTask, dsSource, dsDestination, "添加"));
}
private Map getMetaData(SmartDataTask smartDataTask, SmartDataSourceJobParams dsSource, SmartDataSourceJobParams dsDestination, String action) {
// 源连接
DBUtil dsSourceDbUtil = new DBUtil(dsSource.getDsUrl(), dsSource.getDsUser(), dsSource.getDsPassword(), dsSource.getDsClsDriver(), "UTF8");
Map dsSourceMap = dsSourceDbUtil.getConnection();
if (dsSourceMap.get("code") == "1") {
return CommonUtil.getReturnMap("1", "【来源数据源】连接失败!");
}
Connection dsSourceConn = (Connection) dsSourceMap.get("msg");
// 解析字段名称、字段类型、字段大小等
Map metaDataBySql = dsSourceDbUtil.getMetaDataBySql(dsSourceConn, smartDataTask.getTkSql());
if (metaDataBySql.get("code") == "1") {
return CommonUtil.getReturnMap("1", metaDataBySql.get("msg"));
}
List