netty-调整调用链路,TCP粘包方案替换

This commit is contained in:
limqhz
2020-07-06 18:54:43 +08:00
parent 4e0eeb86ff
commit 0d523a89e9
6 changed files with 50 additions and 107 deletions

View File

@@ -24,10 +24,6 @@ public class Constant {
*/ */
public static AttributeKey<ChannelParam> CHANNEL_PARAM = AttributeKey.newInstance("CHANNEL_PARAM"); public static AttributeKey<ChannelParam> CHANNEL_PARAM = AttributeKey.newInstance("CHANNEL_PARAM");
/** public final static String DELIMITER_WORD = "$_$";
* 每局结束后的保护时间
*/
public static final Integer GAME_OVER_PROTECTION_SECONDS = 5;
} }

View File

@@ -4,6 +4,7 @@ import com.sv.netty.utils.JsonMapper;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger; import org.slf4j.Logger;
@@ -14,15 +15,10 @@ import org.slf4j.LoggerFactory;
* *
* @author ranfi * @author ranfi
*/ */
public class ClientHandler extends ChannelInboundHandlerAdapter { public class ClientHandler extends SimpleChannelInboundHandler<String> {
private static Logger logger = LoggerFactory.getLogger(ClientHandler.class); private static Logger logger = LoggerFactory.getLogger(ClientHandler.class);
private static final String MESSAGE = "Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients.";
public ClientHandler() {
}
/** /**
* 当通道就绪就会触发 * 当通道就绪就会触发
* @param ctx * @param ctx
@@ -50,25 +46,30 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
* @throws Exception * @throws Exception
*/ */
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
super.channelRead(ctx, msg); super.channelRead(ctx, msg);
logger.info("接收服务器响应msg:[" + msg + "]"); logger.info("接收服务器响应msg:[" + msg + "]");
// 安卓写非netty 后台实现
// TODO 安卓获取心跳内容有二维码的唯一识别显示请求小程序的venueId的二维码无需拼接url
// TODO 安卓获取通知加载页面
// TODO 安卓获取通知开门失败消息 (进入一个页面,然后显示倒计时,回到主页(二维码页面))
// TODO 安卓获取通知开门的消息 (无需校验,直接操作开门)
// Message message = JsonMapper.fromJson(msg, Message.class);
// MessageService.getInstance().execute(message);
} }
@Override @Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
throws Exception { if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
super.userEventTriggered(ctx, evt);
IdleStateEvent event = (IdleStateEvent) evt; IdleStateEvent event = (IdleStateEvent) evt;
// 如果IoSession闲置则关闭连接 if (event.state() == IdleState.ALL_IDLE) {
if (event.state() == IdleState.READER_IDLE) { ctx.writeAndFlush("getHbMessage()");
String json = JsonMapper.nonDefaultMapper().toJson(null); }
ctx.writeAndFlush(json);
} }
} }
@Override @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush(); ctx.flush();
} }

View File

@@ -1,43 +0,0 @@
package com.sv.netty.netty;
import com.sv.netty.utils.EncodeUtils;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* TCP消息解码器
*
* @Author peakren
* @Date 08/05/2017 10:34 PM
*/
public class MessageDecoder extends ByteToMessageDecoder {
private final static int MESSAGE_LENGTH = 4;
private final static int MESSAGE_SEQNO = 4;
private final static int MESSAGE_HEAD = 8;
private final static int MESSAGE_MAX_LENGTH = 12;
private final static int MAGIC_WORD = 0x9DDD; //码头
private static Logger logger = LoggerFactory.getLogger(MessageDecoder.class);
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.capacity() >= MESSAGE_MAX_LENGTH) {
int magicWord = in.readInt();
if (magicWord == MAGIC_WORD) {
int length = in.readInt();
byte[] msg = new byte[length];
in.readBytes(msg);
String message = new String(msg, "utf-8");
out.add(message);
}
}
}
}

View File

@@ -1,5 +1,6 @@
package com.sv.netty.netty; package com.sv.netty.netty;
import com.sv.netty.config.Constant;
import com.sv.netty.netty.message.BaseDto; import com.sv.netty.netty.message.BaseDto;
import com.sv.netty.utils.JsonUtils; import com.sv.netty.utils.JsonUtils;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
@@ -17,24 +18,18 @@ import java.nio.charset.Charset;
* @Author peakren * @Author peakren
* @Date 07/05/2017 10:43 PM * @Date 07/05/2017 10:43 PM
*/ */
public class MessageEncoder extends MessageToByteEncoder<BaseDto> { public class MessageEncoder extends MessageToByteEncoder<String> {
/**
* TODO 客户端没用就删了
*/
// private final static String mSeqno = "doll";
private final static String DELIMITER_WORD = "$_$";
private static Logger logger = LoggerFactory.getLogger(MessageEncoder.class); private static Logger logger = LoggerFactory.getLogger(MessageEncoder.class);
Charset charset = Charset.forName("UTF-8"); Charset charset = Charset.forName("UTF-8");
@Override @Override
protected void encode(ChannelHandlerContext ctx, BaseDto msg, ByteBuf out) throws Exception { protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
String message = JsonUtils.encode(msg); // String message = JsonUtils.encode(msg);
logger.info("send message content:" + message); logger.info("send message content:" + msg);
message = message + DELIMITER_WORD; msg = msg + Constant.DELIMITER_WORD;
byte[] content = message.getBytes(charset.name()); byte[] content = msg.getBytes(charset.name());
out.writeBytes(content); //发送消息内容 out.writeBytes(content); //发送消息内容
} }
} }

View File

@@ -2,12 +2,9 @@ package com.sv.netty.netty;
import com.sv.netty.config.Constant; import com.sv.netty.config.Constant;
import com.sv.netty.config.SpringContextHolder; import com.sv.netty.config.SpringContextHolder;
import com.sv.netty.netty.message.BaseDto;
import com.sv.netty.netty.message.ChannelParam; import com.sv.netty.netty.message.ChannelParam;
import com.sv.netty.service.MessageService; import com.sv.netty.service.MessageService;
import io.netty.channel.*; import io.netty.channel.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -20,7 +17,7 @@ import java.net.InetSocketAddress;
*/ */
@ChannelHandler.Sharable @ChannelHandler.Sharable
public class ServerHandler extends SimpleChannelInboundHandler<BaseDto> { public class ServerHandler extends SimpleChannelInboundHandler<String> {
private static Logger logger = LoggerFactory.getLogger(ServerHandler.class); private static Logger logger = LoggerFactory.getLogger(ServerHandler.class);
@@ -41,11 +38,11 @@ public class ServerHandler extends SimpleChannelInboundHandler<BaseDto> {
} }
@Override @Override
protected void channelRead0(ChannelHandlerContext ctx, BaseDto msg) throws Exception { protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
super.channelRead(ctx, msg); super.channelRead(ctx, msg);
String clientIp = ctx.channel().attr(Constant.CHANNEL_PARAM).get().getClientIp(); String clientIp = ctx.channel().attr(Constant.CHANNEL_PARAM).get().getClientIp();
try { try {
messageService.receive(ctx.channel(), msg.toString()); messageService.receive(ctx.channel(), msg);
} catch (Exception e) { } catch (Exception e) {
logger.error("[" + clientIp + "] host unknown error"); logger.error("[" + clientIp + "] host unknown error");
} }
@@ -101,24 +98,20 @@ public class ServerHandler extends SimpleChannelInboundHandler<BaseDto> {
} }
/** /**
* Calls {@link ChannelHandlerContext#fireUserEventTriggered(Object)} to forward * IdleStateHandler 如果几秒之后没有读操作,那么就会触发这个方法
* to the next {@link ChannelHandler} in the {@link ChannelPipeline}.
* <p/>
* Sub-classes may override this method to change behavior.
*
* @param ctx
* @param evt
*/ */
@Override @Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
IdleStateEvent event = (IdleStateEvent) evt; // Do nothing 客户端这个方法用来发心跳,
// 如果Channel读取数据闲置则关闭连接 // 可以考虑再ctx 获取客户端的上下文信息,将该客户移除我们的操作 TODO 值得尝试主要是要区分客户端
if (event.state() == IdleState.READER_IDLE) { // IdleStateEvent event = (IdleStateEvent) evt;
String clientIP = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress(); // // 如果Channel读取数据闲置则关闭连接
logger.info("Client [" + clientIP + "] has idle"); // if (event.state() == IdleState.READER_IDLE) {
messageService.destory(ctx.channel()); // String clientIP = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress();
ctx.channel().close(); // logger.info("Client [" + clientIP + "] has idle");
} // messageService.destory(ctx.channel());
// ctx.channel().close();
// }
} }
} }

View File

@@ -1,13 +1,14 @@
package com.sv.netty.netty; package com.sv.netty.netty;
import com.sv.netty.config.Constant;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.handler.timeout.ReadTimeoutHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
/** /**
@@ -19,20 +20,20 @@ import org.springframework.stereotype.Component;
@Component @Component
public class ServerProtocolInitializer extends ChannelInitializer<SocketChannel> { public class ServerProtocolInitializer extends ChannelInitializer<SocketChannel> {
private static Logger logger = LoggerFactory.getLogger(ServerProtocolInitializer.class);
private final int IDLE_TIME = 30; //连接检测空闲时间 private final int IDLE_TIME = 30; //连接检测空闲时间
private final int READ_TIME = 30; //读超时时间 private final int READ_TIME = 30; //读超时时间
private final int WRITE_TIME = 30; //写超时时间 private final int WRITE_TIME = 30; //写超时时间
private final int READ_TIMEOUT = 600; //读超时时间 private final int READ_TIMEOUT = 60; //读超时时间
@Override @Override
protected void initChannel(SocketChannel ch){ protected void initChannel(SocketChannel ch){
ChannelPipeline pipeline = ch.pipeline(); ChannelPipeline pipeline = ch.pipeline();
logger.info("ServerProtocolInitializer"); // 超时设置
// 通过指定的长度来标识整包的信息,这样就可以自动的处理粘包和半包的问题
pipeline.addFirst(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 4, 4, 0, 0));
pipeline.addLast(new ReadTimeoutHandler(READ_TIMEOUT)); pipeline.addLast(new ReadTimeoutHandler(READ_TIMEOUT));
pipeline.addLast(new MessageDecoder()); // 通过指定的长度来标识整包的信息,这样就可以自动的处理粘包和半包的问题
pipeline.addLast(new DelimiterBasedFrameDecoder(2048,
Unpooled.wrappedBuffer(Constant.DELIMITER_WORD.getBytes())));
pipeline.addLast(new StringDecoder());
pipeline.addLast(new MessageEncoder()); pipeline.addLast(new MessageEncoder());
// 心跳检测机制通过调用触发下一个handler userEventTriggered 方法 // 心跳检测机制通过调用触发下一个handler userEventTriggered 方法
pipeline.addLast(new IdleStateHandler(READ_TIME, WRITE_TIME,IDLE_TIME)); pipeline.addLast(new IdleStateHandler(READ_TIME, WRITE_TIME,IDLE_TIME));