zhoushihao
2025-09-04 2a3c00cc5b07d1bb96463f7ed1703461e95de6ea
hangzhoumesParent/common/servicebase/src/main/java/com/mes/tools/WebSocketServer.java
@@ -10,161 +10,131 @@
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
@ServerEndpoint(value = "/api/talk/{username}")
@Component("webSocketServer")
@ServerEndpoint(value = "/api/talk/{webSocketName}")
@Component
public class WebSocketServer {
    public static ConfigurableApplicationContext applicationContext;
    private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);
    private List<String> messages;
    /**
     * 记录当前在线连接数
     */
    public static final Map<String, ArrayList<WebSocketServer>> sessionMap = new ConcurrentHashMap<>();
    private static final int MAX_MESSAGE_SIZE = 50000; // 单次消息分块阈值
    private static final Semaphore semaphore = new Semaphore(100); // 流量控制
    public String username;
    public Session session;
    // 按用户名分组存储Session(线程安全)
    private static final Map<String, List<WebSocketServer>> sessionMap = new ConcurrentHashMap<>();
    public WebSocketServer() {
        this.messages = new ArrayList<>();
    }
    // 当前连接的用户名和Session
    private String webSocketName;
    private Session session;
    private final List<String> messages = new CopyOnWriteArrayList<>(); // 线程安全消息记录
    /**
     * 连接建立成功调用的方法
     * 连接建立
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("username") String username) {
        this.username = username;
    public void onOpen(Session session, @PathParam("webSocketName") String webSocketName) {
        this.webSocketName = webSocketName;
        this.session = session;
        List<WebSocketServer> webSocketServers = sessionMap.get(username);
        if (webSocketServers == null) {
            ArrayList<WebSocketServer> arrayListwebserver = new ArrayList<WebSocketServer>();
            arrayListwebserver.add(this);
            sessionMap.put(username, arrayListwebserver);
        } else {
            webSocketServers.add(this);
        }
        log.info("有新用户加入,username={}, 当前在线人数为:{}", username, sessionMap.get(username).size());
        sessionMap.computeIfAbsent(webSocketName, k -> new CopyOnWriteArrayList<>())
                .add(this);
        // JSONObject result = new JSONObject();
        // JSONArray array = new JSONArray();
        // result.set("users", array);
        // for (Object key : sessionMap.keySet()) {
        // JSONObject jsonObject = new JSONObject();
        // jsonObject.set("username", key);
        // array.add(jsonObject);
        // }
        // sendAllMessage(JSONUtil.toJsonStr(result)); // 后台发送消息给所有的客户端
        log.info("用户连接: webSocketName={}, 当前会话数: {}", webSocketName,
                sessionMap.getOrDefault(webSocketName, Collections.emptyList()).size());
    }
    /**
     * 连接关闭调用的方法
     * 连接关闭
     */
    @OnClose
    public void onClose(Session session, @PathParam("username") String username) {
        List<WebSocketServer> webSocketServers = sessionMap.get(username);
        ArrayList<WebSocketServer> arrayListwebserver = new ArrayList<WebSocketServer>();
        if (webSocketServers.size() > 1) {
            for (WebSocketServer webSocketServer : webSocketServers) {
                if (webSocketServer != this) {
                    arrayListwebserver.add(webSocketServer);
                }
    public void onClose() {
        List<WebSocketServer> sessions = sessionMap.get(webSocketName);
        if (sessions != null) {
            sessions.remove(this);
            if (sessions.isEmpty()) {
                sessionMap.remove(webSocketName);
            }
            sessionMap.put(username, arrayListwebserver);
            log.info("移除username={}一名用户session, {}的当前在线人数为:{}", username, username, sessionMap.get(username).size());
        } else {
            sessionMap.remove(username);
            log.info("移除username={}一名用户session, {}连接关闭, 当前连接数为:{}", username, username, sessionMap.size());
            log.info("用户断开: webSocketName={}, 剩余会话数: {}", webSocketName,
                    sessionMap.getOrDefault(webSocketName, Collections.emptyList()).size());
        }
    }
    /**
     * 收到客户端消息后调用的方法
     * 后台收到客户端发送过来的消息
     * onMessage 是一个消息的中转站
     * 接受 浏览器端 socket.send 发送过来的 json数据
     *
     * @param message 客户端发送过来的消息
     * 接收消息
     */
    @OnMessage
    public void onMessage(String message, Session session, @PathParam("username") String username) {
        log.info("服务端收到用户username={}的消息:{}", username, message);
    public void onMessage(String message) {
        log.info("收到消息: webSocketName={}, content={}", webSocketName, message);
        JSONObject obj = JSONUtil.parseObj(message);
        String text = obj.getStr("data");
        JSONObject jsonObject = new JSONObject();
        jsonObject.set("message", text);
        this.messages.add(text);
    }
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误");
        error.printStackTrace();
        messages.add(obj.getStr("data")); // 存储消息历史
    }
    /**
     * 服务端发送消息给客户端
     * 错误处理
     */
    public void sendMessage(String message) {
        try {
            // log.info("服务端给客户端[{}]发送消息{}", this.session.getId(), message);
            if(this.session.isOpen()){
                int maxChunkSize = 50000; // 定义最大的分块大小
                int length = message.length();
                int chunks = (int) Math.ceil((double) length / maxChunkSize);
                //分块发送消息
                for (int i = 0; i < chunks; i++) {
                    int startIndex = i * maxChunkSize;
                    int endIndex = Math.min(startIndex + maxChunkSize, length);
                    String chunk = message.substring(startIndex, endIndex);
    @OnError
    public void onError(Throwable error) {
        log.error("WebSocket错误: webSocketName={}", webSocketName, error);
    }
                    // 判断是否是最后一块消息
                    boolean isLastChunk = (i == chunks - 1);
                    if(isLastChunk==true){
                        chunk+="<END>";
                    }
                    // 发送分块消息,并传递是否是最后一块消息的标识
                    this.session.getBasicRemote().sendText(chunk);
                }
    /**
     * 向当前用户的所有会话发送消息
     */
    public void sendToWeb(String webSocketName, String message) {
        List<WebSocketServer> sessions = sessionMap.get(webSocketName);
        if (sessions == null) return;
        sessions.forEach(ws -> {
            try {
                semaphore.acquire();
                ws.sendChunkedMessage(message);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                log.error("推送失败: webSocketName={}", webSocketName, e);
            } finally {
                semaphore.release();
            }
        } catch (Exception e) {
            log.error("发送消息给客户端失败:{}", e.getMessage(), e);
        });
    }
    /**
     * 分块发送大消息
     */
    private void sendChunkedMessage(String message) {
        if (!session.isOpen()) return;
        try {
            if (message.length() <= MAX_MESSAGE_SIZE) {
                session.getBasicRemote().sendText(message);
                return;
            }
            // 分块发送
            int chunks = (int) Math.ceil((double) message.length() / MAX_MESSAGE_SIZE);
            for (int i = 0; i < chunks; i++) {
                int start = i * MAX_MESSAGE_SIZE;
                int end = Math.min(start + MAX_MESSAGE_SIZE, message.length());
                String chunk = message.substring(start, end) + (i == chunks - 1 ? "<END>" : "");
                session.getBasicRemote().sendText(chunk);
            }
        } catch (IOException e) {
            log.error("消息发送失败: webSocketName={}", webSocketName, e);
        }
    }
    // /**
    //  * 服务端发送消息给所有客户端
    //  */
    // public void sendAllMessage(String message) {
    //     try {
    //         for (WebSocketServer webSocketServer : sessionMap.values()) {
    //             // log.info("服务端给客户端[{}]发送消息{}", this.session.getId(), message);
    //             webSocketServer.sendMessage(message);
    //         }
    //     } catch (Exception e) {
    //         log.error("服务端发送消息给客户端失败", e);
    //     }
    // }
    // --- 工具方法 ---
    public static Set<String> getOnlineUsers() {
        return sessionMap.keySet();
    }
    public List<String> getMessages() {
        return messages;
        return Collections.unmodifiableList(messages);
    }
    public void clearMessages() {
        messages.clear();
    }
}
}