You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

189 lines
5.7 KiB

5 days ago
5 days ago
5 days ago
5 days ago
5 days ago
5 days ago
5 days ago
5 days ago
5 days ago
5 days ago
5 days ago
5 days ago
5 days ago
5 days ago
5 days ago
  1. package com.example.demo.Util;
  2. import jakarta.annotation.PostConstruct;
  3. import jakarta.annotation.Resource;
  4. import org.springframework.data.redis.connection.RedisConnection;
  5. import org.springframework.data.redis.core.RedisCallback;
  6. import org.springframework.data.redis.core.RedisTemplate;
  7. import org.springframework.data.redis.core.ZSetOperations;
  8. import org.springframework.scheduling.annotation.Scheduled;
  9. import org.springframework.stereotype.Component;
  10. import java.util.Set;
  11. import java.util.concurrent.*;
  12. @Component
  13. public class RedisUtil {
  14. @Resource
  15. private RedisTemplate<String, Object> redisTemplate;
  16. // 线程池用于处理延迟消息
  17. private ExecutorService delayMessageExecutor;
  18. // 延迟消息处理线程数
  19. private static final int DELAY_THREAD_POOL_SIZE = 4;
  20. // 延迟队列轮询间隔(毫秒)
  21. private static final long DELAY_QUEUE_POLL_INTERVAL = 1000L;
  22. @PostConstruct
  23. public void init() {
  24. // 初始化线程池
  25. delayMessageExecutor = Executors.newFixedThreadPool(DELAY_THREAD_POOL_SIZE);
  26. }
  27. /**
  28. * 发送消息到队列
  29. * @param queueName 队列名称
  30. * @param message 消息内容
  31. */
  32. public void sendMessage(String queueName, Object message) {
  33. redisTemplate.opsForList().rightPush(queueName, message);
  34. }
  35. /**
  36. * 阻塞获取消息(优化版增加重试机制)
  37. * @param queueName 队列名称
  38. * @param timeout 超时时间()
  39. * @return 消息内容
  40. */
  41. public Object blockingGetMessage(String queueName, long timeout) {
  42. // 分段获取,避免长时间阻塞
  43. long endTime = System.currentTimeMillis() + timeout * 1000;
  44. while (System.currentTimeMillis() < endTime) {
  45. Object message = redisTemplate.opsForList().leftPop(queueName, 1, TimeUnit.SECONDS);
  46. if (message != null) {
  47. return message;
  48. }
  49. }
  50. return null;
  51. }
  52. /**
  53. * 非阻塞获取消息
  54. * @param queueName 队列名称
  55. * @return 消息内容
  56. */
  57. public Object getMessage(String queueName) {
  58. return redisTemplate.opsForList().leftPop(queueName);
  59. }
  60. /**
  61. * 获取队列长度
  62. * @param queueName 队列名称
  63. * @return 队列长度
  64. */
  65. public Long getQueueSize(String queueName) {
  66. return redisTemplate.opsForList().size(queueName);
  67. }
  68. /**
  69. * 发送延迟消息(优化版)
  70. * @param queueName 队列名称
  71. * @param message 消息内容
  72. * @param delay 延迟时间()
  73. */
  74. public void sendDelayMessage(String queueName, Object message, long delay) {
  75. String delayQueueKey = getDelayQueueKey(queueName);
  76. String messageId = generateMessageId();
  77. redisTemplate.execute(new RedisCallback<Object>() {
  78. @Override
  79. public Object doInRedis(RedisConnection connection) {
  80. connection.openPipeline();
  81. // 直接存储消息内容到ZSet的value中
  82. connection.zAdd(
  83. delayQueueKey.getBytes(),
  84. System.currentTimeMillis() + delay * 1000,
  85. serializeMessage(messageId, message)
  86. );
  87. connection.closePipeline();
  88. return null;
  89. }
  90. });
  91. }
  92. /**
  93. * 启动延迟消息处理任务
  94. */
  95. @Scheduled(fixedRate = DELAY_QUEUE_POLL_INTERVAL)
  96. public void processDelayMessages() {
  97. Set<String> delayQueues = redisTemplate.keys("delay:*");
  98. if (delayQueues != null) {
  99. for (String delayQueue : delayQueues) {
  100. delayMessageExecutor.execute(() -> {
  101. String queueName = delayQueue.substring(6); // 去掉"delay:"前缀
  102. processSingleDelayQueue(queueName);
  103. });
  104. }
  105. }
  106. }
  107. /**
  108. * 处理单个延迟队列
  109. */
  110. private void processSingleDelayQueue(String queueName) {
  111. String delayQueueKey = getDelayQueueKey(queueName);
  112. long now = System.currentTimeMillis();
  113. // 获取所有已到期的消息
  114. Set<ZSetOperations.TypedTuple<Object>> messages = redisTemplate.opsForZSet()
  115. .rangeByScoreWithScores(delayQueueKey, 0, now);
  116. if (messages != null && !messages.isEmpty()) {
  117. for (ZSetOperations.TypedTuple<Object> tuple : messages) {
  118. Object messageWithId = tuple.getValue();
  119. if (messageWithId != null) {
  120. // 反序列化消息
  121. Object message = deserializeMessage(messageWithId.toString());
  122. // 发送到实际队列
  123. sendMessage(queueName, message);
  124. // 从延迟队列中移除
  125. redisTemplate.opsForZSet().remove(delayQueueKey, messageWithId);
  126. }
  127. }
  128. }
  129. }
  130. // 生成消息ID
  131. private String generateMessageId() {
  132. return java.util.UUID.randomUUID().toString();
  133. }
  134. // 获取延迟队列key
  135. private String getDelayQueueKey(String queueName) {
  136. return "delay:" + queueName;
  137. }
  138. // 序列化消息(可根据实际需求实现)
  139. private byte[] serializeMessage(String messageId, Object message) {
  140. // 这里简单实现,实际项目中可以使用JSON序列化等
  141. return (messageId + ":" + message.toString()).getBytes();
  142. }
  143. // 反序列化消息(可根据实际需求实现)
  144. private Object deserializeMessage(String serialized) {
  145. // 简单实现,根据实际序列化方式调整
  146. int separatorIndex = serialized.indexOf(':');
  147. if (separatorIndex > 0) {
  148. return serialized.substring(separatorIndex + 1);
  149. }
  150. return serialized;
  151. }
  152. }