Parcourir la source

支持多字段主键

soft5566 il y a 2 ans
Parent
commit
bd9d916f56
1 fichiers modifiés avec 189 ajouts et 99 suppressions
  1. 189 99
      src/main/java/com/template/controller/Task.java

+ 189 - 99
src/main/java/com/template/controller/Task.java

@@ -22,6 +22,7 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
@@ -35,17 +36,12 @@ public class Task extends QuartzJobBean {
     @Override
     @PassToken
     protected void executeInternal(JobExecutionContext jobExecutionContext) {
-        // 输出当前时间
-        Date date = new Date();
-        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-        String dateString = dateFormat.format(date);
-
         // 执行任务
-        executeTask(dateString, jobExecutionContext);
+        executeTask(jobExecutionContext);
     }
 
     // 执行任务的逻辑
-    private void executeTask(String datetime, JobExecutionContext jobExecutionContext) {
+    private void executeTask(JobExecutionContext jobExecutionContext) {
         // 获取任务信息
         JobDetail jobDetail = jobExecutionContext.getJobDetail();
         JobKey key = jobDetail.getKey();
@@ -58,7 +54,6 @@ public class Task extends QuartzJobBean {
         String sourceUser = (String) jobDataMap.get("sourceUser");
         String sourcePassword = (String) jobDataMap.get("sourcePassword");
 //        Integer exchangeType = (Integer) jobDataMap.get("exchangeType");
-        String sourceSql = (String) jobDataMap.get("sourceSql");
         // 目标数据源参数
         String destinationDriver = (String) jobDataMap.get("destinationDriver");
         String destinationUrl = (String) jobDataMap.get("destinationUrl");
@@ -73,103 +68,137 @@ public class Task extends QuartzJobBean {
         Integer optCfgAutoManual = (Integer) jobDataMap.get("optCfgAutoManual");
         String dsSourceCharset = (String) jobDataMap.get("dsSourceCharset");
         String dsDestinationCharset = (String) jobDataMap.get("dsDestinationCharset");
-        // 获取交换主键
-        SmartDataTask smartDataTask = smartDataTaskMapper.selectColSwappedPrimaryKeys(tkId);
-        String swappedPrimaryKeys = smartDataTask.getTkSwappedPrimaryKeys();
-
+        // 当前时间
+        Date date = new Date();
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        String datetime = dateFormat.format(date);
         // 及时更新下次执行时间
         saveNextExtTime(tkId, datetime, key, rsIncorrectData);
 
-        // 源连接
-        DBUtil sourceDbUtil = new DBUtil(sourceUrl, sourceUser, sourcePassword, sourceDriver, dsSourceCharset);
-        Map<String, Object> sourceConnMap = sourceDbUtil.getConnection();
-        if ("1".equals(sourceConnMap.get("code"))) {
-            if (rsIncorrectData == 0) {
-                // 记录错误信息
-                saveErrorMsg(tkId, datetime, sourceConnMap.get("msg").toString());
-            }
-        }
+        while (true) {
+            // 当前时间
+            date = new Date();
+            datetime = dateFormat.format(date);
+            // 获取sql
+            String sourceSql = (String) jobDataMap.get("sourceSql");
+            // 获取交换主键
+            SmartDataTask smartDataTask = smartDataTaskMapper.selectColSwappedPrimaryKeys(tkId);
+            String swappedPrimaryKeys = smartDataTask.getTkSwappedPrimaryKeys();
 
-        // 解析列对应关系
-        JSONArray colRelationshipArray = JSONArray.parseArray(colRelationship);
-        // 源表列名列表
-        Map<String, String> colSourceMap = new HashMap<>();
-        // 目标表列名列表
-        Map<String, String> colDestinationMap = new HashMap<>();
-        int index = 1;
-        for (int i = 0; i < colRelationshipArray.size(); i++) {
-            JSONObject colRelationshipObject = colRelationshipArray.getJSONObject(i);
-            if (colRelationshipObject.getInteger("isUpdatePrimaryKey") == 1) {
-                colSourceMap.put("colPrimaryKey", colRelationshipObject.getString("colSource"));
-                colDestinationMap.put("colPrimaryKey", colRelationshipObject.getString("colDestination"));
-            } else {
-                colSourceMap.put("col" + index, colRelationshipObject.getString("colSource"));
-                colDestinationMap.put("col" + index, colRelationshipObject.getString("colDestination"));
-                index++;
+            // 源连接
+            DBUtil sourceDbUtil = new DBUtil(sourceUrl, sourceUser, sourcePassword, sourceDriver, dsSourceCharset);
+            Map<String, Object> sourceConnMap = sourceDbUtil.getConnection();
+            if ("1".equals(sourceConnMap.get("code"))) {
+                if (rsIncorrectData == 0) {
+                    // 记录错误信息
+                    saveErrorMsg(tkId, datetime, sourceConnMap.get("msg").toString());
+                }
             }
-        }
 
-        if (optCfgAutoManual == 0) {
-            if (swappedPrimaryKeys == null || swappedPrimaryKeys.equals("")) {
-                sourceSql = sourceSql + " LIMIT 1 ";
-            } else {
-                sourceSql = sourceSql + " WHERE " + colSourceMap.get("colPrimaryKey") + " NOT IN(" + swappedPrimaryKeys + ") LIMIT 1 ";
+            // 解析列对应关系
+            JSONArray colRelationshipArray = JSONArray.parseArray(colRelationship);
+            // 源表列名列表
+            Map<String, String> colSourceMap = new HashMap<>();
+            // 目标表列名列表
+            Map<String, String> colDestinationMap = new HashMap<>();
+            int index = 1;
+            int indexPk = 1;
+            for (int i = 0; i < colRelationshipArray.size(); i++) {
+                JSONObject colRelationshipObject = colRelationshipArray.getJSONObject(i);
+                if (colRelationshipObject.getInteger("isUpdatePrimaryKey") == 1) {
+                    colSourceMap.put("colPrimaryKey" + indexPk, colRelationshipObject.getString("colSource"));
+                    colDestinationMap.put("colPrimaryKey" + indexPk, colRelationshipObject.getString("colDestination"));
+                    indexPk++;
+                } else {
+                    colSourceMap.put("col" + index, colRelationshipObject.getString("colSource"));
+                    colDestinationMap.put("col" + index, colRelationshipObject.getString("colDestination"));
+                    index++;
+                }
             }
-        } else {
-            if (swappedPrimaryKeys == null || swappedPrimaryKeys.equals("")) {
-                sourceSql = sourceSql + " LIMIT " + optCfgRsNum;
+
+            if (optCfgAutoManual == 0) {
+                int rows = 1;
+                if (swappedPrimaryKeys == null || swappedPrimaryKeys.equals("")) {
+                    sourceSql = sourceSql + " LIMIT " + rows;
+                } else {
+                    String notIn = generateNotIn(indexPk, swappedPrimaryKeys, colSourceMap);
+                    sourceSql = sourceSql + notIn + " LIMIT " + rows;
+                }
             } else {
-                sourceSql = sourceSql + " WHERE " + colSourceMap.get("colPrimaryKey") + " NOT IN(" + swappedPrimaryKeys + ") LIMIT " + optCfgRsNum;
+                if (swappedPrimaryKeys == null || swappedPrimaryKeys.equals("")) {
+                    sourceSql = sourceSql + " LIMIT " + optCfgRsNum;
+                } else {
+                    String notIn = generateNotIn(indexPk, swappedPrimaryKeys, colSourceMap);
+                    sourceSql = sourceSql + notIn + " LIMIT " + optCfgRsNum;
+                }
             }
-        }
-        Connection sourceConn = (Connection) sourceConnMap.get("msg");
-        // 执行源sql
-        Map<String, Object> sourceQuery = sourceDbUtil.query(sourceConn, sourceSql);
-        if ("1".equals(sourceQuery.get("code"))) {
-            if (rsIncorrectData == 0) {
-                // 记录错误信息
-                saveErrorMsg(tkId, datetime, sourceQuery.get("msg").toString());
+            Connection sourceConn = (Connection) sourceConnMap.get("msg");
+            // 执行源sql
+            Map<String, Object> sourceQuery = sourceDbUtil.query(sourceConn, sourceSql);
+            if ("1".equals(sourceQuery.get("code"))) {
+                if (rsIncorrectData == 0) {
+                    // 记录错误信息
+                    saveErrorMsg(tkId, datetime, sourceQuery.get("msg").toString());
+                }
+                // 关闭连接
+                sourceDbUtil.closeConnection(sourceConn);
+                return;
             }
-            // 关闭连接
-            sourceDbUtil.closeConnection(sourceConn);
-            return;
-        }
-        Map<String, Object> sourceMap = (Map<String, Object>) sourceQuery.get("msg");
-        ResultSet sourceRs = (ResultSet) sourceMap.get("rs");
-        PreparedStatement sourceStmt = (PreparedStatement) sourceMap.get("stmt");
-
-        StringBuilder stringInsertData = new StringBuilder();
-        StringBuilder stringInsertCols = new StringBuilder();
-        StringBuilder stringUpdateData = new StringBuilder();
-        String stringPrimaryKeyData;
-        try {
-            // 执行解析操作
-            while (true) {
-                if (!sourceRs.next()) {
-                    if (rsIncorrectData == 0) {
-                        // 记录错误信息
-                        saveErrorMsg(tkId, datetime, "没有需要交换的数据!");
+            Map<String, Object> sourceMap = (Map<String, Object>) sourceQuery.get("msg");
+            ResultSet sourceRs = (ResultSet) sourceMap.get("rs");
+            PreparedStatement sourceStmt = (PreparedStatement) sourceMap.get("stmt");
+            // 拼接插入的值
+            StringBuilder stringInsertData = new StringBuilder();
+            // 拼接插入的列
+            StringBuilder stringInsertCols = new StringBuilder();
+            // 拼接更新的列和值
+            StringBuilder stringUpdateData = new StringBuilder();
+            // 拼接来源表的pk
+            StringBuilder stringSourcePk = new StringBuilder();
+            // 拼接目标表的pk
+            StringBuilder stringDestinationPk = new StringBuilder();
+            // 主键的值
+            StringBuilder stringPrimaryKeyData = new StringBuilder();
+            boolean over = false;
+            try {
+                // 执行解析操作
+                while (sourceRs.next()) {
+                    over = true;
+                    String tempStr;
+                    for (int i = 1; i < indexPk; i++) {
+                        tempStr = sourceRs.getString(colSourceMap.get("colPrimaryKey" + i));
+                        // 来源表的主键where子句
+                        stringSourcePk.append(colSourceMap.get("colPrimaryKey" + i)).append("='").append(tempStr).append("' AND ");
+                        // 目标表的主键where子句
+                        stringDestinationPk.append(colDestinationMap.get("colPrimaryKey" + i)).append("='").append(tempStr).append("' AND ");
+                        // 拼接主键的值
+                        stringPrimaryKeyData.append(sourceRs.getString(colSourceMap.get("colPrimaryKey" + i))).append(",");
+                        // 拼接插入的值
+                        stringInsertData.append("'").append(sourceRs.getString(colSourceMap.get("colPrimaryKey" + i))).append("',");
+                        // 拼接插入的列
+                        stringInsertCols.append(colDestinationMap.get("colPrimaryKey" + i)).append(",");
                     }
-                    break;
-                } else {
-                    // 主键列的值
-                    stringPrimaryKeyData = sourceRs.getString(colSourceMap.get("colPrimaryKey"));
-                    // 拼接插入的值
-                    stringInsertData.append("'").append(sourceRs.getString(colSourceMap.get("colPrimaryKey"))).append("',");
-                    // 拼接插入的列
-                    stringInsertCols.append(colDestinationMap.get("colPrimaryKey")).append(",");
+                    // 去除最后的 AND
+                    stringSourcePk.setLength(stringSourcePk.length() - 5);
+                    stringDestinationPk.setLength(stringDestinationPk.length() - 5);
+                    stringPrimaryKeyData.setLength(stringPrimaryKeyData.length() - 1);
+                    // 拼接更新的值
                     for (int i = 1; i < index; i++) {
+                        // 拼接插入的值
                         stringInsertData.append("'").append(sourceRs.getString(colSourceMap.get("col" + i))).append("',");
+                        // 拼接插入的列
                         stringInsertCols.append(colDestinationMap.get("col" + i)).append(",");
                         // 拼接更新的列和值
                         stringUpdateData.append(colDestinationMap.get("col" + i)).append("='").append(sourceRs.getString(colSourceMap.get("col" + i))).append("',");
                     }
+                    // 删除最后一个逗号
                     stringInsertData.setLength(stringInsertData.length() - 1);
                     stringInsertCols.setLength(stringInsertCols.length() - 1);
                     stringUpdateData.setLength(stringUpdateData.length() - 1);
                     // 执行插入操作
-                    // 目标连接
+                    // 数据库对象
                     DBUtil destinationDbUtil = new DBUtil(destinationUrl, destinationUser, destinationPassword, destinationDriver, dsDestinationCharset);
+                    // 获取数据库连接map
                     Map<String, Object> destinationConnMap = destinationDbUtil.getConnection();
                     if (destinationConnMap.get("code").equals("1")) {
                         if (rsIncorrectData == 0) {
@@ -177,9 +206,13 @@ public class Task extends QuartzJobBean {
                             saveErrorMsg(tkId, datetime, destinationConnMap.get("msg").toString());
                         }
                     }
+                    // 获取目标连接
                     Connection destinationConn = (Connection) destinationConnMap.get("msg");
-                    String destinationQuerySql = "select * from " + destinationTable + " where " + colDestinationMap.get("colPrimaryKey") + " = '" + stringPrimaryKeyData + "'";
+                    // 查询是否存在该主键的记录sql
+                    String destinationQuerySql = "SELECT * FROM " + destinationTable + " WHERE " + stringDestinationPk;
+                    // 查询目标表是否存在该主键的记录
                     Map<String, Object> destinationQuery = destinationDbUtil.query(destinationConn, destinationQuerySql);
+                    // 判断查询结果,如果发生错误返回code:1
                     if (destinationQuery.get("code").equals("1")) {
                         if (rsIncorrectData == 0) {
                             // 记录错误信息
@@ -189,14 +222,19 @@ public class Task extends QuartzJobBean {
                         destinationDbUtil.closeConnection(destinationConn);
                         return;
                     }
-                    // 获取查询结果集
+                    // 获取查询结果集map
                     Map<String, Object> sourceQueryMap = (Map<String, Object>) destinationQuery.get("msg");
+                    // 获取查询结果集
                     ResultSet destinationRs = (ResultSet) sourceQueryMap.get("rs");
+                    // 获取查询sql的PreparedStatement
                     PreparedStatement destinationQueryStmt = (PreparedStatement) sourceQueryMap.get("stmt");
+                    // 判断目标表是否存在该主键的记录
                     if (destinationRs.next()) {
-                        // 更新操作
-                        String destinationUpdateSql = "UPDATE " + destinationTable + " SET " + stringUpdateData + " WHERE " + colDestinationMap.get("colPrimaryKey") + " = '" + stringPrimaryKeyData + "'";
+                        // 更新语句
+                        String destinationUpdateSql = "UPDATE " + destinationTable + " SET " + stringUpdateData + " WHERE " + stringDestinationPk;
+                        // 更新目标表
                         Map<String, Object> destinationUpdate = destinationDbUtil.update(destinationConn, destinationUpdateSql);
+                        // 判断更新是否成功
                         if (destinationUpdate.get("code").equals("1")) {
                             if (rsIncorrectData == 0) {
                                 // 记录错误信息
@@ -206,13 +244,18 @@ public class Task extends QuartzJobBean {
                             destinationDbUtil.closeConnection(destinationConn);
                             return;
                         }
+                        // 获取返回结果
                         Map<String, Object> destinationQueryMap = (Map<String, Object>) destinationUpdate.get("msg");
+                        // 获取更新目标表的PreparedStatement
                         PreparedStatement destinationUpdateStmt = (PreparedStatement) destinationQueryMap.get("stmt");
+                        // 关闭PreparedStatement
                         destinationDbUtil.closeStatement(destinationUpdateStmt);
                     } else {
-                        // 插入操作
+                        // 插入操作sql
                         String destinationInsertSql = "INSERT INTO " + destinationTable + " (" + stringInsertCols + ") VALUES (" + stringInsertData + ")";
+                        // 执行插入操作
                         Map<String, Object> destinationInsert = destinationDbUtil.update(destinationConn, destinationInsertSql);
+                        // 判断插入操作是否成功
                         if (destinationInsert.get("code").equals("1")) {
                             if (rsIncorrectData == 0) {
                                 // 记录错误信息
@@ -223,16 +266,20 @@ public class Task extends QuartzJobBean {
                             return;
                         }
                         Map<String, Object> destinationQueryMap = (Map<String, Object>) destinationInsert.get("msg");
+                        // 获取插入操作的PreparedStatement
                         PreparedStatement destinationInsertStmt = (PreparedStatement) destinationQueryMap.get("stmt");
                         // 关闭对象 PreparedStatement
                         destinationDbUtil.closeStatement(destinationInsertStmt);
-
+                        // 创建SmartDataTask对象
                         SmartDataTask tempSmartDataTask = new SmartDataTask();
                         tempSmartDataTask.setTkId(tkId);
+                        // 获取插入操作的返回值
                         if (swappedPrimaryKeys == null || swappedPrimaryKeys.equals("")) {
-                            tempSmartDataTask.setTkSwappedPrimaryKeys(stringPrimaryKeyData);
+                            // 如果为空
+                            tempSmartDataTask.setTkSwappedPrimaryKeys(stringPrimaryKeyData.toString());
                         } else {
-                            swappedPrimaryKeys = swappedPrimaryKeys + "," + stringPrimaryKeyData;
+                            // 如果非空
+                            swappedPrimaryKeys = swappedPrimaryKeys + ";" + stringPrimaryKeyData;
                             tempSmartDataTask.setTkSwappedPrimaryKeys(swappedPrimaryKeys);
                         }
                         // 保存交换后的主键
@@ -244,21 +291,64 @@ public class Task extends QuartzJobBean {
                             }
                         }
                     }
-
+                    // 关闭记录集ResultSet、PreparedStatement、数据库连接Connection
                     destinationDbUtil.closeResultSet(destinationRs);
                     destinationDbUtil.closeStatement(destinationQueryStmt);
                     destinationDbUtil.closeConnection(destinationConn);
                 }
+            } catch (SQLException e) {
+                if (rsIncorrectData == 0) {
+                    // 记录错误信息
+                    saveErrorMsg(tkId, datetime, e.getMessage());
+                }
+            } finally {
+                sourceDbUtil.closeResultSet(sourceRs);
+                sourceDbUtil.closeStatement(sourceStmt);
             }
-        } catch (SQLException e) {
-            if (rsIncorrectData == 0) {
-                // 记录错误信息
-                saveErrorMsg(tkId, datetime, e.getMessage());
+            // 没有记录,则本次交换没有需要交换的数据了
+            if (!over) {
+                if (rsIncorrectData == 0) {
+                    // 记录错误信息
+                    saveErrorMsg(tkId, datetime, "没有需要交换的数据!");
+                }
+                break;
+            }
+        }
+    }
+
+    // 生成不在目标表中的记录的SQL语句
+    private String generateNotIn(int indexPk, String swappedPrimaryKeys, Map<String, String> colSourceMap) {
+        String[] pksValue = new String[indexPk + 1];
+        for (int i = 0; i < pksValue.length; i++) {
+            pksValue[i] = "";
+        }
+        String[] pksArr = swappedPrimaryKeys.split(";");
+        for (int i = 0; i < pksArr.length; i++) {
+            String[] pkArr = pksArr[i].split(",");
+            for (int j = 0; j < pkArr.length; j++) {
+                for (int k = 1; k < indexPk; k++) {
+                    if (j + 1 == k) {
+                        pksValue[k] = pksValue[k] + "'" + pkArr[j] + "',";
+                    }
+                }
+            }
+        }
+        StringBuilder notIn = new StringBuilder();
+        notIn.append(" WHERE ");
+        for (int i = 1; i < indexPk; i++) {
+            notIn.append(colSourceMap.get("colPrimaryKey" + i)).append(" NOT IN(");
+            if (pksValue[i] != null && !pksValue[i].equals("")) {
+                notIn.append(pksValue[i]);
             }
-        } finally {
-            sourceDbUtil.closeResultSet(sourceRs);
-            sourceDbUtil.closeStatement(sourceStmt);
+            notIn.setLength(notIn.length() - 1);
+            notIn.append(")").append(" AND ");
+        }
+        if (notIn.toString().endsWith(" AND ")) {
+            notIn.setLength(notIn.length() - 5);
+        } else {
+            notIn.setLength(0); // 如果没有条件,则不加NOT IN
         }
+        return notIn.toString();
     }
 
     // 保存错误信息