From 0f88c13e7fde9b8bb6d142601ff531cb2e3dadd7 Mon Sep 17 00:00:00 2001 From: ma-zhongxu Date: Sat, 28 Feb 2026 18:25:39 +0800 Subject: [PATCH] =?UTF-8?q?1.=E6=95=8F=E6=84=9F=E8=AF=8D=E4=BF=AE=E6=94=B9?= =?UTF-8?q?=202.=E5=85=BD=E5=8C=BB=E5=92=A8=E8=AF=A2=E8=81=8A=E5=A4=A9?= =?UTF-8?q?=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../web/controller/system/ChatController.java | 1067 +++++------- .../system/SensitiveWordApiTest.java | 33 +- .../src/main/resources/application-druid.yml | 10 +- chenhai-common/pom.xml | 13 + .../framework/config/RocketMQConfig.java | 17 + .../framework/config/SecurityConfig.java | 3 +- .../framework/config/WebSocketConfig.java | 72 + .../WebSocketChannelInterceptor.java | 184 ++ .../chenhai/system/domain/ChatMessages.java | 113 ++ .../chenhai/system/domain/ChatSessions.java | 98 ++ .../system/mapper/ChatMessagesMapper.java | 40 + .../system/mapper/ChatSessionsMapper.java | 42 + .../system/mq/OfflineMessageConsumer.java | 90 + .../system/service/IChatMessagesService.java | 78 + .../system/service/IChatSessionsService.java | 47 + .../service/impl/ChatMessagesServiceImpl.java | 147 ++ .../service/impl/ChatSessionsServiceImpl.java | 143 ++ .../mapper/system/ChatMessagesMapper.xml | 114 ++ .../mapper/system/ChatSessionsMapper.xml | 135 ++ chenhai-ui/package.json | 3 + chenhai-ui/src/api/system/chat.js | 84 +- chenhai-ui/src/api/vet/session.js | 55 + .../src/views/vet/chatManager/index.vue | 1510 +++++++++++++++++ chenhai-ui/vue.config.js | 15 +- 24 files changed, 3411 insertions(+), 702 deletions(-) create mode 100644 chenhai-framework/src/main/java/com/chenhai/framework/config/RocketMQConfig.java create mode 100644 chenhai-framework/src/main/java/com/chenhai/framework/config/WebSocketConfig.java create mode 100644 chenhai-framework/src/main/java/com/chenhai/framework/websocket/WebSocketChannelInterceptor.java create mode 100644 chenhai-system/src/main/java/com/chenhai/system/domain/ChatMessages.java create mode 100644 chenhai-system/src/main/java/com/chenhai/system/domain/ChatSessions.java create mode 100644 chenhai-system/src/main/java/com/chenhai/system/mapper/ChatMessagesMapper.java create mode 100644 chenhai-system/src/main/java/com/chenhai/system/mapper/ChatSessionsMapper.java create mode 100644 chenhai-system/src/main/java/com/chenhai/system/mq/OfflineMessageConsumer.java create mode 100644 chenhai-system/src/main/java/com/chenhai/system/service/IChatMessagesService.java create mode 100644 chenhai-system/src/main/java/com/chenhai/system/service/IChatSessionsService.java create mode 100644 chenhai-system/src/main/java/com/chenhai/system/service/impl/ChatMessagesServiceImpl.java create mode 100644 chenhai-system/src/main/java/com/chenhai/system/service/impl/ChatSessionsServiceImpl.java create mode 100644 chenhai-system/src/main/resources/mapper/system/ChatMessagesMapper.xml create mode 100644 chenhai-system/src/main/resources/mapper/system/ChatSessionsMapper.xml create mode 100644 chenhai-ui/src/api/vet/session.js create mode 100644 chenhai-ui/src/views/vet/chatManager/index.vue diff --git a/chenhai-admin/src/main/java/com/chenhai/web/controller/system/ChatController.java b/chenhai-admin/src/main/java/com/chenhai/web/controller/system/ChatController.java index 07fc3a4..fa9597e 100644 --- a/chenhai-admin/src/main/java/com/chenhai/web/controller/system/ChatController.java +++ b/chenhai-admin/src/main/java/com/chenhai/web/controller/system/ChatController.java @@ -1,657 +1,416 @@ -//package com.chenhai.system.controller; -// -//import java.util.*; -//import org.springframework.beans.factory.annotation.Autowired; -//import org.springframework.web.bind.annotation.*; -//import com.chenhai.common.core.controller.BaseController; -//import com.chenhai.common.core.domain.AjaxResult; -//import com.chenhai.common.core.domain.model.LoginUser; -//import com.chenhai.common.utils.SecurityUtils; -//import com.chenhai.system.domain.SysChatMessage; -//import com.chenhai.system.domain.SysChatSession; -//import com.chenhai.system.service.ISysChatMessageService; -//import com.chenhai.system.service.ISysChatSessionService; -//import com.chenhai.vet.domain.VetExperts; -//import com.chenhai.vet.service.IVetExpertsService; -// -//@RestController -//@RequestMapping("/system/chat") -//public class ChatController extends BaseController { -// -// @Autowired -// private IVetExpertsService expertsService; -// -// @Autowired -// private ISysChatSessionService sessionService; -// -// @Autowired -// private ISysChatMessageService messageService; -// -// /** -// * 获取当前用户身份(基于权限判断) -// */ -// @GetMapping("/identity") -// public AjaxResult getIdentity() { -// try { -// LoginUser loginUser = SecurityUtils.getLoginUser(); -// if (loginUser == null) { -// return AjaxResult.error("用户未登录"); -// } -// -// Map result = new HashMap<>(); -// -// // 检查用户权限 -// boolean isExpertRole = SecurityUtils.hasPermi("vet:experts:list"); -// boolean isMuhuRole = SecurityUtils.hasPermi("system:notice:list"); // 假设牧户有这个权限 -// -// // 或者使用角色检查(如果您的系统使用角色) -// // boolean isExpertRole = SecurityUtils.hasRole("vetnotshenhe"); -// // boolean isMuhuRole = SecurityUtils.hasRole("muhu"); -// -// // 判断是否是专家 -// List experts = expertsService.selectVetExpertsList(null); -// VetExperts currentExpert = null; -// for (VetExperts expert : experts) { -// if (expert.getUserId() != null && expert.getUserId().equals(loginUser.getUserId())) { -// currentExpert = expert; -// break; -// } -// } -// -// if (currentExpert != null) { -// // 是专家 -// result.put("userType", "expert"); -// result.put("expertId", currentExpert.getExpertId()); -// result.put("expertName", currentExpert.getRealName()); -// result.put("expertAvatar", currentExpert.getAvatar()); -// result.put("isOnline", currentExpert.getIsOnline()); -// } else if (isMuhuRole) { -// // 是牧户 -// result.put("userType", "user"); -// result.put("userId", loginUser.getUserId()); -// result.put("userName", loginUser.getUsername()); -// } else { -// // 其他用户 -// result.put("userType", "unknown"); -// } -// -// return AjaxResult.success(result); -// -// } catch (Exception e) { -// logger.error("获取用户身份失败", e); -// return AjaxResult.error("获取失败"); -// } -// } -// -// /** -// * 获取在线专家列表(牧户端使用) -// */ -// @GetMapping("/experts") -// public AjaxResult getOnlineExperts() { -// try { -// LoginUser loginUser = SecurityUtils.getLoginUser(); -// if (loginUser == null) { -// return AjaxResult.error("用户未登录"); -// } -// -// // 查询所有在线专家 -// VetExperts query = new VetExperts(); -// query.setStatus("0"); // 只查询状态正常的专家 -// List allExperts = expertsService.selectVetExpertsList(query); -// -// List> result = new ArrayList<>(); -// -// for (VetExperts expert : allExperts) { -// Map expertMap = new HashMap<>(); -// expertMap.put("expertId", expert.getExpertId()); -// expertMap.put("realName", expert.getRealName()); -// expertMap.put("title", expert.getTitle()); -// expertMap.put("avatar", expert.getAvatar()); -// expertMap.put("expertiseArea", expert.getExpertiseArea()); -// expertMap.put("isOnline", "1".equals(expert.getIsOnline()) ? "在线" : "离线"); -// expertMap.put("introduction", expert.getIntroduction()); -// expertMap.put("workExperience", expert.getWorkExperience()); -// -// // 获取未读数(牧户视角) -// SysChatSession querySession = new SysChatSession(); -// querySession.setUserId(loginUser.getUserId()); -// querySession.setExpertId(expert.getExpertId()); -// querySession.setDelFlag("0"); -// List sessions = sessionService.selectSysChatSessionList(querySession); -// long unreadCount = 0; -// if (!sessions.isEmpty()) { -// unreadCount = sessions.get(0).getUnreadUser() != null ? sessions.get(0).getUnreadUser() : 0; -// } -// expertMap.put("unreadCount", unreadCount); -// -// result.add(expertMap); -// } -// -// return AjaxResult.success(result); -// -// } catch (Exception e) { -// logger.error("获取专家列表失败", e); -// return AjaxResult.error("获取失败"); -// } -// } -// -// /** -// * 获取用户会话列表(牧户端使用) -// */ +package com.chenhai.web.controller.system; + +import com.chenhai.common.annotation.Log; +import com.chenhai.common.core.controller.BaseController; +import com.chenhai.common.core.domain.AjaxResult; +import com.chenhai.common.core.page.TableDataInfo; +import com.chenhai.common.enums.BusinessType; +import com.chenhai.system.domain.ChatMessages; +import com.chenhai.system.domain.ChatSessions; +import com.chenhai.system.service.IChatMessagesService; +import com.chenhai.system.service.IChatSessionsService; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.messaging.handler.annotation.MessageMapping; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.messaging.simp.SimpMessagingTemplate; +import org.springframework.messaging.simp.stomp.StompHeaderAccessor; +import org.springframework.security.access.prepost.PreAuthorize; +import org.springframework.web.bind.annotation.*; + +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * 聊天控制器 + * + * @author chenhai + */ +@RestController +@RequestMapping("/system/chat") +public class ChatController extends BaseController { + + private static final Logger log = LoggerFactory.getLogger(ChatController.class); + + @Autowired + private IChatSessionsService chatSessionsService; + + @Autowired + private IChatMessagesService chatMessagesService; + + @Autowired + private SimpMessagingTemplate messagingTemplate; + + @Autowired + private RocketMQTemplate rocketMQTemplate; + + @Autowired + private RedisTemplate redisTemplate; + + private static final String ONLINE_KEY = "chat:online:"; + private static final String MQ_TOPIC = "chat-topic"; + + private static final String SESSION_USER_KEY = "chat:session:"; // 存储会话和用户的映射 + + + /** + * 获取会话列表(包含实时在线状态) + */ + @GetMapping("/sessions") + public AjaxResult listSessions() { + Long currentUserId = getUserId(); + List list = chatSessionsService.selectSessionsByUserId(currentUserId); + + // 批量查询在线状态(优化性能) + Set onlineKeys = list.stream() + .map(session -> ONLINE_KEY + session.getOtherUserId()) + .collect(Collectors.toSet()); + + List onlineValues = redisTemplate.opsForValue().multiGet(onlineKeys); + + for (int i = 0; i < list.size(); i++) { + ChatSessions session = list.get(i); + boolean isOnline = onlineValues.get(i) != null; + + session.setOtherUserName(getUserName(session.getOtherUserId())); + session.setOtherUserAvatar(getUserAvatar(session.getOtherUserId())); + session.setOtherOnline(isOnline); + } + + return success(list); + } + + /** + * 获取指定用户的在线状态 + */ + @GetMapping("/status/{userId}") + public AjaxResult getUserStatus(@PathVariable Long userId) { + String sessionId = redisTemplate.opsForValue().get(ONLINE_KEY + userId); + Map result = new HashMap<>(); + result.put("userId", userId); + result.put("online", sessionId != null); + result.put("lastSeen", sessionId != null ? System.currentTimeMillis() : null); + return success(result); + } + + /** + * 批量获取用户在线状态 + */ + @PostMapping("/status/batch") + public AjaxResult getBatchUserStatus(@RequestBody List userIds) { + List> result = new ArrayList<>(); + for (Long userId : userIds) { + String sessionId = redisTemplate.opsForValue().get(ONLINE_KEY + userId); + Map status = new HashMap<>(); + status.put("userId", userId); + status.put("online", sessionId != null); + result.add(status); + } + return success(result); + } + + /** + * 获取会话列表 + */ // @GetMapping("/sessions") -// public AjaxResult getUserSessions() { -// try { -// LoginUser loginUser = SecurityUtils.getLoginUser(); -// if (loginUser == null) { -// return AjaxResult.error("用户未登录"); -// } -// -// // 查询用户的所有会话 -// SysChatSession query = new SysChatSession(); -// query.setUserId(loginUser.getUserId()); -// query.setDelFlag("0"); -// query.setSessionStatus("0"); // 只查询进行中的会话 -// List sessions = sessionService.selectSysChatSessionList(query); -// -// List> result = new ArrayList<>(); -// for (SysChatSession session : sessions) { -// Map sessionMap = new HashMap<>(); -// sessionMap.put("sessionId", session.getSessionId()); -// sessionMap.put("expertId", session.getExpertId()); -// sessionMap.put("animalType", session.getAnimalType()); -// sessionMap.put("symptomSummary", session.getSymptomSummary()); -// sessionMap.put("lastMessage", session.getLastMessage()); -// sessionMap.put("lastMessageTime", session.getLastMessageTime()); -// sessionMap.put("createTime", session.getCreateTime()); -// sessionMap.put("unreadCount", session.getUnreadUser() != null ? session.getUnreadUser() : 0); -// sessionMap.put("sessionStatus", session.getSessionStatus()); -// -// // 获取专家信息 -// VetExperts expert = expertsService.selectVetExpertsByExpertId(session.getExpertId()); -// if (expert != null) { -// sessionMap.put("expertName", expert.getRealName()); -// sessionMap.put("expertTitle", expert.getTitle()); -// sessionMap.put("expertAvatar", expert.getAvatar()); -// sessionMap.put("expertIsOnline", expert.getIsOnline()); -// } -// -// result.add(sessionMap); -// } -// -// // 按最后消息时间排序 -// result.sort((a, b) -> { -// Date timeA = (Date) a.get("lastMessageTime"); -// Date timeB = (Date) b.get("lastMessageTime"); -// if (timeA == null && timeB == null) return 0; -// if (timeA == null) return 1; -// if (timeB == null) return -1; -// return timeB.compareTo(timeA); -// }); -// -// return AjaxResult.success(result); -// -// } catch (Exception e) { -// logger.error("获取用户会话列表失败", e); -// return AjaxResult.error("获取失败"); -// } -// } -// -// /** -// * 专家端:获取等待回复的会话列表 -// */ -// @GetMapping("/expert/sessions") -// public AjaxResult getExpertSessions() { -// try { -// LoginUser loginUser = SecurityUtils.getLoginUser(); -// if (loginUser == null) { -// return AjaxResult.error("用户未登录"); -// } -// -// // 检查是否是专家 -// List experts = expertsService.selectVetExpertsList(null); -// VetExperts currentExpert = null; -// for (VetExperts expert : experts) { -// if (expert.getUserId() != null && expert.getUserId().equals(loginUser.getUserId())) { -// currentExpert = expert; -// break; -// } -// } -// -// if (currentExpert == null) { -// return AjaxResult.error("您不是专家或专家信息未配置"); -// } -// -// // 查询该专家的所有会话 -// SysChatSession query = new SysChatSession(); -// query.setExpertId(currentExpert.getExpertId()); -// query.setDelFlag("0"); -// query.setSessionStatus("0"); // 只查询进行中的 -// List sessions = sessionService.selectSysChatSessionList(query); -// -// List> result = new ArrayList<>(); -// for (SysChatSession session : sessions) { -// Map sessionMap = new HashMap<>(); -// sessionMap.put("sessionId", session.getSessionId()); -// sessionMap.put("userId", session.getUserId()); -// sessionMap.put("animalType", session.getAnimalType()); -// sessionMap.put("symptomSummary", session.getSymptomSummary()); -// sessionMap.put("lastMessage", session.getLastMessage()); -// sessionMap.put("lastMessageTime", session.getLastMessageTime()); -// sessionMap.put("createTime", session.getCreateTime()); -// sessionMap.put("unreadCount", session.getUnreadExpert() != null ? session.getUnreadExpert() : 0); -// sessionMap.put("sessionStatus", session.getSessionStatus()); -// -// result.add(sessionMap); -// } -// -// // 按最后消息时间排序 -// result.sort((a, b) -> { -// Date timeA = (Date) a.get("lastMessageTime"); -// Date timeB = (Date) b.get("lastMessageTime"); -// if (timeA == null && timeB == null) return 0; -// if (timeA == null) return 1; -// if (timeB == null) return -1; -// return timeB.compareTo(timeA); -// }); -// -// return AjaxResult.success(result); -// -// } catch (Exception e) { -// logger.error("获取专家会话列表失败", e); -// return AjaxResult.error("获取失败"); -// } -// } -// -// /** -// * 开始咨询(创建会话) -// */ -// @PostMapping("/start") -// public AjaxResult startConsultation(@RequestBody Map params) { -// try { -// LoginUser loginUser = SecurityUtils.getLoginUser(); -// if (loginUser == null) { -// return AjaxResult.error("用户未登录"); -// } -// -// Long expertId = Long.valueOf(params.get("expertId").toString()); -// String animalType = params.get("animalType") != null ? params.get("animalType").toString() : "牛"; -// String symptom = params.get("symptom") != null ? params.get("symptom").toString() : ""; -// -// // 检查专家是否存在 -// VetExperts expert = expertsService.selectVetExpertsByExpertId(expertId); -// if (expert == null) { -// return AjaxResult.error("专家不存在"); -// } -// -// // 创建会话 -// String sessionId = sessionService.createOrGetSession(loginUser.getUserId(), expertId, animalType, symptom); -// -// // 发送欢迎消息 -// String welcomeMessage = "您好,我是" + expert.getRealName() + "专家,有什么可以帮您?"; -// messageService.sendTextMessage(sessionId, "1", expertId, welcomeMessage); -// -// Map result = new HashMap<>(); -// result.put("sessionId", sessionId); -// result.put("expert", expert); -// result.put("message", welcomeMessage); -// -// return AjaxResult.success("咨询开始", result); -// -// } catch (Exception e) { -// logger.error("开始咨询失败", e); -// return AjaxResult.error("开始咨询失败"); -// } -// } -// -// /** -// * 发送消息(通用,根据身份自动判断) -// */ -// @PostMapping("/send") -// public AjaxResult sendMessage(@RequestBody Map params) { -// try { -// LoginUser loginUser = SecurityUtils.getLoginUser(); -// if (loginUser == null) { -// return AjaxResult.error("用户未登录"); -// } -// -// String sessionId = params.get("sessionId").toString(); -// String content = params.get("content").toString(); -// String msgType = params.get("msgType") != null ? params.get("msgType").toString() : "text"; -// -// // 获取会话 -// SysChatSession session = sessionService.selectSysChatSessionBySessionId(sessionId); -// if (session == null) { -// return AjaxResult.error("会话不存在"); -// } -// -// // 判断发送者身份 -// String senderType; -// Long senderId; -// -// // 检查是否是专家 -// List experts = expertsService.selectVetExpertsList(null); -// VetExperts currentExpert = null; -// for (VetExperts expert : experts) { -// if (expert.getUserId() != null && expert.getUserId().equals(loginUser.getUserId())) { -// currentExpert = expert; -// break; -// } -// } -// -// if (currentExpert != null && currentExpert.getExpertId().equals(session.getExpertId())) { -// // 专家回复 -// senderType = "1"; -// senderId = currentExpert.getExpertId(); -// } else if (session.getUserId().equals(loginUser.getUserId())) { -// // 用户发送 -// senderType = "0"; -// senderId = loginUser.getUserId(); -// } else { -// return AjaxResult.error("无权发送消息"); -// } -// -// // 发送消息 -// if ("image".equals(msgType)) { -// messageService.sendImageMessage(sessionId, senderType, senderId, content); -// } else { -// messageService.sendTextMessage(sessionId, senderType, senderId, content); -// } -// -// return AjaxResult.success("发送成功"); -// -// } catch (Exception e) { -// logger.error("发送消息失败", e); -// return AjaxResult.error("发送失败"); -// } -// } -// -// /** -// * 结束咨询(用户端和专家端通用) -// */ -// @PostMapping("/end") -// public AjaxResult endConsultation(@RequestBody Map params) { -// try { -// LoginUser loginUser = SecurityUtils.getLoginUser(); -// if (loginUser == null) { -// return AjaxResult.error("用户未登录"); -// } -// -// String sessionId = params.get("sessionId").toString(); -// String reason = params.get("reason") != null ? params.get("reason").toString() : "咨询结束"; -// -// // 获取会话 -// SysChatSession session = sessionService.selectSysChatSessionBySessionId(sessionId); -// if (session == null) { -// return AjaxResult.error("会话不存在"); -// } -// -// // 验证权限 -// boolean hasPermission = false; -// String endBy = ""; -// -// // 检查是否是专家 -// List experts = expertsService.selectVetExpertsList(null); -// VetExperts currentExpert = null; -// for (VetExperts expert : experts) { -// if (expert.getUserId() != null && expert.getUserId().equals(loginUser.getUserId())) { -// currentExpert = expert; -// break; -// } -// } -// -// if (currentExpert != null && currentExpert.getExpertId().equals(session.getExpertId())) { -// // 专家可以结束会话 -// hasPermission = true; -// endBy = "expert"; -// } else if (session.getUserId().equals(loginUser.getUserId())) { -// // 用户可以结束自己的会话 -// hasPermission = true; -// endBy = "user"; -// } -// -// if (!hasPermission) { -// return AjaxResult.error("无权结束会话"); -// } -// -// // 结束会话 -// sessionService.endSession(sessionId, endBy, reason); -// -// // 发送结束消息 -// String endMessage = ""; -// if ("expert".equals(endBy)) { -// endMessage = "专家已结束本次咨询,感谢您的信任。"; -// } else { -// endMessage = "您已结束本次咨询,感谢专家的帮助。"; -// } -// -// if (currentExpert != null) { -// messageService.sendSystemMessage(sessionId, "系统通知:" + endMessage); -// } -// -// return AjaxResult.success("咨询已结束"); -// -// } catch (Exception e) { -// logger.error("结束咨询失败", e); -// return AjaxResult.error("结束咨询失败"); -// } -// } -// -// /** -// * 获取聊天历史(通用) -// */ -// @GetMapping("/history") -// public AjaxResult getChatHistory(@RequestParam String sessionId) { -// try { -// LoginUser loginUser = SecurityUtils.getLoginUser(); -// if (loginUser == null) { -// return AjaxResult.error("用户未登录"); -// } -// -// // 获取会话 -// SysChatSession session = sessionService.selectSysChatSessionBySessionId(sessionId); -// if (session == null) { -// return AjaxResult.error("会话不存在"); -// } -// -// // 验证权限(用户或专家都可以查看) -// boolean hasPermission = false; -// -// // 检查是否是专家 -// List experts = expertsService.selectVetExpertsList(null); -// VetExperts currentExpert = null; -// for (VetExperts expert : experts) { -// if (expert.getUserId() != null && expert.getUserId().equals(loginUser.getUserId())) { -// currentExpert = expert; -// break; -// } -// } -// -// if (currentExpert != null && currentExpert.getExpertId().equals(session.getExpertId())) { -// // 专家可以查看 -// hasPermission = true; -// // 标记专家已读 -// sessionService.markMessagesAsReadByExpert(sessionId); -// } else if (session.getUserId().equals(loginUser.getUserId())) { -// // 用户可以查看 -// hasPermission = true; -// // 标记用户已读 -// sessionService.markMessagesAsReadByUser(sessionId); -// } -// -// if (!hasPermission) { -// return AjaxResult.error("无权查看"); -// } -// -// // 获取消息 -// List messages = messageService.selectMessagesBySessionId(sessionId); -// -// // 格式化消息 -// List> formattedMessages = new ArrayList<>(); -// for (SysChatMessage msg : messages) { -// Map msgMap = new HashMap<>(); -// msgMap.put("id", msg.getMessageId()); -// msgMap.put("messageId", msg.getMessageId()); -// msgMap.put("senderType", msg.getSenderType()); -// msgMap.put("content", msg.getContent()); -// msgMap.put("msgType", msg.getMsgType()); -// msgMap.put("time", msg.getCreateTime()); -// msgMap.put("createTime", msg.getCreateTime()); -// msgMap.put("isRead", msg.getIsRead()); -// formattedMessages.add(msgMap); -// } -// -// // 获取相关用户信息 -// Map userInfo = new HashMap<>(); -// if (currentExpert != null && currentExpert.getExpertId().equals(session.getExpertId())) { -// // 专家端:获取用户信息 -// userInfo.put("type", "user"); -// userInfo.put("userId", session.getUserId()); -// userInfo.put("userName", session.getUserName()); -// } else { -// // 用户端:获取专家信息 -// VetExperts expert = expertsService.selectVetExpertsByExpertId(session.getExpertId()); -// if (expert != null) { -// userInfo.put("type", "expert"); -// userInfo.put("expertId", expert.getExpertId()); -// userInfo.put("realName", expert.getRealName()); -// userInfo.put("title", expert.getTitle()); -// userInfo.put("avatar", expert.getAvatar()); -// userInfo.put("isOnline", expert.getIsOnline()); -// } -// } -// -// Map result = new HashMap<>(); -// result.put("session", session); -// result.put("userInfo", userInfo); -// result.put("messages", formattedMessages); -// -// return AjaxResult.success(result); -// -// } catch (Exception e) { -// logger.error("获取聊天历史失败", e); -// return AjaxResult.error("获取失败"); -// } -// } -// -// /** -// * 标记消息为已读 -// */ -// @PostMapping("/read") -// public AjaxResult markAsRead(@RequestBody Map params) { -// try { -// LoginUser loginUser = SecurityUtils.getLoginUser(); -// if (loginUser == null) { -// return AjaxResult.error("用户未登录"); -// } -// -// String sessionId = params.get("sessionId").toString(); -// String messageIds = params.get("messageIds") != null ? params.get("messageIds").toString() : ""; -// -// // 获取会话 -// SysChatSession session = sessionService.selectSysChatSessionBySessionId(sessionId); -// if (session == null) { -// return AjaxResult.error("会话不存在"); -// } -// -// // 验证权限 -// boolean hasPermission = false; -// String readerType = ""; -// -// // 检查是否是专家 -// List experts = expertsService.selectVetExpertsList(null); -// VetExperts currentExpert = null; -// for (VetExperts expert : experts) { -// if (expert.getUserId() != null && expert.getUserId().equals(loginUser.getUserId())) { -// currentExpert = expert; -// break; -// } -// } -// -// if (currentExpert != null && currentExpert.getExpertId().equals(session.getExpertId())) { -// // 专家标记已读 -// hasPermission = true; -// readerType = "expert"; -// } else if (session.getUserId().equals(loginUser.getUserId())) { -// // 用户标记已读 -// hasPermission = true; -// readerType = "user"; -// } -// -// if (!hasPermission) { -// return AjaxResult.error("无权操作"); -// } -// -// // 标记已读 -// if ("expert".equals(readerType)) { -// messageService.markMessagesAsReadByExpert(sessionId, messageIds); -// } else { -// messageService.markMessagesAsReadByUser(sessionId, messageIds); -// } -// -// return AjaxResult.success("标记已读成功"); -// -// } catch (Exception e) { -// logger.error("标记已读失败", e); -// return AjaxResult.error("标记已读失败"); -// } -// } -// -// /** -// * 获取未读消息数量 -// */ -// @GetMapping("/unread/count") -// public AjaxResult getUnreadCount() { -// try { -// LoginUser loginUser = SecurityUtils.getLoginUser(); -// if (loginUser == null) { -// return AjaxResult.error("用户未登录"); -// } -// -// Map result = new HashMap<>(); -// -// // 检查是否是专家 -// List experts = expertsService.selectVetExpertsList(null); -// VetExperts currentExpert = null; -// for (VetExperts expert : experts) { -// if (expert.getUserId() != null && expert.getUserId().equals(loginUser.getUserId())) { -// currentExpert = expert; -// break; -// } -// } -// -// long totalUnread = 0; -// if (currentExpert != null) { -// // 专家获取未读消息数 -// SysChatSession query = new SysChatSession(); -// query.setExpertId(currentExpert.getExpertId()); -// query.setDelFlag("0"); -// query.setSessionStatus("0"); -// List sessions = sessionService.selectSysChatSessionList(query); -// -// for (SysChatSession session : sessions) { -// totalUnread += session.getUnreadExpert() != null ? session.getUnreadExpert() : 0; -// } -// result.put("userType", "expert"); -// } else { -// // 用户获取未读消息数 -// SysChatSession query = new SysChatSession(); -// query.setUserId(loginUser.getUserId()); -// query.setDelFlag("0"); -// query.setSessionStatus("0"); -// List sessions = sessionService.selectSysChatSessionList(query); -// -// for (SysChatSession session : sessions) { -// totalUnread += session.getUnreadUser() != null ? session.getUnreadUser() : 0; -// } -// result.put("userType", "user"); -// } -// -// result.put("totalUnread", totalUnread); -// return AjaxResult.success(result); -// -// } catch (Exception e) { -// logger.error("获取未读消息数失败", e); -// return AjaxResult.error("获取失败"); +// public AjaxResult listSessions() { +// Long currentUserId = getUserId(); +// List list = chatSessionsService.selectSessionsByUserId(currentUserId); +// +// for (ChatSessions session : list) { +// session.setOtherUserName(getUserName(session.getOtherUserId())); +// session.setOtherUserAvatar(getUserAvatar(session.getOtherUserId())); +// String sessionId = redisTemplate.opsForValue().get(ONLINE_KEY + session.getOtherUserId()); +// session.setOtherOnline(sessionId != null); // } +// return success(list); // } -//} \ No newline at end of file + + /** + * 标记会话已读 + */ + @PostMapping("/session/read") + public AjaxResult markSessionRead(@RequestBody Map params) { + Long sessionId = params.get("sessionId"); + chatSessionsService.markAsRead(sessionId, getUserId()); + return success(); + } + + /** + * 获取历史消息 + */ + @GetMapping("/messages") + public TableDataInfo listMessages(@RequestParam Long sessionId, + @RequestParam(required = false) Long lastId, + @RequestParam(defaultValue = "1") Integer pageNum, + @RequestParam(defaultValue = "20") Integer pageSize) { + startPage(); + List list = chatMessagesService.getHistoryMessages(sessionId, lastId); + + Long currentUserId = getUserId(); + for (ChatMessages msg : list) { + msg.setIsMe(msg.getSenderId().equals(currentUserId)); + } + return getDataTable(list); + } + + /** + * 发送消息(HTTP接口,备用) + */ + @PostMapping("/message/send") + public AjaxResult sendMessage(@RequestBody ChatMessages message) { + Long senderId = getUserId(); // 从SecurityContext获取 + ChatMessages result = chatMessagesService.sendMessage( + senderId, + message.getReceiverId(), + message.getContentType(), + message.getContent() + ); + return success(result); + } + + /** + * 标记消息已读 + */ + @PostMapping("/message/read") + public AjaxResult markMessagesRead(@RequestBody Map params) { + List messageIds = (List) params.get("messageIds"); + Long sessionId = Long.valueOf(params.get("sessionId").toString()); + chatMessagesService.markAsRead(messageIds, sessionId, getUserId()); + return success(); + } + + /** + * 处理实时消息 - WebSocket接收 + * 前端通过STOMP发送到/app/chat.send + */ + @MessageMapping("/chat.send") + public void handleChatMessage(@Payload Map payload, + StompHeaderAccessor accessor) { + + String senderIdStr = accessor.getUser().getName(); + Long senderId = Long.parseLong(senderIdStr); + String currentSessionId = accessor.getSessionId(); + + // 获取接收者ID + Long receiverId = Long.valueOf(payload.get("receiverId").toString()); + + // 防止给自己发消息 + if (senderId.equals(receiverId)) { + log.error("用户尝试给自己发送消息: userId={}", senderId); + Map errorMsg = new HashMap<>(); + errorMsg.put("type", "ERROR"); + errorMsg.put("content", "不能给自己发送消息"); + messagingTemplate.convertAndSendToUser( + senderId.toString(), + "/queue/errors", + errorMsg); + return; + } + + String contentType = payload.getOrDefault("contentType", "text").toString(); + String content = payload.get("content").toString(); + + log.info("WebSocket收到消息: senderId={}, receiverId={}, content={}", + senderId, receiverId, content); + + try { + // ===== 关键修复1:检查并修复发送者的在线状态 ===== + String senderOnlineSession = redisTemplate.opsForValue().get(ONLINE_KEY + senderIdStr); + + if (senderOnlineSession == null) { + // 用户能发消息但Redis中没有状态 → 立即修复 + log.warn("⚠️ 用户[{}]能发送消息但Redis中无状态,立即修复", senderId); + redisTemplate.opsForValue().set(ONLINE_KEY + senderIdStr, currentSessionId, 12, TimeUnit.HOURS); + redisTemplate.opsForValue().set(SESSION_USER_KEY + currentSessionId, senderIdStr, 12, TimeUnit.HOURS); + log.info("✅ 已修复用户[{}]的在线状态", senderId); + } else if (!senderOnlineSession.equals(currentSessionId)) { + // 会话ID不匹配,更新 + log.warn("⚠️ 用户[{}]会话ID不匹配,旧: {}, 新: {}, 更新中...", + senderId, senderOnlineSession, currentSessionId); + // 删除旧的会话映射 + redisTemplate.delete(SESSION_USER_KEY + senderOnlineSession); + // 设置新的 + redisTemplate.opsForValue().set(ONLINE_KEY + senderIdStr, currentSessionId, 12, TimeUnit.HOURS); + redisTemplate.opsForValue().set(SESSION_USER_KEY + currentSessionId, senderIdStr, 12, TimeUnit.HOURS); + log.info("✅ 已更新用户[{}]的会话ID", senderId); + } else { + // 状态正常,刷新过期时间 + redisTemplate.expire(ONLINE_KEY + senderIdStr, 12, TimeUnit.HOURS); + redisTemplate.expire(SESSION_USER_KEY + currentSessionId, 12, TimeUnit.HOURS); + log.debug("刷新用户[{}]的在线状态过期时间", senderId); + } + + // 调用带senderId的方法保存消息 + ChatMessages message = chatMessagesService.sendMessage( + senderId, receiverId, contentType, content); + + // 构建发送者的消息回执 + Map pushMsgToSender = new HashMap<>(); + pushMsgToSender.put("type", "CHAT"); + pushMsgToSender.put("id", message.getId()); + pushMsgToSender.put("sessionId", message.getSenderSessionId()); + pushMsgToSender.put("senderId", senderId); + pushMsgToSender.put("receiverId", receiverId); + pushMsgToSender.put("content", content); + pushMsgToSender.put("contentType", contentType); + pushMsgToSender.put("createTime", message.getCreatedAt()); + pushMsgToSender.put("isMe", true); + + // 构建接收者的消息 + Map pushMsgToReceiver = new HashMap<>(); + pushMsgToReceiver.put("type", "CHAT"); + pushMsgToReceiver.put("id", message.getId()); + pushMsgToReceiver.put("sessionId", message.getReceiverSessionId()); + pushMsgToReceiver.put("senderId", senderId); + pushMsgToReceiver.put("receiverId", receiverId); + pushMsgToReceiver.put("content", content); + pushMsgToReceiver.put("contentType", contentType); + pushMsgToReceiver.put("createTime", message.getCreatedAt()); + pushMsgToReceiver.put("isMe", false); + + // 先给发送者回执 + messagingTemplate.convertAndSendToUser( + senderId.toString(), + "/queue/messages", + pushMsgToSender); + + log.info("已给发送者[{}]发送消息回执", senderId); + + // 判断对方是否在线 + String receiverSessionId = redisTemplate.opsForValue().get(ONLINE_KEY + receiverId); + boolean isReceiverOnline = receiverSessionId != null; + + if (isReceiverOnline) { + // 对方在线,实时推送 + messagingTemplate.convertAndSendToUser( + receiverId.toString(), + "/queue/messages", + pushMsgToReceiver); + log.info("消息实时推送给用户: {}", receiverId); + } else { + // 对方离线,放入MQ + pushMsgToReceiver.put("retryCount", 0); + rocketMQTemplate.convertAndSend(MQ_TOPIC + ":offline", pushMsgToReceiver); + log.info("用户[{}]离线,消息进入RocketMQ", receiverId); + } + + } catch (Exception e) { + log.error("消息处理失败", e); + + // 发送错误通知给发送者 + Map errorMsg = new HashMap<>(); + errorMsg.put("type", "ERROR"); + errorMsg.put("content", "发送失败: " + e.getMessage()); + messagingTemplate.convertAndSendToUser( + senderId.toString(), + "/queue/errors", + errorMsg); + } + } + + /** + * WebSocket处理已读回执 + */ + @MessageMapping("/chat.read") + public void handleReadReceipt(@Payload Map payload, + StompHeaderAccessor accessor) { + + Long sessionId = Long.valueOf(payload.get("sessionId").toString()); + List messageIds = (List) payload.get("messageIds"); + Long readerId = getUserId(); // 当前登录用户 + + log.info("收到已读回执: sessionId={}, readerId={}, messageIds={}", + sessionId, readerId, messageIds); + + try { + // 标记消息已读 + chatMessagesService.markAsRead(messageIds, sessionId, readerId); + + // 获取消息的发送者ID(通常是对方用户) + // 这里需要从消息中获取发送者ID,用于通知对方 + if (messageIds != null && !messageIds.isEmpty()) { + // 查询第一条消息的发送者 + ChatMessages firstMessage = chatMessagesService.getChatMessagesById(messageIds.get(0)); + if (firstMessage != null) { + Long senderId = firstMessage.getSenderId(); + + // 构建已读回执通知 + Map readReceipt = new HashMap<>(); + readReceipt.put("type", "READ"); + readReceipt.put("sessionId", sessionId); + readReceipt.put("readerId", readerId); + readReceipt.put("messageIds", messageIds); + readReceipt.put("readAt", new Date()); + + // 通知消息发送者(对方)消息已被读 + messagingTemplate.convertAndSendToUser( + senderId.toString(), + "/queue/read", + readReceipt); + + log.info("已读回执已发送给用户: {}", senderId); + } + } + } catch (Exception e) { + log.error("处理已读回执失败", e); + } + } + + private String getUserName(Long userId) { return "用户" + userId; } + private String getUserAvatar(Long userId) { return ""; } + + /** + * 处理心跳消息 + * 前端通过STOMP发送到/app/heartbeat + */ + @MessageMapping("/heartbeat") + public void handleHeartbeat(@Payload Map payload, + StompHeaderAccessor accessor) { + + String userId = accessor.getUser().getName(); + String sessionId = accessor.getSessionId(); + + log.debug("收到心跳 - 用户: {}, 会话: {}, 时间: {}", + userId, sessionId, payload.get("timestamp")); + + try { + // 更新Redis中的在线状态过期时间 + Boolean expireOnline = redisTemplate.expire(ONLINE_KEY + userId, 12, TimeUnit.HOURS); + Boolean expireSession = redisTemplate.expire(SESSION_USER_KEY + sessionId, 12, TimeUnit.HOURS); + + if (Boolean.TRUE.equals(expireOnline) && Boolean.TRUE.equals(expireSession)) { + log.debug("心跳更新成功 - 用户: {}", userId); + } else { + // 如果key不存在,重新设置 + if (Boolean.FALSE.equals(expireOnline)) { + log.warn("用户[{}]的在线状态key不存在,重新设置", userId); + redisTemplate.opsForValue().set(ONLINE_KEY + userId, sessionId, 12, TimeUnit.HOURS); + } + if (Boolean.FALSE.equals(expireSession)) { + log.warn("会话[{}]的用户映射key不存在,重新设置", sessionId); + redisTemplate.opsForValue().set(SESSION_USER_KEY + sessionId, userId, 12, TimeUnit.HOURS); + } + } + + // 可选:返回心跳响应 + Map response = new HashMap<>(); + response.put("type", "HEARTBEAT_RESPONSE"); + response.put("timestamp", System.currentTimeMillis()); + response.put("userId", userId); + + messagingTemplate.convertAndSendToUser( + userId, + "/queue/heartbeat", + response + ); + + } catch (Exception e) { + log.error("处理心跳失败 - 用户: {}", userId, e); + } + } +} \ No newline at end of file diff --git a/chenhai-admin/src/main/java/com/chenhai/web/controller/system/SensitiveWordApiTest.java b/chenhai-admin/src/main/java/com/chenhai/web/controller/system/SensitiveWordApiTest.java index 09e8a52..982bb7a 100644 --- a/chenhai-admin/src/main/java/com/chenhai/web/controller/system/SensitiveWordApiTest.java +++ b/chenhai-admin/src/main/java/com/chenhai/web/controller/system/SensitiveWordApiTest.java @@ -4,6 +4,11 @@ import com.github.houbb.sensitive.word.api.IWordReplace; import com.github.houbb.sensitive.word.core.SensitiveWordHelper; import org.junit.jupiter.api.Test; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Set; +import java.util.Vector; + /** * @author : mazhongxu * @date : 2026-02-04 18:28 @@ -35,7 +40,7 @@ public class SensitiveWordApiTest { */ @Test public void testFindAll() { - final String text = "五星红旗迎风飘扬,毛主席的画像屹立在天安门前"; + final String text = "五星红旗迎风飘扬,毛主席的画像屹立在天安门前,你是卖冰毒的吗?不跟你学法龙功,你真淫荡"; System.out.println(SensitiveWordHelper.findAll(text)); } @@ -70,4 +75,30 @@ public class SensitiveWordApiTest { // IWordReplace customWordReplace = new CustomWordReplace(); // System.out.println(SensitiveWordHelper.replace(text, customWordReplace)); // } + + @Test + public void test() { + A a = new A(); + A a1 = new A(); + if (a == a1){ + System.out.println("a==a1"); + } + if (a.equals(a1)){ + System.out.println("a.equals(a1)"); + } + System.out.println(a.m); + } + + class A{ + static { + System.out.println("A初始化静态代码"); + m = 100; + } + + static int m = 200; + + A(){ + System.out.println("A无参构造"); + } + } } diff --git a/chenhai-admin/src/main/resources/application-druid.yml b/chenhai-admin/src/main/resources/application-druid.yml index f579e61..6cbd560 100644 --- a/chenhai-admin/src/main/resources/application-druid.yml +++ b/chenhai-admin/src/main/resources/application-druid.yml @@ -58,4 +58,12 @@ spring: merge-sql: true wall: config: - multi-statement-allow: true \ No newline at end of file + multi-statement-allow: true + +# RocketMQ配置 +rocketmq: + name-server: 172.16.1.105:9876 + producer: + group: chat-producer-group + send-message-timeout: 3000 + retry-times-when-send-failed: 3 \ No newline at end of file diff --git a/chenhai-common/pom.xml b/chenhai-common/pom.xml index 0d53e45..1945eba 100644 --- a/chenhai-common/pom.xml +++ b/chenhai-common/pom.xml @@ -118,6 +118,19 @@ junit-jupiter-engine + + + org.springframework.boot + spring-boot-starter-websocket + + + + + org.apache.rocketmq + rocketmq-spring-boot-starter + 2.2.3 + + \ No newline at end of file diff --git a/chenhai-framework/src/main/java/com/chenhai/framework/config/RocketMQConfig.java b/chenhai-framework/src/main/java/com/chenhai/framework/config/RocketMQConfig.java new file mode 100644 index 0000000..c87ddd1 --- /dev/null +++ b/chenhai-framework/src/main/java/com/chenhai/framework/config/RocketMQConfig.java @@ -0,0 +1,17 @@ +package com.chenhai.framework.config; + +import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +/** + * RocketMQ配置类 + * 位置:ruoyi-framework/src/main/java/com/ruoyi/framework/config/RocketMQConfig.java + * + * @author ruoyi + */ +@Configuration +@Import(RocketMQAutoConfiguration.class) +public class RocketMQConfig { + // 使用RocketMQ默认配置即可 +} \ No newline at end of file diff --git a/chenhai-framework/src/main/java/com/chenhai/framework/config/SecurityConfig.java b/chenhai-framework/src/main/java/com/chenhai/framework/config/SecurityConfig.java index 71bd321..81679c3 100644 --- a/chenhai-framework/src/main/java/com/chenhai/framework/config/SecurityConfig.java +++ b/chenhai-framework/src/main/java/com/chenhai/framework/config/SecurityConfig.java @@ -144,7 +144,8 @@ public class SecurityConfig .authorizeHttpRequests((requests) -> { permitAllUrl.getUrls().forEach(url -> requests.requestMatchers(url).permitAll()); // 对于登录login 注册register 验证码captchaImage 允许匿名访问 - requests.requestMatchers("/login", "/register", "/captchaImage","auth/**").permitAll() + requests.requestMatchers("/ws/**", "/ws/chat/**", "/ws/chat/info").permitAll() + .requestMatchers("/login", "/register", "/captchaImage","auth/**").permitAll() .requestMatchers("/auth/**").permitAll() // 静态资源,可匿名访问 .requestMatchers(HttpMethod.GET, "/", "/*.html", "/**.html", "/**.css", "/**.js", "/profile/**").permitAll() diff --git a/chenhai-framework/src/main/java/com/chenhai/framework/config/WebSocketConfig.java b/chenhai-framework/src/main/java/com/chenhai/framework/config/WebSocketConfig.java new file mode 100644 index 0000000..55a3386 --- /dev/null +++ b/chenhai-framework/src/main/java/com/chenhai/framework/config/WebSocketConfig.java @@ -0,0 +1,72 @@ +package com.chenhai.framework.config; + +import com.chenhai.framework.websocket.WebSocketChannelInterceptor; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; +import org.springframework.messaging.simp.config.ChannelRegistration; +import org.springframework.messaging.simp.config.MessageBrokerRegistry; +import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; +import org.springframework.web.socket.config.annotation.StompEndpointRegistry; +import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer; + +/** + * WebSocket配置类 + * 配置STOMP协议端点、消息代理和拦截器 + * + * @author chenhai + */ +@Configuration +@EnableWebSocketMessageBroker +public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { + + @Autowired + private WebSocketChannelInterceptor webSocketChannelInterceptor; + + /** + * 注册STOMP端点 + * 客户端通过这个地址连接WebSocket服务器 + * + * @param registry STOMP端点注册表 + */ + @Override + public void registerStompEndpoints(StompEndpointRegistry registry) { + registry.addEndpoint("/ws/chat") // WebSocket连接地址 + .setAllowedOriginPatterns("*") // 允许所有域名跨域 + .withSockJS(); // 启用SockJS回退选项 + } + + /** + * 配置消息代理 + * 设置应用前缀和用户前缀 + * + * @param registry 消息代理注册表 + */ + @Override + public void configureMessageBroker(MessageBrokerRegistry registry) { + // 启用简单内存消息代理,配置目的地前缀 + // /topic - 广播模式,所有订阅者都能收到 + // /queue - 点对点模式 + // /user - 用户专属消息 + registry.enableSimpleBroker("/topic", "/queue", "/user"); + + // 设置应用前缀,客户端发送消息需要以/app开头 + // 例如:/app/chat.send + registry.setApplicationDestinationPrefixes("/app"); + + // 设置用户前缀,用于点对点消息 + // 例如:/user/{userId}/queue/messages + registry.setUserDestinationPrefix("/user"); + } + + /** + * 配置客户端入站通道 + * 注册拦截器用于处理连接和断开事件 + * + * @param registration 通道注册表 + */ + @Override + public void configureClientInboundChannel(ChannelRegistration registration) { + // 添加自定义拦截器,用于管理在线状态 + registration.interceptors(webSocketChannelInterceptor); + } +} \ No newline at end of file diff --git a/chenhai-framework/src/main/java/com/chenhai/framework/websocket/WebSocketChannelInterceptor.java b/chenhai-framework/src/main/java/com/chenhai/framework/websocket/WebSocketChannelInterceptor.java new file mode 100644 index 0000000..8102ff5 --- /dev/null +++ b/chenhai-framework/src/main/java/com/chenhai/framework/websocket/WebSocketChannelInterceptor.java @@ -0,0 +1,184 @@ +package com.chenhai.framework.websocket; + +import com.chenhai.system.service.IChatSessionsService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.simp.SimpMessagingTemplate; +import org.springframework.messaging.simp.stomp.StompCommand; +import org.springframework.messaging.simp.stomp.StompHeaderAccessor; +import org.springframework.messaging.support.ChannelInterceptor; +import org.springframework.messaging.support.MessageHeaderAccessor; +import org.springframework.stereotype.Component; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * WebSocket连接拦截器 + * 负责管理用户在线状态,并在用户上线/下线时广播状态变更 + * + * @author chenhai + */ +@Component +public class WebSocketChannelInterceptor implements ChannelInterceptor { + + private static final Logger log = LoggerFactory.getLogger(WebSocketChannelInterceptor.class); + + @Autowired + private RedisTemplate redisTemplate; + + @Autowired + @Lazy + private SimpMessagingTemplate messagingTemplate; + + @Autowired + private IChatSessionsService chatSessionsService; + + /** Redis中在线状态的key前缀,格式:chat:online:{userId} -> sessionId */ + private static final String ONLINE_KEY = "chat:online:"; + + /** Redis中会话与用户映射的key前缀,格式:chat:session:{sessionId} -> userId */ + private static final String SESSION_USER_KEY = "chat:session:"; + + /** + * 消息发送前拦截处理 + * 在这里处理WebSocket的连接和断开事件 + * + * @param message 原始消息 + * @param channel 消息通道 + * @return 处理后的消息 + */ + @Override + public Message preSend(Message message, MessageChannel channel) { + StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); + if (accessor == null) { + return message; + } + + // 处理客户端连接事件 + if (StompCommand.CONNECT.equals(accessor.getCommand())) { + handleConnect(accessor); + } + + // 处理客户端断开连接事件 + if (StompCommand.DISCONNECT.equals(accessor.getCommand())) { + handleDisconnect(accessor); + } + + return message; + } + + /** + * 处理用户连接事件 + * 1. 设置用户认证信息 + * 2. 记录在线状态到Redis + * 3. 广播用户上线消息给相关用户(排除自己) + * + * @param accessor STOMP消息访问器,包含会话信息和头部 + */ + private void handleConnect(StompHeaderAccessor accessor) { + // 从请求头中获取用户ID + String userId = accessor.getFirstNativeHeader("userId"); + if (userId == null) { + log.warn("连接请求中缺少userId参数"); + return; + } + + String sessionId = accessor.getSessionId(); + log.info("用户[{}]正在连接,会话ID: {}", userId, sessionId); + + // 1. 设置用户认证信息,后续可以通过@AuthenticationPrincipal获取 + accessor.setUser(() -> userId); + + // 2. 存储在线状态到Redis,设置12小时过期防止僵尸数据 + redisTemplate.opsForValue().set(ONLINE_KEY + userId, sessionId, 12, TimeUnit.HOURS); + redisTemplate.opsForValue().set(SESSION_USER_KEY + sessionId, userId, 12, TimeUnit.HOURS); + + log.info("用户[{}]上线成功,已记录到Redis", userId); + + // 3. 广播用户上线状态给所有相关用户(排除自己) + broadcastUserStatusToRelatedUsers(userId, true); + } + + /** + * 处理用户断开连接事件 + * 1. 清理Redis中的在线状态 + * 2. 广播用户下线消息给相关用户(排除自己) + * + * @param accessor STOMP消息访问器,包含会话信息 + */ + private void handleDisconnect(StompHeaderAccessor accessor) { + String sessionId = accessor.getSessionId(); + String userId = redisTemplate.opsForValue().get(SESSION_USER_KEY + sessionId); + + if (userId != null) { + // 1. 清理Redis中的在线状态 + redisTemplate.delete(ONLINE_KEY + userId); + redisTemplate.delete(SESSION_USER_KEY + sessionId); + + log.info("用户[{}]已断开连接,已清理Redis状态", userId); + + // 2. 广播用户下线状态给相关用户(排除自己) + broadcastUserStatusToRelatedUsers(userId, false); + } else { + log.warn("未知会话断开连接: {}", sessionId); + } + } + + /** + * 广播用户状态给所有相关用户(有会话关联的用户) + * 这是核心方法,确保状态只广播给需要知道的人,而不是所有人 + * + * @param userId 状态变更的用户ID + * @param online true:上线, false:下线 + */ + private void broadcastUserStatusToRelatedUsers(String userId, boolean online) { + try { + Long currentUserId = Long.parseLong(userId); + + // 查询所有和这个用户有关联的用户(有会话的用户) + // 这个方法需要你在 IChatSessionsService 中实现 + List relatedUserIds = chatSessionsService.getRelatedUserIds(currentUserId); + + if (relatedUserIds == null || relatedUserIds.isEmpty()) { + log.info("用户[{}]没有相关用户,无需广播", userId); + return; + } + + // 构建状态消息 + Map statusMsg = new HashMap<>(); + statusMsg.put("type", "USER_STATUS"); + statusMsg.put("userId", currentUserId); + statusMsg.put("online", online); + statusMsg.put("timestamp", System.currentTimeMillis()); + + log.info("准备广播用户[{}]状态给{}个相关用户: online={}, relatedUsers={}", + userId, relatedUserIds.size(), online, relatedUserIds); + + // 分别发送给每个相关用户(点对点发送) + for (Long relatedUserId : relatedUserIds) { + // 使用 convertAndSendToUser 发送点对点消息 + // 这样只有指定用户能收到,避免广播给所有人 + messagingTemplate.convertAndSendToUser( + relatedUserId.toString(), + "/queue/user-status", // 使用独立的队列,避免和消息队列混淆 + statusMsg + ); + log.debug("已发送用户状态给用户{}: userId={}, online={}", relatedUserId, userId, online); + } + + log.info("用户[{}]状态广播完成", userId); + + } catch (NumberFormatException e) { + log.error("用户ID格式错误: {}", userId, e); + } catch (Exception e) { + log.error("广播用户状态失败: userId={}, online={}", userId, online, e); + } + } +} \ No newline at end of file diff --git a/chenhai-system/src/main/java/com/chenhai/system/domain/ChatMessages.java b/chenhai-system/src/main/java/com/chenhai/system/domain/ChatMessages.java new file mode 100644 index 0000000..f6faa27 --- /dev/null +++ b/chenhai-system/src/main/java/com/chenhai/system/domain/ChatMessages.java @@ -0,0 +1,113 @@ +package com.chenhai.system.domain; + +import com.fasterxml.jackson.annotation.JsonFormat; +import com.chenhai.common.core.domain.BaseEntity; +import java.util.Date; + +/** + * 聊天消息表对象 chat_messages + * + * @author chenhai + */ +public class ChatMessages extends BaseEntity { + private static final long serialVersionUID = 1L; + + /** 消息ID */ + private Long id; + + /** 发送者视角会话ID */ + private Long senderSessionId; + + /** 接收者视角会话ID */ + private Long receiverSessionId; + + /** 发送者ID */ + private Long senderId; + + /** 接收者ID */ + private Long receiverId; + + /** 消息类型 */ + private String contentType; + + /** 消息内容 */ + private String content; + + /** 文件URL */ + private String fileUrl; + + /** 原始文件名 */ + private String fileName; + + /** 文件大小 */ + private Integer fileSize; + + /** 视频时长 */ + private Integer videoDuration; + + /** 缩略图URL */ + private String thumbnailUrl; + + /** 是否已读 */ + private Integer isRead; + + /** 已读时间 */ + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private Date readAt; + + /** 发送时间 */ + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private Date createdAt; + + // 非数据库字段 + private Boolean isMe; + + // ========== getter/setter ========== + public Long getId() { return id; } + public void setId(Long id) { this.id = id; } + + public Long getSenderSessionId() { return senderSessionId; } + public void setSenderSessionId(Long senderSessionId) { this.senderSessionId = senderSessionId; } + + public Long getReceiverSessionId() { return receiverSessionId; } + public void setReceiverSessionId(Long receiverSessionId) { this.receiverSessionId = receiverSessionId; } + + public Long getSenderId() { return senderId; } + public void setSenderId(Long senderId) { this.senderId = senderId; } + + public Long getReceiverId() { return receiverId; } + public void setReceiverId(Long receiverId) { this.receiverId = receiverId; } + + public String getContentType() { return contentType; } + public void setContentType(String contentType) { this.contentType = contentType; } + + public String getContent() { return content; } + public void setContent(String content) { this.content = content; } + + public String getFileUrl() { return fileUrl; } + public void setFileUrl(String fileUrl) { this.fileUrl = fileUrl; } + + public String getFileName() { return fileName; } + public void setFileName(String fileName) { this.fileName = fileName; } + + public Integer getFileSize() { return fileSize; } + public void setFileSize(Integer fileSize) { this.fileSize = fileSize; } + + public Integer getVideoDuration() { return videoDuration; } + public void setVideoDuration(Integer videoDuration) { this.videoDuration = videoDuration; } + + public String getThumbnailUrl() { return thumbnailUrl; } + public void setThumbnailUrl(String thumbnailUrl) { this.thumbnailUrl = thumbnailUrl; } + + public Integer getIsRead() { return isRead; } + public void setIsRead(Integer isRead) { this.isRead = isRead; } + + public Date getReadAt() { return readAt; } + public void setReadAt(Date readAt) { this.readAt = readAt; } + + public Date getCreatedAt() { return createdAt; } + public void setCreatedAt(Date createdAt) { this.createdAt = createdAt; } + + public Boolean getIsMe() { return isMe; } + public void setIsMe(Boolean isMe) { this.isMe = isMe; } +} \ No newline at end of file diff --git a/chenhai-system/src/main/java/com/chenhai/system/domain/ChatSessions.java b/chenhai-system/src/main/java/com/chenhai/system/domain/ChatSessions.java new file mode 100644 index 0000000..9931bf7 --- /dev/null +++ b/chenhai-system/src/main/java/com/chenhai/system/domain/ChatSessions.java @@ -0,0 +1,98 @@ +package com.chenhai.system.domain; + +import com.fasterxml.jackson.annotation.JsonFormat; +import com.chenhai.common.core.domain.BaseEntity; +import java.util.Date; + +/** + * 聊天会话表对象 chat_sessions + * + * @author chenhai + */ +public class ChatSessions extends BaseEntity { + private static final long serialVersionUID = 1L; + + /** 会话ID */ + private Long id; + + /** 当前用户ID */ + private Long userId; + + /** 对方用户ID */ + private Long otherUserId; + + /** 当前用户未读消息数 */ + private Integer userUnread; + + /** 对方用户未读消息数 */ + private Integer otherUnread; + + /** 最后一条消息内容 */ + private String lastMessage; + + /** 最后消息时间 */ + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private Date lastMessageTime; + + /** 最后活动时间 */ + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private Date lastActiveTime; + + /** 创建时间 */ + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private Date createdAt; + + /** 更新时间 */ + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private Date updatedAt; + + // 非数据库字段 + private String otherUserName; + private String otherUserAvatar; + private Boolean otherOnline; + + // ========== getter/setter ========== + public Long getId() { return id; } + public void setId(Long id) { this.id = id; } + + public Long getUserId() { return userId; } + public void setUserId(Long userId) { this.userId = userId; } + + public Long getOtherUserId() { return otherUserId; } + public void setOtherUserId(Long otherUserId) { this.otherUserId = otherUserId; } + + public Integer getUserUnread() { return userUnread; } + public void setUserUnread(Integer userUnread) { this.userUnread = userUnread; } + + public Integer getOtherUnread() { return otherUnread; } + public void setOtherUnread(Integer otherUnread) { this.otherUnread = otherUnread; } + + public String getLastMessage() { return lastMessage; } + public void setLastMessage(String lastMessage) { this.lastMessage = lastMessage; } + + public Date getLastMessageTime() { return lastMessageTime; } + public void setLastMessageTime(Date lastMessageTime) { this.lastMessageTime = lastMessageTime; } + + public Date getLastActiveTime() { return lastActiveTime; } + public void setLastActiveTime(Date lastActiveTime) { this.lastActiveTime = lastActiveTime; } + + public Date getCreatedAt() { return createdAt; } + public void setCreatedAt(Date createdAt) { this.createdAt = createdAt; } + + public Date getUpdatedAt() { return updatedAt; } + public void setUpdatedAt(Date updatedAt) { this.updatedAt = updatedAt; } + + public String getOtherUserName() { return otherUserName; } + public void setOtherUserName(String otherUserName) { this.otherUserName = otherUserName; } + + public String getOtherUserAvatar() { return otherUserAvatar; } + public void setOtherUserAvatar(String otherUserAvatar) { this.otherUserAvatar = otherUserAvatar; } + + public Boolean getOtherOnline() { return otherOnline; } + public void setOtherOnline(Boolean otherOnline) { this.otherOnline = otherOnline; } + + @Override + public String toString() { + return "ChatSessions{id=" + id + ", userId=" + userId + ", otherUserId=" + otherUserId + "}"; + } +} \ No newline at end of file diff --git a/chenhai-system/src/main/java/com/chenhai/system/mapper/ChatMessagesMapper.java b/chenhai-system/src/main/java/com/chenhai/system/mapper/ChatMessagesMapper.java new file mode 100644 index 0000000..da39bf2 --- /dev/null +++ b/chenhai-system/src/main/java/com/chenhai/system/mapper/ChatMessagesMapper.java @@ -0,0 +1,40 @@ +package com.chenhai.system.mapper; + +import com.chenhai.system.domain.ChatMessages; +import org.apache.ibatis.annotations.Param; +import java.util.List; +import java.util.Date; + +public interface ChatMessagesMapper { + + ChatMessages selectChatMessagesById(Long id); + + List selectChatMessagesList(ChatMessages chatMessages); + + /** + * 根据会话ID查询消息 - 用于分页 + * 注意:这个方法会被 PageHelper 拦截,不要传 limit 参数 + */ + List selectMessagesBySession(@Param("sessionId") Long sessionId, + @Param("lastId") Long lastId); + + /** + * 获取会话的最后一条消息 + */ + ChatMessages selectLastMessageBySession(@Param("sessionId") Long sessionId); + + /** + * 获取未读消息 + */ + List selectUnreadMessages(@Param("sessionId") Long sessionId, + @Param("receiverId") Long receiverId); + + int insertChatMessages(ChatMessages chatMessages); + + int batchMarkAsRead(@Param("messageIds") List messageIds, + @Param("readAt") Date readAt); + + int markSessionAsRead(@Param("sessionId") Long sessionId, + @Param("receiverId") Long receiverId, + @Param("readAt") Date readAt); +} \ No newline at end of file diff --git a/chenhai-system/src/main/java/com/chenhai/system/mapper/ChatSessionsMapper.java b/chenhai-system/src/main/java/com/chenhai/system/mapper/ChatSessionsMapper.java new file mode 100644 index 0000000..75c9805 --- /dev/null +++ b/chenhai-system/src/main/java/com/chenhai/system/mapper/ChatSessionsMapper.java @@ -0,0 +1,42 @@ +package com.chenhai.system.mapper; + +import com.chenhai.system.domain.ChatSessions; +import org.apache.ibatis.annotations.Param; +import java.util.List; +import java.util.Date; + +public interface ChatSessionsMapper { + + ChatSessions selectChatSessionsById(Long id); + + List selectChatSessionsList(ChatSessions chatSessions); + + List selectSessionsByUserId(@Param("userId") Long userId); + + ChatSessions selectSessionByUsers(@Param("userId") Long userId, + @Param("otherUserId") Long otherUserId); + + int insertChatSessions(ChatSessions chatSessions); + + int updateChatSessions(ChatSessions chatSessions); + + int updateLastMessage(ChatSessions chatSessions); + + /** + * 增加对方未读计数 + */ + int incrementOtherUnread(@Param("sessionId") Long sessionId, + @Param("lastActiveTime") Date lastActiveTime); + + /** + * 增加自己未读计数 + */ + int incrementUserUnread(@Param("sessionId") Long sessionId, + @Param("lastActiveTime") Date lastActiveTime); + + int resetUnread(@Param("sessionId") Long sessionId); + + int deleteChatSessionsById(Long id); + + int deleteChatSessionsByIds(String[] ids); +} \ No newline at end of file diff --git a/chenhai-system/src/main/java/com/chenhai/system/mq/OfflineMessageConsumer.java b/chenhai-system/src/main/java/com/chenhai/system/mq/OfflineMessageConsumer.java new file mode 100644 index 0000000..154296d --- /dev/null +++ b/chenhai-system/src/main/java/com/chenhai/system/mq/OfflineMessageConsumer.java @@ -0,0 +1,90 @@ +package com.chenhai.system.mq; + +import com.chenhai.system.domain.ChatMessages; +import com.chenhai.system.mapper.ChatMessagesMapper; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.messaging.simp.SimpMessagingTemplate; +import org.springframework.stereotype.Component; + +import java.util.Map; + +@Component +@RocketMQMessageListener( + topic = "chat-topic", + selectorExpression = "offline", + consumerGroup = "chat-offline-group", + maxReconsumeTimes = 3 +) +public class OfflineMessageConsumer implements RocketMQListener> { + + private static final Logger log = LoggerFactory.getLogger(OfflineMessageConsumer.class); + + @Autowired + private SimpMessagingTemplate messagingTemplate; + + @Autowired + private RedisTemplate redisTemplate; + + @Autowired + private ChatMessagesMapper chatMessagesMapper; + + private static final String ONLINE_KEY = "chat:online:"; + + @Override + public void onMessage(Map message) { + String receiverId = message.get("receiverId").toString(); + Integer retryCount = (Integer) message.getOrDefault("retryCount", 0); + Long messageId = Long.valueOf(message.get("id").toString()); + + // 检查用户是否在线 + String sessionId = redisTemplate.opsForValue().get(ONLINE_KEY + receiverId); + boolean isOnline = sessionId != null; + + log.info("MQ处理离线消息 - 接收者: {}, 在线状态: {}, 重试次数: {}/3", + receiverId, isOnline, retryCount); + + if (isOnline) { + try { + // 用户在线,推送消息 + messagingTemplate.convertAndSendToUser( + receiverId, + "/queue/messages", + message + ); + log.info("✅ MQ消息推送成功: 接收者={}, 消息ID={}", receiverId, messageId); + + // 推送成功,不需要做任何事,消息已经在数据库里了 + + } catch (Exception e) { + log.error("❌ MQ消息推送失败: 接收者={}, 错误={}", receiverId, e.getMessage()); + + if (retryCount < 3) { + message.put("retryCount", retryCount + 1); + throw new RuntimeException("推送失败,需要重试", e); + } else { + // 超过重试次数,但消息已经在数据库,用户上线时会通过历史消息拉取 + log.warn("消息推送重试已达上限,等待用户上线后拉取历史消息: 接收者={}, 消息ID={}", + receiverId, messageId); + } + } + } else { + // 用户离线 + if (retryCount < 3) { + // 未超过重试次数,继续重试 + log.info("用户[{}]仍离线,进行第{}次重试", receiverId, retryCount + 1); + message.put("retryCount", retryCount + 1); + throw new RuntimeException("用户离线,需要重试"); + } else { + // 超过重试次数,消息已经在数据库,用户上线后会拉取 + log.info("用户[{}]持续离线,等待上线后拉取历史消息: 消息ID={}", + receiverId, messageId); + // 不抛异常,消费成功 + } + } + } +} \ No newline at end of file diff --git a/chenhai-system/src/main/java/com/chenhai/system/service/IChatMessagesService.java b/chenhai-system/src/main/java/com/chenhai/system/service/IChatMessagesService.java new file mode 100644 index 0000000..ddd49b0 --- /dev/null +++ b/chenhai-system/src/main/java/com/chenhai/system/service/IChatMessagesService.java @@ -0,0 +1,78 @@ +package com.chenhai.system.service; + +import com.chenhai.system.domain.ChatMessages; +import java.util.List; + +/** + * 聊天消息Service接口 + * + * @author chenhai + */ +public interface IChatMessagesService { + + /** + * 获取会话历史消息 + */ + List getHistoryMessages(Long sessionId, Long lastId); + + /** + * 获取最后一条消息 + */ + ChatMessages getLastMessage(Long sessionId); + + /** + * 获取未读消息 + */ + List getUnreadMessages(Long sessionId, Long receiverId); + + /** + * 标记消息已读 + */ + void markAsRead(List messageIds, Long sessionId, Long readerId); + + /** + * 发送消息 - WebSocket调用,需要传入senderId + * + * @param senderId 发送者ID + * @param receiverId 接收者ID + * @param contentType 消息类型 + * @param content 消息内容 + * @return 保存后的消息对象 + */ + ChatMessages sendMessage(Long senderId, Long receiverId, String contentType, String content); + + /** + * 发送消息 - HTTP调用,从SecurityUtils获取senderId + * + * @param receiverId 接收者ID + * @param contentType 消息类型 + * @param content 消息内容 + * @return 保存后的消息对象 + */ + ChatMessages sendMessage(Long receiverId, String contentType, String content); + + /** + * 新增消息 + */ + int insertChatMessages(ChatMessages chatMessages); + + /** + * 修改消息 + */ +// int updateChatMessages(ChatMessages chatMessages); +// +// /** +// * 删除消息 +// */ +// int deleteChatMessagesById(Long id); +// +// /** +// * 批量删除消息 +// */ +// int deleteChatMessagesByIds(String[] ids); + + /** + * 根据ID查询消息 + */ + ChatMessages getChatMessagesById(Long id); +} \ No newline at end of file diff --git a/chenhai-system/src/main/java/com/chenhai/system/service/IChatSessionsService.java b/chenhai-system/src/main/java/com/chenhai/system/service/IChatSessionsService.java new file mode 100644 index 0000000..315483f --- /dev/null +++ b/chenhai-system/src/main/java/com/chenhai/system/service/IChatSessionsService.java @@ -0,0 +1,47 @@ +package com.chenhai.system.service; + +import com.chenhai.system.domain.ChatSessions; +import java.util.List; + +/** + * 聊天会话Service接口 + * + * @author chenhai + */ +public interface IChatSessionsService { + + /** + * 查询用户的所有会话 + */ + List selectSessionsByUserId(Long userId); + + /** + * 根据双方ID获取会话 + */ + ChatSessions getSessionByUsers(Long userId, Long otherUserId); + + /** + * 获取或创建会话 + */ + ChatSessions getOrCreateSession(Long userId, Long otherUserId); + + /** + * 发送消息后更新会话 + */ + void afterSendMessage(Long senderSessionId, Long receiverSessionId, + Long senderId, Long receiverId, String content); + + /** + * 标记会话已读 + */ + void markAsRead(Long sessionId, Long userId); + + /** + * 获取与指定用户相关的所有用户ID(有会话关联的用户) + * 用于在线状态广播 + * + * @param userId 当前用户ID + * @return 相关用户ID列表 + */ + List getRelatedUserIds(Long userId); +} \ No newline at end of file diff --git a/chenhai-system/src/main/java/com/chenhai/system/service/impl/ChatMessagesServiceImpl.java b/chenhai-system/src/main/java/com/chenhai/system/service/impl/ChatMessagesServiceImpl.java new file mode 100644 index 0000000..1092121 --- /dev/null +++ b/chenhai-system/src/main/java/com/chenhai/system/service/impl/ChatMessagesServiceImpl.java @@ -0,0 +1,147 @@ +package com.chenhai.system.service.impl; + +import com.chenhai.common.utils.SecurityUtils; +import com.chenhai.system.domain.ChatMessages; +import com.chenhai.system.domain.ChatSessions; +import com.chenhai.system.mapper.ChatMessagesMapper; +import com.chenhai.system.service.IChatMessagesService; +import com.chenhai.system.service.IChatSessionsService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; +import java.util.Date; +import java.util.List; + +/** + * 聊天消息Service实现 + * + * @author chenhai + */ +@Service +public class ChatMessagesServiceImpl implements IChatMessagesService { + + private static final Logger log = LoggerFactory.getLogger(ChatMessagesServiceImpl.class); + + @Autowired + private ChatMessagesMapper chatMessagesMapper; + + @Autowired + private IChatSessionsService chatSessionsService; + + @Override + public List getHistoryMessages(Long sessionId, Long lastId) { + return chatMessagesMapper.selectMessagesBySession(sessionId, lastId); + } + + @Override + public ChatMessages getLastMessage(Long sessionId) { + return chatMessagesMapper.selectLastMessageBySession(sessionId); + } + + @Override + public List getUnreadMessages(Long sessionId, Long receiverId) { + return chatMessagesMapper.selectUnreadMessages(sessionId, receiverId); + } + + @Override + @Transactional + public void markAsRead(List messageIds, Long sessionId, Long readerId) { + Date now = new Date(); + if (messageIds != null && !messageIds.isEmpty()) { + chatMessagesMapper.batchMarkAsRead(messageIds, now); + } else { + chatMessagesMapper.markSessionAsRead(sessionId, readerId, now); + } + chatSessionsService.markAsRead(sessionId, readerId); + } + + /** + * 发送消息 - 从参数获取senderId,不依赖SecurityUtils + * + * @param senderId 发送者ID(从WebSocket上下文中获取) + * @param receiverId 接收者ID + * @param contentType 消息类型 + * @param content 消息内容 + * @return 保存后的消息对象 + */ + @Override + @Transactional + public ChatMessages sendMessage(Long senderId, Long receiverId, String contentType, String content) { + log.info("发送消息 - senderId: {}, receiverId: {}, content: {}", senderId, receiverId, content); + + // 关键修复:检查是否给自己发消息 + if (senderId.equals(receiverId)) { + log.error("不能给自己发送消息: senderId={}, receiverId={}", senderId, receiverId); + throw new RuntimeException("不能给自己发送消息"); + } + + // 1. 获取双方会话ID + ChatSessions senderSession = chatSessionsService.getOrCreateSession(senderId, receiverId); + ChatSessions receiverSession = chatSessionsService.getOrCreateSession(receiverId, senderId); + + log.info("发送者会话ID: {}, 接收者会话ID: {}", senderSession.getId(), receiverSession.getId()); + + // 2. 创建消息对象 + ChatMessages message = new ChatMessages(); + message.setSenderSessionId(senderSession.getId()); + message.setReceiverSessionId(receiverSession.getId()); + message.setSenderId(senderId); + message.setReceiverId(receiverId); + message.setContentType(contentType); + message.setContent(content); + message.setIsRead(0); + message.setCreatedAt(new Date()); + + chatMessagesMapper.insertChatMessages(message); + + // 3. 更新会话 + chatSessionsService.afterSendMessage( + senderSession.getId(), + receiverSession.getId(), + senderId, + receiverId, + content + ); + + return message; + } + + /** + * 发送消息 - HTTP调用,从SecurityUtils获取senderId + */ + @Override + @Transactional + public ChatMessages sendMessage(Long receiverId, String contentType, String content) { + Long senderId = SecurityUtils.getUserId(); + return this.sendMessage(senderId, receiverId, contentType, content); + } + + @Override + public int insertChatMessages(ChatMessages chatMessages) { + chatMessages.setCreateTime(new Date()); + return chatMessagesMapper.insertChatMessages(chatMessages); + } + +// @Override +// public int updateChatMessages(ChatMessages chatMessages) { +// chatMessages.setUpdateTime(new Date()); +// return chatMessagesMapper.updateChatMessages(chatMessages); +// } +// +// @Override +// public int deleteChatMessagesById(Long id) { +// return chatMessagesMapper.deleteChatMessagesById(id); +// } +// +// @Override +// public int deleteChatMessagesByIds(String[] ids) { +// return chatMessagesMapper.deleteChatMessagesByIds(ids); +// } + + @Override + public ChatMessages getChatMessagesById(Long id) { + return chatMessagesMapper.selectChatMessagesById(id); + } +} \ No newline at end of file diff --git a/chenhai-system/src/main/java/com/chenhai/system/service/impl/ChatSessionsServiceImpl.java b/chenhai-system/src/main/java/com/chenhai/system/service/impl/ChatSessionsServiceImpl.java new file mode 100644 index 0000000..ef61b8c --- /dev/null +++ b/chenhai-system/src/main/java/com/chenhai/system/service/impl/ChatSessionsServiceImpl.java @@ -0,0 +1,143 @@ +package com.chenhai.system.service.impl; + +import com.chenhai.system.domain.ChatSessions; +import com.chenhai.system.mapper.ChatSessionsMapper; +import com.chenhai.system.service.IChatSessionsService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.stream.Collectors; + +/** + * 聊天会话Service实现 + * + * @author chenhai + */ +@Service +public class ChatSessionsServiceImpl implements IChatSessionsService { + + private static final Logger log = LoggerFactory.getLogger(ChatSessionsServiceImpl.class); + + @Autowired + private ChatSessionsMapper chatSessionsMapper; + + @Override + public List selectSessionsByUserId(Long userId) { + return chatSessionsMapper.selectSessionsByUserId(userId); + } + + @Override + public ChatSessions getSessionByUsers(Long userId, Long otherUserId) { + return chatSessionsMapper.selectSessionByUsers(userId, otherUserId); + } + + @Override + @Transactional + public ChatSessions getOrCreateSession(Long userId, Long otherUserId) { + // 关键修复:检查是否和自己聊天 + if (userId.equals(otherUserId)) { + log.error("不能和自己创建会话: userId={}, otherUserId={}", userId, otherUserId); + throw new RuntimeException("不能和自己聊天"); + } + + ChatSessions session = chatSessionsMapper.selectSessionByUsers(userId, otherUserId); + if (session == null) { + session = new ChatSessions(); + session.setUserId(userId); + session.setOtherUserId(otherUserId); + session.setUserUnread(0); + session.setOtherUnread(0); + session.setLastActiveTime(new Date()); + chatSessionsMapper.insertChatSessions(session); + log.info("创建新会话: userId={}, otherUserId={}, sessionId={}", userId, otherUserId, session.getId()); + } + return session; + } + + @Override + @Transactional + public void afterSendMessage(Long senderSessionId, Long receiverSessionId, + Long senderId, Long receiverId, String content) { + Date now = new Date(); + + // 1. 更新发送者视角的会话 - 增加对方的未读 + chatSessionsMapper.incrementOtherUnread(senderSessionId, now); + + // 2. 更新发送者视角的会话的最后消息 + ChatSessions senderSession = new ChatSessions(); + senderSession.setId(senderSessionId); + senderSession.setLastMessage(content); + senderSession.setLastMessageTime(now); + senderSession.setLastActiveTime(now); + chatSessionsMapper.updateLastMessage(senderSession); + + // 3. 更新接收者视角的会话 - 增加自己的未读 + chatSessionsMapper.incrementUserUnread(receiverSessionId, now); + + // 4. 更新接收者视角的会话的最后消息 + ChatSessions receiverSession = new ChatSessions(); + receiverSession.setId(receiverSessionId); + receiverSession.setLastMessage(content); + receiverSession.setLastMessageTime(now); + receiverSession.setLastActiveTime(now); + chatSessionsMapper.updateLastMessage(receiverSession); + + log.info("会话更新完成: senderSessionId={}, receiverSessionId={}, content={}", + senderSessionId, receiverSessionId, content); + } + + @Override + @Transactional + public void markAsRead(Long sessionId, Long userId) { + chatSessionsMapper.resetUnread(sessionId); + log.info("会话已读: sessionId={}, userId={}", sessionId, userId); + } + + /** + * 获取与指定用户相关的所有用户ID(有会话关联的用户) + * 实现方法:查询所有包含该用户的会话,提取对方用户ID + * + * @param userId 当前用户ID + * @return 相关用户ID列表(不包含自己) + */ + @Override + public List getRelatedUserIds(Long userId) { + try { + // 查询用户的所有会话 + List sessions = chatSessionsMapper.selectSessionsByUserId(userId); + + if (sessions == null || sessions.isEmpty()) { + log.info("用户[{}]没有会话记录", userId); + return new ArrayList<>(); + } + + // 从每个会话中提取对方用户ID + List relatedUserIds = sessions.stream() + .map(session -> { + // 如果当前用户是userId,则对方是otherUserId + // 注意:这里假设会话表结构是(user_id, other_user_id) + if (userId.equals(session.getUserId())) { + return session.getOtherUserId(); + } else { + return session.getUserId(); + } + }) + .distinct() // 去重 + .filter(id -> !id.equals(userId)) // 过滤掉自己(安全起见) + .collect(Collectors.toList()); + + log.info("用户[{}]的相关用户: {}", userId, relatedUserIds); + return relatedUserIds; + + } catch (Exception e) { + log.error("获取相关用户ID失败: userId={}", userId, e); + return new ArrayList<>(); + } + } +} \ No newline at end of file diff --git a/chenhai-system/src/main/resources/mapper/system/ChatMessagesMapper.xml b/chenhai-system/src/main/resources/mapper/system/ChatMessagesMapper.xml new file mode 100644 index 0000000..789f572 --- /dev/null +++ b/chenhai-system/src/main/resources/mapper/system/ChatMessagesMapper.xml @@ -0,0 +1,114 @@ + + + + + + + + + + + + + + + + + + + + + + + + select id, sender_session_id, receiver_session_id, sender_id, receiver_id, + content_type, content, file_url, file_name, file_size, video_duration, + thumbnail_url, is_read, read_at, created_at + from chat_messages + + + + + + + + + + + + + + + + + + + + insert into chat_messages ( + sender_session_id, receiver_session_id, sender_id, receiver_id, + content_type, content, is_read, created_at + ) values ( + #{senderSessionId}, #{receiverSessionId}, #{senderId}, #{receiverId}, + #{contentType}, #{content}, 0, sysdate() + ) + + + + + update chat_messages set is_read = 1, read_at = #{readAt} + where id in + + #{id} + + + + + + update chat_messages set is_read = 1, read_at = #{readAt} + where receiver_session_id = #{sessionId} + and receiver_id = #{receiverId} + and is_read = 0 + + + \ No newline at end of file diff --git a/chenhai-system/src/main/resources/mapper/system/ChatSessionsMapper.xml b/chenhai-system/src/main/resources/mapper/system/ChatSessionsMapper.xml new file mode 100644 index 0000000..4d4764f --- /dev/null +++ b/chenhai-system/src/main/resources/mapper/system/ChatSessionsMapper.xml @@ -0,0 +1,135 @@ + + + + + + + + + + + + + + + + + + + + + select id, user_id, other_user_id, user_unread, other_unread, + last_message, last_message_time, last_active_time, + created_at, updated_at + from chat_sessions + + + + + + + + + + + + insert into chat_sessions ( + user_id, other_user_id, user_unread, other_unread, + last_message, last_message_time, last_active_time, + created_at + ) values ( + #{userId}, #{otherUserId}, 0, 0, + #{lastMessage}, #{lastMessageTime}, #{lastActiveTime}, + sysdate() + ) + + + + update chat_sessions + + last_message = #{lastMessage}, + last_message_time = #{lastMessageTime}, + user_unread = #{userUnread}, + other_unread = #{otherUnread}, + last_active_time = #{lastActiveTime}, + + where id = #{id} + + + + update chat_sessions + set last_message = #{lastMessage}, + last_message_time = #{lastMessageTime}, + last_active_time = sysdate() + where id = #{id} + + + + + update chat_sessions + set + + + user_unread = user_unread + 1 + + where id = #{sessionId} + and user_id = #{receiverId} + + + + + update chat_sessions + set other_unread = other_unread + 1, + last_active_time = #{lastActiveTime} + where id = #{sessionId} + + + + + update chat_sessions + set user_unread = user_unread + 1, + last_active_time = #{lastActiveTime} + where id = #{sessionId} + + + + update chat_sessions + set user_unread = 0 + where id = #{sessionId} + + + + delete from chat_sessions where id = #{id} + + + + delete from chat_sessions where id in + + #{id} + + + + \ No newline at end of file diff --git a/chenhai-ui/package.json b/chenhai-ui/package.json index 20fe953..b5be874 100644 --- a/chenhai-ui/package.json +++ b/chenhai-ui/package.json @@ -29,6 +29,7 @@ "axios": "0.28.1", "clipboard": "2.0.8", "core-js": "3.37.1", + "date-fns": "^4.1.0", "echarts": "5.4.0", "element-ui": "^2.15.14", "file-saver": "2.0.5", @@ -40,8 +41,10 @@ "nprogress": "0.2.0", "quill": "2.0.2", "screenfull": "5.0.2", + "sockjs-client": "^1.6.1", "sortablejs": "1.10.2", "splitpanes": "2.4.1", + "stompjs": "^2.3.3", "vue": "^2.6.12", "vue-count-to": "1.0.13", "vue-cropper": "0.5.5", diff --git a/chenhai-ui/src/api/system/chat.js b/chenhai-ui/src/api/system/chat.js index a01a3f4..39ef5de 100644 --- a/chenhai-ui/src/api/system/chat.js +++ b/chenhai-ui/src/api/system/chat.js @@ -1,72 +1,72 @@ -// /api/chat/chat.js - 修改为使用现有接口 +// src/api/system/chat.js import request from '@/utils/request' -// 使用现有的专家接口获取专家信息 -export function getExpertInfo(expertId) { +/** + * 获取会话列表 + */ +export function listSessions() { return request({ - url: '/vet/experts/' + expertId, + url: '/system/chat/sessions', method: 'get' }) } -// 使用现有的会话接口创建会话 -export function createOrGetSession(data) { +/** + * 标记会话已读 + * @param {Object} data - { sessionId } + */ +export function markSessionRead(data) { return request({ - url: '/system/session/createOrGetSession', + url: '/system/chat/session/read', method: 'post', - data: data + data }) } -// 使用现有的消息接口发送消息 -export function sendTextMessage(data) { +/** + * 获取历史消息 + * @param {Object} params - { sessionId, pageNum, pageSize, lastId } + */ +export function getMessages(params) { return request({ - url: '/system/message/send', - method: 'post', - data: data - }) -} - -// 使用现有的消息接口获取会话消息 -export function getSessionMessages(sessionId) { - const query = { - sessionId: sessionId, - delFlag: '0' - } - return request({ - url: '/system/message/list', + url: '/system/chat/messages', method: 'get', - params: query + params }) } -// 标记消息为已读(通过修改消息接口) -export function markMessagesAsRead(sessionId, senderType) { +/** + * 发送消息 + * @param {Object} data - { receiverId, contentType, content } + */ +export function sendMessage(data) { return request({ - url: '/system/message/markAllRead', + url: '/system/chat/message/send', method: 'post', - data: { sessionId, senderType } + data }) } -// 获取用户会话列表 -export function getUserSessions(userId) { - const query = { - userId: userId, - delFlag: '0' - } +/** + * 标记消息已读 + * @param {Object} data - { messageIds, sessionId } + */ +export function markMessagesRead(data) { return request({ - url: '/system/session/list', - method: 'get', - params: query + url: '/system/chat/message/read', + method: 'post', + data }) } -// 结束会话 -export function endSession(sessionId, reason) { +/** + * 批量查询用户在线状态 + * @param {Object} data - { userIds: [1,2,3] } + */ +export function batchCheckStatus(data) { return request({ - url: '/system/session/end', + url: '/system/chat/status/batch', method: 'post', - data: { sessionId, reason } + data }) } diff --git a/chenhai-ui/src/api/vet/session.js b/chenhai-ui/src/api/vet/session.js new file mode 100644 index 0000000..6fcec70 --- /dev/null +++ b/chenhai-ui/src/api/vet/session.js @@ -0,0 +1,55 @@ +import request from '@/utils/request' + +/** + * 会话列表相关API + */ + +// 查询我的会话列表 +export function listMySessions(query) { + return request({ + url: '/chat/sessions/list', + method: 'get', + params: query + }) +} + +// 获取会话详情 +export function getSessionDetail(sessionId) { + return request({ + url: `/chat/session/${sessionId}`, + method: 'get' + }) +} + +// 创建新会话 +export function createSession(data) { + return request({ + url: '/chat/session', + method: 'post', + data: data + }) +} + +// 删除会话 +export function deleteSession(sessionId) { + return request({ + url: `/chat/session/${sessionId}`, + method: 'delete' + }) +} + +// 标记会话已读 +export function markSessionRead(sessionId) { + return request({ + url: `/chat/session/${sessionId}/read`, + method: 'put' + }) +} + +// 获取会话统计 +export function getSessionStats() { + return request({ + url: '/chat/session/stats', + method: 'get' + }) +} diff --git a/chenhai-ui/src/views/vet/chatManager/index.vue b/chenhai-ui/src/views/vet/chatManager/index.vue new file mode 100644 index 0000000..8998905 --- /dev/null +++ b/chenhai-ui/src/views/vet/chatManager/index.vue @@ -0,0 +1,1510 @@ + + + + + diff --git a/chenhai-ui/vue.config.js b/chenhai-ui/vue.config.js index 49ef216..84c6e4c 100644 --- a/chenhai-ui/vue.config.js +++ b/chenhai-ui/vue.config.js @@ -10,13 +10,13 @@ const CompressionPlugin = require('compression-webpack-plugin') const name = process.env.VUE_APP_TITLE || '"与牧同行"' // 网页标题 const baseUrl = // 后端接口 - // 'http://localhost:8081' - 'http://192.168.101.105:8082' + 'http://localhost:8081' + // 'http://192.168.101.105:8082' // 'http://192.168.101.109:8080' // 'http://192.168.101.111:8081' -const port = process.env.port || process.env.npm_config_port || 80 // 端口 +const port = process.env.port || process.env.npm_config_port || 81 // 端口 // vue.config.js 配置说明 //官方vue.config.js 参考文档 https://cli.vuejs.org/zh/config/#css-loaderoptions @@ -47,6 +47,15 @@ module.exports = { ['^' + process.env.VUE_APP_BASE_API]: '' } }, + // 专门处理 WebSocket 请求 + '/ws': { + target: baseUrl, + ws: true, // 关键!启用 WebSocket 代理 + changeOrigin: true, + pathRewrite: { + '^/ws': '/ws' + } + }, // springdoc proxy '^/v3/api-docs/(.*)': { target: baseUrl,