netty-确定客户端和服务端编解码工具

This commit is contained in:
2023-08-22 09:48:23 +08:00
parent eaccee2a9b
commit 86fd226c84
48 changed files with 235 additions and 2786 deletions

View File

@@ -1,14 +1,16 @@
package com.sv.netty;
import com.sv.netty.message.HeartBeat;
import com.sv.netty.message.MessageType;
import com.sv.netty.message.VenueMessage;
import com.sv.netty.utils.EncodeMsg;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import java.net.UnknownHostException;
/**
* 通讯服务器请求处理
*
@@ -16,77 +18,82 @@ import io.netty.handler.timeout.IdleStateEvent;
* @date 05/12/2017 10:27 PM
*/
@ChannelHandler.Sharable
public class ClientHandler extends ChannelInboundHandlerAdapter {
private final static String TAG = "ClientHandler";
private boolean hasRead = false;
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
// ClientTcpSession.getInstance().setContext(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
//服务器连上以后立即模拟心跳返回
ctx.writeAndFlush(getHbMessage());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
ClientThread.getInstance().clearFuture();
ClientThread.getInstance().restart();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
// Message message = JsonMapper.fromJson(msg.toString(), Message.class);
// MessageService.getInstance().execute(message);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
System.err.println("错了Error");
System.err.println("Error");
// GlobalConfig.isConnected = false;
ctx.close();
}
public class ClientHandler extends SimpleChannelInboundHandler<String> {
/**
* 获取心跳返回消息
*
* @return
*/
private VenueMessage getHbMessage() {
HeartBeat hb = new HeartBeat();
// hb.setVersionCode(AppUtil.getVersionCode(StartApplication.getAppContext()));
VenueMessage message = new VenueMessage();
message.setMessageType(MessageType.HB);
// message.setDeviceId(DeviceIdUtil.generateDeviceId(mContext));
return message;
}
/**
* 心跳处理
*
* 当通道就绪就会触发
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}
/**
* 当通道失效就会触发
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
}
/**
* 当通道有读取事件时触发
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("接收服务器响应msg:[" + msg + "]");
// MessageDTO message = JsonMapper.fromJson(msg, MessageDTO.class);
// MessageService.getInstance().execute(message);
}
/**
* 心跳
* @param ctx
* @param evt
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws UnknownHostException {
if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.ALL_IDLE) {
ctx.writeAndFlush(getHbMessage());
ctx.write(EncodeMsg.INSTANCE.encode(getHbMessage()));
ctx.flush();
}
}
}
/**
* 封装心跳请求包
* @throws Exception
*/
private HeartBeat getHbMessage() {
HeartBeat hb = new HeartBeat();
hb.setDeviceName("dsadasdasd");
hb.setVenueId(123);
return hb;
}
/**
* 处理异常
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
System.out.println("ClientHandler exceptionCaught");
cause.printStackTrace();
Channel channel = ctx.channel();
if(channel.isActive()) {
ctx.close();
}
}
}

View File

@@ -1,19 +1,18 @@
package com.sv.netty;
import com.sv.netty.config.Constant;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
public class ClientInitializer extends ChannelInitializer<SocketChannel> {
private final static int TIME_HEART_BEAT = 20;
private final static int TIME_HEART_BEAT = 5;
public ClientThread.ReConnectHandler reConnectHandler;
public ClientHandler dmClientHandler;
@@ -28,10 +27,10 @@ public class ClientInitializer extends ChannelInitializer<SocketChannel> {
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("reconnect", reConnectHandler);
pipeline.addLast("idleStateHandler", new IdleStateHandler(TIME_HEART_BEAT, TIME_HEART_BEAT, TIME_HEART_BEAT));
pipeline.addLast(new MessageEncoder());
pipeline.addFirst(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 4, 4, 0, 0));
pipeline.addLast(new MessageDecoder());
pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new IdleStateHandler(TIME_HEART_BEAT, TIME_HEART_BEAT,TIME_HEART_BEAT));
pipeline.addLast(dmClientHandler);
}

View File

@@ -1,10 +1,11 @@
package com.sv.netty;
import com.sv.netty.config.Constant;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
@@ -51,6 +52,7 @@ public class ClientThread extends Thread{
bootstrap = new Bootstrap();
bootstrap.group(workerGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new LoggingHandler());
bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
@@ -114,7 +116,7 @@ public class ClientThread extends Thread{
public void run() {
doConnect();
}
}, 1, TimeUnit.SECONDS);
}, 2, TimeUnit.SECONDS);
}
}

View File

@@ -1,48 +0,0 @@
package com.sv.netty;
import com.sv.netty.config.Constant;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.io.UnsupportedEncodingException;
import java.util.List;
/**
* Created by hehelt on 16/2/26.
* <p/>
* 解码器
*/
public class MessageDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.capacity() >= Constant.LENGTH_INDEX) {
int magicWord = in.readInt();
if (magicWord == Constant.MAGIC_WORD) {
int length = in.readInt();
byte[] msg = new byte[length];
in.readBytes(msg);
String message = new String(msg, "utf-8");
out.add(message);
}
}
}
/**
* 解析从服务器接受的消息
*
* @param buf
* @return
*/
private String getMessage(ByteBuf buf) {
byte[] con = new byte[buf.readableBytes()];
buf.readBytes(con);
try {
return new String(con, "utf-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return null;
}
}
}

View File

@@ -1,24 +0,0 @@
package com.sv.netty;
import com.sv.netty.config.Constant;
import com.sv.netty.message.VenueMessage;
import com.sv.utils.JsonMapper;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
* 自定义编码器, 1个字节固定头+4个字节长度+内容
*/
public class MessageEncoder extends MessageToByteEncoder<VenueMessage> {
private String charset = "utf-8";
@Override
protected void encode(ChannelHandlerContext ctx, VenueMessage message, ByteBuf out) throws Exception {
out.writeInt(Constant.MAGIC_WORD);
String msg = JsonMapper.toJson(message);
out.writeInt(msg.getBytes(charset).length);
out.writeBytes(msg.getBytes(charset));
}
}

View File

@@ -1,15 +0,0 @@
package com.sv.netty.config;
public interface Constant {
String DELIMITER_WORD = "$_$";
int MAGIC_WORD = 0x9DDD;
int LENGTH_INDEX = 4;
String SERVER_IP = "120.27.209.4";
Integer SERVER_PORT = 56791;
Integer VENUE_ID = 39;
String DEVICE_NAME = "SN2020382013";
}

View File

@@ -1,204 +0,0 @@
/**
* Copyright (c) 2005-2012 springside.org.cn
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
*/
package com.sv.utils;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.util.JSONPObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
/**
* 简单封装Jackson实现JSON String<->Java Object的Mapper.
* <p>
* 封装不同的输出风格, 使用不同的builder函数创建实例.
*
* @author calvin
*/
public class JsonMapper {
private static Logger logger = LoggerFactory.getLogger(JsonMapper.class);
private static ObjectMapper mapper;
public JsonMapper() {
this(null);
}
public JsonMapper(Include include) {
mapper = new ObjectMapper();
// 设置输出时包含属性的风格
if (include != null) {
mapper.setSerializationInclusion(include);
}
// 设置输入时忽略在JSON字符串中存在但Java对象实际没有的属性
mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
}
/**
* 创建只输出非Null且非Empty(如List.isEmpty)的属性到Json字符串的Mapper,建议在外部接口中使用.
*/
public static JsonMapper nonEmptyMapper() {
return new JsonMapper(Include.NON_EMPTY);
}
/**
* 创建只输出初始值被改变的属性到Json字符串的Mapper, 最节约的存储方式,建议在内部接口中使用。
*/
public static JsonMapper nonDefaultMapper() {
return new JsonMapper(Include.NON_DEFAULT);
}
public static JsonMapper nonNullMapper() {
return new JsonMapper(Include.NON_NULL);
}
/**
* Object可以是POJO也可以是Collection或数组。 如果对象为Null, 返回"null". 如果集合为空集合, 返回"[]".
*/
public static String toJson(Object object) {
try {
return mapper.writeValueAsString(object);
} catch (IOException e) {
logger.warn("write to json string error:" + object, e);
return null;
}
}
/**
* 反序列化POJO或简单Collection如List<String>.
* <p>
* 如果JSON字符串为Null或"null"字符串, 返回Null. 如果JSON字符串为"[]", 返回空集合.
* <p>
* 如需反序列化复杂Collection如List<MyBean>, 请使用fromJson(String, JavaType)
*
* @see #fromJson(String, JavaType)
*/
public static <T> T fromJson(String jsonString, Class<T> clazz) {
if (StringUtils.isEmpty(jsonString)) {
return null;
}
try {
return mapper.readValue(jsonString, clazz);
} catch (IOException e) {
logger.warn("parse json string error:" + jsonString, e);
return null;
}
}
/**
* 反序列化复杂Collection如List<Bean>,
* 先使用createCollectionType()或contructMapType()构造类型, 然后调用本函数.
*
* @see #createCollectionType(Class, Class...)
*/
public <T> T fromJson(String jsonString, JavaType javaType) {
if (StringUtils.isEmpty(jsonString)) {
return null;
}
try {
return (T) mapper.readValue(jsonString, javaType);
} catch (IOException e) {
logger.warn("parse json string error:" + jsonString, e);
return null;
}
}
/**
* 构造Collection类型.
*/
public JavaType contructCollectionType(Class<? extends Collection> collectionClass, Class<?> elementClass) {
return mapper.getTypeFactory().constructCollectionType(collectionClass, elementClass);
}
/**
* 构造Map类型.
*/
public JavaType contructMapType(Class<? extends Map> mapClass, Class<?> keyClass, Class<?> valueClass) {
return mapper.getTypeFactory().constructMapType(mapClass, keyClass, valueClass);
}
/**
* map 转 bean
*
* @param map
* @param beanClass
* @return
*/
public <T> T convertMapToBean(Map<String, ?> map, Class<T> beanClass) {
try {
return mapper.convertValue(map, beanClass);
} catch (Exception e) {
logger.warn("convertMapToBean error! ", e);
}
return null;
}
/**
* map 转 bean
*
* @param bean
* @param objectClass
* @return
*/
public <T> T convertBeanToOther(Object bean, Class<T> objectClass) {
try {
return mapper.convertValue(bean, objectClass);
} catch (Exception e) {
logger.warn("convertMapToBean error! ", e);
}
return null;
}
/**
* 当JSON里只含有Bean的部分屬性時更新一個已存在Bean只覆蓋該部分的屬性.
*/
public void update(String jsonString, Object object) {
try {
mapper.readerForUpdating(object).readValue(jsonString);
} catch (JsonProcessingException e) {
logger.warn("update json string:" + jsonString + " to object:" + object + " error.", e);
} catch (IOException e) {
logger.warn("update json string:" + jsonString + " to object:" + object + " error.", e);
}
}
/**
* 輸出JSONP格式數據.
*/
public String toJsonP(String functionName, Object object) {
return toJson(new JSONPObject(functionName, object));
}
/**
* 設定是否使用Enum的toString函數來讀寫Enum, 為False時時使用Enum的name()函數來讀寫Enum, 默認為False.
* 注意本函數一定要在Mapper創建後, 所有的讀寫動作之前調用.
*/
public void enableEnumUseToString() {
mapper.enable(SerializationFeature.WRITE_ENUMS_USING_TO_STRING);
mapper.enable(DeserializationFeature.READ_ENUMS_USING_TO_STRING);
}
/**
* 取出Mapper做进一步的设置或使用其他序列化API.
*/
public ObjectMapper getMapper() {
return mapper;
}
}