时间:2023-04-18 20:22:01 | 来源:网站运营
时间:2023-04-18 20:22:01 来源:网站运营
Websocket集群解决方案:最近在项目中在做一个消息推送的功能,比如客户下单之后通知给给对应的客户发送系统通知,这种消息推送需要使用到全双工的websocket推送消息。所谓的全双工表示客户端和服务端都能向对方发送消息。不使用同样是全双工的上一篇文章Spring Boot 整合单机websocket介绍了http是因为http只能由客户端主动发起请求,服务接收后返回消息。websocket建立起连接之后,客户端和服务端都能主动向对方发送消息。
websocket在单机模式下进行消息的发送和接收:用户A和用户B和web服务器建立连接之后,用户A发送一条消息到服务器,服务器再推送给用户B,在单机系统上所有的用户都和同一个服务器建立连接,所有的session都存储在同一个服务器中。session,服务器的保存维持连接的session。客户端每次只能和集群服务器其中的一个服务器连接,后续也是和该服务器进行数据传输。session共享的问题,客户端成功连接服务器之后,其他服务器也知道客户端连接成功。websocket类似的http是如何解决集群问题的?解决方案之一就是共享session,客户端登录服务端之后,将session信息存储在Redis数据库中,连接其他服务器时,从Redis获取session,实际就是将session信息存储在Redis中,实现redis的共享。session可以被共享的前提是可以被序列化,而websocket的session是无法被序列化的,http的session记录的是请求的数据,而websocket的session对应的是连接,连接到不同的服务器,session也不同,无法被序列化。http不使用session共享,就可以使用Nginx负载均衡的ip hash算法,客户端每次都是请求同一个服务器,客户端的session都保存在服务器上,而后续请求都是请求该服务器,都能获取到session,就不存在分布式session问题了。websocket相对http来说,可以由服务端主动推动消息给客户端,如果接收消息的服务端和发送消息消息的服务端不是同一个服务端,发送消息的服务端无法找到接收消息对应的session,即两个session不处于同一个服务端,也就无法推送消息。如下图所示:解决问题的方法是将所有消息的发送方和接收方都处于同一个服务器下,而消息发送方和接收方都是不确定的,显然是无法实现的。
websocket实现消息的推送。<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-freemarker</artifactId></dependency><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId></dependency><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId></dependency><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency>@Configurationpublic class WebSocketConfig { //tomcat启动无需该配置 @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); }}@Component@ServerEndpoint(value = "/message")@Slf4jpublic class WebSocket { private static Map<String, WebSocket> webSocketSet = new ConcurrentHashMap<>(); private Session session; @OnOpen public void onOpen(Session session) throws SocketException { this.session = session; webSocketSet.put(this.session.getId(),this); log.info("【websocket】有新的连接,总数:{}",webSocketSet.size()); } @OnClose public void onClose(){ String id = this.session.getId(); if (id != null){ webSocketSet.remove(id); log.info("【websocket】连接断开:总数:{}",webSocketSet.size()); } } @OnMessage public void onMessage(String message){ if (!message.equals("ping")){ log.info("【wesocket】收到客户端发送的消息,message={}",message); sendMessage(message); } } /** * 发送消息 * @param message * @return */ public void sendMessage(String message){ for (WebSocket webSocket : webSocketSet.values()) { webSocket.session.getAsyncRemote().sendText(message); } log.info("【wesocket】发送消息,message={}", message); }}<div> <input type="text" name="message" id="message"> <button id="sendBtn">发送</button></div><div style="width:100px;height: 500px;" id="content"></div><script src="https://cdn.bootcdn.net/ajax/libs/jquery/3.6.0/jquery.js"></script><script type="text/javascript"> var ws = new WebSocket("ws://127.0.0.1:8080/message"); ws.onopen = function(evt) { console.log("Connection open ..."); }; ws.onmessage = function(evt) { console.log( "Received Message: " + evt.data); var p = $("<p>"+evt.data+"</p>") $("#content").prepend(p); $("#message").val(""); }; ws.onclose = function(evt) { console.log("Connection closed."); }; $("#sendBtn").click(function(){ var aa = $("#message").val(); ws.send(aa); })</script>服务端和客户端中的OnOpen、onclose、onmessage都是一一对应的。ws.onopen调用服务端的@OnOpen注解的方法,储存客户端的session信息,握手建立连接。ws.send发送消息,对应服务端的@OnMessage注解下面的方法接收消息。session.getAsyncRemote().sendText发送消息,对应的客户端ws.onmessage接收消息。@GetMapping({"","index.html"})public ModelAndView index() { ModelAndView view = new ModelAndView("index"); return view;}RabbitMQ作为消息中间件,而RabbitMQ支持发布订阅模式:@Configurationpublic class RabbitConfig { @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("PUBLISH_SUBSCRIBE_EXCHANGE"); } @Bean public Queue psQueue() throws SocketException { // ip + 端口 为队列名 String ip = IpUtils.getServerIp() + "_" + IpUtils.getPort(); return new Queue("ps_" + ip); } @Bean public Binding routingFirstBinding() throws SocketException { return BindingBuilder.bind(psQueue()).to(fanoutExchange()); }}获取服务器IP和端口可以具体查看Github源码,这里就不做详细描述了。
WebSocket添加消息的接收方法,@RabbitListener 接收消息,队列名称使用常量命名,动态队列名称使用 #{name},其中的name是Queue的bean 名称:@RabbitListener(queues= "#{psQueue.name}")public void pubsubQueueFirst(String message) { System.out.println(message); sendMessage(message);}然后再调用sendMessage方法发送给所在连接的客户端。WebSocket类的onMessage方法将消息发送改成RabbitMQ方式发送:@OnMessagepublic void onMessage(String message){ if (!message.equals("ping")){ log.info("【wesocket】收到客户端发送的消息,message={}",message); //sendMessage(message); if (rabbitTemplate == null) { rabbitTemplate = (RabbitTemplate) SpringContextUtil.getBean("rabbitTemplate"); } rabbitTemplate.convertAndSend("PUBLISH_SUBSCRIBE_EXCHANGE", null, message); }}消息通知流程如下所示:Edit Configurations:server.port=8081:8080和8081。在启动8081端口的服务,将前端连接端口改成8081:var ws = new WebSocket("ws://127.0.0.1:8081/message");关键词:方案,解决