Maven 配置
<!-- https://mvnrepository.com/artifact/org.java-websocket/Java-WebSocket -->
<dependency><groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>1.3.0</version>
</dependency>
案例
WebSocket 连接的生命周期(READYSTATE)依次为:NOT_YET_CONNECTED → CONNECTING → OPEN → CLOSING → CLOSED
package server;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.java_websocket.WebSocket.READYSTATE;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import net.sf.json.JSONObject;
import RedisClient;
/**
 * Singleton WebSocket client for the basic-business channel.
 *
 * <p>On class initialization a client is created for a fixed server address and
 * {@code connect()} is called. Once the connection opens, the worker threads are
 * started (or restarted if they have died):
 * <ul>
 *   <li>a send thread that drains {@code messageSendQueue} into the socket,</li>
 *   <li>a receive thread that drains {@code messageReceiveQueue} into the
 *       configured {@code ReceiveProcessor},</li>
 *   <li>a heartbeat thread that sends a keep-alive after more than 30 seconds
 *       without traffic,</li>
 *   <li>a fixed pool of {@code SendBasicThread} workers.</li>
 * </ul>
 * When the connection closes, a new client is created and connected after a
 * short delay.
 *
 * <p>Fields shared between the caller, the WebSocket callback thread and the
 * worker threads are {@code volatile} so that updates made through
 * {@link #stop} and {@link #setProcessor} become visible to the workers.
 *
 * <p>NOTE(review): the static initializer connects as soon as the class is
 * loaded, which is necessarily before {@link #setProcessor} can be invoked. If
 * the connection opens first, the workers start with a null processor, Redis
 * client and channel id - confirm callers tolerate this.
 */
public class BasicInfoConnector {
    /** Pause before reconnecting, between send retries, and between heartbeat polls (ms). */
    private static final long RETRY_DELAY_MILLIS = 500L;
    /** Idle time in seconds that must be exceeded before a keep-alive is sent. */
    private static final long HEARTBEAT_IDLE_SECONDS = 30L;

    /** Current client; replaced by the close callback on every reconnect. */
    private static volatile WebSocketClient wsc = null;
    private static final BlockingQueue<String> messageReceiveQueue = new ArrayBlockingQueue<String>(5000);
    private static final BlockingQueue<String> messageSendQueue = new ArrayBlockingQueue<String>(2000);
    /** The single instance, created eagerly during class initialization. */
    private static final BasicInfoConnector wsClientCore = new BasicInfoConnector();
    private static volatile ReceiveProcessor messageFromProcessor = null;
    // Loop switches: each worker keeps running while its switch is true.
    private volatile boolean sendThreadSwitch = true;
    private volatile boolean receiveThreadSwitch = true;
    private volatile boolean heartBeatThreadSwitch = true;
    private volatile Thread sendThread = null;
    private volatile Thread receiveThread = null;
    private volatile Thread heartBeatThread = null;
    /** Timestamp (ms) of the last successful send, receive or connection open. */
    private static volatile long lastCommunicateTime = System.currentTimeMillis();
    private static ExecutorService sendChatInfoProcessor = null;
    private volatile RedisClient redisClient = null;
    private static volatile Long channelId;
    /** Serialized keep-alive payload sent by the heartbeat thread. */
    private static final String keepalive = JSONObject.fromObject(new KeepAlivePO()).toString();

    /**
     * Sets the run switches of the worker threads. A switch set to {@code false}
     * ends the corresponding thread's loop; the thread is also interrupted so it
     * leaves a blocking {@code take()} or {@code sleep()} instead of waiting for
     * the next message.
     *
     * @param sendThreadSwitch      {@code true} to keep the send thread running
     * @param receiveThreadSwitch   {@code true} to keep the receive thread running
     * @param heartBeatThreadSwitch {@code true} to keep the heartbeat thread running
     */
    public static void stop(boolean sendThreadSwitch, boolean receiveThreadSwitch, boolean heartBeatThreadSwitch) {
        wsClientCore.heartBeatThreadSwitch = heartBeatThreadSwitch;
        wsClientCore.sendThreadSwitch = sendThreadSwitch;
        wsClientCore.receiveThreadSwitch = receiveThreadSwitch;
        if (!heartBeatThreadSwitch) {
            interruptIfRunning(wsClientCore.heartBeatThread);
        }
        if (!sendThreadSwitch) {
            interruptIfRunning(wsClientCore.sendThread);
        }
        if (!receiveThreadSwitch) {
            interruptIfRunning(wsClientCore.receiveThread);
        }
    }

    /** Interrupts {@code worker} if it has been started and is still alive. */
    private static void interruptIfRunning(Thread worker) {
        if (worker != null && worker.isAlive()) {
            worker.interrupt();
        }
    }

    /**
     * Registers the collaborators used by the worker threads.
     *
     * @param messageFromProcessor processor that receives every incoming message
     * @param redisClient          client handed to the {@code SendBasicThread} workers
     * @param channelId            channel id passed along with each message
     */
    public static void setProcessor(ReceiveProcessor messageFromProcessor, RedisClient redisClient, long channelId) {
        BasicInfoConnector.messageFromProcessor = messageFromProcessor;
        wsClientCore.redisClient = redisClient;
        BasicInfoConnector.channelId = channelId;
    }

    /**
     * Queues a message for sending; does not block.
     *
     * @param sendMsg message text to send
     * @throws Exception if the send queue is full
     */
    public final static void sendMessage(String sendMsg) throws Exception {
        boolean result = messageSendQueue.offer(sendMsg);
        if (!result) {
            throw new Exception("the message sending queue is full,cant send the message:" + sendMsg);
        }
    }

    /**
     * Takes the next received message, blocking until one is available.
     *
     * @return the oldest message in the receive queue
     * @throws InterruptedException if interrupted while waiting
     */
    public final static String getMessage() throws InterruptedException {
        return messageReceiveQueue.take();
    }

    private BasicInfoConnector() {
    }

    /**
     * @return the single connector instance
     */
    public static final BasicInfoConnector getInstance() {
        return wsClientCore;
    }

    /*
     * Initialization: build the client and start connecting. Kept after all
     * static fields so they are initialized before any callback can use them.
     */
    static {
        try {
            final String serverAddr = "ws://localhost:9001/ws";
            wsc = initClient(serverAddr);
            wsc.connect();
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
    }

    /**
     * Builds a client whose callbacks feed the receive queue, start the worker
     * threads on open, and reconnect on close.
     *
     * @param serverAddr WebSocket server URI, also used for reconnects
     * @return a new, not yet connected client
     * @throws URISyntaxException if {@code serverAddr} is not a valid URI
     */
    private static WebSocketClient initClient(final String serverAddr) throws URISyntaxException {
        return new WebSocketClient(new URI(serverAddr)) {
            @Override
            public void onOpen(ServerHandshake handshakedata) { // connection established
                lastCommunicateTime = System.currentTimeMillis();
                wsClientCore.componentThreadInit(); // start send/receive/heartbeat workers
                System.out.println("client has connected to the server:" + serverAddr);
            }

            @Override
            public void onMessage(String message) { // message received
                lastCommunicateTime = System.currentTimeMillis();
                boolean result = messageReceiveQueue.offer(message);
                if (!result) {
                    System.out.println("the message receiving queue is full,message will be discarded:" + message);
                }
            }

            @Override
            public void onError(Exception ex) { // connection error
                System.out.println("error during connecting server!:" + ex.getMessage());
            }

            @Override
            public void onClose(int code, String reason, boolean remote) { // connection closed
                System.out.println("Client has been disconnected to the server. to reconnect!");
                System.out.println("The current stat of the websocket client:" + wsc.getReadyState());
                if (wsc.getReadyState() == READYSTATE.OPEN) {
                    wsc.close();
                    System.out.println("Close the current client,ready to reconnect!");
                }
                // Read the state once so all three comparisons see the same value.
                READYSTATE state = wsc.getReadyState();
                if (state == READYSTATE.NOT_YET_CONNECTED || state == READYSTATE.CLOSED || state == READYSTATE.CLOSING) {
                    try {
                        Thread.sleep(RETRY_DELAY_MILLIS);
                    } catch (InterruptedException e1) {
                        // Skip the delay but still reconnect, as before; keep the flag set.
                        Thread.currentThread().interrupt();
                    }
                    try {
                        // Only connect a freshly built client; never call connect()
                        // again on the old one if construction fails.
                        WebSocketClient replacement = initClient(serverAddr);
                        wsc = replacement;
                        replacement.connect();
                    } catch (URISyntaxException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
    }

    /**
     * Takes messages from the send queue and writes them to the WebSocket.
     */
    private class SendThread implements Runnable {
        @Override
        public void run() {
            try {
                while (sendThreadSwitch) {
                    String msg = messageSendQueue.take();
                    deliver(msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Sends one message, retrying while the socket rejects it (for example
         * while disconnected) so the message is not lost and the thread survives.
         */
        private void deliver(String msg) throws InterruptedException {
            boolean reported = false;
            while (sendThreadSwitch) {
                try {
                    wsc.send(msg);
                    lastCommunicateTime = System.currentTimeMillis();
                    return;
                } catch (RuntimeException e) {
                    if (!reported) { // log only the first failure per message
                        System.out.println("failed to send message,will retry:" + e);
                        reported = true;
                    }
                    Thread.sleep(RETRY_DELAY_MILLIS);
                }
            }
        }
    }

    /**
     * Takes messages received from the WebSocket and passes them to the processor.
     */
    private class ReceiveThread implements Runnable {
        @Override
        public void run() {
            try {
                while (receiveThreadSwitch) {
                    String msg = getMessage();
                    System.out.println("获取信息:" + msg);
                    try {
                        messageFromProcessor.processBusiness(msg, BasicInfoConnector.channelId);
                    } catch (Exception e) {
                        // A failing message must not end the receive loop.
                        System.out.println("信息处理错误:" + msg);
                        e.printStackTrace();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Heartbeat: sends a keep-alive when the connection has been idle too long.
     */
    private class Heartbeat implements Runnable {
        @Override
        public void run() {
            try {
                while (heartBeatThreadSwitch) {
                    // Sleep on every pass; sleeping only once before the loop
                    // turns it into a busy-wait.
                    Thread.sleep(RETRY_DELAY_MILLIS);
                    if (((System.currentTimeMillis() - lastCommunicateTime) / 1000) > HEARTBEAT_IDLE_SECONDS) {
                        WebSocketClient client = wsc; // snapshot; wsc may be swapped on reconnect
                        System.out.println("ready to heart beat,the current client stat:" + client.getReadyState());
                        if (client.getReadyState() == READYSTATE.OPEN) {
                            System.out.println("send heart beat to server......");
                            try {
                                client.send(keepalive);
                            } catch (RuntimeException e) {
                                // The socket can close between the state check and the send.
                                System.out.println("failed to send heart beat:" + e);
                            }
                            lastCommunicateTime = System.currentTimeMillis();
                        } else {
                            System.out.println("is not connecting,reset heart beat time");
                            lastCommunicateTime = System.currentTimeMillis();
                        }
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Starts every worker that is not currently running. Called each time a
     * connection opens, so workers that have died are restarted.
     */
    private void componentThreadInit() {
        if (wsClientCore.receiveThread == null || !wsClientCore.receiveThread.isAlive()) { // queue -> business processor
            wsClientCore.receiveThread = new Thread(wsClientCore.new ReceiveThread());
            wsClientCore.receiveThread.start();
            System.out.println("基本信息接收线程启动...");
        }
        if (BasicInfoConnector.sendChatInfoProcessor == null) { // business worker pool, created once
            BasicInfoConnector.sendChatInfoProcessor = Executors.newFixedThreadPool(Constants.SEND_PROCESSOR);
            for (int i = 0; i < Constants.SEND_PROCESSOR; i++) {
                BasicInfoConnector.sendChatInfoProcessor.execute(new SendBasicThread(redisClient, BasicInfoConnector.channelId));
            }
            System.out.println("业务线程启动...");
        }
        if (wsClientCore.sendThread == null || !wsClientCore.sendThread.isAlive()) { // send queue -> socket
            wsClientCore.sendThread = new Thread(wsClientCore.new SendThread());
            wsClientCore.sendThread.start();
            System.out.println("发送线程启动...");
        }
        if (wsClientCore.heartBeatThread == null || !wsClientCore.heartBeatThread.isAlive()) { // heartbeat
            wsClientCore.heartBeatThread = new Thread(wsClientCore.new Heartbeat());
            wsClientCore.heartBeatThread.start();
            System.out.println("心跳检测线程启动...");
        }
        System.out.println("启动线程初始化完毕.......");
    }
}
我的技术交流群425783133
原文链接:/javaschema/283243.html