|
|
@ -61,437 +61,209 @@ public class MysqlServiceImpl implements MysqlService { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
// @Scheduled(cron = "0 0 * * * ?") // 每小时执行一次 |
|
|
|
@Transactional(transactionManager = "mysqlTransactionManager") // 👈 保证插入和用户更新在一个事务 |
|
|
|
public void getSqlserverData() throws Exception { |
|
|
|
logger.info("开始从 SQL Server 同步数据到 MySQL"); |
|
|
|
|
|
|
|
try (Connection sqlServerConn = sqlserver1DataSource.getConnection(); |
|
|
|
Connection mysqlConn = mysql1DataSource.getConnection()) { |
|
|
|
|
|
|
|
logger.info("开始查询数据..."); |
|
|
|
|
|
|
|
// 定义分页参数 |
|
|
|
int pageSize = 100; |
|
|
|
int offset = 0; |
|
|
|
boolean hasMoreData = true; |
|
|
|
|
|
|
|
// 查询 SQL Server 数据的 SQL 语句 |
|
|
|
// 👇 恢复动态时间查询(原注释掉的硬编码时间已移除) |
|
|
|
String querySql = """ |
|
|
|
SELECT |
|
|
|
id, gtype, jwcode, free, core_jb, buy_jb, cz_time, cz_user, cz_bz, operation_platform, goods_name\s |
|
|
|
FROM |
|
|
|
hwhcGold.dbo.user_gold_records |
|
|
|
WHERE cz_time>? |
|
|
|
ORDER BY |
|
|
|
cz_time ASC |
|
|
|
OFFSET ? ROWS FETCH NEXT ? ROWS ONLY; |
|
|
|
"""; |
|
|
|
|
|
|
|
// 插入 MySQL 数据的 SQL 语句 |
|
|
|
String insertSql = "INSERT IGNORE INTO user_gold_record (order_code, jwcode, sum_gold, permanent_gold, free_june, free_december, " + |
|
|
|
"task_gold, pay_platform, goods_name, refund_type, refund_model, remark, type, admin_id, " + |
|
|
|
"audit_status, create_time, flag, update_time, audit_time, is_refund,uid) " + |
|
|
|
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?)"; |
|
|
|
SELECT |
|
|
|
id, gtype, jwcode, free, core_jb, buy_jb, cz_time, cz_user, cz_bz, operation_platform, goods_name |
|
|
|
FROM |
|
|
|
hwhcGold.dbo.user_gold_records |
|
|
|
WHERE cz_time > ? |
|
|
|
ORDER BY |
|
|
|
cz_time ASC |
|
|
|
OFFSET ? ROWS FETCH NEXT ? ROWS ONLY; |
|
|
|
"""; |
|
|
|
|
|
|
|
String insertSql = """ |
|
|
|
INSERT IGNORE INTO user_gold_record |
|
|
|
(order_code, jwcode, sum_gold, permanent_gold, free_june, free_december, |
|
|
|
task_gold, pay_platform, goods_name, refund_type, refund_model, remark, type, admin_id, |
|
|
|
audit_status, create_time, flag, update_time, audit_time, is_refund, uid) |
|
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) |
|
|
|
"""; |
|
|
|
|
|
|
|
while (hasMoreData) { |
|
|
|
try (PreparedStatement sqlServerStmt = sqlServerConn.prepareStatement(querySql)) { |
|
|
|
sqlServerStmt.setTimestamp(1, Timestamp.valueOf(LocalDateTime.now().minusHours(1))); // 获取最近一小时的数据 |
|
|
|
sqlServerStmt.setInt(2, offset); // 设置 OFFSET |
|
|
|
sqlServerStmt.setInt(3, pageSize); // 设置 FETCH NEXT |
|
|
|
// 👇 恢复动态时间参数 |
|
|
|
sqlServerStmt.setTimestamp(1, Timestamp.valueOf(LocalDateTime.now().minusHours(1))); |
|
|
|
sqlServerStmt.setInt(2, offset); |
|
|
|
sqlServerStmt.setInt(3, pageSize); |
|
|
|
|
|
|
|
ResultSet resultSet = sqlServerStmt.executeQuery(); |
|
|
|
|
|
|
|
// 如果没有数据了,退出循环 |
|
|
|
if (!resultSet.next()) { |
|
|
|
hasMoreData = false; |
|
|
|
logger.info("无更多数据,offset={}", offset); |
|
|
|
break; |
|
|
|
} |
|
|
|
|
|
|
|
logger.info("查询到 {} 条数据", pageSize); |
|
|
|
// 👇 步骤1:收集本批次所有记录 |
|
|
|
List<RecordData> batchRecords = new ArrayList<>(); |
|
|
|
List<String> batchUids = new ArrayList<>(); |
|
|
|
|
|
|
|
try (PreparedStatement mysqlStmt = mysqlConn.prepareStatement(insertSql)) { |
|
|
|
do { |
|
|
|
int gtype = resultSet.getInt("gtype"); |
|
|
|
Integer jwcode = resultSet.getInt("jwcode"); |
|
|
|
int free = resultSet.getInt("free"); |
|
|
|
int core_jb = resultSet.getInt("core_jb"); |
|
|
|
int buy_jb = resultSet.getInt("buy_jb"); |
|
|
|
Timestamp created_at = resultSet.getTimestamp("cz_time"); |
|
|
|
String name = resultSet.getString("cz_user"); |
|
|
|
String remark = resultSet.getString("cz_bz"); |
|
|
|
String operation_platform = resultSet.getString("operation_platform"); |
|
|
|
String goods_name = resultSet.getString("goods_name"); |
|
|
|
String uid = resultSet.getString("id"); |
|
|
|
String timestampPart = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")); |
|
|
|
String orderNumber = UUID.randomUUID().toString().replaceAll("-", ""); |
|
|
|
|
|
|
|
if (StringUtils.isNumeric(name)) {; |
|
|
|
System.out.println("name是数字"+ name); |
|
|
|
Integer admin_id = Integer.valueOf(adminService.getId(name)); |
|
|
|
mysqlStmt.setInt(14, admin_id); |
|
|
|
} else { |
|
|
|
mysqlStmt.setInt(14, 99999); |
|
|
|
} |
|
|
|
do { |
|
|
|
RecordData data = extractRecordData(resultSet); // 👈 抽取数据 |
|
|
|
batchRecords.add(data); |
|
|
|
batchUids.add(data.uid); |
|
|
|
} while (resultSet.next()); |
|
|
|
|
|
|
|
Random random = new Random(); |
|
|
|
int randomNumber = random.nextInt(900) + 100; |
|
|
|
logger.info("本批次共 {} 条记录", batchRecords.size()); |
|
|
|
|
|
|
|
// 根据 gtype 设置不同的值 |
|
|
|
mysqlStmt.setString(10, null); |
|
|
|
mysqlStmt.setNull(11, java.sql.Types.INTEGER); |
|
|
|
if (validFourTypes.contains(gtype)) { |
|
|
|
continue; |
|
|
|
} |
|
|
|
if (validZeroTypes.contains(gtype)) { |
|
|
|
mysqlStmt.setInt(13, 0); |
|
|
|
mysqlStmt.setNull(20, java.sql.Types.INTEGER); |
|
|
|
mysqlStmt.setString(1, "ERPCZ_" +orderNumber ); |
|
|
|
} |
|
|
|
if (validOneTypes.contains(gtype)) { |
|
|
|
mysqlStmt.setInt(13, 1); |
|
|
|
mysqlStmt.setInt(20, 0); |
|
|
|
mysqlStmt.setString(1, "ERPXF_" + orderNumber); |
|
|
|
} |
|
|
|
if (validTwoTypes.contains(gtype)) { |
|
|
|
mysqlStmt.setInt(13, 2); |
|
|
|
mysqlStmt.setInt(20, 0); |
|
|
|
mysqlStmt.setString(1, "ERPTK_" +orderNumber); |
|
|
|
mysqlStmt.setString(10, "退款商品"); |
|
|
|
mysqlStmt.setInt(11, 0); |
|
|
|
} |
|
|
|
if (validThreeTypes.contains(gtype)) { |
|
|
|
mysqlStmt.setInt(13, 3); |
|
|
|
mysqlStmt.setNull(20, java.sql.Types.INTEGER); |
|
|
|
mysqlStmt.setString(1, "ERPQT_" +orderNumber); |
|
|
|
} |
|
|
|
// 👇 步骤2:批量查询哪些 uid 已存在(性能优化) |
|
|
|
Set<String> existingUids = getExistingUids(mysqlConn, batchUids); |
|
|
|
logger.info("已存在记录数: {}", existingUids.size()); |
|
|
|
|
|
|
|
mysqlStmt.setInt(2, jwcode); |
|
|
|
mysqlStmt.setInt(3, free + core_jb + buy_jb); |
|
|
|
mysqlStmt.setInt(4, buy_jb); |
|
|
|
if (currentMonth.getValue() >= 7) { |
|
|
|
mysqlStmt.setInt(6, free); |
|
|
|
mysqlStmt.setInt(5, 0); |
|
|
|
} |
|
|
|
if (currentMonth.getValue() < 7) { |
|
|
|
mysqlStmt.setInt(6, 0); |
|
|
|
mysqlStmt.setInt(5, free); |
|
|
|
} |
|
|
|
mysqlStmt.setInt(7, core_jb); |
|
|
|
if ("1".equals(operation_platform)) { |
|
|
|
mysqlStmt.setString(8, "ERP"); |
|
|
|
} else if ("2".equals(operation_platform)) { |
|
|
|
mysqlStmt.setString(8, "HomilyLink"); |
|
|
|
} else if ("3".equals(operation_platform)) { |
|
|
|
mysqlStmt.setString(8, "HomilyChart"); |
|
|
|
} else if ("4".equals(operation_platform)) { |
|
|
|
// 👇 步骤3:准备批量插入 |
|
|
|
try (PreparedStatement mysqlStmt = mysqlConn.prepareStatement(insertSql)) { |
|
|
|
|
|
|
|
for (RecordData data : batchRecords) { |
|
|
|
if (validFourTypes.contains(data.gtype)) { |
|
|
|
logger.debug("跳过 validFourTypes 类型记录,gtype={}, uid={}", data.gtype, data.uid); |
|
|
|
continue; |
|
|
|
} else if ("0".equals(operation_platform)) { |
|
|
|
mysqlStmt.setString(8, "初始化金币"); |
|
|
|
} else { |
|
|
|
mysqlStmt.setString(8, "其他"); |
|
|
|
} |
|
|
|
mysqlStmt.setString(9, goods_name); |
|
|
|
mysqlStmt.setString(12, remark); |
|
|
|
mysqlStmt.setInt(15, 3); |
|
|
|
mysqlStmt.setTimestamp(16, created_at); |
|
|
|
if (remark != null && remark.contains("测试") && remark.contains("员工")) { |
|
|
|
mysqlStmt.setInt(17, 0); |
|
|
|
} else { |
|
|
|
mysqlStmt.setInt(17, 1); |
|
|
|
if ("4".equals(data.operation_platform)) { |
|
|
|
logger.debug("跳过 operation_platform=4 的记录,uid={}", data.uid); |
|
|
|
continue; |
|
|
|
} |
|
|
|
mysqlStmt.setTimestamp(18, created_at); |
|
|
|
mysqlStmt.setTimestamp(19, created_at); |
|
|
|
mysqlStmt.setString(21, uid); |
|
|
|
|
|
|
|
mysqlStmt.addBatch(); |
|
|
|
|
|
|
|
// 更新用户信息逻辑 |
|
|
|
logger.info("查询用户是否存在"); |
|
|
|
User user = userService.selectAllUser(String.valueOf(jwcode)); |
|
|
|
if (ObjectUtils.isEmpty(user)) { |
|
|
|
logger.info("用户不存在"); |
|
|
|
user = new User(); |
|
|
|
List<String> country = Collections.singletonList("未知"); |
|
|
|
BaseDES des = new BaseDES(); |
|
|
|
String desjwcode = des.encrypt(String.valueOf(jwcode)); |
|
|
|
|
|
|
|
// 创建 JSON 请求体 |
|
|
|
Map<String, String> requestBody = new HashMap<>(); |
|
|
|
requestBody.put("jwcode", desjwcode); |
|
|
|
|
|
|
|
// 设置请求头 |
|
|
|
HttpHeaders headers = new HttpHeaders(); |
|
|
|
headers.setContentType(MediaType.APPLICATION_JSON); |
|
|
|
|
|
|
|
// 创建 HttpEntity |
|
|
|
HttpEntity<Map<String, String>> entity = new HttpEntity<>(requestBody, headers); |
|
|
|
|
|
|
|
// 发送 POST 请求 |
|
|
|
try { |
|
|
|
ResponseEntity<Map> response = restTemplate.exchange( |
|
|
|
"http://hwapi.rzfwq.com/hwjnApp/hwhc-login/hwhclogin/hc/login/clent/info", |
|
|
|
HttpMethod.POST, entity, Map.class); |
|
|
|
|
|
|
|
// 检查响应状态码 |
|
|
|
if (response.getStatusCode().is2xxSuccessful()) { |
|
|
|
Map<String, Object> responseBody = response.getBody(); |
|
|
|
if (responseBody != null) { |
|
|
|
// 获取 data 部分 |
|
|
|
Map<String, Object> data = (Map<String, Object>) responseBody.get("data"); |
|
|
|
if (data != null) { |
|
|
|
// 提取 name 和 country |
|
|
|
name = (String) data.get("name"); |
|
|
|
country = (List<String>) data.get("country"); |
|
|
|
|
|
|
|
// 打印获取到的数据 |
|
|
|
System.out.println("Name: " + name); |
|
|
|
System.out.println("Country: " + country); |
|
|
|
} else { |
|
|
|
System.out.println("Data is null"); |
|
|
|
} |
|
|
|
} else { |
|
|
|
System.out.println("Response body is null"); |
|
|
|
} |
|
|
|
} else { |
|
|
|
System.out.println("Request failed with status code: " + response.getStatusCode()); |
|
|
|
} |
|
|
|
} catch (Exception e) { |
|
|
|
System.out.println("Error: " + e.getMessage()); |
|
|
|
// 设置默认的 country 值 |
|
|
|
country = Collections.singletonList("未知"); |
|
|
|
} |
|
|
|
|
|
|
|
String market = String.join(",", marketService.getMarketIds(country)); |
|
|
|
user.setJwcode(jwcode); |
|
|
|
user.setName(name); |
|
|
|
user.setMarket(market); |
|
|
|
logger.info("新添用户"); |
|
|
|
userService.addUser(user); |
|
|
|
logger.info("用户添加成功"); |
|
|
|
user = userService.selectAllUser(String.valueOf(jwcode)); |
|
|
|
// 👇 跳过已存在的记录(避免重复更新用户余额) |
|
|
|
if (existingUids.contains(data.uid)) { |
|
|
|
logger.debug("跳过重复记录,uid={}", data.uid); |
|
|
|
continue; |
|
|
|
} |
|
|
|
|
|
|
|
if (currentMonth.getValue() >= 7) { |
|
|
|
if (user.getCurrentFreeJune().add(BigDecimal.valueOf(free)).compareTo(BigDecimal.ZERO) >= 0) { |
|
|
|
user.setCurrentFreeJune(user.getCurrentFreeJune().add(BigDecimal.valueOf(free))); |
|
|
|
} else { |
|
|
|
BigDecimal remaining = user.getCurrentFreeJune().add(BigDecimal.valueOf(free)); |
|
|
|
user.setCurrentFreeJune(BigDecimal.ZERO); |
|
|
|
user.setCurrentFreeDecember(user.getCurrentFreeDecember().add(remaining)); |
|
|
|
} |
|
|
|
} |
|
|
|
if (currentMonth.getValue() < 7) { |
|
|
|
if (user.getCurrentFreeDecember().add(BigDecimal.valueOf(free)).compareTo(BigDecimal.ZERO) >= 0) { |
|
|
|
user.setCurrentFreeDecember(user.getCurrentFreeDecember().add(BigDecimal.valueOf(free))); |
|
|
|
} else { |
|
|
|
BigDecimal remaining = user.getCurrentFreeDecember().add(BigDecimal.valueOf(free)); |
|
|
|
user.setCurrentFreeDecember(BigDecimal.ZERO); |
|
|
|
user.setCurrentFreeJune(remaining); |
|
|
|
} |
|
|
|
} |
|
|
|
// 👇 设置插入参数 |
|
|
|
setStatementParams(mysqlStmt, data); |
|
|
|
|
|
|
|
user.setCurrentPermanentGold(user.getCurrentPermanentGold().add(BigDecimal.valueOf(buy_jb))); |
|
|
|
user.setCurrentTaskGold(user.getCurrentTaskGold().add(BigDecimal.valueOf(core_jb))); |
|
|
|
if (validZeroTypes.contains(gtype)) { |
|
|
|
user.setRechargeNum(user.getRechargeNum() + 1); |
|
|
|
user.setSumPermanentGold(user.getSumPermanentGold().add(BigDecimal.valueOf(buy_jb))); |
|
|
|
user.setSumTaskGold(user.getSumTaskGold().add(BigDecimal.valueOf(core_jb))); |
|
|
|
if (currentMonth.getValue() >= 7) { |
|
|
|
user.setSumFreeJune(user.getSumFreeJune().add(BigDecimal.valueOf(free))); |
|
|
|
} |
|
|
|
if (currentMonth.getValue() < 7) { |
|
|
|
user.setSumFreeDecember(user.getSumFreeDecember().add(BigDecimal.valueOf(free))); |
|
|
|
} |
|
|
|
} |
|
|
|
if (validOneTypes.contains(gtype)) { |
|
|
|
user.setConsumeNum(user.getConsumeNum() + 1); |
|
|
|
user.setSumConsumePermanent(user.getSumConsumePermanent().add(BigDecimal.valueOf(buy_jb))); |
|
|
|
user.setSumConsumeTask(user.getSumConsumeTask().add(BigDecimal.valueOf(core_jb))); |
|
|
|
user.setSumConsumeFree(user.getSumConsumeFree().add(BigDecimal.valueOf(free))); |
|
|
|
} |
|
|
|
userService.updateAllGold(user); |
|
|
|
mysqlStmt.addBatch(); |
|
|
|
|
|
|
|
} while (resultSet.next()); |
|
|
|
// 👇 只有新记录才更新用户余额(关键修复!) |
|
|
|
updateUserBalance(mysqlConn, data); |
|
|
|
} |
|
|
|
|
|
|
|
// 批量插入 MySQL |
|
|
|
mysqlStmt.executeBatch(); |
|
|
|
logger.info("已成功插入 {} 条数据", pageSize); |
|
|
|
// 👇 执行批量插入 |
|
|
|
int[] results = mysqlStmt.executeBatch(); |
|
|
|
logger.info("成功插入新记录 {} 条", results.length); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// 更新分页参数 |
|
|
|
offset += pageSize; |
|
|
|
offset += pageSize; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
logger.info("数据同步完成"); |
|
|
|
logger.info("✅ 数据同步完成"); |
|
|
|
} catch (SQLException e) { |
|
|
|
logger.error("数据连接失败", e.getMessage()); |
|
|
|
throw new RuntimeException("数据链接失败", e); |
|
|
|
logger.error("数据库操作失败", e); |
|
|
|
throw new RuntimeException("同步失败", e); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void getSqlserverData2() throws Exception { |
|
|
|
logger.info("开始从 MySQL 同步数据到 MySQL(分页处理)"); |
|
|
|
|
|
|
|
Timestamp startTime = Timestamp.valueOf(LocalDateTime.now().minusHours(1)); |
|
|
|
int pageSize = 100; |
|
|
|
int pageNumber = 0; |
|
|
|
int totalProcessed = 0; |
|
|
|
|
|
|
|
try ( |
|
|
|
|
|
|
|
Connection sourceConn = sqlserver1DataSource.getConnection(); |
|
|
|
Connection targetConn = mysql1DataSource.getConnection() |
|
|
|
) { |
|
|
|
// 关闭自动提交,手动控制事务 |
|
|
|
targetConn.setAutoCommit(false); |
|
|
|
|
|
|
|
// 分页查询 SQL(MySQL 使用 LIMIT offset, size) |
|
|
|
String querySql = """ |
|
|
|
SELECT gtype, jwcode, free, core_jb, buy_jb, cz_time, cz_user, cz_bz, operation_platform, goods_name,flag |
|
|
|
FROM user_gold_records |
|
|
|
where cz_time >'2025-08-10 07:11:04' |
|
|
|
ORDER BY cz_time ASC |
|
|
|
LIMIT ? OFFSET ? |
|
|
|
"""; |
|
|
|
|
|
|
|
String insertSql = """ |
|
|
|
INSERT INTO user_gold_record |
|
|
|
(order_code, jwcode, sum_gold, permanent_gold, free_june, free_december, |
|
|
|
task_gold, pay_platform, goods_name, refund_type, refund_model, remark, type, admin_id, |
|
|
|
audit_status, create_time, flag, update_time) |
|
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) |
|
|
|
"""; |
|
|
|
|
|
|
|
try ( |
|
|
|
PreparedStatement queryStmt = sourceConn.prepareStatement(querySql); |
|
|
|
PreparedStatement insertStmt = targetConn.prepareStatement(insertSql) |
|
|
|
) { |
|
|
|
|
|
|
|
while (true) { |
|
|
|
int offset = pageNumber * pageSize; |
|
|
|
|
|
|
|
queryStmt.setInt(1, pageSize); |
|
|
|
queryStmt.setInt(2, offset); |
|
|
|
|
|
|
|
List<RecordData> batchData = new ArrayList<>(); |
|
|
|
try (ResultSet rs = queryStmt.executeQuery()) { |
|
|
|
boolean hasData = false; |
|
|
|
while (rs.next()) { |
|
|
|
hasData = true; |
|
|
|
|
|
|
|
RecordData data = new RecordData(); |
|
|
|
data.gtype = rs.getInt("gtype"); |
|
|
|
data.jwcode = rs.getInt("jwcode"); |
|
|
|
data.free = rs.getInt("free"); |
|
|
|
data.core_jb = rs.getInt("core_jb"); |
|
|
|
data.buy_jb = rs.getInt("buy_jb"); |
|
|
|
data.cz_time = rs.getTimestamp("cz_time"); |
|
|
|
data.cz_user = rs.getString("cz_user"); |
|
|
|
data.cz_bz = rs.getString("cz_bz"); |
|
|
|
data.operation_platform = rs.getString("operation_platform"); |
|
|
|
data.goods_name = rs.getString("goods_name"); |
|
|
|
|
|
|
|
batchData.add(data); |
|
|
|
} |
|
|
|
|
|
|
|
if (!hasData || batchData.isEmpty()) { |
|
|
|
logger.info("无更多数据可同步,共处理 {} 条", totalProcessed); |
|
|
|
break; |
|
|
|
} |
|
|
|
} |
|
|
|
static class RecordData { |
|
|
|
int gtype, jwcode, free, core_jb, buy_jb; |
|
|
|
Timestamp cz_time; |
|
|
|
String cz_user, cz_bz, operation_platform, goods_name, uid; |
|
|
|
String orderNumber; // 预生成,避免重复计算 |
|
|
|
} |
|
|
|
|
|
|
|
// 清空上一批 |
|
|
|
insertStmt.clearBatch(); |
|
|
|
private RecordData extractRecordData(ResultSet rs) throws SQLException { |
|
|
|
RecordData data = new RecordData(); |
|
|
|
data.gtype = rs.getInt("gtype"); |
|
|
|
data.jwcode = rs.getInt("jwcode"); |
|
|
|
data.free = rs.getInt("free"); |
|
|
|
data.core_jb = rs.getInt("core_jb"); |
|
|
|
data.buy_jb = rs.getInt("buy_jb"); |
|
|
|
data.cz_time = rs.getTimestamp("cz_time"); |
|
|
|
data.cz_user = rs.getString("cz_user"); |
|
|
|
data.cz_bz = rs.getString("cz_bz"); |
|
|
|
data.operation_platform = rs.getString("operation_platform"); |
|
|
|
data.goods_name = rs.getString("goods_name"); |
|
|
|
data.uid = rs.getString("id"); |
|
|
|
|
|
|
|
// 预生成订单号(避免在循环中重复生成) |
|
|
|
String timestampPart = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")); |
|
|
|
String uuidPart = UUID.randomUUID().toString().replaceAll("-", ""); |
|
|
|
data.orderNumber = timestampPart + "_" + uuidPart; |
|
|
|
|
|
|
|
// 处理当前批次(100条以内) |
|
|
|
for (RecordData data : batchData) { |
|
|
|
buildInsertStatement(insertStmt, data); |
|
|
|
insertStmt.addBatch(); |
|
|
|
} |
|
|
|
return data; |
|
|
|
} |
|
|
|
|
|
|
|
// 执行批量插入 |
|
|
|
insertStmt.executeBatch(); |
|
|
|
targetConn.commit(); |
|
|
|
totalProcessed += batchData.size(); |
|
|
|
logger.info("第 {} 页,插入 {} 条记录,累计 {}", pageNumber, batchData.size(), totalProcessed); |
|
|
|
private Set<String> getExistingUids(Connection conn, List<String> uids) throws SQLException { |
|
|
|
if (uids.isEmpty()) return Collections.emptySet(); |
|
|
|
|
|
|
|
// 更新用户信息(可优化为批量) |
|
|
|
for (RecordData data : batchData) { |
|
|
|
updateUserGold(data); |
|
|
|
} |
|
|
|
String placeholders = String.join(",", Collections.nCopies(uids.size(), "?")); |
|
|
|
String sql = "SELECT uid FROM user_gold_record WHERE uid IN (" + placeholders + ")"; |
|
|
|
|
|
|
|
pageNumber++; |
|
|
|
Set<String> existing = new HashSet<>(); |
|
|
|
try (PreparedStatement stmt = conn.prepareStatement(sql)) { |
|
|
|
for (int i = 0; i < uids.size(); i++) { |
|
|
|
stmt.setString(i + 1, uids.get(i)); |
|
|
|
} |
|
|
|
try (ResultSet rs = stmt.executeQuery()) { |
|
|
|
while (rs.next()) { |
|
|
|
existing.add(rs.getString("uid")); |
|
|
|
} |
|
|
|
} catch (Exception e) { |
|
|
|
targetConn.rollback(); |
|
|
|
logger.error("同步批次失败,事务回滚", e); |
|
|
|
throw e; |
|
|
|
} |
|
|
|
|
|
|
|
logger.info("✅ 数据同步完成,共处理 {} 条记录", totalProcessed); |
|
|
|
|
|
|
|
} catch (SQLException e) { |
|
|
|
logger.error("数据库连接或操作失败", e); |
|
|
|
throw new RuntimeException("MySQL 同步失败", e); |
|
|
|
} |
|
|
|
return existing; |
|
|
|
} |
|
|
|
private void setStatementParams(PreparedStatement stmt, RecordData data) throws SQLException { |
|
|
|
String name = data.cz_user; |
|
|
|
|
|
|
|
// 封装插入逻辑 |
|
|
|
private void buildInsertStatement(PreparedStatement stmt, RecordData data) throws SQLException { |
|
|
|
String timestampPart = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")); |
|
|
|
Random random = new Random(); |
|
|
|
int randomNumber = random.nextInt(900) + 100; |
|
|
|
|
|
|
|
// admin_id |
|
|
|
String czUser = data.cz_user; |
|
|
|
|
|
|
|
// 1. 先判空 |
|
|
|
if (czUser != null && StringUtils.isNumeric(czUser)) { |
|
|
|
// 设置 admin_id |
|
|
|
if (StringUtils.isNumeric(name)) { |
|
|
|
try { |
|
|
|
// 2. 调用服务获取 ID |
|
|
|
String adminIdStr = adminService.getId(czUser); // 可能返回 null 或字符串 |
|
|
|
|
|
|
|
// 3. 再次判空并转为 Integer |
|
|
|
String adminIdStr = adminService.getId(name); |
|
|
|
if (adminIdStr != null && StringUtils.isNumeric(adminIdStr)) { |
|
|
|
Integer admin_id = Integer.parseInt(adminIdStr); // 安全转换 |
|
|
|
stmt.setInt(14, admin_id); |
|
|
|
stmt.setInt(14, Integer.parseInt(adminIdStr)); |
|
|
|
} else { |
|
|
|
// adminService 返回 null 或非数字,视为无效 admin |
|
|
|
stmt.setInt(14, 99999); |
|
|
|
} |
|
|
|
} catch (Exception e) { |
|
|
|
// 防御性兜底 |
|
|
|
logger.warn("解析 admin_id 失败,cz_user={}", czUser, e); |
|
|
|
logger.warn("解析 admin_id 失败,cz_user={}", name, e); |
|
|
|
stmt.setInt(14, 99999); |
|
|
|
} |
|
|
|
} else { |
|
|
|
// cz_user 为 null 或非数字 |
|
|
|
stmt.setInt(14, 99999); |
|
|
|
} |
|
|
|
|
|
|
|
// refund_type, refund_model |
|
|
|
stmt.setString(10, null); |
|
|
|
stmt.setNull(11, Types.INTEGER); |
|
|
|
|
|
|
|
// 根据 gtype 设置 type 和 order_code |
|
|
|
if (validFourTypes.contains(data.gtype)) { |
|
|
|
return; // 跳过 |
|
|
|
throw new IllegalArgumentException("不应处理 validFourTypes 类型,应在上层过滤"); // 安全兜底 |
|
|
|
} |
|
|
|
|
|
|
|
if (validZeroTypes.contains(data.gtype)) { |
|
|
|
stmt.setInt(13, 0); |
|
|
|
stmt.setString(1, "ERPCZ" + timestampPart + randomNumber); |
|
|
|
stmt.setNull(20, Types.INTEGER); |
|
|
|
stmt.setString(1, "ERPCZ_" + data.orderNumber); |
|
|
|
} else if (validOneTypes.contains(data.gtype)) { |
|
|
|
stmt.setInt(13, 1); |
|
|
|
stmt.setString(1, "ERPXF" + timestampPart + randomNumber); |
|
|
|
stmt.setInt(20, 0); |
|
|
|
stmt.setString(1, "ERPXF_" + data.orderNumber); |
|
|
|
} else if (validTwoTypes.contains(data.gtype)) { |
|
|
|
stmt.setInt(13, 2); |
|
|
|
stmt.setString(1, "ERPTK" + timestampPart + randomNumber); |
|
|
|
stmt.setInt(20, 0); |
|
|
|
stmt.setString(1, "ERPTK_" + data.orderNumber); |
|
|
|
stmt.setString(10, "退款商品"); |
|
|
|
stmt.setInt(11, 0); |
|
|
|
} else if (validThreeTypes.contains(data.gtype)) { |
|
|
|
stmt.setInt(13, 3); |
|
|
|
stmt.setString(1, "ERPQT" + timestampPart + randomNumber); |
|
|
|
stmt.setNull(20, Types.INTEGER); |
|
|
|
stmt.setString(1, "ERPQT_" + data.orderNumber); |
|
|
|
} |
|
|
|
|
|
|
|
stmt.setInt(2, data.jwcode); |
|
|
@ -508,6 +280,7 @@ public class MysqlServiceImpl implements MysqlService { |
|
|
|
|
|
|
|
stmt.setInt(7, data.core_jb); |
|
|
|
|
|
|
|
// pay_platform |
|
|
|
String platform = data.operation_platform; |
|
|
|
if ("1".equals(platform)) { |
|
|
|
stmt.setString(8, "ERP"); |
|
|
@ -516,7 +289,7 @@ public class MysqlServiceImpl implements MysqlService { |
|
|
|
} else if ("3".equals(platform)) { |
|
|
|
stmt.setString(8, "HomilyChart"); |
|
|
|
} else if ("4".equals(platform)) { |
|
|
|
return; // 跳过这条数据 |
|
|
|
throw new IllegalArgumentException("不应处理 platform=4,应在上层过滤"); |
|
|
|
} else if ("0".equals(platform)) { |
|
|
|
stmt.setString(8, "初始化金币"); |
|
|
|
} else { |
|
|
@ -527,90 +300,102 @@ public class MysqlServiceImpl implements MysqlService { |
|
|
|
stmt.setString(12, data.cz_bz); |
|
|
|
stmt.setInt(15, 3); |
|
|
|
stmt.setTimestamp(16, data.cz_time); |
|
|
|
stmt.setInt(17, data.flag); |
|
|
|
|
|
|
|
if (data.cz_bz != null && data.cz_bz.contains("测试") && data.cz_bz.contains("员工")) { |
|
|
|
stmt.setInt(17, 0); |
|
|
|
} else { |
|
|
|
stmt.setInt(17, 1); |
|
|
|
} |
|
|
|
|
|
|
|
stmt.setTimestamp(18, data.cz_time); |
|
|
|
stmt.setTimestamp(19, data.cz_time); |
|
|
|
stmt.setString(21, data.uid); |
|
|
|
} |
|
|
|
|
|
|
|
// 更新用户金币逻辑 |
|
|
|
private void updateUserGold(RecordData data) throws Exception { |
|
|
|
private void updateUserBalance(Connection conn, RecordData data) throws Exception { |
|
|
|
logger.info("处理用户余额更新,jwcode={}", data.jwcode); |
|
|
|
|
|
|
|
User user = userService.selectAllUser(String.valueOf(data.jwcode)); |
|
|
|
BigDecimal freeBD = BigDecimal.valueOf(data.free); |
|
|
|
BigDecimal buyJbBD = BigDecimal.valueOf(data.buy_jb); |
|
|
|
BigDecimal coreJbBD = BigDecimal.valueOf(data.core_jb); |
|
|
|
|
|
|
|
if (ObjectUtils.isEmpty(user)) { |
|
|
|
logger.info("用户不存在"); |
|
|
|
logger.info("用户不存在,jwcode={}", data.jwcode); |
|
|
|
user = new User(); |
|
|
|
List<String> country = Collections.singletonList("未知"); |
|
|
|
String name = "未知"; |
|
|
|
BaseDES des = new BaseDES(); |
|
|
|
String desjwcode= des.encrypt(String.valueOf(data.jwcode)); |
|
|
|
// System.out.println("desjwcode:"+desjwcode); |
|
|
|
|
|
|
|
// 创建 JSON 请求体 |
|
|
|
Map<String, String> requestBody = new HashMap<>(); |
|
|
|
requestBody.put("jwcode", desjwcode); |
|
|
|
try { |
|
|
|
BaseDES des = new BaseDES(); |
|
|
|
String desjwcode = des.encrypt(String.valueOf(data.jwcode)); |
|
|
|
|
|
|
|
// 设置请求头 |
|
|
|
HttpHeaders headers = new HttpHeaders(); |
|
|
|
headers.setContentType(MediaType.APPLICATION_JSON); |
|
|
|
Map<String, String> requestBody = new HashMap<>(); |
|
|
|
requestBody.put("jwcode", desjwcode); |
|
|
|
|
|
|
|
// 创建 HttpEntity |
|
|
|
HttpEntity<Map<String, String>> entity = new HttpEntity<>(requestBody, headers); |
|
|
|
HttpHeaders headers = new HttpHeaders(); |
|
|
|
headers.setContentType(MediaType.APPLICATION_JSON); |
|
|
|
HttpEntity<Map<String, String>> entity = new HttpEntity<>(requestBody, headers); |
|
|
|
|
|
|
|
// 发送 POST 请求 |
|
|
|
try { |
|
|
|
ResponseEntity<Map> response = restTemplate.exchange( |
|
|
|
"http://hwapi.rzfwq.com/hwjnApp/hwhc-login/hwhclogin/hc/login/clent/info", |
|
|
|
HttpMethod.POST, entity, Map.class); |
|
|
|
|
|
|
|
// 检查响应状态码 |
|
|
|
if (response.getStatusCode().is2xxSuccessful()) { |
|
|
|
Map<String, Object> responseBody = response.getBody(); |
|
|
|
if (responseBody != null) { |
|
|
|
// 获取data部分 |
|
|
|
Map<String, Object> data1 = (Map<String, Object>) responseBody.get("data"); |
|
|
|
if (data1 != null) { |
|
|
|
// 提取name和country |
|
|
|
name = (String) data1.get("name"); |
|
|
|
country = (List<String>) data1.get("country"); |
|
|
|
|
|
|
|
// 打印获取到的数据 |
|
|
|
System.out.println("Name: " + name); |
|
|
|
System.out.println("Country: " + country); |
|
|
|
} else { |
|
|
|
System.out.println("Data is null"); |
|
|
|
Map<String, Object> dataMap = (Map<String, Object>) responseBody.get("data"); |
|
|
|
if (dataMap != null) { |
|
|
|
name = (String) dataMap.get("name"); |
|
|
|
Object countryObj = dataMap.get("country"); |
|
|
|
if (countryObj instanceof List) { |
|
|
|
country = (List<String>) countryObj; |
|
|
|
} |
|
|
|
logger.info("获取用户信息成功: name={}, country={}", name, country); |
|
|
|
} |
|
|
|
} else { |
|
|
|
System.out.println("Response body is null"); |
|
|
|
} |
|
|
|
} else { |
|
|
|
System.out.println("Request failed with status code: " + response.getStatusCode()); |
|
|
|
} |
|
|
|
} catch (Exception e) { |
|
|
|
System.out.println("Error: " + e.getMessage()); |
|
|
|
// 设置默认的 country 值 |
|
|
|
logger.warn("获取用户信息失败,jwcode={}", data.jwcode, e); |
|
|
|
country = Collections.singletonList("未知"); |
|
|
|
} |
|
|
|
|
|
|
|
String market= String.join(",",marketService.getMarketIds(country)); |
|
|
|
String market = String.join(",", marketService.getMarketIds(country)); |
|
|
|
user.setJwcode(data.jwcode); |
|
|
|
user.setName( name); |
|
|
|
user.setName(name); |
|
|
|
user.setMarket(market); |
|
|
|
logger.info("新添用户"); |
|
|
|
userService.addUser(user); |
|
|
|
logger.info("用户添加成功"); |
|
|
|
logger.info("用户创建成功,jwcode={}", data.jwcode); |
|
|
|
|
|
|
|
// 重新查询确保数据完整 |
|
|
|
user = userService.selectAllUser(String.valueOf(data.jwcode)); |
|
|
|
} |
|
|
|
|
|
|
|
// 更新当前金币 |
|
|
|
if (currentMonth.getValue() >= 7) { |
|
|
|
user.setCurrentFreeJune(user.getCurrentFreeJune().add(freeBD)); |
|
|
|
BigDecimal newFreeJune = user.getCurrentFreeJune().add(freeBD); |
|
|
|
if (newFreeJune.compareTo(BigDecimal.ZERO) >= 0) { |
|
|
|
user.setCurrentFreeJune(newFreeJune); |
|
|
|
} else { |
|
|
|
BigDecimal remaining = newFreeJune; |
|
|
|
user.setCurrentFreeJune(BigDecimal.ZERO); |
|
|
|
user.setCurrentFreeDecember(user.getCurrentFreeDecember().add(remaining)); |
|
|
|
} |
|
|
|
} else { |
|
|
|
user.setCurrentFreeDecember(user.getCurrentFreeDecember().add(freeBD)); |
|
|
|
BigDecimal newFreeDec = user.getCurrentFreeDecember().add(freeBD); |
|
|
|
if (newFreeDec.compareTo(BigDecimal.ZERO) >= 0) { |
|
|
|
user.setCurrentFreeDecember(newFreeDec); |
|
|
|
} else { |
|
|
|
BigDecimal remaining = newFreeDec; |
|
|
|
user.setCurrentFreeDecember(BigDecimal.ZERO); |
|
|
|
user.setCurrentFreeJune(remaining); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
user.setCurrentPermanentGold(user.getCurrentPermanentGold().add(buyJbBD)); |
|
|
|
user.setCurrentTaskGold(user.getCurrentTaskGold().add(coreJbBD)); |
|
|
|
|
|
|
|
// 更新统计字段 |
|
|
|
if (validZeroTypes.contains(data.gtype)) { |
|
|
|
user.setRechargeNum(user.getRechargeNum() + 1); |
|
|
|
user.setSumPermanentGold(user.getSumPermanentGold().add(buyJbBD)); |
|
|
@ -628,12 +413,16 @@ public class MysqlServiceImpl implements MysqlService { |
|
|
|
} |
|
|
|
|
|
|
|
userService.updateAllGold(user); |
|
|
|
logger.info("用户余额更新成功,jwcode={}", data.jwcode); |
|
|
|
} |
|
|
|
|
|
|
|
// 内部类:封装查询结果 |
|
|
|
static class RecordData { |
|
|
|
int gtype, jwcode, free, core_jb, buy_jb, flag; |
|
|
|
Timestamp cz_time; |
|
|
|
String cz_user, cz_bz, operation_platform, goods_name; |
|
|
|
private boolean checkRecordExists(Connection conn, String uid) throws SQLException { |
|
|
|
String sql = "SELECT 1 FROM user_gold_record WHERE uid = ? LIMIT 1"; |
|
|
|
try (PreparedStatement stmt = conn.prepareStatement(sql)) { |
|
|
|
stmt.setString(1, uid); |
|
|
|
try (ResultSet rs = stmt.executeQuery()) { |
|
|
|
return rs.next(); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
} |