10 changed files with 285 additions and 2 deletions
-
175src/main/java/com/example/demo/Util/RedisUtil.java
-
15src/main/java/com/example/demo/controller/GoldDetailController.java
-
33src/main/java/com/example/demo/domain/entity/Export.java
-
3src/main/java/com/example/demo/domain/entity/User.java
-
2src/main/java/com/example/demo/domain/vo/GoldDetail.java
-
2src/main/java/com/example/demo/domain/vo/Page.java
-
4src/main/java/com/example/demo/mapper/GoldDetailMapper.java
-
4src/main/java/com/example/demo/service/GoldDetailService.java
-
13src/main/java/com/example/demo/serviceImpl/GoldDetailServiceImpl.java
-
36src/main/resources/mapper/GoldDetailMapper.xml
@ -0,0 +1,175 @@ |
|||||
|
package com.example.demo.Util; |
||||
|
|
||||
|
import jakarta.annotation.PostConstruct; |
||||
|
import jakarta.annotation.Resource; |
||||
|
import org.springframework.data.redis.connection.RedisConnection; |
||||
|
import org.springframework.data.redis.core.RedisCallback; |
||||
|
import org.springframework.data.redis.core.RedisTemplate; |
||||
|
import org.springframework.data.redis.core.ZSetOperations; |
||||
|
import org.springframework.scheduling.annotation.Scheduled; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
|
||||
|
import java.util.Set; |
||||
|
import java.util.concurrent.*; |
||||
|
|
||||
|
@Component |
||||
|
public class RedisUtil { |
||||
|
|
||||
|
@Resource |
||||
|
private RedisTemplate<String, Object> redisTemplate; |
||||
|
|
||||
|
// 线程池用于处理延迟消息 |
||||
|
private ExecutorService delayMessageExecutor; |
||||
|
|
||||
|
// 延迟消息处理线程数 |
||||
|
private static final int DELAY_THREAD_POOL_SIZE = 4; |
||||
|
|
||||
|
// 延迟队列轮询间隔(毫秒) |
||||
|
private static final long DELAY_QUEUE_POLL_INTERVAL = 1000L; |
||||
|
|
||||
|
@PostConstruct |
||||
|
public void init() { |
||||
|
// 初始化线程池 |
||||
|
delayMessageExecutor = Executors.newFixedThreadPool(DELAY_THREAD_POOL_SIZE); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 发送消息到队列 |
||||
|
* @param queueName 队列名称 |
||||
|
* @param message 消息内容 |
||||
|
*/ |
||||
|
public void sendMessage(String queueName, Object message) { |
||||
|
redisTemplate.opsForList().rightPush(queueName, message); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 阻塞获取消息(优化版,增加重试机制) |
||||
|
* @param queueName 队列名称 |
||||
|
* @param timeout 超时时间(秒) |
||||
|
* @return 消息内容 |
||||
|
*/ |
||||
|
public Object blockingGetMessage(String queueName, long timeout) { |
||||
|
// 分段获取,避免长时间阻塞 |
||||
|
long endTime = System.currentTimeMillis() + timeout * 1000; |
||||
|
while (System.currentTimeMillis() < endTime) { |
||||
|
Object message = redisTemplate.opsForList().leftPop(queueName, 1, TimeUnit.SECONDS); |
||||
|
if (message != null) { |
||||
|
return message; |
||||
|
} |
||||
|
} |
||||
|
return null; |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 非阻塞获取消息 |
||||
|
* @param queueName 队列名称 |
||||
|
* @return 消息内容 |
||||
|
*/ |
||||
|
public Object getMessage(String queueName) { |
||||
|
return redisTemplate.opsForList().leftPop(queueName); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 获取队列长度 |
||||
|
* @param queueName 队列名称 |
||||
|
* @return 队列长度 |
||||
|
*/ |
||||
|
public Long getQueueSize(String queueName) { |
||||
|
return redisTemplate.opsForList().size(queueName); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 发送延迟消息(优化版) |
||||
|
* @param queueName 队列名称 |
||||
|
* @param message 消息内容 |
||||
|
* @param delay 延迟时间(秒) |
||||
|
*/ |
||||
|
public void sendDelayMessage(String queueName, Object message, long delay) { |
||||
|
String delayQueueKey = getDelayQueueKey(queueName); |
||||
|
String messageId = generateMessageId(); |
||||
|
|
||||
|
redisTemplate.execute(new RedisCallback<Object>() { |
||||
|
@Override |
||||
|
public Object doInRedis(RedisConnection connection) { |
||||
|
connection.openPipeline(); |
||||
|
// 直接存储消息内容到ZSet的value中 |
||||
|
connection.zAdd( |
||||
|
delayQueueKey.getBytes(), |
||||
|
System.currentTimeMillis() + delay * 1000, |
||||
|
serializeMessage(messageId, message) |
||||
|
); |
||||
|
connection.closePipeline(); |
||||
|
return null; |
||||
|
} |
||||
|
}); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 启动延迟消息处理任务 |
||||
|
*/ |
||||
|
@Scheduled(fixedRate = DELAY_QUEUE_POLL_INTERVAL) |
||||
|
public void processDelayMessages() { |
||||
|
Set<String> delayQueues = redisTemplate.keys("delay:*"); |
||||
|
if (delayQueues != null) { |
||||
|
for (String delayQueue : delayQueues) { |
||||
|
delayMessageExecutor.execute(() -> { |
||||
|
String queueName = delayQueue.substring(6); // 去掉"delay:"前缀 |
||||
|
processSingleDelayQueue(queueName); |
||||
|
}); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 处理单个延迟队列 |
||||
|
*/ |
||||
|
private void processSingleDelayQueue(String queueName) { |
||||
|
String delayQueueKey = getDelayQueueKey(queueName); |
||||
|
long now = System.currentTimeMillis(); |
||||
|
|
||||
|
// 获取所有已到期的消息 |
||||
|
Set<ZSetOperations.TypedTuple<Object>> messages = redisTemplate.opsForZSet() |
||||
|
.rangeByScoreWithScores(delayQueueKey, 0, now); |
||||
|
|
||||
|
if (messages != null && !messages.isEmpty()) { |
||||
|
for (ZSetOperations.TypedTuple<Object> tuple : messages) { |
||||
|
Object messageWithId = tuple.getValue(); |
||||
|
if (messageWithId != null) { |
||||
|
// 反序列化消息 |
||||
|
Object message = deserializeMessage(messageWithId.toString()); |
||||
|
// 发送到实际队列 |
||||
|
sendMessage(queueName, message); |
||||
|
// 从延迟队列中移除 |
||||
|
redisTemplate.opsForZSet().remove(delayQueueKey, messageWithId); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
// 生成消息ID |
||||
|
private String generateMessageId() { |
||||
|
return java.util.UUID.randomUUID().toString(); |
||||
|
} |
||||
|
|
||||
|
// 获取延迟队列key |
||||
|
private String getDelayQueueKey(String queueName) { |
||||
|
return "delay:" + queueName; |
||||
|
} |
||||
|
|
||||
|
// 序列化消息(可根据实际需求实现) |
||||
|
private byte[] serializeMessage(String messageId, Object message) { |
||||
|
// 这里简单实现,实际项目中可以使用JSON序列化等 |
||||
|
return (messageId + ":" + message.toString()).getBytes(); |
||||
|
} |
||||
|
|
||||
|
// 反序列化消息(可根据实际需求实现) |
||||
|
private Object deserializeMessage(String serialized) { |
||||
|
// 简单实现,根据实际序列化方式调整 |
||||
|
int separatorIndex = serialized.indexOf(':'); |
||||
|
if (separatorIndex > 0) { |
||||
|
return serialized.substring(separatorIndex + 1); |
||||
|
} |
||||
|
return serialized; |
||||
|
} |
||||
|
} |
@ -0,0 +1,33 @@ |
|||||
|
package com.example.demo.domain.entity; |
||||
|
|
||||
|
import com.fasterxml.jackson.annotation.JsonFormat; |
||||
|
import lombok.Data; |
||||
|
import lombok.NoArgsConstructor; |
||||
|
import org.springframework.format.annotation.DateTimeFormat; |
||||
|
|
||||
|
import java.util.Date; |
||||
|
|
||||
|
/** |
||||
|
* @program: GOLD |
||||
|
* @ClassName Export |
||||
|
* @description: |
||||
|
* @author: huangqizhen |
||||
|
* @create: 2025−06-24 16:17 |
||||
|
* @Version 1.0 |
||||
|
**/ |
||||
|
@Data |
||||
|
@NoArgsConstructor |
||||
|
public class Export { |
||||
|
private String token; |
||||
|
private Integer account; |
||||
|
private String url; |
||||
|
private String fileName; |
||||
|
private Byte type; |
||||
|
private Byte state; |
||||
|
private Integer dataNum; |
||||
|
private String reason; |
||||
|
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "Asia/Shanghai") |
||||
|
private Date createTime; |
||||
|
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "Asia/Shanghai") |
||||
|
private Date updateTime; |
||||
|
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue