|
|
|
@ -21,6 +21,7 @@ import org.springframework.web.client.RestTemplate; |
|
|
|
import javax.sql.DataSource; |
|
|
|
import java.math.BigDecimal; |
|
|
|
import java.sql.*; |
|
|
|
import java.time.LocalDate; |
|
|
|
import java.time.LocalDateTime; |
|
|
|
import java.time.Month; |
|
|
|
import java.time.format.DateTimeFormatter; |
|
|
|
@ -169,6 +170,114 @@ public class MysqlServiceImpl implements MysqlService { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Transactional(transactionManager = "mysqlTransactionManager") // 👈 保证插入和用户更新在一个事务 |
|
|
|
public void getSqlserverDataDay() throws Exception { |
|
|
|
logger.info("开始从 SQL Server 同步数据到 MySQL"); |
|
|
|
|
|
|
|
try (Connection sqlServerConn = sqlserver1DataSource.getConnection(); |
|
|
|
Connection mysqlConn = mysql1DataSource.getConnection()) { |
|
|
|
|
|
|
|
logger.info("开始查询数据..."); |
|
|
|
|
|
|
|
int pageSize = 100; |
|
|
|
int offset = 0; |
|
|
|
boolean hasMoreData = true; |
|
|
|
|
|
|
|
// 👇 恢复动态时间查询(原注释掉的硬编码时间已移除) |
|
|
|
String querySql = """ |
|
|
|
SELECT |
|
|
|
id, gtype, jwcode, free, core_jb, buy_jb, cz_time, cz_user, cz_bz, operation_platform, goods_name |
|
|
|
FROM |
|
|
|
hwhcGold.dbo.user_gold_records |
|
|
|
WHERE cz_time >= ? |
|
|
|
ORDER BY |
|
|
|
cz_time ASC |
|
|
|
OFFSET ? ROWS FETCH NEXT ? ROWS ONLY; |
|
|
|
"""; |
|
|
|
|
|
|
|
String insertSql = """ |
|
|
|
INSERT IGNORE INTO user_gold_record |
|
|
|
(order_code, jwcode, sum_gold, permanent_gold, free_june, free_december, |
|
|
|
task_gold, pay_platform, goods_name, refund_type, refund_model, remark, type, admin_id, |
|
|
|
audit_status, create_time, flag, update_time, audit_time, is_refund, uid) |
|
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) |
|
|
|
"""; |
|
|
|
|
|
|
|
while (hasMoreData) { |
|
|
|
try (PreparedStatement sqlServerStmt = sqlServerConn.prepareStatement(querySql)) { |
|
|
|
// 👇 恢复动态时间参数 |
|
|
|
sqlServerStmt.setTimestamp(1, Timestamp.valueOf(LocalDate.now().minusDays(1).atStartOfDay())); |
|
|
|
sqlServerStmt.setInt(2, offset); |
|
|
|
sqlServerStmt.setInt(3, pageSize); |
|
|
|
|
|
|
|
ResultSet resultSet = sqlServerStmt.executeQuery(); |
|
|
|
|
|
|
|
if (!resultSet.next()) { |
|
|
|
logger.info("无更多数据,offset={}", offset); |
|
|
|
break; |
|
|
|
} |
|
|
|
|
|
|
|
// 👇 步骤1:收集本批次所有记录 |
|
|
|
List<RecordData> batchRecords = new ArrayList<>(); |
|
|
|
List<String> batchUids = new ArrayList<>(); |
|
|
|
|
|
|
|
do { |
|
|
|
RecordData data = extractRecordData(resultSet); // 👈 抽取数据 |
|
|
|
batchRecords.add(data); |
|
|
|
batchUids.add(data.uid); |
|
|
|
} while (resultSet.next()); |
|
|
|
|
|
|
|
logger.info("本批次共 {} 条记录", batchRecords.size()); |
|
|
|
|
|
|
|
// 👇 步骤2:批量查询哪些 uid 已存在(性能优化) |
|
|
|
Set<String> existingUids = getExistingUids(mysqlConn, batchUids); |
|
|
|
logger.info("已存在记录数: {}", existingUids.size()); |
|
|
|
|
|
|
|
// 👇 步骤3:准备批量插入 |
|
|
|
try (PreparedStatement mysqlStmt = mysqlConn.prepareStatement(insertSql)) { |
|
|
|
|
|
|
|
for (RecordData data : batchRecords) { |
|
|
|
if (validFourTypes.contains(data.gtype)) { |
|
|
|
logger.debug("跳过 validFourTypes 类型记录,gtype={}, uid={}", data.gtype, data.uid); |
|
|
|
continue; |
|
|
|
} |
|
|
|
if ("4".equals(data.operation_platform)) { |
|
|
|
logger.debug("跳过 operation_platform=4 的记录,uid={}", data.uid); |
|
|
|
continue; |
|
|
|
} |
|
|
|
// 👇 跳过已存在的记录(避免重复更新用户余额) |
|
|
|
if (existingUids.contains(data.uid)) { |
|
|
|
logger.debug("跳过重复记录,uid={}", data.uid); |
|
|
|
continue; |
|
|
|
} |
|
|
|
|
|
|
|
// 👇 设置插入参数 |
|
|
|
setStatementParams(mysqlStmt, data); |
|
|
|
|
|
|
|
mysqlStmt.addBatch(); |
|
|
|
|
|
|
|
// 👇 只有新记录才更新用户余额(关键修复!) |
|
|
|
updateUserBalance(mysqlConn, data); |
|
|
|
} |
|
|
|
|
|
|
|
// 👇 执行批量插入 |
|
|
|
int[] results = mysqlStmt.executeBatch(); |
|
|
|
logger.info("成功插入新记录 {} 条", results.length); |
|
|
|
} |
|
|
|
|
|
|
|
offset += pageSize; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
logger.info("✅ 数据同步完成"); |
|
|
|
} catch (SQLException e) { |
|
|
|
logger.error("数据库操作失败", e); |
|
|
|
throw new RuntimeException("同步失败", e); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
static class RecordData { |
|
|
|
int gtype, jwcode, free, core_jb, buy_jb; |
|
|
|
Timestamp cz_time; |
|
|
|
|