|
|
@ -4,6 +4,7 @@ package com.example.demo.Mysql; |
|
|
|
import com.example.demo.Util.BaseDES; |
|
|
|
import com.example.demo.domain.entity.User; |
|
|
|
import com.example.demo.service.coin.AdminService; |
|
|
|
import com.example.demo.service.coin.MarketService; |
|
|
|
import com.example.demo.service.coin.UserService; |
|
|
|
import org.apache.commons.lang3.ObjectUtils; |
|
|
|
import org.apache.commons.lang3.StringUtils; |
|
|
@ -27,7 +28,7 @@ import java.util.*; |
|
|
|
|
|
|
|
|
|
|
|
@Service |
|
|
|
@Transactional |
|
|
|
|
|
|
|
public class MysqlServiceImpl implements MysqlService { |
|
|
|
@Autowired |
|
|
|
private RestTemplate restTemplate; |
|
|
@ -43,6 +44,8 @@ public class MysqlServiceImpl implements MysqlService { |
|
|
|
private AdminService adminService; |
|
|
|
@Autowired |
|
|
|
private UserService userService; |
|
|
|
@Autowired |
|
|
|
private MarketService marketService; |
|
|
|
|
|
|
|
|
|
|
|
@Autowired |
|
|
@ -59,10 +62,10 @@ public class MysqlServiceImpl implements MysqlService { |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Scheduled(cron = "0 0 * * * ?") // 每小时执行一次 |
|
|
|
// @Scheduled(cron = "0 0 * * * ?") // 每小时执行一次 |
|
|
|
public void getSqlserverData() throws Exception { |
|
|
|
logger.info("开始从 SQL Server 同步数据到 MySQL"); |
|
|
|
try (Connection sqlServerConn = sqlserver1DataSource.getConnection(); |
|
|
|
try (Connection sqlServerConn = mysql1DataSource.getConnection(); |
|
|
|
Connection mysqlConn = mysql1DataSource.getConnection()) { |
|
|
|
logger.info("开始查询数据..."); |
|
|
|
// 从 SQL Server 查询数据 |
|
|
@ -177,7 +180,7 @@ public class MysqlServiceImpl implements MysqlService { |
|
|
|
if(ObjectUtils.isEmpty(user)){ |
|
|
|
logger.info("用户不存在"); |
|
|
|
user = new User(); |
|
|
|
String country = "未知"; |
|
|
|
List<String> country = Collections.singletonList("未知"); |
|
|
|
BaseDES des = new BaseDES(); |
|
|
|
String desjwcode= des.encrypt(String.valueOf(jwcode)); |
|
|
|
// System.out.println("desjwcode:"+desjwcode); |
|
|
@ -208,7 +211,7 @@ public class MysqlServiceImpl implements MysqlService { |
|
|
|
if (data != null) { |
|
|
|
// 提取name和country |
|
|
|
name = (String) data.get("name"); |
|
|
|
country = (String) data.get("country"); |
|
|
|
country = (List<String>) data.get("country"); |
|
|
|
|
|
|
|
// 打印获取到的数据 |
|
|
|
System.out.println("Name: " + name); |
|
|
@ -225,12 +228,13 @@ public class MysqlServiceImpl implements MysqlService { |
|
|
|
} catch (Exception e) { |
|
|
|
System.out.println("Error: " + e.getMessage()); |
|
|
|
// 设置默认的 country 值 |
|
|
|
country = "未知"; |
|
|
|
country = Collections.singletonList("未知"); |
|
|
|
} |
|
|
|
|
|
|
|
String market= String.join(",",marketService.getMarketIds(country)); |
|
|
|
user.setJwcode(jwcode); |
|
|
|
user.setName( name); |
|
|
|
user.setMarket(country); |
|
|
|
user.setMarket(market); |
|
|
|
logger.info("新添用户"); |
|
|
|
userService.addUser(user); |
|
|
|
logger.info("用户添加成功"); |
|
|
@ -275,4 +279,311 @@ public class MysqlServiceImpl implements MysqlService { |
|
|
|
throw new RuntimeException("数据链接失败", e); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void getSqlserverData2() throws Exception { |
|
|
|
logger.info("开始从 MySQL 同步数据到 MySQL(分页处理)"); |
|
|
|
|
|
|
|
Timestamp startTime = Timestamp.valueOf(LocalDateTime.now().minusHours(1)); |
|
|
|
int pageSize = 100; |
|
|
|
int pageNumber = 0; |
|
|
|
int totalProcessed = 0; |
|
|
|
|
|
|
|
try ( |
|
|
|
|
|
|
|
Connection sourceConn = mysql1DataSource.getConnection(); |
|
|
|
Connection targetConn = mysql1DataSource.getConnection() |
|
|
|
) { |
|
|
|
// 关闭自动提交,手动控制事务 |
|
|
|
targetConn.setAutoCommit(false); |
|
|
|
|
|
|
|
// 分页查询 SQL(MySQL 使用 LIMIT offset, size) |
|
|
|
String querySql = """ |
|
|
|
SELECT gtype, jwcode, free, core_jb, buy_jb, cz_time, cz_user, cz_bz, operation_platform, goods_name,flag |
|
|
|
FROM user_gold_records |
|
|
|
where cz_time >'2025-08-10 07:11:04' |
|
|
|
ORDER BY cz_time ASC |
|
|
|
LIMIT ? OFFSET ? |
|
|
|
"""; |
|
|
|
|
|
|
|
String insertSql = """ |
|
|
|
INSERT INTO user_gold_record |
|
|
|
(order_code, jwcode, sum_gold, permanent_gold, free_june, free_december, |
|
|
|
task_gold, pay_platform, goods_name, refund_type, refund_model, remark, type, admin_id, |
|
|
|
audit_status, create_time, flag, update_time) |
|
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) |
|
|
|
"""; |
|
|
|
|
|
|
|
try ( |
|
|
|
PreparedStatement queryStmt = sourceConn.prepareStatement(querySql); |
|
|
|
PreparedStatement insertStmt = targetConn.prepareStatement(insertSql) |
|
|
|
) { |
|
|
|
|
|
|
|
while (true) { |
|
|
|
int offset = pageNumber * pageSize; |
|
|
|
|
|
|
|
queryStmt.setInt(1, pageSize); |
|
|
|
queryStmt.setInt(2, offset); |
|
|
|
|
|
|
|
List<RecordData> batchData = new ArrayList<>(); |
|
|
|
try (ResultSet rs = queryStmt.executeQuery()) { |
|
|
|
boolean hasData = false; |
|
|
|
while (rs.next()) { |
|
|
|
hasData = true; |
|
|
|
|
|
|
|
RecordData data = new RecordData(); |
|
|
|
data.gtype = rs.getInt("gtype"); |
|
|
|
data.jwcode = rs.getInt("jwcode"); |
|
|
|
data.free = rs.getInt("free"); |
|
|
|
data.core_jb = rs.getInt("core_jb"); |
|
|
|
data.buy_jb = rs.getInt("buy_jb"); |
|
|
|
data.cz_time = rs.getTimestamp("cz_time"); |
|
|
|
data.cz_user = rs.getString("cz_user"); |
|
|
|
data.cz_bz = rs.getString("cz_bz"); |
|
|
|
data.operation_platform = rs.getString("operation_platform"); |
|
|
|
data.goods_name = rs.getString("goods_name"); |
|
|
|
|
|
|
|
batchData.add(data); |
|
|
|
} |
|
|
|
|
|
|
|
if (!hasData || batchData.isEmpty()) { |
|
|
|
logger.info("无更多数据可同步,共处理 {} 条", totalProcessed); |
|
|
|
break; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// 清空上一批 |
|
|
|
insertStmt.clearBatch(); |
|
|
|
|
|
|
|
// 处理当前批次(100条以内) |
|
|
|
for (RecordData data : batchData) { |
|
|
|
buildInsertStatement(insertStmt, data); |
|
|
|
insertStmt.addBatch(); |
|
|
|
} |
|
|
|
|
|
|
|
// 执行批量插入 |
|
|
|
insertStmt.executeBatch(); |
|
|
|
targetConn.commit(); |
|
|
|
totalProcessed += batchData.size(); |
|
|
|
logger.info("第 {} 页,插入 {} 条记录,累计 {}", pageNumber, batchData.size(), totalProcessed); |
|
|
|
|
|
|
|
// 更新用户信息(可优化为批量) |
|
|
|
for (RecordData data : batchData) { |
|
|
|
updateUserGold(data); |
|
|
|
} |
|
|
|
|
|
|
|
pageNumber++; |
|
|
|
} |
|
|
|
} catch (Exception e) { |
|
|
|
targetConn.rollback(); |
|
|
|
logger.error("同步批次失败,事务回滚", e); |
|
|
|
throw e; |
|
|
|
} |
|
|
|
|
|
|
|
logger.info("✅ 数据同步完成,共处理 {} 条记录", totalProcessed); |
|
|
|
|
|
|
|
} catch (SQLException e) { |
|
|
|
logger.error("数据库连接或操作失败", e); |
|
|
|
throw new RuntimeException("MySQL 同步失败", e); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// 封装插入逻辑 |
|
|
|
private void buildInsertStatement(PreparedStatement stmt, RecordData data) throws SQLException { |
|
|
|
String timestampPart = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")); |
|
|
|
Random random = new Random(); |
|
|
|
int randomNumber = random.nextInt(900) + 100; |
|
|
|
|
|
|
|
// admin_id |
|
|
|
String czUser = data.cz_user; |
|
|
|
|
|
|
|
// 1. 先判空 |
|
|
|
if (czUser != null && StringUtils.isNumeric(czUser)) { |
|
|
|
try { |
|
|
|
// 2. 调用服务获取 ID |
|
|
|
String adminIdStr = adminService.getId(czUser); // 可能返回 null 或字符串 |
|
|
|
|
|
|
|
// 3. 再次判空并转为 Integer |
|
|
|
if (adminIdStr != null && StringUtils.isNumeric(adminIdStr)) { |
|
|
|
Integer admin_id = Integer.parseInt(adminIdStr); // 安全转换 |
|
|
|
stmt.setInt(14, admin_id); |
|
|
|
} else { |
|
|
|
// adminService 返回 null 或非数字,视为无效 admin |
|
|
|
stmt.setInt(14, 99999); |
|
|
|
} |
|
|
|
} catch (Exception e) { |
|
|
|
// 防御性兜底 |
|
|
|
logger.warn("解析 admin_id 失败,cz_user={}", czUser, e); |
|
|
|
stmt.setInt(14, 99999); |
|
|
|
} |
|
|
|
} else { |
|
|
|
// cz_user 为 null 或非数字 |
|
|
|
stmt.setInt(14, 99999); |
|
|
|
} |
|
|
|
|
|
|
|
stmt.setString(10, null); |
|
|
|
stmt.setNull(11, Types.INTEGER); |
|
|
|
|
|
|
|
if (validFourTypes.contains(data.gtype)) { |
|
|
|
return; // 跳过 |
|
|
|
} |
|
|
|
|
|
|
|
if (validZeroTypes.contains(data.gtype)) { |
|
|
|
stmt.setInt(13, 0); |
|
|
|
stmt.setString(1, "ERPCZ" + timestampPart + randomNumber); |
|
|
|
} else if (validOneTypes.contains(data.gtype)) { |
|
|
|
stmt.setInt(13, 1); |
|
|
|
stmt.setString(1, "ERPXF" + timestampPart + randomNumber); |
|
|
|
} else if (validTwoTypes.contains(data.gtype)) { |
|
|
|
stmt.setInt(13, 2); |
|
|
|
stmt.setString(1, "ERPTK" + timestampPart + randomNumber); |
|
|
|
stmt.setString(10, "退款商品"); |
|
|
|
stmt.setInt(11, 0); |
|
|
|
} else if (validThreeTypes.contains(data.gtype)) { |
|
|
|
stmt.setInt(13, 3); |
|
|
|
stmt.setString(1, "ERPQT" + timestampPart + randomNumber); |
|
|
|
} |
|
|
|
|
|
|
|
stmt.setInt(2, data.jwcode); |
|
|
|
stmt.setInt(3, data.free + data.core_jb + data.buy_jb); |
|
|
|
stmt.setInt(4, data.buy_jb); |
|
|
|
|
|
|
|
if (currentMonth.getValue() >= 7) { |
|
|
|
stmt.setInt(5, data.free); |
|
|
|
stmt.setInt(6, 0); |
|
|
|
} else { |
|
|
|
stmt.setInt(5, 0); |
|
|
|
stmt.setInt(6, data.free); |
|
|
|
} |
|
|
|
|
|
|
|
stmt.setInt(7, data.core_jb); |
|
|
|
|
|
|
|
String platform = data.operation_platform; |
|
|
|
if ("1".equals(platform)) { |
|
|
|
stmt.setString(8, "ERP"); |
|
|
|
} else if ("2".equals(platform)) { |
|
|
|
stmt.setString(8, "HomilyLink"); |
|
|
|
} else if ("3".equals(platform)) { |
|
|
|
stmt.setString(8, "HomilyChart"); |
|
|
|
} else if ("4".equals(platform)) { |
|
|
|
return; // 跳过这条数据 |
|
|
|
} else if ("0".equals(platform)) { |
|
|
|
stmt.setString(8, "初始化金币"); |
|
|
|
} else { |
|
|
|
stmt.setString(8, "其他"); |
|
|
|
} |
|
|
|
|
|
|
|
stmt.setString(9, data.goods_name); |
|
|
|
stmt.setString(12, data.cz_bz); |
|
|
|
stmt.setInt(15, 3); |
|
|
|
stmt.setTimestamp(16, data.cz_time); |
|
|
|
stmt.setInt(17, data.flag); |
|
|
|
stmt.setTimestamp(18, data.cz_time); |
|
|
|
} |
|
|
|
|
|
|
|
// 更新用户金币逻辑 |
|
|
|
private void updateUserGold(RecordData data) throws Exception { |
|
|
|
User user = userService.selectAllUser(String.valueOf(data.jwcode)); |
|
|
|
BigDecimal freeBD = BigDecimal.valueOf(data.free); |
|
|
|
BigDecimal buyJbBD = BigDecimal.valueOf(data.buy_jb); |
|
|
|
BigDecimal coreJbBD = BigDecimal.valueOf(data.core_jb); |
|
|
|
|
|
|
|
if (ObjectUtils.isEmpty(user)) { |
|
|
|
logger.info("用户不存在"); |
|
|
|
user = new User(); |
|
|
|
List<String> country = Collections.singletonList("未知"); |
|
|
|
String name = "未知"; |
|
|
|
BaseDES des = new BaseDES(); |
|
|
|
String desjwcode= des.encrypt(String.valueOf(data.jwcode)); |
|
|
|
// System.out.println("desjwcode:"+desjwcode); |
|
|
|
|
|
|
|
// 创建 JSON 请求体 |
|
|
|
Map<String, String> requestBody = new HashMap<>(); |
|
|
|
requestBody.put("jwcode", desjwcode); |
|
|
|
|
|
|
|
// 设置请求头 |
|
|
|
HttpHeaders headers = new HttpHeaders(); |
|
|
|
headers.setContentType(MediaType.APPLICATION_JSON); |
|
|
|
|
|
|
|
// 创建 HttpEntity |
|
|
|
HttpEntity<Map<String, String>> entity = new HttpEntity<>(requestBody, headers); |
|
|
|
|
|
|
|
// 发送 POST 请求 |
|
|
|
try { |
|
|
|
ResponseEntity<Map> response = restTemplate.exchange( |
|
|
|
"http://hwapi.rzfwq.com/hwjnApp/hwhc-login/hwhclogin/hc/login/clent/info", |
|
|
|
HttpMethod.POST, entity, Map.class); |
|
|
|
|
|
|
|
// 检查响应状态码 |
|
|
|
if (response.getStatusCode().is2xxSuccessful()) { |
|
|
|
Map<String, Object> responseBody = response.getBody(); |
|
|
|
if (responseBody != null) { |
|
|
|
// 获取data部分 |
|
|
|
Map<String, Object> data1 = (Map<String, Object>) responseBody.get("data"); |
|
|
|
if (data1 != null) { |
|
|
|
// 提取name和country |
|
|
|
name = (String) data1.get("name"); |
|
|
|
country = (List<String>) data1.get("country"); |
|
|
|
|
|
|
|
// 打印获取到的数据 |
|
|
|
System.out.println("Name: " + name); |
|
|
|
System.out.println("Country: " + country); |
|
|
|
} else { |
|
|
|
System.out.println("Data is null"); |
|
|
|
} |
|
|
|
} else { |
|
|
|
System.out.println("Response body is null"); |
|
|
|
} |
|
|
|
} else { |
|
|
|
System.out.println("Request failed with status code: " + response.getStatusCode()); |
|
|
|
} |
|
|
|
} catch (Exception e) { |
|
|
|
System.out.println("Error: " + e.getMessage()); |
|
|
|
// 设置默认的 country 值 |
|
|
|
country = Collections.singletonList("未知"); |
|
|
|
} |
|
|
|
|
|
|
|
String market= String.join(",",marketService.getMarketIds(country)); |
|
|
|
user.setJwcode(data.jwcode); |
|
|
|
user.setName( name); |
|
|
|
user.setMarket(market); |
|
|
|
logger.info("新添用户"); |
|
|
|
userService.addUser(user); |
|
|
|
logger.info("用户添加成功"); |
|
|
|
user = userService.selectAllUser(String.valueOf(data.jwcode)); |
|
|
|
} |
|
|
|
|
|
|
|
if (currentMonth.getValue() >= 7) { |
|
|
|
user.setCurrentFreeJune(user.getCurrentFreeJune().add(freeBD)); |
|
|
|
} else { |
|
|
|
user.setCurrentFreeDecember(user.getCurrentFreeDecember().add(freeBD)); |
|
|
|
} |
|
|
|
user.setCurrentPermanentGold(user.getCurrentPermanentGold().add(buyJbBD)); |
|
|
|
user.setCurrentTaskGold(user.getCurrentTaskGold().add(coreJbBD)); |
|
|
|
|
|
|
|
if (validZeroTypes.contains(data.gtype)) { |
|
|
|
user.setRechargeNum(user.getRechargeNum() + 1); |
|
|
|
user.setSumPermanentGold(user.getSumPermanentGold().add(buyJbBD)); |
|
|
|
user.setSumTaskGold(user.getSumTaskGold().add(coreJbBD)); |
|
|
|
if (currentMonth.getValue() >= 7) { |
|
|
|
user.setSumFreeJune(user.getSumFreeJune().add(freeBD)); |
|
|
|
} else { |
|
|
|
user.setSumFreeDecember(user.getSumFreeDecember().add(freeBD)); |
|
|
|
} |
|
|
|
} else if (validOneTypes.contains(data.gtype)) { |
|
|
|
user.setConsumeNum(user.getConsumeNum() + 1); |
|
|
|
user.setSumConsumePermanent(user.getSumConsumePermanent().add(buyJbBD)); |
|
|
|
user.setSumConsumeTask(user.getSumConsumeTask().add(coreJbBD)); |
|
|
|
user.setSumConsumeFree(user.getSumConsumeFree().add(freeBD)); |
|
|
|
} |
|
|
|
|
|
|
|
userService.updateAllGold(user); |
|
|
|
} |
|
|
|
|
|
|
|
// 内部类:封装查询结果 |
|
|
|
static class RecordData { |
|
|
|
int gtype, jwcode, free, core_jb, buy_jb, flag; |
|
|
|
Timestamp cz_time; |
|
|
|
String cz_user, cz_bz, operation_platform, goods_name; |
|
|
|
} |
|
|
|
} |