You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

554 lines
23 KiB

5 months ago
  1. package com.example.demo.Mysql;
  2. import com.example.demo.Util.BaseDES;
  3. import com.example.demo.domain.entity.User;
  4. import com.example.demo.service.coin.AdminService;
  5. import com.example.demo.service.coin.MarketService;
  6. import com.example.demo.service.coin.UserService;
  7. import org.apache.commons.lang3.ObjectUtils;
  8. import org.apache.commons.lang3.StringUtils;
  9. import org.slf4j.Logger;
  10. import org.slf4j.LoggerFactory;
  11. import org.springframework.beans.factory.annotation.Autowired;
  12. import org.springframework.beans.factory.annotation.Qualifier;
  13. import org.springframework.http.*;
  14. import org.springframework.scheduling.annotation.Scheduled;
  15. import org.springframework.stereotype.Service;
  16. import org.springframework.transaction.annotation.Transactional;
  17. import org.springframework.web.client.RestTemplate;
  18. import javax.sql.DataSource;
  19. import java.math.BigDecimal;
  20. import java.sql.*;
  21. import java.time.LocalDate;
  22. import java.time.LocalDateTime;
  23. import java.time.Month;
  24. import java.time.format.DateTimeFormatter;
  25. import java.util.*;
  26. import java.util.Date;
  27. @Service
  28. public class MysqlServiceImpl implements MysqlService {
  29. @Autowired
  30. private RestTemplate restTemplate;
  31. Set<Integer> validZeroTypes = new HashSet<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 11, 12, 13, 14, 18, 19, 20, 21, 22, 23, 24, 26, 28, 29, 35, 36, 40, 45, 46, 47, 48, 49, 53, 54, 60,67));
  32. Set<Integer> validOneTypes = new HashSet<>(Arrays.asList(9, 15, 17, 25, 27, 37, 41, 42, 43, 50, 51, 62,68));
  33. Set<Integer> validTwoTypes = new HashSet<>(Arrays.asList(52,61));
  34. Set<Integer> validThreeTypes = new HashSet<>(Arrays.asList(10, 16, 30, 31, 32, 33, 34, 39, 44));
  35. Set<Integer> validFourTypes = new HashSet<>(Arrays.asList(55, 56, 57, 58, 59, 63, 64, 65));
  36. LocalDateTime now = LocalDateTime.now();
  37. Month currentMonth = now.getMonth();
  38. @Autowired
  39. private AdminService adminService;
  40. @Autowired
  41. private UserService userService;
  42. @Autowired
  43. private MarketService marketService;
  44. @Autowired
  45. @Qualifier("sqlserver1DataSource")
  46. private DataSource sqlserver1DataSource;
  47. @Autowired
  48. @Qualifier("mysql1DataSource")
  49. private DataSource mysql1DataSource;
  50. private static final Logger logger = LoggerFactory.getLogger(MysqlServiceImpl.class);
  51. @Override
  52. @Transactional(transactionManager = "mysqlTransactionManager",rollbackFor = Exception.class) // 👈 保证插入和用户更新在一个事务
  53. public void getSqlserverData() throws Exception {
  54. logger.info("开始从 SQL Server 同步数据到 MySQL");
  55. try (Connection sqlServerConn = sqlserver1DataSource.getConnection();
  56. Connection mysqlConn = mysql1DataSource.getConnection()) {
  57. logger.info("开始查询数据...");
  58. int pageSize = 100;
  59. int offset = 0;
  60. boolean hasMoreData = true;
  61. // 👇 恢复动态时间查询(原注释掉的硬编码时间已移除)
  62. String querySql = """
  63. SELECT
  64. id, gtype, jwcode, free, core_jb, buy_jb, cz_time, cz_user, cz_bz, operation_platform, goods_name,uid
  65. FROM
  66. hwhcGold.dbo.user_gold_records
  67. WHERE cz_time >= ?
  68. ORDER BY
  69. cz_time ASC
  70. OFFSET ? ROWS FETCH NEXT ? ROWS ONLY;
  71. """;
  72. String insertSql = """
  73. INSERT IGNORE INTO user_gold_record
  74. (order_code, jwcode, sum_gold, permanent_gold, free_june, free_december,
  75. task_gold, pay_platform, goods_name, refund_type, refund_model, remark, type, admin_id,
  76. audit_status, create_time, flag, update_time, audit_time, is_refund, uid,link_id)
  77. VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
  78. """;
  79. while (hasMoreData) {
  80. try (PreparedStatement sqlServerStmt = sqlServerConn.prepareStatement(querySql)) {
  81. // 👇 恢复动态时间参数
  82. sqlServerStmt.setTimestamp(1, Timestamp.valueOf(LocalDateTime.now().minusHours(1)));
  83. sqlServerStmt.setInt(2, offset);
  84. sqlServerStmt.setInt(3, pageSize);
  85. ResultSet resultSet = sqlServerStmt.executeQuery();
  86. if (!resultSet.next()) {
  87. logger.info("无更多数据,offset={}", offset);
  88. break;
  89. }
  90. // 👇 步骤1:收集本批次所有记录
  91. List<RecordData> batchRecords = new ArrayList<>();
  92. List<String> batchUids = new ArrayList<>();
  93. do {
  94. RecordData data = extractRecordData(resultSet); // 👈 抽取数据
  95. batchRecords.add(data);
  96. batchUids.add(data.uid);
  97. } while (resultSet.next());
  98. logger.info("本批次共 {} 条记录", batchRecords.size());
  99. // 👇 步骤2:批量查询哪些 uid 已存在(性能优化)
  100. Set<String> existingUids = getExistingUids(mysqlConn, batchUids);
  101. logger.info("已存在记录数: {}", existingUids.size());
  102. // 👇 步骤3:准备批量插入
  103. try (PreparedStatement mysqlStmt = mysqlConn.prepareStatement(insertSql)) {
  104. for (RecordData data : batchRecords) {
  105. if (validFourTypes.contains(data.gtype)) {
  106. logger.debug("跳过 validFourTypes 类型记录,gtype={}, uid={}", data.gtype, data.uid);
  107. continue;
  108. }
  109. if ("4".equals(data.operation_platform)) {
  110. logger.debug("跳过 operation_platform=4 的记录,uid={}", data.uid);
  111. continue;
  112. }
  113. // 👇 跳过已存在的记录(避免重复更新用户余额)
  114. if (existingUids.contains(data.uid)) {
  115. logger.debug("跳过重复记录,uid={}", data.uid);
  116. continue;
  117. }
  118. // 👇 设置插入参数
  119. setStatementParams(mysqlStmt, data);
  120. mysqlStmt.addBatch();
  121. // 👇 只有新记录才更新用户余额(关键修复!)
  122. updateUserBalance(mysqlConn, data);
  123. }
  124. // 👇 执行批量插入
  125. int[] results = mysqlStmt.executeBatch();
  126. logger.info("成功插入新记录 {} 条", results.length);
  127. }
  128. offset += pageSize;
  129. }
  130. }
  131. logger.info("✅ 数据同步完成");
  132. } catch (SQLException e) {
  133. logger.error("数据库操作失败", e);
  134. throw new RuntimeException("同步失败", e);
  135. }
  136. }
  137. @Override
  138. @Transactional(transactionManager = "mysqlTransactionManager") // 👈 保证插入和用户更新在一个事务
  139. public void getSqlserverDataDay() throws Exception {
  140. logger.info("开始从 SQL Server 同步数据到 MySQL");
  141. try (Connection sqlServerConn = sqlserver1DataSource.getConnection();
  142. Connection mysqlConn = mysql1DataSource.getConnection()) {
  143. logger.info("开始查询数据...");
  144. int pageSize = 100;
  145. int offset = 0;
  146. boolean hasMoreData = true;
  147. // 👇 恢复动态时间查询(原注释掉的硬编码时间已移除)
  148. String querySql = """
  149. SELECT
  150. id, gtype, jwcode, free, core_jb, buy_jb, cz_time, cz_user, cz_bz, operation_platform, goods_name, uid
  151. FROM
  152. hwhcGold.dbo.user_gold_records
  153. WHERE cz_time >= ?
  154. ORDER BY
  155. cz_time ASC
  156. OFFSET ? ROWS FETCH NEXT ? ROWS ONLY;
  157. """;
  158. String insertSql = """
  159. INSERT IGNORE INTO user_gold_record
  160. (order_code, jwcode, sum_gold, permanent_gold, free_june, free_december,
  161. task_gold, pay_platform, goods_name, refund_type, refund_model, remark, type, admin_id,
  162. audit_status, create_time, flag, update_time, audit_time, is_refund, uid,link_id)
  163. VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
  164. """;
  165. while (hasMoreData) {
  166. try (PreparedStatement sqlServerStmt = sqlServerConn.prepareStatement(querySql)) {
  167. // 👇 恢复动态时间参数
  168. sqlServerStmt.setTimestamp(1, Timestamp.valueOf(LocalDate.now().minusDays(1).atStartOfDay()));
  169. sqlServerStmt.setInt(2, offset);
  170. sqlServerStmt.setInt(3, pageSize);
  171. ResultSet resultSet = sqlServerStmt.executeQuery();
  172. if (!resultSet.next()) {
  173. logger.info("无更多数据,offset={}", offset);
  174. break;
  175. }
  176. // 👇 步骤1:收集本批次所有记录
  177. List<RecordData> batchRecords = new ArrayList<>();
  178. List<String> batchUids = new ArrayList<>();
  179. do {
  180. RecordData data = extractRecordData(resultSet); // 👈 抽取数据
  181. batchRecords.add(data);
  182. batchUids.add(data.uid);
  183. } while (resultSet.next());
  184. logger.info("本批次共 {} 条记录", batchRecords.size());
  185. // 👇 步骤2:批量查询哪些 uid 已存在(性能优化)
  186. Set<String> existingUids = getExistingUids(mysqlConn, batchUids);
  187. logger.info("已存在记录数: {}", existingUids.size());
  188. // 👇 步骤3:准备批量插入
  189. try (PreparedStatement mysqlStmt = mysqlConn.prepareStatement(insertSql)) {
  190. for (RecordData data : batchRecords) {
  191. if (validFourTypes.contains(data.gtype)) {
  192. logger.debug("跳过 validFourTypes 类型记录,gtype={}, uid={}", data.gtype, data.uid);
  193. continue;
  194. }
  195. if ("4".equals(data.operation_platform)) {
  196. logger.debug("跳过 operation_platform=4 的记录,uid={}", data.uid);
  197. continue;
  198. }
  199. // 👇 跳过已存在的记录(避免重复更新用户余额)
  200. if (existingUids.contains(data.uid)) {
  201. logger.debug("跳过重复记录,uid={}", data.uid);
  202. continue;
  203. }
  204. // 👇 设置插入参数
  205. setStatementParams(mysqlStmt, data);
  206. mysqlStmt.addBatch();
  207. // 👇 只有新记录才更新用户余额(关键修复!)
  208. updateUserBalance(mysqlConn, data);
  209. }
  210. // 👇 执行批量插入
  211. int[] results = mysqlStmt.executeBatch();
  212. logger.info("成功插入新记录 {} 条", results.length);
  213. }
  214. offset += pageSize;
  215. }
  216. }
  217. logger.info("✅ 数据同步完成");
  218. } catch (SQLException e) {
  219. logger.error("数据库操作失败", e);
  220. throw new RuntimeException("同步失败", e);
  221. }
  222. }
  223. static class RecordData {
  224. int gtype, jwcode, free, core_jb, buy_jb;
  225. Timestamp cz_time;
  226. String cz_user, cz_bz, operation_platform, goods_name, uid;
  227. String orderNumber; // 预生成,避免重复计算
  228. String linkId;
  229. }
  230. private RecordData extractRecordData(ResultSet rs) throws SQLException {
  231. RecordData data = new RecordData();
  232. data.gtype = rs.getInt("gtype");
  233. data.jwcode = rs.getInt("jwcode");
  234. data.free = rs.getInt("free");
  235. data.core_jb = rs.getInt("core_jb");
  236. data.buy_jb = rs.getInt("buy_jb");
  237. data.cz_time = rs.getTimestamp("cz_time");
  238. data.cz_user = rs.getString("cz_user");
  239. data.cz_bz = rs.getString("cz_bz");
  240. data.operation_platform = rs.getString("operation_platform");
  241. data.goods_name = rs.getString("goods_name");
  242. data.uid = rs.getString("id");
  243. data.linkId = rs.getString("uid");
  244. // 预生成订单号(避免在循环中重复生成)
  245. String timestampPart = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS"));
  246. String uuidPart = UUID.randomUUID().toString().replaceAll("-", "");
  247. data.orderNumber = timestampPart + "_" + uuidPart;
  248. return data;
  249. }
  250. private Set<String> getExistingUids(Connection conn, List<String> uids) throws SQLException {
  251. if (uids.isEmpty()) return Collections.emptySet();
  252. String placeholders = String.join(",", Collections.nCopies(uids.size(), "?"));
  253. String sql = "SELECT uid FROM user_gold_record WHERE uid IN (" + placeholders + ")";
  254. Set<String> existing = new HashSet<>();
  255. try (PreparedStatement stmt = conn.prepareStatement(sql)) {
  256. for (int i = 0; i < uids.size(); i++) {
  257. stmt.setString(i + 1, uids.get(i));
  258. }
  259. try (ResultSet rs = stmt.executeQuery()) {
  260. while (rs.next()) {
  261. existing.add(rs.getString("uid"));
  262. }
  263. }
  264. }
  265. return existing;
  266. }
  267. private void setStatementParams(PreparedStatement stmt, RecordData data) throws SQLException {
  268. String name = data.cz_user;
  269. // 设置 admin_id
  270. if (StringUtils.isNumeric(name)) {
  271. try {
  272. String adminIdStr = adminService.getId(name);
  273. if (adminIdStr != null && StringUtils.isNumeric(adminIdStr)) {
  274. stmt.setInt(14, Integer.parseInt(adminIdStr));
  275. } else {
  276. stmt.setInt(14, 99999);
  277. }
  278. } catch (Exception e) {
  279. logger.warn("解析 admin_id 失败,cz_user={}", name, e);
  280. stmt.setInt(14, 99999);
  281. }
  282. } else {
  283. stmt.setInt(14, 99999);
  284. }
  285. // refund_type, refund_model
  286. stmt.setString(10, null);
  287. stmt.setNull(11, Types.INTEGER);
  288. // 根据 gtype 设置 type 和 order_code
  289. if (validFourTypes.contains(data.gtype)) {
  290. throw new IllegalArgumentException("不应处理 validFourTypes 类型,应在上层过滤"); // 安全兜底
  291. }
  292. if (validZeroTypes.contains(data.gtype)) {
  293. stmt.setInt(13, 0);
  294. stmt.setInt(20, 0);
  295. stmt.setString(1, "ERPCZ_" + data.orderNumber);
  296. } else if (validOneTypes.contains(data.gtype)) {
  297. stmt.setInt(13, 1);
  298. stmt.setInt(20, 0);
  299. stmt.setString(1, "ERPXF_" + data.orderNumber);
  300. } else if (validTwoTypes.contains(data.gtype)) {
  301. stmt.setInt(13, 2);
  302. stmt.setInt(20, 0);
  303. stmt.setString(1, "ERPTK_" + data.orderNumber);
  304. stmt.setString(10, "退款商品");
  305. stmt.setInt(11, 0);
  306. } else if (validThreeTypes.contains(data.gtype)) {
  307. stmt.setInt(13, 3);
  308. stmt.setNull(20, Types.INTEGER);
  309. stmt.setString(1, "ERPQT_" + data.orderNumber);
  310. }
  311. stmt.setInt(2, data.jwcode);
  312. stmt.setInt(3, data.free + data.core_jb + data.buy_jb);
  313. stmt.setInt(4, data.buy_jb);
  314. if (currentMonth.getValue() >= 7) {
  315. stmt.setInt(5, data.free);
  316. stmt.setInt(6, 0);
  317. } else {
  318. stmt.setInt(5, 0);
  319. stmt.setInt(6, data.free);
  320. }
  321. stmt.setInt(7, data.core_jb);
  322. // pay_platform
  323. String platform = data.operation_platform;
  324. if ("1".equals(platform)) {
  325. stmt.setString(8, "ERP");
  326. } else if ("2".equals(platform)) {
  327. stmt.setString(8, "HomilyLink");
  328. } else if ("3".equals(platform)) {
  329. stmt.setString(8, "HomilyChart");
  330. } else if ("4".equals(platform)) {
  331. throw new IllegalArgumentException("不应处理 platform=4,应在上层过滤");
  332. } else if ("0".equals(platform)) {
  333. stmt.setString(8, "初始化金币");
  334. } else if ("5".equals(platform)){
  335. stmt.setString(8, "DC");
  336. }else {
  337. stmt.setString(8, "其他");
  338. }
  339. stmt.setString(9, data.goods_name);
  340. stmt.setString(12, data.cz_bz);
  341. stmt.setInt(15, 3);
  342. stmt.setTimestamp(16, data.cz_time);
  343. if (data.cz_bz != null && data.cz_bz.contains("测试") && data.cz_bz.contains("员工")) {
  344. stmt.setInt(17, 0);
  345. } else {
  346. stmt.setInt(17, 1);
  347. }
  348. Timestamp now = new Timestamp(System.currentTimeMillis());
  349. stmt.setTimestamp(18, now);
  350. stmt.setTimestamp(19, data.cz_time);
  351. stmt.setString(21, data.uid);
  352. stmt.setString(22, data.linkId);
  353. }
  354. private void updateUserBalance(Connection conn, RecordData data) throws Exception {
  355. logger.info("处理用户余额更新,jwcode={}", data.jwcode);
  356. User user = userService.selectAllUser(String.valueOf(data.jwcode));
  357. BigDecimal freeBD = BigDecimal.valueOf(data.free);
  358. BigDecimal buyJbBD = BigDecimal.valueOf(data.buy_jb);
  359. BigDecimal coreJbBD = BigDecimal.valueOf(data.core_jb);
  360. if (ObjectUtils.isEmpty(user)) {
  361. logger.info("用户不存在,jwcode={}", data.jwcode);
  362. user = new User();
  363. String country = "未知";
  364. String name = "未知";
  365. try {
  366. BaseDES des = new BaseDES();
  367. String desjwcode = des.encrypt(String.valueOf(data.jwcode));
  368. Map<String, String> requestBody = new HashMap<>();
  369. requestBody.put("jwcode", desjwcode);
  370. HttpHeaders headers = new HttpHeaders();
  371. headers.setContentType(MediaType.APPLICATION_JSON);
  372. HttpEntity<Map<String, String>> entity = new HttpEntity<>(requestBody, headers);
  373. ResponseEntity<Map> response = restTemplate.exchange(
  374. "http://hwapi.rzfwq.com/hwjnApp/hwhc-login/hwhclogin/hc/login/clent/info",
  375. HttpMethod.POST, entity, Map.class);
  376. if (response.getStatusCode().is2xxSuccessful()) {
  377. Map<String, Object> responseBody = response.getBody();
  378. if (responseBody != null) {
  379. Map<String, Object> dataMap = (Map<String, Object>) responseBody.get("data");
  380. if (dataMap != null) {
  381. name = (String) dataMap.get("name");
  382. country = getCountryWithDefault(dataMap, "未知");
  383. logger.info("获取用户信息成功: name={}, country={}", name, country);
  384. }
  385. }
  386. }
  387. } catch (Exception e) {
  388. logger.warn("获取用户信息失败,jwcode={}", data.jwcode, e);
  389. country = "未知";
  390. }
  391. String market = marketService.getMarketIdsDao(country);
  392. user.setJwcode(data.jwcode);
  393. user.setName(name);
  394. user.setMarket(market);
  395. userService.addUser(user);
  396. logger.info("用户创建成功,jwcode={}", data.jwcode);
  397. // 重新查询确保数据完整
  398. user = userService.selectAllUser(String.valueOf(data.jwcode));
  399. }
  400. // 更新当前金币
  401. if (currentMonth.getValue() >= 7) {
  402. BigDecimal newFreeJune = user.getCurrentFreeJune().add(freeBD);
  403. if (newFreeJune.compareTo(BigDecimal.ZERO) >= 0) {
  404. user.setCurrentFreeJune(newFreeJune);
  405. } else {
  406. BigDecimal remaining = newFreeJune;
  407. user.setCurrentFreeJune(BigDecimal.ZERO);
  408. user.setCurrentFreeDecember(user.getCurrentFreeDecember().add(remaining));
  409. }
  410. } else {
  411. BigDecimal newFreeDec = user.getCurrentFreeDecember().add(freeBD);
  412. if (newFreeDec.compareTo(BigDecimal.ZERO) >= 0) {
  413. user.setCurrentFreeDecember(newFreeDec);
  414. } else {
  415. BigDecimal remaining = newFreeDec;
  416. user.setCurrentFreeDecember(BigDecimal.ZERO);
  417. user.setCurrentFreeJune(remaining);
  418. }
  419. }
  420. user.setCurrentPermanentGold(user.getCurrentPermanentGold().add(buyJbBD));
  421. user.setCurrentTaskGold(user.getCurrentTaskGold().add(coreJbBD));
  422. // 更新统计字段
  423. if (validZeroTypes.contains(data.gtype)) {
  424. user.setRechargeNum(user.getRechargeNum() + 1);
  425. user.setSumPermanentGold(user.getSumPermanentGold().add(buyJbBD));
  426. user.setSumTaskGold(user.getSumTaskGold().add(coreJbBD));
  427. if (currentMonth.getValue() >= 7) {
  428. user.setSumFreeJune(user.getSumFreeJune().add(freeBD));
  429. } else {
  430. user.setSumFreeDecember(user.getSumFreeDecember().add(freeBD));
  431. }
  432. } else if (validOneTypes.contains(data.gtype)) {
  433. user.setConsumeNum(user.getConsumeNum() + 1);
  434. user.setSumConsumePermanent(user.getSumConsumePermanent().add(buyJbBD));
  435. user.setSumConsumeTask(user.getSumConsumeTask().add(coreJbBD));
  436. user.setSumConsumeFree(user.getSumConsumeFree().add(freeBD));
  437. }
  438. userService.updateAllGold(user);
  439. logger.info("用户余额更新成功,jwcode={}", data.jwcode);
  440. }
  441. private boolean checkRecordExists(Connection conn, String uid) throws SQLException {
  442. String sql = "SELECT 1 FROM user_gold_record WHERE uid = ? LIMIT 1";
  443. try (PreparedStatement stmt = conn.prepareStatement(sql)) {
  444. stmt.setString(1, uid);
  445. try (ResultSet rs = stmt.executeQuery()) {
  446. return rs.next();
  447. }
  448. }
  449. }
  450. private String getCountryWithDefault(Map<String, Object> dataMap, String defaultValue) {
  451. Object countryObj = dataMap.get("treelist");
  452. if (countryObj instanceof String) {
  453. String countryStr = ((String) countryObj).trim();
  454. if (countryStr.isEmpty()) {
  455. return defaultValue;
  456. }
  457. String[] parts = countryStr.split("-");
  458. return parts.length >= 3 ? parts[2] : defaultValue;
  459. }
  460. return defaultValue;
  461. }
  462. }