huang
2025-10-30 a99650cb00bf5b0650c33f39a4221b765201d228
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package com.mes.websocket;
 
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
 
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
/**
 * WebSocketServer class
 *
 * @author zsh
 * @date 2025/07/04
 */
@ServerEndpoint(value = "/webSocket/{webSocketName}")
@Component
public class WebSocketServer {
 
    /** 日志记录器,记录 WebSocketServer 类运行时日志信息 */
    private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);
 
    /** 单次消息允许的最大字符数(超出则进行分块) */
    private static final int MAX_MESSAGE_SIZE = 50000;
 
    /** 用于限制并发连接数量的信号量,控制最大并发数为 100 */
    private static final Semaphore SEMAPHORE = new Semaphore(100);
 
    /**
     * 用户名与 WebSocket 实例映射表,支持一个用户对应多个连接。
     * 使用线程安全的 ConcurrentHashMap 存储,保证高并发下的数据一致性。
     */
    private static final Map<String, List<WebSocketServer>> SESSIONMAP = new ConcurrentHashMap<>();
 
    /** 当前连接对应的用户名 */
    private String webSocketName;
 
    /** 当前连接的 WebSocket 会话对象 */
    private Session session;
 
    /** 线程安全的消息缓存,用于记录当前连接收到的所有消息 */
    private final List<String> messages = new CopyOnWriteArrayList<>();
 
 
    /**
     * 连接建立
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("webSocketName") String webSocketName) {
        this.webSocketName = webSocketName;
        this.session = session;
 
        SESSIONMAP.computeIfAbsent(webSocketName, k -> new CopyOnWriteArrayList<>())
                .add(this);
 
        log.info("用户连接: webSocketName={}, 当前会话数: {}", webSocketName,
                SESSIONMAP.getOrDefault(webSocketName, Collections.emptyList()).size());
    }
 
    /**
     * 连接关闭
     */
    @OnClose
    public void onClose() {
        List<WebSocketServer> sessions = SESSIONMAP.get(webSocketName);
        if (sessions != null) {
            sessions.remove(this);
            if (sessions.isEmpty()) {
                SESSIONMAP.remove(webSocketName);
            }
            log.info("用户断开: webSocketName={}, 剩余会话数: {}", webSocketName,
                    SESSIONMAP.getOrDefault(webSocketName, Collections.emptyList()).size());
        }
    }
 
    /**
     * 接收消息
     */
    @OnMessage
    public void onMessage(String message) {
        log.info("收到消息: webSocketName={}, content={}", webSocketName, message);
        JSONObject obj = JSONUtil.parseObj(message);
        // 存储消息历史
        messages.add(obj.getStr("data"));
    }
 
    /**
     * 错误处理
     */
    @OnError
    public void onError(Throwable error) {
        log.error("WebSocket错误: webSocketName={}", webSocketName, error);
    }
 
    /**
     * 向当前用户的所有会话发送消息
     */
    public void sendToWeb(String webSocketName, String message) {
        List<WebSocketServer> sessions = SESSIONMAP.get(webSocketName);
        if (sessions == null) {
            return;
        }
 
        sessions.forEach(ws -> {
            try {
                SEMAPHORE.acquire();
                ws.sendChunkedMessage(message);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                log.error("推送失败: webSocketName={}", webSocketName, e);
            } finally {
                SEMAPHORE.release();
            }
        });
    }
 
    /**
     * 分块发送大消息
     */
    private void sendChunkedMessage(String message) {
        if (!session.isOpen()) {
            return;
        }
 
        try {
            if (message.length() <= MAX_MESSAGE_SIZE) {
                session.getBasicRemote().sendText(message);
                return;
            }
 
            // 分块发送
            int chunks = (int) Math.ceil((double) message.length() / MAX_MESSAGE_SIZE);
            for (int i = 0; i < chunks; i++) {
                int start = i * MAX_MESSAGE_SIZE;
                int end = Math.min(start + MAX_MESSAGE_SIZE, message.length());
                String chunk = message.substring(start, end) + (i == chunks - 1 ? "<END>" : "");
                session.getBasicRemote().sendText(chunk);
            }
        } catch (IOException e) {
            log.error("消息发送失败: webSocketName={}", webSocketName, e);
        }
    }
 
 
    public static Set<String> getOnlineUsers() {
        return SESSIONMAP.keySet();
    }
 
    public List<String> getMessages() {
        return Collections.unmodifiableList(messages);
    }
 
    public void clearMessages() {
        messages.clear();
    }
}