From 287cb02681cef53b0ef9a3a7ce8d80a1a762d1f3 Mon Sep 17 00:00:00 2001 From: huangqizhen <15552608129@163.com> Date: Thu, 26 Mar 2026 11:48:27 +0800 Subject: [PATCH] =?UTF-8?q?3.26=20=E9=92=B1=E5=8C=85=E4=BD=99=E9=A2=9D?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=90=8C=E6=AD=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/example/demo/Mysql/MysqlServiceImpl.java | 341 +++++++++++++++++---- 1 file changed, 281 insertions(+), 60 deletions(-) diff --git a/src/main/java/com/example/demo/Mysql/MysqlServiceImpl.java b/src/main/java/com/example/demo/Mysql/MysqlServiceImpl.java index 59783aa..0e59467 100644 --- a/src/main/java/com/example/demo/Mysql/MysqlServiceImpl.java +++ b/src/main/java/com/example/demo/Mysql/MysqlServiceImpl.java @@ -128,9 +128,12 @@ public class MysqlServiceImpl implements MysqlService { Set existingUids = getExistingUids(mysqlConn, batchUids); logger.info("已存在记录数: {}", existingUids.size()); - // 👇 步骤3:准备批量插入 + // 👇 步骤 3:准备批量插入 try (PreparedStatement mysqlStmt = mysqlConn.prepareStatement(insertSql)) { + // 👇 新增:收集新记录列表 + List newRecords = new ArrayList<>(); + for (RecordData data : batchRecords) { if (validFourTypes.contains(data.gtype)) { logger.debug("跳过 validFourTypes 类型记录,gtype={}, uid={}", data.gtype, data.uid); @@ -140,27 +143,25 @@ public class MysqlServiceImpl implements MysqlService { logger.debug("跳过 operation_platform=4 的记录,uid={}", data.uid); continue; } - // 👇 跳过已存在的记录(避免重复更新用户余额) if (existingUids.contains(data.uid)) { logger.debug("跳过重复记录,uid={}", data.uid); continue; } + // 👇 新增:添加到新记录列表 + newRecords.add(data); + // 👇 设置插入参数 setStatementParams(mysqlStmt, data); - mysqlStmt.addBatch(); - - // 👇 只有新记录才更新用户余额(关键修复!) updateUserBalance(mysqlConn, data); } - // 👇 执行批量插入 int[] results = mysqlStmt.executeBatch(); logger.info("成功插入新记录 {} 条", results.length); - // 👉【新增一行调用】👈 - // 传入当前连接 + 本批次记录,自动完成路由更新 - updateUserRegionWallet(mysqlConn, batchRecords); + + // ✅ 修正:只传新记录(15 条) + updateUserRegionWallet(mysqlConn, newRecords); } offset += pageSize; @@ -239,9 +240,12 @@ public class MysqlServiceImpl implements MysqlService { Set existingUids = getExistingUids(mysqlConn, batchUids); logger.info("已存在记录数: {}", existingUids.size()); - // 👇 步骤3:准备批量插入 + // 👇 步骤 3:准备批量插入 try (PreparedStatement mysqlStmt = mysqlConn.prepareStatement(insertSql)) { + // 👇 新增:收集新记录列表 + List newRecords = new ArrayList<>(); + for (RecordData data : batchRecords) { if (validFourTypes.contains(data.gtype)) { logger.debug("跳过 validFourTypes 类型记录,gtype={}, uid={}", data.gtype, data.uid); @@ -251,28 +255,25 @@ public class MysqlServiceImpl implements MysqlService { logger.debug("跳过 operation_platform=4 的记录,uid={}", data.uid); continue; } - // 👇 跳过已存在的记录(避免重复更新用户余额) if (existingUids.contains(data.uid)) { logger.debug("跳过重复记录,uid={}", data.uid); continue; } + // 👇 新增:添加到新记录列表 + newRecords.add(data); + // 👇 设置插入参数 setStatementParams(mysqlStmt, data); - mysqlStmt.addBatch(); - - // 👇 只有新记录才更新用户余额(关键修复!) updateUserBalance(mysqlConn, data); } - // 👇 执行批量插入 int[] results = mysqlStmt.executeBatch(); logger.info("成功插入新记录 {} 条", results.length); - // 👉【新增一行调用】👈 - // 传入当前连接 + 本批次记录,自动完成路由更新 - updateUserRegionWallet(mysqlConn, batchRecords); + // ✅ 修正:只传新记录 + updateUserRegionWallet(mysqlConn, newRecords); } offset += pageSize; @@ -303,6 +304,9 @@ public class MysqlServiceImpl implements MysqlService { data.free = rs.getInt("free"); data.core_jb = rs.getInt("core_jb"); data.buy_jb = rs.getInt("buy_jb"); + // 👇👇👇 新增:permanent_gold = buy_jb 👇👇👇 + data.permanent_gold = data.buy_jb; +// 👆👆👆 新增结束 👆👆👆 data.cz_time = rs.getTimestamp("cz_time"); data.cz_user = rs.getString("cz_user"); data.cz_bz = rs.getString("cz_bz"); @@ -569,45 +573,142 @@ public class MysqlServiceImpl implements MysqlService { /** * 查 g_order 获取 pay_style,路由更新 user_region_wallet 余额 - * @param mysqlConn 当前事务连接(⚠️必须使用此连接) - * @param records 本批次有效记录 + * 逻辑: + * - 充值(validZeroTypes):permanent_gold > 0,linkId 需为 G+数字格式,查 g_order 路由 wallet_id 累加 + * - 消费(validOneTypes):permanent_gold < 0,❌ 不判断 G 格式 ❌ 取绝对值按 wallet_id 从小到大扣减 + * ⚠️ 钱包更新是附加逻辑,异常不影响金币主流程 */ /** * 查 g_order 获取 pay_style,路由更新 user_region_wallet 余额 - * 逻辑:仅当 linkId 是 G+ 自增 id 格式 且 g_order 查到了,才更新钱包;否则直接跳过 - * @param mysqlConn 当前事务连接(⚠️必须使用此连接) - * @param records 本批次有效记录 + * 【调试版】添加详细日志追踪 */ private void updateUserRegionWallet(Connection mysqlConn, List records) { + // 👇👇👇 强制输出(绕过日志配置,确保能看到)👇👇👇 + System.out.println("🚨🚨🚨 [钱包更新] updateUserRegionWallet 被调用!records大小=" + (records == null ? 0 : records.size())); + System.err.println("🚨🚨🚨 [钱包更新] updateUserRegionWallet 被调用!records大小=" + (records == null ? 0 : records.size())); + if (records == null || records.isEmpty()) { logger.debug("updateUserRegionWallet: records 为空,跳过钱包更新"); return; } - // 1. 收集符合 G+ 数字 格式的 order_id(只处理 permanent_gold > 0 的记录) - // ⚠️ 注意:这里用 r.linkId 而不是 r.uid - Map orderIdRecordMap = new HashMap<>(); - for (RecordData r : records) { - if (r.permanent_gold == null || r.permanent_gold <= 0) continue; - // ⚠️ 关键修改:用 linkId 判断 G+ 格式 - if (r.linkId != null && r.linkId.matches("^G\\d+$")) { + // 1. 按业务类型分离记录 + 详细日志追踪 + Map rechargeOrderIdMap = new HashMap<>(); + Map jwcodeConsumeTotalMap = new HashMap<>(); + + int totalRecords = records.size(); + int filteredByType = 0; // gtype 不属于充值/消费 + int filteredByGold = 0; // permanent_gold 不符合正负要求 + int filteredByLinkId = 0; // 充值记录 linkId 格式不符 + int rechargeCandidate = 0; // 充值候选数 + int consumeCandidate = 0; // 消费候选数 + + logger.info("🔍 [钱包更新] 开始分析 {} 条记录", totalRecords); + + for (int i = 0; i < records.size(); i++) { + RecordData r = records.get(i); + + // 记录每条数据的关键字段 + logger.info("🔍 [记录#{}/{}] gtype={}, permanent_gold={}, linkId='{}', jwcode={}", + i+1, totalRecords, r.gtype, r.permanent_gold, r.linkId, r.jwcode); + + // 👉 充值类型判断 + if (validZeroTypes.contains(r.gtype)) { + logger.debug(" → 属于充值类型 (validZeroTypes)"); + + if (r.permanent_gold == null) { + logger.info(" ⏭️ [过滤] permanent_gold 为 null"); + filteredByGold++; + continue; + } + if (r.permanent_gold <= 0) { + logger.info(" ⏭️ [过滤] 充值记录 permanent_gold={} <= 0", r.permanent_gold); + filteredByGold++; + continue; + } + + if (r.linkId == null) { + logger.info(" ⏭️ [过滤] 充值记录 linkId 为 null"); + filteredByLinkId++; + continue; + } + if (!r.linkId.matches("^G\\d+$")) { + logger.info(" ⏭️ [过滤] 充值记录 linkId='{}' 不符合 G+数字格式", r.linkId); + filteredByLinkId++; + continue; + } + + // ✅ 充值候选 try { long orderId = Long.parseLong(r.linkId.substring(1)); - orderIdRecordMap.put(orderId, r); - logger.debug("提取 order_id={} from linkId={}, jwcode={}", orderId, r.linkId, r.jwcode); + rechargeOrderIdMap.put(orderId, r); + rechargeCandidate++; + logger.info(" ✅ [充值候选] orderId={}, permanent_gold={}, jwcode={}", + orderId, r.permanent_gold, r.jwcode); } catch (NumberFormatException e) { - logger.debug("linkId={} 格式解析失败,跳过钱包更新", r.linkId); + logger.warn(" ⚠️ [异常] linkId='{}' 解析 orderId 失败", r.linkId, e); + filteredByLinkId++; } } - // 非 G+ 格式的记录,直接跳过钱包更新(不查 g_order) + // 👉 消费类型判断 + else if (validOneTypes.contains(r.gtype)) { + logger.debug(" → 属于消费类型 (validOneTypes)"); + + if (r.permanent_gold == null) { + logger.info(" ⏭️ [过滤] permanent_gold 为 null"); + filteredByGold++; + continue; + } + if (r.permanent_gold >= 0) { + logger.info(" ⏭️ [过滤] 消费记录 permanent_gold={} >= 0 (应该是负数)", r.permanent_gold); + filteredByGold++; + continue; + } + + // ✅ 消费候选(❌ 不判断 G 格式) + int consumeAmount = Math.abs(r.permanent_gold); + jwcodeConsumeTotalMap.merge(r.jwcode, consumeAmount, Integer::sum); + consumeCandidate++; + logger.info(" ✅ [消费候选] consumeAmount={}, jwcode={}, 累计扣减={}", + consumeAmount, r.jwcode, jwcodeConsumeTotalMap.get(r.jwcode)); + } + // 👉 其他类型 + else { + logger.debug(" → gtype={} 不属于充值/消费类型", r.gtype); + filteredByType++; + } } - if (orderIdRecordMap.isEmpty()) { - logger.info("无符合 G+ 格式的记录,跳过钱包更新"); - return; + // 📊 统计汇总 + logger.info("📊 [钱包更新统计] 总记录={}, 类型过滤={}, 金额过滤={}, linkId过滤={}, 充值候选={}, 消费候选={}", + totalRecords, filteredByType, filteredByGold, filteredByLinkId, rechargeCandidate, consumeCandidate); + + // 2. 处理充值记录 + if (!rechargeOrderIdMap.isEmpty()) { + logger.info("💰 开始处理 {} 条充值记录", rechargeOrderIdMap.size()); + processRechargeWallet(mysqlConn, rechargeOrderIdMap); + } else { + logger.info("💰 无充值记录需要处理"); + } + + // 3. 处理消费记录 + if (!jwcodeConsumeTotalMap.isEmpty()) { + logger.info("💸 开始处理 {} 个用户的消费扣减", jwcodeConsumeTotalMap.size()); + processConsumeWallet(mysqlConn, jwcodeConsumeTotalMap); + } else { + logger.info("💸 无消费记录需要处理"); } - // 2. 批量查 g_order 获取 pay_style(按 id 查询 + state=1) + logger.info("✅ [钱包更新] 方法执行完成"); + } + + /** + * 处理充值:按 pay_style 路由到 wallet_id,累加 permanent_gold + * 【调试版】添加详细日志 + */ + private void processRechargeWallet(Connection mysqlConn, Map orderIdRecordMap) { + logger.info("💰 [充值处理] 开始,订单数={}", orderIdRecordMap.size()); + List orderIds = new ArrayList<>(orderIdRecordMap.keySet()); String inSql = String.join(",", Collections.nCopies(orderIds.size(), "?")); String querySql = "SELECT id, jwcode, pay_style FROM g_order WHERE id IN (" + inSql + ") AND state = 1"; @@ -615,76 +716,196 @@ public class MysqlServiceImpl implements MysqlService { Map jwcodePayStyleMap = new HashMap<>(); try (PreparedStatement stmt = mysqlConn.prepareStatement(querySql)) { for (int i = 0; i < orderIds.size(); i++) { - stmt.setLong(i + 1, orderIds.get(i)); // ⚠️ g_order.id 可能是 bigint,用 setLong + stmt.setLong(i + 1, orderIds.get(i)); } try (ResultSet rs = stmt.executeQuery()) { while (rs.next()) { int jwcode = rs.getInt("jwcode"); int payStyle = rs.getInt("pay_style"); jwcodePayStyleMap.put(jwcode, payStyle); + logger.debug("💰 [g_order匹配] id={}, jwcode={}, pay_style={}", + rs.getLong("id"), jwcode, payStyle); } } } catch (SQLException e) { - // ⚠️ 关键:查库异常只记录日志,不抛异常,避免影响主流程(金币更新照常) - logger.error("查询 g_order 失败,跳过钱包更新", e); + logger.error("💰 [充值处理] 查询 g_order 失败", e); return; } if (jwcodePayStyleMap.isEmpty()) { - logger.info("g_order 未匹配到支付记录,跳过钱包更新(用户金币仍会更新)"); + logger.warn("💰 [充值处理] g_order 未匹配到任何支付记录,跳过"); return; } + logger.info("💰 [充值处理] g_order 匹配成功 {} 条", jwcodePayStyleMap.size()); - // 3. 按 pay_style 路由到 wallet_id,累加 permanent_gold Map userWalletGoldMap = new HashMap<>(); for (RecordData data : orderIdRecordMap.values()) { Integer payStyle = jwcodePayStyleMap.get(data.jwcode); if (payStyle == null) { - logger.debug("jwcode={} 未在 g_order 中匹配到 pay_style,跳过钱包更新", data.jwcode); + logger.debug("💰 [跳过] jwcode={} 未匹配到 pay_style", data.jwcode); continue; } - Integer walletId = mapPayStyleToWalletId(payStyle); if (walletId == null) { - logger.debug("payStyle={} 未映射到 wallet_id,跳过钱包更新", payStyle); + logger.debug("💰 [跳过] payStyle={} 未映射到 wallet_id", payStyle); continue; } - String key = data.jwcode + "_" + walletId; userWalletGoldMap.merge(key, data.permanent_gold, Integer::sum); + logger.info("💰 [路由成功] jwcode={}, payStyle={}, walletId={}, amount={}", + data.jwcode, payStyle, walletId, data.permanent_gold); } if (userWalletGoldMap.isEmpty()) { - logger.info("无有效记录需要更新地区钱包"); + logger.info("💰 [充值处理] 无有效记录需要更新钱包"); return; } + logger.info("💰 [充值处理] 待更新钱包记录 {} 条", userWalletGoldMap.size()); - // 4. UPSERT 批量更新 user_region_wallet String upsertSql = """ - INSERT INTO user_region_wallet - (jwcode, wallet_id, current_permanent_gold, create_time, update_time) - VALUES (?, ?, ?, NOW(), NOW()) - ON DUPLICATE KEY UPDATE - current_permanent_gold = current_permanent_gold + VALUES(current_permanent_gold), - update_time = NOW() - """; + INSERT INTO user_region_wallet + (jwcode, wallet_id, current_permanent_gold, create_time, update_time) + VALUES (?, ?, ?, NOW(), NOW()) + ON DUPLICATE KEY UPDATE + current_permanent_gold = current_permanent_gold + VALUES(current_permanent_gold), + update_time = NOW() + """; try (PreparedStatement stmt = mysqlConn.prepareStatement(upsertSql)) { for (Map.Entry entry : userWalletGoldMap.entrySet()) { String[] parts = entry.getKey().split("_"); int jwcode = Integer.parseInt(parts[0]); int walletId = Integer.parseInt(parts[1]); - stmt.setInt(1, jwcode); stmt.setString(2, String.valueOf(walletId)); stmt.setInt(3, entry.getValue()); stmt.addBatch(); } int[] results = stmt.executeBatch(); - logger.info("✅ 地区钱包余额更新成功 {} 条", results.length); + logger.info("✅ [充值完成] 地区钱包充值更新成功 {} 条", results.length); } catch (SQLException e) { - // ⚠️ 关键:更新异常只记录日志,不抛异常,避免回滚主事务 - logger.error("更新 user_region_wallet 失败,跳过钱包更新", e); + logger.error("❌ [充值失败] 更新 user_region_wallet 异常", e); + } + } + + /** + * 处理消费:按 wallet_id 从小到大顺序扣减 permanent_gold + * 【调试版】添加详细日志 + */ + private void processConsumeWallet(Connection mysqlConn, Map jwcodeConsumeMap) { + logger.info("💸 [消费处理] 开始,用户数={}", jwcodeConsumeMap.size()); + + for (Map.Entry entry : jwcodeConsumeMap.entrySet()) { + int jwcode = entry.getKey(); + int totalConsume = entry.getValue(); + + logger.info("💸 [消费用户] jwcode={}, 待扣总额={}", jwcode, totalConsume); + + try { + // 1. 查询用户有余额的钱包 + List wallets = queryUserWallets(mysqlConn, jwcode); + if (wallets.isEmpty()) { + logger.warn("💸 [跳过] 用户 {} 无可用地区钱包(余额>0)", jwcode); + continue; + } + logger.info("💸 [钱包列表] 用户 {} 有 {} 个有余额的钱包: {}", + jwcode, wallets.size(), + wallets.stream().map(w -> "walletId=" + w.walletId + ":余额" + w.balance) + .collect(Collectors.joining(", "))); + + // 2. 计算扣减方案 + int remaining = totalConsume; + List deductList = new ArrayList<>(); + + for (WalletInfo wallet : wallets) { + if (remaining <= 0) break; + int deductAmount = Math.min(wallet.balance, remaining); + if (deductAmount > 0) { + deductList.add(new WalletDeduct(wallet.walletId, deductAmount)); + remaining -= deductAmount; + logger.debug("💸 [扣减方案] walletId={}, 扣减={}, 剩余待扣={}", + wallet.walletId, deductAmount, remaining); + } + } + + if (remaining > 0) { + logger.warn("💸 [余额不足] 用户 {} 钱包余额不足,待扣: {}, 已安排: {}", + jwcode, remaining, totalConsume - remaining); + } + + // 3. 批量执行扣减 + if (!deductList.isEmpty()) { + executeWalletDeducts(mysqlConn, jwcode, deductList); + logger.info("✅ [消费完成] 用户 {} 扣减成功,总额: {}, 钱包数: {}", + jwcode, totalConsume, deductList.size()); + } else { + logger.info("💸 [无操作] 用户 {} 无扣减方案", jwcode); + } + + } catch (SQLException e) { + logger.error("❌ [消费失败] 用户 {} 扣减异常", jwcode, e); + } + } + logger.info("💸 [消费处理] 执行完成"); + } + + /** + * 查询用户有余额的地区钱包(按 wallet_id 升序) + */ + private List queryUserWallets(Connection conn, int jwcode) throws SQLException { + String sql = "SELECT wallet_id, current_permanent_gold FROM user_region_wallet WHERE jwcode = ? AND current_permanent_gold > 0 ORDER BY wallet_id ASC"; + List result = new ArrayList<>(); + try (PreparedStatement stmt = conn.prepareStatement(sql)) { + stmt.setInt(1, jwcode); + logger.debug("🔍 [查询钱包] SQL: {}, jwcode={}", sql, jwcode); + try (ResultSet rs = stmt.executeQuery()) { + while (rs.next()) { + WalletInfo info = new WalletInfo(); + info.walletId = rs.getInt("wallet_id"); + info.balance = rs.getInt("current_permanent_gold"); + result.add(info); + logger.debug(" 📦 [钱包] walletId={}, balance={}", info.walletId, info.balance); + } + } + } + return result; + } + + /** + * 批量执行钱包扣减(乐观锁防并发) + */ + private void executeWalletDeducts(Connection conn, int jwcode, List deducts) throws SQLException { + String sql = "UPDATE user_region_wallet SET current_permanent_gold = current_permanent_gold - ?, update_time = NOW() WHERE jwcode = ? AND wallet_id = ? AND current_permanent_gold >= ?"; + + logger.debug("🔧 [执行扣减] SQL: {}, 扣减数={}", sql, deducts.size()); + + try (PreparedStatement stmt = conn.prepareStatement(sql)) { + for (WalletDeduct deduct : deducts) { + stmt.setInt(1, deduct.amount); + stmt.setInt(2, jwcode); + stmt.setString(3, String.valueOf(deduct.walletId)); + stmt.setInt(4, deduct.amount); + stmt.addBatch(); + logger.debug(" ➖ [扣减] walletId={}, amount={}", deduct.walletId, deduct.amount); + } + int[] results = stmt.executeBatch(); + logger.debug("✅ [扣减执行] 成功 {} 条", results.length); + } + } + + // 钱包余额信息(内部辅助类) + static class WalletInfo { + int walletId; + int balance; + } + + // 扣减指令(内部辅助类) + static class WalletDeduct { + int walletId; + int amount; + WalletDeduct(int walletId, int amount) { + this.walletId = walletId; + this.amount = amount; } }