8 changed files with 120 additions and 121 deletions
-
8pom.xml
-
8src/main/java/com/example/demo/controller/ConsumeController.java
-
18src/main/java/com/example/demo/controller/KafkaConfig.java
-
152src/main/java/com/example/demo/controller/KafkaConsumer.java
-
32src/main/java/com/example/demo/controller/KafkaProducer.java
-
8src/main/java/com/example/demo/controller/RechargeController.java
-
1src/main/java/com/example/demo/serviceImpl/AdminServiceImpl.java
-
14src/main/resources/application.yml
@ -1,9 +1,9 @@ |
|||
package com.example.demo.controller; |
|||
|
|||
import org.springframework.context.annotation.Configuration; |
|||
import org.springframework.kafka.annotation.EnableKafka; |
|||
|
|||
@Configuration |
|||
@EnableKafka |
|||
public class KafkaConfig { |
|||
} |
|||
//package com.example.demo.controller; |
|||
// |
|||
//import org.springframework.context.annotation.Configuration; |
|||
//import org.springframework.kafka.annotation.EnableKafka; |
|||
// |
|||
//@Configuration |
|||
//@EnableKafka |
|||
//public class KafkaConfig { |
|||
//} |
@ -1,76 +1,76 @@ |
|||
package com.example.demo.controller; |
|||
|
|||
import com.example.demo.domain.entity.Detail; |
|||
import com.example.demo.domain.entity.DetailY; |
|||
import com.example.demo.domain.entity.Recharge; |
|||
import com.example.demo.serviceImpl.ConsumeServiceImpl; |
|||
import com.example.demo.sevice.RechargeService; |
|||
import com.fasterxml.jackson.core.type.TypeReference; |
|||
import com.fasterxml.jackson.databind.ObjectMapper; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.kafka.annotation.KafkaListener; |
|||
import org.springframework.stereotype.Service; |
|||
|
|||
import java.io.IOException; |
|||
import java.util.List; |
|||
|
|||
@Service |
|||
@Slf4j |
|||
public class KafkaConsumer { |
|||
|
|||
@Autowired |
|||
private ConsumeServiceImpl consumeService; |
|||
|
|||
@Autowired |
|||
private RechargeService rechargeService; |
|||
|
|||
private final ObjectMapper objectMapper = new ObjectMapper(); |
|||
|
|||
@KafkaListener(topics = "rechargeadd_topic") |
|||
public void listenRechargeadd(String message) { |
|||
try { |
|||
// 反序列化为List<Recharge> |
|||
List<Recharge> rechargeList = objectMapper.readValue(message, new TypeReference<List<Recharge>>() {}); |
|||
|
|||
// 遍历并处理每个充值记录 |
|||
for (Recharge recharge : rechargeList) { |
|||
processRecharge(recharge); |
|||
} |
|||
} catch (IOException e) { |
|||
// 处理反序列化异常 |
|||
e.printStackTrace(); |
|||
} |
|||
} |
|||
|
|||
private void processRecharge(Recharge recharge) { |
|||
// 具体的业务处理逻辑 |
|||
System.out.println("Processing recharge: " + recharge); |
|||
} |
|||
|
|||
@KafkaListener(topics = "consume-topic", groupId = "my-group") |
|||
public void listenConsume(String message) { |
|||
try { |
|||
DetailY detailY = objectMapper.readValue(message, DetailY.class); |
|||
|
|||
// 处理消费请求 |
|||
Integer result = consumeService.insert(detailY); |
|||
log.info("Processed consume request with result: {}", result); |
|||
} catch (Exception e) { |
|||
log.error("Error processing consume message: {}", message, e); |
|||
} |
|||
} |
|||
|
|||
@KafkaListener(topics = "recharge-topic", groupId = "my-group") |
|||
public void listenRecharge(String message) { |
|||
try { |
|||
Recharge recharge = objectMapper.readValue(message, Recharge.class); |
|||
|
|||
// 处理充值请求 |
|||
rechargeService.add(recharge); |
|||
log.info("Processed recharge request with id: {}", recharge.getRechargeId()); |
|||
} catch (Exception e) { |
|||
log.error("Error processing recharge message: {}", message, e); |
|||
} |
|||
} |
|||
} |
|||
//package com.example.demo.controller; |
|||
// |
|||
//import com.example.demo.domain.entity.Detail; |
|||
//import com.example.demo.domain.entity.DetailY; |
|||
//import com.example.demo.domain.entity.Recharge; |
|||
//import com.example.demo.serviceImpl.ConsumeServiceImpl; |
|||
//import com.example.demo.sevice.RechargeService; |
|||
//import com.fasterxml.jackson.core.type.TypeReference; |
|||
//import com.fasterxml.jackson.databind.ObjectMapper; |
|||
//import lombok.extern.slf4j.Slf4j; |
|||
//import org.springframework.beans.factory.annotation.Autowired; |
|||
//import org.springframework.kafka.annotation.KafkaListener; |
|||
//import org.springframework.stereotype.Service; |
|||
// |
|||
//import java.io.IOException; |
|||
//import java.util.List; |
|||
// |
|||
//@Service |
|||
//@Slf4j |
|||
//public class KafkaConsumer { |
|||
// |
|||
// @Autowired |
|||
// private ConsumeServiceImpl consumeService; |
|||
// |
|||
// @Autowired |
|||
// private RechargeService rechargeService; |
|||
// |
|||
// private final ObjectMapper objectMapper = new ObjectMapper(); |
|||
// |
|||
// @KafkaListener(topics = "rechargeadd_topic") |
|||
// public void listenRechargeadd(String message) { |
|||
// try { |
|||
// // 反序列化为List<Recharge> |
|||
// List<Recharge> rechargeList = objectMapper.readValue(message, new TypeReference<List<Recharge>>() {}); |
|||
// |
|||
// // 遍历并处理每个充值记录 |
|||
// for (Recharge recharge : rechargeList) { |
|||
// processRecharge(recharge); |
|||
// } |
|||
// } catch (IOException e) { |
|||
// // 处理反序列化异常 |
|||
// e.printStackTrace(); |
|||
// } |
|||
// } |
|||
// |
|||
// private void processRecharge(Recharge recharge) { |
|||
// // 具体的业务处理逻辑 |
|||
// System.out.println("Processing recharge: " + recharge); |
|||
// } |
|||
// |
|||
// @KafkaListener(topics = "consume-topic", groupId = "my-group") |
|||
// public void listenConsume(String message) { |
|||
// try { |
|||
// DetailY detailY = objectMapper.readValue(message, DetailY.class); |
|||
// |
|||
// // 处理消费请求 |
|||
// Integer result = consumeService.insert(detailY); |
|||
// log.info("Processed consume request with result: {}", result); |
|||
// } catch (Exception e) { |
|||
// log.error("Error processing consume message: {}", message, e); |
|||
// } |
|||
// } |
|||
// |
|||
// @KafkaListener(topics = "recharge-topic", groupId = "my-group") |
|||
// public void listenRecharge(String message) { |
|||
// try { |
|||
// Recharge recharge = objectMapper.readValue(message, Recharge.class); |
|||
// |
|||
// // 处理充值请求 |
|||
// rechargeService.add(recharge); |
|||
// log.info("Processed recharge request with id: {}", recharge.getRechargeId()); |
|||
// } catch (Exception e) { |
|||
// log.error("Error processing recharge message: {}", message, e); |
|||
// } |
|||
// } |
|||
//} |
@ -1,16 +1,16 @@ |
|||
package com.example.demo.controller; |
|||
|
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.kafka.core.KafkaTemplate; |
|||
import org.springframework.stereotype.Service; |
|||
|
|||
@Service |
|||
public class KafkaProducer { |
|||
|
|||
@Autowired |
|||
private KafkaTemplate<String, String> kafkaTemplate; |
|||
|
|||
public void sendMessage(String topic, String message) { |
|||
kafkaTemplate.send(topic, message); |
|||
} |
|||
} |
|||
//package com.example.demo.controller; |
|||
// |
|||
//import org.springframework.beans.factory.annotation.Autowired; |
|||
//import org.springframework.kafka.core.KafkaTemplate; |
|||
//import org.springframework.stereotype.Service; |
|||
// |
|||
//@Service |
|||
//public class KafkaProducer { |
|||
// |
|||
// @Autowired |
|||
// private KafkaTemplate<String, String> kafkaTemplate; |
|||
// |
|||
// public void sendMessage(String topic, String message) { |
|||
// kafkaTemplate.send(topic, message); |
|||
// } |
|||
//} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue