| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817 |
- package com.template.controller;
- import com.alibaba.fastjson2.JSONArray;
- import com.alibaba.fastjson2.JSONObject;
- import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
- import com.template.annotation.PassToken;
- import com.template.common.utils.CommonUtil;
- import com.template.common.utils.DBUtil;
- import com.template.common.utils.QuartzJobUtils;
- import com.template.mapper.SmartDataSourceMapper;
- import com.template.mapper.SmartDataTaskMapper;
- import com.template.model.pojo.SmartDataSourceJobParams;
- import com.template.model.pojo.SmartDataTask;
- import com.template.model.pojo.SmartDataTaskErr;
- import io.swagger.models.auth.In;
- import org.quartz.JobDataMap;
- import org.quartz.JobDetail;
- import org.quartz.JobExecutionContext;
- import org.quartz.JobKey;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.scheduling.quartz.QuartzJobBean;
- import org.springframework.stereotype.Component;
- import java.sql.Connection;
- import java.sql.PreparedStatement;
- import java.sql.ResultSet;
- import java.sql.SQLException;
- import java.text.SimpleDateFormat;
- import java.util.Date;
- import java.util.HashMap;
- import java.util.Map;
- @Component
- public class Task extends QuartzJobBean {
- @Autowired
- private SmartDataTaskMapper smartDataTaskMapper;
- @Autowired
- private SmartDataSourceMapper smartDataSourceMapper;
- private final boolean debugSqlFlag = false;
- @Override
- @PassToken
- protected void executeInternal(JobExecutionContext jobExecutionContext) {
- // 获取任务信息
- JobDetail jobDetail = jobExecutionContext.getJobDetail();
- JobKey key = jobDetail.getKey();
- // 工作内容
- JobDataMap jobDataMap = jobDetail.getJobDataMap();
- // 来源数据源参数
- SmartDataTask smartDataTask = (SmartDataTask) jobDataMap.get("smartDataTask");
- // 来源数据源id
- Integer tkDsIdSource = smartDataTask.getTkDsIdSource();
- // 目标数据源id
- Integer tkDsIdDestination = smartDataTask.getTkDsIdDestination();
- // 根据id,获取数据源url、user、password、driver等
- SmartDataSourceJobParams dsSourceInfo = smartDataSourceMapper.getDataSourceInfo(tkDsIdSource);
- SmartDataSourceJobParams dsDestinationInfo = smartDataSourceMapper.getDataSourceInfo(tkDsIdDestination);
- // 来源数据源参数
- String sourceDriver = dsSourceInfo.getDsClsDriver();
- String sourceUrl = dsSourceInfo.getDsUrl();
- String sourceUser = dsSourceInfo.getDsUser();
- String sourcePassword = dsSourceInfo.getDsPassword();
- // 来源数据库sql
- String sourceSql = smartDataTask.getTkSql();
- // 目标数据源参数
- String destinationDriver = dsDestinationInfo.getDsClsDriver();
- String destinationUrl = dsDestinationInfo.getDsUrl();
- String destinationUser = dsDestinationInfo.getDsUser();
- String destinationPassword = dsDestinationInfo.getDsPassword();
- // 目标数据库表
- String destinationTable = smartDataTask.getTkDestTable().substring(0, smartDataTask.getTkDestTable().indexOf("["));
- // 任务数据id、列关系等等
- Integer tkId = smartDataTask.getTkId();
- String colRelationship = smartDataTask.getTkColRelationship();
- Integer rsIncorrectData = smartDataTask.getTkRsIncorrectData();
- Integer optCfgAutoManual = smartDataTask.getTkOptCfgAutoManual();
- Integer optCfgRsNum = smartDataTask.getTkOptCfgRsNum();
- Integer optCfgDefaultRsNum = smartDataTask.getTkOptCfgDefaultRsNum();
- String dsSourceCharset = smartDataTask.getTkDsSourceCharset();
- String dsDestinationCharset = smartDataTask.getTkDsDestinationCharset();
- // 是否是MySQL数据源
- if (sourceDriver.toLowerCase().contains("mysql")) {
- // 执行MySQL任务
- executeMysqlExchangeTask(key, sourceDriver, sourceUrl, sourceUser, sourcePassword, sourceSql,
- destinationDriver, destinationUrl, destinationUser, destinationPassword, destinationTable, tkId,
- colRelationship, rsIncorrectData, optCfgRsNum, optCfgDefaultRsNum, optCfgAutoManual,
- dsSourceCharset, dsDestinationCharset);
- } else if (sourceDriver.toLowerCase().contains("oracle")) {
- // 执行Oracle任务
- executeOracleTask(jobExecutionContext);
- } else {
- // 执行其他任务
- executeOtherTask(jobExecutionContext);
- }
- }
- private void executeOracleTask(JobExecutionContext jobExecutionContext) {
- }
- private void executeOtherTask(JobExecutionContext jobExecutionContext) {
- }
- // 执行MySQL任务的操作
- private void executeMysqlExchangeTask(JobKey key, String sourceDriver, String sourceUrl, String sourceUser,
- String sourcePassword, String sourceSql, String destinationDriver,
- String destinationUrl, String destinationUser, String destinationPassword,
- String destinationTable, int tkId, String colRelationship, int rsIncorrectData,
- int optCfgRsNum, int optCfgDefaultRsNum, int optCfgAutoManual, String dsSourceCharset,
- String dsDestinationCharset) {
- // 当前时间
- SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- Date date = new Date();
- String datetime = dateFormat.format(date);
- // 及时更新下次执行时间
- saveNextExtTime(tkId, datetime, key, rsIncorrectData);
- // 源连接
- DBUtil sourceDbUtil = new DBUtil(sourceUrl, sourceUser, sourcePassword, sourceDriver, dsSourceCharset);
- Map<String, Object> sourceConnMap = sourceDbUtil.getConnection();
- if ("1".equals(sourceConnMap.get("code"))) {
- if (rsIncorrectData == 0) {
- // 记录错误信息
- saveErrorMsg(tkId, datetime, sourceConnMap.get("msg").toString());
- return;
- }
- }
- Connection sourceConn = (Connection) sourceConnMap.get("msg");
- // 目标连接
- DBUtil destinationDbUtil = new DBUtil(destinationUrl, destinationUser, destinationPassword, destinationDriver, dsDestinationCharset);
- Map<String, Object> destinationConnMap = destinationDbUtil.getConnection();
- if ("1".equals(destinationConnMap.get("code"))) {
- if (rsIncorrectData == 0) {
- // 记录错误信息
- saveErrorMsg(tkId, datetime, destinationConnMap.get("msg").toString());
- return;
- }
- }
- Connection destinationConn = (Connection) destinationConnMap.get("msg");
- // 解析列对应关系
- JSONArray colRelationshipArray = JSONArray.parseArray(colRelationship);
- // 源表列名列表
- Map<String, String> colSourceMap = new HashMap<>();
- // 目标表列名列表
- Map<String, String> colDestinationMap = new HashMap<>();
- int index = 1;
- int indexPk = 1;
- for (int i = 0; i < colRelationshipArray.size(); i++) {
- JSONObject colRelationshipObject = colRelationshipArray.getJSONObject(i);
- if (colRelationshipObject.getInteger("isUpdatePrimaryKey") == 1) {
- colSourceMap.put("colPrimaryKey" + indexPk, colRelationshipObject.getString("colSource"));
- colDestinationMap.put("colPrimaryKey" + indexPk, colRelationshipObject.getString("colDestination"));
- indexPk++;
- } else {
- colSourceMap.put("col" + index, colRelationshipObject.getString("colSource"));
- colDestinationMap.put("col" + index, colRelationshipObject.getString("colDestination"));
- index++;
- }
- }
- // 拼接主键排序条件
- String sourceOrderBy = null;
- String destinationOrderBy = null;
- for (int i = 1; i < indexPk; i++) {
- sourceOrderBy = colSourceMap.get("colPrimaryKey" + i) + " ASC,";
- destinationOrderBy = colDestinationMap.get("colPrimaryKey" + i) + " ASC,";
- }
- if (sourceOrderBy != null) {
- sourceOrderBy = " ORDER BY " + sourceOrderBy.substring(0, sourceOrderBy.length() - 1);
- destinationOrderBy = " ORDER BY " + destinationOrderBy.substring(0, destinationOrderBy.length() - 1);
- }
- // 从第1页开始
- int page = 0, totalInsert = 0, totalUpdate = 0;
- String sourceSql_tmp;
- String destinationSql;
- while (true) {
- // 当前时间
- date = new Date();
- datetime = dateFormat.format(date);
- // 获取对应id的字段值,方便后续使用
- int tkActivation = smartDataTaskMapper.selectColSwappedPrimaryKeys(tkId);
- if (tkActivation == 0) {
- saveErrorMsg(tkId, datetime, "当前任务停止,需要手动启动!");
- break;
- } else if (tkActivation == 2) {
- saveErrorMsg(tkId, datetime, "当前任务已暂停,需要手动恢复,手动恢复1秒后继续!");
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- saveErrorMsg(tkId, datetime, "线程休眠异常!" + e.getMessage());
- }
- continue;
- }
- // 运行参数配置:0自动,1手动
- if (optCfgAutoManual == 0) {
- sourceSql_tmp = sourceSql + sourceOrderBy + " LIMIT " + (page * optCfgDefaultRsNum) + "," + optCfgDefaultRsNum;
- destinationSql = "SELECT * FROM " + destinationTable + destinationOrderBy + " LIMIT "
- + (page * optCfgDefaultRsNum) + "," + optCfgDefaultRsNum;
- } else {
- sourceSql_tmp = sourceSql + destinationOrderBy + " LIMIT " + (page * optCfgRsNum) + "," + optCfgRsNum;
- destinationSql = "SELECT * FROM " + destinationTable + destinationOrderBy
- + " LIMIT " + (page * optCfgRsNum) + "," + optCfgRsNum;
- }
- // 调试输出sql
- if (debugSqlFlag) {
- saveErrorMsg(tkId, datetime, sourceSql_tmp);
- saveErrorMsg(tkId, datetime, destinationSql);
- }
- // 执行源sql拼接后的语句
- Map<String, Object> sourceQuery = sourceDbUtil.query(sourceConn, sourceSql_tmp);
- if ("1".equals(sourceQuery.get("code"))) {
- if (rsIncorrectData == 0) {
- // 记录错误信息
- saveErrorMsg(tkId, datetime, sourceQuery.get("msg").toString());
- }
- break;
- }
- Map<String, Object> sourceMapPage = (Map<String, Object>) sourceQuery.get("msg");
- ResultSet sourceRs = (ResultSet) sourceMapPage.get("rs");
- // PreparedStatement sourceStmt = (PreparedStatement) sourceMapPage.get("stmt");
- // 执行目标sql拼接后的语句
- Map<String, Object> destinationQuery = destinationDbUtil.query(destinationConn, destinationSql);
- if ("1".equals(destinationQuery.get("code"))) {
- if (rsIncorrectData == 0) {
- // 记录错误信息
- saveErrorMsg(tkId, datetime, destinationQuery.get("msg").toString());
- }
- break;
- }
- Map<String, Object> destinationMapPage = (Map<String, Object>) destinationQuery.get("msg");
- ResultSet destinationRs = (ResultSet) destinationMapPage.get("rs");
- // PreparedStatement destinationStmt = (PreparedStatement) destinationMapPage.get("stmt");
- // 2.从目标库中获取【默认记录数】或【指定记录数】生成哈希码
- int sourceHashCode = 0, destinationHashCode = 0;
- boolean sourceHashCodeFlag = false;
- StringBuilder sourceBuilder = new StringBuilder();
- StringBuilder destinationBuilder = new StringBuilder();
- try {
- while (sourceRs.next()) {
- sourceHashCodeFlag = true;
- // 获取源数据
- for (int i = 1; i < indexPk; i++) {
- sourceBuilder.append(sourceRs.getString(colSourceMap.get("colPrimaryKey" + i)));
- }
- for (int i = 1; i < index; i++) {
- sourceBuilder.append(sourceRs.getString(colSourceMap.get("col" + i)));
- }
- }
- // 将源数据拼接生成哈希码
- sourceHashCode = sourceBuilder.toString().hashCode();
- while (destinationRs.next()) {
- // 获取目标数据
- for (int i = 1; i < indexPk; i++) {
- destinationBuilder.append(destinationRs.getString(colDestinationMap.get("colPrimaryKey" + i)));
- }
- for (int i = 1; i < index; i++) {
- destinationBuilder.append(destinationRs.getString(colDestinationMap.get("col" + i)));
- }
- }
- // 将目标数据拼接生成哈希码
- destinationHashCode = destinationBuilder.toString().hashCode();
- } catch (SQLException e) {
- saveErrorMsg(tkId, datetime, e.getMessage());
- }
- // 3.比较两个哈希码,判断是否需要更新
- if (sourceHashCode != destinationHashCode) {
- // 如果源数据和目标数据存在相同的主键值,则需要更新操作,如果源数据和目标数据不存在相同的主键值,则需要将来源库的数据插入到目标库中
- Map<String, Object> returnMap = executeMysqlInsertOrUpdateTask(sourceDbUtil, destinationDbUtil, sourceConn,
- destinationConn, sourceSql_tmp, rsIncorrectData, tkId, datetime,
- indexPk, index, colSourceMap, colDestinationMap, destinationTable);
- if (returnMap.get("code").equals("0")) {
- page++;
- Map<String, Integer> returnM = (Map<String, Integer>) returnMap.get("msg");
- totalInsert += returnM.get("totalInsert");
- totalUpdate += returnM.get("totalUpdate");
- continue;
- } else {
- break;
- }
- }
- // 都没有记录了,则不需要更新
- if (!sourceHashCodeFlag) {
- break;
- }
- page++;
- }
- String tmp1 = "数据交换完成,共交换数据 " + (totalInsert + totalUpdate) + " 条:插入:" + totalInsert + " 条,更新:" + totalUpdate + " 条";
- saveErrorMsg(tkId, datetime, tmp1);
- // 关闭数据库连接
- if (destinationConn != null) {
- destinationDbUtil.closeConnection(destinationConn);
- }
- if (sourceConn != null) {
- sourceDbUtil.closeConnection(sourceConn);
- }
- }
- private Map<String, Object> executeMysqlInsertOrUpdateTask(DBUtil sourceDbUtil, DBUtil destinationDbUtil, Connection sourceConn,
- Connection destinationConn, String sourceSql,
- int rsIncorrectData, int tkId, String datetime, int indexPk, int index,
- Map<String, String> colSourceMap, Map<String, String> colDestinationMap,
- String destinationTable) {
- // 执行源sql
- Map<String, Object> sourceQuery = sourceDbUtil.query(sourceConn, sourceSql);
- if ("1".equals(sourceQuery.get("code"))) {
- if (rsIncorrectData == 0) {
- // 记录错误信息
- saveErrorMsg(tkId, datetime, sourceQuery.get("msg").toString());
- }
- return CommonUtil.getReturnMap("1", "执行源sql失败");
- }
- Map<String, Object> sourceMap = (Map<String, Object>) sourceQuery.get("msg");
- ResultSet sourceRs = (ResultSet) sourceMap.get("rs");
- PreparedStatement sourceStmt = (PreparedStatement) sourceMap.get("stmt");
- int totalInsert = 0, totalUpdate = 0;
- // 主键的值
- StringBuilder stringPrimaryKeyData = new StringBuilder();
- try {
- // 执行解析操作
- while (sourceRs.next()) {
- // 拼接插入的值
- StringBuilder stringInsertData = new StringBuilder();
- // 拼接插入的列
- StringBuilder stringInsertCols = new StringBuilder();
- // 拼接更新的列和值
- StringBuilder stringUpdateData = new StringBuilder();
- // 拼接来源表的pk
- StringBuilder stringSourcePk = new StringBuilder();
- // 拼接目标表的pk
- StringBuilder stringDestinationPk = new StringBuilder();
- String tempStr;
- for (int i = 1; i < indexPk; i++) {
- tempStr = sourceRs.getString(colSourceMap.get("colPrimaryKey" + i));
- // 来源表的主键where子句
- stringSourcePk.append(colSourceMap.get("colPrimaryKey" + i)).append("='").append(tempStr).append("' AND ");
- // 目标表的主键where子句
- stringDestinationPk.append(colDestinationMap.get("colPrimaryKey" + i)).append("='").append(tempStr).append("' AND ");
- // 拼接主键的值
- stringPrimaryKeyData.append(sourceRs.getString(colSourceMap.get("colPrimaryKey" + i))).append(",");
- // 拼接插入的值
- stringInsertData.append("'").append(sourceRs.getString(colSourceMap.get("colPrimaryKey" + i))).append("',");
- // 拼接插入的列
- stringInsertCols.append(colDestinationMap.get("colPrimaryKey" + i)).append(",");
- }
- // 去除最后的 AND
- stringSourcePk.setLength(stringSourcePk.length() - 5);
- stringDestinationPk.setLength(stringDestinationPk.length() - 5);
- stringPrimaryKeyData.setLength(stringPrimaryKeyData.length() - 1);
- stringPrimaryKeyData.append(";");
- // 拼接更新的值
- for (int i = 1; i < index; i++) {
- // 拼接插入的值
- stringInsertData.append("'").append(sourceRs.getString(colSourceMap.get("col" + i))).append("',");
- // 拼接插入的列
- stringInsertCols.append(colDestinationMap.get("col" + i)).append(",");
- // 拼接更新的列和值
- stringUpdateData.append(colDestinationMap.get("col" + i)).append("='").append(sourceRs.getString(colSourceMap.get("col" + i))).append("',");
- }
- // 删除最后一个逗号
- stringInsertData.setLength(stringInsertData.length() - 1);
- stringInsertCols.setLength(stringInsertCols.length() - 1);
- stringUpdateData.setLength(stringUpdateData.length() - 1);
- // 查询是否存在该主键的记录sql
- String destinationQuerySql = "SELECT * FROM " + destinationTable + " WHERE " + stringDestinationPk;
- // 调试输出sql
- if (debugSqlFlag) {
- saveErrorMsg(tkId, datetime, destinationQuerySql);
- }
- // 查询目标表是否存在该主键的记录
- Map<String, Object> destinationQuery = destinationDbUtil.query(destinationConn, destinationQuerySql);
- // 判断查询结果,如果发生错误返回code:1
- if (destinationQuery.get("code").equals("1")) {
- if (rsIncorrectData == 0) {
- // 记录错误信息
- saveErrorMsg(tkId, datetime, destinationQuery.get("msg").toString());
- }
- return CommonUtil.getReturnMap("1", "目标表查询错误" + destinationQuery.get("msg").toString());
- }
- // 获取查询结果集map
- Map<String, Object> sourceQueryMap = (Map<String, Object>) destinationQuery.get("msg");
- // 获取查询结果集
- ResultSet destinationRs = (ResultSet) sourceQueryMap.get("rs");
- // 获取查询sql的PreparedStatement
- PreparedStatement destinationQueryStmt = (PreparedStatement) sourceQueryMap.get("stmt");
- // 判断目标表是否存在该主键的记录
- if (destinationRs.next()) {
- // 更新语句
- String destinationUpdateSql = "UPDATE " + destinationTable + " SET " + stringUpdateData + " WHERE " + stringDestinationPk;
- // 调试输出sql
- if (debugSqlFlag) {
- saveErrorMsg(tkId, datetime, destinationUpdateSql);
- }
- // 更新目标表
- Map<String, Object> destinationUpdate = destinationDbUtil.update(destinationConn, destinationUpdateSql);
- // 判断更新是否成功
- if (destinationUpdate.get("code").equals("1")) {
- if (rsIncorrectData == 0) {
- // 记录错误信息
- saveErrorMsg(tkId, datetime, destinationUpdate.get("msg").toString());
- }
- return CommonUtil.getReturnMap("1", "目标表更新失败" + destinationUpdate.get("msg").toString());
- }
- totalUpdate++;
- // 获取返回结果
- Map<String, Object> destinationQueryMap = (Map<String, Object>) destinationUpdate.get("msg");
- // 获取更新目标表的PreparedStatement
- PreparedStatement destinationUpdateStmt = (PreparedStatement) destinationQueryMap.get("stmt");
- // 关闭PreparedStatement
- destinationDbUtil.closeStatement(destinationUpdateStmt);
- } else {
- // 插入操作sql
- String destinationInsertSql = "INSERT INTO " + destinationTable + " (" + stringInsertCols + ") VALUES (" + stringInsertData + ")";
- // 调试输出sql
- if (debugSqlFlag) {
- saveErrorMsg(tkId, datetime, destinationInsertSql);
- }
- // 执行插入操作
- Map<String, Object> destinationInsert = destinationDbUtil.update(destinationConn, destinationInsertSql);
- // 判断插入操作是否成功
- if (destinationInsert.get("code").equals("1")) {
- if (rsIncorrectData == 0) {
- // 记录错误信息
- saveErrorMsg(tkId, datetime, destinationInsert.get("msg").toString());
- }
- return CommonUtil.getReturnMap("1", "插入目标表失败" + destinationInsert.get("msg").toString());
- }
- totalInsert++;
- Map<String, Object> destinationQueryMap = (Map<String, Object>) destinationInsert.get("msg");
- // 获取插入操作的PreparedStatement
- PreparedStatement destinationInsertStmt = (PreparedStatement) destinationQueryMap.get("stmt");
- // 关闭对象 PreparedStatement
- destinationDbUtil.closeStatement(destinationInsertStmt);
- }
- // 关闭记录集ResultSet、PreparedStatement、数据库连接Connection
- destinationDbUtil.closeResultSet(destinationRs);
- destinationDbUtil.closeStatement(destinationQueryStmt);
- }
- Map<String, Integer> returnMap = new HashMap<>();
- returnMap.put("totalInsert", totalInsert);
- returnMap.put("totalUpdate", totalUpdate);
- return CommonUtil.getReturnMap("0", returnMap);
- } catch (SQLException e) {
- if (rsIncorrectData == 0) {
- // 记录错误信息
- saveErrorMsg(tkId, datetime, e.getMessage());
- }
- return CommonUtil.getReturnMap("1", "异常:" + e.getMessage());
- } finally {
- sourceDbUtil.closeResultSet(sourceRs);
- sourceDbUtil.closeStatement(sourceStmt);
- }
- }
- // 执行MySQL任务的insert操作
- // private void executeMysqlInsertTask(JobKey key, String sourceDriver, String sourceUrl, String sourceUser,
- // String sourcePassword, JobDataMap jobDataMap, String destinationDriver,
- // String destinationUrl, String destinationUser, String destinationPassword,
- // String destinationTable, int tkId, String colRelationship, int rsIncorrectData,
- // int optCfgRsNum, int optCfgAutoManual, String dsSourceCharset, String dsDestinationCharset) {
- // // 当前时间
- // Date date = new Date();
- // SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- // String datetime = dateFormat.format(date);
- // // 及时更新下次执行时间
- // saveNextExtTime(tkId, datetime, key, rsIncorrectData);
- //
- // while (true) {
- // // 当前时间
- // date = new Date();
- // datetime = dateFormat.format(date);
- // // 获取sql
- // String sourceSql = (String) jobDataMap.get("sourceSql");
- // // 获取交换主键
- // SmartDataTask smartDataTask = smartDataTaskMapper.selectColSwappedPrimaryKeys(tkId);
- // String swappedPrimaryKeys = smartDataTask.getTkSwappedPrimaryKeys();
- //
- // // 源连接
- // DBUtil sourceDbUtil = new DBUtil(sourceUrl, sourceUser, sourcePassword, sourceDriver, dsSourceCharset);
- // Map<String, Object> sourceConnMap = sourceDbUtil.getConnection();
- // if ("1".equals(sourceConnMap.get("code"))) {
- // if (rsIncorrectData == 0) {
- // // 记录错误信息
- // saveErrorMsg(tkId, datetime, sourceConnMap.get("msg").toString());
- // }
- // }
- //
- // // 解析列对应关系
- // JSONArray colRelationshipArray = JSONArray.parseArray(colRelationship);
- // // 源表列名列表
- // Map<String, String> colSourceMap = new HashMap<>();
- // // 目标表列名列表
- // Map<String, String> colDestinationMap = new HashMap<>();
- // int index = 1;
- // int indexPk = 1;
- // for (int i = 0; i < colRelationshipArray.size(); i++) {
- // JSONObject colRelationshipObject = colRelationshipArray.getJSONObject(i);
- // if (colRelationshipObject.getInteger("isUpdatePrimaryKey") == 1) {
- // colSourceMap.put("colPrimaryKey" + indexPk, colRelationshipObject.getString("colSource"));
- // colDestinationMap.put("colPrimaryKey" + indexPk, colRelationshipObject.getString("colDestination"));
- // indexPk++;
- // } else {
- // colSourceMap.put("col" + index, colRelationshipObject.getString("colSource"));
- // colDestinationMap.put("col" + index, colRelationshipObject.getString("colDestination"));
- // index++;
- // }
- // }
- //
- // if (optCfgAutoManual == 0) {
- // int rows = smartDataTask.getTkOptCfgDefaultRsNum();
- // if (swappedPrimaryKeys == null || swappedPrimaryKeys.equals("")) {
- // sourceSql = sourceSql + " LIMIT " + rows;
- // } else {
- // String notIn = generateNotIn(indexPk, swappedPrimaryKeys, colSourceMap);
- // sourceSql = sourceSql + notIn + " LIMIT " + rows;
- // }
- // } else {
- // if (swappedPrimaryKeys == null || swappedPrimaryKeys.equals("")) {
- // sourceSql = sourceSql + " LIMIT " + optCfgRsNum;
- // } else {
- // String notIn = generateNotIn(indexPk, swappedPrimaryKeys, colSourceMap);
- // sourceSql = sourceSql + notIn + " LIMIT " + optCfgRsNum;
- // }
- // }
- // Connection sourceConn = (Connection) sourceConnMap.get("msg");
- // // 执行源sql
- // Map<String, Object> sourceQuery = sourceDbUtil.query(sourceConn, sourceSql);
- // if ("1".equals(sourceQuery.get("code"))) {
- // if (rsIncorrectData == 0) {
- // // 记录错误信息
- // saveErrorMsg(tkId, datetime, sourceQuery.get("msg").toString());
- // }
- // // 关闭连接
- // sourceDbUtil.closeConnection(sourceConn);
- // return;
- // }
- // Map<String, Object> sourceMap = (Map<String, Object>) sourceQuery.get("msg");
- // ResultSet sourceRs = (ResultSet) sourceMap.get("rs");
- // PreparedStatement sourceStmt = (PreparedStatement) sourceMap.get("stmt");
- //
- // // 主键的值
- // StringBuilder stringPrimaryKeyData = new StringBuilder();
- // boolean over = false;
- // try {
- // // 执行解析操作
- // while (sourceRs.next()) {
- // // 拼接插入的值
- // StringBuilder stringInsertData = new StringBuilder();
- // // 拼接插入的列
- // StringBuilder stringInsertCols = new StringBuilder();
- // // 拼接更新的列和值
- // StringBuilder stringUpdateData = new StringBuilder();
- // // 拼接来源表的pk
- // StringBuilder stringSourcePk = new StringBuilder();
- // // 拼接目标表的pk
- // StringBuilder stringDestinationPk = new StringBuilder();
- // over = true;
- // String tempStr;
- // for (int i = 1; i < indexPk; i++) {
- // tempStr = sourceRs.getString(colSourceMap.get("colPrimaryKey" + i));
- // // 来源表的主键where子句
- // stringSourcePk.append(colSourceMap.get("colPrimaryKey" + i)).append("='").append(tempStr).append("' AND ");
- // // 目标表的主键where子句
- // stringDestinationPk.append(colDestinationMap.get("colPrimaryKey" + i)).append("='").append(tempStr).append("' AND ");
- // // 拼接主键的值
- // stringPrimaryKeyData.append(sourceRs.getString(colSourceMap.get("colPrimaryKey" + i))).append(",");
- // // 拼接插入的值
- // stringInsertData.append("'").append(sourceRs.getString(colSourceMap.get("colPrimaryKey" + i))).append("',");
- // // 拼接插入的列
- // stringInsertCols.append(colDestinationMap.get("colPrimaryKey" + i)).append(",");
- // }
- // // 去除最后的 AND
- // stringSourcePk.setLength(stringSourcePk.length() - 5);
- // stringDestinationPk.setLength(stringDestinationPk.length() - 5);
- // stringPrimaryKeyData.setLength(stringPrimaryKeyData.length() - 1);
- // stringPrimaryKeyData.append(";");
- // // 拼接更新的值
- // for (int i = 1; i < index; i++) {
- // // 拼接插入的值
- // stringInsertData.append("'").append(sourceRs.getString(colSourceMap.get("col" + i))).append("',");
- // // 拼接插入的列
- // stringInsertCols.append(colDestinationMap.get("col" + i)).append(",");
- // // 拼接更新的列和值
- // stringUpdateData.append(colDestinationMap.get("col" + i)).append("='").append(sourceRs.getString(colSourceMap.get("col" + i))).append("',");
- // }
- // // 删除最后一个逗号
- // stringInsertData.setLength(stringInsertData.length() - 1);
- // stringInsertCols.setLength(stringInsertCols.length() - 1);
- // stringUpdateData.setLength(stringUpdateData.length() - 1);
- // // 执行插入操作
- // // 数据库对象
- // DBUtil destinationDbUtil = new DBUtil(destinationUrl, destinationUser, destinationPassword, destinationDriver, dsDestinationCharset);
- // // 获取数据库连接map
- // Map<String, Object> destinationConnMap = destinationDbUtil.getConnection();
- // if (destinationConnMap.get("code").equals("1")) {
- // if (rsIncorrectData == 0) {
- // // 记录错误信息
- // saveErrorMsg(tkId, datetime, destinationConnMap.get("msg").toString());
- // }
- // }
- // // 获取目标连接
- // Connection destinationConn = (Connection) destinationConnMap.get("msg");
- // // 查询是否存在该主键的记录sql
- // String destinationQuerySql = "SELECT * FROM " + destinationTable + " WHERE " + stringDestinationPk;
- // // 查询目标表是否存在该主键的记录
- // Map<String, Object> destinationQuery = destinationDbUtil.query(destinationConn, destinationQuerySql);
- // // 判断查询结果,如果发生错误返回code:1
- // if (destinationQuery.get("code").equals("1")) {
- // if (rsIncorrectData == 0) {
- // // 记录错误信息
- // saveErrorMsg(tkId, datetime, destinationQuery.get("msg").toString());
- // }
- // // 关闭连接
- // destinationDbUtil.closeConnection(destinationConn);
- // return;
- // }
- // // 获取查询结果集map
- // Map<String, Object> sourceQueryMap = (Map<String, Object>) destinationQuery.get("msg");
- // // 获取查询结果集
- // ResultSet destinationRs = (ResultSet) sourceQueryMap.get("rs");
- // // 获取查询sql的PreparedStatement
- // PreparedStatement destinationQueryStmt = (PreparedStatement) sourceQueryMap.get("stmt");
- // // 判断目标表是否存在该主键的记录
- // if (destinationRs.next()) {
- // // 更新语句
- // String destinationUpdateSql = "UPDATE " + destinationTable + " SET " + stringUpdateData + " WHERE " + stringDestinationPk;
- // // 更新目标表
- // Map<String, Object> destinationUpdate = destinationDbUtil.update(destinationConn, destinationUpdateSql);
- // // 判断更新是否成功
- // if (destinationUpdate.get("code").equals("1")) {
- // if (rsIncorrectData == 0) {
- // // 记录错误信息
- // saveErrorMsg(tkId, datetime, destinationUpdate.get("msg").toString());
- // }
- // // 关闭连接
- // destinationDbUtil.closeConnection(destinationConn);
- // return;
- // }
- // // 获取返回结果
- // Map<String, Object> destinationQueryMap = (Map<String, Object>) destinationUpdate.get("msg");
- // // 获取更新目标表的PreparedStatement
- // PreparedStatement destinationUpdateStmt = (PreparedStatement) destinationQueryMap.get("stmt");
- // // 关闭PreparedStatement
- // destinationDbUtil.closeStatement(destinationUpdateStmt);
- // } else {
- // // 插入操作sql
- // String destinationInsertSql = "INSERT INTO " + destinationTable + " (" + stringInsertCols + ") VALUES (" + stringInsertData + ")";
- // // 执行插入操作
- // Map<String, Object> destinationInsert = destinationDbUtil.update(destinationConn, destinationInsertSql);
- // // 判断插入操作是否成功
- // if (destinationInsert.get("code").equals("1")) {
- // if (rsIncorrectData == 0) {
- // // 记录错误信息
- // saveErrorMsg(tkId, datetime, destinationInsert.get("msg").toString());
- // }
- // // 关闭连接
- // destinationDbUtil.closeConnection(destinationConn);
- // return;
- // }
- // Map<String, Object> destinationQueryMap = (Map<String, Object>) destinationInsert.get("msg");
- // // 获取插入操作的PreparedStatement
- // PreparedStatement destinationInsertStmt = (PreparedStatement) destinationQueryMap.get("stmt");
- // // 关闭对象 PreparedStatement
- // destinationDbUtil.closeStatement(destinationInsertStmt);
- // // 创建SmartDataTask对象
- // SmartDataTask tempSmartDataTask = new SmartDataTask();
- // tempSmartDataTask.setTkId(tkId);
- // String tmpStr;
- // // 获取插入操作的返回值
- // if (swappedPrimaryKeys == null || swappedPrimaryKeys.equals("")) {
- // // 如果为空
- // tmpStr = stringPrimaryKeyData.toString();
- // } else {
- // // 如果非空
- // if (swappedPrimaryKeys.endsWith(";")) {
- // tmpStr = swappedPrimaryKeys + stringPrimaryKeyData;
- // } else {
- // tmpStr = swappedPrimaryKeys + ";" + stringPrimaryKeyData;
- // }
- // }
- // tempSmartDataTask.setTkSwappedPrimaryKeys(tmpStr);
- // // 保存交换后的主键
- // int returnUpdateCount = smartDataTaskMapper.saveSwappedPrimaryKeys(tempSmartDataTask);
- // if (returnUpdateCount == 0) {
- // if (rsIncorrectData == 0) {
- // // 记录错误信息
- // saveErrorMsg(tkId, datetime, "记录交换的主键失败!插入语句:" + destinationInsertSql);
- // }
- // }
- // }
- // // 关闭记录集ResultSet、PreparedStatement、数据库连接Connection
- // destinationDbUtil.closeResultSet(destinationRs);
- // destinationDbUtil.closeStatement(destinationQueryStmt);
- // destinationDbUtil.closeConnection(destinationConn);
- // }
- // } catch (SQLException e) {
- // if (rsIncorrectData == 0) {
- // // 记录错误信息
- // saveErrorMsg(tkId, datetime, e.getMessage());
- // }
- // } finally {
- // sourceDbUtil.closeResultSet(sourceRs);
- // sourceDbUtil.closeStatement(sourceStmt);
- // }
- // // 没有记录,则本次交换没有需要交换的数据了
- // if (!over) {
- // if (rsIncorrectData == 0) {
- // // 记录错误信息
- // saveErrorMsg(tkId, datetime, "交换中的Insert任务完成,接下来是Update任务!");
- // }
- // break;
- // }
- // }
- // }
- //
- // // 生成不在目标表中的记录的SQL语句
- // private String generateNotIn(int indexPk, String swappedPrimaryKeys, Map<String, String> colSourceMap) {
- // String[] pksValue = new String[indexPk + 1];
- // for (int i = 0; i < pksValue.length; i++) {
- // pksValue[i] = "";
- // }
- // String[] pksArr = swappedPrimaryKeys.split(";");
- // for (int i = 0; i < pksArr.length; i++) {
- // if (pksArr[i] != null && !pksArr[i].trim().equals("")) {
- // String[] pkArr = pksArr[i].split(",");
- // for (int j = 0; j < pkArr.length; j++) {
- // for (int k = 1; k < indexPk; k++) {
- // if (j + 1 == k) {
- // pksValue[k] = pksValue[k] + "'" + pkArr[j] + "',";
- // }
- // }
- // }
- // }
- // }
- // StringBuilder notIn = new StringBuilder();
- // notIn.append(" WHERE ");
- // for (int i = 1; i < indexPk; i++) {
- // notIn.append(colSourceMap.get("colPrimaryKey" + i)).append(" NOT IN(");
- // if (pksValue[i] != null && !pksValue[i].equals("")) {
- // notIn.append(pksValue[i]);
- // }
- // notIn.setLength(notIn.length() - 1);
- // notIn.append(")").append(" AND ");
- // }
- // if (notIn.toString().endsWith(" AND ")) {
- // notIn.setLength(notIn.length() - 5);
- // } else {
- // notIn.setLength(0); // 如果没有条件,则不加NOT IN
- // }
- // return notIn.toString();
- // }
- // 保存错误信息
- private void saveErrorMsg(Integer tkId, String datetime, String errorMsg) {
- // 实现错误信息保存的逻辑
- SmartDataTaskErr smartDataTaskErr = new SmartDataTaskErr();
- smartDataTaskErr.setETaskId(tkId);
- smartDataTaskErr.setEMsg(errorMsg);
- smartDataTaskErr.setEDateTime(datetime);
- SmartDataTaskErr returnSmartDataTaskErr = smartDataTaskMapper.selectErrorMsg(smartDataTaskErr);
- if (returnSmartDataTaskErr == null) {
- // 保存错误信息到数据库
- int i = smartDataTaskMapper.insertErrorMsg(smartDataTaskErr);
- if (i == 0) {
- System.out.println(datetime + ":保存错误信息失败!");
- }
- } else {
- smartDataTaskErr.setEId(returnSmartDataTaskErr.getEId());
- int i = smartDataTaskMapper.updateErrorMsg(smartDataTaskErr);
- if (i == 0) {
- System.out.println(datetime + ":更新错误信息失败!");
- }
- }
- }
- // 保存下次执行时间
- private void saveNextExtTime(Integer tkId, String datetime, JobKey key, int rsIncorrectData) {
- QueryWrapper<SmartDataTask> queryWrapper = new QueryWrapper<>();
- queryWrapper.eq(key.getName() != null, "tk_name", key.getName());
- SmartDataTask smartDataTask = smartDataTaskMapper.selectOne(queryWrapper);
- if (smartDataTask != null) {
- // 下次执行的时间
- String nextExeTime = QuartzJobUtils.getNextExeTime(smartDataTask.getTkCron());
- smartDataTask.setTkNextExeTime(nextExeTime);
- try {
- smartDataTaskMapper.markTaskById(smartDataTask);
- } catch (Exception e) {
- if (rsIncorrectData == 0) {
- saveErrorMsg(tkId, datetime, e.getMessage());
- }
- }
- } else {
- if (rsIncorrectData == 0) {
- saveErrorMsg(tkId, datetime, "【下次执行的时间】无法更新至数据库中!");
- }
- }
- }
- }
|