Browse Source

3.26 钱包余额数据同步

milestone-20260224-现金钱包
huangqizhen 2 weeks ago
parent
commit
287cb02681
  1. 323
      src/main/java/com/example/demo/Mysql/MysqlServiceImpl.java

323
src/main/java/com/example/demo/Mysql/MysqlServiceImpl.java

@ -131,6 +131,9 @@ public class MysqlServiceImpl implements MysqlService {
// 👇 步骤 3准备批量插入 // 👇 步骤 3准备批量插入
try (PreparedStatement mysqlStmt = mysqlConn.prepareStatement(insertSql)) { try (PreparedStatement mysqlStmt = mysqlConn.prepareStatement(insertSql)) {
// 👇 新增收集新记录列表
List<RecordData> newRecords = new ArrayList<>();
for (RecordData data : batchRecords) { for (RecordData data : batchRecords) {
if (validFourTypes.contains(data.gtype)) { if (validFourTypes.contains(data.gtype)) {
logger.debug("跳过 validFourTypes 类型记录,gtype={}, uid={}", data.gtype, data.uid); logger.debug("跳过 validFourTypes 类型记录,gtype={}, uid={}", data.gtype, data.uid);
@ -140,27 +143,25 @@ public class MysqlServiceImpl implements MysqlService {
logger.debug("跳过 operation_platform=4 的记录,uid={}", data.uid); logger.debug("跳过 operation_platform=4 的记录,uid={}", data.uid);
continue; continue;
} }
// 👇 跳过已存在的记录避免重复更新用户余额
if (existingUids.contains(data.uid)) { if (existingUids.contains(data.uid)) {
logger.debug("跳过重复记录,uid={}", data.uid); logger.debug("跳过重复记录,uid={}", data.uid);
continue; continue;
} }
// 👇 新增添加到新记录列表
newRecords.add(data);
// 👇 设置插入参数 // 👇 设置插入参数
setStatementParams(mysqlStmt, data); setStatementParams(mysqlStmt, data);
mysqlStmt.addBatch(); mysqlStmt.addBatch();
// 👇 只有新记录才更新用户余额关键修复
updateUserBalance(mysqlConn, data); updateUserBalance(mysqlConn, data);
} }
// 👇 执行批量插入
int[] results = mysqlStmt.executeBatch(); int[] results = mysqlStmt.executeBatch();
logger.info("成功插入新记录 {} 条", results.length); logger.info("成功插入新记录 {} 条", results.length);
// 👉新增一行调用👈
// 传入当前连接 + 本批次记录自动完成路由更新
updateUserRegionWallet(mysqlConn, batchRecords);
// 修正只传新记录15
updateUserRegionWallet(mysqlConn, newRecords);
} }
offset += pageSize; offset += pageSize;
@ -242,6 +243,9 @@ public class MysqlServiceImpl implements MysqlService {
// 👇 步骤 3准备批量插入 // 👇 步骤 3准备批量插入
try (PreparedStatement mysqlStmt = mysqlConn.prepareStatement(insertSql)) { try (PreparedStatement mysqlStmt = mysqlConn.prepareStatement(insertSql)) {
// 👇 新增收集新记录列表
List<RecordData> newRecords = new ArrayList<>();
for (RecordData data : batchRecords) { for (RecordData data : batchRecords) {
if (validFourTypes.contains(data.gtype)) { if (validFourTypes.contains(data.gtype)) {
logger.debug("跳过 validFourTypes 类型记录,gtype={}, uid={}", data.gtype, data.uid); logger.debug("跳过 validFourTypes 类型记录,gtype={}, uid={}", data.gtype, data.uid);
@ -251,28 +255,25 @@ public class MysqlServiceImpl implements MysqlService {
logger.debug("跳过 operation_platform=4 的记录,uid={}", data.uid); logger.debug("跳过 operation_platform=4 的记录,uid={}", data.uid);
continue; continue;
} }
// 👇 跳过已存在的记录避免重复更新用户余额
if (existingUids.contains(data.uid)) { if (existingUids.contains(data.uid)) {
logger.debug("跳过重复记录,uid={}", data.uid); logger.debug("跳过重复记录,uid={}", data.uid);
continue; continue;
} }
// 👇 新增添加到新记录列表
newRecords.add(data);
// 👇 设置插入参数 // 👇 设置插入参数
setStatementParams(mysqlStmt, data); setStatementParams(mysqlStmt, data);
mysqlStmt.addBatch(); mysqlStmt.addBatch();
// 👇 只有新记录才更新用户余额关键修复
updateUserBalance(mysqlConn, data); updateUserBalance(mysqlConn, data);
} }
// 👇 执行批量插入
int[] results = mysqlStmt.executeBatch(); int[] results = mysqlStmt.executeBatch();
logger.info("成功插入新记录 {} 条", results.length); logger.info("成功插入新记录 {} 条", results.length);
// 👉新增一行调用👈
// 传入当前连接 + 本批次记录自动完成路由更新
updateUserRegionWallet(mysqlConn, batchRecords);
// 修正只传新记录
updateUserRegionWallet(mysqlConn, newRecords);
} }
offset += pageSize; offset += pageSize;
@ -303,6 +304,9 @@ public class MysqlServiceImpl implements MysqlService {
data.free = rs.getInt("free"); data.free = rs.getInt("free");
data.core_jb = rs.getInt("core_jb"); data.core_jb = rs.getInt("core_jb");
data.buy_jb = rs.getInt("buy_jb"); data.buy_jb = rs.getInt("buy_jb");
// 👇👇👇 新增permanent_gold = buy_jb 👇👇👇
data.permanent_gold = data.buy_jb;
// 👆👆👆 新增结束 👆👆👆
data.cz_time = rs.getTimestamp("cz_time"); data.cz_time = rs.getTimestamp("cz_time");
data.cz_user = rs.getString("cz_user"); data.cz_user = rs.getString("cz_user");
data.cz_bz = rs.getString("cz_bz"); data.cz_bz = rs.getString("cz_bz");
@ -569,45 +573,142 @@ public class MysqlServiceImpl implements MysqlService {
/** /**
* g_order 获取 pay_style路由更新 user_region_wallet 余额 * g_order 获取 pay_style路由更新 user_region_wallet 余额
* @param mysqlConn 当前事务连接️必须使用此连接
* @param records 本批次有效记录
* 逻辑
* - 充值validZeroTypespermanent_gold > 0linkId 需为 G+数字格式 g_order 路由 wallet_id 累加
* - 消费validOneTypespermanent_gold < 0 不判断 G 格式 取绝对值按 wallet_id 从小到大扣减
* 钱包更新是附加逻辑异常不影响金币主流程
*/ */
/** /**
* g_order 获取 pay_style路由更新 user_region_wallet 余额 * g_order 获取 pay_style路由更新 user_region_wallet 余额
* 逻辑仅当 linkId G+ 自增 id 格式 g_order 查到了才更新钱包否则直接跳过
* @param mysqlConn 当前事务连接️必须使用此连接
* @param records 本批次有效记录
* 调试版添加详细日志追踪
*/ */
private void updateUserRegionWallet(Connection mysqlConn, List<RecordData> records) { private void updateUserRegionWallet(Connection mysqlConn, List<RecordData> records) {
// 👇👇👇 强制输出绕过日志配置确保能看到👇👇👇
System.out.println("🚨🚨🚨 [钱包更新] updateUserRegionWallet 被调用!records大小=" + (records == null ? 0 : records.size()));
System.err.println("🚨🚨🚨 [钱包更新] updateUserRegionWallet 被调用!records大小=" + (records == null ? 0 : records.size()));
if (records == null || records.isEmpty()) { if (records == null || records.isEmpty()) {
logger.debug("updateUserRegionWallet: records 为空,跳过钱包更新"); logger.debug("updateUserRegionWallet: records 为空,跳过钱包更新");
return; return;
} }
// 1. 收集符合 G+ 数字 格式的 order_id只处理 permanent_gold > 0 的记录
// 注意这里用 r.linkId 而不是 r.uid
Map<Long, RecordData> orderIdRecordMap = new HashMap<>();
for (RecordData r : records) {
if (r.permanent_gold == null || r.permanent_gold <= 0) continue;
// 关键修改 linkId 判断 G+ 格式
if (r.linkId != null && r.linkId.matches("^G\\d+$")) {
// 1. 按业务类型分离记录 + 详细日志追踪
Map<Long, RecordData> rechargeOrderIdMap = new HashMap<>();
Map<Integer, Integer> jwcodeConsumeTotalMap = new HashMap<>();
int totalRecords = records.size();
int filteredByType = 0; // gtype 不属于充值/消费
int filteredByGold = 0; // permanent_gold 不符合正负要求
int filteredByLinkId = 0; // 充值记录 linkId 格式不符
int rechargeCandidate = 0; // 充值候选数
int consumeCandidate = 0; // 消费候选数
logger.info("🔍 [钱包更新] 开始分析 {} 条记录", totalRecords);
for (int i = 0; i < records.size(); i++) {
RecordData r = records.get(i);
// 记录每条数据的关键字段
logger.info("🔍 [记录#{}/{}] gtype={}, permanent_gold={}, linkId='{}', jwcode={}",
i+1, totalRecords, r.gtype, r.permanent_gold, r.linkId, r.jwcode);
// 👉 充值类型判断
if (validZeroTypes.contains(r.gtype)) {
logger.debug(" → 属于充值类型 (validZeroTypes)");
if (r.permanent_gold == null) {
logger.info(" ⏭️ [过滤] permanent_gold 为 null");
filteredByGold++;
continue;
}
if (r.permanent_gold <= 0) {
logger.info(" ⏭️ [过滤] 充值记录 permanent_gold={} <= 0", r.permanent_gold);
filteredByGold++;
continue;
}
if (r.linkId == null) {
logger.info(" ⏭️ [过滤] 充值记录 linkId 为 null");
filteredByLinkId++;
continue;
}
if (!r.linkId.matches("^G\\d+$")) {
logger.info(" ⏭️ [过滤] 充值记录 linkId='{}' 不符合 G+数字格式", r.linkId);
filteredByLinkId++;
continue;
}
// 充值候选
try { try {
long orderId = Long.parseLong(r.linkId.substring(1)); long orderId = Long.parseLong(r.linkId.substring(1));
orderIdRecordMap.put(orderId, r);
logger.debug("提取 order_id={} from linkId={}, jwcode={}", orderId, r.linkId, r.jwcode);
rechargeOrderIdMap.put(orderId, r);
rechargeCandidate++;
logger.info(" ✅ [充值候选] orderId={}, permanent_gold={}, jwcode={}",
orderId, r.permanent_gold, r.jwcode);
} catch (NumberFormatException e) { } catch (NumberFormatException e) {
logger.debug("linkId={} 格式解析失败,跳过钱包更新", r.linkId);
logger.warn(" ⚠️ [异常] linkId='{}' 解析 orderId 失败", r.linkId, e);
filteredByLinkId++;
}
} }
// 👉 消费类型判断
else if (validOneTypes.contains(r.gtype)) {
logger.debug(" → 属于消费类型 (validOneTypes)");
if (r.permanent_gold == null) {
logger.info(" ⏭️ [过滤] permanent_gold 为 null");
filteredByGold++;
continue;
} }
// G+ 格式的记录直接跳过钱包更新不查 g_order
if (r.permanent_gold >= 0) {
logger.info(" ⏭️ [过滤] 消费记录 permanent_gold={} >= 0 (应该是负数)", r.permanent_gold);
filteredByGold++;
continue;
} }
if (orderIdRecordMap.isEmpty()) {
logger.info("无符合 G+ 格式的记录,跳过钱包更新");
return;
// 消费候选 不判断 G 格式
int consumeAmount = Math.abs(r.permanent_gold);
jwcodeConsumeTotalMap.merge(r.jwcode, consumeAmount, Integer::sum);
consumeCandidate++;
logger.info(" ✅ [消费候选] consumeAmount={}, jwcode={}, 累计扣减={}",
consumeAmount, r.jwcode, jwcodeConsumeTotalMap.get(r.jwcode));
}
// 👉 其他类型
else {
logger.debug(" → gtype={} 不属于充值/消费类型", r.gtype);
filteredByType++;
}
}
// 📊 统计汇总
logger.info("📊 [钱包更新统计] 总记录={}, 类型过滤={}, 金额过滤={}, linkId过滤={}, 充值候选={}, 消费候选={}",
totalRecords, filteredByType, filteredByGold, filteredByLinkId, rechargeCandidate, consumeCandidate);
// 2. 处理充值记录
if (!rechargeOrderIdMap.isEmpty()) {
logger.info("💰 开始处理 {} 条充值记录", rechargeOrderIdMap.size());
processRechargeWallet(mysqlConn, rechargeOrderIdMap);
} else {
logger.info("💰 无充值记录需要处理");
} }
// 2. 批量查 g_order 获取 pay_style id 查询 + state=1
// 3. 处理消费记录
if (!jwcodeConsumeTotalMap.isEmpty()) {
logger.info("💸 开始处理 {} 个用户的消费扣减", jwcodeConsumeTotalMap.size());
processConsumeWallet(mysqlConn, jwcodeConsumeTotalMap);
} else {
logger.info("💸 无消费记录需要处理");
}
logger.info("✅ [钱包更新] 方法执行完成");
}
/**
* 处理充值 pay_style 路由到 wallet_id累加 permanent_gold
* 调试版添加详细日志
*/
private void processRechargeWallet(Connection mysqlConn, Map<Long, RecordData> orderIdRecordMap) {
logger.info("💰 [充值处理] 开始,订单数={}", orderIdRecordMap.size());
List<Long> orderIds = new ArrayList<>(orderIdRecordMap.keySet()); List<Long> orderIds = new ArrayList<>(orderIdRecordMap.keySet());
String inSql = String.join(",", Collections.nCopies(orderIds.size(), "?")); String inSql = String.join(",", Collections.nCopies(orderIds.size(), "?"));
String querySql = "SELECT id, jwcode, pay_style FROM g_order WHERE id IN (" + inSql + ") AND state = 1"; String querySql = "SELECT id, jwcode, pay_style FROM g_order WHERE id IN (" + inSql + ") AND state = 1";
@ -615,51 +716,52 @@ public class MysqlServiceImpl implements MysqlService {
Map<Integer, Integer> jwcodePayStyleMap = new HashMap<>(); Map<Integer, Integer> jwcodePayStyleMap = new HashMap<>();
try (PreparedStatement stmt = mysqlConn.prepareStatement(querySql)) { try (PreparedStatement stmt = mysqlConn.prepareStatement(querySql)) {
for (int i = 0; i < orderIds.size(); i++) { for (int i = 0; i < orderIds.size(); i++) {
stmt.setLong(i + 1, orderIds.get(i)); // g_order.id 可能是 bigint setLong
stmt.setLong(i + 1, orderIds.get(i));
} }
try (ResultSet rs = stmt.executeQuery()) { try (ResultSet rs = stmt.executeQuery()) {
while (rs.next()) { while (rs.next()) {
int jwcode = rs.getInt("jwcode"); int jwcode = rs.getInt("jwcode");
int payStyle = rs.getInt("pay_style"); int payStyle = rs.getInt("pay_style");
jwcodePayStyleMap.put(jwcode, payStyle); jwcodePayStyleMap.put(jwcode, payStyle);
logger.debug("💰 [g_order匹配] id={}, jwcode={}, pay_style={}",
rs.getLong("id"), jwcode, payStyle);
} }
} }
} catch (SQLException e) { } catch (SQLException e) {
// 关键查库异常只记录日志不抛异常避免影响主流程金币更新照常
logger.error("查询 g_order 失败,跳过钱包更新", e);
logger.error("💰 [充值处理] 查询 g_order 失败", e);
return; return;
} }
if (jwcodePayStyleMap.isEmpty()) { if (jwcodePayStyleMap.isEmpty()) {
logger.info("g_order 未匹配到支付记录,跳过钱包更新(用户金币仍会更新)");
logger.warn("💰 [充值处理] g_order 未匹配到任何支付记录,跳过");
return; return;
} }
logger.info("💰 [充值处理] g_order 匹配成功 {} 条", jwcodePayStyleMap.size());
// 3. pay_style 路由到 wallet_id累加 permanent_gold
Map<String, Integer> userWalletGoldMap = new HashMap<>(); Map<String, Integer> userWalletGoldMap = new HashMap<>();
for (RecordData data : orderIdRecordMap.values()) { for (RecordData data : orderIdRecordMap.values()) {
Integer payStyle = jwcodePayStyleMap.get(data.jwcode); Integer payStyle = jwcodePayStyleMap.get(data.jwcode);
if (payStyle == null) { if (payStyle == null) {
logger.debug("jwcode={} 未在 g_order 中匹配到 pay_style,跳过钱包更新", data.jwcode);
logger.debug("💰 [跳过] jwcode={} 未匹配到 pay_style", data.jwcode);
continue; continue;
} }
Integer walletId = mapPayStyleToWalletId(payStyle); Integer walletId = mapPayStyleToWalletId(payStyle);
if (walletId == null) { if (walletId == null) {
logger.debug("payStyle={} 未映射到 wallet_id,跳过钱包更新", payStyle);
logger.debug("💰 [跳过] payStyle={} 未映射到 wallet_id", payStyle);
continue; continue;
} }
String key = data.jwcode + "_" + walletId; String key = data.jwcode + "_" + walletId;
userWalletGoldMap.merge(key, data.permanent_gold, Integer::sum); userWalletGoldMap.merge(key, data.permanent_gold, Integer::sum);
logger.info("💰 [路由成功] jwcode={}, payStyle={}, walletId={}, amount={}",
data.jwcode, payStyle, walletId, data.permanent_gold);
} }
if (userWalletGoldMap.isEmpty()) { if (userWalletGoldMap.isEmpty()) {
logger.info("无有效记录需要更新地区钱包");
logger.info("💰 [充值处理] 无有效记录需要更新钱包");
return; return;
} }
logger.info("💰 [充值处理] 待更新钱包记录 {} 条", userWalletGoldMap.size());
// 4. UPSERT 批量更新 user_region_wallet
String upsertSql = """ String upsertSql = """
INSERT INTO user_region_wallet INSERT INTO user_region_wallet
(jwcode, wallet_id, current_permanent_gold, create_time, update_time) (jwcode, wallet_id, current_permanent_gold, create_time, update_time)
@ -674,17 +776,136 @@ public class MysqlServiceImpl implements MysqlService {
String[] parts = entry.getKey().split("_"); String[] parts = entry.getKey().split("_");
int jwcode = Integer.parseInt(parts[0]); int jwcode = Integer.parseInt(parts[0]);
int walletId = Integer.parseInt(parts[1]); int walletId = Integer.parseInt(parts[1]);
stmt.setInt(1, jwcode); stmt.setInt(1, jwcode);
stmt.setString(2, String.valueOf(walletId)); stmt.setString(2, String.valueOf(walletId));
stmt.setInt(3, entry.getValue()); stmt.setInt(3, entry.getValue());
stmt.addBatch(); stmt.addBatch();
} }
int[] results = stmt.executeBatch(); int[] results = stmt.executeBatch();
logger.info("✅ 地区钱包余额更新成功 {} 条", results.length);
logger.info("✅ [充值完成] 地区钱包充值更新成功 {} 条", results.length);
} catch (SQLException e) {
logger.error("❌ [充值失败] 更新 user_region_wallet 异常", e);
}
}
/**
* 处理消费 wallet_id 从小到大顺序扣减 permanent_gold
* 调试版添加详细日志
*/
private void processConsumeWallet(Connection mysqlConn, Map<Integer, Integer> jwcodeConsumeMap) {
logger.info("💸 [消费处理] 开始,用户数={}", jwcodeConsumeMap.size());
for (Map.Entry<Integer, Integer> entry : jwcodeConsumeMap.entrySet()) {
int jwcode = entry.getKey();
int totalConsume = entry.getValue();
logger.info("💸 [消费用户] jwcode={}, 待扣总额={}", jwcode, totalConsume);
try {
// 1. 查询用户有余额的钱包
List<WalletInfo> wallets = queryUserWallets(mysqlConn, jwcode);
if (wallets.isEmpty()) {
logger.warn("💸 [跳过] 用户 {} 无可用地区钱包(余额>0)", jwcode);
continue;
}
logger.info("💸 [钱包列表] 用户 {} 有 {} 个有余额的钱包: {}",
jwcode, wallets.size(),
wallets.stream().map(w -> "walletId=" + w.walletId + ":余额" + w.balance)
.collect(Collectors.joining(", ")));
// 2. 计算扣减方案
int remaining = totalConsume;
List<WalletDeduct> deductList = new ArrayList<>();
for (WalletInfo wallet : wallets) {
if (remaining <= 0) break;
int deductAmount = Math.min(wallet.balance, remaining);
if (deductAmount > 0) {
deductList.add(new WalletDeduct(wallet.walletId, deductAmount));
remaining -= deductAmount;
logger.debug("💸 [扣减方案] walletId={}, 扣减={}, 剩余待扣={}",
wallet.walletId, deductAmount, remaining);
}
}
if (remaining > 0) {
logger.warn("💸 [余额不足] 用户 {} 钱包余额不足,待扣: {}, 已安排: {}",
jwcode, remaining, totalConsume - remaining);
}
// 3. 批量执行扣减
if (!deductList.isEmpty()) {
executeWalletDeducts(mysqlConn, jwcode, deductList);
logger.info("✅ [消费完成] 用户 {} 扣减成功,总额: {}, 钱包数: {}",
jwcode, totalConsume, deductList.size());
} else {
logger.info("💸 [无操作] 用户 {} 无扣减方案", jwcode);
}
} catch (SQLException e) { } catch (SQLException e) {
// 关键更新异常只记录日志不抛异常避免回滚主事务
logger.error("更新 user_region_wallet 失败,跳过钱包更新", e);
logger.error("❌ [消费失败] 用户 {} 扣减异常", jwcode, e);
}
}
logger.info("💸 [消费处理] 执行完成");
}
/**
* 查询用户有余额的地区钱包 wallet_id 升序
*/
private List<WalletInfo> queryUserWallets(Connection conn, int jwcode) throws SQLException {
String sql = "SELECT wallet_id, current_permanent_gold FROM user_region_wallet WHERE jwcode = ? AND current_permanent_gold > 0 ORDER BY wallet_id ASC";
List<WalletInfo> result = new ArrayList<>();
try (PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setInt(1, jwcode);
logger.debug("🔍 [查询钱包] SQL: {}, jwcode={}", sql, jwcode);
try (ResultSet rs = stmt.executeQuery()) {
while (rs.next()) {
WalletInfo info = new WalletInfo();
info.walletId = rs.getInt("wallet_id");
info.balance = rs.getInt("current_permanent_gold");
result.add(info);
logger.debug(" 📦 [钱包] walletId={}, balance={}", info.walletId, info.balance);
}
}
}
return result;
}
/**
* 批量执行钱包扣减乐观锁防并发
*/
private void executeWalletDeducts(Connection conn, int jwcode, List<WalletDeduct> deducts) throws SQLException {
String sql = "UPDATE user_region_wallet SET current_permanent_gold = current_permanent_gold - ?, update_time = NOW() WHERE jwcode = ? AND wallet_id = ? AND current_permanent_gold >= ?";
logger.debug("🔧 [执行扣减] SQL: {}, 扣减数={}", sql, deducts.size());
try (PreparedStatement stmt = conn.prepareStatement(sql)) {
for (WalletDeduct deduct : deducts) {
stmt.setInt(1, deduct.amount);
stmt.setInt(2, jwcode);
stmt.setString(3, String.valueOf(deduct.walletId));
stmt.setInt(4, deduct.amount);
stmt.addBatch();
logger.debug(" ➖ [扣减] walletId={}, amount={}", deduct.walletId, deduct.amount);
}
int[] results = stmt.executeBatch();
logger.debug("✅ [扣减执行] 成功 {} 条", results.length);
}
}
// 钱包余额信息内部辅助类
static class WalletInfo {
int walletId;
int balance;
}
// 扣减指令内部辅助类
static class WalletDeduct {
int walletId;
int amount;
WalletDeduct(int walletId, int amount) {
this.walletId = walletId;
this.amount = amount;
} }
} }

Loading…
Cancel
Save