You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

439 lines
18 KiB

2 months ago
  1. package com.example.demo.Mysql;
  2. import com.example.demo.Util.BaseDES;
  3. import com.example.demo.domain.entity.User;
  4. import com.example.demo.service.coin.AdminService;
  5. import com.example.demo.service.coin.MarketService;
  6. import com.example.demo.service.coin.UserService;
  7. import org.apache.commons.lang3.ObjectUtils;
  8. import org.apache.commons.lang3.StringUtils;
  9. import org.slf4j.Logger;
  10. import org.slf4j.LoggerFactory;
  11. import org.springframework.beans.factory.annotation.Autowired;
  12. import org.springframework.beans.factory.annotation.Qualifier;
  13. import org.springframework.http.*;
  14. import org.springframework.scheduling.annotation.Scheduled;
  15. import org.springframework.stereotype.Service;
  16. import org.springframework.transaction.annotation.Transactional;
  17. import org.springframework.web.client.RestTemplate;
  18. import javax.sql.DataSource;
  19. import java.math.BigDecimal;
  20. import java.sql.*;
  21. import java.time.LocalDateTime;
  22. import java.time.Month;
  23. import java.time.format.DateTimeFormatter;
  24. import java.util.*;
  25. @Service
  26. public class MysqlServiceImpl implements MysqlService {
  27. @Autowired
  28. private RestTemplate restTemplate;
  29. Set<Integer> validZeroTypes = new HashSet<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 11, 12, 13, 14, 18, 19, 20, 21, 22, 23, 24, 26, 28, 29, 35, 36, 40, 45, 46, 47, 48, 49, 53, 54, 60));
  30. Set<Integer> validOneTypes = new HashSet<>(Arrays.asList(9, 15, 17, 25, 27, 37, 41, 42, 43, 50, 51, 62));
  31. Set<Integer> validTwoTypes = new HashSet<>(Arrays.asList(52,61));
  32. Set<Integer> validThreeTypes = new HashSet<>(Arrays.asList(10, 16, 30, 31, 32, 33, 34, 39, 44));
  33. Set<Integer> validFourTypes = new HashSet<>(Arrays.asList(55, 56, 57, 58, 59, 63, 64, 65));
  34. LocalDateTime now = LocalDateTime.now();
  35. Month currentMonth = now.getMonth();
  36. @Autowired
  37. private AdminService adminService;
  38. @Autowired
  39. private UserService userService;
  40. @Autowired
  41. private MarketService marketService;
  42. @Autowired
  43. @Qualifier("sqlserver1DataSource")
  44. private DataSource sqlserver1DataSource;
  45. @Autowired
  46. @Qualifier("mysql1DataSource")
  47. private DataSource mysql1DataSource;
  48. private static final Logger logger = LoggerFactory.getLogger(MysqlServiceImpl.class);
  49. @Override
  50. @Transactional(transactionManager = "mysqlTransactionManager") // 👈 保证插入和用户更新在一个事务
  51. public void getSqlserverData() throws Exception {
  52. logger.info("开始从 SQL Server 同步数据到 MySQL");
  53. try (Connection sqlServerConn = sqlserver1DataSource.getConnection();
  54. Connection mysqlConn = mysql1DataSource.getConnection()) {
  55. logger.info("开始查询数据...");
  56. int pageSize = 100;
  57. int offset = 0;
  58. boolean hasMoreData = true;
  59. // 👇 恢复动态时间查询(原注释掉的硬编码时间已移除)
  60. String querySql = """
  61. SELECT
  62. id, gtype, jwcode, free, core_jb, buy_jb, cz_time, cz_user, cz_bz, operation_platform, goods_name
  63. FROM
  64. hwhcGold.dbo.user_gold_records
  65. WHERE cz_time >= ?
  66. ORDER BY
  67. cz_time ASC
  68. OFFSET ? ROWS FETCH NEXT ? ROWS ONLY;
  69. """;
  70. String insertSql = """
  71. INSERT IGNORE INTO user_gold_record
  72. (order_code, jwcode, sum_gold, permanent_gold, free_june, free_december,
  73. task_gold, pay_platform, goods_name, refund_type, refund_model, remark, type, admin_id,
  74. audit_status, create_time, flag, update_time, audit_time, is_refund, uid)
  75. VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
  76. """;
  77. while (hasMoreData) {
  78. try (PreparedStatement sqlServerStmt = sqlServerConn.prepareStatement(querySql)) {
  79. // 👇 恢复动态时间参数
  80. sqlServerStmt.setTimestamp(1, Timestamp.valueOf(LocalDateTime.now().minusHours(1)));
  81. sqlServerStmt.setInt(2, offset);
  82. sqlServerStmt.setInt(3, pageSize);
  83. ResultSet resultSet = sqlServerStmt.executeQuery();
  84. if (!resultSet.next()) {
  85. logger.info("无更多数据,offset={}", offset);
  86. break;
  87. }
  88. // 👇 步骤1:收集本批次所有记录
  89. List<RecordData> batchRecords = new ArrayList<>();
  90. List<String> batchUids = new ArrayList<>();
  91. do {
  92. RecordData data = extractRecordData(resultSet); // 👈 抽取数据
  93. batchRecords.add(data);
  94. batchUids.add(data.uid);
  95. } while (resultSet.next());
  96. logger.info("本批次共 {} 条记录", batchRecords.size());
  97. // 👇 步骤2:批量查询哪些 uid 已存在(性能优化)
  98. Set<String> existingUids = getExistingUids(mysqlConn, batchUids);
  99. logger.info("已存在记录数: {}", existingUids.size());
  100. // 👇 步骤3:准备批量插入
  101. try (PreparedStatement mysqlStmt = mysqlConn.prepareStatement(insertSql)) {
  102. for (RecordData data : batchRecords) {
  103. if (validFourTypes.contains(data.gtype)) {
  104. logger.debug("跳过 validFourTypes 类型记录,gtype={}, uid={}", data.gtype, data.uid);
  105. continue;
  106. }
  107. if ("4".equals(data.operation_platform)) {
  108. logger.debug("跳过 operation_platform=4 的记录,uid={}", data.uid);
  109. continue;
  110. }
  111. // 👇 跳过已存在的记录(避免重复更新用户余额)
  112. if (existingUids.contains(data.uid)) {
  113. logger.debug("跳过重复记录,uid={}", data.uid);
  114. continue;
  115. }
  116. // 👇 设置插入参数
  117. setStatementParams(mysqlStmt, data);
  118. mysqlStmt.addBatch();
  119. // 👇 只有新记录才更新用户余额(关键修复!)
  120. updateUserBalance(mysqlConn, data);
  121. }
  122. // 👇 执行批量插入
  123. int[] results = mysqlStmt.executeBatch();
  124. logger.info("成功插入新记录 {} 条", results.length);
  125. }
  126. offset += pageSize;
  127. }
  128. }
  129. logger.info("✅ 数据同步完成");
  130. } catch (SQLException e) {
  131. logger.error("数据库操作失败", e);
  132. throw new RuntimeException("同步失败", e);
  133. }
  134. }
  135. static class RecordData {
  136. int gtype, jwcode, free, core_jb, buy_jb;
  137. Timestamp cz_time;
  138. String cz_user, cz_bz, operation_platform, goods_name, uid;
  139. String orderNumber; // 预生成,避免重复计算
  140. }
  141. private RecordData extractRecordData(ResultSet rs) throws SQLException {
  142. RecordData data = new RecordData();
  143. data.gtype = rs.getInt("gtype");
  144. data.jwcode = rs.getInt("jwcode");
  145. data.free = rs.getInt("free");
  146. data.core_jb = rs.getInt("core_jb");
  147. data.buy_jb = rs.getInt("buy_jb");
  148. data.cz_time = rs.getTimestamp("cz_time");
  149. data.cz_user = rs.getString("cz_user");
  150. data.cz_bz = rs.getString("cz_bz");
  151. data.operation_platform = rs.getString("operation_platform");
  152. data.goods_name = rs.getString("goods_name");
  153. data.uid = rs.getString("id");
  154. // 预生成订单号(避免在循环中重复生成)
  155. String timestampPart = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS"));
  156. String uuidPart = UUID.randomUUID().toString().replaceAll("-", "");
  157. data.orderNumber = timestampPart + "_" + uuidPart;
  158. return data;
  159. }
  160. private Set<String> getExistingUids(Connection conn, List<String> uids) throws SQLException {
  161. if (uids.isEmpty()) return Collections.emptySet();
  162. String placeholders = String.join(",", Collections.nCopies(uids.size(), "?"));
  163. String sql = "SELECT uid FROM user_gold_record WHERE uid IN (" + placeholders + ")";
  164. Set<String> existing = new HashSet<>();
  165. try (PreparedStatement stmt = conn.prepareStatement(sql)) {
  166. for (int i = 0; i < uids.size(); i++) {
  167. stmt.setString(i + 1, uids.get(i));
  168. }
  169. try (ResultSet rs = stmt.executeQuery()) {
  170. while (rs.next()) {
  171. existing.add(rs.getString("uid"));
  172. }
  173. }
  174. }
  175. return existing;
  176. }
  177. private void setStatementParams(PreparedStatement stmt, RecordData data) throws SQLException {
  178. String name = data.cz_user;
  179. // 设置 admin_id
  180. if (StringUtils.isNumeric(name)) {
  181. try {
  182. String adminIdStr = adminService.getId(name);
  183. if (adminIdStr != null && StringUtils.isNumeric(adminIdStr)) {
  184. stmt.setInt(14, Integer.parseInt(adminIdStr));
  185. } else {
  186. stmt.setInt(14, 99999);
  187. }
  188. } catch (Exception e) {
  189. logger.warn("解析 admin_id 失败,cz_user={}", name, e);
  190. stmt.setInt(14, 99999);
  191. }
  192. } else {
  193. stmt.setInt(14, 99999);
  194. }
  195. // refund_type, refund_model
  196. stmt.setString(10, null);
  197. stmt.setNull(11, Types.INTEGER);
  198. // 根据 gtype 设置 type 和 order_code
  199. if (validFourTypes.contains(data.gtype)) {
  200. throw new IllegalArgumentException("不应处理 validFourTypes 类型,应在上层过滤"); // 安全兜底
  201. }
  202. if (validZeroTypes.contains(data.gtype)) {
  203. stmt.setInt(13, 0);
  204. stmt.setNull(20, 0);
  205. stmt.setString(1, "ERPCZ_" + data.orderNumber);
  206. } else if (validOneTypes.contains(data.gtype)) {
  207. stmt.setInt(13, 1);
  208. stmt.setInt(20, 0);
  209. stmt.setString(1, "ERPXF_" + data.orderNumber);
  210. } else if (validTwoTypes.contains(data.gtype)) {
  211. stmt.setInt(13, 2);
  212. stmt.setInt(20, 0);
  213. stmt.setString(1, "ERPTK_" + data.orderNumber);
  214. stmt.setString(10, "退款商品");
  215. stmt.setInt(11, 0);
  216. } else if (validThreeTypes.contains(data.gtype)) {
  217. stmt.setInt(13, 3);
  218. stmt.setNull(20, Types.INTEGER);
  219. stmt.setString(1, "ERPQT_" + data.orderNumber);
  220. }
  221. stmt.setInt(2, data.jwcode);
  222. stmt.setInt(3, data.free + data.core_jb + data.buy_jb);
  223. stmt.setInt(4, data.buy_jb);
  224. if (currentMonth.getValue() >= 7) {
  225. stmt.setInt(5, data.free);
  226. stmt.setInt(6, 0);
  227. } else {
  228. stmt.setInt(5, 0);
  229. stmt.setInt(6, data.free);
  230. }
  231. stmt.setInt(7, data.core_jb);
  232. // pay_platform
  233. String platform = data.operation_platform;
  234. if ("1".equals(platform)) {
  235. stmt.setString(8, "ERP");
  236. } else if ("2".equals(platform)) {
  237. stmt.setString(8, "HomilyLink");
  238. } else if ("3".equals(platform)) {
  239. stmt.setString(8, "HomilyChart");
  240. } else if ("4".equals(platform)) {
  241. throw new IllegalArgumentException("不应处理 platform=4,应在上层过滤");
  242. } else if ("0".equals(platform)) {
  243. stmt.setString(8, "初始化金币");
  244. } else {
  245. stmt.setString(8, "其他");
  246. }
  247. stmt.setString(9, data.goods_name);
  248. stmt.setString(12, data.cz_bz);
  249. stmt.setInt(15, 3);
  250. stmt.setTimestamp(16, data.cz_time);
  251. if (data.cz_bz != null && data.cz_bz.contains("测试") && data.cz_bz.contains("员工")) {
  252. stmt.setInt(17, 0);
  253. } else {
  254. stmt.setInt(17, 1);
  255. }
  256. stmt.setTimestamp(18, data.cz_time);
  257. stmt.setTimestamp(19, data.cz_time);
  258. stmt.setString(21, data.uid);
  259. }
  260. private void updateUserBalance(Connection conn, RecordData data) throws Exception {
  261. logger.info("处理用户余额更新,jwcode={}", data.jwcode);
  262. User user = userService.selectAllUser(String.valueOf(data.jwcode));
  263. BigDecimal freeBD = BigDecimal.valueOf(data.free);
  264. BigDecimal buyJbBD = BigDecimal.valueOf(data.buy_jb);
  265. BigDecimal coreJbBD = BigDecimal.valueOf(data.core_jb);
  266. if (ObjectUtils.isEmpty(user)) {
  267. logger.info("用户不存在,jwcode={}", data.jwcode);
  268. user = new User();
  269. String country = "未知";
  270. String name = "未知";
  271. try {
  272. BaseDES des = new BaseDES();
  273. String desjwcode = des.encrypt(String.valueOf(data.jwcode));
  274. Map<String, String> requestBody = new HashMap<>();
  275. requestBody.put("jwcode", desjwcode);
  276. HttpHeaders headers = new HttpHeaders();
  277. headers.setContentType(MediaType.APPLICATION_JSON);
  278. HttpEntity<Map<String, String>> entity = new HttpEntity<>(requestBody, headers);
  279. ResponseEntity<Map> response = restTemplate.exchange(
  280. "http://hwapi.rzfwq.com/hwjnApp/hwhc-login/hwhclogin/hc/login/clent/info",
  281. HttpMethod.POST, entity, Map.class);
  282. if (response.getStatusCode().is2xxSuccessful()) {
  283. Map<String, Object> responseBody = response.getBody();
  284. if (responseBody != null) {
  285. Map<String, Object> dataMap = (Map<String, Object>) responseBody.get("data");
  286. if (dataMap != null) {
  287. name = (String) dataMap.get("name");
  288. country = getCountryWithDefault(dataMap, "未知");
  289. logger.info("获取用户信息成功: name={}, country={}", name, country);
  290. }
  291. }
  292. }
  293. } catch (Exception e) {
  294. logger.warn("获取用户信息失败,jwcode={}", data.jwcode, e);
  295. country = "未知";
  296. }
  297. String market = marketService.getMarketIdsDao(country);
  298. user.setJwcode(data.jwcode);
  299. user.setName(name);
  300. user.setMarket(market);
  301. userService.addUser(user);
  302. logger.info("用户创建成功,jwcode={}", data.jwcode);
  303. // 重新查询确保数据完整
  304. user = userService.selectAllUser(String.valueOf(data.jwcode));
  305. }
  306. // 更新当前金币
  307. if (currentMonth.getValue() >= 7) {
  308. BigDecimal newFreeJune = user.getCurrentFreeJune().add(freeBD);
  309. if (newFreeJune.compareTo(BigDecimal.ZERO) >= 0) {
  310. user.setCurrentFreeJune(newFreeJune);
  311. } else {
  312. BigDecimal remaining = newFreeJune;
  313. user.setCurrentFreeJune(BigDecimal.ZERO);
  314. user.setCurrentFreeDecember(user.getCurrentFreeDecember().add(remaining));
  315. }
  316. } else {
  317. BigDecimal newFreeDec = user.getCurrentFreeDecember().add(freeBD);
  318. if (newFreeDec.compareTo(BigDecimal.ZERO) >= 0) {
  319. user.setCurrentFreeDecember(newFreeDec);
  320. } else {
  321. BigDecimal remaining = newFreeDec;
  322. user.setCurrentFreeDecember(BigDecimal.ZERO);
  323. user.setCurrentFreeJune(remaining);
  324. }
  325. }
  326. user.setCurrentPermanentGold(user.getCurrentPermanentGold().add(buyJbBD));
  327. user.setCurrentTaskGold(user.getCurrentTaskGold().add(coreJbBD));
  328. // 更新统计字段
  329. if (validZeroTypes.contains(data.gtype)) {
  330. user.setRechargeNum(user.getRechargeNum() + 1);
  331. user.setSumPermanentGold(user.getSumPermanentGold().add(buyJbBD));
  332. user.setSumTaskGold(user.getSumTaskGold().add(coreJbBD));
  333. if (currentMonth.getValue() >= 7) {
  334. user.setSumFreeJune(user.getSumFreeJune().add(freeBD));
  335. } else {
  336. user.setSumFreeDecember(user.getSumFreeDecember().add(freeBD));
  337. }
  338. } else if (validOneTypes.contains(data.gtype)) {
  339. user.setConsumeNum(user.getConsumeNum() + 1);
  340. user.setSumConsumePermanent(user.getSumConsumePermanent().add(buyJbBD));
  341. user.setSumConsumeTask(user.getSumConsumeTask().add(coreJbBD));
  342. user.setSumConsumeFree(user.getSumConsumeFree().add(freeBD));
  343. }
  344. userService.updateAllGold(user);
  345. logger.info("用户余额更新成功,jwcode={}", data.jwcode);
  346. }
  347. private boolean checkRecordExists(Connection conn, String uid) throws SQLException {
  348. String sql = "SELECT 1 FROM user_gold_record WHERE uid = ? LIMIT 1";
  349. try (PreparedStatement stmt = conn.prepareStatement(sql)) {
  350. stmt.setString(1, uid);
  351. try (ResultSet rs = stmt.executeQuery()) {
  352. return rs.next();
  353. }
  354. }
  355. }
  356. private String getCountryWithDefault(Map<String, Object> dataMap, String defaultValue) {
  357. Object countryObj = dataMap.get("treelist");
  358. if (countryObj instanceof String) {
  359. String countryStr = ((String) countryObj).trim();
  360. if (countryStr.isEmpty()) {
  361. return defaultValue;
  362. }
  363. String[] parts = countryStr.split("-");
  364. return parts.length >= 3 ? parts[2] : defaultValue;
  365. }
  366. return defaultValue;
  367. }
  368. }