7 changed files with 180 additions and 8 deletions
-
4src/main/java/com/example/demo/controller/GoldDetailController.java
-
13src/main/java/com/example/demo/exception/SystemException.java
-
3src/main/java/com/example/demo/mapper/GoldDetailMapper.java
-
84src/main/java/com/example/demo/service/listen/AiEmotionExportListener.java
-
69src/main/java/com/example/demo/service/queue/AbstractMessageListener.java
-
10src/main/java/com/example/demo/serviceImpl/GoldDetailServiceImpl.java
-
5src/main/resources/mapper/GoldDetailMapper.xml
@ -0,0 +1,13 @@ |
|||
package com.example.demo.exception; |
|||
|
|||
public class SystemException extends RuntimeException { |
|||
public SystemException(String message) { |
|||
super(message); |
|||
} |
|||
|
|||
|
|||
public SystemException(String message, Throwable cause) { |
|||
super(message, cause); |
|||
} |
|||
|
|||
} |
@ -0,0 +1,84 @@ |
|||
package com.example.demo.service.listen; |
|||
|
|||
import cn.hutool.core.util.StrUtil; |
|||
|
|||
|
|||
import com.example.demo.Util.ExecutionContextUtil; |
|||
import com.example.demo.Util.FeiShuAlertUtil; |
|||
import com.example.demo.Util.RedisUtil; |
|||
import com.example.demo.domain.vo.ExecutionContext; |
|||
import com.example.demo.service.ExportExcelService; |
|||
import com.example.demo.service.queue.AbstractMessageListener; |
|||
|
|||
|
|||
|
|||
|
|||
|
|||
|
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import static java.lang.Thread.sleep; |
|||
|
|||
|
|||
@Component |
|||
public class AiEmotionExportListener extends AbstractMessageListener<String> { |
|||
|
|||
|
|||
//注入ExportExcelService |
|||
@Autowired |
|||
private ExportExcelService exportExcelService; |
|||
|
|||
@Autowired |
|||
public AiEmotionExportListener( |
|||
RedisUtil redisQueueUtil |
|||
|
|||
) { |
|||
super(redisQueueUtil, "hwgold:queue:export_queue"); |
|||
System.out.println("监听器已启动,队列: "); |
|||
} |
|||
|
|||
@Override |
|||
protected void handleMessage(String message) { |
|||
if (StrUtil.isBlank(message)) { |
|||
System.err.println("redis消息队列数据为空" + message); |
|||
} |
|||
try { |
|||
Thread.sleep(5000); |
|||
exportExcelService.handleExcelExportData(message); |
|||
} catch (Exception e) { |
|||
logError(e, message); |
|||
throw new RuntimeException("Failed to process AI emotion export: " + e.getMessage(), e); |
|||
} |
|||
|
|||
|
|||
} |
|||
|
|||
private void logError(Exception e, String message) { |
|||
System.err.println("Export data listener exception: " + e.getMessage()); |
|||
e.printStackTrace(); |
|||
try { |
|||
ExecutionContext context = ExecutionContextUtil.getExecutionContext(); |
|||
String cause = ""; |
|||
if (e.getCause() != null) { |
|||
cause = e.getCause().getMessage(); |
|||
} |
|||
FeiShuAlertUtil.sendAlertMessage( |
|||
context, |
|||
e.getStackTrace()[0].getFileName(), |
|||
e.getStackTrace()[0].getLineNumber(), |
|||
"AI Emotion Export Error: " + e.getMessage() + " 底层错误: " + cause , |
|||
"Failed message: " + message |
|||
); |
|||
} catch (Exception alertEx) { |
|||
System.err.println("Failed to send Feishu alert: " + alertEx.getMessage()); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
protected void handleError(Exception e, String message) { |
|||
System.err.println("处理消息失败: " + message); |
|||
e.printStackTrace(); |
|||
} |
|||
|
|||
} |
@ -0,0 +1,69 @@ |
|||
package com.example.demo.service.queue; |
|||
|
|||
|
|||
|
|||
import com.example.demo.Util.RedisUtil; |
|||
import jakarta.annotation.PostConstruct; |
|||
|
|||
|
|||
import java.util.concurrent.ExecutorService; |
|||
import java.util.concurrent.Executors; |
|||
|
|||
/** |
|||
* 抽象消息监听器 |
|||
*/ |
|||
public abstract class AbstractMessageListener<T> { |
|||
|
|||
private final ExecutorService executorService = Executors.newSingleThreadExecutor(); |
|||
protected final RedisUtil redisQueueUtil; |
|||
protected final String queueName; |
|||
|
|||
public AbstractMessageListener(RedisUtil redisQueueUtil, String queueName) { |
|||
this.redisQueueUtil = redisQueueUtil; |
|||
this.queueName = queueName; |
|||
} |
|||
|
|||
@PostConstruct |
|||
public void init() { |
|||
executorService.submit(this::listen); |
|||
} |
|||
|
|||
private void listen() { |
|||
System.out.println("消费者消费数据" + queueName + "<UNK>"); |
|||
while (!Thread.currentThread().isInterrupted()) { |
|||
try { |
|||
Object message = redisQueueUtil.blockingGetMessage(queueName, 1); |
|||
if (message != null) { |
|||
try { |
|||
handleMessage((T) message); |
|||
} catch (Exception e) { |
|||
handleError(e, (T) message); |
|||
} |
|||
} |
|||
} catch (Exception e) { |
|||
System.err.println("监听队列异常: " + e.getMessage()); |
|||
try { |
|||
Thread.sleep(5000); |
|||
} catch (InterruptedException ex) { |
|||
Thread.currentThread().interrupt(); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 处理消息 |
|||
* @param message 消息内容 |
|||
*/ |
|||
protected abstract void handleMessage(T message); |
|||
|
|||
/** |
|||
* 处理错误 |
|||
* @param e 异常 |
|||
* @param message 消息内容 |
|||
*/ |
|||
protected void handleError(Exception e, T message) { |
|||
System.err.println("处理消息异常: " + e.getMessage()); |
|||
} |
|||
|
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue