24 changed files with 3411 additions and 702 deletions
-
1067chenhai-admin/src/main/java/com/chenhai/web/controller/system/ChatController.java
-
33chenhai-admin/src/main/java/com/chenhai/web/controller/system/SensitiveWordApiTest.java
-
8chenhai-admin/src/main/resources/application-druid.yml
-
13chenhai-common/pom.xml
-
17chenhai-framework/src/main/java/com/chenhai/framework/config/RocketMQConfig.java
-
3chenhai-framework/src/main/java/com/chenhai/framework/config/SecurityConfig.java
-
72chenhai-framework/src/main/java/com/chenhai/framework/config/WebSocketConfig.java
-
184chenhai-framework/src/main/java/com/chenhai/framework/websocket/WebSocketChannelInterceptor.java
-
113chenhai-system/src/main/java/com/chenhai/system/domain/ChatMessages.java
-
98chenhai-system/src/main/java/com/chenhai/system/domain/ChatSessions.java
-
40chenhai-system/src/main/java/com/chenhai/system/mapper/ChatMessagesMapper.java
-
42chenhai-system/src/main/java/com/chenhai/system/mapper/ChatSessionsMapper.java
-
90chenhai-system/src/main/java/com/chenhai/system/mq/OfflineMessageConsumer.java
-
78chenhai-system/src/main/java/com/chenhai/system/service/IChatMessagesService.java
-
47chenhai-system/src/main/java/com/chenhai/system/service/IChatSessionsService.java
-
147chenhai-system/src/main/java/com/chenhai/system/service/impl/ChatMessagesServiceImpl.java
-
143chenhai-system/src/main/java/com/chenhai/system/service/impl/ChatSessionsServiceImpl.java
-
114chenhai-system/src/main/resources/mapper/system/ChatMessagesMapper.xml
-
135chenhai-system/src/main/resources/mapper/system/ChatSessionsMapper.xml
-
3chenhai-ui/package.json
-
84chenhai-ui/src/api/system/chat.js
-
55chenhai-ui/src/api/vet/session.js
-
1510chenhai-ui/src/views/vet/chatManager/index.vue
-
15chenhai-ui/vue.config.js
1067
chenhai-admin/src/main/java/com/chenhai/web/controller/system/ChatController.java
File diff suppressed because it is too large
View File
File diff suppressed because it is too large
View File
@ -0,0 +1,17 @@ |
|||||
|
package com.chenhai.framework.config; |
||||
|
|
||||
|
import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration; |
||||
|
import org.springframework.context.annotation.Configuration; |
||||
|
import org.springframework.context.annotation.Import; |
||||
|
|
||||
|
/** |
||||
|
* RocketMQ配置类 |
||||
|
* 位置:ruoyi-framework/src/main/java/com/ruoyi/framework/config/RocketMQConfig.java |
||||
|
* |
||||
|
* @author ruoyi |
||||
|
*/ |
||||
|
@Configuration |
||||
|
@Import(RocketMQAutoConfiguration.class) |
||||
|
public class RocketMQConfig { |
||||
|
// 使用RocketMQ默认配置即可 |
||||
|
} |
||||
@ -0,0 +1,72 @@ |
|||||
|
package com.chenhai.framework.config; |
||||
|
|
||||
|
import com.chenhai.framework.websocket.WebSocketChannelInterceptor; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.context.annotation.Configuration; |
||||
|
import org.springframework.messaging.simp.config.ChannelRegistration; |
||||
|
import org.springframework.messaging.simp.config.MessageBrokerRegistry; |
||||
|
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; |
||||
|
import org.springframework.web.socket.config.annotation.StompEndpointRegistry; |
||||
|
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer; |
||||
|
|
||||
|
/** |
||||
|
* WebSocket配置类 |
||||
|
* 配置STOMP协议端点、消息代理和拦截器 |
||||
|
* |
||||
|
* @author chenhai |
||||
|
*/ |
||||
|
@Configuration |
||||
|
@EnableWebSocketMessageBroker |
||||
|
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { |
||||
|
|
||||
|
@Autowired |
||||
|
private WebSocketChannelInterceptor webSocketChannelInterceptor; |
||||
|
|
||||
|
/** |
||||
|
* 注册STOMP端点 |
||||
|
* 客户端通过这个地址连接WebSocket服务器 |
||||
|
* |
||||
|
* @param registry STOMP端点注册表 |
||||
|
*/ |
||||
|
@Override |
||||
|
public void registerStompEndpoints(StompEndpointRegistry registry) { |
||||
|
registry.addEndpoint("/ws/chat") // WebSocket连接地址 |
||||
|
.setAllowedOriginPatterns("*") // 允许所有域名跨域 |
||||
|
.withSockJS(); // 启用SockJS回退选项 |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 配置消息代理 |
||||
|
* 设置应用前缀和用户前缀 |
||||
|
* |
||||
|
* @param registry 消息代理注册表 |
||||
|
*/ |
||||
|
@Override |
||||
|
public void configureMessageBroker(MessageBrokerRegistry registry) { |
||||
|
// 启用简单内存消息代理,配置目的地前缀 |
||||
|
// /topic - 广播模式,所有订阅者都能收到 |
||||
|
// /queue - 点对点模式 |
||||
|
// /user - 用户专属消息 |
||||
|
registry.enableSimpleBroker("/topic", "/queue", "/user"); |
||||
|
|
||||
|
// 设置应用前缀,客户端发送消息需要以/app开头 |
||||
|
// 例如:/app/chat.send |
||||
|
registry.setApplicationDestinationPrefixes("/app"); |
||||
|
|
||||
|
// 设置用户前缀,用于点对点消息 |
||||
|
// 例如:/user/{userId}/queue/messages |
||||
|
registry.setUserDestinationPrefix("/user"); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 配置客户端入站通道 |
||||
|
* 注册拦截器用于处理连接和断开事件 |
||||
|
* |
||||
|
* @param registration 通道注册表 |
||||
|
*/ |
||||
|
@Override |
||||
|
public void configureClientInboundChannel(ChannelRegistration registration) { |
||||
|
// 添加自定义拦截器,用于管理在线状态 |
||||
|
registration.interceptors(webSocketChannelInterceptor); |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,184 @@ |
|||||
|
package com.chenhai.framework.websocket; |
||||
|
|
||||
|
import com.chenhai.system.service.IChatSessionsService; |
||||
|
import org.slf4j.Logger; |
||||
|
import org.slf4j.LoggerFactory; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.context.annotation.Lazy; |
||||
|
import org.springframework.data.redis.core.RedisTemplate; |
||||
|
import org.springframework.messaging.Message; |
||||
|
import org.springframework.messaging.MessageChannel; |
||||
|
import org.springframework.messaging.simp.SimpMessagingTemplate; |
||||
|
import org.springframework.messaging.simp.stomp.StompCommand; |
||||
|
import org.springframework.messaging.simp.stomp.StompHeaderAccessor; |
||||
|
import org.springframework.messaging.support.ChannelInterceptor; |
||||
|
import org.springframework.messaging.support.MessageHeaderAccessor; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
import java.util.HashMap; |
||||
|
import java.util.List; |
||||
|
import java.util.Map; |
||||
|
import java.util.concurrent.TimeUnit; |
||||
|
|
||||
|
/** |
||||
|
* WebSocket连接拦截器 |
||||
|
* 负责管理用户在线状态,并在用户上线/下线时广播状态变更 |
||||
|
* |
||||
|
* @author chenhai |
||||
|
*/ |
||||
|
@Component |
||||
|
public class WebSocketChannelInterceptor implements ChannelInterceptor { |
||||
|
|
||||
|
private static final Logger log = LoggerFactory.getLogger(WebSocketChannelInterceptor.class); |
||||
|
|
||||
|
@Autowired |
||||
|
private RedisTemplate<String, String> redisTemplate; |
||||
|
|
||||
|
@Autowired |
||||
|
@Lazy |
||||
|
private SimpMessagingTemplate messagingTemplate; |
||||
|
|
||||
|
@Autowired |
||||
|
private IChatSessionsService chatSessionsService; |
||||
|
|
||||
|
/** Redis中在线状态的key前缀,格式:chat:online:{userId} -> sessionId */ |
||||
|
private static final String ONLINE_KEY = "chat:online:"; |
||||
|
|
||||
|
/** Redis中会话与用户映射的key前缀,格式:chat:session:{sessionId} -> userId */ |
||||
|
private static final String SESSION_USER_KEY = "chat:session:"; |
||||
|
|
||||
|
/** |
||||
|
* 消息发送前拦截处理 |
||||
|
* 在这里处理WebSocket的连接和断开事件 |
||||
|
* |
||||
|
* @param message 原始消息 |
||||
|
* @param channel 消息通道 |
||||
|
* @return 处理后的消息 |
||||
|
*/ |
||||
|
@Override |
||||
|
public Message<?> preSend(Message<?> message, MessageChannel channel) { |
||||
|
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); |
||||
|
if (accessor == null) { |
||||
|
return message; |
||||
|
} |
||||
|
|
||||
|
// 处理客户端连接事件 |
||||
|
if (StompCommand.CONNECT.equals(accessor.getCommand())) { |
||||
|
handleConnect(accessor); |
||||
|
} |
||||
|
|
||||
|
// 处理客户端断开连接事件 |
||||
|
if (StompCommand.DISCONNECT.equals(accessor.getCommand())) { |
||||
|
handleDisconnect(accessor); |
||||
|
} |
||||
|
|
||||
|
return message; |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 处理用户连接事件 |
||||
|
* 1. 设置用户认证信息 |
||||
|
* 2. 记录在线状态到Redis |
||||
|
* 3. 广播用户上线消息给相关用户(排除自己) |
||||
|
* |
||||
|
* @param accessor STOMP消息访问器,包含会话信息和头部 |
||||
|
*/ |
||||
|
private void handleConnect(StompHeaderAccessor accessor) { |
||||
|
// 从请求头中获取用户ID |
||||
|
String userId = accessor.getFirstNativeHeader("userId"); |
||||
|
if (userId == null) { |
||||
|
log.warn("连接请求中缺少userId参数"); |
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
String sessionId = accessor.getSessionId(); |
||||
|
log.info("用户[{}]正在连接,会话ID: {}", userId, sessionId); |
||||
|
|
||||
|
// 1. 设置用户认证信息,后续可以通过@AuthenticationPrincipal获取 |
||||
|
accessor.setUser(() -> userId); |
||||
|
|
||||
|
// 2. 存储在线状态到Redis,设置12小时过期防止僵尸数据 |
||||
|
redisTemplate.opsForValue().set(ONLINE_KEY + userId, sessionId, 12, TimeUnit.HOURS); |
||||
|
redisTemplate.opsForValue().set(SESSION_USER_KEY + sessionId, userId, 12, TimeUnit.HOURS); |
||||
|
|
||||
|
log.info("用户[{}]上线成功,已记录到Redis", userId); |
||||
|
|
||||
|
// 3. 广播用户上线状态给所有相关用户(排除自己) |
||||
|
broadcastUserStatusToRelatedUsers(userId, true); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 处理用户断开连接事件 |
||||
|
* 1. 清理Redis中的在线状态 |
||||
|
* 2. 广播用户下线消息给相关用户(排除自己) |
||||
|
* |
||||
|
* @param accessor STOMP消息访问器,包含会话信息 |
||||
|
*/ |
||||
|
private void handleDisconnect(StompHeaderAccessor accessor) { |
||||
|
String sessionId = accessor.getSessionId(); |
||||
|
String userId = redisTemplate.opsForValue().get(SESSION_USER_KEY + sessionId); |
||||
|
|
||||
|
if (userId != null) { |
||||
|
// 1. 清理Redis中的在线状态 |
||||
|
redisTemplate.delete(ONLINE_KEY + userId); |
||||
|
redisTemplate.delete(SESSION_USER_KEY + sessionId); |
||||
|
|
||||
|
log.info("用户[{}]已断开连接,已清理Redis状态", userId); |
||||
|
|
||||
|
// 2. 广播用户下线状态给相关用户(排除自己) |
||||
|
broadcastUserStatusToRelatedUsers(userId, false); |
||||
|
} else { |
||||
|
log.warn("未知会话断开连接: {}", sessionId); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 广播用户状态给所有相关用户(有会话关联的用户) |
||||
|
* 这是核心方法,确保状态只广播给需要知道的人,而不是所有人 |
||||
|
* |
||||
|
* @param userId 状态变更的用户ID |
||||
|
* @param online true:上线, false:下线 |
||||
|
*/ |
||||
|
private void broadcastUserStatusToRelatedUsers(String userId, boolean online) { |
||||
|
try { |
||||
|
Long currentUserId = Long.parseLong(userId); |
||||
|
|
||||
|
// 查询所有和这个用户有关联的用户(有会话的用户) |
||||
|
// 这个方法需要你在 IChatSessionsService 中实现 |
||||
|
List<Long> relatedUserIds = chatSessionsService.getRelatedUserIds(currentUserId); |
||||
|
|
||||
|
if (relatedUserIds == null || relatedUserIds.isEmpty()) { |
||||
|
log.info("用户[{}]没有相关用户,无需广播", userId); |
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
// 构建状态消息 |
||||
|
Map<String, Object> statusMsg = new HashMap<>(); |
||||
|
statusMsg.put("type", "USER_STATUS"); |
||||
|
statusMsg.put("userId", currentUserId); |
||||
|
statusMsg.put("online", online); |
||||
|
statusMsg.put("timestamp", System.currentTimeMillis()); |
||||
|
|
||||
|
log.info("准备广播用户[{}]状态给{}个相关用户: online={}, relatedUsers={}", |
||||
|
userId, relatedUserIds.size(), online, relatedUserIds); |
||||
|
|
||||
|
// 分别发送给每个相关用户(点对点发送) |
||||
|
for (Long relatedUserId : relatedUserIds) { |
||||
|
// 使用 convertAndSendToUser 发送点对点消息 |
||||
|
// 这样只有指定用户能收到,避免广播给所有人 |
||||
|
messagingTemplate.convertAndSendToUser( |
||||
|
relatedUserId.toString(), |
||||
|
"/queue/user-status", // 使用独立的队列,避免和消息队列混淆 |
||||
|
statusMsg |
||||
|
); |
||||
|
log.debug("已发送用户状态给用户{}: userId={}, online={}", relatedUserId, userId, online); |
||||
|
} |
||||
|
|
||||
|
log.info("用户[{}]状态广播完成", userId); |
||||
|
|
||||
|
} catch (NumberFormatException e) { |
||||
|
log.error("用户ID格式错误: {}", userId, e); |
||||
|
} catch (Exception e) { |
||||
|
log.error("广播用户状态失败: userId={}, online={}", userId, online, e); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,113 @@ |
|||||
|
package com.chenhai.system.domain; |
||||
|
|
||||
|
import com.fasterxml.jackson.annotation.JsonFormat; |
||||
|
import com.chenhai.common.core.domain.BaseEntity; |
||||
|
import java.util.Date; |
||||
|
|
||||
|
/** |
||||
|
* 聊天消息表对象 chat_messages |
||||
|
* |
||||
|
* @author chenhai |
||||
|
*/ |
||||
|
public class ChatMessages extends BaseEntity { |
||||
|
private static final long serialVersionUID = 1L; |
||||
|
|
||||
|
/** 消息ID */ |
||||
|
private Long id; |
||||
|
|
||||
|
/** 发送者视角会话ID */ |
||||
|
private Long senderSessionId; |
||||
|
|
||||
|
/** 接收者视角会话ID */ |
||||
|
private Long receiverSessionId; |
||||
|
|
||||
|
/** 发送者ID */ |
||||
|
private Long senderId; |
||||
|
|
||||
|
/** 接收者ID */ |
||||
|
private Long receiverId; |
||||
|
|
||||
|
/** 消息类型 */ |
||||
|
private String contentType; |
||||
|
|
||||
|
/** 消息内容 */ |
||||
|
private String content; |
||||
|
|
||||
|
/** 文件URL */ |
||||
|
private String fileUrl; |
||||
|
|
||||
|
/** 原始文件名 */ |
||||
|
private String fileName; |
||||
|
|
||||
|
/** 文件大小 */ |
||||
|
private Integer fileSize; |
||||
|
|
||||
|
/** 视频时长 */ |
||||
|
private Integer videoDuration; |
||||
|
|
||||
|
/** 缩略图URL */ |
||||
|
private String thumbnailUrl; |
||||
|
|
||||
|
/** 是否已读 */ |
||||
|
private Integer isRead; |
||||
|
|
||||
|
/** 已读时间 */ |
||||
|
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") |
||||
|
private Date readAt; |
||||
|
|
||||
|
/** 发送时间 */ |
||||
|
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") |
||||
|
private Date createdAt; |
||||
|
|
||||
|
// 非数据库字段 |
||||
|
private Boolean isMe; |
||||
|
|
||||
|
// ========== getter/setter ========== |
||||
|
public Long getId() { return id; } |
||||
|
public void setId(Long id) { this.id = id; } |
||||
|
|
||||
|
public Long getSenderSessionId() { return senderSessionId; } |
||||
|
public void setSenderSessionId(Long senderSessionId) { this.senderSessionId = senderSessionId; } |
||||
|
|
||||
|
public Long getReceiverSessionId() { return receiverSessionId; } |
||||
|
public void setReceiverSessionId(Long receiverSessionId) { this.receiverSessionId = receiverSessionId; } |
||||
|
|
||||
|
public Long getSenderId() { return senderId; } |
||||
|
public void setSenderId(Long senderId) { this.senderId = senderId; } |
||||
|
|
||||
|
public Long getReceiverId() { return receiverId; } |
||||
|
public void setReceiverId(Long receiverId) { this.receiverId = receiverId; } |
||||
|
|
||||
|
public String getContentType() { return contentType; } |
||||
|
public void setContentType(String contentType) { this.contentType = contentType; } |
||||
|
|
||||
|
public String getContent() { return content; } |
||||
|
public void setContent(String content) { this.content = content; } |
||||
|
|
||||
|
public String getFileUrl() { return fileUrl; } |
||||
|
public void setFileUrl(String fileUrl) { this.fileUrl = fileUrl; } |
||||
|
|
||||
|
public String getFileName() { return fileName; } |
||||
|
public void setFileName(String fileName) { this.fileName = fileName; } |
||||
|
|
||||
|
public Integer getFileSize() { return fileSize; } |
||||
|
public void setFileSize(Integer fileSize) { this.fileSize = fileSize; } |
||||
|
|
||||
|
public Integer getVideoDuration() { return videoDuration; } |
||||
|
public void setVideoDuration(Integer videoDuration) { this.videoDuration = videoDuration; } |
||||
|
|
||||
|
public String getThumbnailUrl() { return thumbnailUrl; } |
||||
|
public void setThumbnailUrl(String thumbnailUrl) { this.thumbnailUrl = thumbnailUrl; } |
||||
|
|
||||
|
public Integer getIsRead() { return isRead; } |
||||
|
public void setIsRead(Integer isRead) { this.isRead = isRead; } |
||||
|
|
||||
|
public Date getReadAt() { return readAt; } |
||||
|
public void setReadAt(Date readAt) { this.readAt = readAt; } |
||||
|
|
||||
|
public Date getCreatedAt() { return createdAt; } |
||||
|
public void setCreatedAt(Date createdAt) { this.createdAt = createdAt; } |
||||
|
|
||||
|
public Boolean getIsMe() { return isMe; } |
||||
|
public void setIsMe(Boolean isMe) { this.isMe = isMe; } |
||||
|
} |
||||
@ -0,0 +1,98 @@ |
|||||
|
package com.chenhai.system.domain; |
||||
|
|
||||
|
import com.fasterxml.jackson.annotation.JsonFormat; |
||||
|
import com.chenhai.common.core.domain.BaseEntity; |
||||
|
import java.util.Date; |
||||
|
|
||||
|
/** |
||||
|
* 聊天会话表对象 chat_sessions |
||||
|
* |
||||
|
* @author chenhai |
||||
|
*/ |
||||
|
public class ChatSessions extends BaseEntity { |
||||
|
private static final long serialVersionUID = 1L; |
||||
|
|
||||
|
/** 会话ID */ |
||||
|
private Long id; |
||||
|
|
||||
|
/** 当前用户ID */ |
||||
|
private Long userId; |
||||
|
|
||||
|
/** 对方用户ID */ |
||||
|
private Long otherUserId; |
||||
|
|
||||
|
/** 当前用户未读消息数 */ |
||||
|
private Integer userUnread; |
||||
|
|
||||
|
/** 对方用户未读消息数 */ |
||||
|
private Integer otherUnread; |
||||
|
|
||||
|
/** 最后一条消息内容 */ |
||||
|
private String lastMessage; |
||||
|
|
||||
|
/** 最后消息时间 */ |
||||
|
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") |
||||
|
private Date lastMessageTime; |
||||
|
|
||||
|
/** 最后活动时间 */ |
||||
|
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") |
||||
|
private Date lastActiveTime; |
||||
|
|
||||
|
/** 创建时间 */ |
||||
|
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") |
||||
|
private Date createdAt; |
||||
|
|
||||
|
/** 更新时间 */ |
||||
|
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") |
||||
|
private Date updatedAt; |
||||
|
|
||||
|
// 非数据库字段 |
||||
|
private String otherUserName; |
||||
|
private String otherUserAvatar; |
||||
|
private Boolean otherOnline; |
||||
|
|
||||
|
// ========== getter/setter ========== |
||||
|
public Long getId() { return id; } |
||||
|
public void setId(Long id) { this.id = id; } |
||||
|
|
||||
|
public Long getUserId() { return userId; } |
||||
|
public void setUserId(Long userId) { this.userId = userId; } |
||||
|
|
||||
|
public Long getOtherUserId() { return otherUserId; } |
||||
|
public void setOtherUserId(Long otherUserId) { this.otherUserId = otherUserId; } |
||||
|
|
||||
|
public Integer getUserUnread() { return userUnread; } |
||||
|
public void setUserUnread(Integer userUnread) { this.userUnread = userUnread; } |
||||
|
|
||||
|
public Integer getOtherUnread() { return otherUnread; } |
||||
|
public void setOtherUnread(Integer otherUnread) { this.otherUnread = otherUnread; } |
||||
|
|
||||
|
public String getLastMessage() { return lastMessage; } |
||||
|
public void setLastMessage(String lastMessage) { this.lastMessage = lastMessage; } |
||||
|
|
||||
|
public Date getLastMessageTime() { return lastMessageTime; } |
||||
|
public void setLastMessageTime(Date lastMessageTime) { this.lastMessageTime = lastMessageTime; } |
||||
|
|
||||
|
public Date getLastActiveTime() { return lastActiveTime; } |
||||
|
public void setLastActiveTime(Date lastActiveTime) { this.lastActiveTime = lastActiveTime; } |
||||
|
|
||||
|
public Date getCreatedAt() { return createdAt; } |
||||
|
public void setCreatedAt(Date createdAt) { this.createdAt = createdAt; } |
||||
|
|
||||
|
public Date getUpdatedAt() { return updatedAt; } |
||||
|
public void setUpdatedAt(Date updatedAt) { this.updatedAt = updatedAt; } |
||||
|
|
||||
|
public String getOtherUserName() { return otherUserName; } |
||||
|
public void setOtherUserName(String otherUserName) { this.otherUserName = otherUserName; } |
||||
|
|
||||
|
public String getOtherUserAvatar() { return otherUserAvatar; } |
||||
|
public void setOtherUserAvatar(String otherUserAvatar) { this.otherUserAvatar = otherUserAvatar; } |
||||
|
|
||||
|
public Boolean getOtherOnline() { return otherOnline; } |
||||
|
public void setOtherOnline(Boolean otherOnline) { this.otherOnline = otherOnline; } |
||||
|
|
||||
|
@Override |
||||
|
public String toString() { |
||||
|
return "ChatSessions{id=" + id + ", userId=" + userId + ", otherUserId=" + otherUserId + "}"; |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,40 @@ |
|||||
|
package com.chenhai.system.mapper; |
||||
|
|
||||
|
import com.chenhai.system.domain.ChatMessages; |
||||
|
import org.apache.ibatis.annotations.Param; |
||||
|
import java.util.List; |
||||
|
import java.util.Date; |
||||
|
|
||||
|
public interface ChatMessagesMapper { |
||||
|
|
||||
|
ChatMessages selectChatMessagesById(Long id); |
||||
|
|
||||
|
List<ChatMessages> selectChatMessagesList(ChatMessages chatMessages); |
||||
|
|
||||
|
/** |
||||
|
* 根据会话ID查询消息 - 用于分页 |
||||
|
* 注意:这个方法会被 PageHelper 拦截,不要传 limit 参数 |
||||
|
*/ |
||||
|
List<ChatMessages> selectMessagesBySession(@Param("sessionId") Long sessionId, |
||||
|
@Param("lastId") Long lastId); |
||||
|
|
||||
|
/** |
||||
|
* 获取会话的最后一条消息 |
||||
|
*/ |
||||
|
ChatMessages selectLastMessageBySession(@Param("sessionId") Long sessionId); |
||||
|
|
||||
|
/** |
||||
|
* 获取未读消息 |
||||
|
*/ |
||||
|
List<ChatMessages> selectUnreadMessages(@Param("sessionId") Long sessionId, |
||||
|
@Param("receiverId") Long receiverId); |
||||
|
|
||||
|
int insertChatMessages(ChatMessages chatMessages); |
||||
|
|
||||
|
int batchMarkAsRead(@Param("messageIds") List<Long> messageIds, |
||||
|
@Param("readAt") Date readAt); |
||||
|
|
||||
|
int markSessionAsRead(@Param("sessionId") Long sessionId, |
||||
|
@Param("receiverId") Long receiverId, |
||||
|
@Param("readAt") Date readAt); |
||||
|
} |
||||
@ -0,0 +1,42 @@ |
|||||
|
package com.chenhai.system.mapper; |
||||
|
|
||||
|
import com.chenhai.system.domain.ChatSessions; |
||||
|
import org.apache.ibatis.annotations.Param; |
||||
|
import java.util.List; |
||||
|
import java.util.Date; |
||||
|
|
||||
|
public interface ChatSessionsMapper { |
||||
|
|
||||
|
ChatSessions selectChatSessionsById(Long id); |
||||
|
|
||||
|
List<ChatSessions> selectChatSessionsList(ChatSessions chatSessions); |
||||
|
|
||||
|
List<ChatSessions> selectSessionsByUserId(@Param("userId") Long userId); |
||||
|
|
||||
|
ChatSessions selectSessionByUsers(@Param("userId") Long userId, |
||||
|
@Param("otherUserId") Long otherUserId); |
||||
|
|
||||
|
int insertChatSessions(ChatSessions chatSessions); |
||||
|
|
||||
|
int updateChatSessions(ChatSessions chatSessions); |
||||
|
|
||||
|
int updateLastMessage(ChatSessions chatSessions); |
||||
|
|
||||
|
/** |
||||
|
* 增加对方未读计数 |
||||
|
*/ |
||||
|
int incrementOtherUnread(@Param("sessionId") Long sessionId, |
||||
|
@Param("lastActiveTime") Date lastActiveTime); |
||||
|
|
||||
|
/** |
||||
|
* 增加自己未读计数 |
||||
|
*/ |
||||
|
int incrementUserUnread(@Param("sessionId") Long sessionId, |
||||
|
@Param("lastActiveTime") Date lastActiveTime); |
||||
|
|
||||
|
int resetUnread(@Param("sessionId") Long sessionId); |
||||
|
|
||||
|
int deleteChatSessionsById(Long id); |
||||
|
|
||||
|
int deleteChatSessionsByIds(String[] ids); |
||||
|
} |
||||
@ -0,0 +1,90 @@ |
|||||
|
package com.chenhai.system.mq; |
||||
|
|
||||
|
import com.chenhai.system.domain.ChatMessages; |
||||
|
import com.chenhai.system.mapper.ChatMessagesMapper; |
||||
|
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; |
||||
|
import org.apache.rocketmq.spring.core.RocketMQListener; |
||||
|
import org.slf4j.Logger; |
||||
|
import org.slf4j.LoggerFactory; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.data.redis.core.RedisTemplate; |
||||
|
import org.springframework.messaging.simp.SimpMessagingTemplate; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
import java.util.Map; |
||||
|
|
||||
|
@Component |
||||
|
@RocketMQMessageListener( |
||||
|
topic = "chat-topic", |
||||
|
selectorExpression = "offline", |
||||
|
consumerGroup = "chat-offline-group", |
||||
|
maxReconsumeTimes = 3 |
||||
|
) |
||||
|
public class OfflineMessageConsumer implements RocketMQListener<Map<String, Object>> { |
||||
|
|
||||
|
private static final Logger log = LoggerFactory.getLogger(OfflineMessageConsumer.class); |
||||
|
|
||||
|
@Autowired |
||||
|
private SimpMessagingTemplate messagingTemplate; |
||||
|
|
||||
|
@Autowired |
||||
|
private RedisTemplate<String, String> redisTemplate; |
||||
|
|
||||
|
@Autowired |
||||
|
private ChatMessagesMapper chatMessagesMapper; |
||||
|
|
||||
|
private static final String ONLINE_KEY = "chat:online:"; |
||||
|
|
||||
|
@Override |
||||
|
public void onMessage(Map<String, Object> message) { |
||||
|
String receiverId = message.get("receiverId").toString(); |
||||
|
Integer retryCount = (Integer) message.getOrDefault("retryCount", 0); |
||||
|
Long messageId = Long.valueOf(message.get("id").toString()); |
||||
|
|
||||
|
// 检查用户是否在线 |
||||
|
String sessionId = redisTemplate.opsForValue().get(ONLINE_KEY + receiverId); |
||||
|
boolean isOnline = sessionId != null; |
||||
|
|
||||
|
log.info("MQ处理离线消息 - 接收者: {}, 在线状态: {}, 重试次数: {}/3", |
||||
|
receiverId, isOnline, retryCount); |
||||
|
|
||||
|
if (isOnline) { |
||||
|
try { |
||||
|
// 用户在线,推送消息 |
||||
|
messagingTemplate.convertAndSendToUser( |
||||
|
receiverId, |
||||
|
"/queue/messages", |
||||
|
message |
||||
|
); |
||||
|
log.info("✅ MQ消息推送成功: 接收者={}, 消息ID={}", receiverId, messageId); |
||||
|
|
||||
|
// 推送成功,不需要做任何事,消息已经在数据库里了 |
||||
|
|
||||
|
} catch (Exception e) { |
||||
|
log.error("❌ MQ消息推送失败: 接收者={}, 错误={}", receiverId, e.getMessage()); |
||||
|
|
||||
|
if (retryCount < 3) { |
||||
|
message.put("retryCount", retryCount + 1); |
||||
|
throw new RuntimeException("推送失败,需要重试", e); |
||||
|
} else { |
||||
|
// 超过重试次数,但消息已经在数据库,用户上线时会通过历史消息拉取 |
||||
|
log.warn("消息推送重试已达上限,等待用户上线后拉取历史消息: 接收者={}, 消息ID={}", |
||||
|
receiverId, messageId); |
||||
|
} |
||||
|
} |
||||
|
} else { |
||||
|
// 用户离线 |
||||
|
if (retryCount < 3) { |
||||
|
// 未超过重试次数,继续重试 |
||||
|
log.info("用户[{}]仍离线,进行第{}次重试", receiverId, retryCount + 1); |
||||
|
message.put("retryCount", retryCount + 1); |
||||
|
throw new RuntimeException("用户离线,需要重试"); |
||||
|
} else { |
||||
|
// 超过重试次数,消息已经在数据库,用户上线后会拉取 |
||||
|
log.info("用户[{}]持续离线,等待上线后拉取历史消息: 消息ID={}", |
||||
|
receiverId, messageId); |
||||
|
// 不抛异常,消费成功 |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,78 @@ |
|||||
|
package com.chenhai.system.service; |
||||
|
|
||||
|
import com.chenhai.system.domain.ChatMessages; |
||||
|
import java.util.List; |
||||
|
|
||||
|
/** |
||||
|
* 聊天消息Service接口 |
||||
|
* |
||||
|
* @author chenhai |
||||
|
*/ |
||||
|
public interface IChatMessagesService { |
||||
|
|
||||
|
/** |
||||
|
* 获取会话历史消息 |
||||
|
*/ |
||||
|
List<ChatMessages> getHistoryMessages(Long sessionId, Long lastId); |
||||
|
|
||||
|
/** |
||||
|
* 获取最后一条消息 |
||||
|
*/ |
||||
|
ChatMessages getLastMessage(Long sessionId); |
||||
|
|
||||
|
/** |
||||
|
* 获取未读消息 |
||||
|
*/ |
||||
|
List<ChatMessages> getUnreadMessages(Long sessionId, Long receiverId); |
||||
|
|
||||
|
/** |
||||
|
* 标记消息已读 |
||||
|
*/ |
||||
|
void markAsRead(List<Long> messageIds, Long sessionId, Long readerId); |
||||
|
|
||||
|
/** |
||||
|
* 发送消息 - WebSocket调用,需要传入senderId |
||||
|
* |
||||
|
* @param senderId 发送者ID |
||||
|
* @param receiverId 接收者ID |
||||
|
* @param contentType 消息类型 |
||||
|
* @param content 消息内容 |
||||
|
* @return 保存后的消息对象 |
||||
|
*/ |
||||
|
ChatMessages sendMessage(Long senderId, Long receiverId, String contentType, String content); |
||||
|
|
||||
|
/** |
||||
|
* 发送消息 - HTTP调用,从SecurityUtils获取senderId |
||||
|
* |
||||
|
* @param receiverId 接收者ID |
||||
|
* @param contentType 消息类型 |
||||
|
* @param content 消息内容 |
||||
|
* @return 保存后的消息对象 |
||||
|
*/ |
||||
|
ChatMessages sendMessage(Long receiverId, String contentType, String content); |
||||
|
|
||||
|
/** |
||||
|
* 新增消息 |
||||
|
*/ |
||||
|
int insertChatMessages(ChatMessages chatMessages); |
||||
|
|
||||
|
/** |
||||
|
* 修改消息 |
||||
|
*/ |
||||
|
// int updateChatMessages(ChatMessages chatMessages); |
||||
|
// |
||||
|
// /** |
||||
|
// * 删除消息 |
||||
|
// */ |
||||
|
// int deleteChatMessagesById(Long id); |
||||
|
// |
||||
|
// /** |
||||
|
// * 批量删除消息 |
||||
|
// */ |
||||
|
// int deleteChatMessagesByIds(String[] ids); |
||||
|
|
||||
|
/** |
||||
|
* 根据ID查询消息 |
||||
|
*/ |
||||
|
ChatMessages getChatMessagesById(Long id); |
||||
|
} |
||||
@ -0,0 +1,47 @@ |
|||||
|
package com.chenhai.system.service; |
||||
|
|
||||
|
import com.chenhai.system.domain.ChatSessions; |
||||
|
import java.util.List; |
||||
|
|
||||
|
/** |
||||
|
* 聊天会话Service接口 |
||||
|
* |
||||
|
* @author chenhai |
||||
|
*/ |
||||
|
public interface IChatSessionsService { |
||||
|
|
||||
|
/** |
||||
|
* 查询用户的所有会话 |
||||
|
*/ |
||||
|
List<ChatSessions> selectSessionsByUserId(Long userId); |
||||
|
|
||||
|
/** |
||||
|
* 根据双方ID获取会话 |
||||
|
*/ |
||||
|
ChatSessions getSessionByUsers(Long userId, Long otherUserId); |
||||
|
|
||||
|
/** |
||||
|
* 获取或创建会话 |
||||
|
*/ |
||||
|
ChatSessions getOrCreateSession(Long userId, Long otherUserId); |
||||
|
|
||||
|
/** |
||||
|
* 发送消息后更新会话 |
||||
|
*/ |
||||
|
void afterSendMessage(Long senderSessionId, Long receiverSessionId, |
||||
|
Long senderId, Long receiverId, String content); |
||||
|
|
||||
|
/** |
||||
|
* 标记会话已读 |
||||
|
*/ |
||||
|
void markAsRead(Long sessionId, Long userId); |
||||
|
|
||||
|
/** |
||||
|
* 获取与指定用户相关的所有用户ID(有会话关联的用户) |
||||
|
* 用于在线状态广播 |
||||
|
* |
||||
|
* @param userId 当前用户ID |
||||
|
* @return 相关用户ID列表 |
||||
|
*/ |
||||
|
List<Long> getRelatedUserIds(Long userId); |
||||
|
} |
||||
@ -0,0 +1,147 @@ |
|||||
|
package com.chenhai.system.service.impl; |
||||
|
|
||||
|
import com.chenhai.common.utils.SecurityUtils; |
||||
|
import com.chenhai.system.domain.ChatMessages; |
||||
|
import com.chenhai.system.domain.ChatSessions; |
||||
|
import com.chenhai.system.mapper.ChatMessagesMapper; |
||||
|
import com.chenhai.system.service.IChatMessagesService; |
||||
|
import com.chenhai.system.service.IChatSessionsService; |
||||
|
import org.slf4j.Logger; |
||||
|
import org.slf4j.LoggerFactory; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.stereotype.Service; |
||||
|
import org.springframework.transaction.annotation.Transactional; |
||||
|
import java.util.Date; |
||||
|
import java.util.List; |
||||
|
|
||||
|
/** |
||||
|
* 聊天消息Service实现 |
||||
|
* |
||||
|
* @author chenhai |
||||
|
*/ |
||||
|
@Service |
||||
|
public class ChatMessagesServiceImpl implements IChatMessagesService { |
||||
|
|
||||
|
private static final Logger log = LoggerFactory.getLogger(ChatMessagesServiceImpl.class); |
||||
|
|
||||
|
@Autowired |
||||
|
private ChatMessagesMapper chatMessagesMapper; |
||||
|
|
||||
|
@Autowired |
||||
|
private IChatSessionsService chatSessionsService; |
||||
|
|
||||
|
@Override |
||||
|
public List<ChatMessages> getHistoryMessages(Long sessionId, Long lastId) { |
||||
|
return chatMessagesMapper.selectMessagesBySession(sessionId, lastId); |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public ChatMessages getLastMessage(Long sessionId) { |
||||
|
return chatMessagesMapper.selectLastMessageBySession(sessionId); |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public List<ChatMessages> getUnreadMessages(Long sessionId, Long receiverId) { |
||||
|
return chatMessagesMapper.selectUnreadMessages(sessionId, receiverId); |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
@Transactional |
||||
|
public void markAsRead(List<Long> messageIds, Long sessionId, Long readerId) { |
||||
|
Date now = new Date(); |
||||
|
if (messageIds != null && !messageIds.isEmpty()) { |
||||
|
chatMessagesMapper.batchMarkAsRead(messageIds, now); |
||||
|
} else { |
||||
|
chatMessagesMapper.markSessionAsRead(sessionId, readerId, now); |
||||
|
} |
||||
|
chatSessionsService.markAsRead(sessionId, readerId); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 发送消息 - 从参数获取senderId,不依赖SecurityUtils |
||||
|
* |
||||
|
* @param senderId 发送者ID(从WebSocket上下文中获取) |
||||
|
* @param receiverId 接收者ID |
||||
|
* @param contentType 消息类型 |
||||
|
* @param content 消息内容 |
||||
|
* @return 保存后的消息对象 |
||||
|
*/ |
||||
|
@Override |
||||
|
@Transactional |
||||
|
public ChatMessages sendMessage(Long senderId, Long receiverId, String contentType, String content) { |
||||
|
log.info("发送消息 - senderId: {}, receiverId: {}, content: {}", senderId, receiverId, content); |
||||
|
|
||||
|
// 关键修复:检查是否给自己发消息 |
||||
|
if (senderId.equals(receiverId)) { |
||||
|
log.error("不能给自己发送消息: senderId={}, receiverId={}", senderId, receiverId); |
||||
|
throw new RuntimeException("不能给自己发送消息"); |
||||
|
} |
||||
|
|
||||
|
// 1. 获取双方会话ID |
||||
|
ChatSessions senderSession = chatSessionsService.getOrCreateSession(senderId, receiverId); |
||||
|
ChatSessions receiverSession = chatSessionsService.getOrCreateSession(receiverId, senderId); |
||||
|
|
||||
|
log.info("发送者会话ID: {}, 接收者会话ID: {}", senderSession.getId(), receiverSession.getId()); |
||||
|
|
||||
|
// 2. 创建消息对象 |
||||
|
ChatMessages message = new ChatMessages(); |
||||
|
message.setSenderSessionId(senderSession.getId()); |
||||
|
message.setReceiverSessionId(receiverSession.getId()); |
||||
|
message.setSenderId(senderId); |
||||
|
message.setReceiverId(receiverId); |
||||
|
message.setContentType(contentType); |
||||
|
message.setContent(content); |
||||
|
message.setIsRead(0); |
||||
|
message.setCreatedAt(new Date()); |
||||
|
|
||||
|
chatMessagesMapper.insertChatMessages(message); |
||||
|
|
||||
|
// 3. 更新会话 |
||||
|
chatSessionsService.afterSendMessage( |
||||
|
senderSession.getId(), |
||||
|
receiverSession.getId(), |
||||
|
senderId, |
||||
|
receiverId, |
||||
|
content |
||||
|
); |
||||
|
|
||||
|
return message; |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 发送消息 - HTTP调用,从SecurityUtils获取senderId |
||||
|
*/ |
||||
|
@Override |
||||
|
@Transactional |
||||
|
public ChatMessages sendMessage(Long receiverId, String contentType, String content) { |
||||
|
Long senderId = SecurityUtils.getUserId(); |
||||
|
return this.sendMessage(senderId, receiverId, contentType, content); |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public int insertChatMessages(ChatMessages chatMessages) { |
||||
|
chatMessages.setCreateTime(new Date()); |
||||
|
return chatMessagesMapper.insertChatMessages(chatMessages); |
||||
|
} |
||||
|
|
||||
|
// @Override |
||||
|
// public int updateChatMessages(ChatMessages chatMessages) { |
||||
|
// chatMessages.setUpdateTime(new Date()); |
||||
|
// return chatMessagesMapper.updateChatMessages(chatMessages); |
||||
|
// } |
||||
|
// |
||||
|
// @Override |
||||
|
// public int deleteChatMessagesById(Long id) { |
||||
|
// return chatMessagesMapper.deleteChatMessagesById(id); |
||||
|
// } |
||||
|
// |
||||
|
// @Override |
||||
|
// public int deleteChatMessagesByIds(String[] ids) { |
||||
|
// return chatMessagesMapper.deleteChatMessagesByIds(ids); |
||||
|
// } |
||||
|
|
||||
|
@Override |
||||
|
public ChatMessages getChatMessagesById(Long id) { |
||||
|
return chatMessagesMapper.selectChatMessagesById(id); |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,143 @@ |
|||||
|
package com.chenhai.system.service.impl; |
||||
|
|
||||
|
import com.chenhai.system.domain.ChatSessions; |
||||
|
import com.chenhai.system.mapper.ChatSessionsMapper; |
||||
|
import com.chenhai.system.service.IChatSessionsService; |
||||
|
import org.slf4j.Logger; |
||||
|
import org.slf4j.LoggerFactory; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.stereotype.Service; |
||||
|
import org.springframework.transaction.annotation.Transactional; |
||||
|
|
||||
|
import java.util.ArrayList; |
||||
|
import java.util.Date; |
||||
|
import java.util.List; |
||||
|
import java.util.stream.Collectors; |
||||
|
|
||||
|
/** |
||||
|
* 聊天会话Service实现 |
||||
|
* |
||||
|
* @author chenhai |
||||
|
*/ |
||||
|
@Service |
||||
|
public class ChatSessionsServiceImpl implements IChatSessionsService { |
||||
|
|
||||
|
private static final Logger log = LoggerFactory.getLogger(ChatSessionsServiceImpl.class); |
||||
|
|
||||
|
@Autowired |
||||
|
private ChatSessionsMapper chatSessionsMapper; |
||||
|
|
||||
|
@Override |
||||
|
public List<ChatSessions> selectSessionsByUserId(Long userId) { |
||||
|
return chatSessionsMapper.selectSessionsByUserId(userId); |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public ChatSessions getSessionByUsers(Long userId, Long otherUserId) { |
||||
|
return chatSessionsMapper.selectSessionByUsers(userId, otherUserId); |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
@Transactional |
||||
|
public ChatSessions getOrCreateSession(Long userId, Long otherUserId) { |
||||
|
// 关键修复:检查是否和自己聊天 |
||||
|
if (userId.equals(otherUserId)) { |
||||
|
log.error("不能和自己创建会话: userId={}, otherUserId={}", userId, otherUserId); |
||||
|
throw new RuntimeException("不能和自己聊天"); |
||||
|
} |
||||
|
|
||||
|
ChatSessions session = chatSessionsMapper.selectSessionByUsers(userId, otherUserId); |
||||
|
if (session == null) { |
||||
|
session = new ChatSessions(); |
||||
|
session.setUserId(userId); |
||||
|
session.setOtherUserId(otherUserId); |
||||
|
session.setUserUnread(0); |
||||
|
session.setOtherUnread(0); |
||||
|
session.setLastActiveTime(new Date()); |
||||
|
chatSessionsMapper.insertChatSessions(session); |
||||
|
log.info("创建新会话: userId={}, otherUserId={}, sessionId={}", userId, otherUserId, session.getId()); |
||||
|
} |
||||
|
return session; |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
@Transactional |
||||
|
public void afterSendMessage(Long senderSessionId, Long receiverSessionId, |
||||
|
Long senderId, Long receiverId, String content) { |
||||
|
Date now = new Date(); |
||||
|
|
||||
|
// 1. 更新发送者视角的会话 - 增加对方的未读 |
||||
|
chatSessionsMapper.incrementOtherUnread(senderSessionId, now); |
||||
|
|
||||
|
// 2. 更新发送者视角的会话的最后消息 |
||||
|
ChatSessions senderSession = new ChatSessions(); |
||||
|
senderSession.setId(senderSessionId); |
||||
|
senderSession.setLastMessage(content); |
||||
|
senderSession.setLastMessageTime(now); |
||||
|
senderSession.setLastActiveTime(now); |
||||
|
chatSessionsMapper.updateLastMessage(senderSession); |
||||
|
|
||||
|
// 3. 更新接收者视角的会话 - 增加自己的未读 |
||||
|
chatSessionsMapper.incrementUserUnread(receiverSessionId, now); |
||||
|
|
||||
|
// 4. 更新接收者视角的会话的最后消息 |
||||
|
ChatSessions receiverSession = new ChatSessions(); |
||||
|
receiverSession.setId(receiverSessionId); |
||||
|
receiverSession.setLastMessage(content); |
||||
|
receiverSession.setLastMessageTime(now); |
||||
|
receiverSession.setLastActiveTime(now); |
||||
|
chatSessionsMapper.updateLastMessage(receiverSession); |
||||
|
|
||||
|
log.info("会话更新完成: senderSessionId={}, receiverSessionId={}, content={}", |
||||
|
senderSessionId, receiverSessionId, content); |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
@Transactional |
||||
|
public void markAsRead(Long sessionId, Long userId) { |
||||
|
chatSessionsMapper.resetUnread(sessionId); |
||||
|
log.info("会话已读: sessionId={}, userId={}", sessionId, userId); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 获取与指定用户相关的所有用户ID(有会话关联的用户) |
||||
|
* 实现方法:查询所有包含该用户的会话,提取对方用户ID |
||||
|
* |
||||
|
* @param userId 当前用户ID |
||||
|
* @return 相关用户ID列表(不包含自己) |
||||
|
*/ |
||||
|
@Override |
||||
|
public List<Long> getRelatedUserIds(Long userId) { |
||||
|
try { |
||||
|
// 查询用户的所有会话 |
||||
|
List<ChatSessions> sessions = chatSessionsMapper.selectSessionsByUserId(userId); |
||||
|
|
||||
|
if (sessions == null || sessions.isEmpty()) { |
||||
|
log.info("用户[{}]没有会话记录", userId); |
||||
|
return new ArrayList<>(); |
||||
|
} |
||||
|
|
||||
|
// 从每个会话中提取对方用户ID |
||||
|
List<Long> relatedUserIds = sessions.stream() |
||||
|
.map(session -> { |
||||
|
// 如果当前用户是userId,则对方是otherUserId |
||||
|
// 注意:这里假设会话表结构是(user_id, other_user_id) |
||||
|
if (userId.equals(session.getUserId())) { |
||||
|
return session.getOtherUserId(); |
||||
|
} else { |
||||
|
return session.getUserId(); |
||||
|
} |
||||
|
}) |
||||
|
.distinct() // 去重 |
||||
|
.filter(id -> !id.equals(userId)) // 过滤掉自己(安全起见) |
||||
|
.collect(Collectors.toList()); |
||||
|
|
||||
|
log.info("用户[{}]的相关用户: {}", userId, relatedUserIds); |
||||
|
return relatedUserIds; |
||||
|
|
||||
|
} catch (Exception e) { |
||||
|
log.error("获取相关用户ID失败: userId={}", userId, e); |
||||
|
return new ArrayList<>(); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,114 @@ |
|||||
|
<?xml version="1.0" encoding="UTF-8" ?> |
||||
|
<!DOCTYPE mapper |
||||
|
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" |
||||
|
"http://mybatis.org/dtd/mybatis-3-mapper.dtd"> |
||||
|
<mapper namespace="com.chenhai.system.mapper.ChatMessagesMapper"> |
||||
|
|
||||
|
<resultMap type="com.chenhai.system.domain.ChatMessages" id="ChatMessagesResult"> |
||||
|
<id property="id" column="id" /> |
||||
|
<result property="senderSessionId" column="sender_session_id" /> |
||||
|
<result property="receiverSessionId" column="receiver_session_id" /> |
||||
|
<result property="senderId" column="sender_id" /> |
||||
|
<result property="receiverId" column="receiver_id" /> |
||||
|
<result property="contentType" column="content_type" /> |
||||
|
<result property="content" column="content" /> |
||||
|
<result property="fileUrl" column="file_url" /> |
||||
|
<result property="fileName" column="file_name" /> |
||||
|
<result property="fileSize" column="file_size" /> |
||||
|
<result property="videoDuration" column="video_duration" /> |
||||
|
<result property="thumbnailUrl" column="thumbnail_url" /> |
||||
|
<result property="isRead" column="is_read" /> |
||||
|
<result property="readAt" column="read_at" /> |
||||
|
<result property="createdAt" column="created_at" /> |
||||
|
</resultMap> |
||||
|
|
||||
|
<sql id="selectChatMessagesVo"> |
||||
|
select id, sender_session_id, receiver_session_id, sender_id, receiver_id, |
||||
|
content_type, content, file_url, file_name, file_size, video_duration, |
||||
|
thumbnail_url, is_read, read_at, created_at |
||||
|
from chat_messages |
||||
|
</sql> |
||||
|
|
||||
|
<!-- 查询消息 by ID --> |
||||
|
<select id="selectChatMessagesById" parameterType="Long" resultMap="ChatMessagesResult"> |
||||
|
<include refid="selectChatMessagesVo"/> |
||||
|
where id = #{id} |
||||
|
</select> |
||||
|
|
||||
|
<!-- 查询消息列表 - 用于PageHelper分页 --> |
||||
|
<select id="selectChatMessagesList" parameterType="com.chenhai.system.domain.ChatMessages" resultMap="ChatMessagesResult"> |
||||
|
<include refid="selectChatMessagesVo"/> |
||||
|
<where> |
||||
|
<if test="senderSessionId != null and senderSessionId != 0"> |
||||
|
AND sender_session_id = #{senderSessionId} |
||||
|
</if> |
||||
|
<if test="receiverSessionId != null and receiverSessionId != 0"> |
||||
|
AND receiver_session_id = #{receiverSessionId} |
||||
|
</if> |
||||
|
<if test="senderId != null and senderId != 0"> |
||||
|
AND sender_id = #{senderId} |
||||
|
</if> |
||||
|
<if test="receiverId != null and receiverId != 0"> |
||||
|
AND receiver_id = #{receiverId} |
||||
|
</if> |
||||
|
</where> |
||||
|
order by id desc |
||||
|
</select> |
||||
|
|
||||
|
<!-- 根据会话ID查询消息 - 用于PageHelper分页,不要手动LIMIT --> |
||||
|
<select id="selectMessagesBySession" resultMap="ChatMessagesResult"> |
||||
|
<include refid="selectChatMessagesVo"/> |
||||
|
where sender_session_id = #{sessionId} or receiver_session_id = #{sessionId} |
||||
|
<if test="lastId != null and lastId > 0"> |
||||
|
and id < #{lastId} |
||||
|
</if> |
||||
|
order by id desc |
||||
|
<!-- 注意:这里不能写 LIMIT,由 PageHelper 自动添加 --> |
||||
|
</select> |
||||
|
|
||||
|
<!-- 获取会话的最后一条消息 - 需要手动LIMIT 1 --> |
||||
|
<select id="selectLastMessageBySession" resultMap="ChatMessagesResult"> |
||||
|
<include refid="selectChatMessagesVo"/> |
||||
|
where sender_session_id = #{sessionId} or receiver_session_id = #{sessionId} |
||||
|
order by id desc |
||||
|
limit 1 <!-- 这里需要 limit 1,因为只需要一条 --> |
||||
|
</select> |
||||
|
|
||||
|
<!-- 获取未读消息列表 - 不需要分页 --> |
||||
|
<select id="selectUnreadMessages" resultMap="ChatMessagesResult"> |
||||
|
<include refid="selectChatMessagesVo"/> |
||||
|
where receiver_session_id = #{sessionId} |
||||
|
and receiver_id = #{receiverId} |
||||
|
and is_read = 0 |
||||
|
order by id asc |
||||
|
</select> |
||||
|
|
||||
|
<!-- 插入消息 --> |
||||
|
<insert id="insertChatMessages" parameterType="com.chenhai.system.domain.ChatMessages" useGeneratedKeys="true" keyProperty="id"> |
||||
|
insert into chat_messages ( |
||||
|
sender_session_id, receiver_session_id, sender_id, receiver_id, |
||||
|
content_type, content, is_read, created_at |
||||
|
) values ( |
||||
|
#{senderSessionId}, #{receiverSessionId}, #{senderId}, #{receiverId}, |
||||
|
#{contentType}, #{content}, 0, sysdate() |
||||
|
) |
||||
|
</insert> |
||||
|
|
||||
|
<!-- 批量标记已读 --> |
||||
|
<update id="batchMarkAsRead"> |
||||
|
update chat_messages set is_read = 1, read_at = #{readAt} |
||||
|
where id in |
||||
|
<foreach collection="messageIds" item="id" open="(" separator="," close=")"> |
||||
|
#{id} |
||||
|
</foreach> |
||||
|
</update> |
||||
|
|
||||
|
<!-- 标记会话所有消息已读 --> |
||||
|
<update id="markSessionAsRead"> |
||||
|
update chat_messages set is_read = 1, read_at = #{readAt} |
||||
|
where receiver_session_id = #{sessionId} |
||||
|
and receiver_id = #{receiverId} |
||||
|
and is_read = 0 |
||||
|
</update> |
||||
|
|
||||
|
</mapper> |
||||
@ -0,0 +1,135 @@ |
|||||
|
<?xml version="1.0" encoding="UTF-8" ?> |
||||
|
<!DOCTYPE mapper |
||||
|
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" |
||||
|
"http://mybatis.org/dtd/mybatis-3-mapper.dtd"> |
||||
|
<mapper namespace="com.chenhai.system.mapper.ChatSessionsMapper"> |
||||
|
|
||||
|
<resultMap type="com.chenhai.system.domain.ChatSessions" id="ChatSessionsResult"> |
||||
|
<id property="id" column="id" /> |
||||
|
<result property="userId" column="user_id" /> |
||||
|
<result property="otherUserId" column="other_user_id" /> |
||||
|
<result property="userUnread" column="user_unread" /> |
||||
|
<result property="otherUnread" column="other_unread" /> |
||||
|
<result property="lastMessage" column="last_message" /> |
||||
|
<result property="lastMessageTime" column="last_message_time" /> |
||||
|
<result property="lastActiveTime" column="last_active_time" /> |
||||
|
<result property="createdAt" column="created_at" /> |
||||
|
<result property="updatedAt" column="updated_at" /> |
||||
|
<result property="createTime" column="created_at" /> |
||||
|
<result property="updateTime" column="updated_at" /> |
||||
|
</resultMap> |
||||
|
|
||||
|
<sql id="selectChatSessionsVo"> |
||||
|
select id, user_id, other_user_id, user_unread, other_unread, |
||||
|
last_message, last_message_time, last_active_time, |
||||
|
created_at, updated_at |
||||
|
from chat_sessions |
||||
|
</sql> |
||||
|
|
||||
|
<select id="selectChatSessionsById" parameterType="Long" resultMap="ChatSessionsResult"> |
||||
|
<include refid="selectChatSessionsVo"/> |
||||
|
where id = #{id} |
||||
|
</select> |
||||
|
|
||||
|
<select id="selectChatSessionsList" parameterType="com.chenhai.system.domain.ChatSessions" resultMap="ChatSessionsResult"> |
||||
|
<include refid="selectChatSessionsVo"/> |
||||
|
<where> |
||||
|
<if test="userId != null and userId != 0"> |
||||
|
AND user_id = #{userId} |
||||
|
</if> |
||||
|
<if test="otherUserId != null and otherUserId != 0"> |
||||
|
AND other_user_id = #{otherUserId} |
||||
|
</if> |
||||
|
</where> |
||||
|
order by last_active_time desc |
||||
|
</select> |
||||
|
|
||||
|
<select id="selectSessionsByUserId" parameterType="Long" resultMap="ChatSessionsResult"> |
||||
|
<include refid="selectChatSessionsVo"/> |
||||
|
where user_id = #{userId} |
||||
|
order by last_active_time desc |
||||
|
</select> |
||||
|
|
||||
|
<select id="selectSessionByUsers" resultMap="ChatSessionsResult"> |
||||
|
<include refid="selectChatSessionsVo"/> |
||||
|
where user_id = #{userId} and other_user_id = #{otherUserId} |
||||
|
</select> |
||||
|
|
||||
|
<insert id="insertChatSessions" parameterType="com.chenhai.system.domain.ChatSessions" useGeneratedKeys="true" keyProperty="id"> |
||||
|
insert into chat_sessions ( |
||||
|
user_id, other_user_id, user_unread, other_unread, |
||||
|
last_message, last_message_time, last_active_time, |
||||
|
created_at |
||||
|
) values ( |
||||
|
#{userId}, #{otherUserId}, 0, 0, |
||||
|
#{lastMessage}, #{lastMessageTime}, #{lastActiveTime}, |
||||
|
sysdate() |
||||
|
) |
||||
|
</insert> |
||||
|
|
||||
|
<update id="updateChatSessions" parameterType="com.chenhai.system.domain.ChatSessions"> |
||||
|
update chat_sessions |
||||
|
<set> |
||||
|
<if test="lastMessage != null">last_message = #{lastMessage},</if> |
||||
|
<if test="lastMessageTime != null">last_message_time = #{lastMessageTime},</if> |
||||
|
<if test="userUnread != null">user_unread = #{userUnread},</if> |
||||
|
<if test="otherUnread != null">other_unread = #{otherUnread},</if> |
||||
|
<if test="lastActiveTime != null">last_active_time = #{lastActiveTime},</if> |
||||
|
</set> |
||||
|
where id = #{id} |
||||
|
</update> |
||||
|
|
||||
|
<update id="updateLastMessage" parameterType="com.chenhai.system.domain.ChatSessions"> |
||||
|
update chat_sessions |
||||
|
set last_message = #{lastMessage}, |
||||
|
last_message_time = #{lastMessageTime}, |
||||
|
last_active_time = sysdate() |
||||
|
where id = #{id} |
||||
|
</update> |
||||
|
|
||||
|
<!-- 修复 incrementUnread - 使用两个独立的更新语句,而不是条件判断 --> |
||||
|
<update id="incrementUnread"> |
||||
|
update chat_sessions |
||||
|
set |
||||
|
<if test="receiverId != null and receiverId != 0"> |
||||
|
<!-- 这里不能直接写子查询,我们用两个单独的更新语句 --> |
||||
|
user_unread = user_unread + 1 |
||||
|
</if> |
||||
|
where id = #{sessionId} |
||||
|
and user_id = #{receiverId} <!-- 如果接收者是当前用户,增加 user_unread --> |
||||
|
</update> |
||||
|
|
||||
|
<!-- 新增一个专门用于增加对方未读的方法 --> |
||||
|
<update id="incrementOtherUnread"> |
||||
|
update chat_sessions |
||||
|
set other_unread = other_unread + 1, |
||||
|
last_active_time = #{lastActiveTime} |
||||
|
where id = #{sessionId} |
||||
|
</update> |
||||
|
|
||||
|
<!-- 新增一个专门用于增加自己未读的方法 --> |
||||
|
<update id="incrementUserUnread"> |
||||
|
update chat_sessions |
||||
|
set user_unread = user_unread + 1, |
||||
|
last_active_time = #{lastActiveTime} |
||||
|
where id = #{sessionId} |
||||
|
</update> |
||||
|
|
||||
|
<update id="resetUnread"> |
||||
|
update chat_sessions |
||||
|
set user_unread = 0 |
||||
|
where id = #{sessionId} |
||||
|
</update> |
||||
|
|
||||
|
<delete id="deleteChatSessionsById" parameterType="Long"> |
||||
|
delete from chat_sessions where id = #{id} |
||||
|
</delete> |
||||
|
|
||||
|
<delete id="deleteChatSessionsByIds" parameterType="String"> |
||||
|
delete from chat_sessions where id in |
||||
|
<foreach item="id" collection="array" open="(" separator="," close=")"> |
||||
|
#{id} |
||||
|
</foreach> |
||||
|
</delete> |
||||
|
|
||||
|
</mapper> |
||||
@ -1,72 +1,72 @@ |
|||||
// /api/chat/chat.js - 修改为使用现有接口
|
|
||||
|
// src/api/system/chat.js
|
||||
import request from '@/utils/request' |
import request from '@/utils/request' |
||||
|
|
||||
// 使用现有的专家接口获取专家信息
|
|
||||
export function getExpertInfo(expertId) { |
|
||||
|
/** |
||||
|
* 获取会话列表 |
||||
|
*/ |
||||
|
export function listSessions() { |
||||
return request({ |
return request({ |
||||
url: '/vet/experts/' + expertId, |
|
||||
|
url: '/system/chat/sessions', |
||||
method: 'get' |
method: 'get' |
||||
}) |
}) |
||||
} |
} |
||||
|
|
||||
// 使用现有的会话接口创建会话
|
|
||||
export function createOrGetSession(data) { |
|
||||
|
/** |
||||
|
* 标记会话已读 |
||||
|
* @param {Object} data - { sessionId } |
||||
|
*/ |
||||
|
export function markSessionRead(data) { |
||||
return request({ |
return request({ |
||||
url: '/system/session/createOrGetSession', |
|
||||
|
url: '/system/chat/session/read', |
||||
method: 'post', |
method: 'post', |
||||
data: data |
|
||||
|
data |
||||
}) |
}) |
||||
} |
} |
||||
|
|
||||
// 使用现有的消息接口发送消息
|
|
||||
export function sendTextMessage(data) { |
|
||||
|
/** |
||||
|
* 获取历史消息 |
||||
|
* @param {Object} params - { sessionId, pageNum, pageSize, lastId } |
||||
|
*/ |
||||
|
export function getMessages(params) { |
||||
return request({ |
return request({ |
||||
url: '/system/message/send', |
|
||||
method: 'post', |
|
||||
data: data |
|
||||
}) |
|
||||
} |
|
||||
|
|
||||
// 使用现有的消息接口获取会话消息
|
|
||||
export function getSessionMessages(sessionId) { |
|
||||
const query = { |
|
||||
sessionId: sessionId, |
|
||||
delFlag: '0' |
|
||||
} |
|
||||
return request({ |
|
||||
url: '/system/message/list', |
|
||||
|
url: '/system/chat/messages', |
||||
method: 'get', |
method: 'get', |
||||
params: query |
|
||||
|
params |
||||
}) |
}) |
||||
} |
} |
||||
|
|
||||
// 标记消息为已读(通过修改消息接口)
|
|
||||
export function markMessagesAsRead(sessionId, senderType) { |
|
||||
|
/** |
||||
|
* 发送消息 |
||||
|
* @param {Object} data - { receiverId, contentType, content } |
||||
|
*/ |
||||
|
export function sendMessage(data) { |
||||
return request({ |
return request({ |
||||
url: '/system/message/markAllRead', |
|
||||
|
url: '/system/chat/message/send', |
||||
method: 'post', |
method: 'post', |
||||
data: { sessionId, senderType } |
|
||||
|
data |
||||
}) |
}) |
||||
} |
} |
||||
|
|
||||
// 获取用户会话列表
|
|
||||
export function getUserSessions(userId) { |
|
||||
const query = { |
|
||||
userId: userId, |
|
||||
delFlag: '0' |
|
||||
} |
|
||||
|
/** |
||||
|
* 标记消息已读 |
||||
|
* @param {Object} data - { messageIds, sessionId } |
||||
|
*/ |
||||
|
export function markMessagesRead(data) { |
||||
return request({ |
return request({ |
||||
url: '/system/session/list', |
|
||||
method: 'get', |
|
||||
params: query |
|
||||
|
url: '/system/chat/message/read', |
||||
|
method: 'post', |
||||
|
data |
||||
}) |
}) |
||||
} |
} |
||||
|
|
||||
// 结束会话
|
|
||||
export function endSession(sessionId, reason) { |
|
||||
|
/** |
||||
|
* 批量查询用户在线状态 |
||||
|
* @param {Object} data - { userIds: [1,2,3] } |
||||
|
*/ |
||||
|
export function batchCheckStatus(data) { |
||||
return request({ |
return request({ |
||||
url: '/system/session/end', |
|
||||
|
url: '/system/chat/status/batch', |
||||
method: 'post', |
method: 'post', |
||||
data: { sessionId, reason } |
|
||||
|
data |
||||
}) |
}) |
||||
} |
} |
||||
@ -0,0 +1,55 @@ |
|||||
|
import request from '@/utils/request' |
||||
|
|
||||
|
/** |
||||
|
* 会话列表相关API |
||||
|
*/ |
||||
|
|
||||
|
// 查询我的会话列表
|
||||
|
export function listMySessions(query) { |
||||
|
return request({ |
||||
|
url: '/chat/sessions/list', |
||||
|
method: 'get', |
||||
|
params: query |
||||
|
}) |
||||
|
} |
||||
|
|
||||
|
// 获取会话详情
|
||||
|
export function getSessionDetail(sessionId) { |
||||
|
return request({ |
||||
|
url: `/chat/session/${sessionId}`, |
||||
|
method: 'get' |
||||
|
}) |
||||
|
} |
||||
|
|
||||
|
// 创建新会话
|
||||
|
export function createSession(data) { |
||||
|
return request({ |
||||
|
url: '/chat/session', |
||||
|
method: 'post', |
||||
|
data: data |
||||
|
}) |
||||
|
} |
||||
|
|
||||
|
// 删除会话
|
||||
|
export function deleteSession(sessionId) { |
||||
|
return request({ |
||||
|
url: `/chat/session/${sessionId}`, |
||||
|
method: 'delete' |
||||
|
}) |
||||
|
} |
||||
|
|
||||
|
// 标记会话已读
|
||||
|
export function markSessionRead(sessionId) { |
||||
|
return request({ |
||||
|
url: `/chat/session/${sessionId}/read`, |
||||
|
method: 'put' |
||||
|
}) |
||||
|
} |
||||
|
|
||||
|
// 获取会话统计
|
||||
|
export function getSessionStats() { |
||||
|
return request({ |
||||
|
url: '/chat/session/stats', |
||||
|
method: 'get' |
||||
|
}) |
||||
|
} |
||||
1510
chenhai-ui/src/views/vet/chatManager/index.vue
File diff suppressed because it is too large
View File
File diff suppressed because it is too large
View File
Write
Preview
Loading…
Cancel
Save
Reference in new issue