From e4935be7da7abdff5dcf11bcdf0e1c2e6bdca94e Mon Sep 17 00:00:00 2001 From: huangqizhen Date: Fri, 10 Jan 2025 18:57:47 +0800 Subject: [PATCH] =?UTF-8?q?=E5=90=8E=E7=AB=AF=E6=95=B4=E5=90=88=E6=A8=A1?= =?UTF-8?q?=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 8 +- .../example/demo/controller/ConsumeController.java | 8 +- .../com/example/demo/controller/KafkaConfig.java | 18 +-- .../com/example/demo/controller/KafkaConsumer.java | 152 ++++++++++----------- .../com/example/demo/controller/KafkaProducer.java | 32 ++--- .../demo/controller/RechargeController.java | 8 +- .../example/demo/serviceImpl/AdminServiceImpl.java | 1 - src/main/resources/application.yml | 14 +- 8 files changed, 120 insertions(+), 121 deletions(-) diff --git a/pom.xml b/pom.xml index e813b18..3335764 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ demo - 21 + 1.8 @@ -96,9 +96,9 @@ true - com.mysql - mysql-connector-j - runtime + mysql + mysql-connector-java + 5.1.46 org.projectlombok diff --git a/src/main/java/com/example/demo/controller/ConsumeController.java b/src/main/java/com/example/demo/controller/ConsumeController.java index 8172131..62bbbde 100644 --- a/src/main/java/com/example/demo/controller/ConsumeController.java +++ b/src/main/java/com/example/demo/controller/ConsumeController.java @@ -3,7 +3,7 @@ package com.example.demo.controller; import com.example.demo.domain.entity.Detail; import com.example.demo.domain.entity.DetailY; import com.example.demo.domain.vo.*; -import com.example.demo.controller.KafkaProducer; +//import com.example.demo.controller.KafkaProducer; import com.example.demo.serviceImpl.ConsumeServiceImpl; import com.example.demo.serviceImpl.UserServiceImpl; import com.fasterxml.jackson.databind.ObjectMapper; @@ -28,8 +28,8 @@ public class ConsumeController { @Autowired private UserServiceImpl userService; - @Autowired - private KafkaProducer kafkaProducer; +// @Autowired +// private KafkaProducer kafkaProducer; @PostMapping("/add") public Result add(@RequestBody DetailY detailY) throws Exception { @@ -39,7 +39,7 @@ public class ConsumeController { String detailJson = objectMapper.writeValueAsString(detailY); // 发送消息到 Kafka 队列 - kafkaProducer.sendMessage("consume-topic", detailJson); +// kafkaProducer.sendMessage("consume-topic", detailJson); return Result.success("Request added to Kafka queue"); } catch (Exception e) { diff --git a/src/main/java/com/example/demo/controller/KafkaConfig.java b/src/main/java/com/example/demo/controller/KafkaConfig.java index 4dd1d94..cff361d 100644 --- a/src/main/java/com/example/demo/controller/KafkaConfig.java +++ b/src/main/java/com/example/demo/controller/KafkaConfig.java @@ -1,9 +1,9 @@ -package com.example.demo.controller; - -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.annotation.EnableKafka; - -@Configuration -@EnableKafka -public class KafkaConfig { -} +//package com.example.demo.controller; +// +//import org.springframework.context.annotation.Configuration; +//import org.springframework.kafka.annotation.EnableKafka; +// +//@Configuration +//@EnableKafka +//public class KafkaConfig { +//} diff --git a/src/main/java/com/example/demo/controller/KafkaConsumer.java b/src/main/java/com/example/demo/controller/KafkaConsumer.java index 373a98a..5f40597 100644 --- a/src/main/java/com/example/demo/controller/KafkaConsumer.java +++ b/src/main/java/com/example/demo/controller/KafkaConsumer.java @@ -1,76 +1,76 @@ -package com.example.demo.controller; - -import com.example.demo.domain.entity.Detail; -import com.example.demo.domain.entity.DetailY; -import com.example.demo.domain.entity.Recharge; -import com.example.demo.serviceImpl.ConsumeServiceImpl; -import com.example.demo.sevice.RechargeService; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.stereotype.Service; - -import java.io.IOException; -import java.util.List; - -@Service -@Slf4j -public class KafkaConsumer { - - @Autowired - private ConsumeServiceImpl consumeService; - - @Autowired - private RechargeService rechargeService; - - private final ObjectMapper objectMapper = new ObjectMapper(); - - @KafkaListener(topics = "rechargeadd_topic") - public void listenRechargeadd(String message) { - try { - // 反序列化为List - List rechargeList = objectMapper.readValue(message, new TypeReference>() {}); - - // 遍历并处理每个充值记录 - for (Recharge recharge : rechargeList) { - processRecharge(recharge); - } - } catch (IOException e) { - // 处理反序列化异常 - e.printStackTrace(); - } - } - - private void processRecharge(Recharge recharge) { - // 具体的业务处理逻辑 - System.out.println("Processing recharge: " + recharge); - } - - @KafkaListener(topics = "consume-topic", groupId = "my-group") - public void listenConsume(String message) { - try { - DetailY detailY = objectMapper.readValue(message, DetailY.class); - - // 处理消费请求 - Integer result = consumeService.insert(detailY); - log.info("Processed consume request with result: {}", result); - } catch (Exception e) { - log.error("Error processing consume message: {}", message, e); - } - } - - @KafkaListener(topics = "recharge-topic", groupId = "my-group") - public void listenRecharge(String message) { - try { - Recharge recharge = objectMapper.readValue(message, Recharge.class); - - // 处理充值请求 - rechargeService.add(recharge); - log.info("Processed recharge request with id: {}", recharge.getRechargeId()); - } catch (Exception e) { - log.error("Error processing recharge message: {}", message, e); - } - } -} +//package com.example.demo.controller; +// +//import com.example.demo.domain.entity.Detail; +//import com.example.demo.domain.entity.DetailY; +//import com.example.demo.domain.entity.Recharge; +//import com.example.demo.serviceImpl.ConsumeServiceImpl; +//import com.example.demo.sevice.RechargeService; +//import com.fasterxml.jackson.core.type.TypeReference; +//import com.fasterxml.jackson.databind.ObjectMapper; +//import lombok.extern.slf4j.Slf4j; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.kafka.annotation.KafkaListener; +//import org.springframework.stereotype.Service; +// +//import java.io.IOException; +//import java.util.List; +// +//@Service +//@Slf4j +//public class KafkaConsumer { +// +// @Autowired +// private ConsumeServiceImpl consumeService; +// +// @Autowired +// private RechargeService rechargeService; +// +// private final ObjectMapper objectMapper = new ObjectMapper(); +// +// @KafkaListener(topics = "rechargeadd_topic") +// public void listenRechargeadd(String message) { +// try { +// // 反序列化为List +// List rechargeList = objectMapper.readValue(message, new TypeReference>() {}); +// +// // 遍历并处理每个充值记录 +// for (Recharge recharge : rechargeList) { +// processRecharge(recharge); +// } +// } catch (IOException e) { +// // 处理反序列化异常 +// e.printStackTrace(); +// } +// } +// +// private void processRecharge(Recharge recharge) { +// // 具体的业务处理逻辑 +// System.out.println("Processing recharge: " + recharge); +// } +// +// @KafkaListener(topics = "consume-topic", groupId = "my-group") +// public void listenConsume(String message) { +// try { +// DetailY detailY = objectMapper.readValue(message, DetailY.class); +// +// // 处理消费请求 +// Integer result = consumeService.insert(detailY); +// log.info("Processed consume request with result: {}", result); +// } catch (Exception e) { +// log.error("Error processing consume message: {}", message, e); +// } +// } +// +// @KafkaListener(topics = "recharge-topic", groupId = "my-group") +// public void listenRecharge(String message) { +// try { +// Recharge recharge = objectMapper.readValue(message, Recharge.class); +// +// // 处理充值请求 +// rechargeService.add(recharge); +// log.info("Processed recharge request with id: {}", recharge.getRechargeId()); +// } catch (Exception e) { +// log.error("Error processing recharge message: {}", message, e); +// } +// } +//} diff --git a/src/main/java/com/example/demo/controller/KafkaProducer.java b/src/main/java/com/example/demo/controller/KafkaProducer.java index 3893bb1..23015cc 100644 --- a/src/main/java/com/example/demo/controller/KafkaProducer.java +++ b/src/main/java/com/example/demo/controller/KafkaProducer.java @@ -1,16 +1,16 @@ -package com.example.demo.controller; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.stereotype.Service; - -@Service -public class KafkaProducer { - - @Autowired - private KafkaTemplate kafkaTemplate; - - public void sendMessage(String topic, String message) { - kafkaTemplate.send(topic, message); - } -} +//package com.example.demo.controller; +// +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.kafka.core.KafkaTemplate; +//import org.springframework.stereotype.Service; +// +//@Service +//public class KafkaProducer { +// +// @Autowired +// private KafkaTemplate kafkaTemplate; +// +// public void sendMessage(String topic, String message) { +// kafkaTemplate.send(topic, message); +// } +//} diff --git a/src/main/java/com/example/demo/controller/RechargeController.java b/src/main/java/com/example/demo/controller/RechargeController.java index 1c808b1..8adb899 100644 --- a/src/main/java/com/example/demo/controller/RechargeController.java +++ b/src/main/java/com/example/demo/controller/RechargeController.java @@ -3,7 +3,7 @@ package com.example.demo.controller; import com.example.demo.domain.entity.Recharge; import com.example.demo.domain.vo.Page; import com.example.demo.domain.vo.Result; -import com.example.demo.controller.KafkaProducer; +//import com.example.demo.controller.KafkaProducer; import com.example.demo.sevice.RechargeService; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.RequiredArgsConstructor; @@ -29,7 +29,7 @@ public class RechargeController { private final RechargeService rechargeService; private final ObjectMapper objectMapper; // 添加这一行 @Autowired - private KafkaProducer kafkaProducer; +// private KafkaProducer kafkaProducer; @PostMapping("/addmore") public Result add(@RequestBody Map requestBody) { try { @@ -61,7 +61,7 @@ public class RechargeController { String rechargeJson = objectMapper.writeValueAsString(recharges); // 发送消息到 Kafka 队列 - kafkaProducer.sendMessage("rechargeadd-topic", rechargeJson); +// kafkaProducer.sendMessage("rechargeadd-topic", rechargeJson); return rechargeService.addRecharges(recharges); } catch (Exception e) { @@ -79,7 +79,7 @@ public class RechargeController { String rechargeJson = objectMapper.writeValueAsString(recharge); // 发送消息到 Kafka 队列 - kafkaProducer.sendMessage("recharge-topic", rechargeJson); +// kafkaProducer.sendMessage("recharge-topic", rechargeJson); return Result.success("Request added to Kafka queue"); } catch (Exception e) { diff --git a/src/main/java/com/example/demo/serviceImpl/AdminServiceImpl.java b/src/main/java/com/example/demo/serviceImpl/AdminServiceImpl.java index bd2d6d5..004cfaa 100644 --- a/src/main/java/com/example/demo/serviceImpl/AdminServiceImpl.java +++ b/src/main/java/com/example/demo/serviceImpl/AdminServiceImpl.java @@ -13,7 +13,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cache.annotation.CacheConfig; import org.springframework.cache.annotation.CacheEvict; -import org.springframework.cache.annotation.CachePut; import org.springframework.cache.annotation.Cacheable; import org.springframework.security.authentication.AuthenticationManager; import org.springframework.security.authentication.BadCredentialsException; diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index d4b04c7..34171fa 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -9,16 +9,16 @@ spring: datasource: hikari: # HikariCP连接池配置 - maximum-pool-size: 10 # 最大连接数 + maximum-pool-size: 20 # 最大连接数 minimum-idle: 5 # 最小空闲连接数 idle-timeout: 30000 # 空闲连接超时时间(毫秒) max-lifetime: 1800000 # 连接最大存活时间(毫秒) connection-timeout: 30000 # 连接超时时间(毫秒) pool-name: HwgoldHikariCP # 连接池名称 - driver-class-name: com.mysql.cj.jdbc.Driver - url: jdbc:mysql://39.101.133.168/hwgold?serverTimezone=GMT%2b8 + driver-class-name: com.mysql.jdbc.Driver + url: jdbc:mysql://39.101.133.168:3306/hwgold?characterEncoding=utf8&useSSL=false&serverTimezone=UTC username: hwgold - password: 'AGX4Z3YMxiCG3GR2' + password: AGX4Z3YMxiCG3GR2 application: name: demo cache: @@ -29,7 +29,7 @@ spring: cache-null-values: true data: redis: - host: 127.0.0.1 + host: 39.99.159.73 port: 6379 timeout: 1000 lettuce: @@ -39,7 +39,7 @@ spring: max-idle: 5 # 连接池中最大空闲连接 min-idle: 0 # 连接池中最小空闲连接 kafka: - bootstrap-servers: localhost:9092 +# bootstrap-servers: 39.99.159.73:9092 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer @@ -57,4 +57,4 @@ mybatis: log-impl: org.apache.ibatis.logging.stdout.StdOutImpl upload: - path: D:/gold1/ + path: /home/java/haiwaiyanfa/gold1/