Browse Source
1.聊天功能前端的排序滚动分页问题
1.聊天功能前端的排序滚动分页问题
2.小程序和pc端STOMP和原生websocket打通问题 3.小程序根据用户的id查询聊天记录接口 4.用户上线离线广播问题 5.redis存储操作改成若依的redisCache,解决redis存储类型不对问题master
8 changed files with 884 additions and 494 deletions
-
310chenhai-admin/src/main/java/com/chenhai/web/controller/system/ChatController.java
-
27chenhai-framework/src/main/java/com/chenhai/framework/config/MiniProgramWebSocketConfig.java
-
390chenhai-framework/src/main/java/com/chenhai/framework/websocket/MiniProgramWebSocketHandler.java
-
67chenhai-framework/src/main/java/com/chenhai/framework/websocket/WebSocketChannelInterceptor.java
-
18chenhai-system/src/main/java/com/chenhai/system/domain/ChatSessions.java
-
1chenhai-system/src/main/java/com/chenhai/system/mapper/ChatMessagesMapper.java
-
4chenhai-system/src/main/resources/mapper/system/ChatMessagesMapper.xml
-
463chenhai-ui/src/views/vet/chatManager/index.vue
@ -0,0 +1,27 @@ |
|||||
|
package com.chenhai.framework.config; |
||||
|
|
||||
|
import com.chenhai.framework.websocket.MiniProgramWebSocketHandler; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.context.annotation.Configuration; |
||||
|
import org.springframework.web.socket.config.annotation.EnableWebSocket; |
||||
|
import org.springframework.web.socket.config.annotation.WebSocketConfigurer; |
||||
|
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; |
||||
|
|
||||
|
/** |
||||
|
* 小程序原生WebSocket配置 |
||||
|
* 专供小程序使用,不依赖STOMP协议 |
||||
|
*/ |
||||
|
@Configuration |
||||
|
@EnableWebSocket // 注意:这是原生WebSocket的注解,和原来的不同 |
||||
|
public class MiniProgramWebSocketConfig implements WebSocketConfigurer { |
||||
|
|
||||
|
@Autowired |
||||
|
private MiniProgramWebSocketHandler miniProgramWebSocketHandler; |
||||
|
|
||||
|
@Override |
||||
|
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { |
||||
|
// 小程序用原生WebSocket,路径和PC端区分开 |
||||
|
registry.addHandler(miniProgramWebSocketHandler, "/ws/mini/chat") |
||||
|
.setAllowedOrigins("*"); // 注意:没有withSockJS() |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,390 @@ |
|||||
|
package com.chenhai.framework.websocket; |
||||
|
|
||||
|
import com.chenhai.common.core.redis.RedisCache; // 若依的RedisCache |
||||
|
import com.chenhai.system.domain.ChatMessages; |
||||
|
import com.chenhai.system.service.IChatMessagesService; |
||||
|
import com.chenhai.system.service.IChatSessionsService; |
||||
|
import com.fasterxml.jackson.databind.ObjectMapper; |
||||
|
import org.apache.rocketmq.spring.core.RocketMQTemplate; |
||||
|
import org.slf4j.Logger; |
||||
|
import org.slf4j.LoggerFactory; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.messaging.simp.SimpMessagingTemplate; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
import org.springframework.web.socket.CloseStatus; |
||||
|
import org.springframework.web.socket.TextMessage; |
||||
|
import org.springframework.web.socket.WebSocketSession; |
||||
|
import org.springframework.web.socket.handler.TextWebSocketHandler; |
||||
|
|
||||
|
import java.util.HashMap; |
||||
|
import java.util.List; |
||||
|
import java.util.Map; |
||||
|
import java.util.concurrent.ConcurrentHashMap; |
||||
|
import java.util.concurrent.TimeUnit; |
||||
|
|
||||
|
@Component |
||||
|
public class MiniProgramWebSocketHandler extends TextWebSocketHandler { |
||||
|
|
||||
|
private static final Logger log = LoggerFactory.getLogger(MiniProgramWebSocketHandler.class); |
||||
|
|
||||
|
// Redis Key常量 |
||||
|
private static final String ONLINE_KEY = "chat:online:"; |
||||
|
private static final String CLIENT_TYPE_KEY = "chat:online:client:"; |
||||
|
private static final String SESSION_USER_KEY = "chat:session:"; |
||||
|
private static final String MQ_TOPIC = "chat-topic"; |
||||
|
|
||||
|
// 存储在线小程序用户 |
||||
|
private static final Map<Long, String> USER_SESSIONS = new ConcurrentHashMap<>(); |
||||
|
private static final Map<String, WebSocketSession> SESSIONS = new ConcurrentHashMap<>(); |
||||
|
|
||||
|
@Autowired |
||||
|
private IChatMessagesService chatMessagesService; |
||||
|
|
||||
|
@Autowired |
||||
|
private IChatSessionsService chatSessionsService; |
||||
|
|
||||
|
@Autowired |
||||
|
private SimpMessagingTemplate messagingTemplate; |
||||
|
|
||||
|
@Autowired |
||||
|
private RocketMQTemplate rocketMQTemplate; |
||||
|
|
||||
|
@Autowired |
||||
|
private RedisCache redisCache; // 改为使用RedisCache |
||||
|
|
||||
|
@Autowired |
||||
|
private ObjectMapper objectMapper; |
||||
|
|
||||
|
/** |
||||
|
* 给小程序用户发送消息(供PC端调用) |
||||
|
*/ |
||||
|
public void sendMessageToUser(Long userId, String type, Object data) { |
||||
|
String sessionId = USER_SESSIONS.get(userId); |
||||
|
if (sessionId == null) { |
||||
|
log.warn("用户{}不在线或不是小程序客户端", userId); |
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
WebSocketSession session = SESSIONS.get(sessionId); |
||||
|
if (session == null || !session.isOpen()) { |
||||
|
log.warn("用户{}的会话已关闭", userId); |
||||
|
USER_SESSIONS.remove(userId); |
||||
|
SESSIONS.remove(sessionId); |
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
try { |
||||
|
Map<String, Object> response = new HashMap<>(); |
||||
|
response.put("type", type); |
||||
|
response.put("data", data); |
||||
|
response.put("timestamp", System.currentTimeMillis()); |
||||
|
|
||||
|
String jsonStr = objectMapper.writeValueAsString(response); |
||||
|
session.sendMessage(new TextMessage(jsonStr)); |
||||
|
|
||||
|
log.info("成功推送给小程序用户: {}, 类型: {}", userId, type); |
||||
|
} catch (Exception e) { |
||||
|
log.error("推送消息给用户{}失败", userId, e); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public void afterConnectionEstablished(WebSocketSession session) throws Exception { |
||||
|
String query = session.getUri().getQuery(); |
||||
|
Long userId = extractUserId(query); |
||||
|
|
||||
|
if (userId == null) { |
||||
|
log.error("小程序连接失败:未提供userId"); |
||||
|
session.close(CloseStatus.BAD_DATA); |
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
String sessionId = session.getId(); |
||||
|
|
||||
|
// 存储会话 |
||||
|
USER_SESSIONS.put(userId, sessionId); |
||||
|
SESSIONS.put(sessionId, session); |
||||
|
|
||||
|
// 使用RedisCache存储到Redis,标记为小程序客户端 |
||||
|
redisCache.setCacheObject(ONLINE_KEY + userId, sessionId, 12, TimeUnit.HOURS); |
||||
|
redisCache.setCacheObject(CLIENT_TYPE_KEY + userId, "mini", 12, TimeUnit.HOURS); |
||||
|
redisCache.setCacheObject(SESSION_USER_KEY + sessionId, String.valueOf(userId), 12, TimeUnit.HOURS); |
||||
|
|
||||
|
log.info("小程序用户连接成功: userId={}, sessionId={}, 客户端类型=mini", userId, sessionId); |
||||
|
|
||||
|
// 发送连接成功消息 |
||||
|
sendMessageToUser(userId, "connected", "连接成功"); |
||||
|
|
||||
|
// ===== 新增:广播用户上线状态给所有相关用户 ===== |
||||
|
broadcastUserStatus(userId, true); |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { |
||||
|
String payload = message.getPayload(); |
||||
|
String sessionId = session.getId(); |
||||
|
|
||||
|
Long userId = getUserIdBySessionId(sessionId); |
||||
|
if (userId == null) { |
||||
|
log.error("未找到用户信息,sessionId: {}", sessionId); |
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
log.info("收到小程序消息: userId={}, payload={}", userId, payload); |
||||
|
|
||||
|
try { |
||||
|
Map<String, Object> msgMap = objectMapper.readValue(payload, Map.class); |
||||
|
String type = (String) msgMap.get("type"); |
||||
|
|
||||
|
switch (type) { |
||||
|
case "chat": |
||||
|
handleChatMessage(userId, msgMap); |
||||
|
break; |
||||
|
case "heartbeat": |
||||
|
handleHeartbeat(userId, session); |
||||
|
break; |
||||
|
case "read": |
||||
|
handleReadReceipt(userId, msgMap); |
||||
|
break; |
||||
|
case "ping": |
||||
|
sendMessageToUser(userId, "pong", "pong"); |
||||
|
break; |
||||
|
default: |
||||
|
log.warn("未知消息类型: {}", type); |
||||
|
sendMessageToUser(userId, "error", "未知消息类型"); |
||||
|
} |
||||
|
} catch (Exception e) { |
||||
|
log.error("处理消息失败", e); |
||||
|
sendMessageToUser(userId, "error", "处理失败: " + e.getMessage()); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 处理聊天消息 - 小程序发消息 |
||||
|
*/ |
||||
|
private void handleChatMessage(Long senderId, Map<String, Object> msgMap) { |
||||
|
Long receiverId = Long.valueOf(msgMap.get("receiverId").toString()); |
||||
|
String content = msgMap.get("content").toString(); |
||||
|
String contentType = msgMap.getOrDefault("contentType", "text").toString(); |
||||
|
|
||||
|
log.info("小程序发送消息: senderId={}, receiverId={}, content={}", senderId, receiverId, content); |
||||
|
|
||||
|
try { |
||||
|
// 保存消息 |
||||
|
ChatMessages message = chatMessagesService.sendMessage( |
||||
|
senderId, receiverId, contentType, content); |
||||
|
|
||||
|
// 构建消息对象 |
||||
|
Map<String, Object> pushMsg = new HashMap<>(); |
||||
|
pushMsg.put("type", "CHAT"); |
||||
|
pushMsg.put("id", message.getId()); |
||||
|
pushMsg.put("sessionId", message.getReceiverSessionId()); |
||||
|
pushMsg.put("senderId", senderId); |
||||
|
pushMsg.put("receiverId", receiverId); |
||||
|
pushMsg.put("content", content); |
||||
|
pushMsg.put("contentType", contentType); |
||||
|
pushMsg.put("createTime", message.getCreatedAt()); |
||||
|
pushMsg.put("isMe", false); |
||||
|
|
||||
|
// 给发送者回执 |
||||
|
Map<String, Object> receipt = new HashMap<>(pushMsg); |
||||
|
receipt.put("sessionId", message.getSenderSessionId()); |
||||
|
receipt.put("isMe", true); |
||||
|
sendMessageToUser(senderId, "message", receipt); |
||||
|
log.info("已给小程序发送者[{}]发送回执", senderId); |
||||
|
|
||||
|
// 判断接收者状态 |
||||
|
String receiverSessionId = redisCache.getCacheObject(ONLINE_KEY + receiverId); |
||||
|
String receiverClientType = redisCache.getCacheObject(CLIENT_TYPE_KEY + receiverId); |
||||
|
boolean isReceiverOnline = receiverSessionId != null; |
||||
|
|
||||
|
if (isReceiverOnline) { |
||||
|
if ("pc".equals(receiverClientType)) { |
||||
|
// 接收者是PC:用STOMP推送 |
||||
|
messagingTemplate.convertAndSendToUser( |
||||
|
receiverId.toString(), |
||||
|
"/queue/messages", |
||||
|
pushMsg); |
||||
|
log.info("小程序消息 -> PC用户[{}]: 已通过STOMP推送", receiverId); |
||||
|
} else { |
||||
|
// 接收者也是小程序:用原生WebSocket推送 |
||||
|
sendMessageToUser(receiverId, "message", pushMsg); |
||||
|
log.info("小程序消息 -> 小程序用户[{}]: 已通过原生推送", receiverId); |
||||
|
} |
||||
|
} else { |
||||
|
// 离线消息 |
||||
|
pushMsg.put("retryCount", 0); |
||||
|
pushMsg.put("clientType", receiverClientType); |
||||
|
rocketMQTemplate.convertAndSend(MQ_TOPIC + ":offline", pushMsg); |
||||
|
log.info("用户[{}]离线,消息进入RocketMQ, 客户端类型: {}", receiverId, receiverClientType); |
||||
|
} |
||||
|
|
||||
|
} catch (Exception e) { |
||||
|
log.error("处理小程序消息失败", e); |
||||
|
sendMessageToUser(senderId, "error", "发送失败: " + e.getMessage()); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 处理心跳 |
||||
|
*/ |
||||
|
private void handleHeartbeat(Long userId, WebSocketSession session) { |
||||
|
// 刷新Redis过期时间 |
||||
|
redisCache.expire(ONLINE_KEY + userId, 12, TimeUnit.HOURS); |
||||
|
redisCache.expire(CLIENT_TYPE_KEY + userId, 12, TimeUnit.HOURS); |
||||
|
redisCache.expire(SESSION_USER_KEY + session.getId(), 12, TimeUnit.HOURS); |
||||
|
|
||||
|
// 返回心跳响应 |
||||
|
sendMessageToUser(userId, "heartbeat_ack", "ok"); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 处理已读回执 |
||||
|
*/ |
||||
|
private void handleReadReceipt(Long readerId, Map<String, Object> msgMap) { |
||||
|
Long sessionId = Long.valueOf(msgMap.get("sessionId").toString()); |
||||
|
List<Long> messageIds = (List<Long>) msgMap.get("messageIds"); |
||||
|
|
||||
|
log.info("小程序已读回执: sessionId={}, readerId={}, messageIds={}", |
||||
|
sessionId, readerId, messageIds); |
||||
|
|
||||
|
try { |
||||
|
// 标记消息已读 |
||||
|
chatMessagesService.markAsRead(messageIds, sessionId, readerId); |
||||
|
|
||||
|
// 获取消息发送者 |
||||
|
if (messageIds != null && !messageIds.isEmpty()) { |
||||
|
ChatMessages firstMessage = chatMessagesService.getChatMessagesById(messageIds.get(0)); |
||||
|
if (firstMessage != null) { |
||||
|
Long senderId = firstMessage.getSenderId(); |
||||
|
|
||||
|
// 构建已读回执 |
||||
|
Map<String, Object> readReceipt = new HashMap<>(); |
||||
|
readReceipt.put("type", "READ"); |
||||
|
readReceipt.put("sessionId", sessionId); |
||||
|
readReceipt.put("readerId", readerId); |
||||
|
readReceipt.put("messageIds", messageIds); |
||||
|
|
||||
|
// 判断发送者客户端类型 |
||||
|
String senderClientType = redisCache.getCacheObject(CLIENT_TYPE_KEY + senderId); |
||||
|
|
||||
|
if ("pc".equals(senderClientType)) { |
||||
|
// 发送者是PC:用STOMP推送 |
||||
|
messagingTemplate.convertAndSendToUser( |
||||
|
senderId.toString(), |
||||
|
"/queue/read", |
||||
|
readReceipt); |
||||
|
log.info("已读回执 -> PC用户[{}]: 已通过STOMP推送", senderId); |
||||
|
} else { |
||||
|
// 发送者也是小程序:用原生推送 |
||||
|
sendMessageToUser(senderId, "read", readReceipt); |
||||
|
log.info("已读回执 -> 小程序用户[{}]: 已通过原生推送", senderId); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} catch (Exception e) { |
||||
|
log.error("处理已读回执失败", e); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { |
||||
|
String sessionId = session.getId(); |
||||
|
Long userId = getUserIdBySessionId(sessionId); |
||||
|
|
||||
|
if (userId != null) { |
||||
|
USER_SESSIONS.remove(userId); |
||||
|
SESSIONS.remove(sessionId); |
||||
|
|
||||
|
// 清理Redis |
||||
|
redisCache.deleteObject(ONLINE_KEY + userId); |
||||
|
redisCache.deleteObject(CLIENT_TYPE_KEY + userId); |
||||
|
redisCache.deleteObject(SESSION_USER_KEY + sessionId); |
||||
|
|
||||
|
log.info("小程序用户断开连接: userId={}, sessionId={}, status={}", userId, sessionId, status); |
||||
|
|
||||
|
// ===== 新增:广播用户下线状态给所有相关用户 ===== |
||||
|
broadcastUserStatus(userId, false); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
// ========== 工具方法 ========== |
||||
|
|
||||
|
private Long extractUserId(String query) { |
||||
|
if (query == null) return null; |
||||
|
String[] params = query.split("&"); |
||||
|
for (String param : params) { |
||||
|
if (param.startsWith("userId=")) { |
||||
|
try { |
||||
|
return Long.parseLong(param.substring(7)); |
||||
|
} catch (NumberFormatException e) { |
||||
|
log.error("userId格式错误: {}", param); |
||||
|
return null; |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
return null; |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 广播用户状态给所有相关用户 |
||||
|
* @param userId 状态变更的用户ID |
||||
|
* @param online true:上线, false:下线 |
||||
|
*/ |
||||
|
private void broadcastUserStatus(Long userId, boolean online) { |
||||
|
try { |
||||
|
// 查询和这个用户有关联的所有用户(有会话的用户) |
||||
|
List<Long> relatedUserIds = chatSessionsService.getRelatedUserIds(userId); |
||||
|
|
||||
|
if (relatedUserIds == null || relatedUserIds.isEmpty()) { |
||||
|
log.info("用户[{}]没有相关用户,无需广播", userId); |
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
// 构建状态数据 |
||||
|
Map<String, Object> statusData = new HashMap<>(); |
||||
|
statusData.put("userId", userId); |
||||
|
statusData.put("online", online); |
||||
|
statusData.put("timestamp", System.currentTimeMillis()); |
||||
|
|
||||
|
log.info("准备广播用户[{}]状态给{}个相关用户: online={}", userId, relatedUserIds.size(), online); |
||||
|
|
||||
|
// 分别发送给每个相关用户 |
||||
|
for (Long relatedUserId : relatedUserIds) { |
||||
|
// 获取相关用户的客户端类型 |
||||
|
String clientType = redisCache.getCacheObject(CLIENT_TYPE_KEY + relatedUserId); |
||||
|
|
||||
|
if ("pc".equals(clientType)) { |
||||
|
// 相关用户是PC:用STOMP推送 |
||||
|
messagingTemplate.convertAndSendToUser( |
||||
|
relatedUserId.toString(), |
||||
|
"/queue/user-status", |
||||
|
statusData |
||||
|
); |
||||
|
log.debug("已广播状态给PC用户: {}", relatedUserId); |
||||
|
} else { |
||||
|
// 相关用户是小程序:用原生WebSocket推送 |
||||
|
sendMessageToUser(relatedUserId, "status", statusData); |
||||
|
log.debug("已广播状态给小程序用户: {}", relatedUserId); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
log.info("用户[{}]状态广播完成", userId); |
||||
|
|
||||
|
} catch (Exception e) { |
||||
|
log.error("广播用户状态失败: userId={}, online={}", userId, online, e); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private Long getUserIdBySessionId(String sessionId) { |
||||
|
// 先从本地缓存找 |
||||
|
for (Map.Entry<Long, String> entry : USER_SESSIONS.entrySet()) { |
||||
|
if (entry.getValue().equals(sessionId)) { |
||||
|
return entry.getKey(); |
||||
|
} |
||||
|
} |
||||
|
// 再从Redis找 |
||||
|
String userIdStr = redisCache.getCacheObject(SESSION_USER_KEY + sessionId); |
||||
|
return userIdStr != null ? Long.parseLong(userIdStr) : null; |
||||
|
} |
||||
|
} |
||||
Write
Preview
Loading…
Cancel
Save
Reference in new issue