package com.template.controller;

import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.template.annotation.PassToken;
import com.template.common.utils.CommonUtil;
import com.template.common.utils.DBUtil;
import com.template.common.utils.QuartzJobUtils;
import com.template.mapper.SmartDataSourceMapper;
import com.template.mapper.SmartDataTaskMapper;
import com.template.model.pojo.SmartDataSourceJobParams;
import com.template.model.pojo.SmartDataTask;
import com.template.model.pojo.SmartDataTaskErr;
import io.swagger.models.auth.In;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobKey;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.springframework.stereotype.Component;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

/**
 * Quartz job that copies rows from a source data source into a destination table.
 *
 * <p>The job reads its {@link SmartDataTask} from the job data map, resolves both data sources,
 * and (for MySQL sources) walks the source query page by page. Each source page is compared with
 * the same page of the destination table; when they differ, every source row of that page is
 * inserted into, or updated in, the destination table, matched on the configured primary-key
 * columns.
 *
 * <p>Status and error messages are persisted through {@link #saveErrorMsg}. Most error messages
 * are only recorded when the task's {@code tkRsIncorrectData} setting is {@code 0}.
 */
@Component
public class Task extends QuartzJobBean {

    private static final Logger LOGGER = Logger.getLogger(Task.class.getName());

    /** Timestamp layout used for every persisted message. */
    private static final DateTimeFormatter TIMESTAMP_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Separators used when serialising a page of rows for comparison (ASCII unit/record separator). */
    private static final char FIELD_SEPARATOR = '\u001F';
    private static final char ROW_SEPARATOR = '\u001E';

    /** Key prefixes of the column maps: {@code colPrimaryKey1..n} and {@code col1..n}. */
    private static final String PK_KEY_PREFIX = "colPrimaryKey";
    private static final String COL_KEY_PREFIX = "col";

    @Autowired
    private SmartDataTaskMapper smartDataTaskMapper;

    @Autowired
    private SmartDataSourceMapper smartDataSourceMapper;

    /** When true, the generated SQL is written to the task message log. */
    private final boolean debugSqlFlag = false;

    /**
     * Entry point invoked by Quartz: loads the task configuration and dispatches on the source driver.
     *
     * @param jobExecutionContext Quartz context whose job data map holds the {@code smartDataTask} entry
     */
    @Override
    @PassToken
    protected void executeInternal(JobExecutionContext jobExecutionContext) {
        JobDetail jobDetail = jobExecutionContext.getJobDetail();
        JobKey key = jobDetail.getKey();
        JobDataMap jobDataMap = jobDetail.getJobDataMap();
        SmartDataTask smartDataTask = (SmartDataTask) jobDataMap.get("smartDataTask");

        // Task-level settings: id, column mapping, paging options and charsets.
        Integer tkId = smartDataTask.getTkId();
        String colRelationship = smartDataTask.getTkColRelationship();
        Integer rsIncorrectData = smartDataTask.getTkRsIncorrectData();
        Integer optCfgAutoManual = smartDataTask.getTkOptCfgAutoManual();
        Integer optCfgRsNum = smartDataTask.getTkOptCfgRsNum();
        Integer optCfgDefaultRsNum = smartDataTask.getTkOptCfgDefaultRsNum();
        String dsSourceCharset = smartDataTask.getTkDsSourceCharset();
        String dsDestinationCharset = smartDataTask.getTkDsDestinationCharset();
        String sourceSql = smartDataTask.getTkSql();

        // Resolve url/user/password/driver of both data sources by id.
        SmartDataSourceJobParams dsSourceInfo =
                smartDataSourceMapper.getDataSourceInfo(smartDataTask.getTkDsIdSource());
        SmartDataSourceJobParams dsDestinationInfo =
                smartDataSourceMapper.getDataSourceInfo(smartDataTask.getTkDsIdDestination());
        if (dsSourceInfo == null || dsDestinationInfo == null) {
            // Without both data sources nothing can run; previously this ended in a NullPointerException.
            if (Integer.valueOf(0).equals(rsIncorrectData)) {
                saveErrorMsg(tkId, now(), "数据源信息不存在,任务无法执行!");
            }
            return;
        }

        String sourceDriver = dsSourceInfo.getDsClsDriver();
        String sourceUrl = dsSourceInfo.getDsUrl();
        String sourceUser = dsSourceInfo.getDsUser();
        String sourcePassword = dsSourceInfo.getDsPassword();

        String destinationDriver = dsDestinationInfo.getDsClsDriver();
        String destinationUrl = dsDestinationInfo.getDsUrl();
        String destinationUser = dsDestinationInfo.getDsUser();
        String destinationPassword = dsDestinationInfo.getDsPassword();

        // The stored table value carries a "[...]" suffix; keep only the part before it.
        // A value without '[' is used as-is instead of failing in substring().
        String rawDestinationTable = smartDataTask.getTkDestTable();
        int bracketPos = rawDestinationTable.indexOf('[');
        String destinationTable = bracketPos >= 0 ? rawDestinationTable.substring(0, bracketPos) : rawDestinationTable;

        String driverName = sourceDriver.toLowerCase();
        if (driverName.contains("mysql")) {
            executeMysqlExchangeTask(key, sourceDriver, sourceUrl, sourceUser, sourcePassword, sourceSql,
                    destinationDriver, destinationUrl, destinationUser, destinationPassword, destinationTable,
                    tkId, colRelationship, rsIncorrectData, optCfgRsNum, optCfgDefaultRsNum, optCfgAutoManual,
                    dsSourceCharset, dsDestinationCharset);
        } else if (driverName.contains("oracle")) {
            executeOracleTask(jobExecutionContext);
        } else {
            executeOtherTask(jobExecutionContext);
        }
    }

    /** Placeholder for Oracle sources; currently does nothing. */
    private void executeOracleTask(JobExecutionContext jobExecutionContext) {
    }

    /** Placeholder for all other source drivers; currently does nothing. */
    private void executeOtherTask(JobExecutionContext jobExecutionContext) {
    }

    /**
     * Runs the page-by-page exchange for a MySQL source.
     *
     * <p>Per page: re-read the task state, build the paged source/destination queries (ordered by the
     * primary-key columns), compare the two pages, and synchronise the page when they differ. The loop
     * ends when the source page is empty, the task is stopped, a query fails, or the thread is interrupted.
     * A summary message with the insert/update totals is always written after the loop.
     *
     * @param key                 job key, used to look up the task for the next-execution-time update
     * @param sourceSql           base source query; ORDER BY and LIMIT are appended to it
     * @param destinationTable    destination table name
     * @param tkId                task id used for messages and state lookup
     * @param colRelationship     JSON array of objects with {@code colSource}, {@code colDestination}
     *                            and {@code isUpdatePrimaryKey} (1 marks a primary-key column)
     * @param rsIncorrectData     error messages are recorded only when this is 0
     * @param optCfgRsNum         page size in manual mode
     * @param optCfgDefaultRsNum  page size in automatic mode
     * @param optCfgAutoManual    0 = automatic, otherwise manual
     */
    private void executeMysqlExchangeTask(JobKey key, String sourceDriver, String sourceUrl, String sourceUser,
                                          String sourcePassword, String sourceSql, String destinationDriver,
                                          String destinationUrl, String destinationUser,
                                          String destinationPassword, String destinationTable, int tkId,
                                          String colRelationship, int rsIncorrectData, int optCfgRsNum,
                                          int optCfgDefaultRsNum, int optCfgAutoManual, String dsSourceCharset,
                                          String dsDestinationCharset) {
        String datetime = now();
        // Publish the next execution time right away.
        saveNextExtTime(tkId, datetime, key, rsIncorrectData);

        // Split the column mapping into primary-key columns and ordinary columns.
        Map<String, String> colSourceMap = new HashMap<>();
        Map<String, String> colDestinationMap = new HashMap<>();
        int index = 1;
        int indexPk = 1;
        JSONArray colRelationshipArray = JSONArray.parseArray(colRelationship);
        int relationCount = colRelationshipArray == null ? 0 : colRelationshipArray.size();
        for (int i = 0; i < relationCount; i++) {
            JSONObject relation = colRelationshipArray.getJSONObject(i);
            // Null-safe comparison: a missing flag means "not a primary key".
            if (Integer.valueOf(1).equals(relation.getInteger("isUpdatePrimaryKey"))) {
                colSourceMap.put(PK_KEY_PREFIX + indexPk, relation.getString("colSource"));
                colDestinationMap.put(PK_KEY_PREFIX + indexPk, relation.getString("colDestination"));
                indexPk++;
            } else {
                colSourceMap.put(COL_KEY_PREFIX + index, relation.getString("colSource"));
                colDestinationMap.put(COL_KEY_PREFIX + index, relation.getString("colDestination"));
                index++;
            }
        }
        if (indexPk == 1) {
            // Rows are matched on the primary key; without one neither paging order nor upsert is defined.
            recordError(rsIncorrectData, tkId, datetime, "列对应关系中未配置主键列,无法执行数据交换!");
            return;
        }

        // ORDER BY over ALL primary-key columns so both sides page in the same order.
        String sourceOrderBy = " ORDER BY " + joinColumns(colSourceMap, PK_KEY_PREFIX, indexPk, " ASC", ",");
        String destinationOrderBy =
                " ORDER BY " + joinColumns(colDestinationMap, PK_KEY_PREFIX, indexPk, " ASC", ",");

        // Run mode: 0 = automatic (default page size), otherwise manual (configured page size).
        int pageSize = optCfgAutoManual == 0 ? optCfgDefaultRsNum : optCfgRsNum;

        DBUtil sourceDbUtil = new DBUtil(sourceUrl, sourceUser, sourcePassword, sourceDriver, dsSourceCharset);
        DBUtil destinationDbUtil = new DBUtil(destinationUrl, destinationUser, destinationPassword,
                destinationDriver, dsDestinationCharset);
        Connection sourceConn = null;
        Connection destinationConn = null;
        try {
            Map<?, ?> sourceConnMap = sourceDbUtil.getConnection();
            if ("1".equals(sourceConnMap.get("code"))) {
                // On failure "msg" holds the error text, not a Connection, so always stop here.
                recordError(rsIncorrectData, tkId, datetime, String.valueOf(sourceConnMap.get("msg")));
                return;
            }
            sourceConn = (Connection) sourceConnMap.get("msg");

            Map<?, ?> destinationConnMap = destinationDbUtil.getConnection();
            if ("1".equals(destinationConnMap.get("code"))) {
                recordError(rsIncorrectData, tkId, datetime, String.valueOf(destinationConnMap.get("msg")));
                return;
            }
            destinationConn = (Connection) destinationConnMap.get("msg");

            int page = 0;
            int totalInsert = 0;
            int totalUpdate = 0;
            while (true) {
                datetime = now();

                // Re-read the task state every page so a stop/pause takes effect mid-run.
                // NOTE(review): the mapper method is named selectColSwappedPrimaryKeys but its result is
                // used as the activation state (0 = stopped, 2 = paused) — confirm against the mapper.
                int tkActivation = smartDataTaskMapper.selectColSwappedPrimaryKeys(tkId);
                if (tkActivation == 0) {
                    saveErrorMsg(tkId, datetime, "当前任务停止,需要手动启动!");
                    break;
                }
                if (tkActivation == 2) {
                    saveErrorMsg(tkId, datetime, "当前任务已暂停,需要手动恢复,手动恢复1秒后继续!");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        saveErrorMsg(tkId, datetime, "线程休眠异常!" + e.getMessage());
                        // Restore the interrupt flag and stop; looping on would fail every sleep immediately.
                        Thread.currentThread().interrupt();
                        break;
                    }
                    continue;
                }

                // long arithmetic: page * pageSize can exceed int range on large tables.
                String limitClause = " LIMIT " + ((long) page * pageSize) + "," + pageSize;
                String sourcePageSql = sourceSql + sourceOrderBy + limitClause;
                String destinationPageSql = "SELECT * FROM " + destinationTable + destinationOrderBy + limitClause;

                if (debugSqlFlag) {
                    saveErrorMsg(tkId, datetime, sourcePageSql);
                    saveErrorMsg(tkId, datetime, destinationPageSql);
                }

                String sourceSnapshot = readPageSnapshot(sourceDbUtil, sourceConn, sourcePageSql, colSourceMap,
                        indexPk, index, rsIncorrectData, tkId, datetime);
                if (sourceSnapshot == null) {
                    break;
                }
                if (sourceSnapshot.isEmpty()) {
                    // No more source rows: nothing left to insert or update.
                    break;
                }
                String destinationSnapshot = readPageSnapshot(destinationDbUtil, destinationConn,
                        destinationPageSql, colDestinationMap, indexPk, index, rsIncorrectData, tkId, datetime);
                if (destinationSnapshot == null) {
                    break;
                }

                if (!sourceSnapshot.equals(destinationSnapshot)) {
                    // Pages differ: upsert every source row of this page into the destination table.
                    Map<?, ?> syncResult = executeMysqlInsertOrUpdateTask(sourceDbUtil, destinationDbUtil,
                            sourceConn, destinationConn, sourcePageSql, rsIncorrectData, tkId, datetime, indexPk,
                            index, colSourceMap, colDestinationMap, destinationTable);
                    if (!"0".equals(syncResult.get("code"))) {
                        break;
                    }
                    Map<?, ?> counts = (Map<?, ?>) syncResult.get("msg");
                    totalInsert += (Integer) counts.get("totalInsert");
                    totalUpdate += (Integer) counts.get("totalUpdate");
                }
                page++;
            }

            String summary = "数据交换完成,共交换数据 " + (totalInsert + totalUpdate) + " 条:插入:" + totalInsert
                    + " 条,更新:" + totalUpdate + " 条";
            saveErrorMsg(tkId, datetime, summary);
        } finally {
            // Always release both connections, including on early return or unchecked exceptions.
            if (destinationConn != null) {
                destinationDbUtil.closeConnection(destinationConn);
            }
            if (sourceConn != null) {
                sourceDbUtil.closeConnection(sourceConn);
            }
        }
    }

    /**
     * Runs {@code sql} and serialises the mapped columns of every returned row into one string.
     *
     * <p>Fields and rows are terminated by separator characters, so the result is empty exactly when
     * the query returned no rows, and two pages compare equal only if all mapped values match in order.
     *
     * @return the serialised page, or {@code null} when the query or the read failed (already recorded)
     */
    private String readPageSnapshot(DBUtil dbUtil, Connection conn, String sql, Map<String, String> columnMap,
                                    int indexPk, int index, int rsIncorrectData, int tkId, String datetime) {
        Map<?, ?> queryResult = dbUtil.query(conn, sql);
        if ("1".equals(queryResult.get("code"))) {
            recordError(rsIncorrectData, tkId, datetime, String.valueOf(queryResult.get("msg")));
            return null;
        }
        Map<?, ?> payload = (Map<?, ?>) queryResult.get("msg");
        ResultSet rs = (ResultSet) payload.get("rs");
        PreparedStatement stmt = (PreparedStatement) payload.get("stmt");
        StringBuilder snapshot = new StringBuilder();
        try {
            while (rs.next()) {
                for (String value : readValues(rs, columnMap, PK_KEY_PREFIX, indexPk)) {
                    snapshot.append(value).append(FIELD_SEPARATOR);
                }
                for (String value : readValues(rs, columnMap, COL_KEY_PREFIX, index)) {
                    snapshot.append(value).append(FIELD_SEPARATOR);
                }
                snapshot.append(ROW_SEPARATOR);
            }
            return snapshot.toString();
        } catch (SQLException e) {
            // A partially read page must not be mistaken for "in sync"; report and abort.
            saveErrorMsg(tkId, datetime, e.getMessage());
            return null;
        } finally {
            dbUtil.closeResultSet(rs);
            dbUtil.closeStatement(stmt);
        }
    }

    /**
     * Inserts or updates every row returned by {@code sourceSql} in the destination table.
     *
     * <p>For each source row the destination is probed by primary key; an existing row is updated,
     * a missing row is inserted. All values are bound as statement parameters, so quotes in the data
     * cannot break the SQL and SQL NULL is preserved. Table and column names come from the task
     * configuration and are still concatenated, since identifiers cannot be parameterised.
     *
     * <p>NOTE(review): destination statements are prepared directly on {@code destinationConn} rather
     * than through {@code DBUtil.query/update}, which only accept a finished SQL string. Confirm that
     * DBUtil does no charset handling beyond connection setup.
     *
     * @return a map from {@code CommonUtil.getReturnMap}: code "0" with a map holding
     *         {@code totalInsert}/{@code totalUpdate} on success, code "1" with a message on failure
     */
    private Map<?, ?> executeMysqlInsertOrUpdateTask(DBUtil sourceDbUtil, DBUtil destinationDbUtil,
                                                     Connection sourceConn, Connection destinationConn,
                                                     String sourceSql, int rsIncorrectData, int tkId,
                                                     String datetime, int indexPk, int index,
                                                     Map<String, String> colSourceMap,
                                                     Map<String, String> colDestinationMap,
                                                     String destinationTable) {
        if (indexPk <= 1) {
            return CommonUtil.getReturnMap("1", "列对应关系中未配置主键列,无法执行数据交换!");
        }

        Map<?, ?> sourceQuery = sourceDbUtil.query(sourceConn, sourceSql);
        if ("1".equals(sourceQuery.get("code"))) {
            recordError(rsIncorrectData, tkId, datetime, String.valueOf(sourceQuery.get("msg")));
            return CommonUtil.getReturnMap("1", "执行源sql失败");
        }
        Map<?, ?> sourceMap = (Map<?, ?>) sourceQuery.get("msg");
        ResultSet sourceRs = (ResultSet) sourceMap.get("rs");
        PreparedStatement sourceStmt = (PreparedStatement) sourceMap.get("stmt");

        // The column set is fixed for the whole page, so the three statements are built once.
        boolean hasPlainColumns = index > 1;
        String pkWhere = joinColumns(colDestinationMap, PK_KEY_PREFIX, indexPk, "=?", " AND ");
        String pkColumns = joinColumns(colDestinationMap, PK_KEY_PREFIX, indexPk, "", ",");
        String plainColumns = joinColumns(colDestinationMap, COL_KEY_PREFIX, index, "", ",");
        String insertColumns = hasPlainColumns ? pkColumns + "," + plainColumns : pkColumns;
        StringBuilder placeholders = new StringBuilder();
        for (int i = 0; i < (indexPk - 1) + (index - 1); i++) {
            placeholders.append(i == 0 ? "?" : ",?");
        }

        String existsSql = "SELECT 1 FROM " + destinationTable + " WHERE " + pkWhere;
        String insertSql = "INSERT INTO " + destinationTable + " (" + insertColumns + ") VALUES ("
                + placeholders + ")";
        String updateSql = hasPlainColumns
                ? "UPDATE " + destinationTable + " SET "
                        + joinColumns(colDestinationMap, COL_KEY_PREFIX, index, "=?", ",") + " WHERE " + pkWhere
                : null;

        if (debugSqlFlag) {
            saveErrorMsg(tkId, datetime, existsSql);
            saveErrorMsg(tkId, datetime, insertSql);
            if (updateSql != null) {
                saveErrorMsg(tkId, datetime, updateSql);
            }
        }

        int totalInsert = 0;
        int totalUpdate = 0;
        // Prefix of the failure message; tracks which step was running when an SQLException occurred.
        String failureStage = "异常:";
        // A null resource (no update statement) is permitted and simply not closed.
        try (PreparedStatement existsStmt = destinationConn.prepareStatement(existsSql);
             PreparedStatement insertStmt = destinationConn.prepareStatement(insertSql);
             PreparedStatement updateStmt =
                     updateSql == null ? null : destinationConn.prepareStatement(updateSql)) {
            while (sourceRs.next()) {
                failureStage = "异常:";
                String[] pkValues = readValues(sourceRs, colSourceMap, PK_KEY_PREFIX, indexPk);
                String[] plainValues = readValues(sourceRs, colSourceMap, COL_KEY_PREFIX, index);

                failureStage = "目标表查询错误";
                bindStrings(existsStmt, 1, pkValues);
                boolean rowExists;
                try (ResultSet existsRs = existsStmt.executeQuery()) {
                    rowExists = existsRs.next();
                }

                if (rowExists) {
                    // With only key columns mapped there is nothing to update.
                    if (updateStmt != null) {
                        failureStage = "目标表更新失败";
                        int next = bindStrings(updateStmt, 1, plainValues);
                        bindStrings(updateStmt, next, pkValues);
                        updateStmt.executeUpdate();
                        totalUpdate++;
                    }
                } else {
                    failureStage = "插入目标表失败";
                    int next = bindStrings(insertStmt, 1, pkValues);
                    bindStrings(insertStmt, next, plainValues);
                    insertStmt.executeUpdate();
                    totalInsert++;
                }
            }
            Map<String, Integer> counts = new HashMap<>();
            counts.put("totalInsert", totalInsert);
            counts.put("totalUpdate", totalUpdate);
            return CommonUtil.getReturnMap("0", counts);
        } catch (SQLException e) {
            recordError(rsIncorrectData, tkId, datetime, e.getMessage());
            return CommonUtil.getReturnMap("1", failureStage + e.getMessage());
        } finally {
            sourceDbUtil.closeResultSet(sourceRs);
            sourceDbUtil.closeStatement(sourceStmt);
        }
    }

    // The earlier insert-only implementation (executeMysqlInsertTask / generateNotIn), which tracked
    // already-exchanged primary keys in tkSwappedPrimaryKeys, was kept here as commented-out code.
    // It has been removed as dead code; recover it from version control if needed.

    /**
     * Joins the columns stored under {@code keyPrefix + 1 .. keyPrefix + (endExclusive - 1)},
     * appending {@code suffix} to each and separating them with {@code delimiter}.
     */
    private static String joinColumns(Map<String, String> columnMap, String keyPrefix, int endExclusive,
                                      String suffix, String delimiter) {
        StringBuilder joined = new StringBuilder();
        for (int i = 1; i < endExclusive; i++) {
            if (i > 1) {
                joined.append(delimiter);
            }
            joined.append(columnMap.get(keyPrefix + i)).append(suffix);
        }
        return joined.toString();
    }

    /** Reads, as strings, the current row's values of the columns stored under the given key prefix. */
    private static String[] readValues(ResultSet rs, Map<String, String> columnMap, String keyPrefix,
                                       int endExclusive) throws SQLException {
        String[] values = new String[endExclusive - 1];
        for (int i = 1; i < endExclusive; i++) {
            values[i - 1] = rs.getString(columnMap.get(keyPrefix + i));
        }
        return values;
    }

    /**
     * Binds {@code values} to consecutive parameters starting at {@code startIndex}; null becomes SQL NULL.
     *
     * @return the index of the next unbound parameter
     */
    private static int bindStrings(PreparedStatement stmt, int startIndex, String[] values) throws SQLException {
        int paramIndex = startIndex;
        for (String value : values) {
            if (value == null) {
                stmt.setNull(paramIndex, Types.VARCHAR);
            } else {
                stmt.setString(paramIndex, value);
            }
            paramIndex++;
        }
        return paramIndex;
    }

    /** Current local time formatted as {@code yyyy-MM-dd HH:mm:ss}. */
    private static String now() {
        return LocalDateTime.now().format(TIMESTAMP_FORMAT);
    }

    /** Records {@code errorMsg} only when the task is configured to record errors ({@code rsIncorrectData == 0}). */
    private void recordError(int rsIncorrectData, Integer tkId, String datetime, String errorMsg) {
        if (rsIncorrectData == 0) {
            saveErrorMsg(tkId, datetime, errorMsg);
        }
    }

    /**
     * Persists a task message: inserts it when the mapper finds no matching record, otherwise
     * updates the record that was found. A write affecting zero rows is logged as a warning.
     *
     * @param tkId     task id the message belongs to
     * @param datetime timestamp stored with the message
     * @param errorMsg message text
     */
    private void saveErrorMsg(Integer tkId, String datetime, String errorMsg) {
        SmartDataTaskErr smartDataTaskErr = new SmartDataTaskErr();
        smartDataTaskErr.setETaskId(tkId);
        smartDataTaskErr.setEMsg(errorMsg);
        smartDataTaskErr.setEDateTime(datetime);

        SmartDataTaskErr existing = smartDataTaskMapper.selectErrorMsg(smartDataTaskErr);
        if (existing == null) {
            if (smartDataTaskMapper.insertErrorMsg(smartDataTaskErr) == 0) {
                LOGGER.warning(datetime + ":保存错误信息失败!");
            }
            return;
        }
        smartDataTaskErr.setEId(existing.getEId());
        if (smartDataTaskMapper.updateErrorMsg(smartDataTaskErr) == 0) {
            LOGGER.warning(datetime + ":更新错误信息失败!");
        }
    }

    /**
     * Looks the task up by job name, computes its next execution time from its cron expression
     * and stores it. Failures are recorded only when {@code rsIncorrectData == 0}.
     *
     * @param tkId            task id used for messages
     * @param datetime        timestamp used for messages
     * @param key             job key; its name is matched against the {@code tk_name} column
     * @param rsIncorrectData error messages are recorded only when this is 0
     */
    private void saveNextExtTime(Integer tkId, String datetime, JobKey key, int rsIncorrectData) {
        QueryWrapper<SmartDataTask> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq(key.getName() != null, "tk_name", key.getName());
        SmartDataTask smartDataTask = smartDataTaskMapper.selectOne(queryWrapper);
        if (smartDataTask == null) {
            recordError(rsIncorrectData, tkId, datetime, "【下次执行的时间】无法更新至数据库中!");
            return;
        }

        String nextExeTime = QuartzJobUtils.getNextExeTime(smartDataTask.getTkCron());
        smartDataTask.setTkNextExeTime(nextExeTime);
        try {
            smartDataTaskMapper.markTaskById(smartDataTask);
        } catch (Exception e) {
            // Updating the schedule display is auxiliary; report the failure and let the exchange proceed.
            recordError(rsIncorrectData, tkId, datetime, e.getMessage());
        }
    }
}