1.引入依赖
<!-- https://mvnrepository.com/artifact/org.apache.mina/mina-core -->
<dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-core</artifactId>
<version>2.0.7</version>
</dependency>
2.创建连接客户端
NioSocketConnector connector = new NioSocketConnector(); // 创建连接客户端
connector.setConnectTimeoutMillis(30000); // 设置连接超时
TextLineCodecFactory factory = new TextLineCodecFactory(Charset.forName("UTF-8"));
factory.setDecoderMaxLineLength(Integer.MAX_VALUE);
connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(factory));
connector.getSessionConfig().setReceiveBufferSize(Integer.MAX_VALUE); // 设置接收缓冲区的大小
connector.getSessionConfig().setSendBufferSize(Integer.MAX_VALUE);// 设置输出缓冲区的大小
connector.setDefaultRemoteAddress(new InetSocketAddress(IP, Port));// 设置默认访问地址
connector.getSessionConfig().setTcpNoDelay(true);
connector.getFilterChain().addLast("threadPool", new ExecutorFilter(Executors.newCachedThreadPool()));
connector.setHandler(new DockHandler());
ConnectFuture future = connector.connect();
future.awaitUninterruptibly(); // 等待连接创建成功
IoSession session = future.getSession(); // 获取会话
消息处理器IoHandlerAdapter实现类:
public class DockHandler extends IoHandlerAdapter
{
@Override
public void messageReceived(IoSession session, Object message)
throws Exception
{
super.messageReceived(session, message);
String msg = (String)message;
log.info("收到消息");
// 消息处理...
}
@Override
public void sessionCreated(IoSession session)
throws Exception
{
super.sessionCreated(session);
log.info("创建连接");
}
@Override
public void sessionOpened(IoSession session)
throws Exception
{
super.sessionOpened(session);
log.info("建立连接");
}
@Override
public void sessionClosed(IoSession session)
throws Exception
{
super.sessionClosed(session);
log.info("连接关闭");
}
@Override
public void sessionIdle(IoSession session, IdleStatus status)
throws Exception
{
super.sessionIdle(session, status);
log.info("重新连接");
}
@Override
public void exceptionCaught(IoSession session, Throwable cause)
throws Exception
{
super.exceptionCaught(session, cause);
log.info("会话异常!");
if (session != null)
{
session.closeNow();
}
}
@Override
public void messageSent(IoSession session, Object message)
throws Exception
{
super.messageSent(session, message);
}
@Override
public void inputClosed(IoSession session)
throws Exception
{
super.inputClosed(session);
}
}
3.连接失败自动重连&断线重连功能
启动时连接失败自动重连:
for (;;)
{
try
{
ConnectFuture future = connector.connect();
future.awaitUninterruptibly(); // 等待连接创建成功
IoSession session = future.getSession(); // 获取会话
log.info("连接服务端[成功]");
break;
}
catch (RuntimeIoException e)
{
log.error("连接服务端[失败],5S后重新连接");
Thread.sleep(5000);
}
}
断线重连:
断线重连功能参考:https://blog.csdn.net/qq_34928194/article/details/105204583
这里采用过滤器的方式:
connector.getFilterChain().addFirst("reconnection", new IoFilterAdapter()
{
@Override
public void sessionClosed(NextFilter nextFilter, IoSession ioSession)
throws Exception
{
for (;;)
{
try
{
Thread.sleep(3000);
ConnectFuture future = connector.connect();
future.awaitUninterruptibly();// 等待连接创建成功
IoSession session = future.getSession();// 获取会话
if (session.isConnected())
{
log.info("断线重连成功");
break;
}
}
catch (Exception ex)
{
log.info("断线重连失败,3s再次连接");
}
}
}
});
4.心跳设置
使用Mina的KeepAliveFilter实现心跳:(了解Mina的KeepAliveFilter心跳机制)
(1) 新建类实现KeepAliveMessageFactory
public class MyKeepAliveMessageFactory implements KeepAliveMessageFactory
{
@Override
public boolean isRequest(IoSession session, Object message)
{
return false;
}
@Override
public boolean isResponse(IoSession session, Object message)
{
return false;
}
@Override
public Object getRequest(IoSession session)
{
return "#";// 心跳内容为#
}
@Override
public Object getResponse(IoSession session, Object request)
{
return null;
}
}
(2) 会话管理中加入KeepAliveFilter
connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);// 设置会话属性读写通道10秒无操作则视为空闲状态
MyKeepAliveMessageFactory heartBeat = new MyKeepAliveMessageFactory();
KeepAliveFilter keepAliveFilter = new KeepAliveFilter(heartBeat, IdleStatus.READER_IDLE, KeepAliveRequestTimeoutHandler.NOOP);// 无心跳响应时不采取任何操作
keepAliveFilter.setForwardEvent(false);
keepAliveFilter.setRequestInterval(10);// 心跳间隔10s
keepAliveFilter.setRequestTimeout(1);// 超时时间1s
connector.getFilterChain().addLast("heart", keepAliveFilter);
5.Mina客户端完全断开连接的方法&加入断线重连功能后如何完全断开连接
断开连接:Mina2.0+版本:调用connector的dispose()方法
加入断线重连后如何彻底断开客户端连接:
经过尝试
(1) 如果使用过滤器方式的断线重连,断开连接(即使删除了重连过滤器)后依然会进行重连(当然重连失败,一直重连),代码如下:
connector.getFilterChain().remove("reconnection");
connector.dispose();
// 尝试无效
(2) 如果使用监听器方式的断线重连,删除监听器后断开连接就完全断开连接了,代码如下:
connector.removeListener(ioListener);// ioListener为创建的断线重连的监听器对象
connector.dispose();
6.粘包半包问题的解决
本文使用的Mina自带的基于文本的编解码器TextLineCodecFactory,根据回车换行(windows下是\r\n,linux下是\r)来断点传输数据,且在解码器中解决了半包粘包问题。
适用场景:报文为文本字符串类型的,且以换行符为数据分割符(当然可以利用TextLineCodecFactory的另外两个构造方法来自定义数据分割符)。
对于自定义报文(多为16进制报文),解决Mina数据接收的半包粘包问题需要我们自定义编解码器,在解码器中解决这一问题。
一个来自未来博文的url:Mina自定义解码器,解决半包粘包问题
完整项目代码联系获取