You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

647 lines
27 KiB

6 months ago
  1. package com.example.demo.Mysql;
  2. import com.example.demo.Util.BaseDES;
  3. import com.example.demo.domain.entity.User;
  4. import com.example.demo.service.coin.AdminService;
  5. import com.example.demo.service.coin.MarketService;
  6. import com.example.demo.service.coin.UserService;
  7. import org.apache.commons.lang3.ObjectUtils;
  8. import org.apache.commons.lang3.StringUtils;
  9. import org.slf4j.Logger;
  10. import org.slf4j.LoggerFactory;
  11. import org.springframework.beans.factory.annotation.Autowired;
  12. import org.springframework.beans.factory.annotation.Qualifier;
  13. import org.springframework.http.*;
  14. import org.springframework.scheduling.annotation.Scheduled;
  15. import org.springframework.stereotype.Service;
  16. import org.springframework.transaction.annotation.Transactional;
  17. import org.springframework.web.client.RestTemplate;
  18. import javax.sql.DataSource;
  19. import java.math.BigDecimal;
  20. import java.sql.*;
  21. import java.time.LocalDate;
  22. import java.time.LocalDateTime;
  23. import java.time.Month;
  24. import java.time.format.DateTimeFormatter;
  25. import java.util.*;
  26. import java.util.Date;
  27. import java.util.stream.Collectors;
  28. @Service
  29. public class MysqlServiceImpl implements MysqlService {
  30. @Autowired
  31. private RestTemplate restTemplate;
  32. Set<Integer> validZeroTypes = new HashSet<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 11, 12, 13, 14, 18, 19, 20, 21, 22, 23, 24, 26, 28, 29, 35, 36, 40, 45, 46, 47, 48, 49, 53, 54, 60,67));
  33. Set<Integer> validOneTypes = new HashSet<>(Arrays.asList(9, 15, 17, 25, 27, 37, 41, 42, 43, 50, 51, 62,68));
  34. Set<Integer> validTwoTypes = new HashSet<>(Arrays.asList(52,61));
  35. Set<Integer> validThreeTypes = new HashSet<>(Arrays.asList(10, 16, 30, 31, 32, 33, 34, 39, 44));
  36. Set<Integer> validFourTypes = new HashSet<>(Arrays.asList(55, 56, 57, 58, 59, 63, 64, 65));
  37. LocalDateTime now = LocalDateTime.now();
  38. Month currentMonth = now.getMonth();
  39. @Autowired
  40. private AdminService adminService;
  41. @Autowired
  42. private UserService userService;
  43. @Autowired
  44. private MarketService marketService;
  45. @Autowired
  46. @Qualifier("sqlserver1DataSource")
  47. private DataSource sqlserver1DataSource;
  48. @Autowired
  49. @Qualifier("mysql1DataSource")
  50. private DataSource mysql1DataSource;
  51. private static final Logger logger = LoggerFactory.getLogger(MysqlServiceImpl.class);
  52. @Override
  53. @Transactional(transactionManager = "mysqlTransactionManager",rollbackFor = Exception.class) // 👈 保证插入和用户更新在一个事务
  54. public void getSqlserverData() throws Exception {
  55. logger.info("开始从 SQL Server 同步数据到 MySQL");
  56. try (Connection sqlServerConn = sqlserver1DataSource.getConnection();
  57. Connection mysqlConn = mysql1DataSource.getConnection()) {
  58. logger.info("开始查询数据...");
  59. int pageSize = 100;
  60. int offset = 0;
  61. boolean hasMoreData = true;
  62. // 👇 恢复动态时间查询(原注释掉的硬编码时间已移除)
  63. String querySql = """
  64. SELECT
  65. id, gtype, jwcode, free, core_jb, buy_jb, cz_time, cz_user, cz_bz, operation_platform, goods_name,uid
  66. FROM
  67. hwhcGold.dbo.user_gold_records
  68. WHERE cz_time >= ?
  69. ORDER BY
  70. cz_time ASC
  71. OFFSET ? ROWS FETCH NEXT ? ROWS ONLY;
  72. """;
  73. String insertSql = """
  74. INSERT IGNORE INTO user_gold_record
  75. (order_code, jwcode, sum_gold, permanent_gold, free_june, free_december,
  76. task_gold, pay_platform, goods_name, refund_type, refund_model, remark, type, admin_id,
  77. audit_status, create_time, flag, update_time, audit_time, is_refund, uid,link_id)
  78. VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
  79. """;
  80. while (hasMoreData) {
  81. try (PreparedStatement sqlServerStmt = sqlServerConn.prepareStatement(querySql)) {
  82. // 👇 恢复动态时间参数
  83. sqlServerStmt.setTimestamp(1, Timestamp.valueOf(LocalDateTime.now().minusHours(1)));
  84. sqlServerStmt.setInt(2, offset);
  85. sqlServerStmt.setInt(3, pageSize);
  86. ResultSet resultSet = sqlServerStmt.executeQuery();
  87. if (!resultSet.next()) {
  88. logger.info("无更多数据,offset={}", offset);
  89. break;
  90. }
  91. // 👇 步骤1:收集本批次所有记录
  92. List<RecordData> batchRecords = new ArrayList<>();
  93. List<String> batchUids = new ArrayList<>();
  94. do {
  95. RecordData data = extractRecordData(resultSet); // 👈 抽取数据
  96. batchRecords.add(data);
  97. batchUids.add(data.uid);
  98. } while (resultSet.next());
  99. logger.info("本批次共 {} 条记录", batchRecords.size());
  100. // 👇 步骤2:批量查询哪些 uid 已存在(性能优化)
  101. Set<String> existingUids = getExistingUids(mysqlConn, batchUids);
  102. logger.info("已存在记录数: {}", existingUids.size());
  103. // 👇 步骤3:准备批量插入
  104. try (PreparedStatement mysqlStmt = mysqlConn.prepareStatement(insertSql)) {
  105. for (RecordData data : batchRecords) {
  106. if (validFourTypes.contains(data.gtype)) {
  107. logger.debug("跳过 validFourTypes 类型记录,gtype={}, uid={}", data.gtype, data.uid);
  108. continue;
  109. }
  110. if ("4".equals(data.operation_platform)) {
  111. logger.debug("跳过 operation_platform=4 的记录,uid={}", data.uid);
  112. continue;
  113. }
  114. // 👇 跳过已存在的记录(避免重复更新用户余额)
  115. if (existingUids.contains(data.uid)) {
  116. logger.debug("跳过重复记录,uid={}", data.uid);
  117. continue;
  118. }
  119. // 👇 设置插入参数
  120. setStatementParams(mysqlStmt, data);
  121. mysqlStmt.addBatch();
  122. // 👇 只有新记录才更新用户余额(关键修复!)
  123. updateUserBalance(mysqlConn, data);
  124. }
  125. // 👇 执行批量插入
  126. int[] results = mysqlStmt.executeBatch();
  127. logger.info("成功插入新记录 {} 条", results.length);
  128. }
  129. offset += pageSize;
  130. }
  131. }
  132. logger.info("✅ 数据同步完成");
  133. } catch (SQLException e) {
  134. logger.error("数据库操作失败", e);
  135. throw new RuntimeException("同步失败", e);
  136. }
  137. }
  138. @Override
  139. @Transactional(transactionManager = "mysqlTransactionManager") // 👈 保证插入和用户更新在一个事务
  140. public void getSqlserverDataDay() throws Exception {
  141. logger.info("开始从 SQL Server 同步数据到 MySQL");
  142. try (Connection sqlServerConn = sqlserver1DataSource.getConnection();
  143. Connection mysqlConn = mysql1DataSource.getConnection()) {
  144. logger.info("开始查询数据...");
  145. int pageSize = 100;
  146. int offset = 0;
  147. boolean hasMoreData = true;
  148. // 👇 恢复动态时间查询(原注释掉的硬编码时间已移除)
  149. String querySql = """
  150. SELECT
  151. id, gtype, jwcode, free, core_jb, buy_jb, cz_time, cz_user, cz_bz, operation_platform, goods_name, uid
  152. FROM
  153. hwhcGold.dbo.user_gold_records
  154. WHERE cz_time >= ?
  155. ORDER BY
  156. cz_time ASC
  157. OFFSET ? ROWS FETCH NEXT ? ROWS ONLY;
  158. """;
  159. String insertSql = """
  160. INSERT IGNORE INTO user_gold_record
  161. (order_code, jwcode, sum_gold, permanent_gold, free_june, free_december,
  162. task_gold, pay_platform, goods_name, refund_type, refund_model, remark, type, admin_id,
  163. audit_status, create_time, flag, update_time, audit_time, is_refund, uid,link_id)
  164. VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
  165. """;
  166. while (hasMoreData) {
  167. try (PreparedStatement sqlServerStmt = sqlServerConn.prepareStatement(querySql)) {
  168. // 👇 恢复动态时间参数
  169. sqlServerStmt.setTimestamp(1, Timestamp.valueOf(LocalDate.now().minusDays(1).atStartOfDay()));
  170. sqlServerStmt.setInt(2, offset);
  171. sqlServerStmt.setInt(3, pageSize);
  172. ResultSet resultSet = sqlServerStmt.executeQuery();
  173. if (!resultSet.next()) {
  174. logger.info("无更多数据,offset={}", offset);
  175. break;
  176. }
  177. // 👇 步骤1:收集本批次所有记录
  178. List<RecordData> batchRecords = new ArrayList<>();
  179. List<String> batchUids = new ArrayList<>();
  180. do {
  181. RecordData data = extractRecordData(resultSet); // 👈 抽取数据
  182. batchRecords.add(data);
  183. batchUids.add(data.uid);
  184. } while (resultSet.next());
  185. logger.info("本批次共 {} 条记录", batchRecords.size());
  186. // 👇 步骤2:批量查询哪些 uid 已存在(性能优化)
  187. Set<String> existingUids = getExistingUids(mysqlConn, batchUids);
  188. logger.info("已存在记录数: {}", existingUids.size());
  189. // 👇 步骤3:准备批量插入
  190. try (PreparedStatement mysqlStmt = mysqlConn.prepareStatement(insertSql)) {
  191. for (RecordData data : batchRecords) {
  192. if (validFourTypes.contains(data.gtype)) {
  193. logger.debug("跳过 validFourTypes 类型记录,gtype={}, uid={}", data.gtype, data.uid);
  194. continue;
  195. }
  196. if ("4".equals(data.operation_platform)) {
  197. logger.debug("跳过 operation_platform=4 的记录,uid={}", data.uid);
  198. continue;
  199. }
  200. // 👇 跳过已存在的记录(避免重复更新用户余额)
  201. if (existingUids.contains(data.uid)) {
  202. logger.debug("跳过重复记录,uid={}", data.uid);
  203. continue;
  204. }
  205. // 👇 设置插入参数
  206. setStatementParams(mysqlStmt, data);
  207. mysqlStmt.addBatch();
  208. // 👇 只有新记录才更新用户余额(关键修复!)
  209. updateUserBalance(mysqlConn, data);
  210. }
  211. // 👇 执行批量插入
  212. int[] results = mysqlStmt.executeBatch();
  213. logger.info("成功插入新记录 {} 条", results.length);
  214. // 👉【新增一行调用】👈
  215. // 传入当前连接 + 本批次记录,自动完成路由更新
  216. updateUserRegionWallet(mysqlConn, batchRecords);
  217. }
  218. offset += pageSize;
  219. }
  220. }
  221. logger.info("✅ 数据同步完成");
  222. } catch (SQLException e) {
  223. logger.error("数据库操作失败", e);
  224. throw new RuntimeException("同步失败", e);
  225. }
  226. }
  227. static class RecordData {
  228. int gtype, jwcode, free, core_jb, buy_jb;
  229. Integer permanent_gold;
  230. Timestamp cz_time;
  231. String cz_user, cz_bz, operation_platform, goods_name, uid;
  232. String orderNumber; // 预生成,避免重复计算
  233. String linkId;
  234. }
  235. private RecordData extractRecordData(ResultSet rs) throws SQLException {
  236. RecordData data = new RecordData();
  237. data.gtype = rs.getInt("gtype");
  238. data.jwcode = rs.getInt("jwcode");
  239. data.free = rs.getInt("free");
  240. data.core_jb = rs.getInt("core_jb");
  241. data.buy_jb = rs.getInt("buy_jb");
  242. data.cz_time = rs.getTimestamp("cz_time");
  243. data.cz_user = rs.getString("cz_user");
  244. data.cz_bz = rs.getString("cz_bz");
  245. data.operation_platform = rs.getString("operation_platform");
  246. data.goods_name = rs.getString("goods_name");
  247. data.uid = rs.getString("id");
  248. data.linkId = rs.getString("uid");
  249. // 预生成订单号(避免在循环中重复生成)
  250. String timestampPart = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS"));
  251. String uuidPart = UUID.randomUUID().toString().replaceAll("-", "");
  252. data.orderNumber = timestampPart + "_" + uuidPart;
  253. return data;
  254. }
  255. private Set<String> getExistingUids(Connection conn, List<String> uids) throws SQLException {
  256. if (uids.isEmpty()) return Collections.emptySet();
  257. String placeholders = String.join(",", Collections.nCopies(uids.size(), "?"));
  258. String sql = "SELECT uid FROM user_gold_record WHERE uid IN (" + placeholders + ")";
  259. Set<String> existing = new HashSet<>();
  260. try (PreparedStatement stmt = conn.prepareStatement(sql)) {
  261. for (int i = 0; i < uids.size(); i++) {
  262. stmt.setString(i + 1, uids.get(i));
  263. }
  264. try (ResultSet rs = stmt.executeQuery()) {
  265. while (rs.next()) {
  266. existing.add(rs.getString("uid"));
  267. }
  268. }
  269. }
  270. return existing;
  271. }
  272. private void setStatementParams(PreparedStatement stmt, RecordData data) throws SQLException {
  273. String name = data.cz_user;
  274. // 设置 admin_id
  275. if (StringUtils.isNumeric(name)) {
  276. try {
  277. String adminIdStr = adminService.getId(name);
  278. if (adminIdStr != null && StringUtils.isNumeric(adminIdStr)) {
  279. stmt.setInt(14, Integer.parseInt(adminIdStr));
  280. } else {
  281. stmt.setInt(14, 99999);
  282. }
  283. } catch (Exception e) {
  284. logger.warn("解析 admin_id 失败,cz_user={}", name, e);
  285. stmt.setInt(14, 99999);
  286. }
  287. } else {
  288. stmt.setInt(14, 99999);
  289. }
  290. // refund_type, refund_model
  291. stmt.setString(10, null);
  292. stmt.setNull(11, Types.INTEGER);
  293. // 根据 gtype 设置 type 和 order_code
  294. if (validFourTypes.contains(data.gtype)) {
  295. throw new IllegalArgumentException("不应处理 validFourTypes 类型,应在上层过滤"); // 安全兜底
  296. }
  297. if (validZeroTypes.contains(data.gtype)) {
  298. stmt.setInt(13, 0);
  299. stmt.setInt(20, 0);
  300. stmt.setString(1, "ERPCZ_" + data.orderNumber);
  301. } else if (validOneTypes.contains(data.gtype)) {
  302. stmt.setInt(13, 1);
  303. stmt.setInt(20, 0);
  304. stmt.setString(1, "ERPXF_" + data.orderNumber);
  305. } else if (validTwoTypes.contains(data.gtype)) {
  306. stmt.setInt(13, 2);
  307. stmt.setInt(20, 0);
  308. stmt.setString(1, "ERPTK_" + data.orderNumber);
  309. stmt.setString(10, "退款商品");
  310. stmt.setInt(11, 0);
  311. } else if (validThreeTypes.contains(data.gtype)) {
  312. stmt.setInt(13, 3);
  313. stmt.setNull(20, Types.INTEGER);
  314. stmt.setString(1, "ERPQT_" + data.orderNumber);
  315. }
  316. stmt.setInt(2, data.jwcode);
  317. stmt.setInt(3, data.free + data.core_jb + data.buy_jb);
  318. stmt.setInt(4, data.buy_jb);
  319. if (currentMonth.getValue() >= 7) {
  320. stmt.setInt(5, data.free);
  321. stmt.setInt(6, 0);
  322. } else {
  323. stmt.setInt(5, 0);
  324. stmt.setInt(6, data.free);
  325. }
  326. stmt.setInt(7, data.core_jb);
  327. // pay_platform
  328. String platform = data.operation_platform;
  329. if ("1".equals(platform)) {
  330. stmt.setString(8, "ERP");
  331. } else if ("2".equals(platform)) {
  332. stmt.setString(8, "HomilyLink");
  333. } else if ("3".equals(platform)) {
  334. stmt.setString(8, "HomilyChart");
  335. } else if ("4".equals(platform)) {
  336. throw new IllegalArgumentException("不应处理 platform=4,应在上层过滤");
  337. } else if ("0".equals(platform)) {
  338. stmt.setString(8, "初始化金币");
  339. } else if ("5".equals(platform)){
  340. stmt.setString(8, "DC");
  341. }else {
  342. stmt.setString(8, "其他");
  343. }
  344. stmt.setString(9, data.goods_name);
  345. stmt.setString(12, data.cz_bz);
  346. stmt.setInt(15, 3);
  347. stmt.setTimestamp(16, data.cz_time);
  348. if (data.cz_bz != null && data.cz_bz.contains("测试") && data.cz_bz.contains("员工")) {
  349. stmt.setInt(17, 0);
  350. } else {
  351. stmt.setInt(17, 1);
  352. }
  353. Timestamp now = new Timestamp(System.currentTimeMillis());
  354. stmt.setTimestamp(18, now);
  355. stmt.setTimestamp(19, data.cz_time);
  356. stmt.setString(21, data.uid);
  357. stmt.setString(22, data.linkId);
  358. }
  359. private void updateUserBalance(Connection conn, RecordData data) throws Exception {
  360. logger.info("处理用户余额更新,jwcode={}", data.jwcode);
  361. User user = userService.selectAllUser(String.valueOf(data.jwcode));
  362. BigDecimal freeBD = BigDecimal.valueOf(data.free);
  363. BigDecimal buyJbBD = BigDecimal.valueOf(data.buy_jb);
  364. BigDecimal coreJbBD = BigDecimal.valueOf(data.core_jb);
  365. if (ObjectUtils.isEmpty(user)) {
  366. logger.info("用户不存在,jwcode={}", data.jwcode);
  367. user = new User();
  368. String country = "未知";
  369. String name = "未知";
  370. try {
  371. BaseDES des = new BaseDES();
  372. String desjwcode = des.encrypt(String.valueOf(data.jwcode));
  373. Map<String, String> requestBody = new HashMap<>();
  374. requestBody.put("jwcode", desjwcode);
  375. HttpHeaders headers = new HttpHeaders();
  376. headers.setContentType(MediaType.APPLICATION_JSON);
  377. HttpEntity<Map<String, String>> entity = new HttpEntity<>(requestBody, headers);
  378. ResponseEntity<Map> response = restTemplate.exchange(
  379. "http://hwapi.rzfwq.com/hwjnApp/hwhc-login/hwhclogin/hc/login/clent/info",
  380. HttpMethod.POST, entity, Map.class);
  381. if (response.getStatusCode().is2xxSuccessful()) {
  382. Map<String, Object> responseBody = response.getBody();
  383. if (responseBody != null) {
  384. Map<String, Object> dataMap = (Map<String, Object>) responseBody.get("data");
  385. if (dataMap != null) {
  386. name = (String) dataMap.get("name");
  387. country = getCountryWithDefault(dataMap, "未知");
  388. logger.info("获取用户信息成功: name={}, country={}", name, country);
  389. }
  390. }
  391. }
  392. } catch (Exception e) {
  393. logger.warn("获取用户信息失败,jwcode={}", data.jwcode, e);
  394. country = "未知";
  395. }
  396. String market = marketService.getMarketIdsDao(country);
  397. user.setJwcode(data.jwcode);
  398. user.setName(name);
  399. user.setMarket(market);
  400. userService.addUser(user);
  401. logger.info("用户创建成功,jwcode={}", data.jwcode);
  402. // 重新查询确保数据完整
  403. user = userService.selectAllUser(String.valueOf(data.jwcode));
  404. }
  405. // 更新当前金币
  406. if (currentMonth.getValue() >= 7) {
  407. BigDecimal newFreeJune = user.getCurrentFreeJune().add(freeBD);
  408. if (newFreeJune.compareTo(BigDecimal.ZERO) >= 0) {
  409. user.setCurrentFreeJune(newFreeJune);
  410. } else {
  411. BigDecimal remaining = newFreeJune;
  412. user.setCurrentFreeJune(BigDecimal.ZERO);
  413. user.setCurrentFreeDecember(user.getCurrentFreeDecember().add(remaining));
  414. }
  415. } else {
  416. BigDecimal newFreeDec = user.getCurrentFreeDecember().add(freeBD);
  417. if (newFreeDec.compareTo(BigDecimal.ZERO) >= 0) {
  418. user.setCurrentFreeDecember(newFreeDec);
  419. } else {
  420. BigDecimal remaining = newFreeDec;
  421. user.setCurrentFreeDecember(BigDecimal.ZERO);
  422. user.setCurrentFreeJune(remaining);
  423. }
  424. }
  425. user.setCurrentPermanentGold(user.getCurrentPermanentGold().add(buyJbBD));
  426. user.setCurrentTaskGold(user.getCurrentTaskGold().add(coreJbBD));
  427. // 更新统计字段
  428. if (validZeroTypes.contains(data.gtype)) {
  429. user.setRechargeNum(user.getRechargeNum() + 1);
  430. user.setSumPermanentGold(user.getSumPermanentGold().add(buyJbBD));
  431. user.setSumTaskGold(user.getSumTaskGold().add(coreJbBD));
  432. if (currentMonth.getValue() >= 7) {
  433. user.setSumFreeJune(user.getSumFreeJune().add(freeBD));
  434. } else {
  435. user.setSumFreeDecember(user.getSumFreeDecember().add(freeBD));
  436. }
  437. } else if (validOneTypes.contains(data.gtype)) {
  438. user.setConsumeNum(user.getConsumeNum() + 1);
  439. user.setSumConsumePermanent(user.getSumConsumePermanent().add(buyJbBD));
  440. user.setSumConsumeTask(user.getSumConsumeTask().add(coreJbBD));
  441. user.setSumConsumeFree(user.getSumConsumeFree().add(freeBD));
  442. }
  443. userService.updateAllGold(user);
  444. logger.info("用户余额更新成功,jwcode={}", data.jwcode);
  445. }
  446. private boolean checkRecordExists(Connection conn, String uid) throws SQLException {
  447. String sql = "SELECT 1 FROM user_gold_record WHERE uid = ? LIMIT 1";
  448. try (PreparedStatement stmt = conn.prepareStatement(sql)) {
  449. stmt.setString(1, uid);
  450. try (ResultSet rs = stmt.executeQuery()) {
  451. return rs.next();
  452. }
  453. }
  454. }
  455. private String getCountryWithDefault(Map<String, Object> dataMap, String defaultValue) {
  456. Object countryObj = dataMap.get("treelist");
  457. if (countryObj instanceof String) {
  458. String countryStr = ((String) countryObj).trim();
  459. if (countryStr.isEmpty()) {
  460. return defaultValue;
  461. }
  462. String[] parts = countryStr.split("-");
  463. return parts.length >= 3 ? parts[2] : defaultValue;
  464. }
  465. return defaultValue;
  466. }
  467. /**
  468. * g_order 获取 pay_style路由更新 user_region_wallet 余额
  469. * @param mysqlConn 当前事务连接️必须使用此连接
  470. * @param records 本批次有效记录
  471. */
  472. private void updateUserRegionWallet(Connection mysqlConn, List<RecordData> records) {
  473. if (records == null || records.isEmpty()) return;
  474. // 1. 收集 jwcode 去重
  475. List<Integer> jwcodes = records.stream()
  476. .map(r -> r.jwcode).filter(Objects::nonNull).distinct()
  477. .collect(Collectors.toList());
  478. if (jwcodes.isEmpty()) return;
  479. // 2. 批量查 g_order 获取 pay_style(state=1 支付完成)
  480. String inSql = String.join(",", Collections.nCopies(jwcodes.size(), "?"));
  481. String querySql = "SELECT jwcode, pay_style FROM g_order WHERE jwcode IN (" + inSql + ") AND state = 1";
  482. Map<Integer, Integer> jwcodePayStyleMap = new HashMap<>();
  483. try (PreparedStatement stmt = mysqlConn.prepareStatement(querySql)) {
  484. for (int i = 0; i < jwcodes.size(); i++) stmt.setInt(i + 1, jwcodes.get(i));
  485. try (ResultSet rs = stmt.executeQuery()) {
  486. while (rs.next()) jwcodePayStyleMap.put(rs.getInt("jwcode"), rs.getInt("pay_style"));
  487. }
  488. } catch (SQLException e) {
  489. throw new RuntimeException("查询 g_order 失败", e);
  490. }
  491. // 3. 按 pay_style 路由到 wallet_id,累加永久金币
  492. Map<String, Integer> userWalletGoldMap = new HashMap<>(); // key: "jwcode_walletId" -> 累加金额
  493. for (RecordData data : records) {
  494. if (!jwcodePayStyleMap.containsKey(data.jwcode)) continue;
  495. if (data.permanent_gold == null || data.permanent_gold <= 0) continue;
  496. Integer payStyle = jwcodePayStyleMap.get(data.jwcode);
  497. Integer walletId = mapPayStyleToWalletId(payStyle);
  498. if (walletId == null) continue;
  499. String key = data.jwcode + "_" + walletId;
  500. userWalletGoldMap.merge(key, data.permanent_gold, Integer::sum);
  501. }
  502. if (userWalletGoldMap.isEmpty()) return;
  503. // 4. UPSERT 批量更新 user_region_wallet(不存在则插入,存在则累加)
  504. String upsertSql = """
  505. INSERT INTO user_region_wallet
  506. (jwcode, wallet_id, current_permanent_gold, create_time, update_time)
  507. VALUES (?, ?, ?, NOW(), NOW())
  508. ON DUPLICATE KEY UPDATE
  509. current_permanent_gold = current_permanent_gold + VALUES(current_permanent_gold),
  510. update_time = NOW()
  511. """;
  512. try (PreparedStatement stmt = mysqlConn.prepareStatement(upsertSql)) {
  513. for (Map.Entry<String, Integer> entry : userWalletGoldMap.entrySet()) {
  514. String[] parts = entry.getKey().split("_");
  515. int jwcode = Integer.parseInt(parts[0]);
  516. int walletId = Integer.parseInt(parts[1]);
  517. stmt.setInt(1, jwcode);
  518. stmt.setString(2, String.valueOf(walletId));
  519. stmt.setInt(3, entry.getValue());
  520. stmt.addBatch();
  521. }
  522. int[] results = stmt.executeBatch();
  523. logger.info("✅ 地区钱包余额更新 {} 条", results.length);
  524. } catch (SQLException e) {
  525. throw new RuntimeException("更新地区钱包失败", e);
  526. }
  527. }
  528. /**
  529. * pay_style wallet_id 映射 请替换成你实际的 wallet_id
  530. */
  531. private Integer mapPayStyleToWalletId(Integer payStyle) {
  532. if (payStyle == null) return null;
  533. return switch (payStyle) {
  534. case 3,4,9 -> 4; // 微信/支付宝 → 华东钱包
  535. case 5,6 -> 1; // Stripe/PaymentAsia → 华南钱包
  536. case 7 -> 3; // 其他 → 西南钱包
  537. case 10 -> 7;
  538. case 15 -> 2;
  539. default -> null;
  540. };
  541. }
  542. }