netty 本地模拟客户端

This commit is contained in:
limqhz
2020-06-09 23:21:23 +08:00
parent 96584aa44d
commit 644c0063e7
21 changed files with 1726 additions and 17 deletions

View File

@@ -3,6 +3,7 @@ package com.sv.netty.netty;
import com.google.common.collect.Maps;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
@@ -65,13 +66,13 @@ public class BootService {
private Map<ChannelOption, Object> channelOptions;
@Autowired
private ServerProtocolInitalizer serverProtocolInitalizer;
private final ServerProtocolInitializer serverProtocolInitializer;
/**
* 初始化netty启动配置
*/
public void init() {
// bossGroup 只处理连接请求真正的客户端业务处理是由workerGroup 处理的 都是无效循环的
workerGroup = new NioEventLoopGroup();
bossGroup = new NioEventLoopGroup();
channelOptions = Maps.newHashMap();
@@ -81,16 +82,17 @@ public class BootService {
// channelOptions.put(ChannelOption.TCP_NODELAY,TCP_NODELAY);
channelOptions.put(ChannelOption.SO_REUSEADDR,reuseaddr);
bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childHandler(serverProtocolInitalizer);
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // 设置服务器的通道是NIOServerSocketChannel
.childHandler(serverProtocolInitializer);
for (Map.Entry<ChannelOption, Object> entry : channelOptions.entrySet()) {
bootstrap.option(entry.getKey(), entry.getValue());
}
bootstrap.childOption(ChannelOption.TCP_NODELAY,nodelay);
bootstrap.childOption(ChannelOption.SO_KEEPALIVE,keepalive);
// 绑定一个端口并且同步 启动服务器
serverChannelFuture = bootstrap.bind(new InetSocketAddress(port)).sync();
logger.info("成功bind端口:" + port);
// 对关闭通道进行监听 (异步模型)
serverChannelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
logger.error("启动NETTY TCP异常:", e);
@@ -122,4 +124,9 @@ public class BootService {
public void setChannelOptions(Map<ChannelOption, Object> channelOptions) {
this.channelOptions = channelOptions;
}
public BootService(ServerProtocolInitializer serverProtocolInitializer) {
this.serverProtocolInitializer = serverProtocolInitializer;
}
}

View File

@@ -23,6 +23,11 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
public ClientHandler() {
}
/**
* 当通道就绪就会触发
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
@@ -33,10 +38,16 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
super.channelInactive(ctx);
}
/**
* 当通道有读取事件时触发
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
System.out.println("接收服务器响应msg:[" + msg + "]");
logger.info("接收服务器响应msg:[" + msg + "]");
}
@Override

View File

@@ -4,6 +4,7 @@ import com.sv.netty.config.Constant;
import com.sv.netty.config.SpringContextHolder;
import com.sv.netty.service.MessageService;
import io.netty.channel.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,6 +33,12 @@ public class ServerHandler extends ChannelInboundHandlerAdapter {
super.channelActive(ctx);
}
/**
* 读取客户端发送的消息
* @param ctx 上下文对象 管道pipeline通道channel , 地址
* @param msg 客户端发送的数据
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
@@ -76,6 +83,12 @@ public class ServerHandler extends ChannelInboundHandlerAdapter {
messageService.destory(ctx.channel());
}
/**
* 处理异常、一般需要关闭通道
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
@@ -100,12 +113,12 @@ public class ServerHandler extends ChannelInboundHandlerAdapter {
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
// 如果Channel读取数据闲置则关闭连接
// if (event.state() == IdleState.READER_IDLE) {
// String clientIP = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress();
// logger.info("Client [" + clientIP + "] has idle");
// messageService.destory(ctx.channel());
// ctx.channel().close();
// }
if (event.state() == IdleState.READER_IDLE) {
String clientIP = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress();
logger.info("Client [" + clientIP + "] has idle");
messageService.destory(ctx.channel());
ctx.channel().close();
}
}
}

View File

@@ -1,14 +1,13 @@
package com.sv.netty.netty;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
@@ -18,16 +17,18 @@ import org.springframework.stereotype.Component;
* @Date 09/05/2017 5:19 PM
*/
@Component
public class ServerProtocolInitalizer extends ChannelInitializer<SocketChannel> {
public class ServerProtocolInitializer extends ChannelInitializer<SocketChannel> {
private static Logger logger = LoggerFactory.getLogger(ServerProtocolInitializer.class);
private final int IDLE_TIME = 30; //连接检测空闲时间
private final int READ_TIME = 30; //读超时时间
private final int WRITE_TIME = 30; //写超时时间
private final int READ_TIMEOUT = 600; //读超时时间
@Override
protected void initChannel(SocketChannel ch) throws Exception {
protected void initChannel(SocketChannel ch){
ChannelPipeline pipeline = ch.pipeline();
logger.info("ServerProtocolInitializer");
pipeline.addFirst(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 4, 4, 0, 0));
pipeline.addLast(new IdleStateHandler(IDLE_TIME, READ_TIME, WRITE_TIME));
pipeline.addLast(new ReadTimeoutHandler(READ_TIMEOUT));