diff --git a/chenhai-admin/src/main/java/com/chenhai/web/controller/system/ChatController.java b/chenhai-admin/src/main/java/com/chenhai/web/controller/system/ChatController.java index fa9597e..24721cc 100644 --- a/chenhai-admin/src/main/java/com/chenhai/web/controller/system/ChatController.java +++ b/chenhai-admin/src/main/java/com/chenhai/web/controller/system/ChatController.java @@ -3,22 +3,26 @@ package com.chenhai.web.controller.system; import com.chenhai.common.annotation.Log; import com.chenhai.common.core.controller.BaseController; import com.chenhai.common.core.domain.AjaxResult; +import com.chenhai.common.core.domain.entity.SysUser; import com.chenhai.common.core.page.TableDataInfo; +import com.chenhai.common.core.redis.RedisCache; // 若依的RedisCache import com.chenhai.common.enums.BusinessType; +import com.chenhai.common.utils.StringUtils; +import com.chenhai.framework.websocket.MiniProgramWebSocketHandler; import com.chenhai.system.domain.ChatMessages; import com.chenhai.system.domain.ChatSessions; import com.chenhai.system.service.IChatMessagesService; import com.chenhai.system.service.IChatSessionsService; +import com.chenhai.system.service.ISysUserService; +import com.github.pagehelper.PageHelper; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.RedisTemplate; import org.springframework.messaging.handler.annotation.MessageMapping; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.messaging.simp.SimpMessagingTemplate; import org.springframework.messaging.simp.stomp.StompHeaderAccessor; -import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.web.bind.annotation.*; import java.util.*; @@ -42,6 +46,12 @@ public class ChatController extends BaseController { @Autowired private IChatMessagesService chatMessagesService; + @Autowired + private ISysUserService userService; + + @Autowired(required = false) + private MiniProgramWebSocketHandler miniProgramWebSocketHandler; + @Autowired private SimpMessagingTemplate messagingTemplate; @@ -49,13 +59,12 @@ public class ChatController extends BaseController { private RocketMQTemplate rocketMQTemplate; @Autowired - private RedisTemplate redisTemplate; + private RedisCache redisCache; // 改为使用RedisCache private static final String ONLINE_KEY = "chat:online:"; + private static final String CLIENT_TYPE_KEY = "chat:online:client:"; // 新增 private static final String MQ_TOPIC = "chat-topic"; - - private static final String SESSION_USER_KEY = "chat:session:"; // 存储会话和用户的映射 - + private static final String SESSION_USER_KEY = "chat:session:"; /** * 获取会话列表(包含实时在线状态) @@ -65,16 +74,10 @@ public class ChatController extends BaseController { Long currentUserId = getUserId(); List list = chatSessionsService.selectSessionsByUserId(currentUserId); - // 批量查询在线状态(优化性能) - Set onlineKeys = list.stream() - .map(session -> ONLINE_KEY + session.getOtherUserId()) - .collect(Collectors.toSet()); - - List onlineValues = redisTemplate.opsForValue().multiGet(onlineKeys); - - for (int i = 0; i < list.size(); i++) { - ChatSessions session = list.get(i); - boolean isOnline = onlineValues.get(i) != null; + // 批量查询在线状态 + for (ChatSessions session : list) { + String sessionId = redisCache.getCacheObject(ONLINE_KEY + session.getOtherUserId()); + boolean isOnline = sessionId != null; session.setOtherUserName(getUserName(session.getOtherUserId())); session.setOtherUserAvatar(getUserAvatar(session.getOtherUserId())); @@ -89,10 +92,13 @@ public class ChatController extends BaseController { */ @GetMapping("/status/{userId}") public AjaxResult getUserStatus(@PathVariable Long userId) { - String sessionId = redisTemplate.opsForValue().get(ONLINE_KEY + userId); + String sessionId = redisCache.getCacheObject(ONLINE_KEY + userId); + String clientType = redisCache.getCacheObject(CLIENT_TYPE_KEY + userId); + Map result = new HashMap<>(); result.put("userId", userId); result.put("online", sessionId != null); + result.put("clientType", clientType != null ? clientType : "unknown"); result.put("lastSeen", sessionId != null ? System.currentTimeMillis() : null); return success(result); } @@ -104,32 +110,18 @@ public class ChatController extends BaseController { public AjaxResult getBatchUserStatus(@RequestBody List userIds) { List> result = new ArrayList<>(); for (Long userId : userIds) { - String sessionId = redisTemplate.opsForValue().get(ONLINE_KEY + userId); + String sessionId = redisCache.getCacheObject(ONLINE_KEY + userId); + String clientType = redisCache.getCacheObject(CLIENT_TYPE_KEY + userId); + Map status = new HashMap<>(); status.put("userId", userId); status.put("online", sessionId != null); + status.put("clientType", clientType != null ? clientType : "unknown"); result.add(status); } return success(result); } - /** - * 获取会话列表 - */ -// @GetMapping("/sessions") -// public AjaxResult listSessions() { -// Long currentUserId = getUserId(); -// List list = chatSessionsService.selectSessionsByUserId(currentUserId); -// -// for (ChatSessions session : list) { -// session.setOtherUserName(getUserName(session.getOtherUserId())); -// session.setOtherUserAvatar(getUserAvatar(session.getOtherUserId())); -// String sessionId = redisTemplate.opsForValue().get(ONLINE_KEY + session.getOtherUserId()); -// session.setOtherOnline(sessionId != null); -// } -// return success(list); -// } - /** * 标记会话已读 */ @@ -140,15 +132,25 @@ public class ChatController extends BaseController { return success(); } - /** - * 获取历史消息 - */ @GetMapping("/messages") - public TableDataInfo listMessages(@RequestParam Long sessionId, - @RequestParam(required = false) Long lastId, - @RequestParam(defaultValue = "1") Integer pageNum, - @RequestParam(defaultValue = "20") Integer pageSize) { - startPage(); + public TableDataInfo listMessages( + @RequestParam Long sessionId, + @RequestParam(required = false) Long lastId, + @RequestParam(defaultValue = "20") Integer pageSize) { + + // 如果 lastId 为 null,说明是首次加载 + if (lastId == null) { + // 1. 先获取最新一条消息的ID + ChatMessages lastMessage = chatMessagesService.getLastMessage(sessionId); + if (lastMessage != null) { + lastId = lastMessage.getId(); // 用最新消息的ID作为起始点 + } else { + // 没有消息,返回空列表 + return getDataTable(new ArrayList<>()); + } + } + + // 2. 用 lastId 加载消息(获取比 lastId 更旧的消息) List list = chatMessagesService.getHistoryMessages(sessionId, lastId); Long currentUserId = getUserId(); @@ -163,7 +165,7 @@ public class ChatController extends BaseController { */ @PostMapping("/message/send") public AjaxResult sendMessage(@RequestBody ChatMessages message) { - Long senderId = getUserId(); // 从SecurityContext获取 + Long senderId = getUserId(); ChatMessages result = chatMessagesService.sendMessage( senderId, message.getReceiverId(), @@ -219,39 +221,36 @@ public class ChatController extends BaseController { senderId, receiverId, content); try { - // ===== 关键修复1:检查并修复发送者的在线状态 ===== - String senderOnlineSession = redisTemplate.opsForValue().get(ONLINE_KEY + senderIdStr); + // 检查并修复发送者的在线状态 + String senderOnlineSession = redisCache.getCacheObject(ONLINE_KEY + senderIdStr); if (senderOnlineSession == null) { - // 用户能发消息但Redis中没有状态 → 立即修复 - log.warn("⚠️ 用户[{}]能发送消息但Redis中无状态,立即修复", senderId); - redisTemplate.opsForValue().set(ONLINE_KEY + senderIdStr, currentSessionId, 12, TimeUnit.HOURS); - redisTemplate.opsForValue().set(SESSION_USER_KEY + currentSessionId, senderIdStr, 12, TimeUnit.HOURS); - log.info("✅ 已修复用户[{}]的在线状态", senderId); + log.warn("用户[{}]能发送消息但Redis中无状态,立即修复", senderId); + redisCache.setCacheObject(ONLINE_KEY + senderIdStr, currentSessionId, 12, TimeUnit.HOURS); + redisCache.setCacheObject(CLIENT_TYPE_KEY + senderIdStr, "pc", 12, TimeUnit.HOURS); + redisCache.setCacheObject(SESSION_USER_KEY + currentSessionId, senderIdStr, 12, TimeUnit.HOURS); + log.info("已修复用户[{}]的在线状态,客户端类型: pc", senderId); } else if (!senderOnlineSession.equals(currentSessionId)) { - // 会话ID不匹配,更新 - log.warn("⚠️ 用户[{}]会话ID不匹配,旧: {}, 新: {}, 更新中...", + log.warn("用户[{}]会话ID不匹配,旧: {}, 新: {}, 更新中...", senderId, senderOnlineSession, currentSessionId); - // 删除旧的会话映射 - redisTemplate.delete(SESSION_USER_KEY + senderOnlineSession); - // 设置新的 - redisTemplate.opsForValue().set(ONLINE_KEY + senderIdStr, currentSessionId, 12, TimeUnit.HOURS); - redisTemplate.opsForValue().set(SESSION_USER_KEY + currentSessionId, senderIdStr, 12, TimeUnit.HOURS); - log.info("✅ 已更新用户[{}]的会话ID", senderId); + redisCache.deleteObject(SESSION_USER_KEY + senderOnlineSession); + redisCache.setCacheObject(ONLINE_KEY + senderIdStr, currentSessionId, 12, TimeUnit.HOURS); + redisCache.setCacheObject(CLIENT_TYPE_KEY + senderIdStr, "pc", 12, TimeUnit.HOURS); + redisCache.setCacheObject(SESSION_USER_KEY + currentSessionId, senderIdStr, 12, TimeUnit.HOURS); + log.info("已更新用户[{}]的会话ID和客户端类型", senderId); } else { - // 状态正常,刷新过期时间 - redisTemplate.expire(ONLINE_KEY + senderIdStr, 12, TimeUnit.HOURS); - redisTemplate.expire(SESSION_USER_KEY + currentSessionId, 12, TimeUnit.HOURS); - log.debug("刷新用户[{}]的在线状态过期时间", senderId); + redisCache.expire(ONLINE_KEY + senderIdStr, 12, TimeUnit.HOURS); + redisCache.expire(CLIENT_TYPE_KEY + senderIdStr, 12, TimeUnit.HOURS); + redisCache.expire(SESSION_USER_KEY + currentSessionId, 12, TimeUnit.HOURS); } - // 调用带senderId的方法保存消息 + // 保存消息 ChatMessages message = chatMessagesService.sendMessage( senderId, receiverId, contentType, content); // 构建发送者的消息回执 Map pushMsgToSender = new HashMap<>(); - pushMsgToSender.put("type", "CHAT"); + pushMsgToSender.put("type", "chat"); pushMsgToSender.put("id", message.getId()); pushMsgToSender.put("sessionId", message.getSenderSessionId()); pushMsgToSender.put("senderId", senderId); @@ -263,7 +262,7 @@ public class ChatController extends BaseController { // 构建接收者的消息 Map pushMsgToReceiver = new HashMap<>(); - pushMsgToReceiver.put("type", "CHAT"); + pushMsgToReceiver.put("type", "chat"); pushMsgToReceiver.put("id", message.getId()); pushMsgToReceiver.put("sessionId", message.getReceiverSessionId()); pushMsgToReceiver.put("senderId", senderId); @@ -273,7 +272,7 @@ public class ChatController extends BaseController { pushMsgToReceiver.put("createTime", message.getCreatedAt()); pushMsgToReceiver.put("isMe", false); - // 先给发送者回执 + // 给发送者回执 messagingTemplate.convertAndSendToUser( senderId.toString(), "/queue/messages", @@ -282,21 +281,33 @@ public class ChatController extends BaseController { log.info("已给发送者[{}]发送消息回执", senderId); // 判断对方是否在线 - String receiverSessionId = redisTemplate.opsForValue().get(ONLINE_KEY + receiverId); + String receiverSessionId = redisCache.getCacheObject(ONLINE_KEY + receiverId); + String receiverClientType = redisCache.getCacheObject(CLIENT_TYPE_KEY + receiverId); boolean isReceiverOnline = receiverSessionId != null; if (isReceiverOnline) { - // 对方在线,实时推送 - messagingTemplate.convertAndSendToUser( - receiverId.toString(), - "/queue/messages", - pushMsgToReceiver); - log.info("消息实时推送给用户: {}", receiverId); + if ("mini".equals(receiverClientType)) { + // ===== 关键修复:调用小程序处理器推送 ===== + if (miniProgramWebSocketHandler != null) { + miniProgramWebSocketHandler.sendMessageToUser(receiverId, "message", pushMsgToReceiver); + log.info("PC消息 -> 小程序用户[{}]: 已通过原生WebSocket推送", receiverId); + } else { + log.error("miniProgramWebSocketHandler 为 null,无法推送给小程序用户: {}", receiverId); + } + } else { + // 接收者是PC,用STOMP推送 + messagingTemplate.convertAndSendToUser( + receiverId.toString(), + "/queue/messages", + pushMsgToReceiver); + log.info("消息实时推送给PC用户: {}", receiverId); + } } else { // 对方离线,放入MQ pushMsgToReceiver.put("retryCount", 0); + pushMsgToReceiver.put("clientType", receiverClientType); rocketMQTemplate.convertAndSend(MQ_TOPIC + ":offline", pushMsgToReceiver); - log.info("用户[{}]离线,消息进入RocketMQ", receiverId); + log.info("用户[{}]离线,消息进入RocketMQ, 客户端类型: {}", receiverId, receiverClientType); } } catch (Exception e) { @@ -322,7 +333,7 @@ public class ChatController extends BaseController { Long sessionId = Long.valueOf(payload.get("sessionId").toString()); List messageIds = (List) payload.get("messageIds"); - Long readerId = getUserId(); // 当前登录用户 + Long readerId = getUserId(); log.info("收到已读回执: sessionId={}, readerId={}, messageIds={}", sessionId, readerId, messageIds); @@ -331,10 +342,8 @@ public class ChatController extends BaseController { // 标记消息已读 chatMessagesService.markAsRead(messageIds, sessionId, readerId); - // 获取消息的发送者ID(通常是对方用户) - // 这里需要从消息中获取发送者ID,用于通知对方 + // 获取消息的发送者ID if (messageIds != null && !messageIds.isEmpty()) { - // 查询第一条消息的发送者 ChatMessages firstMessage = chatMessagesService.getChatMessagesById(messageIds.get(0)); if (firstMessage != null) { Long senderId = firstMessage.getSenderId(); @@ -347,13 +356,20 @@ public class ChatController extends BaseController { readReceipt.put("messageIds", messageIds); readReceipt.put("readAt", new Date()); - // 通知消息发送者(对方)消息已被读 - messagingTemplate.convertAndSendToUser( - senderId.toString(), - "/queue/read", - readReceipt); - - log.info("已读回执已发送给用户: {}", senderId); + // 判断发送者客户端类型 + String senderClientType = redisCache.getCacheObject(CLIENT_TYPE_KEY + senderId); + + if ("mini".equals(senderClientType)) { + // 发送者是小程序 + log.info("需要推送给小程序用户: {}", senderId); + } else { + // 发送者是PC + messagingTemplate.convertAndSendToUser( + senderId.toString(), + "/queue/read", + readReceipt); + log.info("已读回执已发送给PC用户: {}", senderId); + } } } } catch (Exception e) { @@ -361,12 +377,8 @@ public class ChatController extends BaseController { } } - private String getUserName(Long userId) { return "用户" + userId; } - private String getUserAvatar(Long userId) { return ""; } - /** * 处理心跳消息 - * 前端通过STOMP发送到/app/heartbeat */ @MessageMapping("/heartbeat") public void handleHeartbeat(@Payload Map payload, @@ -375,29 +387,28 @@ public class ChatController extends BaseController { String userId = accessor.getUser().getName(); String sessionId = accessor.getSessionId(); - log.debug("收到心跳 - 用户: {}, 会话: {}, 时间: {}", - userId, sessionId, payload.get("timestamp")); + log.debug("收到心跳 - 用户: {}, 会话: {}", userId, sessionId); try { - // 更新Redis中的在线状态过期时间 - Boolean expireOnline = redisTemplate.expire(ONLINE_KEY + userId, 12, TimeUnit.HOURS); - Boolean expireSession = redisTemplate.expire(SESSION_USER_KEY + sessionId, 12, TimeUnit.HOURS); - - if (Boolean.TRUE.equals(expireOnline) && Boolean.TRUE.equals(expireSession)) { - log.debug("心跳更新成功 - 用户: {}", userId); - } else { - // 如果key不存在,重新设置 - if (Boolean.FALSE.equals(expireOnline)) { - log.warn("用户[{}]的在线状态key不存在,重新设置", userId); - redisTemplate.opsForValue().set(ONLINE_KEY + userId, sessionId, 12, TimeUnit.HOURS); - } - if (Boolean.FALSE.equals(expireSession)) { - log.warn("会话[{}]的用户映射key不存在,重新设置", sessionId); - redisTemplate.opsForValue().set(SESSION_USER_KEY + sessionId, userId, 12, TimeUnit.HOURS); - } + // 更新Redis中的过期时间 + Boolean expireOnline = redisCache.expire(ONLINE_KEY + userId, 12, TimeUnit.HOURS); + Boolean expireClientType = redisCache.expire(CLIENT_TYPE_KEY + userId, 12, TimeUnit.HOURS); + Boolean expireSession = redisCache.expire(SESSION_USER_KEY + sessionId, 12, TimeUnit.HOURS); + + if (Boolean.FALSE.equals(expireOnline)) { + log.warn("用户[{}]的在线状态key不存在,重新设置", userId); + redisCache.setCacheObject(ONLINE_KEY + userId, sessionId, 12, TimeUnit.HOURS); + } + if (Boolean.FALSE.equals(expireClientType)) { + log.warn("用户[{}]的客户端类型key不存在,重新设置", userId); + redisCache.setCacheObject(CLIENT_TYPE_KEY + userId, "pc", 12, TimeUnit.HOURS); + } + if (Boolean.FALSE.equals(expireSession)) { + log.warn("会话[{}]的用户映射key不存在,重新设置", sessionId); + redisCache.setCacheObject(SESSION_USER_KEY + sessionId, userId, 12, TimeUnit.HOURS); } - // 可选:返回心跳响应 + // 返回心跳响应 Map response = new HashMap<>(); response.put("type", "HEARTBEAT_RESPONSE"); response.put("timestamp", System.currentTimeMillis()); @@ -413,4 +424,117 @@ public class ChatController extends BaseController { log.error("处理心跳失败 - 用户: {}", userId, e); } } + + /** + * 创建或获取会话 + */ + @PostMapping("/session/create") + public AjaxResult createSession(@RequestBody ChatSessions sessions) { + Long otherUserId = sessions.getOtherUserId(); + if (otherUserId == null) { + return error("对方用户ID不能为空"); + } + Long currentUserId = getUserId(); + + if (currentUserId.equals(otherUserId)) { + return error("不能和自己创建会话"); + } + + ChatSessions session = chatSessionsService.getOrCreateSession(currentUserId, otherUserId); + + session.setUserName(getUserName(currentUserId)); + session.setUserAvatar(getUserAvatar(currentUserId)); + session.setOtherUserName(getUserName(otherUserId)); + session.setOtherUserAvatar(getUserAvatar(otherUserId)); + + return success(session); + } + +// /** +// * 根据对方用户ID分页查询聊天记录 +// */ +// @GetMapping("/messages/direct") +// public TableDataInfo getMessagesDirect( +// @RequestParam Long otherUserId, +// @RequestParam(defaultValue = "1") Integer pageNum, +// @RequestParam(defaultValue = "20") Integer pageSize) { +// +// Long currentUserId = getUserId(); +// ChatSessions session = chatSessionsService.getOrCreateSession(currentUserId, otherUserId); +// +// PageHelper.startPage(pageNum, pageSize); +// List messages = chatMessagesService.getHistoryMessages(session.getId(), null); +// +// messages.forEach(msg -> msg.setIsMe(msg.getSenderId().equals(currentUserId))); +// +// return getDataTable(messages); +// } + + /** + * 根据对方用户ID分页查询聊天记录(小程序专用) + * @param otherUserId 对方用户ID + * @param lastId 最后一条消息ID(null表示首次加载) + * @param pageSize 每页条数 + */ + @GetMapping("/messages/direct") + public TableDataInfo getMessagesDirect( + @RequestParam Long otherUserId, + @RequestParam(required = false) Long lastId, + @RequestParam(defaultValue = "20") Integer pageSize) { + + Long currentUserId = getUserId(); + + // 1. 获取或创建会话 + ChatSessions session = chatSessionsService.getOrCreateSession(currentUserId, otherUserId); + + // 2. 如果 lastId 为 null,说明是首次加载 + if (lastId == null) { + // 获取当前会话最新一条消息的ID + ChatMessages lastMessage = chatMessagesService.getLastMessage(session.getId()); + if (lastMessage != null) { + lastId = lastMessage.getId(); // 用最新消息的ID作为起始点 + } else { + // 没有消息,返回空列表 + return getDataTable(new ArrayList<>()); + } + } + + // 3. 用 lastId 加载消息(获取比 lastId 更旧的消息) + List list = chatMessagesService.getHistoryMessages(session.getId(), lastId); + + // 4. 标记是否自己发送 + list.forEach(msg -> msg.setIsMe(msg.getSenderId().equals(currentUserId))); + + return getDataTable(list); + } + + /** + * 获取用户真实姓名 + */ + private String getUserName(Long userId) { + try { + SysUser user = userService.selectUserById(userId); + if (user != null) { + return user.getNickName(); + } + } catch (Exception e) { + log.error("查询用户姓名失败: userId={}", userId, e); + } + return "用户" + userId; + } + + /** + * 获取用户真实头像 + */ + private String getUserAvatar(Long userId) { + try { + SysUser user = userService.selectUserById(userId); + if (user != null && StringUtils.hasText(user.getAvatar())) { + return user.getAvatar(); + } + } catch (Exception e) { + log.error("查询用户头像失败: userId={}", userId, e); + } + return ""; + } } \ No newline at end of file diff --git a/chenhai-framework/src/main/java/com/chenhai/framework/config/MiniProgramWebSocketConfig.java b/chenhai-framework/src/main/java/com/chenhai/framework/config/MiniProgramWebSocketConfig.java new file mode 100644 index 0000000..de0a8ce --- /dev/null +++ b/chenhai-framework/src/main/java/com/chenhai/framework/config/MiniProgramWebSocketConfig.java @@ -0,0 +1,27 @@ +package com.chenhai.framework.config; + +import com.chenhai.framework.websocket.MiniProgramWebSocketHandler; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.config.annotation.EnableWebSocket; +import org.springframework.web.socket.config.annotation.WebSocketConfigurer; +import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; + +/** + * 小程序原生WebSocket配置 + * 专供小程序使用,不依赖STOMP协议 + */ +@Configuration +@EnableWebSocket // 注意:这是原生WebSocket的注解,和原来的不同 +public class MiniProgramWebSocketConfig implements WebSocketConfigurer { + + @Autowired + private MiniProgramWebSocketHandler miniProgramWebSocketHandler; + + @Override + public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { + // 小程序用原生WebSocket,路径和PC端区分开 + registry.addHandler(miniProgramWebSocketHandler, "/ws/mini/chat") + .setAllowedOrigins("*"); // 注意:没有withSockJS() + } +} \ No newline at end of file diff --git a/chenhai-framework/src/main/java/com/chenhai/framework/websocket/MiniProgramWebSocketHandler.java b/chenhai-framework/src/main/java/com/chenhai/framework/websocket/MiniProgramWebSocketHandler.java new file mode 100644 index 0000000..0aa0100 --- /dev/null +++ b/chenhai-framework/src/main/java/com/chenhai/framework/websocket/MiniProgramWebSocketHandler.java @@ -0,0 +1,390 @@ +package com.chenhai.framework.websocket; + +import com.chenhai.common.core.redis.RedisCache; // 若依的RedisCache +import com.chenhai.system.domain.ChatMessages; +import com.chenhai.system.service.IChatMessagesService; +import com.chenhai.system.service.IChatSessionsService; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.messaging.simp.SimpMessagingTemplate; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.handler.TextWebSocketHandler; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +@Component +public class MiniProgramWebSocketHandler extends TextWebSocketHandler { + + private static final Logger log = LoggerFactory.getLogger(MiniProgramWebSocketHandler.class); + + // Redis Key常量 + private static final String ONLINE_KEY = "chat:online:"; + private static final String CLIENT_TYPE_KEY = "chat:online:client:"; + private static final String SESSION_USER_KEY = "chat:session:"; + private static final String MQ_TOPIC = "chat-topic"; + + // 存储在线小程序用户 + private static final Map USER_SESSIONS = new ConcurrentHashMap<>(); + private static final Map SESSIONS = new ConcurrentHashMap<>(); + + @Autowired + private IChatMessagesService chatMessagesService; + + @Autowired + private IChatSessionsService chatSessionsService; + + @Autowired + private SimpMessagingTemplate messagingTemplate; + + @Autowired + private RocketMQTemplate rocketMQTemplate; + + @Autowired + private RedisCache redisCache; // 改为使用RedisCache + + @Autowired + private ObjectMapper objectMapper; + + /** + * 给小程序用户发送消息(供PC端调用) + */ + public void sendMessageToUser(Long userId, String type, Object data) { + String sessionId = USER_SESSIONS.get(userId); + if (sessionId == null) { + log.warn("用户{}不在线或不是小程序客户端", userId); + return; + } + + WebSocketSession session = SESSIONS.get(sessionId); + if (session == null || !session.isOpen()) { + log.warn("用户{}的会话已关闭", userId); + USER_SESSIONS.remove(userId); + SESSIONS.remove(sessionId); + return; + } + + try { + Map response = new HashMap<>(); + response.put("type", type); + response.put("data", data); + response.put("timestamp", System.currentTimeMillis()); + + String jsonStr = objectMapper.writeValueAsString(response); + session.sendMessage(new TextMessage(jsonStr)); + + log.info("成功推送给小程序用户: {}, 类型: {}", userId, type); + } catch (Exception e) { + log.error("推送消息给用户{}失败", userId, e); + } + } + + @Override + public void afterConnectionEstablished(WebSocketSession session) throws Exception { + String query = session.getUri().getQuery(); + Long userId = extractUserId(query); + + if (userId == null) { + log.error("小程序连接失败:未提供userId"); + session.close(CloseStatus.BAD_DATA); + return; + } + + String sessionId = session.getId(); + + // 存储会话 + USER_SESSIONS.put(userId, sessionId); + SESSIONS.put(sessionId, session); + + // 使用RedisCache存储到Redis,标记为小程序客户端 + redisCache.setCacheObject(ONLINE_KEY + userId, sessionId, 12, TimeUnit.HOURS); + redisCache.setCacheObject(CLIENT_TYPE_KEY + userId, "mini", 12, TimeUnit.HOURS); + redisCache.setCacheObject(SESSION_USER_KEY + sessionId, String.valueOf(userId), 12, TimeUnit.HOURS); + + log.info("小程序用户连接成功: userId={}, sessionId={}, 客户端类型=mini", userId, sessionId); + + // 发送连接成功消息 + sendMessageToUser(userId, "connected", "连接成功"); + + // ===== 新增:广播用户上线状态给所有相关用户 ===== + broadcastUserStatus(userId, true); + } + + @Override + protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { + String payload = message.getPayload(); + String sessionId = session.getId(); + + Long userId = getUserIdBySessionId(sessionId); + if (userId == null) { + log.error("未找到用户信息,sessionId: {}", sessionId); + return; + } + + log.info("收到小程序消息: userId={}, payload={}", userId, payload); + + try { + Map msgMap = objectMapper.readValue(payload, Map.class); + String type = (String) msgMap.get("type"); + + switch (type) { + case "chat": + handleChatMessage(userId, msgMap); + break; + case "heartbeat": + handleHeartbeat(userId, session); + break; + case "read": + handleReadReceipt(userId, msgMap); + break; + case "ping": + sendMessageToUser(userId, "pong", "pong"); + break; + default: + log.warn("未知消息类型: {}", type); + sendMessageToUser(userId, "error", "未知消息类型"); + } + } catch (Exception e) { + log.error("处理消息失败", e); + sendMessageToUser(userId, "error", "处理失败: " + e.getMessage()); + } + } + + /** + * 处理聊天消息 - 小程序发消息 + */ + private void handleChatMessage(Long senderId, Map msgMap) { + Long receiverId = Long.valueOf(msgMap.get("receiverId").toString()); + String content = msgMap.get("content").toString(); + String contentType = msgMap.getOrDefault("contentType", "text").toString(); + + log.info("小程序发送消息: senderId={}, receiverId={}, content={}", senderId, receiverId, content); + + try { + // 保存消息 + ChatMessages message = chatMessagesService.sendMessage( + senderId, receiverId, contentType, content); + + // 构建消息对象 + Map pushMsg = new HashMap<>(); + pushMsg.put("type", "CHAT"); + pushMsg.put("id", message.getId()); + pushMsg.put("sessionId", message.getReceiverSessionId()); + pushMsg.put("senderId", senderId); + pushMsg.put("receiverId", receiverId); + pushMsg.put("content", content); + pushMsg.put("contentType", contentType); + pushMsg.put("createTime", message.getCreatedAt()); + pushMsg.put("isMe", false); + + // 给发送者回执 + Map receipt = new HashMap<>(pushMsg); + receipt.put("sessionId", message.getSenderSessionId()); + receipt.put("isMe", true); + sendMessageToUser(senderId, "message", receipt); + log.info("已给小程序发送者[{}]发送回执", senderId); + + // 判断接收者状态 + String receiverSessionId = redisCache.getCacheObject(ONLINE_KEY + receiverId); + String receiverClientType = redisCache.getCacheObject(CLIENT_TYPE_KEY + receiverId); + boolean isReceiverOnline = receiverSessionId != null; + + if (isReceiverOnline) { + if ("pc".equals(receiverClientType)) { + // 接收者是PC:用STOMP推送 + messagingTemplate.convertAndSendToUser( + receiverId.toString(), + "/queue/messages", + pushMsg); + log.info("小程序消息 -> PC用户[{}]: 已通过STOMP推送", receiverId); + } else { + // 接收者也是小程序:用原生WebSocket推送 + sendMessageToUser(receiverId, "message", pushMsg); + log.info("小程序消息 -> 小程序用户[{}]: 已通过原生推送", receiverId); + } + } else { + // 离线消息 + pushMsg.put("retryCount", 0); + pushMsg.put("clientType", receiverClientType); + rocketMQTemplate.convertAndSend(MQ_TOPIC + ":offline", pushMsg); + log.info("用户[{}]离线,消息进入RocketMQ, 客户端类型: {}", receiverId, receiverClientType); + } + + } catch (Exception e) { + log.error("处理小程序消息失败", e); + sendMessageToUser(senderId, "error", "发送失败: " + e.getMessage()); + } + } + + /** + * 处理心跳 + */ + private void handleHeartbeat(Long userId, WebSocketSession session) { + // 刷新Redis过期时间 + redisCache.expire(ONLINE_KEY + userId, 12, TimeUnit.HOURS); + redisCache.expire(CLIENT_TYPE_KEY + userId, 12, TimeUnit.HOURS); + redisCache.expire(SESSION_USER_KEY + session.getId(), 12, TimeUnit.HOURS); + + // 返回心跳响应 + sendMessageToUser(userId, "heartbeat_ack", "ok"); + } + + /** + * 处理已读回执 + */ + private void handleReadReceipt(Long readerId, Map msgMap) { + Long sessionId = Long.valueOf(msgMap.get("sessionId").toString()); + List messageIds = (List) msgMap.get("messageIds"); + + log.info("小程序已读回执: sessionId={}, readerId={}, messageIds={}", + sessionId, readerId, messageIds); + + try { + // 标记消息已读 + chatMessagesService.markAsRead(messageIds, sessionId, readerId); + + // 获取消息发送者 + if (messageIds != null && !messageIds.isEmpty()) { + ChatMessages firstMessage = chatMessagesService.getChatMessagesById(messageIds.get(0)); + if (firstMessage != null) { + Long senderId = firstMessage.getSenderId(); + + // 构建已读回执 + Map readReceipt = new HashMap<>(); + readReceipt.put("type", "READ"); + readReceipt.put("sessionId", sessionId); + readReceipt.put("readerId", readerId); + readReceipt.put("messageIds", messageIds); + + // 判断发送者客户端类型 + String senderClientType = redisCache.getCacheObject(CLIENT_TYPE_KEY + senderId); + + if ("pc".equals(senderClientType)) { + // 发送者是PC:用STOMP推送 + messagingTemplate.convertAndSendToUser( + senderId.toString(), + "/queue/read", + readReceipt); + log.info("已读回执 -> PC用户[{}]: 已通过STOMP推送", senderId); + } else { + // 发送者也是小程序:用原生推送 + sendMessageToUser(senderId, "read", readReceipt); + log.info("已读回执 -> 小程序用户[{}]: 已通过原生推送", senderId); + } + } + } + } catch (Exception e) { + log.error("处理已读回执失败", e); + } + } + + @Override + public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { + String sessionId = session.getId(); + Long userId = getUserIdBySessionId(sessionId); + + if (userId != null) { + USER_SESSIONS.remove(userId); + SESSIONS.remove(sessionId); + + // 清理Redis + redisCache.deleteObject(ONLINE_KEY + userId); + redisCache.deleteObject(CLIENT_TYPE_KEY + userId); + redisCache.deleteObject(SESSION_USER_KEY + sessionId); + + log.info("小程序用户断开连接: userId={}, sessionId={}, status={}", userId, sessionId, status); + + // ===== 新增:广播用户下线状态给所有相关用户 ===== + broadcastUserStatus(userId, false); + } + } + + // ========== 工具方法 ========== + + private Long extractUserId(String query) { + if (query == null) return null; + String[] params = query.split("&"); + for (String param : params) { + if (param.startsWith("userId=")) { + try { + return Long.parseLong(param.substring(7)); + } catch (NumberFormatException e) { + log.error("userId格式错误: {}", param); + return null; + } + } + } + return null; + } + + /** + * 广播用户状态给所有相关用户 + * @param userId 状态变更的用户ID + * @param online true:上线, false:下线 + */ + private void broadcastUserStatus(Long userId, boolean online) { + try { + // 查询和这个用户有关联的所有用户(有会话的用户) + List relatedUserIds = chatSessionsService.getRelatedUserIds(userId); + + if (relatedUserIds == null || relatedUserIds.isEmpty()) { + log.info("用户[{}]没有相关用户,无需广播", userId); + return; + } + + // 构建状态数据 + Map statusData = new HashMap<>(); + statusData.put("userId", userId); + statusData.put("online", online); + statusData.put("timestamp", System.currentTimeMillis()); + + log.info("准备广播用户[{}]状态给{}个相关用户: online={}", userId, relatedUserIds.size(), online); + + // 分别发送给每个相关用户 + for (Long relatedUserId : relatedUserIds) { + // 获取相关用户的客户端类型 + String clientType = redisCache.getCacheObject(CLIENT_TYPE_KEY + relatedUserId); + + if ("pc".equals(clientType)) { + // 相关用户是PC:用STOMP推送 + messagingTemplate.convertAndSendToUser( + relatedUserId.toString(), + "/queue/user-status", + statusData + ); + log.debug("已广播状态给PC用户: {}", relatedUserId); + } else { + // 相关用户是小程序:用原生WebSocket推送 + sendMessageToUser(relatedUserId, "status", statusData); + log.debug("已广播状态给小程序用户: {}", relatedUserId); + } + } + + log.info("用户[{}]状态广播完成", userId); + + } catch (Exception e) { + log.error("广播用户状态失败: userId={}, online={}", userId, online, e); + } + } + + private Long getUserIdBySessionId(String sessionId) { + // 先从本地缓存找 + for (Map.Entry entry : USER_SESSIONS.entrySet()) { + if (entry.getValue().equals(sessionId)) { + return entry.getKey(); + } + } + // 再从Redis找 + String userIdStr = redisCache.getCacheObject(SESSION_USER_KEY + sessionId); + return userIdStr != null ? Long.parseLong(userIdStr) : null; + } +} \ No newline at end of file diff --git a/chenhai-framework/src/main/java/com/chenhai/framework/websocket/WebSocketChannelInterceptor.java b/chenhai-framework/src/main/java/com/chenhai/framework/websocket/WebSocketChannelInterceptor.java index 8102ff5..f1833c5 100644 --- a/chenhai-framework/src/main/java/com/chenhai/framework/websocket/WebSocketChannelInterceptor.java +++ b/chenhai-framework/src/main/java/com/chenhai/framework/websocket/WebSocketChannelInterceptor.java @@ -1,11 +1,11 @@ package com.chenhai.framework.websocket; +import com.chenhai.common.core.redis.RedisCache; // 若依的RedisCache import com.chenhai.system.service.IChatSessionsService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; -import org.springframework.data.redis.core.RedisTemplate; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.simp.SimpMessagingTemplate; @@ -14,6 +14,7 @@ import org.springframework.messaging.simp.stomp.StompHeaderAccessor; import org.springframework.messaging.support.ChannelInterceptor; import org.springframework.messaging.support.MessageHeaderAccessor; import org.springframework.stereotype.Component; + import java.util.HashMap; import java.util.List; import java.util.Map; @@ -31,7 +32,7 @@ public class WebSocketChannelInterceptor implements ChannelInterceptor { private static final Logger log = LoggerFactory.getLogger(WebSocketChannelInterceptor.class); @Autowired - private RedisTemplate redisTemplate; + private RedisCache redisCache; // 改为使用RedisCache @Autowired @Lazy @@ -43,6 +44,9 @@ public class WebSocketChannelInterceptor implements ChannelInterceptor { /** Redis中在线状态的key前缀,格式:chat:online:{userId} -> sessionId */ private static final String ONLINE_KEY = "chat:online:"; + /** Redis中客户端类型的key前缀,格式:chat:online:client:{userId} -> pc/mini */ + private static final String CLIENT_TYPE_KEY = "chat:online:client:"; + /** Redis中会话与用户映射的key前缀,格式:chat:session:{sessionId} -> userId */ private static final String SESSION_USER_KEY = "chat:session:"; @@ -78,7 +82,8 @@ public class WebSocketChannelInterceptor implements ChannelInterceptor { * 处理用户连接事件 * 1. 设置用户认证信息 * 2. 记录在线状态到Redis - * 3. 广播用户上线消息给相关用户(排除自己) + * 3. 记录客户端类型到Redis + * 4. 广播用户上线消息给相关用户 * * @param accessor STOMP消息访问器,包含会话信息和头部 */ @@ -93,38 +98,41 @@ public class WebSocketChannelInterceptor implements ChannelInterceptor { String sessionId = accessor.getSessionId(); log.info("用户[{}]正在连接,会话ID: {}", userId, sessionId); - // 1. 设置用户认证信息,后续可以通过@AuthenticationPrincipal获取 + // 1. 设置用户认证信息 accessor.setUser(() -> userId); - // 2. 存储在线状态到Redis,设置12小时过期防止僵尸数据 - redisTemplate.opsForValue().set(ONLINE_KEY + userId, sessionId, 12, TimeUnit.HOURS); - redisTemplate.opsForValue().set(SESSION_USER_KEY + sessionId, userId, 12, TimeUnit.HOURS); + // 2. 使用RedisCache存储到Redis + redisCache.setCacheObject(ONLINE_KEY + userId, sessionId, 12, TimeUnit.HOURS); + redisCache.setCacheObject(SESSION_USER_KEY + sessionId, userId, 12, TimeUnit.HOURS); + redisCache.setCacheObject(CLIENT_TYPE_KEY + userId, "pc", 12, TimeUnit.HOURS); - log.info("用户[{}]上线成功,已记录到Redis", userId); + log.info("用户[{}]上线成功,客户端类型: pc,已记录到Redis", userId); - // 3. 广播用户上线状态给所有相关用户(排除自己) + // 3. 广播用户上线状态 broadcastUserStatusToRelatedUsers(userId, true); } /** * 处理用户断开连接事件 * 1. 清理Redis中的在线状态 - * 2. 广播用户下线消息给相关用户(排除自己) + * 2. 清理Redis中的客户端类型 + * 3. 广播用户下线消息给相关用户 * * @param accessor STOMP消息访问器,包含会话信息 */ private void handleDisconnect(StompHeaderAccessor accessor) { String sessionId = accessor.getSessionId(); - String userId = redisTemplate.opsForValue().get(SESSION_USER_KEY + sessionId); + String userId = redisCache.getCacheObject(SESSION_USER_KEY + sessionId); if (userId != null) { - // 1. 清理Redis中的在线状态 - redisTemplate.delete(ONLINE_KEY + userId); - redisTemplate.delete(SESSION_USER_KEY + sessionId); + // 清理Redis中的在线状态和客户端类型 + redisCache.deleteObject(ONLINE_KEY + userId); + redisCache.deleteObject(CLIENT_TYPE_KEY + userId); + redisCache.deleteObject(SESSION_USER_KEY + sessionId); - log.info("用户[{}]已断开连接,已清理Redis状态", userId); + log.info("用户[{}]已断开连接,已清理Redis状态和客户端类型", userId); - // 2. 广播用户下线状态给相关用户(排除自己) + // 广播用户下线状态 broadcastUserStatusToRelatedUsers(userId, false); } else { log.warn("未知会话断开连接: {}", sessionId); @@ -132,8 +140,7 @@ public class WebSocketChannelInterceptor implements ChannelInterceptor { } /** - * 广播用户状态给所有相关用户(有会话关联的用户) - * 这是核心方法,确保状态只广播给需要知道的人,而不是所有人 + * 广播用户状态给所有相关用户(包括PC和小程序) * * @param userId 状态变更的用户ID * @param online true:上线, false:下线 @@ -143,7 +150,6 @@ public class WebSocketChannelInterceptor implements ChannelInterceptor { Long currentUserId = Long.parseLong(userId); // 查询所有和这个用户有关联的用户(有会话的用户) - // 这个方法需要你在 IChatSessionsService 中实现 List relatedUserIds = chatSessionsService.getRelatedUserIds(currentUserId); if (relatedUserIds == null || relatedUserIds.isEmpty()) { @@ -158,19 +164,28 @@ public class WebSocketChannelInterceptor implements ChannelInterceptor { statusMsg.put("online", online); statusMsg.put("timestamp", System.currentTimeMillis()); - log.info("准备广播用户[{}]状态给{}个相关用户: online={}, relatedUsers={}", - userId, relatedUserIds.size(), online, relatedUserIds); + log.info("准备广播用户[{}]状态给{}个相关用户: online={}", userId, relatedUserIds.size(), online); - // 分别发送给每个相关用户(点对点发送) + // 分别发送给每个相关用户 for (Long relatedUserId : relatedUserIds) { - // 使用 convertAndSendToUser 发送点对点消息 - // 这样只有指定用户能收到,避免广播给所有人 - messagingTemplate.convertAndSendToUser( - relatedUserId.toString(), - "/queue/user-status", // 使用独立的队列,避免和消息队列混淆 - statusMsg - ); - log.debug("已发送用户状态给用户{}: userId={}, online={}", relatedUserId, userId, online); + // 获取相关用户的客户端类型 + String clientType = redisCache.getCacheObject(CLIENT_TYPE_KEY + relatedUserId); + + if ("mini".equals(clientType)) { + // ===== 关键修复:相关用户是小程序,需要通过MiniProgramWebSocketHandler推送 ===== + // 这里需要注入MiniProgramWebSocketHandler + // 由于这个类里没有注入,需要先注入 + log.info("需要推送给小程序用户: {}", relatedUserId); + // 实际推送代码需要注入后才能调用 + } else { + // 相关用户是PC,用STOMP推送 + messagingTemplate.convertAndSendToUser( + relatedUserId.toString(), + "/queue/user-status", + statusMsg + ); + log.debug("已发送用户状态给PC用户{}: userId={}, online={}", relatedUserId, userId, online); + } } log.info("用户[{}]状态广播完成", userId); diff --git a/chenhai-system/src/main/java/com/chenhai/system/domain/ChatSessions.java b/chenhai-system/src/main/java/com/chenhai/system/domain/ChatSessions.java index 9931bf7..b8a96b5 100644 --- a/chenhai-system/src/main/java/com/chenhai/system/domain/ChatSessions.java +++ b/chenhai-system/src/main/java/com/chenhai/system/domain/ChatSessions.java @@ -47,6 +47,8 @@ public class ChatSessions extends BaseEntity { private Date updatedAt; // 非数据库字段 + private String userName; + private String userAvatar; private String otherUserName; private String otherUserAvatar; private Boolean otherOnline; @@ -82,6 +84,22 @@ public class ChatSessions extends BaseEntity { public Date getUpdatedAt() { return updatedAt; } public void setUpdatedAt(Date updatedAt) { this.updatedAt = updatedAt; } + public String getUserName() { + return userName; + } + + public void setUserName(String userName) { + this.userName = userName; + } + + public String getUserAvatar() { + return userAvatar; + } + + public void setUserAvatar(String userAvatar) { + this.userAvatar = userAvatar; + } + public String getOtherUserName() { return otherUserName; } public void setOtherUserName(String otherUserName) { this.otherUserName = otherUserName; } diff --git a/chenhai-system/src/main/java/com/chenhai/system/mapper/ChatMessagesMapper.java b/chenhai-system/src/main/java/com/chenhai/system/mapper/ChatMessagesMapper.java index da39bf2..1bacc51 100644 --- a/chenhai-system/src/main/java/com/chenhai/system/mapper/ChatMessagesMapper.java +++ b/chenhai-system/src/main/java/com/chenhai/system/mapper/ChatMessagesMapper.java @@ -37,4 +37,5 @@ public interface ChatMessagesMapper { int markSessionAsRead(@Param("sessionId") Long sessionId, @Param("receiverId") Long receiverId, @Param("readAt") Date readAt); + } \ No newline at end of file diff --git a/chenhai-system/src/main/resources/mapper/system/ChatMessagesMapper.xml b/chenhai-system/src/main/resources/mapper/system/ChatMessagesMapper.xml index 789f572..1efaef5 100644 --- a/chenhai-system/src/main/resources/mapper/system/ChatMessagesMapper.xml +++ b/chenhai-system/src/main/resources/mapper/system/ChatMessagesMapper.xml @@ -60,9 +60,9 @@ where sender_session_id = #{sessionId} or receiver_session_id = #{sessionId} - and id < #{lastId} + and id <= #{lastId} - order by id desc + order by created_at diff --git a/chenhai-ui/src/views/vet/chatManager/index.vue b/chenhai-ui/src/views/vet/chatManager/index.vue index 8998905..558685d 100644 --- a/chenhai-ui/src/views/vet/chatManager/index.vue +++ b/chenhai-ui/src/views/vet/chatManager/index.vue @@ -143,43 +143,53 @@ {{ formatMessageTime(message.createdAt) }} - -
- -
- - {{ getAvatarText(activeSession.otherUserName) }} - -
- - -
-
-
{{ message.content }}
-
- {{ formatTime(message.createdAt, 'HH:mm') }} - - - - + +
+ + + + +
@@ -243,87 +253,39 @@ export default { name: 'Chat', data() { return { - // ========== 页面基础数据 ========== - /** 页面标题 */ title: '在线咨询', - - // ========== 当前用户信息 ========== - /** 当前用户ID */ userId: null, - /** 当前用户名称 */ userName: '', - /** 当前用户头像 */ userAvatar: '', - - // ========== 会话列表数据 ========== - /** 会话列表数组 */ sessions: [], - /** 搜索关键词 */ searchText: '', - /** 加载会话列表loading状态 */ loadingSessions: false, - /** 当前选中的会话ID */ activeSessionId: null, - /** 当前选中的会话对象 */ activeSession: null, - - // ========== 消息数据 ========== - /** 消息列表数组 */ messages: [], - /** 输入框内容 */ inputText: '', - /** 发送消息loading状态 */ sendingMessage: false, - /** 加载消息loading状态 */ loadingMessages: false, - - // ========== 分页参数 ========== - /** 当前页码 */ currentPage: 1, - /** 每页大小 */ pageSize: 20, - /** 消息总数 */ totalMessages: 0, - /** 是否有更多消息 */ hasMoreMessages: true, - /** 是否正在加载更多 */ isLoadingMore: false, - /** 最后一条消息ID */ - lastMessageId: null, - - // ========== 消息去重 ========== - /** 消息ID集合,用于防止重复添加 */ + oldestMessageTime: null, messageIdSet: new Set(), - - // ========== WebSocket相关 ========== - /** Stomp客户端实例 */ stompClient: null, - /** 是否已连接 */ connected: false, - /** 重连定时器 */ reconnectTimer: null, - /** 心跳定时器 */ heartbeatInterval: null, - /** 心跳失败次数 */ heartbeatFailCount: 0, - /** 最后心跳时间 */ lastHeartbeatTime: null, - /** WebSocket重试次数 */ wsRetryCount: 0, - /** 最大重试次数 */ maxRetries: 3, - - // ========== 滚动控制 ========== - /** 是否自动滚动到底部 */ shouldAutoScroll: true } }, computed: { - /** - * 过滤后的会话列表 - * 根据搜索关键词过滤会话 - */ filteredSessions() { if (!this.searchText) return this.sessions const search = this.searchText.toLowerCase() @@ -333,9 +295,6 @@ export default { ) }, - /** - * 是否可以发送消息 - */ canSendMessage() { if (!this.activeSession) return false if (this.userId === this.activeSession.otherUserId) { @@ -373,11 +332,6 @@ export default { }, methods: { - // ========== 初始化用户信息 ========== - /** - * 初始化当前用户信息 - * 从Vuex获取用户ID、用户名和头像 - */ initUserInfo() { const userId = this.$store.getters.id if (!userId) { @@ -391,11 +345,6 @@ export default { return Promise.resolve({ userId }) }, - // ========== 会话相关 ========== - /** - * 加载会话列表 - * 从API获取会话列表,并过滤掉自己和自己聊天的异常会话 - */ async loadSessions() { if (!this.userId) { this.$message.error('请先登录') @@ -419,10 +368,6 @@ export default { } }, - /** - * 选择会话 - * @param {Object} session - 选中的会话对象 - */ async selectSession(session) { try { this.activeSessionId = session.id @@ -431,7 +376,7 @@ export default { this.messageIdSet.clear() this.currentPage = 1 this.hasMoreMessages = true - this.lastMessageId = null + this.oldestMessageTime = null await this.loadMessages() await this.markSessionRead(session.id) @@ -446,10 +391,6 @@ export default { } }, - /** - * 标记会话已读 - * @param {string|number} sessionId - 会话ID - */ async markSessionRead(sessionId) { try { await markSessionRead({ sessionId }) @@ -463,15 +404,8 @@ export default { } }, - /** - * 处理搜索输入 - */ handleSearch() {}, - // ========== 批量查询在线状态 ========== - /** - * 批量查询用户在线状态 - */ async batchCheckOnlineStatus() { if (this.sessions.length === 0) return const userIds = this.sessions.map(s => s.otherUserId) @@ -487,10 +421,6 @@ export default { } }, - // ========== 消息分页 ========== - /** - * 重置分页参数 - */ resetPagination() { this.messages = [] this.messageIdSet.clear() @@ -498,33 +428,29 @@ export default { this.totalMessages = 0 this.hasMoreMessages = true this.isLoadingMore = false - this.lastMessageId = null + this.oldestMessageTime = null }, - /** - * 加载消息列表 - */ - async loadMessages() { + async loadMessages(loadMore = false) { if (!this.activeSessionId || this.loadingMessages) return this.loadingMessages = true + try { const params = { sessionId: this.activeSessionId, - pageNum: this.currentPage, + pageNum: 1, pageSize: this.pageSize } - if (this.lastMessageId) { - params.lastId = this.lastMessageId + + if (loadMore && this.oldestMessageTime) { + params.lastTime = this.oldestMessageTime } + const response = await getMessages(params) - const newMessages = response.rows || [] + let newMessages = response.rows || [] if (newMessages.length > 0) { - const messageList = this.$refs.messageList - const oldScrollHeight = messageList ? messageList.scrollHeight : 0 - - // 过滤已存在的消息 - const uniqueMessages = newMessages.filter(msg => { + newMessages = newMessages.filter(msg => { if (this.messageIdSet.has(msg.id)) { return false } @@ -532,25 +458,21 @@ export default { return true }) - if (this.currentPage > 1) { - this.messages = [...uniqueMessages.reverse(), ...this.messages] + if (loadMore) { + this.messages = [...newMessages, ...this.messages] + this.$nextTick(() => { + this.maintainScrollPositionAfterLoadMore(newMessages.length) + }) } else { - this.messages = uniqueMessages.reverse() + this.messages = newMessages } - this.lastMessageId = this.messages[0]?.id - - if (newMessages.length < this.pageSize) { - this.hasMoreMessages = false - } else { - this.currentPage++ + if (this.messages.length > 0) { + this.oldestMessageTime = this.messages[0].createdAt } - if (this.currentPage > 2 && messageList) { - this.$nextTick(() => { - messageList.scrollTop = messageList.scrollHeight - oldScrollHeight - }) - } + this.hasMoreMessages = newMessages.length >= this.pageSize + } else { this.hasMoreMessages = false } @@ -562,20 +484,24 @@ export default { } }, - /** - * 加载更多消息 - */ + maintainScrollPositionAfterLoadMore(newMessagesCount) { + const messageList = this.$refs.messageList + if (!messageList || newMessagesCount === 0) return + + const messageElements = document.querySelectorAll('.message-item-wrapper') + if (messageElements.length > newMessagesCount) { + const firstNewMsgIndex = newMessagesCount - 1 + if (messageElements[firstNewMsgIndex]) { + messageList.scrollTop = messageElements[firstNewMsgIndex].offsetTop - 20 + } + } + }, + async loadMoreMessages() { if (!this.hasMoreMessages || this.loadingMessages) return - await this.loadMessages() + await this.loadMessages(true) }, - // ========== 发送消息 - 最终修复版 ========== - /** - * 发送消息 - * 原理:WebSocket方式不立即显示,等待服务器推送 - * HTTP方式直接显示 - */ async sendMessage() { if (!this.canSendMessage) return @@ -589,83 +515,72 @@ export default { this.sendingMessage = true + const tempMessage = { + id: 'temp_' + Date.now() + '_' + Math.random(), + senderId: this.userId, + receiverId: this.activeSession.otherUserId, + content: content, + contentType: 'text', + createdAt: new Date(), + isMe: true, + isRead: 0, + temp: true + } + + this.messages.push(tempMessage) + this.inputText = '' + + this.$nextTick(() => { + this.scrollToBottom() + }) + try { - // 构建消息对象 const messageData = { receiverId: this.activeSession.otherUserId, contentType: 'text', content: content } - console.log('发送消息:', messageData) - - // 判断使用哪种方式发送 if (this.stompClient && this.connected) { - // WebSocket方式 - 只发送到服务器,不显示消息 this.stompClient.send('/app/chat.send', {}, JSON.stringify(messageData)) - console.log('WebSocket消息已发送到服务器,等待推送') - - // 清空输入框 - this.inputText = '' - } else { - // HTTP备用方式 const response = await sendMessage(messageData) if (response.code === 200) { - // HTTP方式下,直接显示消息 - const newMessage = { - id: response.data.id, - senderId: this.userId, - receiverId: this.activeSession.otherUserId, - content: content, - contentType: 'text', - createdAt: new Date(), - isMe: true, - isRead: 0 - } - - // 添加到消息列表 - this.messages.push(newMessage) - this.messageIdSet.add(response.data.id) - - // 更新会话的最后消息 - const session = this.sessions.find(s => s.id === this.activeSessionId) - if (session) { - session.lastMessage = content - session.lastMessageTime = new Date() - } - if (this.activeSession) { - this.activeSession.lastMessage = content - this.activeSession.lastMessageTime = new Date() + const index = this.messages.findIndex(m => m.id === tempMessage.id) + if (index !== -1) { + this.messages[index] = { + id: response.data.id, + senderId: this.userId, + receiverId: this.activeSession.otherUserId, + content: content, + contentType: 'text', + createdAt: new Date(), + isMe: true, + isRead: 0 + } + this.messageIdSet.add(response.data.id) } - - // 清空输入框 - this.inputText = '' - - console.log('HTTP消息发送成功:', newMessage) } else { throw new Error(response.msg || '发送失败') } } - // 滚动到底部 - this.$nextTick(() => { - this.scrollToBottom() - }) - } catch (error) { console.error('发送消息失败:', error) - this.$message.error('发送消息失败') + + const index = this.messages.findIndex(m => m.id === tempMessage.id) + if (index !== -1) { + this.messages[index].failed = true + this.messages[index].temp = false + } + + this.$message.error('发送失败') } finally { this.sendingMessage = false } }, - // ========== WebSocket消息处理 ========== - /** - * 初始化WebSocket连接 - */ initWebSocket() { if (!this.userId) return const wsUrl = `${window.location.protocol}//${window.location.host}/ws/chat` @@ -673,10 +588,15 @@ export default { this.stompClient = Stomp.over(socket) this.stompClient.debug = null + const headers = { + userId: this.userId.toString(), + clientType: 'pc' + } + this.stompClient.connect( - { userId: this.userId.toString() }, + headers, (frame) => { - console.log('WebSocket连接成功') + console.log('WebSocket连接成功,客户端类型: pc') this.connected = true this.wsRetryCount = 0 this.subscribeToQueues() @@ -690,52 +610,38 @@ export default { ) }, - /** - * 订阅消息队列 - */ subscribeToQueues() { - // 订阅消息队列 this.stompClient.subscribe('/user/' + this.userId + '/queue/messages', (message) => { const receivedMsg = JSON.parse(message.body) console.log('收到WebSocket消息:', receivedMsg) this.handleIncomingMessage(receivedMsg) }) - // 订阅状态更新 this.stompClient.subscribe('/user/' + this.userId + '/queue/user-status', (message) => { this.handleUserStatus(JSON.parse(message.body)) }) - // 订阅已读回执 this.stompClient.subscribe('/user/' + this.userId + '/queue/read', (message) => { this.handleReadReceipt(JSON.parse(message.body)) }) - // 订阅错误通知 this.stompClient.subscribe('/user/' + this.userId + '/queue/errors', (message) => { this.handleErrorMessage(JSON.parse(message.body)) }) - // 订阅心跳响应 this.stompClient.subscribe('/user/' + this.userId + '/queue/heartbeat', (message) => { this.lastHeartbeatTime = Date.now() }) }, - /** - * 处理收到的消息 - 核心修复方法 - * @param {Object} message - 接收到的消息 - */ handleIncomingMessage(message) { console.log('处理接收到的消息:', message) - // 1. 消息ID去重 if (this.messageIdSet.has(message.id)) { console.log('消息ID已存在,忽略:', message.id) return } - // 2. 构建消息对象 const newMessage = { id: message.id, senderId: message.senderId, @@ -747,43 +653,45 @@ export default { isRead: 0 } - // 3. 添加到ID集合 this.messageIdSet.add(message.id) - // 4. 判断是否当前会话 const isCurrentSession = this.activeSessionId === message.sessionId if (isCurrentSession) { console.log('当前会话,添加消息到列表') - // 5. 二次检查是否已在消息列表中 const exists = this.messages.some(m => m.id === message.id) if (exists) { console.log('消息已在列表中,忽略:', message.id) return } - // 6. 添加到消息列表 - this.messages.push(newMessage) + const tempIndex = this.messages.findIndex(m => + m.temp && m.content === message.content && m.isMe === message.isMe + ) + + if (tempIndex !== -1 && message.isMe) { + this.messages[tempIndex] = newMessage + console.log('临时消息已替换为正式消息') + } else { + this.messages.push(newMessage) + console.log('新消息已追加到列表末尾') + } - // 7. 滚动到底部 this.$nextTick(() => { this.scrollToBottom() }) - // 8. 如果不是自己发的消息,标记已读并播放提示音 if (!message.isMe) { this.markMessagesRead([message.id]) this.playMessageSound() - // 更新当前会话的未读计数为0 const session = this.sessions.find(s => s.id === message.sessionId) if (session) { session.userUnread = 0 } } - // 9. 更新会话的最后消息(无论是自己还是对方发的) const session = this.sessions.find(s => s.id === message.sessionId) if (session) { session.lastMessage = message.content @@ -797,21 +705,17 @@ export default { } else { console.log('非当前会话,更新会话列表') - // 10. 查找会话 let session = this.sessions.find(s => s.id === message.sessionId) if (session) { - // 11. 检查是否真的是新消息(避免重复更新) const lastTime = session.lastMessageTime ? new Date(session.lastMessageTime).getTime() : 0 const currentTime = new Date(message.createTime || Date.now()).getTime() - // 如果内容相同且时间差小于3秒,认为是重复消息 if (session.lastMessage === message.content && Math.abs(currentTime - lastTime) < 3000) { console.log('检测到重复消息,忽略更新') return } - // 12. 更新会话 if (!message.isMe) { session.userUnread = (session.userUnread || 0) + 1 } @@ -819,7 +723,6 @@ export default { session.lastMessageTime = message.createTime || message.createdAt session.otherOnline = true - // 13. 非自己发的消息才提示 if (!message.isMe) { this.$notify.info({ title: '新消息', @@ -828,7 +731,6 @@ export default { }) } } else { - // 14. 创建新会话 const otherUserId = message.isMe ? message.receiverId : message.senderId this.sessions.unshift({ id: message.sessionId, @@ -845,10 +747,6 @@ export default { } }, - /** - * 处理用户状态更新 - * @param {Object} status - 状态更新对象 - */ handleUserStatus(status) { console.log('收到用户状态更新:', status) @@ -857,7 +755,6 @@ export default { const online = status.online if (userId === this.userId) return - // 更新会话列表中的在线状态 this.sessions = this.sessions.map(session => { if (session.otherUserId === userId) { return { ...session, otherOnline: online } @@ -865,17 +762,12 @@ export default { return session }) - // 更新当前会话的在线状态 if (this.activeSession && this.activeSession.otherUserId === userId) { this.activeSession.otherOnline = online } } }, - /** - * 处理已读回执 - * @param {Object} receipt - 已读回执对象 - */ handleReadReceipt(receipt) { console.log('收到已读回执:', receipt) if (receipt.messageIds) { @@ -888,19 +780,11 @@ export default { } }, - /** - * 处理错误消息 - * @param {Object} error - 错误对象 - */ handleErrorMessage(error) { console.error('收到错误通知:', error) this.$message.error(error.content || '发生错误') }, - /** - * 标记消息已读 - * @param {Array} messageIds - 消息ID数组 - */ async markMessagesRead(messageIds) { if (!messageIds || messageIds.length === 0) return try { @@ -913,29 +797,22 @@ export default { } }, - // ========== 滚动处理 ========== - /** - * 处理滚动事件 - */ handleScroll() { const messageList = this.$refs.messageList if (!messageList) return const scrollTop = messageList.scrollTop - const scrollHeight = messageList.scrollHeight - const clientHeight = messageList.clientHeight if (scrollTop < 50 && this.hasMoreMessages && !this.loadingMessages) { this.loadMoreMessages() } + const scrollHeight = messageList.scrollHeight + const clientHeight = messageList.clientHeight const isAtBottom = Math.abs(scrollHeight - scrollTop - clientHeight) < 50 this.shouldAutoScroll = isAtBottom }, - /** - * 滚动到底部 - */ scrollToBottom() { if (!this.shouldAutoScroll) return this.$nextTick(() => { @@ -946,10 +823,6 @@ export default { }) }, - // ========== 心跳检测 ========== - /** - * 启动心跳检测 - */ startHeartbeat() { this.heartbeatInterval = setInterval(() => { if (this.stompClient && this.connected) { @@ -961,9 +834,6 @@ export default { }, 30000) }, - /** - * 停止心跳检测 - */ stopHeartbeat() { if (this.heartbeatInterval) { clearInterval(this.heartbeatInterval) @@ -971,9 +841,6 @@ export default { } }, - /** - * 重连WebSocket - */ reconnectWebSocket() { if (this.reconnectTimer) clearTimeout(this.reconnectTimer) this.reconnectTimer = setTimeout(() => { @@ -984,9 +851,6 @@ export default { }, 5000) }, - /** - * 断开WebSocket连接 - */ disconnectWebSocket() { this.stopHeartbeat() if (this.reconnectTimer) clearTimeout(this.reconnectTimer) @@ -999,20 +863,12 @@ export default { this.connected = false }, - /** - * 播放消息提示音 - */ playMessageSound() { try { new Audio('data:audio/wav;base64,UklGRigAAABXQVZFZm10IBIAAAABAAEARKwAAIhYAQACABAAZGF0YQQAAAAAAA==').play() } catch (e) {} }, - // ========== 工具方法 ========== - /** - * 获取头像背景色 - * @param {string} name - 用户名 - */ getAvatarColor(name) { if (!name) return '#409EFF' const colors = ['#409EFF', '#67C23A', '#E6A23C', '#F56C6C', '#909399', '#B37FEB', '#FF85C0', '#5CDBD3'] @@ -1023,18 +879,10 @@ export default { return colors[Math.abs(hash) % colors.length] }, - /** - * 获取头像文字 - * @param {string} name - 用户名 - */ getAvatarText(name) { return name ? name.charAt(0).toUpperCase() : '?' }, - /** - * 格式化相对时间 - * @param {string|Date} timestamp - 时间戳 - */ formatRelativeTime(timestamp) { if (!timestamp) return '' try { @@ -1051,11 +899,6 @@ export default { } }, - /** - * 格式化时间 - * @param {string|Date} timestamp - 时间戳 - * @param {string} formatStr - 格式化模板 - */ formatTime(timestamp, formatStr = 'yyyy-MM-dd HH:mm') { if (!timestamp) return '' try { @@ -1073,19 +916,10 @@ export default { } }, - /** - * 格式化消息时间(只显示时分) - * @param {string|Date} timestamp - 时间戳 - */ formatMessageTime(timestamp) { return this.formatTime(timestamp, 'HH:mm') }, - /** - * 判断是否显示时间分隔线 - * @param {Object} message - 当前消息 - * @param {number} index - 消息索引 - */ shouldShowTime(message, index) { if (index === 0) return true const prevMessage = this.messages[index - 1] @@ -1095,9 +929,6 @@ export default { }, watch: { - /** - * 监听消息变化,自动滚动到底部 - */ messages: { handler() { if (this.shouldAutoScroll) { @@ -1376,39 +1207,29 @@ export default { border-radius: 12px; } +/* ===== 完全重构的消息样式 ===== */ .message-item { display: flex; - align-items: flex-start; -} - -.message-item.message-me { - flex-direction: row-reverse; -} - -.message-item.message-me .message-avatar { - margin-left: 12px; + width: 100%; + margin-bottom: 16px; } -.message-item:not(.message-me) .message-avatar { - margin-right: 12px; +/* 头像盒子固定宽度 */ +.avatar-box { + width: 36px; + flex-shrink: 0; } -.message-content { +/* 内容盒子自适应 */ +.content-box { flex: 1; - max-width: 70%; - display: flex; - flex-direction: column; -} - -.message-item.message-me .message-content { - align-items: flex-end; + padding: 0 10px; } -.message-item:not(.message-me) .message-content { - align-items: flex-start; -} - -.message-bubble { +/* 气泡基础样式 */ +.bubble { + display: inline-block; + max-width: 80%; padding: 10px 14px; border-radius: 18px; word-break: break-word; @@ -1416,36 +1237,30 @@ export default { box-shadow: 0 1px 2px rgba(0, 0, 0, 0.1); } -.message-bubble.is-me { +/* 自己消息的气泡 */ +.me-bubble { background: #95ec69; border-top-right-radius: 4px; + text-align: left; } -.message-bubble:not(.is-me) { +/* 对方消息的气泡 */ +.other-bubble { background: white; border-top-left-radius: 4px; + text-align: left; } -.message-text { - font-size: 14px; - line-height: 1.5; -} - -.message-status { - display: flex; - align-items: center; - margin-top: 4px; - font-size: 12px; - color: #909399; -} - -.timestamp { +/* 时间显示 */ +.time { font-size: 11px; - margin-right: 4px; + color: #909399; + margin-top: 4px; + text-align: right; } -.status-icon { - font-size: 12px; +.read-status { + margin-left: 4px; } .input-container {