You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

190 lines
5.8 KiB

  1. /*
  2. package com.example.demo.Util;
  3. import jakarta.annotation.PostConstruct;
  4. import jakarta.annotation.Resource;
  5. import org.springframework.data.redis.connection.RedisConnection;
  6. import org.springframework.data.redis.core.RedisCallback;
  7. import org.springframework.data.redis.core.RedisTemplate;
  8. import org.springframework.data.redis.core.ZSetOperations;
  9. import org.springframework.scheduling.annotation.Scheduled;
  10. import org.springframework.stereotype.Component;
  11. import java.util.Set;
  12. import java.util.concurrent.*;
  13. @Component
  14. public class RedisUtil {
  15. @Resource
  16. private RedisTemplate<String, Object> redisTemplate;
  17. // 线程池用于处理延迟消息
  18. private ExecutorService delayMessageExecutor;
  19. // 延迟消息处理线程数
  20. private static final int DELAY_THREAD_POOL_SIZE = 4;
  21. // 延迟队列轮询间隔(毫秒)
  22. private static final long DELAY_QUEUE_POLL_INTERVAL = 1000L;
  23. @PostConstruct
  24. public void init() {
  25. // 初始化线程池
  26. delayMessageExecutor = Executors.newFixedThreadPool(DELAY_THREAD_POOL_SIZE);
  27. }
  28. */
  29. /**
  30. * 发送消息到队列
  31. * @param queueName 队列名称
  32. * @param message 消息内容
  33. *//*
  34. public void sendMessage(String queueName, Object message) {
  35. redisTemplate.opsForList().rightPush(queueName, message);
  36. }
  37. */
  38. /**
  39. * 阻塞获取消息(优化版增加重试机制)
  40. * @param queueName 队列名称
  41. * @param timeout 超时时间()
  42. * @return 消息内容
  43. *//*
  44. public Object blockingGetMessage(String queueName, long timeout) {
  45. // 分段获取,避免长时间阻塞
  46. long endTime = System.currentTimeMillis() + timeout * 1000;
  47. while (System.currentTimeMillis() < endTime) {
  48. Object message = redisTemplate.opsForList().leftPop(queueName, 1, TimeUnit.SECONDS);
  49. if (message != null) {
  50. return message;
  51. }
  52. }
  53. return null;
  54. }
  55. */
  56. /**
  57. * 非阻塞获取消息
  58. * @param queueName 队列名称
  59. * @return 消息内容
  60. *//*
  61. public Object getMessage(String queueName) {
  62. return redisTemplate.opsForList().leftPop(queueName);
  63. }
  64. */
  65. /**
  66. * 获取队列长度
  67. * @param queueName 队列名称
  68. * @return 队列长度
  69. *//*
  70. public Long getQueueSize(String queueName) {
  71. return redisTemplate.opsForList().size(queueName);
  72. }
  73. */
  74. /**
  75. * 发送延迟消息(优化版)
  76. * @param queueName 队列名称
  77. * @param message 消息内容
  78. * @param delay 延迟时间()
  79. *//*
  80. public void sendDelayMessage(String queueName, Object message, long delay) {
  81. String delayQueueKey = getDelayQueueKey(queueName);
  82. String messageId = generateMessageId();
  83. redisTemplate.execute(new RedisCallback<Object>() {
  84. @Override
  85. public Object doInRedis(RedisConnection connection) {
  86. connection.openPipeline();
  87. // 直接存储消息内容到ZSet的value中
  88. connection.zAdd(
  89. delayQueueKey.getBytes(),
  90. System.currentTimeMillis() + delay * 1000,
  91. serializeMessage(messageId, message)
  92. );
  93. connection.closePipeline();
  94. return null;
  95. }
  96. });
  97. }
  98. */
  99. /**
  100. * 启动延迟消息处理任务
  101. *//*
  102. @Scheduled(fixedRate = DELAY_QUEUE_POLL_INTERVAL)
  103. public void processDelayMessages() {
  104. Set<String> delayQueues = redisTemplate.keys("delay:*");
  105. if (delayQueues != null) {
  106. for (String delayQueue : delayQueues) {
  107. delayMessageExecutor.execute(() -> {
  108. String queueName = delayQueue.substring(6); // 去掉"delay:"前缀
  109. processSingleDelayQueue(queueName);
  110. });
  111. }
  112. }
  113. }
  114. */
  115. /**
  116. * 处理单个延迟队列
  117. *//*
  118. private void processSingleDelayQueue(String queueName) {
  119. String delayQueueKey = getDelayQueueKey(queueName);
  120. long now = System.currentTimeMillis();
  121. // 获取所有已到期的消息
  122. Set<ZSetOperations.TypedTuple<Object>> messages = redisTemplate.opsForZSet()
  123. .rangeByScoreWithScores(delayQueueKey, 0, now);
  124. if (messages != null && !messages.isEmpty()) {
  125. for (ZSetOperations.TypedTuple<Object> tuple : messages) {
  126. Object messageWithId = tuple.getValue();
  127. if (messageWithId != null) {
  128. // 反序列化消息
  129. Object message = deserializeMessage(messageWithId.toString());
  130. // 发送到实际队列
  131. sendMessage(queueName, message);
  132. // 从延迟队列中移除
  133. redisTemplate.opsForZSet().remove(delayQueueKey, messageWithId);
  134. }
  135. }
  136. }
  137. }
  138. // 生成消息ID
  139. private String generateMessageId() {
  140. return java.util.UUID.randomUUID().toString();
  141. }
  142. // 获取延迟队列key
  143. private String getDelayQueueKey(String queueName) {
  144. return "delay:" + queueName;
  145. }
  146. // 序列化消息(可根据实际需求实现)
  147. private byte[] serializeMessage(String messageId, Object message) {
  148. // 这里简单实现,实际项目中可以使用JSON序列化等
  149. return (messageId + ":" + message.toString()).getBytes();
  150. }
  151. // 反序列化消息(可根据实际需求实现)
  152. private Object deserializeMessage(String serialized) {
  153. // 简单实现,根据实际序列化方式调整
  154. int separatorIndex = serialized.indexOf(':');
  155. if (separatorIndex > 0) {
  156. return serialized.substring(separatorIndex + 1);
  157. }
  158. return serialized;
  159. }
  160. }*/