From abae324f3ec878c6be969cf0d2dacc96a1eaf69f Mon Sep 17 00:00:00 2001 From: huangqizhen <15552608129@163.com> Date: Tue, 4 Nov 2025 09:54:55 +0800 Subject: [PATCH] =?UTF-8?q?10.25=20=E4=BF=AE=E6=94=B9=E8=BF=9E=E6=8E=A5?= =?UTF-8?q?=E6=B1=A0=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/example/demo/Mysql/MysqlController.java | 5 + .../java/com/example/demo/Mysql/MysqlService.java | 1 + .../com/example/demo/Mysql/MysqlServiceImpl.java | 109 +++++++++++++++++++++ 3 files changed, 115 insertions(+) diff --git a/src/main/java/com/example/demo/Mysql/MysqlController.java b/src/main/java/com/example/demo/Mysql/MysqlController.java index 65352ed..9b8442a 100644 --- a/src/main/java/com/example/demo/Mysql/MysqlController.java +++ b/src/main/java/com/example/demo/Mysql/MysqlController.java @@ -29,4 +29,9 @@ public class MysqlController { mysqlService.getSqlserverData(); return Result.success(); } + @RequestMapping("/Day") + public Result MysqlDay () throws Exception { + mysqlService.getSqlserverDataDay(); + return Result.success(); + } } diff --git a/src/main/java/com/example/demo/Mysql/MysqlService.java b/src/main/java/com/example/demo/Mysql/MysqlService.java index ae5a715..45fe09e 100644 --- a/src/main/java/com/example/demo/Mysql/MysqlService.java +++ b/src/main/java/com/example/demo/Mysql/MysqlService.java @@ -8,4 +8,5 @@ package com.example.demo.Mysql;/** **/public interface MysqlService { void getSqlserverData() throws Exception; + void getSqlserverDataDay() throws Exception; } diff --git a/src/main/java/com/example/demo/Mysql/MysqlServiceImpl.java b/src/main/java/com/example/demo/Mysql/MysqlServiceImpl.java index 186ccc7..bef5b69 100644 --- a/src/main/java/com/example/demo/Mysql/MysqlServiceImpl.java +++ b/src/main/java/com/example/demo/Mysql/MysqlServiceImpl.java @@ -21,6 +21,7 @@ import org.springframework.web.client.RestTemplate; import javax.sql.DataSource; import java.math.BigDecimal; import java.sql.*; +import java.time.LocalDate; import java.time.LocalDateTime; import java.time.Month; import java.time.format.DateTimeFormatter; @@ -169,6 +170,114 @@ public class MysqlServiceImpl implements MysqlService { } } + + @Override + @Transactional(transactionManager = "mysqlTransactionManager") // 👈 保证插入和用户更新在一个事务 + public void getSqlserverDataDay() throws Exception { + logger.info("开始从 SQL Server 同步数据到 MySQL"); + + try (Connection sqlServerConn = sqlserver1DataSource.getConnection(); + Connection mysqlConn = mysql1DataSource.getConnection()) { + + logger.info("开始查询数据..."); + + int pageSize = 100; + int offset = 0; + boolean hasMoreData = true; + + // 👇 恢复动态时间查询(原注释掉的硬编码时间已移除) + String querySql = """ + SELECT + id, gtype, jwcode, free, core_jb, buy_jb, cz_time, cz_user, cz_bz, operation_platform, goods_name + FROM + hwhcGold.dbo.user_gold_records + WHERE cz_time >= ? + ORDER BY + cz_time ASC + OFFSET ? ROWS FETCH NEXT ? ROWS ONLY; + """; + + String insertSql = """ + INSERT IGNORE INTO user_gold_record + (order_code, jwcode, sum_gold, permanent_gold, free_june, free_december, + task_gold, pay_platform, goods_name, refund_type, refund_model, remark, type, admin_id, + audit_status, create_time, flag, update_time, audit_time, is_refund, uid) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """; + + while (hasMoreData) { + try (PreparedStatement sqlServerStmt = sqlServerConn.prepareStatement(querySql)) { + // 👇 恢复动态时间参数 + sqlServerStmt.setTimestamp(1, Timestamp.valueOf(LocalDate.now().minusDays(1).atStartOfDay())); + sqlServerStmt.setInt(2, offset); + sqlServerStmt.setInt(3, pageSize); + + ResultSet resultSet = sqlServerStmt.executeQuery(); + + if (!resultSet.next()) { + logger.info("无更多数据,offset={}", offset); + break; + } + + // 👇 步骤1:收集本批次所有记录 + List batchRecords = new ArrayList<>(); + List batchUids = new ArrayList<>(); + + do { + RecordData data = extractRecordData(resultSet); // 👈 抽取数据 + batchRecords.add(data); + batchUids.add(data.uid); + } while (resultSet.next()); + + logger.info("本批次共 {} 条记录", batchRecords.size()); + + // 👇 步骤2:批量查询哪些 uid 已存在(性能优化) + Set existingUids = getExistingUids(mysqlConn, batchUids); + logger.info("已存在记录数: {}", existingUids.size()); + + // 👇 步骤3:准备批量插入 + try (PreparedStatement mysqlStmt = mysqlConn.prepareStatement(insertSql)) { + + for (RecordData data : batchRecords) { + if (validFourTypes.contains(data.gtype)) { + logger.debug("跳过 validFourTypes 类型记录,gtype={}, uid={}", data.gtype, data.uid); + continue; + } + if ("4".equals(data.operation_platform)) { + logger.debug("跳过 operation_platform=4 的记录,uid={}", data.uid); + continue; + } + // 👇 跳过已存在的记录(避免重复更新用户余额) + if (existingUids.contains(data.uid)) { + logger.debug("跳过重复记录,uid={}", data.uid); + continue; + } + + // 👇 设置插入参数 + setStatementParams(mysqlStmt, data); + + mysqlStmt.addBatch(); + + // 👇 只有新记录才更新用户余额(关键修复!) + updateUserBalance(mysqlConn, data); + } + + // 👇 执行批量插入 + int[] results = mysqlStmt.executeBatch(); + logger.info("成功插入新记录 {} 条", results.length); + } + + offset += pageSize; + } + } + + logger.info("✅ 数据同步完成"); + } catch (SQLException e) { + logger.error("数据库操作失败", e); + throw new RuntimeException("同步失败", e); + } + } + static class RecordData { int gtype, jwcode, free, core_jb, buy_jb; Timestamp cz_time;