Task.java 39 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747
  1. package com.template.controller;
  2. import com.alibaba.fastjson2.JSONArray;
  3. import com.alibaba.fastjson2.JSONObject;
  4. import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
  5. import com.template.annotation.PassToken;
  6. import com.template.common.utils.CommonUtil;
  7. import com.template.common.utils.DBUtil;
  8. import com.template.common.utils.HttpsClient;
  9. import com.template.common.utils.QuartzJobUtils;
  10. import com.template.mapper.SmartDataSourceMapper;
  11. import com.template.mapper.SmartDataTaskMapper;
  12. import com.template.model.pojo.SmartDataSourceJobParams;
  13. import com.template.model.pojo.SmartDataTask;
  14. import com.template.model.pojo.SmartDataTaskDebug;
  15. import com.template.model.pojo.SmartDataTaskLog;
  16. import org.quartz.JobDataMap;
  17. import org.quartz.JobDetail;
  18. import org.quartz.JobExecutionContext;
  19. import org.quartz.JobKey;
  20. import org.slf4j.Logger;
  21. import org.slf4j.LoggerFactory;
  22. import org.springframework.beans.factory.annotation.Autowired;
  23. import org.springframework.scheduling.quartz.QuartzJobBean;
  24. import org.springframework.stereotype.Component;
  25. import java.sql.Connection;
  26. import java.sql.PreparedStatement;
  27. import java.sql.ResultSet;
  28. import java.sql.SQLException;
  29. import java.text.SimpleDateFormat;
  30. import java.util.*;
  31. @Component
  32. public class Task extends QuartzJobBean {
  33. @Autowired
  34. private SmartDataTaskMapper smartDataTaskMapper;
  35. @Autowired
  36. private SmartDataSourceMapper smartDataSourceMapper;
  37. private int debugSqlFlag = 0;
  38. private static Logger logger = LoggerFactory.getLogger(QuartzJobBean.class);
  39. @Override
  40. @PassToken
  41. protected void executeInternal(JobExecutionContext jobExecutionContext) {
  42. // 获取任务信息
  43. JobDetail jobDetail = jobExecutionContext.getJobDetail();
  44. JobKey key = jobDetail.getKey();
  45. // 工作内容
  46. JobDataMap jobDataMap = jobDetail.getJobDataMap();
  47. // 来源数据源参数
  48. SmartDataTask smartDataTask_transfer = (SmartDataTask) jobDataMap.get("smartDataTask");
  49. // 为了不重启任务就能够使用新的参数,这里需要重新读取
  50. QueryWrapper<SmartDataTask> queryWrapper = new QueryWrapper<>();
  51. queryWrapper.eq(smartDataTask_transfer.getTkId() != null, "tk_id", smartDataTask_transfer.getTkId());
  52. SmartDataTask smartDataTask = smartDataTaskMapper.selectOne(queryWrapper);
  53. if (smartDataTask == null) {
  54. // 当前时间
  55. SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  56. Date date = new Date();
  57. String datetime = dateFormat.format(date);
  58. // 记录错误信息
  59. this.saveDebugMsg(smartDataTask_transfer.getTkId(), smartDataTask_transfer.getTkName(), datetime, "未查询到任务信息");
  60. return;
  61. }
  62. // 来源数据源id
  63. Integer tkDsIdSource = smartDataTask.getTkDsIdSource();
  64. // 目标数据源id
  65. Integer tkDsIdDestination = smartDataTask.getTkDsIdDestination();
  66. // 根据id,获取数据源url、user、password、driver等
  67. SmartDataSourceJobParams dsSourceInfo = smartDataSourceMapper.getDataSourceInfo(tkDsIdSource);
  68. SmartDataSourceJobParams dsDestinationInfo = smartDataSourceMapper.getDataSourceInfo(tkDsIdDestination);
  69. // 来源数据源参数
  70. String sourceDriver = dsSourceInfo.getDsClsDriver();
  71. String sourceUrl = dsSourceInfo.getDsUrl();
  72. String sourceUser = dsSourceInfo.getDsUser();
  73. String sourcePassword = dsSourceInfo.getDsPassword();
  74. // 来源数据库sql
  75. String sourceSql = smartDataTask.getTkSql();
  76. // 目标数据源参数
  77. String destinationDriver = dsDestinationInfo.getDsClsDriver();
  78. String destinationUrl = dsDestinationInfo.getDsUrl();
  79. String destinationUser = dsDestinationInfo.getDsUser();
  80. String destinationPassword = dsDestinationInfo.getDsPassword();
  81. // 目标数据库表
  82. String destinationTable = smartDataTask.getTkDestTable().substring(0, smartDataTask.getTkDestTable().indexOf("["));
  83. // 任务数据id、列关系等等
  84. Integer tkId = smartDataTask.getTkId();
  85. debugSqlFlag = smartDataTask.getTkDebugSql();
  86. String colRelationship = smartDataTask.getTkColRelationship();
  87. Integer rsIncorrectData = smartDataTask.getTkRsIncorrectData();
  88. Integer optCfgAutoManual = smartDataTask.getTkOptCfgAutoManual();
  89. Integer optCfgRsNum = smartDataTask.getTkOptCfgRsNum();
  90. Integer optCfgDefaultRsNum = smartDataTask.getTkOptCfgDefaultRsNum();
  91. String dsSourceCharset = smartDataTask.getTkDsSourceCharset();
  92. String dsDestinationCharset = smartDataTask.getTkDsDestinationCharset();
  93. // 来源库
  94. String dsSourceName = dsSourceInfo.getDsName();
  95. Integer dsSourceId = dsSourceInfo.getDsId();
  96. // 目标库
  97. String dsDestinationName = dsDestinationInfo.getDsName();
  98. Integer dsTargetId = dsDestinationInfo.getDsId();
  99. // 部门
  100. int dtId = smartDataTask.getTkDtId();
  101. // 交换方式:0自定义SQL语句,1数据视图,2数据表
  102. Integer tkExchangeType = smartDataTask.getTkExchangeType();
  103. // 执行方式:0间隔执行,1定点执行,2每天,3每周,4每月
  104. Integer tkExeType = smartDataTask.getTkExeType();
  105. String tkExchangeServer = smartDataTask.getTkExchangeServer();
  106. // 是否是MySQL数据源
  107. if (sourceDriver.toLowerCase().contains("mysql")) {
  108. // 执行MySQL任务
  109. executeMysqlExchangeTask(key, sourceDriver, sourceUrl, sourceUser, sourcePassword, sourceSql,
  110. destinationDriver, destinationUrl, destinationUser, destinationPassword, destinationTable, tkId,
  111. colRelationship, rsIncorrectData, optCfgRsNum, optCfgDefaultRsNum, optCfgAutoManual, tkExchangeType,
  112. dtId, tkExeType, dsSourceId, dsSourceName, dsTargetId, dsDestinationName, tkExchangeServer, dsSourceCharset, dsDestinationCharset);
  113. } else if (sourceDriver.toLowerCase().contains("oracle")) {
  114. // 执行Oracle任务
  115. executeOracleTask(jobExecutionContext);
  116. } else {
  117. // 执行其他任务
  118. executeOtherTask(jobExecutionContext);
  119. }
  120. }
  121. private void executeOracleTask(JobExecutionContext jobExecutionContext) {
  122. }
  123. private void executeOtherTask(JobExecutionContext jobExecutionContext) {
  124. }
  125. // 执行MySQL任务的操作
  126. private void executeMysqlExchangeTask(JobKey key, String sourceDriver, String sourceUrl, String sourceUser,
  127. String sourcePassword, String sourceSql, String destinationDriver,
  128. String destinationUrl, String destinationUser, String destinationPassword,
  129. String destinationTable, Integer tkId, String colRelationship, int rsIncorrectData,
  130. int optCfgRsNum, int optCfgDefaultRsNum, int optCfgAutoManual, int tkExchangeType,
  131. int dtId, int tkExeType, int dsSourceId, String dsSourceName, int dsTargetId,
  132. String dsDestinationName, String tkExchangeServer,
  133. String dsSourceCharset, String dsDestinationCharset) {
  134. // 当前时间
  135. SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  136. Date date = new Date();
  137. String datetime = dateFormat.format(date);
  138. // 任务名称
  139. String tkTaskName = key.getName();
  140. // 及时更新下次执行时间
  141. this.saveNextExeTime(tkId, datetime, tkTaskName, rsIncorrectData);
  142. // 任务日志参数设置
  143. SmartDataTaskLog smartDataTaskLog = new SmartDataTaskLog();
  144. smartDataTaskLog.setTkLogTaskId(tkId);
  145. smartDataTaskLog.setTkLogTaskName(tkTaskName);
  146. smartDataTaskLog.setTkLogDtName(String.valueOf(dtId));
  147. smartDataTaskLog.setTkLogDsSourceName(dsSourceName);
  148. smartDataTaskLog.setTkLogDsSourceId(dsSourceId);
  149. smartDataTaskLog.setTkLogDsDestinationName(dsDestinationName);
  150. smartDataTaskLog.setTkLogDsDestinationId(dsTargetId);
  151. smartDataTaskLog.setTkLogDestTable(destinationTable);
  152. smartDataTaskLog.setTkLogExchangeServer(tkExchangeServer);
  153. // 运行参数配置:0自动,1手动
  154. smartDataTaskLog.setTkLogAutoManual(optCfgAutoManual == 0 ? "自动执行" : "手动执行");
  155. // 执行方式:0间隔执行,1定点执行,2每天,3每周,4每月
  156. if (tkExeType == 0) {
  157. smartDataTaskLog.setTkLogExeType("间隔执行");
  158. } else if (tkExeType == 1) {
  159. smartDataTaskLog.setTkLogExeType("定点执行");
  160. } else if (tkExeType == 2) {
  161. smartDataTaskLog.setTkLogExeType("每天执行");
  162. } else if (tkExeType == 3) {
  163. smartDataTaskLog.setTkLogExeType("每周执行");
  164. } else if (tkExeType == 4) {
  165. smartDataTaskLog.setTkLogExeType("每月执行");
  166. } else {
  167. smartDataTaskLog.setTkLogExeType("未知执行");
  168. }
  169. // 交换方式:0自定义SQL语句,1数据视图,2数据表
  170. if (tkExchangeType == 0) {
  171. smartDataTaskLog.setTkLogExchangeType("自定义SQL语句");
  172. } else if (tkExchangeType == 1) {
  173. smartDataTaskLog.setTkLogExchangeType("数据视图");
  174. } else if (tkExchangeType == 2) {
  175. smartDataTaskLog.setTkLogExchangeType("数据表");
  176. } else {
  177. smartDataTaskLog.setTkLogExchangeType("未知交换方式");
  178. }
  179. // 执行状态
  180. smartDataTaskLog.setTkLogExeStatus("执行中");
  181. // 执行开始时间
  182. smartDataTaskLog.setTkLogStartTime(datetime);
  183. // 获取代码开始时间
  184. long startTimeMillis = System.currentTimeMillis();
  185. int totalRead = 0, totalInsert = 0, totalUpdate = 0, totalError = 0;
  186. StringBuilder err = new StringBuilder();
  187. StringBuilder excetion = new StringBuilder();
  188. // 查询当前任务是否还在执行中
  189. List<SmartDataTaskLog> queryTaskExecuting = smartDataTaskMapper.queryTaskExecuting(tkId, 1);
  190. if (queryTaskExecuting.size() > 0) {
  191. smartDataTaskLog.setTkLogExeStatus("执行中断");
  192. Date endDate = new Date();
  193. String endDatetime = dateFormat.format(endDate);
  194. // 结束时间
  195. smartDataTaskLog.setTkLogEndTime(endDatetime);
  196. // 获取代码结束时间
  197. long endTimeMillis = System.currentTimeMillis();
  198. // 计算执行时间
  199. long elapsedTimeMillis = endTimeMillis - startTimeMillis;
  200. // 耗时
  201. smartDataTaskLog.setTkLogCostTime(String.valueOf(elapsedTimeMillis));
  202. // 读取数据量
  203. smartDataTaskLog.setTkLogReadRows(totalRead);
  204. // 增加数据量
  205. smartDataTaskLog.setTkLogInsertRows(totalInsert);
  206. // 更新数据量
  207. smartDataTaskLog.setTkLogUpdateRows(totalUpdate);
  208. // 错误数据量
  209. smartDataTaskLog.setTkLogErrRows(totalError);
  210. smartDataTaskLog.setTkLogErrException("当前任务正在执行中,尚未完成,本次中断,请稍后再试!");
  211. // 记录任务开始了,成功后返回tkLogId
  212. smartDataTaskMapper.insertTaskLog(smartDataTaskLog);
  213. } else {
  214. // 记录任务开始了,成功后返回tkLogId
  215. smartDataTaskMapper.insertTaskLog(smartDataTaskLog);
  216. // ========== 开始执行本次任务 ==============================
  217. // 源连接
  218. DBUtil sourceDbUtil = new DBUtil(sourceUrl, sourceUser, sourcePassword, sourceDriver, dsSourceCharset);
  219. Map<String, Object> sourceConnMap = sourceDbUtil.getConnection();
  220. if ("1".equals(sourceConnMap.get("code"))) {
  221. if (rsIncorrectData == 0) {
  222. // 记录错误信息
  223. this.saveDebugMsg(tkId, tkTaskName, datetime, sourceConnMap.get("msg").toString());
  224. return;
  225. }
  226. }
  227. Connection sourceConn = (Connection) sourceConnMap.get("msg");
  228. // 目标连接
  229. DBUtil destinationDbUtil = new DBUtil(destinationUrl, destinationUser, destinationPassword, destinationDriver, dsDestinationCharset);
  230. Map<String, Object> destinationConnMap = destinationDbUtil.getConnection();
  231. if ("1".equals(destinationConnMap.get("code"))) {
  232. if (rsIncorrectData == 0) {
  233. // 记录错误信息
  234. this.saveDebugMsg(tkId, tkTaskName, datetime, destinationConnMap.get("msg").toString());
  235. return;
  236. }
  237. }
  238. Connection destinationConn = (Connection) destinationConnMap.get("msg");
  239. // 解析列对应关系
  240. JSONArray colRelationshipArray = JSONArray.parseArray(colRelationship);
  241. // 源表列名列表
  242. Map<String, String> colSourceMap = new HashMap<>();
  243. // 目标表列名列表
  244. Map<String, String> colDestinationMap = new HashMap<>();
  245. // 是否更新
  246. Map<String, String> isUpdateMap = new HashMap<>();
  247. int index = 1;
  248. int indexPk = 1;
  249. for (int i = 0; i < colRelationshipArray.size(); i++) {
  250. JSONObject colRelationshipObject = colRelationshipArray.getJSONObject(i);
  251. if (colRelationshipObject.getInteger("isUpdatePrimaryKey") == 1) {
  252. // 如果该字段是更新字段,则记录1,否则0不记录
  253. if (colRelationshipObject.getInteger("isUpdate") == 1) {
  254. isUpdateMap.put("colPrimaryKey" + indexPk, "1");
  255. } else {
  256. isUpdateMap.put("colPrimaryKey" + indexPk, "0");
  257. }
  258. colSourceMap.put("colPrimaryKey" + indexPk, colRelationshipObject.getString("colSource"));
  259. colDestinationMap.put("colPrimaryKey" + indexPk, colRelationshipObject.getString("colDestination"));
  260. indexPk++;
  261. } else {
  262. // 如果该字段是更新字段,则记录1,否则0不记录
  263. if (colRelationshipObject.getInteger("isUpdate") == 1) {
  264. isUpdateMap.put("col" + index, "1");
  265. } else {
  266. isUpdateMap.put("col" + index, "0");
  267. }
  268. colSourceMap.put("col" + index, colRelationshipObject.getString("colSource"));
  269. colDestinationMap.put("col" + index, colRelationshipObject.getString("colDestination"));
  270. index++;
  271. }
  272. }
  273. // 拼接主键排序条件
  274. String sourceOrderBy = null;
  275. String destinationOrderBy = null;
  276. for (int i = 1; i < indexPk; i++) {
  277. sourceOrderBy = colSourceMap.get("colPrimaryKey" + i) + " ASC,";
  278. destinationOrderBy = colDestinationMap.get("colPrimaryKey" + i) + " ASC,";
  279. }
  280. if (sourceOrderBy != null) {
  281. sourceOrderBy = " ORDER BY " + sourceOrderBy.substring(0, sourceOrderBy.length() - 1);
  282. destinationOrderBy = " ORDER BY " + destinationOrderBy.substring(0, destinationOrderBy.length() - 1);
  283. }
  284. // 从第1页开始
  285. int page = 0;
  286. String sourceSql_tmp, destinationSql;
  287. while (true) {
  288. // 当前时间
  289. date = new Date();
  290. datetime = dateFormat.format(date);
  291. // 获取对应id的字段值,方便后续使用
  292. int tkActivation = smartDataTaskMapper.selectColSwappedPrimaryKeys(tkId);
  293. if (tkActivation == 0) {
  294. this.saveDebugMsg(tkId, tkTaskName, datetime, "当前任务停止,需要手动启动!");
  295. break;
  296. } else if (tkActivation == 2) {
  297. this.saveDebugMsg(tkId, tkTaskName, datetime, "当前任务已暂停,需要手动恢复,手动恢复10秒后继续!");
  298. try {
  299. Thread.sleep(10000);
  300. } catch (InterruptedException e) {
  301. this.saveDebugMsg(tkId, tkTaskName, datetime, "线程休眠异常!" + e.getMessage());
  302. }
  303. continue;
  304. }
  305. // 运行参数配置:0自动,1手动
  306. if (optCfgAutoManual == 0) {
  307. sourceSql_tmp = sourceSql + sourceOrderBy + " LIMIT " + (page * optCfgDefaultRsNum) + "," + optCfgDefaultRsNum;
  308. destinationSql = "SELECT * FROM " + destinationTable + destinationOrderBy + " LIMIT "
  309. + (page * optCfgDefaultRsNum) + "," + optCfgDefaultRsNum;
  310. } else {
  311. sourceSql_tmp = sourceSql + destinationOrderBy + " LIMIT " + (page * optCfgRsNum) + "," + optCfgRsNum;
  312. destinationSql = "SELECT * FROM " + destinationTable + destinationOrderBy
  313. + " LIMIT " + (page * optCfgRsNum) + "," + optCfgRsNum;
  314. }
  315. // 调试输出sql
  316. if (debugSqlFlag == 1) {
  317. this.saveDebugMsg(tkId, tkTaskName, datetime, sourceSql_tmp);
  318. this.saveDebugMsg(tkId, tkTaskName, datetime, destinationSql);
  319. }
  320. // 执行源sql拼接后的语句
  321. Map<String, Object> sourceQuery = sourceDbUtil.query(sourceConn, sourceSql_tmp);
  322. if ("1".equals(sourceQuery.get("code"))) {
  323. if (rsIncorrectData == 0) {
  324. // 记录错误信息
  325. this.saveDebugMsg(tkId, tkTaskName, datetime, sourceQuery.get("msg").toString());
  326. }
  327. break;
  328. }
  329. Map<String, Object> sourceMapPage = (Map<String, Object>) sourceQuery.get("msg");
  330. ResultSet sourceRs = (ResultSet) sourceMapPage.get("rs");
  331. // PreparedStatement sourceStmt = (PreparedStatement) sourceMapPage.get("stmt");
  332. // 执行目标sql拼接后的语句
  333. Map<String, Object> destinationQuery = destinationDbUtil.query(destinationConn, destinationSql);
  334. if ("1".equals(destinationQuery.get("code"))) {
  335. if (rsIncorrectData == 0) {
  336. // 记录错误信息
  337. this.saveDebugMsg(tkId, tkTaskName, datetime, destinationQuery.get("msg").toString());
  338. }
  339. break;
  340. }
  341. Map<String, Object> destinationMapPage = (Map<String, Object>) destinationQuery.get("msg");
  342. ResultSet destinationRs = (ResultSet) destinationMapPage.get("rs");
  343. // PreparedStatement destinationStmt = (PreparedStatement) destinationMapPage.get("stmt");
  344. // 2.从目标库中获取【默认记录数】或【指定记录数】生成哈希码
  345. int sourceHashCode = 0, destinationHashCode = 0;
  346. boolean sourceHashCodeFlag = false;
  347. StringBuilder sourceBuilder = new StringBuilder();
  348. StringBuilder destinationBuilder = new StringBuilder();
  349. try {
  350. while (sourceRs.next()) {
  351. sourceHashCodeFlag = true;
  352. // 获取源数据
  353. for (int i = 1; i < indexPk; i++) {
  354. sourceBuilder.append(sourceRs.getString(colSourceMap.get("colPrimaryKey" + i)));
  355. }
  356. for (int i = 1; i < index; i++) {
  357. sourceBuilder.append(sourceRs.getString(colSourceMap.get("col" + i)));
  358. }
  359. totalRead++;
  360. }
  361. // 将源数据拼接生成哈希码
  362. sourceHashCode = sourceBuilder.toString().hashCode();
  363. while (destinationRs.next()) {
  364. // 获取目标数据
  365. for (int i = 1; i < indexPk; i++) {
  366. destinationBuilder.append(destinationRs.getString(colDestinationMap.get("colPrimaryKey" + i)));
  367. }
  368. for (int i = 1; i < index; i++) {
  369. destinationBuilder.append(destinationRs.getString(colDestinationMap.get("col" + i)));
  370. }
  371. }
  372. // 将目标数据拼接生成哈希码
  373. destinationHashCode = destinationBuilder.toString().hashCode();
  374. } catch (SQLException e) {
  375. this.saveDebugMsg(tkId, tkTaskName, datetime, e.getMessage());
  376. }
  377. // 3.比较两个哈希码,判断是否需要更新
  378. if (sourceHashCode != destinationHashCode) {
  379. // 如果源数据和目标数据存在相同的主键值,则需要更新操作
  380. // 如果源数据和目标数据不存在相同的主键值,则需要将来源库的数据插入到目标库中
  381. Map<String, Object> returnMap = executeMysqlInsertOrUpdateTask(sourceDbUtil, destinationDbUtil, sourceConn,
  382. destinationConn, sourceSql_tmp, rsIncorrectData, tkId, tkTaskName, datetime,
  383. indexPk, index, colSourceMap, colDestinationMap, isUpdateMap, destinationTable);
  384. if (returnMap.get("code").equals("0")) {
  385. page++;
  386. Map<String, String> returnM = (Map<String, String>) returnMap.get("msg");
  387. totalInsert += Integer.parseInt(returnM.get("totalInsert"));
  388. totalUpdate += Integer.parseInt(returnM.get("totalUpdate"));
  389. totalError += Integer.parseInt(returnM.get("totalError"));
  390. if (returnM.get("exception") != null) {
  391. err.append(returnM.get("exception")).append("\n");
  392. }
  393. continue;
  394. } else {
  395. excetion.append(returnMap.get("msg")).append("\n");
  396. break;
  397. }
  398. }
  399. // 都没有记录了,则结束任务
  400. if (!sourceHashCodeFlag) {
  401. break;
  402. }
  403. page++;
  404. }
  405. StringBuilder errOrException = new StringBuilder();
  406. if (err.length() > 0) {
  407. errOrException.append("错误:\n").append(err);
  408. }
  409. if (excetion.length() > 0) {
  410. errOrException.append("\n异常:\n").append(excetion);
  411. }
  412. if (errOrException.length() > 0) {
  413. smartDataTaskLog.setTkLogErrException(String.valueOf(errOrException));
  414. // 设置执行状态,执行失败
  415. smartDataTaskLog.setTkLogExeStatus("执行失败");
  416. } else {
  417. // 设置执行状态,执行成功
  418. smartDataTaskLog.setTkLogExeStatus("执行成功");
  419. smartDataTaskLog.setTkLogErrException("无异常和错误");
  420. }
  421. Date endDate = new Date();
  422. String endDatetime = dateFormat.format(endDate);
  423. // 结束时间
  424. smartDataTaskLog.setTkLogEndTime(endDatetime);
  425. // 获取代码结束时间
  426. long endTimeMillis = System.currentTimeMillis();
  427. // 计算执行时间
  428. long elapsedTimeMillis = endTimeMillis - startTimeMillis;
  429. // 耗时
  430. smartDataTaskLog.setTkLogCostTime(String.valueOf(elapsedTimeMillis));
  431. // 读取数据量
  432. smartDataTaskLog.setTkLogReadRows(totalRead);
  433. // 增加数据量
  434. smartDataTaskLog.setTkLogInsertRows(totalInsert);
  435. // 更新数据量
  436. smartDataTaskLog.setTkLogUpdateRows(totalUpdate);
  437. // 错误数据量
  438. smartDataTaskLog.setTkLogErrRows(totalError);
  439. // 记录任务开始了,成功后会返回id,更新对应的id
  440. int count = smartDataTaskMapper.updateTaskLog(smartDataTaskLog);
  441. // 关闭数据库连接
  442. if (destinationConn != null) {
  443. destinationDbUtil.closeConnection(destinationConn);
  444. }
  445. if (sourceConn != null) {
  446. sourceDbUtil.closeConnection(sourceConn);
  447. }
  448. }
  449. // ========== 结束执行本次任务 ==============================
  450. // 调试输出sql
  451. String tmp1 = "数据交换完成,共交换数据 " + (totalInsert + totalUpdate) + " 条:插入:" + totalInsert + " 条,更新:" + totalUpdate + " 条";
  452. this.saveDebugMsg(tkId, tkTaskName, datetime, tmp1);
  453. }
  454. // 执行MySQL的插入或更新任务
  455. private Map<String, Object> executeMysqlInsertOrUpdateTask(DBUtil sourceDbUtil, DBUtil destinationDbUtil, Connection sourceConn,
  456. Connection destinationConn, String sourceSql_tmp,
  457. int rsIncorrectData, int tkId, String tkTaskName, String datetime,
  458. int indexPk, int index, Map<String, String> colSourceMap,
  459. Map<String, String> colDestinationMap, Map<String, String> isUpdateMap,
  460. String destinationTable) {
  461. // 执行源sql
  462. Map<String, Object> sourceQuery = sourceDbUtil.query(sourceConn, sourceSql_tmp);
  463. if ("1".equals(sourceQuery.get("code"))) {
  464. if (rsIncorrectData == 0) {
  465. // 记录错误信息
  466. this.saveDebugMsg(tkId, tkTaskName, datetime, sourceQuery.get("msg").toString());
  467. }
  468. return CommonUtil.getReturnMap("1", "执行源sql失败:" + sourceSql_tmp);
  469. }
  470. Map<String, Object> sourceMap = (Map<String, Object>) sourceQuery.get("msg");
  471. ResultSet sourceRs = (ResultSet) sourceMap.get("rs");
  472. PreparedStatement sourceStmt = (PreparedStatement) sourceMap.get("stmt");
  473. int totalInsert = 0, totalUpdate = 0, totalError = 0;
  474. // 返回参数map
  475. Map<String, String> returnMap = new HashMap<>();
  476. try {
  477. // 执行解析操作
  478. while (sourceRs.next()) {
  479. // 拼接插入的值
  480. StringBuilder stringInsertData = new StringBuilder();
  481. // 拼接插入的列
  482. StringBuilder stringInsertCols = new StringBuilder();
  483. // 拼接更新的列和值
  484. StringBuilder stringUpdateData = new StringBuilder();
  485. // 拼接目标表的pk
  486. StringBuilder stringDestinationPk = new StringBuilder();
  487. String tempStr;
  488. for (int i = 1; i < indexPk; i++) {
  489. tempStr = sourceRs.getString(colSourceMap.get("colPrimaryKey" + i));
  490. // 目标表的主键where子句
  491. stringDestinationPk.append(colDestinationMap.get("colPrimaryKey" + i)).append("='").append(tempStr).append("' AND ");
  492. // 拼接插入的列
  493. stringInsertCols.append(colDestinationMap.get("colPrimaryKey" + i)).append(",");
  494. // 拼接插入的值
  495. stringInsertData.append("'").append(sourceRs.getString(colSourceMap.get("colPrimaryKey" + i))).append("',");
  496. }
  497. // 去除最后的 AND
  498. stringDestinationPk.setLength(stringDestinationPk.length() - 5);
  499. // 拼接更新的值
  500. for (int i = 1; i < index; i++) {
  501. // 拼接插入的列
  502. stringInsertCols.append(colDestinationMap.get("col" + i)).append(",");
  503. // 拼接插入的值
  504. stringInsertData.append("'").append(sourceRs.getString(colSourceMap.get("col" + i))).append("',");
  505. if (isUpdateMap.get("col" + i) != null && isUpdateMap.get("col" + i).equals("1")) {
  506. // 拼接更新的列和值
  507. stringUpdateData.append(colDestinationMap.get("col" + i)).append("='").append(sourceRs.getString(colSourceMap.get("col" + i))).append("',");
  508. }
  509. }
  510. // 删除最后一个逗号
  511. stringInsertCols.setLength(stringInsertCols.length() - 1);
  512. stringInsertData.setLength(stringInsertData.length() - 1);
  513. // 说明有需要更新的列
  514. if (stringUpdateData.length() > 0) {
  515. stringUpdateData.setLength(stringUpdateData.length() - 1);
  516. this.saveDebugMsg(tkId, tkTaskName, datetime, stringUpdateData.toString());
  517. }
  518. // 查询是否存在该主键的记录sql
  519. String destinationQuerySql = "SELECT * FROM " + destinationTable + " WHERE " + stringDestinationPk;
  520. // 调试输出sql
  521. if (debugSqlFlag == 1) {
  522. this.saveDebugMsg(tkId, tkTaskName, datetime, destinationQuerySql);
  523. }
  524. // 查询目标表是否存在该主键的记录
  525. Map<String, Object> destinationQuery = destinationDbUtil.query(destinationConn, destinationQuerySql);
  526. // 判断查询结果,如果发生错误返回code:1
  527. if (destinationQuery.get("code").equals("1")) {
  528. if (rsIncorrectData == 0) {
  529. // 记录错误信息
  530. this.saveDebugMsg(tkId, tkTaskName, datetime, destinationQuery.get("msg").toString());
  531. }
  532. return CommonUtil.getReturnMap("1", "目标表查询错误:" + destinationQuery.get("msg").toString());
  533. }
  534. // 获取查询结果集map
  535. Map<String, Object> sourceQueryMap = (Map<String, Object>) destinationQuery.get("msg");
  536. // 获取查询结果集
  537. ResultSet destinationRs = (ResultSet) sourceQueryMap.get("rs");
  538. // 获取查询sql的PreparedStatement
  539. PreparedStatement destinationQueryStmt = (PreparedStatement) sourceQueryMap.get("stmt");
  540. // ========== 之前是批量计算哈希码是否相等,现在是计算每一条的哈希码是否相等 ====================================================================
  541. int sourceHashCode = 0, destinationHashCode = 0;
  542. StringBuilder sourceBuilder = new StringBuilder();
  543. StringBuilder destinationBuilder = new StringBuilder();
  544. // 获取源数据
  545. for (int i = 1; i < indexPk; i++) {
  546. sourceBuilder.append(sourceRs.getString(colSourceMap.get("colPrimaryKey" + i)));
  547. }
  548. for (int i = 1; i < index; i++) {
  549. sourceBuilder.append(sourceRs.getString(colSourceMap.get("col" + i)));
  550. }
  551. // 将源数据拼接生成哈希码
  552. sourceHashCode = sourceBuilder.toString().hashCode();
  553. // 判断目标表是否存在该主键的记录
  554. if (destinationRs.next()) {
  555. if (stringUpdateData.length() == 0) {
  556. continue;
  557. }
  558. // 获取目标数据
  559. for (int i = 1; i < indexPk; i++) {
  560. destinationBuilder.append(destinationRs.getString(colDestinationMap.get("colPrimaryKey" + i)));
  561. }
  562. for (int i = 1; i < index; i++) {
  563. destinationBuilder.append(destinationRs.getString(colDestinationMap.get("col" + i)));
  564. }
  565. // 将目标数据拼接生成哈希码
  566. destinationHashCode = destinationBuilder.toString().hashCode();
  567. // 判断哈希码是否相等,相等则跳过,不相等则更新目标表
  568. if (sourceHashCode == destinationHashCode) {
  569. continue;
  570. }
  571. // 更新语句
  572. String destinationUpdateSql = "UPDATE " + destinationTable + " SET " + stringUpdateData + " WHERE " + stringDestinationPk;
  573. // 调试输出sql
  574. if (debugSqlFlag == 1) {
  575. this.saveDebugMsg(tkId, tkTaskName, datetime, destinationUpdateSql);
  576. }
  577. // 更新目标表
  578. Map<String, Object> destinationUpdate = destinationDbUtil.update(destinationConn, destinationUpdateSql);
  579. // 判断更新是否成功
  580. if (destinationUpdate.get("code").equals("1")) {
  581. if (rsIncorrectData == 0) {
  582. // 记录错误信息
  583. this.saveDebugMsg(tkId, tkTaskName, datetime, destinationUpdate.get("msg").toString());
  584. }
  585. totalError++;
  586. returnMap.put("totalInsert", String.valueOf(totalInsert));
  587. returnMap.put("totalUpdate", String.valueOf(totalUpdate));
  588. returnMap.put("totalError", String.valueOf(totalError));
  589. returnMap.put("exception", destinationUpdate.get("msg").toString());
  590. return CommonUtil.getReturnMap("0", returnMap);
  591. }
  592. totalUpdate++;
  593. // 获取返回结果
  594. Map<String, Object> destinationQueryMap = (Map<String, Object>) destinationUpdate.get("msg");
  595. // 获取更新目标表的PreparedStatement
  596. PreparedStatement destinationUpdateStmt = (PreparedStatement) destinationQueryMap.get("stmt");
  597. // 关闭PreparedStatement
  598. destinationDbUtil.closeStatement(destinationUpdateStmt);
  599. } else {
  600. // 插入操作sql
  601. String destinationInsertSql = "INSERT INTO " + destinationTable + " (" + stringInsertCols + ") VALUES (" + stringInsertData + ")";
  602. // 调试输出sql
  603. if (debugSqlFlag == 1) {
  604. this.saveDebugMsg(tkId, tkTaskName, datetime, destinationInsertSql);
  605. }
  606. // 执行插入操作
  607. Map<String, Object> destinationInsert = destinationDbUtil.update(destinationConn, destinationInsertSql);
  608. // 判断插入操作是否成功
  609. if (destinationInsert.get("code").equals("1")) {
  610. if (rsIncorrectData == 0) {
  611. // 记录错误信息
  612. this.saveDebugMsg(tkId, tkTaskName, datetime, destinationInsert.get("msg").toString());
  613. }
  614. totalError++;
  615. returnMap.put("totalInsert", String.valueOf(totalInsert));
  616. returnMap.put("totalUpdate", String.valueOf(totalUpdate));
  617. returnMap.put("totalError", String.valueOf(totalError));
  618. returnMap.put("exception", destinationInsert.get("msg").toString());
  619. return CommonUtil.getReturnMap("0", returnMap);
  620. }
  621. totalInsert++;
  622. Map<String, Object> destinationQueryMap = (Map<String, Object>) destinationInsert.get("msg");
  623. // 获取插入操作的PreparedStatement
  624. PreparedStatement destinationInsertStmt = (PreparedStatement) destinationQueryMap.get("stmt");
  625. // 关闭对象 PreparedStatement
  626. destinationDbUtil.closeStatement(destinationInsertStmt);
  627. }
  628. // 关闭记录集ResultSet、PreparedStatement、数据库连接Connection
  629. destinationDbUtil.closeResultSet(destinationRs);
  630. destinationDbUtil.closeStatement(destinationQueryStmt);
  631. }
  632. returnMap.put("totalInsert", String.valueOf(totalInsert));
  633. returnMap.put("totalUpdate", String.valueOf(totalUpdate));
  634. returnMap.put("totalError", String.valueOf(totalError));
  635. return CommonUtil.getReturnMap("0", returnMap);
  636. } catch (SQLException e) {
  637. if (rsIncorrectData == 0) {
  638. // 记录错误信息
  639. this.saveDebugMsg(tkId, tkTaskName, datetime, e.getMessage());
  640. }
  641. return CommonUtil.getReturnMap("1", "异常:" + Arrays.toString(e.getStackTrace()));
  642. } finally {
  643. sourceDbUtil.closeResultSet(sourceRs);
  644. sourceDbUtil.closeStatement(sourceStmt);
  645. }
  646. }
  647. // 保存debug信息
  648. private void saveDebugMsg(Integer eTaskId, String tkTaskName, String datetime, String errorMsg) {
  649. // 实现错误信息保存的逻辑
  650. SmartDataTaskDebug smartDataTaskDebug = new SmartDataTaskDebug();
  651. smartDataTaskDebug.setETaskId(eTaskId);
  652. smartDataTaskDebug.setETaskName(tkTaskName);
  653. smartDataTaskDebug.setEMsg(errorMsg);
  654. smartDataTaskDebug.setEDateTime(datetime);
  655. SmartDataTaskDebug returnSmartDataTaskErr = smartDataTaskMapper.selectErrorMsg(smartDataTaskDebug);
  656. if (returnSmartDataTaskErr == null) {
  657. // 保存错误信息到数据库
  658. int i = smartDataTaskMapper.insertErrorMsg(smartDataTaskDebug);
  659. if (i == 0) {
  660. logger.info(datetime + ":保存错误信息失败!");
  661. }
  662. } else {
  663. smartDataTaskDebug.setEId(returnSmartDataTaskErr.getEId());
  664. int i = smartDataTaskMapper.updateErrorMsg(smartDataTaskDebug);
  665. if (i == 0) {
  666. logger.info(datetime + ":更新错误信息失败!");
  667. }
  668. }
  669. }
  670. /**
  671. * 保存下次执行时间
  672. *
  673. * @param tkId 任务id
  674. * @param datetime 时间
  675. * @param tkTaskName 任务名称
  676. * @param rsIncorrectData 是否记录错误数据:0是,1否
  677. */
  678. private void saveNextExeTime(Integer tkId, String datetime, String tkTaskName, int rsIncorrectData) {
  679. QueryWrapper<SmartDataTask> queryWrapper = new QueryWrapper<>();
  680. queryWrapper.eq(tkTaskName != null, "tk_name", tkTaskName);
  681. SmartDataTask smartDataTask = smartDataTaskMapper.selectOne(queryWrapper);
  682. if (smartDataTask != null) {
  683. // 下次执行的时间
  684. String nextExeTime = QuartzJobUtils.getNextExeTime(smartDataTask.getTkCron());
  685. smartDataTask.setTkNextExeTime(nextExeTime);
  686. try {
  687. smartDataTaskMapper.markTaskById(smartDataTask);
  688. } catch (Exception e) {
  689. if (rsIncorrectData == 0) {
  690. this.saveDebugMsg(tkId, tkTaskName, datetime, e.getMessage());
  691. }
  692. }
  693. } else {
  694. if (rsIncorrectData == 0) {
  695. this.saveDebugMsg(tkId, tkTaskName, datetime, "【下次执行的时间】无法更新至数据库中!");
  696. }
  697. }
  698. }
  699. }