package com.lh.service; import com.lh.bean.Candidate; import com.lh.exception.MyException; import com.lh.mapper.CandidatesMapper; import com.lh.mapper.VoterMapper; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; @Service @EnableKafka public class VoteConsumer { @Autowired private VoterMapper voterMapper; @Autowired private CandidatesMapper candidatesMapper; @Autowired private StringRedisTemplate redisTemplate; @KafkaListener(topics = "vote-topic", groupId = "vote-group") public void handleVoteMessage(ConsumerRecord record) throws MyException { String message = record.value(); // 将消息分割为投票信息 String[] parts = message.split(","); String voterJwcode = parts[0]; String candidateJwcode = parts[1]; String voterName = parts[2]; String voteTime = parts[3]; // 处理投票 processVote(voterJwcode, candidateJwcode, voterName, voteTime); } private boolean processVote(String voterJwcode, String candidateJwcode, String voterName, String voteTime) throws MyException { // 获取候选人信息 Candidate candidate = candidatesMapper.getByCandidateJwcode(candidateJwcode); // 2. 获取候选人信息 if (candidate == null) { throw new MyException("候选人不存在!"); } // 3. 检查用户是否已经为该候选人投过票 //List hasVotes = voterMapper.countVotesToday(voterJwcode); ////遍历列表,判断是否有记录 //boolean flag = true; //for (Voter vote : hasVotes) { // if (vote.getCandidateJwCode().equals(candidateJwcode)) { // flag = false; // } //} //if (!flag){ // throw new MyException("已投票,可以选择其他人试试哦~"); //} // 4. 增加候选人票数 if (!candidatesMapper.addVotes(candidateJwcode)) { throw new MyException ("候选人票数更新失败,请联系管理员!"); } // 5. 插入投票记录 voterMapper.insertVote(voterJwcode, candidateJwcode, voterName); System.out.println("投票成功!用户:" + voterJwcode + " 投给了 " + candidateJwcode); return true; } }