package com.northglass.listener;
|
|
import java.io.DataInputStream;
|
import java.io.DataOutputStream;
|
import java.io.IOException;
|
import java.net.ServerSocket;
|
import java.net.Socket;
|
import java.text.SimpleDateFormat;
|
import java.util.Date;
|
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.slf4j.MDC;
|
import org.springframework.context.ApplicationContext;
|
|
import com.northglass.constants.StateConstants.ConnectState;
|
import com.northglass.entity.Shelf;
|
import com.northglass.service.shelf.ShelfMessageService;
|
import com.northglass.service.shelf.ShelfService;
|
import com.northglass.util.HexUtil;
|
|
|
public class ShelfServerListener extends AbstractServerListener implements Runnable {
|
|
private static final Logger LOGGER = LoggerFactory.getLogger(ShelfServerListener.class);
|
|
private Shelf shelf;
|
private ShelfService service;
|
private ShelfMessageService messageService;
|
|
private StringBuffer MESSAGE_BUFFER = new StringBuffer();
|
|
public ShelfServerListener() {
|
ApplicationContext context = initializeSpringContext();
|
service = context.getBean(ShelfService.class);
|
messageService = context.getBean(ShelfMessageService.class);
|
this.shelf = service.getDefaultShelf();
|
|
MDC.put("logFileName", "Shelf_" + new SimpleDateFormat("yyyyMMdd_HHmmss").format(new Date()));
|
}
|
|
/**
|
* 启动服务器监听连接,等待客户端连接
|
*
|
* @throws IOException
|
*/
|
private void connect(ServerSocket serverSocket) throws IOException {
|
if (serverSocket.isClosed()) {
|
return;
|
}
|
|
Socket socket = serverSocket.accept();
|
LOGGER.debug("客户端已连接,客户端IP:" + socket.getInetAddress());
|
|
service.setConnectState(ConnectState.CONNECTED);
|
ShelfConnection.setSocket(socket);
|
//添加连接时间
|
service.getendtime("shelf"+shelf.getNumber());
|
}
|
|
/**
|
* 服务器监听消息循环,先从客户端接收消息,再向客户端发送消息
|
*
|
* @throws IOException
|
*/
|
private void listen() throws IOException {
|
Socket socket = ShelfConnection.getSocket();
|
socket.setSoTimeout(10000); // 10000ms = 10s
|
while (true) {
|
DataInputStream in = new DataInputStream(socket.getInputStream());
|
byte b = in.readByte();
|
MESSAGE_BUFFER.append(HexUtil.byteToHexString(b));
|
if (!existMessage()) {
|
continue;
|
}
|
String message = getNextMessage();
|
LOGGER.debug("仓储接收消息为:"+message);
|
// machineService.saveMessage(MessageType.CLIENT, message, socket.getInetAddress().getHostAddress(), socket.getLocalPort());
|
String returnMessage = service.generateReturnMessage(message);
|
|
if (returnMessage != null && !"".equals(returnMessage) && !returnMessage.equals("connect")) {
|
// machineService.saveMessage(MessageType.SERVER, message, socket.getInetAddress().getHostAddress(), socket.getLocalPort());
|
|
DataOutputStream out = new DataOutputStream(socket.getOutputStream());
|
out.write(HexUtil.stringToInt(returnMessage));
|
out.flush();
|
}
|
if (returnMessage.equals("connect")) {
|
in.close();
|
socket.close();
|
ServerSocket serverSocket = null;
|
try {
|
serverSocket = new ServerSocket(shelf.getPort());
|
LOGGER.debug("仓储模块【" + shelf.getNumber() + "】启动监听,等待客户端连接,端口:" + shelf.getPort());
|
ShelfConnection.setServerSocket(serverSocket);
|
|
connect(serverSocket);
|
|
// 一直循环,若连接断开,则在Exception处理环节重新连接,并继续监听消息
|
while (!Thread.currentThread().isInterrupted()) {
|
try {
|
listen();
|
}
|
catch (IOException se) {
|
service.getright("shelf");
|
LOGGER.debug("仓储模块连接已断开!");
|
se.printStackTrace();
|
|
LOGGER.debug("仓储模块等待重新连接……");
|
service.setConnectState(ConnectState.CONNECTING);
|
connect(serverSocket);
|
}
|
}
|
} catch (IOException e) {
|
e.printStackTrace();
|
}
|
}
|
|
// DataInputStream in = new DataInputStream(socket.getInputStream());
|
// bufSize = in.read(msg);
|
// String message = HexUtil.byteToHexString(bufSize, msg);
|
//
|
// String returnMessage = service.generateReturnMessage(message);
|
//
|
// // 客户端会一直向服务器发数据
|
// if(returnMessage!=null){
|
// DataOutputStream out = new DataOutputStream(socket.getOutputStream());
|
// out.write(HexUtil.stringToInt(returnMessage));
|
// out.flush();
|
// }
|
}
|
}
|
|
private boolean existMessage() {
|
// 若缓冲区长度小于消息最小长度,则缓冲区中的消息无效。
|
if (MESSAGE_BUFFER.length() < 84) {
|
return false;
|
}
|
|
String head = MESSAGE_BUFFER.substring(0, 10);
|
LOGGER.trace("head: " + head);
|
|
// 若缓冲区不以HEAD为起始,则删掉字符;
|
// 一个字符一个字符删是避免一次删多导致消息损坏。
|
if (!head.equalsIgnoreCase(HEAD)) {
|
MESSAGE_BUFFER.delete(0, 2);
|
return false;
|
}
|
|
// 获取消息长度
|
String messageLengthHex = MESSAGE_BUFFER.substring(MESSAGE_LENGTH_START, MESSAGE_LENGTH_END + 1);
|
int messageLength = HexUtil.hexToInt(messageLengthHex);
|
LOGGER.trace("messageLength: " + messageLength);
|
|
if (MESSAGE_BUFFER.length() < DATA_START + messageLength*2) {
|
return false;
|
}
|
|
String dataAndEnd = MESSAGE_BUFFER.substring(DATA_START, DATA_START + messageLength*2);
|
String end = dataAndEnd.substring(dataAndEnd.length() - 10, dataAndEnd.length());
|
LOGGER.trace("end: " + end);
|
|
if (head.equals(HEAD) && end.equals(END)) {
|
return true;
|
}
|
|
return false;
|
}
|
|
private String getNextMessage() {
|
// 获取消息长度
|
String messageLengthHex = MESSAGE_BUFFER.substring(MESSAGE_LENGTH_START, MESSAGE_LENGTH_END + 1);
|
int messageLength = HexUtil.hexToInt(messageLengthHex);
|
LOGGER.trace("messageLength: " + messageLength);
|
|
String message = MESSAGE_BUFFER.substring(0, DATA_START + messageLength*2);
|
MESSAGE_BUFFER.delete(0, DATA_START + messageLength*2);
|
|
// int messageEnd = MESSAGE_BUFFER.indexOf("3c454f463e");
|
// String message = "";
|
//
|
// if (messageEnd >= 0) {
|
// message = MESSAGE_BUFFER.substring(0, messageEnd + 10);
|
// MESSAGE_BUFFER = MESSAGE_BUFFER.delete(0, messageEnd + 10);
|
// }
|
|
return message;
|
}
|
|
@Override
|
public void run() {
|
ServerSocket serverSocket = null;
|
|
try {
|
serverSocket = new ServerSocket(shelf.getPort());
|
LOGGER.debug("仓储模块【" + shelf.getNumber() + "】启动监听,等待客户端连接,端口:" + shelf.getPort());
|
ShelfConnection.setServerSocket(serverSocket);
|
|
connect(serverSocket);
|
|
// 一直循环,若连接断开,则在Exception处理环节重新连接,并继续监听消息
|
while (!Thread.currentThread().isInterrupted()) {
|
try {
|
listen();
|
}
|
catch (IOException se) {
|
service.getright("shelf");
|
LOGGER.debug("仓储模块连接已断开!");
|
se.printStackTrace();
|
|
LOGGER.debug("仓储模块等待重新连接……");
|
service.setConnectState(ConnectState.CONNECTING);
|
connect(serverSocket);
|
}
|
}
|
} catch (IOException e) {
|
e.printStackTrace();
|
}
|
}
|
}
|