修改netty编解码器逻辑
This commit is contained in:
@@ -17,7 +17,7 @@ import java.nio.charset.Charset;
|
|||||||
* @Author peakren
|
* @Author peakren
|
||||||
* @Date 07/05/2017 10:43 PM
|
* @Date 07/05/2017 10:43 PM
|
||||||
*/
|
*/
|
||||||
public class MessageEncoder extends MessageToByteEncoder<Object> {
|
public class MessageEncoder extends MessageToByteEncoder<BaseDto> {
|
||||||
|
|
||||||
private final static int MESSAGE_LENGTH = 4;
|
private final static int MESSAGE_LENGTH = 4;
|
||||||
private final static int MESSAGE_SEQNO = 8;
|
private final static int MESSAGE_SEQNO = 8;
|
||||||
@@ -30,7 +30,7 @@ public class MessageEncoder extends MessageToByteEncoder<Object> {
|
|||||||
Charset charset = Charset.forName("UTF-8");
|
Charset charset = Charset.forName("UTF-8");
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
|
protected void encode(ChannelHandlerContext ctx, BaseDto msg, ByteBuf out) throws Exception {
|
||||||
String message = JsonUtils.encode(msg);
|
String message = JsonUtils.encode(msg);
|
||||||
logger.info("send message content:" + message);
|
logger.info("send message content:" + message);
|
||||||
byte[] bodys = message.getBytes(charset.name());
|
byte[] bodys = message.getBytes(charset.name());
|
||||||
@@ -40,27 +40,4 @@ public class MessageEncoder extends MessageToByteEncoder<Object> {
|
|||||||
out.writeInt(len); //发送head字节码
|
out.writeInt(len); //发送head字节码
|
||||||
out.writeBytes(bodys); //发送消息内容
|
out.writeBytes(bodys); //发送消息内容
|
||||||
}
|
}
|
||||||
|
|
||||||
byte[] getIntBytes(int crc) {
|
|
||||||
byte[] targets = new byte[4];
|
|
||||||
targets[0] = (byte) (crc & 0xff);
|
|
||||||
targets[1] = (byte) ((crc >> 8) & 0xff);
|
|
||||||
targets[2] = (byte) ((crc >> 16) & 0xff);
|
|
||||||
targets[3] = (byte) (crc >> 24);
|
|
||||||
return targets;
|
|
||||||
}
|
|
||||||
|
|
||||||
byte[] getLongBytes(long crc) {
|
|
||||||
byte[] targets = new byte[8];
|
|
||||||
targets[0] = (byte) (crc & 0xff);
|
|
||||||
targets[1] = (byte) ((crc >> 8) & 0xff);
|
|
||||||
targets[2] = (byte) ((crc >> 16) & 0xff);
|
|
||||||
targets[3] = (byte) (crc >> 24 & 0xff);
|
|
||||||
targets[4] = (byte) (crc >> 32 & 0xff);
|
|
||||||
targets[5] = (byte) (crc >> 40 & 0xff);
|
|
||||||
targets[6] = (byte) (crc >> 48 & 0xff);
|
|
||||||
targets[7] = (byte) (crc >> 56);
|
|
||||||
return targets;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ import java.net.InetSocketAddress;
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
@ChannelHandler.Sharable
|
@ChannelHandler.Sharable
|
||||||
public class ServerHandler extends ChannelInboundHandlerAdapter {
|
public class ServerHandler extends SimpleChannelInboundHandler<BaseDto> {
|
||||||
|
|
||||||
private static Logger logger = LoggerFactory.getLogger(ServerHandler.class);
|
private static Logger logger = LoggerFactory.getLogger(ServerHandler.class);
|
||||||
|
|
||||||
@@ -46,11 +46,12 @@ public class ServerHandler extends ChannelInboundHandlerAdapter {
|
|||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void channelRead0(ChannelHandlerContext ctx, BaseDto msg) throws Exception {
|
||||||
super.channelRead(ctx, msg);
|
super.channelRead(ctx, msg);
|
||||||
//taskQueue
|
|
||||||
// ctx.channel().eventLoop().execute()
|
|
||||||
//scheduleTaskQueue
|
|
||||||
// ctx.channel().eventLoop().schedule()
|
|
||||||
String clientIp = ctx.channel().attr(Constant.CHANNEL_PARAM).get().getClientIp();
|
String clientIp = ctx.channel().attr(Constant.CHANNEL_PARAM).get().getClientIp();
|
||||||
try {
|
try {
|
||||||
messageService.receive(ctx.channel(), msg.toString());
|
messageService.receive(ctx.channel(), msg.toString());
|
||||||
|
|||||||
@@ -30,7 +30,7 @@ public class ServerProtocolInitializer extends ChannelInitializer<SocketChannel>
|
|||||||
ChannelPipeline pipeline = ch.pipeline();
|
ChannelPipeline pipeline = ch.pipeline();
|
||||||
logger.info("ServerProtocolInitializer");
|
logger.info("ServerProtocolInitializer");
|
||||||
// 通过指定的长度来标识整包的信息,这样就可以自动的处理粘包和半包的问题
|
// 通过指定的长度来标识整包的信息,这样就可以自动的处理粘包和半包的问题
|
||||||
pipeline.addFirst(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 4, 4, 0, 0));
|
// pipeline.addFirst(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 4, 4, 0, 0));
|
||||||
pipeline.addLast(new ReadTimeoutHandler(READ_TIMEOUT));
|
pipeline.addLast(new ReadTimeoutHandler(READ_TIMEOUT));
|
||||||
pipeline.addLast(new MessageDecoder());
|
pipeline.addLast(new MessageDecoder());
|
||||||
pipeline.addLast(new MessageEncoder());
|
pipeline.addLast(new MessageEncoder());
|
||||||
|
|||||||
Reference in New Issue
Block a user