|
|
@ -27,6 +27,7 @@ import java.time.Month; |
|
|
import java.time.format.DateTimeFormatter; |
|
|
import java.time.format.DateTimeFormatter; |
|
|
import java.util.*; |
|
|
import java.util.*; |
|
|
import java.util.Date; |
|
|
import java.util.Date; |
|
|
|
|
|
import java.util.stream.Collectors; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Service |
|
|
@Service |
|
|
@ -266,6 +267,10 @@ public class MysqlServiceImpl implements MysqlService { |
|
|
// 👇 执行批量插入 |
|
|
// 👇 执行批量插入 |
|
|
int[] results = mysqlStmt.executeBatch(); |
|
|
int[] results = mysqlStmt.executeBatch(); |
|
|
logger.info("成功插入新记录 {} 条", results.length); |
|
|
logger.info("成功插入新记录 {} 条", results.length); |
|
|
|
|
|
|
|
|
|
|
|
// 👉【新增一行调用】👈 |
|
|
|
|
|
// 传入当前连接 + 本批次记录,自动完成路由更新 |
|
|
|
|
|
updateUserRegionWallet(mysqlConn, batchRecords); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
offset += pageSize; |
|
|
offset += pageSize; |
|
|
@ -281,10 +286,12 @@ public class MysqlServiceImpl implements MysqlService { |
|
|
|
|
|
|
|
|
static class RecordData { |
|
|
static class RecordData { |
|
|
int gtype, jwcode, free, core_jb, buy_jb; |
|
|
int gtype, jwcode, free, core_jb, buy_jb; |
|
|
|
|
|
Integer permanent_gold; |
|
|
Timestamp cz_time; |
|
|
Timestamp cz_time; |
|
|
String cz_user, cz_bz, operation_platform, goods_name, uid; |
|
|
String cz_user, cz_bz, operation_platform, goods_name, uid; |
|
|
String orderNumber; // 预生成,避免重复计算 |
|
|
String orderNumber; // 预生成,避免重复计算 |
|
|
String linkId; |
|
|
String linkId; |
|
|
|
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
private RecordData extractRecordData(ResultSet rs) throws SQLException { |
|
|
private RecordData extractRecordData(ResultSet rs) throws SQLException { |
|
|
@ -552,4 +559,90 @@ public class MysqlServiceImpl implements MysqlService { |
|
|
|
|
|
|
|
|
return defaultValue; |
|
|
return defaultValue; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
* 查 g_order 获取 pay_style,路由更新 user_region_wallet 余额 |
|
|
|
|
|
* @param mysqlConn 当前事务连接(⚠️必须使用此连接) |
|
|
|
|
|
* @param records 本批次有效记录 |
|
|
|
|
|
*/ |
|
|
|
|
|
private void updateUserRegionWallet(Connection mysqlConn, List<RecordData> records) { |
|
|
|
|
|
if (records == null || records.isEmpty()) return; |
|
|
|
|
|
|
|
|
|
|
|
// 1. 收集 jwcode 去重 |
|
|
|
|
|
List<Integer> jwcodes = records.stream() |
|
|
|
|
|
.map(r -> r.jwcode).filter(Objects::nonNull).distinct() |
|
|
|
|
|
.collect(Collectors.toList()); |
|
|
|
|
|
if (jwcodes.isEmpty()) return; |
|
|
|
|
|
|
|
|
|
|
|
// 2. 批量查 g_order 获取 pay_style(state=1 支付完成) |
|
|
|
|
|
String inSql = String.join(",", Collections.nCopies(jwcodes.size(), "?")); |
|
|
|
|
|
String querySql = "SELECT jwcode, pay_style FROM g_order WHERE jwcode IN (" + inSql + ") AND state = 1"; |
|
|
|
|
|
|
|
|
|
|
|
Map<Integer, Integer> jwcodePayStyleMap = new HashMap<>(); |
|
|
|
|
|
try (PreparedStatement stmt = mysqlConn.prepareStatement(querySql)) { |
|
|
|
|
|
for (int i = 0; i < jwcodes.size(); i++) stmt.setInt(i + 1, jwcodes.get(i)); |
|
|
|
|
|
try (ResultSet rs = stmt.executeQuery()) { |
|
|
|
|
|
while (rs.next()) jwcodePayStyleMap.put(rs.getInt("jwcode"), rs.getInt("pay_style")); |
|
|
|
|
|
} |
|
|
|
|
|
} catch (SQLException e) { |
|
|
|
|
|
throw new RuntimeException("查询 g_order 失败", e); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// 3. 按 pay_style 路由到 wallet_id,累加永久金币 |
|
|
|
|
|
Map<String, Integer> userWalletGoldMap = new HashMap<>(); // key: "jwcode_walletId" -> 累加金额 |
|
|
|
|
|
for (RecordData data : records) { |
|
|
|
|
|
if (!jwcodePayStyleMap.containsKey(data.jwcode)) continue; |
|
|
|
|
|
if (data.permanent_gold == null || data.permanent_gold <= 0) continue; |
|
|
|
|
|
|
|
|
|
|
|
Integer payStyle = jwcodePayStyleMap.get(data.jwcode); |
|
|
|
|
|
Integer walletId = mapPayStyleToWalletId(payStyle); |
|
|
|
|
|
if (walletId == null) continue; |
|
|
|
|
|
|
|
|
|
|
|
String key = data.jwcode + "_" + walletId; |
|
|
|
|
|
userWalletGoldMap.merge(key, data.permanent_gold, Integer::sum); |
|
|
|
|
|
} |
|
|
|
|
|
if (userWalletGoldMap.isEmpty()) return; |
|
|
|
|
|
|
|
|
|
|
|
// 4. UPSERT 批量更新 user_region_wallet(不存在则插入,存在则累加) |
|
|
|
|
|
String upsertSql = """ |
|
|
|
|
|
INSERT INTO user_region_wallet |
|
|
|
|
|
(jwcode, wallet_id, current_permanent_gold, create_time, update_time) |
|
|
|
|
|
VALUES (?, ?, ?, NOW(), NOW()) |
|
|
|
|
|
ON DUPLICATE KEY UPDATE |
|
|
|
|
|
current_permanent_gold = current_permanent_gold + VALUES(current_permanent_gold), |
|
|
|
|
|
update_time = NOW() |
|
|
|
|
|
"""; |
|
|
|
|
|
|
|
|
|
|
|
try (PreparedStatement stmt = mysqlConn.prepareStatement(upsertSql)) { |
|
|
|
|
|
for (Map.Entry<String, Integer> entry : userWalletGoldMap.entrySet()) { |
|
|
|
|
|
String[] parts = entry.getKey().split("_"); |
|
|
|
|
|
int jwcode = Integer.parseInt(parts[0]); |
|
|
|
|
|
int walletId = Integer.parseInt(parts[1]); |
|
|
|
|
|
|
|
|
|
|
|
stmt.setInt(1, jwcode); |
|
|
|
|
|
stmt.setString(2, String.valueOf(walletId)); |
|
|
|
|
|
stmt.setInt(3, entry.getValue()); |
|
|
|
|
|
stmt.addBatch(); |
|
|
|
|
|
} |
|
|
|
|
|
int[] results = stmt.executeBatch(); |
|
|
|
|
|
logger.info("✅ 地区钱包余额更新 {} 条", results.length); |
|
|
|
|
|
} catch (SQLException e) { |
|
|
|
|
|
throw new RuntimeException("更新地区钱包失败", e); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
* pay_style → wallet_id 映射(⚠️ 请替换成你实际的 wallet_id) |
|
|
|
|
|
*/ |
|
|
|
|
|
private Integer mapPayStyleToWalletId(Integer payStyle) { |
|
|
|
|
|
if (payStyle == null) return null; |
|
|
|
|
|
return switch (payStyle) { |
|
|
|
|
|
case 3,4,9 -> 4; // 微信/支付宝 → 华东钱包 |
|
|
|
|
|
case 5,6 -> 1; // Stripe/PaymentAsia → 华南钱包 |
|
|
|
|
|
case 7 -> 3; // 其他 → 西南钱包 |
|
|
|
|
|
case 10 -> 7; |
|
|
|
|
|
case 15 -> 2; |
|
|
|
|
|
default -> null; |
|
|
|
|
|
}; |
|
|
|
|
|
} |
|
|
} |
|
|
} |