diff --git a/src/main/java/com/example/demo/DemoApplication.java b/src/main/java/com/example/demo/DemoApplication.java index 9ab3517..e7293f0 100644 --- a/src/main/java/com/example/demo/DemoApplication.java +++ b/src/main/java/com/example/demo/DemoApplication.java @@ -12,6 +12,7 @@ import org.springframework.scheduling.annotation.EnableScheduling; @MapperScan(basePackages = "com.example.demo.mapper.pay", sqlSessionTemplateRef = "mysql3SqlSessionTemplate") @MapperScan(basePackages = "com.example.demo.mapper.cms", sqlSessionTemplateRef = "mysql4SqlSessionTemplate") @MapperScan(basePackages = "com.example.demo.mapper.live", sqlSessionTemplateRef = "mysql5SqlSessionTemplate") +@MapperScan(basePackages = "com.example.demo.mapper.sqlserver", sqlSessionTemplateRef = "sqlserver1SqlSessionTemplate") public class DemoApplication { public static void main(String[] args) { diff --git a/src/main/java/com/example/demo/Mysql/MysqlController.java b/src/main/java/com/example/demo/Mysql/MysqlController.java index 65352ed..9ded12a 100644 --- a/src/main/java/com/example/demo/Mysql/MysqlController.java +++ b/src/main/java/com/example/demo/Mysql/MysqlController.java @@ -26,7 +26,7 @@ public class MysqlController { MysqlService mysqlService; @RequestMapping public Result Mysql () throws Exception { - mysqlService.getSqlserverData(); + mysqlService.getSqlserverData2(); return Result.success(); } } diff --git a/src/main/java/com/example/demo/Mysql/MysqlService.java b/src/main/java/com/example/demo/Mysql/MysqlService.java index ae5a715..2655e60 100644 --- a/src/main/java/com/example/demo/Mysql/MysqlService.java +++ b/src/main/java/com/example/demo/Mysql/MysqlService.java @@ -8,4 +8,5 @@ package com.example.demo.Mysql;/** **/public interface MysqlService { void getSqlserverData() throws Exception; + void getSqlserverData2() throws Exception; } diff --git a/src/main/java/com/example/demo/Mysql/MysqlServiceImpl.java b/src/main/java/com/example/demo/Mysql/MysqlServiceImpl.java index 2d69e73..f1bbd65 100644 --- a/src/main/java/com/example/demo/Mysql/MysqlServiceImpl.java +++ b/src/main/java/com/example/demo/Mysql/MysqlServiceImpl.java @@ -4,6 +4,7 @@ package com.example.demo.Mysql; import com.example.demo.Util.BaseDES; import com.example.demo.domain.entity.User; import com.example.demo.service.coin.AdminService; +import com.example.demo.service.coin.MarketService; import com.example.demo.service.coin.UserService; import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; @@ -27,7 +28,7 @@ import java.util.*; @Service -@Transactional + public class MysqlServiceImpl implements MysqlService { @Autowired private RestTemplate restTemplate; @@ -43,6 +44,8 @@ public class MysqlServiceImpl implements MysqlService { private AdminService adminService; @Autowired private UserService userService; + @Autowired + private MarketService marketService; @Autowired @@ -59,10 +62,10 @@ public class MysqlServiceImpl implements MysqlService { @Override - @Scheduled(cron = "0 0 * * * ?") // 每小时执行一次 +// @Scheduled(cron = "0 0 * * * ?") // 每小时执行一次 public void getSqlserverData() throws Exception { logger.info("开始从 SQL Server 同步数据到 MySQL"); - try (Connection sqlServerConn = sqlserver1DataSource.getConnection(); + try (Connection sqlServerConn = mysql1DataSource.getConnection(); Connection mysqlConn = mysql1DataSource.getConnection()) { logger.info("开始查询数据..."); // 从 SQL Server 查询数据 @@ -177,7 +180,7 @@ public class MysqlServiceImpl implements MysqlService { if(ObjectUtils.isEmpty(user)){ logger.info("用户不存在"); user = new User(); - String country = "未知"; + List country = Collections.singletonList("未知"); BaseDES des = new BaseDES(); String desjwcode= des.encrypt(String.valueOf(jwcode)); // System.out.println("desjwcode:"+desjwcode); @@ -208,7 +211,7 @@ public class MysqlServiceImpl implements MysqlService { if (data != null) { // 提取name和country name = (String) data.get("name"); - country = (String) data.get("country"); + country = (List) data.get("country"); // 打印获取到的数据 System.out.println("Name: " + name); @@ -225,12 +228,13 @@ public class MysqlServiceImpl implements MysqlService { } catch (Exception e) { System.out.println("Error: " + e.getMessage()); // 设置默认的 country 值 - country = "未知"; + country = Collections.singletonList("未知"); } + String market= String.join(",",marketService.getMarketIds(country)); user.setJwcode(jwcode); user.setName( name); - user.setMarket(country); + user.setMarket(market); logger.info("新添用户"); userService.addUser(user); logger.info("用户添加成功"); @@ -275,4 +279,311 @@ public class MysqlServiceImpl implements MysqlService { throw new RuntimeException("数据链接失败", e); } } + + @Override + public void getSqlserverData2() throws Exception { + logger.info("开始从 MySQL 同步数据到 MySQL(分页处理)"); + + Timestamp startTime = Timestamp.valueOf(LocalDateTime.now().minusHours(1)); + int pageSize = 100; + int pageNumber = 0; + int totalProcessed = 0; + + try ( + + Connection sourceConn = mysql1DataSource.getConnection(); + Connection targetConn = mysql1DataSource.getConnection() + ) { + // 关闭自动提交,手动控制事务 + targetConn.setAutoCommit(false); + + // 分页查询 SQL(MySQL 使用 LIMIT offset, size) + String querySql = """ + SELECT gtype, jwcode, free, core_jb, buy_jb, cz_time, cz_user, cz_bz, operation_platform, goods_name,flag + FROM user_gold_records + where cz_time >'2025-08-10 07:11:04' + ORDER BY cz_time ASC + LIMIT ? OFFSET ? + """; + + String insertSql = """ + INSERT INTO user_gold_record + (order_code, jwcode, sum_gold, permanent_gold, free_june, free_december, + task_gold, pay_platform, goods_name, refund_type, refund_model, remark, type, admin_id, + audit_status, create_time, flag, update_time) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """; + + try ( + PreparedStatement queryStmt = sourceConn.prepareStatement(querySql); + PreparedStatement insertStmt = targetConn.prepareStatement(insertSql) + ) { + + while (true) { + int offset = pageNumber * pageSize; + + queryStmt.setInt(1, pageSize); + queryStmt.setInt(2, offset); + + List batchData = new ArrayList<>(); + try (ResultSet rs = queryStmt.executeQuery()) { + boolean hasData = false; + while (rs.next()) { + hasData = true; + + RecordData data = new RecordData(); + data.gtype = rs.getInt("gtype"); + data.jwcode = rs.getInt("jwcode"); + data.free = rs.getInt("free"); + data.core_jb = rs.getInt("core_jb"); + data.buy_jb = rs.getInt("buy_jb"); + data.cz_time = rs.getTimestamp("cz_time"); + data.cz_user = rs.getString("cz_user"); + data.cz_bz = rs.getString("cz_bz"); + data.operation_platform = rs.getString("operation_platform"); + data.goods_name = rs.getString("goods_name"); + + batchData.add(data); + } + + if (!hasData || batchData.isEmpty()) { + logger.info("无更多数据可同步,共处理 {} 条", totalProcessed); + break; + } + } + + // 清空上一批 + insertStmt.clearBatch(); + + // 处理当前批次(100条以内) + for (RecordData data : batchData) { + buildInsertStatement(insertStmt, data); + insertStmt.addBatch(); + } + + // 执行批量插入 + insertStmt.executeBatch(); + targetConn.commit(); + totalProcessed += batchData.size(); + logger.info("第 {} 页,插入 {} 条记录,累计 {}", pageNumber, batchData.size(), totalProcessed); + + // 更新用户信息(可优化为批量) + for (RecordData data : batchData) { + updateUserGold(data); + } + + pageNumber++; + } + } catch (Exception e) { + targetConn.rollback(); + logger.error("同步批次失败,事务回滚", e); + throw e; + } + + logger.info("✅ 数据同步完成,共处理 {} 条记录", totalProcessed); + + } catch (SQLException e) { + logger.error("数据库连接或操作失败", e); + throw new RuntimeException("MySQL 同步失败", e); + } + } + + // 封装插入逻辑 + private void buildInsertStatement(PreparedStatement stmt, RecordData data) throws SQLException { + String timestampPart = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")); + Random random = new Random(); + int randomNumber = random.nextInt(900) + 100; + + // admin_id + String czUser = data.cz_user; + +// 1. 先判空 + if (czUser != null && StringUtils.isNumeric(czUser)) { + try { + // 2. 调用服务获取 ID + String adminIdStr = adminService.getId(czUser); // 可能返回 null 或字符串 + + // 3. 再次判空并转为 Integer + if (adminIdStr != null && StringUtils.isNumeric(adminIdStr)) { + Integer admin_id = Integer.parseInt(adminIdStr); // 安全转换 + stmt.setInt(14, admin_id); + } else { + // adminService 返回 null 或非数字,视为无效 admin + stmt.setInt(14, 99999); + } + } catch (Exception e) { + // 防御性兜底 + logger.warn("解析 admin_id 失败,cz_user={}", czUser, e); + stmt.setInt(14, 99999); + } + } else { + // cz_user 为 null 或非数字 + stmt.setInt(14, 99999); + } + + stmt.setString(10, null); + stmt.setNull(11, Types.INTEGER); + + if (validFourTypes.contains(data.gtype)) { + return; // 跳过 + } + + if (validZeroTypes.contains(data.gtype)) { + stmt.setInt(13, 0); + stmt.setString(1, "ERPCZ" + timestampPart + randomNumber); + } else if (validOneTypes.contains(data.gtype)) { + stmt.setInt(13, 1); + stmt.setString(1, "ERPXF" + timestampPart + randomNumber); + } else if (validTwoTypes.contains(data.gtype)) { + stmt.setInt(13, 2); + stmt.setString(1, "ERPTK" + timestampPart + randomNumber); + stmt.setString(10, "退款商品"); + stmt.setInt(11, 0); + } else if (validThreeTypes.contains(data.gtype)) { + stmt.setInt(13, 3); + stmt.setString(1, "ERPQT" + timestampPart + randomNumber); + } + + stmt.setInt(2, data.jwcode); + stmt.setInt(3, data.free + data.core_jb + data.buy_jb); + stmt.setInt(4, data.buy_jb); + + if (currentMonth.getValue() >= 7) { + stmt.setInt(5, data.free); + stmt.setInt(6, 0); + } else { + stmt.setInt(5, 0); + stmt.setInt(6, data.free); + } + + stmt.setInt(7, data.core_jb); + + String platform = data.operation_platform; + if ("1".equals(platform)) { + stmt.setString(8, "ERP"); + } else if ("2".equals(platform)) { + stmt.setString(8, "HomilyLink"); + } else if ("3".equals(platform)) { + stmt.setString(8, "HomilyChart"); + } else if ("4".equals(platform)) { + return; // 跳过这条数据 + } else if ("0".equals(platform)) { + stmt.setString(8, "初始化金币"); + } else { + stmt.setString(8, "其他"); + } + + stmt.setString(9, data.goods_name); + stmt.setString(12, data.cz_bz); + stmt.setInt(15, 3); + stmt.setTimestamp(16, data.cz_time); + stmt.setInt(17, data.flag); + stmt.setTimestamp(18, data.cz_time); + } + + // 更新用户金币逻辑 + private void updateUserGold(RecordData data) throws Exception { + User user = userService.selectAllUser(String.valueOf(data.jwcode)); + BigDecimal freeBD = BigDecimal.valueOf(data.free); + BigDecimal buyJbBD = BigDecimal.valueOf(data.buy_jb); + BigDecimal coreJbBD = BigDecimal.valueOf(data.core_jb); + + if (ObjectUtils.isEmpty(user)) { + logger.info("用户不存在"); + user = new User(); + List country = Collections.singletonList("未知"); + String name = "未知"; + BaseDES des = new BaseDES(); + String desjwcode= des.encrypt(String.valueOf(data.jwcode)); +// System.out.println("desjwcode:"+desjwcode); + + // 创建 JSON 请求体 + Map requestBody = new HashMap<>(); + requestBody.put("jwcode", desjwcode); + + // 设置请求头 + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + + // 创建 HttpEntity + HttpEntity> entity = new HttpEntity<>(requestBody, headers); + + // 发送 POST 请求 + try { + ResponseEntity response = restTemplate.exchange( + "http://hwapi.rzfwq.com/hwjnApp/hwhc-login/hwhclogin/hc/login/clent/info", + HttpMethod.POST, entity, Map.class); + + // 检查响应状态码 + if (response.getStatusCode().is2xxSuccessful()) { + Map responseBody = response.getBody(); + if (responseBody != null) { + // 获取data部分 + Map data1 = (Map) responseBody.get("data"); + if (data1 != null) { + // 提取name和country + name = (String) data1.get("name"); + country = (List) data1.get("country"); + + // 打印获取到的数据 + System.out.println("Name: " + name); + System.out.println("Country: " + country); + } else { + System.out.println("Data is null"); + } + } else { + System.out.println("Response body is null"); + } + } else { + System.out.println("Request failed with status code: " + response.getStatusCode()); + } + } catch (Exception e) { + System.out.println("Error: " + e.getMessage()); + // 设置默认的 country 值 + country = Collections.singletonList("未知"); + } + + String market= String.join(",",marketService.getMarketIds(country)); + user.setJwcode(data.jwcode); + user.setName( name); + user.setMarket(market); + logger.info("新添用户"); + userService.addUser(user); + logger.info("用户添加成功"); + user = userService.selectAllUser(String.valueOf(data.jwcode)); + } + + if (currentMonth.getValue() >= 7) { + user.setCurrentFreeJune(user.getCurrentFreeJune().add(freeBD)); + } else { + user.setCurrentFreeDecember(user.getCurrentFreeDecember().add(freeBD)); + } + user.setCurrentPermanentGold(user.getCurrentPermanentGold().add(buyJbBD)); + user.setCurrentTaskGold(user.getCurrentTaskGold().add(coreJbBD)); + + if (validZeroTypes.contains(data.gtype)) { + user.setRechargeNum(user.getRechargeNum() + 1); + user.setSumPermanentGold(user.getSumPermanentGold().add(buyJbBD)); + user.setSumTaskGold(user.getSumTaskGold().add(coreJbBD)); + if (currentMonth.getValue() >= 7) { + user.setSumFreeJune(user.getSumFreeJune().add(freeBD)); + } else { + user.setSumFreeDecember(user.getSumFreeDecember().add(freeBD)); + } + } else if (validOneTypes.contains(data.gtype)) { + user.setConsumeNum(user.getConsumeNum() + 1); + user.setSumConsumePermanent(user.getSumConsumePermanent().add(buyJbBD)); + user.setSumConsumeTask(user.getSumConsumeTask().add(coreJbBD)); + user.setSumConsumeFree(user.getSumConsumeFree().add(freeBD)); + } + + userService.updateAllGold(user); + } + + // 内部类:封装查询结果 + static class RecordData { + int gtype, jwcode, free, core_jb, buy_jb, flag; + Timestamp cz_time; + String cz_user, cz_bz, operation_platform, goods_name; + } } \ No newline at end of file diff --git a/src/main/java/com/example/demo/Util/CacheRefreshTask.java b/src/main/java/com/example/demo/Util/CacheRefreshTask.java index fd4c6c3..4988a88 100644 --- a/src/main/java/com/example/demo/Util/CacheRefreshTask.java +++ b/src/main/java/com/example/demo/Util/CacheRefreshTask.java @@ -25,7 +25,7 @@ public class CacheRefreshTask { } // 每小时执行一次(15分0秒) - @Scheduled(cron = "0 15 * * * ?") +// @Scheduled(cron = "0 15 * * * ?") public void refreshCache() { List markets = generalService.getMarket(); try { diff --git a/src/main/java/com/example/demo/config/SqlServer1DataSourceConfig.java b/src/main/java/com/example/demo/config/SqlServer1DataSourceConfig.java index 0ee5b15..c54fbe7 100644 --- a/src/main/java/com/example/demo/config/SqlServer1DataSourceConfig.java +++ b/src/main/java/com/example/demo/config/SqlServer1DataSourceConfig.java @@ -38,7 +38,7 @@ public class SqlServer1DataSourceConfig { log.info("Initializing SQL Server SqlSessionFactory..."); SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean(); sessionFactory.setDataSource(dataSource); - sessionFactory.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:mapper/sqlServiceMapper/*.xml")); + sessionFactory.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:sqlServiceMapper/*.xml")); sessionFactory.setConfiguration(globalConfiguration); return sessionFactory.getObject(); } diff --git a/src/main/java/com/example/demo/mapper/coin/HistoryRecordMapper.java b/src/main/java/com/example/demo/mapper/sqlserver/HistoryRecordMapper.java similarity index 92% rename from src/main/java/com/example/demo/mapper/coin/HistoryRecordMapper.java rename to src/main/java/com/example/demo/mapper/sqlserver/HistoryRecordMapper.java index bbb3381..0342a55 100644 --- a/src/main/java/com/example/demo/mapper/coin/HistoryRecordMapper.java +++ b/src/main/java/com/example/demo/mapper/sqlserver/HistoryRecordMapper.java @@ -1,4 +1,4 @@ -package com.example.demo.mapper.coin; +package com.example.demo.mapper.sqlserver; import com.example.demo.domain.vo.coin.HistoryRecord; diff --git a/src/main/java/com/example/demo/serviceImpl/coin/HistoryRecordServiceImpl.java b/src/main/java/com/example/demo/serviceImpl/coin/HistoryRecordServiceImpl.java index 837a29c..496d45e 100644 --- a/src/main/java/com/example/demo/serviceImpl/coin/HistoryRecordServiceImpl.java +++ b/src/main/java/com/example/demo/serviceImpl/coin/HistoryRecordServiceImpl.java @@ -1,9 +1,7 @@ package com.example.demo.serviceImpl.coin; -import com.example.demo.domain.vo.coin.ConsumeUser; import com.example.demo.domain.vo.coin.HistoryRecord; -import com.example.demo.domain.vo.coin.Result; -import com.example.demo.mapper.coin.HistoryRecordMapper; +import com.example.demo.mapper.sqlserver.HistoryRecordMapper; import com.example.demo.service.coin.HistoryRecordService; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; diff --git a/src/main/resources/application-test.yml b/src/main/resources/application-test.yml index 0bd95af..0fdc73a 100644 --- a/src/main/resources/application-test.yml +++ b/src/main/resources/application-test.yml @@ -44,9 +44,9 @@ spring: pool-name: mysql5HikariCP maximum-pool-size: 10 sqlserver1: - jdbc-url: jdbc:sqlserver://10.19.183.6:1433;databaseName=hwhcGold;encrypt=true;sslProtocol=TLSv1;trustServerCertificate=true; - username: hwhc_gold_query - password: hwhc_gold_query4564jkj + jdbc-url: jdbc:sqlserver://52.76.43.43:1433;encrypt=true;sslProtocol=TLSv1;trustServerCertificate=true; + username: gjb_test + password: qweuio!@#$2 driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver application: diff --git a/src/main/resources/sqlServiceMapper/HistoryRecordMapper.xml b/src/main/resources/sqlServiceMapper/HistoryRecordMapper.xml index 4f7c390..f01262a 100644 --- a/src/main/resources/sqlServiceMapper/HistoryRecordMapper.xml +++ b/src/main/resources/sqlServiceMapper/HistoryRecordMapper.xml @@ -1,6 +1,6 @@ - +