fix(netty): 修复设备心跳和离线处理逻辑
- 将心跳间隔从60秒调整为30秒 - 修复平台ID设置顺序问题,确保context正确初始化 - 优化在线处理逻辑,支持重复连接时替换旧通道 - 更新离线方法签名,传入通道参数进行精确匹配 - 添加通道比较逻辑,避免错误的离线操作
This commit is contained in:
@@ -32,8 +32,8 @@ public class PlatformIdAOP {
|
||||
Method method = ((MethodSignature)pjp.getSignature()).getMethod();
|
||||
PlatformKey platformKey = method.getAnnotation(PlatformKey.class);
|
||||
if(platformKey != null){
|
||||
PlatformContext.changeKey(platformKey.value());
|
||||
PlatformContext.set("platform_id",1);
|
||||
PlatformContext.changeKey(platformKey.value());
|
||||
}
|
||||
NoPlatform noPlatform = method.getAnnotation(NoPlatform.class);
|
||||
if(noPlatform != null){
|
||||
|
||||
@@ -115,7 +115,7 @@ public class ServerHandler extends SimpleChannelInboundHandler<String> {
|
||||
logger.error("Client ip [" + clientIP + "] has inactive");
|
||||
Integer venueId = ctx.channel().attr(NettyConstant.CHANNEL_PARAM).get().getVenueId();
|
||||
String deviceName = ctx.channel().attr(NettyConstant.CHANNEL_PARAM).get().getDeviceName();
|
||||
messageService.Offline(deviceName,venueId);
|
||||
messageService.Offline(deviceName, venueId, ctx.channel());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -133,7 +133,7 @@ public class ServerHandler extends SimpleChannelInboundHandler<String> {
|
||||
ChannelParam param = channel.attr(NettyConstant.CHANNEL_PARAM).get();
|
||||
// 无论是合法连接还是非法扫描发来的乱码,发生错误立马掐断连接,绝不能手软
|
||||
if(channel.isActive() && param != null && param.getVenueId() != null) {
|
||||
messageService.Offline(param.getDeviceName(),param.getVenueId());
|
||||
messageService.Offline(param.getDeviceName(), param.getVenueId(), channel);
|
||||
logger.info("{} - {} ServerHandler exceptionCaught,{}",channel.hashCode(),param.getDeviceName(),param.getVenueId(),cause);
|
||||
}
|
||||
ctx.close();
|
||||
|
||||
@@ -26,8 +26,9 @@ public interface MessageService {
|
||||
*
|
||||
* @param deviceName
|
||||
* @param venueId
|
||||
* @param channel
|
||||
*/
|
||||
void Offline(String deviceName, Integer venueId);
|
||||
void Offline(String deviceName, Integer venueId, Channel channel);
|
||||
|
||||
/**
|
||||
* 统计目前的链接数
|
||||
|
||||
@@ -86,18 +86,29 @@ public class ServerMessageHandlerAdapter implements MessageService {
|
||||
@Override
|
||||
public void online(String clientId, Channel channel, HeartBeat heartBeat) {
|
||||
logger.info("=========" + JsonUtils.encode(heartBeat) + clientId);
|
||||
// 处理心跳信息
|
||||
if (!contains(heartBeat.getDeviceName(),heartBeat.getVenueId())){
|
||||
// 此处存储客户端的channel 信息key 为 deviceId + venueId
|
||||
Venue thisVenue = venueService.findById(heartBeat.getVenueId());
|
||||
String deviceName = heartBeat.getDeviceName();
|
||||
Integer venueId = heartBeat.getVenueId();
|
||||
|
||||
Channel oldChannel = getCurrentChannel(deviceName, venueId);
|
||||
if (oldChannel == null) {
|
||||
Venue thisVenue = venueService.findById(venueId);
|
||||
if (thisVenue == null ){
|
||||
logger.error("this client choose venue Error! venueId == " + heartBeat.getVenueId());
|
||||
logger.error("this client choose venue Error! venueId == " + venueId);
|
||||
} else {
|
||||
deviceService.online(heartBeat.getDeviceName(),heartBeat.getVenueId(),thisVenue.getType(),clientId);
|
||||
addLinks(heartBeat.getDeviceName(),heartBeat.getVenueId(),channel);
|
||||
// VenueMessage venueMessage = new VenueMessage(MessageType.LINK,"欢迎扫码进场!");
|
||||
// ServerMessageUtils.INSTANCE.sendMsg(channel,venueMessage);
|
||||
deviceService.online(deviceName, venueId, thisVenue.getType(), clientId);
|
||||
addLinks(deviceName, venueId, channel);
|
||||
}
|
||||
} else if (oldChannel != channel) {
|
||||
logger.info("New connection reporting heartbeat for device {}_{}, overwriting old channel.", deviceName, venueId);
|
||||
addLinks(deviceName, venueId, channel);
|
||||
// 也可能需要更新一下 IP
|
||||
Venue thisVenue = venueService.findById(venueId);
|
||||
if (thisVenue != null) {
|
||||
deviceService.online(deviceName, venueId, thisVenue.getType(), clientId);
|
||||
}
|
||||
try {
|
||||
oldChannel.close();
|
||||
} catch (Exception e) {}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -107,10 +118,13 @@ public class ServerMessageHandlerAdapter implements MessageService {
|
||||
* @param venueId
|
||||
*/
|
||||
@Override
|
||||
public void Offline(String deviceName, Integer venueId) {
|
||||
public void Offline(String deviceName, Integer venueId, Channel channel) {
|
||||
if (deviceName != null && venueId != null){
|
||||
removeChannelType(deviceName,venueId);
|
||||
deviceService.offline(deviceName,venueId);
|
||||
Channel currentChannel = getCurrentChannel(deviceName, venueId);
|
||||
if (currentChannel == null || currentChannel == channel) {
|
||||
removeChannelType(deviceName, venueId);
|
||||
deviceService.offline(deviceName, venueId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -12,7 +12,7 @@ import io.netty.handler.timeout.IdleStateHandler;
|
||||
|
||||
public class ClientInitializer extends ChannelInitializer<SocketChannel> {
|
||||
|
||||
private final static int TIME_HEART_BEAT = 60;
|
||||
private final static int TIME_HEART_BEAT = 30;
|
||||
|
||||
public ClientThread.ReConnectHandler reConnectHandler;
|
||||
public ClientHandler dmClientHandler;
|
||||
|
||||
Reference in New Issue
Block a user