| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747 |
- package com.template.controller;
- import com.alibaba.fastjson2.JSONArray;
- import com.alibaba.fastjson2.JSONObject;
- import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
- import com.template.annotation.PassToken;
- import com.template.common.utils.CommonUtil;
- import com.template.common.utils.DBUtil;
- import com.template.common.utils.HttpsClient;
- import com.template.common.utils.QuartzJobUtils;
- import com.template.mapper.SmartDataSourceMapper;
- import com.template.mapper.SmartDataTaskMapper;
- import com.template.model.pojo.SmartDataSourceJobParams;
- import com.template.model.pojo.SmartDataTask;
- import com.template.model.pojo.SmartDataTaskDebug;
- import com.template.model.pojo.SmartDataTaskLog;
- import org.quartz.JobDataMap;
- import org.quartz.JobDetail;
- import org.quartz.JobExecutionContext;
- import org.quartz.JobKey;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.scheduling.quartz.QuartzJobBean;
- import org.springframework.stereotype.Component;
- import java.sql.Connection;
- import java.sql.PreparedStatement;
- import java.sql.ResultSet;
- import java.sql.SQLException;
- import java.text.SimpleDateFormat;
- import java.util.*;
- @Component
- public class Task extends QuartzJobBean {
- @Autowired
- private SmartDataTaskMapper smartDataTaskMapper;
- @Autowired
- private SmartDataSourceMapper smartDataSourceMapper;
- private int debugSqlFlag = 0;
- private static Logger logger = LoggerFactory.getLogger(QuartzJobBean.class);
- @Override
- @PassToken
- protected void executeInternal(JobExecutionContext jobExecutionContext) {
- // 获取任务信息
- JobDetail jobDetail = jobExecutionContext.getJobDetail();
- JobKey key = jobDetail.getKey();
- // 工作内容
- JobDataMap jobDataMap = jobDetail.getJobDataMap();
- // 来源数据源参数
- SmartDataTask smartDataTask_transfer = (SmartDataTask) jobDataMap.get("smartDataTask");
- // 为了不重启任务就能够使用新的参数,这里需要重新读取
- QueryWrapper<SmartDataTask> queryWrapper = new QueryWrapper<>();
- queryWrapper.eq(smartDataTask_transfer.getTkId() != null, "tk_id", smartDataTask_transfer.getTkId());
- SmartDataTask smartDataTask = smartDataTaskMapper.selectOne(queryWrapper);
- if (smartDataTask == null) {
- // 当前时间
- SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- Date date = new Date();
- String datetime = dateFormat.format(date);
- // 记录错误信息
- this.saveDebugMsg(smartDataTask_transfer.getTkId(), smartDataTask_transfer.getTkName(), datetime, "未查询到任务信息");
- return;
- }
- // 来源数据源id
- Integer tkDsIdSource = smartDataTask.getTkDsIdSource();
- // 目标数据源id
- Integer tkDsIdDestination = smartDataTask.getTkDsIdDestination();
- // 根据id,获取数据源url、user、password、driver等
- SmartDataSourceJobParams dsSourceInfo = smartDataSourceMapper.getDataSourceInfo(tkDsIdSource);
- SmartDataSourceJobParams dsDestinationInfo = smartDataSourceMapper.getDataSourceInfo(tkDsIdDestination);
- // 来源数据源参数
- String sourceDriver = dsSourceInfo.getDsClsDriver();
- String sourceUrl = dsSourceInfo.getDsUrl();
- String sourceUser = dsSourceInfo.getDsUser();
- String sourcePassword = dsSourceInfo.getDsPassword();
- // 来源数据库sql
- String sourceSql = smartDataTask.getTkSql();
- // 目标数据源参数
- String destinationDriver = dsDestinationInfo.getDsClsDriver();
- String destinationUrl = dsDestinationInfo.getDsUrl();
- String destinationUser = dsDestinationInfo.getDsUser();
- String destinationPassword = dsDestinationInfo.getDsPassword();
- // 目标数据库表
- String destinationTable = smartDataTask.getTkDestTable().substring(0, smartDataTask.getTkDestTable().indexOf("["));
- // 任务数据id、列关系等等
- Integer tkId = smartDataTask.getTkId();
- debugSqlFlag = smartDataTask.getTkDebugSql();
- String colRelationship = smartDataTask.getTkColRelationship();
- Integer rsIncorrectData = smartDataTask.getTkRsIncorrectData();
- Integer optCfgAutoManual = smartDataTask.getTkOptCfgAutoManual();
- Integer optCfgRsNum = smartDataTask.getTkOptCfgRsNum();
- Integer optCfgDefaultRsNum = smartDataTask.getTkOptCfgDefaultRsNum();
- String dsSourceCharset = smartDataTask.getTkDsSourceCharset();
- String dsDestinationCharset = smartDataTask.getTkDsDestinationCharset();
- // 来源库
- String dsSourceName = dsSourceInfo.getDsName();
- Integer dsSourceId = dsSourceInfo.getDsId();
- // 目标库
- String dsDestinationName = dsDestinationInfo.getDsName();
- Integer dsTargetId = dsDestinationInfo.getDsId();
- // 部门
- int dtId = smartDataTask.getTkDtId();
- // 交换方式:0自定义SQL语句,1数据视图,2数据表
- Integer tkExchangeType = smartDataTask.getTkExchangeType();
- // 执行方式:0间隔执行,1定点执行,2每天,3每周,4每月
- Integer tkExeType = smartDataTask.getTkExeType();
- String tkExchangeServer = smartDataTask.getTkExchangeServer();
- // 是否是MySQL数据源
- if (sourceDriver.toLowerCase().contains("mysql")) {
- // 执行MySQL任务
- executeMysqlExchangeTask(key, sourceDriver, sourceUrl, sourceUser, sourcePassword, sourceSql,
- destinationDriver, destinationUrl, destinationUser, destinationPassword, destinationTable, tkId,
- colRelationship, rsIncorrectData, optCfgRsNum, optCfgDefaultRsNum, optCfgAutoManual, tkExchangeType,
- dtId, tkExeType, dsSourceId, dsSourceName, dsTargetId, dsDestinationName, tkExchangeServer, dsSourceCharset, dsDestinationCharset);
- } else if (sourceDriver.toLowerCase().contains("oracle")) {
- // 执行Oracle任务
- executeOracleTask(jobExecutionContext);
- } else {
- // 执行其他任务
- executeOtherTask(jobExecutionContext);
- }
- }
- private void executeOracleTask(JobExecutionContext jobExecutionContext) {
- }
- private void executeOtherTask(JobExecutionContext jobExecutionContext) {
- }
- // 执行MySQL任务的操作
- private void executeMysqlExchangeTask(JobKey key, String sourceDriver, String sourceUrl, String sourceUser,
- String sourcePassword, String sourceSql, String destinationDriver,
- String destinationUrl, String destinationUser, String destinationPassword,
- String destinationTable, Integer tkId, String colRelationship, int rsIncorrectData,
- int optCfgRsNum, int optCfgDefaultRsNum, int optCfgAutoManual, int tkExchangeType,
- int dtId, int tkExeType, int dsSourceId, String dsSourceName, int dsTargetId,
- String dsDestinationName, String tkExchangeServer,
- String dsSourceCharset, String dsDestinationCharset) {
- // 当前时间
- SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- Date date = new Date();
- String datetime = dateFormat.format(date);
- // 任务名称
- String tkTaskName = key.getName();
- // 及时更新下次执行时间
- this.saveNextExeTime(tkId, datetime, tkTaskName, rsIncorrectData);
- // 任务日志参数设置
- SmartDataTaskLog smartDataTaskLog = new SmartDataTaskLog();
- smartDataTaskLog.setTkLogTaskId(tkId);
- smartDataTaskLog.setTkLogTaskName(tkTaskName);
- smartDataTaskLog.setTkLogDtName(String.valueOf(dtId));
- smartDataTaskLog.setTkLogDsSourceName(dsSourceName);
- smartDataTaskLog.setTkLogDsSourceId(dsSourceId);
- smartDataTaskLog.setTkLogDsDestinationName(dsDestinationName);
- smartDataTaskLog.setTkLogDsDestinationId(dsTargetId);
- smartDataTaskLog.setTkLogDestTable(destinationTable);
- smartDataTaskLog.setTkLogExchangeServer(tkExchangeServer);
- // 运行参数配置:0自动,1手动
- smartDataTaskLog.setTkLogAutoManual(optCfgAutoManual == 0 ? "自动执行" : "手动执行");
- // 执行方式:0间隔执行,1定点执行,2每天,3每周,4每月
- if (tkExeType == 0) {
- smartDataTaskLog.setTkLogExeType("间隔执行");
- } else if (tkExeType == 1) {
- smartDataTaskLog.setTkLogExeType("定点执行");
- } else if (tkExeType == 2) {
- smartDataTaskLog.setTkLogExeType("每天执行");
- } else if (tkExeType == 3) {
- smartDataTaskLog.setTkLogExeType("每周执行");
- } else if (tkExeType == 4) {
- smartDataTaskLog.setTkLogExeType("每月执行");
- } else {
- smartDataTaskLog.setTkLogExeType("未知执行");
- }
- // 交换方式:0自定义SQL语句,1数据视图,2数据表
- if (tkExchangeType == 0) {
- smartDataTaskLog.setTkLogExchangeType("自定义SQL语句");
- } else if (tkExchangeType == 1) {
- smartDataTaskLog.setTkLogExchangeType("数据视图");
- } else if (tkExchangeType == 2) {
- smartDataTaskLog.setTkLogExchangeType("数据表");
- } else {
- smartDataTaskLog.setTkLogExchangeType("未知交换方式");
- }
- // 执行状态
- smartDataTaskLog.setTkLogExeStatus("执行中");
- // 执行开始时间
- smartDataTaskLog.setTkLogStartTime(datetime);
- // 获取代码开始时间
- long startTimeMillis = System.currentTimeMillis();
- int totalRead = 0, totalInsert = 0, totalUpdate = 0, totalError = 0;
- StringBuilder err = new StringBuilder();
- StringBuilder excetion = new StringBuilder();
- // 查询当前任务是否还在执行中
- List<SmartDataTaskLog> queryTaskExecuting = smartDataTaskMapper.queryTaskExecuting(tkId, 1);
- if (queryTaskExecuting.size() > 0) {
- smartDataTaskLog.setTkLogExeStatus("执行中断");
- Date endDate = new Date();
- String endDatetime = dateFormat.format(endDate);
- // 结束时间
- smartDataTaskLog.setTkLogEndTime(endDatetime);
- // 获取代码结束时间
- long endTimeMillis = System.currentTimeMillis();
- // 计算执行时间
- long elapsedTimeMillis = endTimeMillis - startTimeMillis;
- // 耗时
- smartDataTaskLog.setTkLogCostTime(String.valueOf(elapsedTimeMillis));
- // 读取数据量
- smartDataTaskLog.setTkLogReadRows(totalRead);
- // 增加数据量
- smartDataTaskLog.setTkLogInsertRows(totalInsert);
- // 更新数据量
- smartDataTaskLog.setTkLogUpdateRows(totalUpdate);
- // 错误数据量
- smartDataTaskLog.setTkLogErrRows(totalError);
- smartDataTaskLog.setTkLogErrException("当前任务正在执行中,尚未完成,本次中断,请稍后再试!");
- // 记录任务开始了,成功后返回tkLogId
- smartDataTaskMapper.insertTaskLog(smartDataTaskLog);
- } else {
- // 记录任务开始了,成功后返回tkLogId
- smartDataTaskMapper.insertTaskLog(smartDataTaskLog);
- // ========== 开始执行本次任务 ==============================
- // 源连接
- DBUtil sourceDbUtil = new DBUtil(sourceUrl, sourceUser, sourcePassword, sourceDriver, dsSourceCharset);
- Map<String, Object> sourceConnMap = sourceDbUtil.getConnection();
- if ("1".equals(sourceConnMap.get("code"))) {
- if (rsIncorrectData == 0) {
- // 记录错误信息
- this.saveDebugMsg(tkId, tkTaskName, datetime, sourceConnMap.get("msg").toString());
- return;
- }
- }
- Connection sourceConn = (Connection) sourceConnMap.get("msg");
- // 目标连接
- DBUtil destinationDbUtil = new DBUtil(destinationUrl, destinationUser, destinationPassword, destinationDriver, dsDestinationCharset);
- Map<String, Object> destinationConnMap = destinationDbUtil.getConnection();
- if ("1".equals(destinationConnMap.get("code"))) {
- if (rsIncorrectData == 0) {
- // 记录错误信息
- this.saveDebugMsg(tkId, tkTaskName, datetime, destinationConnMap.get("msg").toString());
- return;
- }
- }
- Connection destinationConn = (Connection) destinationConnMap.get("msg");
- // 解析列对应关系
- JSONArray colRelationshipArray = JSONArray.parseArray(colRelationship);
- // 源表列名列表
- Map<String, String> colSourceMap = new HashMap<>();
- // 目标表列名列表
- Map<String, String> colDestinationMap = new HashMap<>();
- // 是否更新
- Map<String, String> isUpdateMap = new HashMap<>();
- int index = 1;
- int indexPk = 1;
- for (int i = 0; i < colRelationshipArray.size(); i++) {
- JSONObject colRelationshipObject = colRelationshipArray.getJSONObject(i);
- if (colRelationshipObject.getInteger("isUpdatePrimaryKey") == 1) {
- // 如果该字段是更新字段,则记录1,否则0不记录
- if (colRelationshipObject.getInteger("isUpdate") == 1) {
- isUpdateMap.put("colPrimaryKey" + indexPk, "1");
- } else {
- isUpdateMap.put("colPrimaryKey" + indexPk, "0");
- }
- colSourceMap.put("colPrimaryKey" + indexPk, colRelationshipObject.getString("colSource"));
- colDestinationMap.put("colPrimaryKey" + indexPk, colRelationshipObject.getString("colDestination"));
- indexPk++;
- } else {
- // 如果该字段是更新字段,则记录1,否则0不记录
- if (colRelationshipObject.getInteger("isUpdate") == 1) {
- isUpdateMap.put("col" + index, "1");
- } else {
- isUpdateMap.put("col" + index, "0");
- }
- colSourceMap.put("col" + index, colRelationshipObject.getString("colSource"));
- colDestinationMap.put("col" + index, colRelationshipObject.getString("colDestination"));
- index++;
- }
- }
- // 拼接主键排序条件
- String sourceOrderBy = null;
- String destinationOrderBy = null;
- for (int i = 1; i < indexPk; i++) {
- sourceOrderBy = colSourceMap.get("colPrimaryKey" + i) + " ASC,";
- destinationOrderBy = colDestinationMap.get("colPrimaryKey" + i) + " ASC,";
- }
- if (sourceOrderBy != null) {
- sourceOrderBy = " ORDER BY " + sourceOrderBy.substring(0, sourceOrderBy.length() - 1);
- destinationOrderBy = " ORDER BY " + destinationOrderBy.substring(0, destinationOrderBy.length() - 1);
- }
- // 从第1页开始
- int page = 0;
- String sourceSql_tmp, destinationSql;
- while (true) {
- // 当前时间
- date = new Date();
- datetime = dateFormat.format(date);
- // 获取对应id的字段值,方便后续使用
- int tkActivation = smartDataTaskMapper.selectColSwappedPrimaryKeys(tkId);
- if (tkActivation == 0) {
- this.saveDebugMsg(tkId, tkTaskName, datetime, "当前任务停止,需要手动启动!");
- break;
- } else if (tkActivation == 2) {
- this.saveDebugMsg(tkId, tkTaskName, datetime, "当前任务已暂停,需要手动恢复,手动恢复10秒后继续!");
- try {
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- this.saveDebugMsg(tkId, tkTaskName, datetime, "线程休眠异常!" + e.getMessage());
- }
- continue;
- }
- // 运行参数配置:0自动,1手动
- if (optCfgAutoManual == 0) {
- sourceSql_tmp = sourceSql + sourceOrderBy + " LIMIT " + (page * optCfgDefaultRsNum) + "," + optCfgDefaultRsNum;
- destinationSql = "SELECT * FROM " + destinationTable + destinationOrderBy + " LIMIT "
- + (page * optCfgDefaultRsNum) + "," + optCfgDefaultRsNum;
- } else {
- sourceSql_tmp = sourceSql + destinationOrderBy + " LIMIT " + (page * optCfgRsNum) + "," + optCfgRsNum;
- destinationSql = "SELECT * FROM " + destinationTable + destinationOrderBy
- + " LIMIT " + (page * optCfgRsNum) + "," + optCfgRsNum;
- }
- // 调试输出sql
- if (debugSqlFlag == 1) {
- this.saveDebugMsg(tkId, tkTaskName, datetime, sourceSql_tmp);
- this.saveDebugMsg(tkId, tkTaskName, datetime, destinationSql);
- }
- // 执行源sql拼接后的语句
- Map<String, Object> sourceQuery = sourceDbUtil.query(sourceConn, sourceSql_tmp);
- if ("1".equals(sourceQuery.get("code"))) {
- if (rsIncorrectData == 0) {
- // 记录错误信息
- this.saveDebugMsg(tkId, tkTaskName, datetime, sourceQuery.get("msg").toString());
- }
- break;
- }
- Map<String, Object> sourceMapPage = (Map<String, Object>) sourceQuery.get("msg");
- ResultSet sourceRs = (ResultSet) sourceMapPage.get("rs");
- // PreparedStatement sourceStmt = (PreparedStatement) sourceMapPage.get("stmt");
- // 执行目标sql拼接后的语句
- Map<String, Object> destinationQuery = destinationDbUtil.query(destinationConn, destinationSql);
- if ("1".equals(destinationQuery.get("code"))) {
- if (rsIncorrectData == 0) {
- // 记录错误信息
- this.saveDebugMsg(tkId, tkTaskName, datetime, destinationQuery.get("msg").toString());
- }
- break;
- }
- Map<String, Object> destinationMapPage = (Map<String, Object>) destinationQuery.get("msg");
- ResultSet destinationRs = (ResultSet) destinationMapPage.get("rs");
- // PreparedStatement destinationStmt = (PreparedStatement) destinationMapPage.get("stmt");
- // 2.从目标库中获取【默认记录数】或【指定记录数】生成哈希码
- int sourceHashCode = 0, destinationHashCode = 0;
- boolean sourceHashCodeFlag = false;
- StringBuilder sourceBuilder = new StringBuilder();
- StringBuilder destinationBuilder = new StringBuilder();
- try {
- while (sourceRs.next()) {
- sourceHashCodeFlag = true;
- // 获取源数据
- for (int i = 1; i < indexPk; i++) {
- sourceBuilder.append(sourceRs.getString(colSourceMap.get("colPrimaryKey" + i)));
- }
- for (int i = 1; i < index; i++) {
- sourceBuilder.append(sourceRs.getString(colSourceMap.get("col" + i)));
- }
- totalRead++;
- }
- // 将源数据拼接生成哈希码
- sourceHashCode = sourceBuilder.toString().hashCode();
- while (destinationRs.next()) {
- // 获取目标数据
- for (int i = 1; i < indexPk; i++) {
- destinationBuilder.append(destinationRs.getString(colDestinationMap.get("colPrimaryKey" + i)));
- }
- for (int i = 1; i < index; i++) {
- destinationBuilder.append(destinationRs.getString(colDestinationMap.get("col" + i)));
- }
- }
- // 将目标数据拼接生成哈希码
- destinationHashCode = destinationBuilder.toString().hashCode();
- } catch (SQLException e) {
- this.saveDebugMsg(tkId, tkTaskName, datetime, e.getMessage());
- }
- // 3.比较两个哈希码,判断是否需要更新
- if (sourceHashCode != destinationHashCode) {
- // 如果源数据和目标数据存在相同的主键值,则需要更新操作
- // 如果源数据和目标数据不存在相同的主键值,则需要将来源库的数据插入到目标库中
- Map<String, Object> returnMap = executeMysqlInsertOrUpdateTask(sourceDbUtil, destinationDbUtil, sourceConn,
- destinationConn, sourceSql_tmp, rsIncorrectData, tkId, tkTaskName, datetime,
- indexPk, index, colSourceMap, colDestinationMap, isUpdateMap, destinationTable);
- if (returnMap.get("code").equals("0")) {
- page++;
- Map<String, String> returnM = (Map<String, String>) returnMap.get("msg");
- totalInsert += Integer.parseInt(returnM.get("totalInsert"));
- totalUpdate += Integer.parseInt(returnM.get("totalUpdate"));
- totalError += Integer.parseInt(returnM.get("totalError"));
- if (returnM.get("exception") != null) {
- err.append(returnM.get("exception")).append("\n");
- }
- continue;
- } else {
- excetion.append(returnMap.get("msg")).append("\n");
- break;
- }
- }
- // 都没有记录了,则结束任务
- if (!sourceHashCodeFlag) {
- break;
- }
- page++;
- }
- StringBuilder errOrException = new StringBuilder();
- if (err.length() > 0) {
- errOrException.append("错误:\n").append(err);
- }
- if (excetion.length() > 0) {
- errOrException.append("\n异常:\n").append(excetion);
- }
- if (errOrException.length() > 0) {
- smartDataTaskLog.setTkLogErrException(String.valueOf(errOrException));
- // 设置执行状态,执行失败
- smartDataTaskLog.setTkLogExeStatus("执行失败");
- } else {
- // 设置执行状态,执行成功
- smartDataTaskLog.setTkLogExeStatus("执行成功");
- smartDataTaskLog.setTkLogErrException("无异常和错误");
- }
- Date endDate = new Date();
- String endDatetime = dateFormat.format(endDate);
- // 结束时间
- smartDataTaskLog.setTkLogEndTime(endDatetime);
- // 获取代码结束时间
- long endTimeMillis = System.currentTimeMillis();
- // 计算执行时间
- long elapsedTimeMillis = endTimeMillis - startTimeMillis;
- // 耗时
- smartDataTaskLog.setTkLogCostTime(String.valueOf(elapsedTimeMillis));
- // 读取数据量
- smartDataTaskLog.setTkLogReadRows(totalRead);
- // 增加数据量
- smartDataTaskLog.setTkLogInsertRows(totalInsert);
- // 更新数据量
- smartDataTaskLog.setTkLogUpdateRows(totalUpdate);
- // 错误数据量
- smartDataTaskLog.setTkLogErrRows(totalError);
- // 记录任务开始了,成功后会返回id,更新对应的id
- int count = smartDataTaskMapper.updateTaskLog(smartDataTaskLog);
- // 关闭数据库连接
- if (destinationConn != null) {
- destinationDbUtil.closeConnection(destinationConn);
- }
- if (sourceConn != null) {
- sourceDbUtil.closeConnection(sourceConn);
- }
- }
- // ========== 结束执行本次任务 ==============================
- // 调试输出sql
- String tmp1 = "数据交换完成,共交换数据 " + (totalInsert + totalUpdate) + " 条:插入:" + totalInsert + " 条,更新:" + totalUpdate + " 条";
- this.saveDebugMsg(tkId, tkTaskName, datetime, tmp1);
- }
- // 执行MySQL的插入或更新任务
- private Map<String, Object> executeMysqlInsertOrUpdateTask(DBUtil sourceDbUtil, DBUtil destinationDbUtil, Connection sourceConn,
- Connection destinationConn, String sourceSql_tmp,
- int rsIncorrectData, int tkId, String tkTaskName, String datetime,
- int indexPk, int index, Map<String, String> colSourceMap,
- Map<String, String> colDestinationMap, Map<String, String> isUpdateMap,
- String destinationTable) {
- // 执行源sql
- Map<String, Object> sourceQuery = sourceDbUtil.query(sourceConn, sourceSql_tmp);
- if ("1".equals(sourceQuery.get("code"))) {
- if (rsIncorrectData == 0) {
- // 记录错误信息
- this.saveDebugMsg(tkId, tkTaskName, datetime, sourceQuery.get("msg").toString());
- }
- return CommonUtil.getReturnMap("1", "执行源sql失败:" + sourceSql_tmp);
- }
- Map<String, Object> sourceMap = (Map<String, Object>) sourceQuery.get("msg");
- ResultSet sourceRs = (ResultSet) sourceMap.get("rs");
- PreparedStatement sourceStmt = (PreparedStatement) sourceMap.get("stmt");
- int totalInsert = 0, totalUpdate = 0, totalError = 0;
- // 返回参数map
- Map<String, String> returnMap = new HashMap<>();
- try {
- // 执行解析操作
- while (sourceRs.next()) {
- // 拼接插入的值
- StringBuilder stringInsertData = new StringBuilder();
- // 拼接插入的列
- StringBuilder stringInsertCols = new StringBuilder();
- // 拼接更新的列和值
- StringBuilder stringUpdateData = new StringBuilder();
- // 拼接目标表的pk
- StringBuilder stringDestinationPk = new StringBuilder();
- String tempStr;
- for (int i = 1; i < indexPk; i++) {
- tempStr = sourceRs.getString(colSourceMap.get("colPrimaryKey" + i));
- // 目标表的主键where子句
- stringDestinationPk.append(colDestinationMap.get("colPrimaryKey" + i)).append("='").append(tempStr).append("' AND ");
- // 拼接插入的列
- stringInsertCols.append(colDestinationMap.get("colPrimaryKey" + i)).append(",");
- // 拼接插入的值
- stringInsertData.append("'").append(sourceRs.getString(colSourceMap.get("colPrimaryKey" + i))).append("',");
- }
- // 去除最后的 AND
- stringDestinationPk.setLength(stringDestinationPk.length() - 5);
- // 拼接更新的值
- for (int i = 1; i < index; i++) {
- // 拼接插入的列
- stringInsertCols.append(colDestinationMap.get("col" + i)).append(",");
- // 拼接插入的值
- stringInsertData.append("'").append(sourceRs.getString(colSourceMap.get("col" + i))).append("',");
- if (isUpdateMap.get("col" + i) != null && isUpdateMap.get("col" + i).equals("1")) {
- // 拼接更新的列和值
- stringUpdateData.append(colDestinationMap.get("col" + i)).append("='").append(sourceRs.getString(colSourceMap.get("col" + i))).append("',");
- }
- }
- // 删除最后一个逗号
- stringInsertCols.setLength(stringInsertCols.length() - 1);
- stringInsertData.setLength(stringInsertData.length() - 1);
- // 说明有需要更新的列
- if (stringUpdateData.length() > 0) {
- stringUpdateData.setLength(stringUpdateData.length() - 1);
- this.saveDebugMsg(tkId, tkTaskName, datetime, stringUpdateData.toString());
- }
- // 查询是否存在该主键的记录sql
- String destinationQuerySql = "SELECT * FROM " + destinationTable + " WHERE " + stringDestinationPk;
- // 调试输出sql
- if (debugSqlFlag == 1) {
- this.saveDebugMsg(tkId, tkTaskName, datetime, destinationQuerySql);
- }
- // 查询目标表是否存在该主键的记录
- Map<String, Object> destinationQuery = destinationDbUtil.query(destinationConn, destinationQuerySql);
- // 判断查询结果,如果发生错误返回code:1
- if (destinationQuery.get("code").equals("1")) {
- if (rsIncorrectData == 0) {
- // 记录错误信息
- this.saveDebugMsg(tkId, tkTaskName, datetime, destinationQuery.get("msg").toString());
- }
- return CommonUtil.getReturnMap("1", "目标表查询错误:" + destinationQuery.get("msg").toString());
- }
- // 获取查询结果集map
- Map<String, Object> sourceQueryMap = (Map<String, Object>) destinationQuery.get("msg");
- // 获取查询结果集
- ResultSet destinationRs = (ResultSet) sourceQueryMap.get("rs");
- // 获取查询sql的PreparedStatement
- PreparedStatement destinationQueryStmt = (PreparedStatement) sourceQueryMap.get("stmt");
- // ========== 之前是批量计算哈希码是否相等,现在是计算每一条的哈希码是否相等 ====================================================================
- int sourceHashCode = 0, destinationHashCode = 0;
- StringBuilder sourceBuilder = new StringBuilder();
- StringBuilder destinationBuilder = new StringBuilder();
- // 获取源数据
- for (int i = 1; i < indexPk; i++) {
- sourceBuilder.append(sourceRs.getString(colSourceMap.get("colPrimaryKey" + i)));
- }
- for (int i = 1; i < index; i++) {
- sourceBuilder.append(sourceRs.getString(colSourceMap.get("col" + i)));
- }
- // 将源数据拼接生成哈希码
- sourceHashCode = sourceBuilder.toString().hashCode();
- // 判断目标表是否存在该主键的记录
- if (destinationRs.next()) {
- if (stringUpdateData.length() == 0) {
- continue;
- }
- // 获取目标数据
- for (int i = 1; i < indexPk; i++) {
- destinationBuilder.append(destinationRs.getString(colDestinationMap.get("colPrimaryKey" + i)));
- }
- for (int i = 1; i < index; i++) {
- destinationBuilder.append(destinationRs.getString(colDestinationMap.get("col" + i)));
- }
- // 将目标数据拼接生成哈希码
- destinationHashCode = destinationBuilder.toString().hashCode();
- // 判断哈希码是否相等,相等则跳过,不相等则更新目标表
- if (sourceHashCode == destinationHashCode) {
- continue;
- }
- // 更新语句
- String destinationUpdateSql = "UPDATE " + destinationTable + " SET " + stringUpdateData + " WHERE " + stringDestinationPk;
- // 调试输出sql
- if (debugSqlFlag == 1) {
- this.saveDebugMsg(tkId, tkTaskName, datetime, destinationUpdateSql);
- }
- // 更新目标表
- Map<String, Object> destinationUpdate = destinationDbUtil.update(destinationConn, destinationUpdateSql);
- // 判断更新是否成功
- if (destinationUpdate.get("code").equals("1")) {
- if (rsIncorrectData == 0) {
- // 记录错误信息
- this.saveDebugMsg(tkId, tkTaskName, datetime, destinationUpdate.get("msg").toString());
- }
- totalError++;
- returnMap.put("totalInsert", String.valueOf(totalInsert));
- returnMap.put("totalUpdate", String.valueOf(totalUpdate));
- returnMap.put("totalError", String.valueOf(totalError));
- returnMap.put("exception", destinationUpdate.get("msg").toString());
- return CommonUtil.getReturnMap("0", returnMap);
- }
- totalUpdate++;
- // 获取返回结果
- Map<String, Object> destinationQueryMap = (Map<String, Object>) destinationUpdate.get("msg");
- // 获取更新目标表的PreparedStatement
- PreparedStatement destinationUpdateStmt = (PreparedStatement) destinationQueryMap.get("stmt");
- // 关闭PreparedStatement
- destinationDbUtil.closeStatement(destinationUpdateStmt);
- } else {
- // 插入操作sql
- String destinationInsertSql = "INSERT INTO " + destinationTable + " (" + stringInsertCols + ") VALUES (" + stringInsertData + ")";
- // 调试输出sql
- if (debugSqlFlag == 1) {
- this.saveDebugMsg(tkId, tkTaskName, datetime, destinationInsertSql);
- }
- // 执行插入操作
- Map<String, Object> destinationInsert = destinationDbUtil.update(destinationConn, destinationInsertSql);
- // 判断插入操作是否成功
- if (destinationInsert.get("code").equals("1")) {
- if (rsIncorrectData == 0) {
- // 记录错误信息
- this.saveDebugMsg(tkId, tkTaskName, datetime, destinationInsert.get("msg").toString());
- }
- totalError++;
- returnMap.put("totalInsert", String.valueOf(totalInsert));
- returnMap.put("totalUpdate", String.valueOf(totalUpdate));
- returnMap.put("totalError", String.valueOf(totalError));
- returnMap.put("exception", destinationInsert.get("msg").toString());
- return CommonUtil.getReturnMap("0", returnMap);
- }
- totalInsert++;
- Map<String, Object> destinationQueryMap = (Map<String, Object>) destinationInsert.get("msg");
- // 获取插入操作的PreparedStatement
- PreparedStatement destinationInsertStmt = (PreparedStatement) destinationQueryMap.get("stmt");
- // 关闭对象 PreparedStatement
- destinationDbUtil.closeStatement(destinationInsertStmt);
- }
- // 关闭记录集ResultSet、PreparedStatement、数据库连接Connection
- destinationDbUtil.closeResultSet(destinationRs);
- destinationDbUtil.closeStatement(destinationQueryStmt);
- }
- returnMap.put("totalInsert", String.valueOf(totalInsert));
- returnMap.put("totalUpdate", String.valueOf(totalUpdate));
- returnMap.put("totalError", String.valueOf(totalError));
- return CommonUtil.getReturnMap("0", returnMap);
- } catch (SQLException e) {
- if (rsIncorrectData == 0) {
- // 记录错误信息
- this.saveDebugMsg(tkId, tkTaskName, datetime, e.getMessage());
- }
- return CommonUtil.getReturnMap("1", "异常:" + Arrays.toString(e.getStackTrace()));
- } finally {
- sourceDbUtil.closeResultSet(sourceRs);
- sourceDbUtil.closeStatement(sourceStmt);
- }
- }
- // 保存debug信息
- private void saveDebugMsg(Integer eTaskId, String tkTaskName, String datetime, String errorMsg) {
- // 实现错误信息保存的逻辑
- SmartDataTaskDebug smartDataTaskDebug = new SmartDataTaskDebug();
- smartDataTaskDebug.setETaskId(eTaskId);
- smartDataTaskDebug.setETaskName(tkTaskName);
- smartDataTaskDebug.setEMsg(errorMsg);
- smartDataTaskDebug.setEDateTime(datetime);
- SmartDataTaskDebug returnSmartDataTaskErr = smartDataTaskMapper.selectErrorMsg(smartDataTaskDebug);
- if (returnSmartDataTaskErr == null) {
- // 保存错误信息到数据库
- int i = smartDataTaskMapper.insertErrorMsg(smartDataTaskDebug);
- if (i == 0) {
- logger.info(datetime + ":保存错误信息失败!");
- }
- } else {
- smartDataTaskDebug.setEId(returnSmartDataTaskErr.getEId());
- int i = smartDataTaskMapper.updateErrorMsg(smartDataTaskDebug);
- if (i == 0) {
- logger.info(datetime + ":更新错误信息失败!");
- }
- }
- }
- /**
- * 保存下次执行时间
- *
- * @param tkId 任务id
- * @param datetime 时间
- * @param tkTaskName 任务名称
- * @param rsIncorrectData 是否记录错误数据:0是,1否
- */
- private void saveNextExeTime(Integer tkId, String datetime, String tkTaskName, int rsIncorrectData) {
- QueryWrapper<SmartDataTask> queryWrapper = new QueryWrapper<>();
- queryWrapper.eq(tkTaskName != null, "tk_name", tkTaskName);
- SmartDataTask smartDataTask = smartDataTaskMapper.selectOne(queryWrapper);
- if (smartDataTask != null) {
- // 下次执行的时间
- String nextExeTime = QuartzJobUtils.getNextExeTime(smartDataTask.getTkCron());
- smartDataTask.setTkNextExeTime(nextExeTime);
- try {
- smartDataTaskMapper.markTaskById(smartDataTask);
- } catch (Exception e) {
- if (rsIncorrectData == 0) {
- this.saveDebugMsg(tkId, tkTaskName, datetime, e.getMessage());
- }
- }
- } else {
- if (rsIncorrectData == 0) {
- this.saveDebugMsg(tkId, tkTaskName, datetime, "【下次执行的时间】无法更新至数据库中!");
- }
- }
- }
- }
|