博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Mina粘包,断包问题处理(附完整实例,客户端,服务端)
阅读量:6175 次
发布时间:2019-06-21

本文共 13294 字,大约阅读时间需要 44 分钟。

1.什么是断包,粘包?

在讲断包,粘包之前,先说下消息保护边界和无消息保护边界。 1.保护消息边界,就是指传输协议把数据当作一条独立的消息在网上传输,接收端只能接收独立的消息.也就是说存在保护消息边界,接收端一次只能接收发送端发出的一个数据包. 2.而面向流则是无消息保护边界的,如果发送端连续发送数据, 接收端有可能在一次接收动作中,会接收两个或者更多的数据包。

而tcp是面向流的,需要在消息接收端处理消息边界问题。

接收端在接受数据时有可能会遇到下面四种情况

A.先接收到dataA然后接收到dataB. B.先接收到dataA的部分数据,然后接收到dataA余下的部分以及dataB的全部. C.先接收到了dataA的全部数据和dataB的部分数据,然后接收到了dataB的余下的数据. D.一次性接收到了dataA和dataB的全部数据.

A为正常情况,无粘包或断包。 B为断包+粘包。 C为粘包+断包。 D为粘包。

2.如何处理Mina中遇到的粘包和断包问题

在Mina框架中有个CumulativeProtocolDecoder 累积性的协议解码器,专门用来处理粘包和断包问题。doDecode()的返回值有重要作用。

A.你的doDecode()方法返回true 时,CumulativeProtocolDecoder 的decode()方法会首先判断你是否在doDecode()方法中从内部的IoBuffer 缓冲区读取了数据,如果没有,则会抛出非法的状态异常,也就是你的doDecode()方法返回true 就表示你已经消费了本次数据(相当于聊天室中一个完整的消息已经读取完毕),进一步说,也就是此时你必须已经消费过内部的IoBuffer 缓冲区的数据(哪怕是消费了一个字节的数据)。如果验证过通过,那么CumulativeProtocolDecoder 会检查缓冲区内是否还有数据未读取,如果有就继续调用doDecode()方法,没有就停止对doDecode()方法的调用,直到有新的数据被缓冲。

B. 当你的doDecode()方法返回false 时,CumulativeProtocolDecoder 会停止对doDecode()方法的调用,但此时如果本次数据还有未读取完的,就将含有剩余数据的IoBuffer 缓冲区保存到IoSession 中,以便下一次数据到来时可以从IoSession 中提取合并。如果发现本次数据全都读取完毕,则清空IoBuffer 缓冲区(让父类进行接收下一个包)。简而言之,当你认为读取到的数据已经够解码了,那么就返回true,否则就返回false。这个CumulativeProtocolDecoder 其实最重要的工作就是帮你完成了数据的累积,因为这个工作是很烦琐的。也就是说返回true,那么CumulativeProtocolDecoder会再次调用decoder,并把剩余的数据发下来;(意思就是会把剩余数据给doDecode()处理,剩余数据就是remaining()的数据),返回false就不处理剩余的,(不把剩余数据给doDecode()处理)当有新数据包来的时候就把剩余的数据和新的数据拼接在一起,然后再调用decoder。

下面附上一个完整的实例 1.消息的格式 包头+消息长度(int)+消息内容(json字符串)+包尾,包头包尾是十六进制字符串00 aa bb cc,转化成字节数组0, -86, -69, -52四个字节,下面的完整实例有客户端,服务端,将会解析数据,获取其中的消息内容(Json字符串)并且打印处理,消息以字节数组的方式在服务端,客户端之间传递。

服务端代码

package com.my.mina;import java.net.InetSocketAddress;import java.nio.charset.Charset;import java.util.Date;import org.apache.mina.core.service.IoAcceptor;import org.apache.mina.core.service.IoHandlerAdapter;import org.apache.mina.core.session.IdleStatus;import org.apache.mina.core.session.IoSession;import org.apache.mina.filter.codec.ProtocolCodecFilter;import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;import org.apache.mina.filter.logging.LoggingFilter;import org.apache.mina.transport.socket.nio.NioSocketAcceptor;/** * mina的Service端 *  * @author linbin * */public class MinaService {	public static void main(String[] args) {		// 创建一个非阻塞的server端的Socket		IoAcceptor acceptor = new NioSocketAcceptor();		// 添加日志过滤器		acceptor.getFilterChain().addLast("logger", new LoggingFilter());		acceptor.getFilterChain().addLast("codec",				new ProtocolCodecFilter(new ByteArrayCodecFactory(Charset.forName("UTF-8"))));// 自定义解编码器		// 设置Handler		acceptor.setHandler(new DemoServerHandler());		// 设置读取数据的缓存区大小		acceptor.getSessionConfig().setReadBufferSize(2048);		// 读写通道10秒内无操作进入空闲状态		acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);		try {			// 绑定端口			acceptor.bind(new InetSocketAddress(20000));		} catch (Exception e) {			e.printStackTrace();		}		System.out.println("启动服务");	}	/**	 * @ClassName: DemoServerHandler	 * @Description: 负责session对象的创建和监听以及消息的创建和接收监听	 * @author chenzheng	 * @date 2016-12-9 下午3:57:11	 */	private static class DemoServerHandler extends IoHandlerAdapter {		// 服务器与客户端创建连接		@Override		public void sessionCreated(IoSession session) throws Exception {			System.out.println("服务器与客户端创建连接...");			super.sessionCreated(session);		}		@Override		public void sessionOpened(IoSession session) throws Exception {			System.out.println("服务器与客户端连接打开...");			super.sessionOpened(session);		}		// 消息的接收处理		@Override		public void messageReceived(IoSession session, Object message) throws Exception {			// TODO Auto-generated method stub			super.messageReceived(session, message);// 消息的接受						// 传递自定义解编码器传递数组和解析数组丢包断包的			String a = (String) message;			System.out.println("接收到的数据:" + a);			session.write(a);		}		// 消息发送后调用		@Override		public void messageSent(IoSession session, Object message) throws Exception {			// TODO Auto-generated method stub			super.messageSent(session, message);			System.out.println("服务器发送消息成功...");		}		// session关闭		@Override		public void sessionClosed(IoSession session) throws Exception {			// TODO Auto-generated method stub			super.sessionClosed(session);			System.out.println("断开连接:");		}	}}复制代码

编码器

package com.my.mina;import org.apache.mina.core.buffer.IoBuffer;import org.apache.mina.core.session.IoSession;import org.apache.mina.filter.codec.ProtocolEncoderAdapter;import org.apache.mina.filter.codec.ProtocolEncoderOutput;import java.nio.charset.Charset;/** * 编码器 *  */public class ByteArrayEncoder extends ProtocolEncoderAdapter {	private final Charset charset;	public ByteArrayEncoder(Charset charset) {		this.charset = charset;	}	/**	 * 直接将数据发出去,数据格式,包头+消息长度(int)+消息内容(json字符串)+包尾 包头包尾是十六进制字符串00 aa bb cc,转化成字节数组0,	 * -86, -69, -52四个字节	 *	 * @param session	 * @param message	 * @param out	 * @throws Exception	 */	@Override	public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {		// 仿项目,解决断包,粘包问题		String value = (message == null ? "" : message.toString());// 消息值		byte[] content = value.getBytes(charset);// 消息内容,字节数组		IoBuffer buf = IoBuffer.allocate(38 + content.length).setAutoExpand(true);// 缓冲区容量大小38字节加上字符长度		buf.put(new byte[] { 0, -86, -69, -52 });// 输入包开头固定值十六进制00 aa bb cc,转化成字节数组		buf.putUnsignedInt(content.length);// int为4字节,一个字节等于2个16进制字符,所以有八位 00 00 00 0c,内容长度。		buf.put(content);// 消息内容		buf.put(new byte[] { 0, -86, -69, -52 });// 包尾		buf.flip();		out.write(buf);// 写入	}}复制代码

解码器,重点,解决Mina断包,丢包问题

package com.my.mina;import org.apache.mina.core.buffer.IoBuffer;import org.apache.mina.core.session.IoSession;import org.apache.mina.filter.codec.CumulativeProtocolDecoder;import org.apache.mina.filter.codec.ProtocolDecoderOutput;import java.nio.charset.Charset;/** * 自定义解码器,确保能读到完整的包 */public class ByteArrayDecoder extends CumulativeProtocolDecoder {	private final Charset charset;	public ByteArrayDecoder(Charset charset) {		this.charset = charset;	}	@Override	protected boolean doDecode(IoSession ioSession, IoBuffer ioBuffer, ProtocolDecoderOutput protocolDecoderOutput)			throws Exception {		// 丢包,断包处理		if (ioBuffer.remaining() > 4)// 有包头,包头足够		{			ioBuffer.mark();// 标记当前position的快照标记mark,以便后继的reset操作能恢复position位置,开始是0			byte[] l = new byte[4];			ioBuffer.get(l);// 读取包头,占4个字节			if (ioBuffer.remaining() < 4)// 内容长度的4个字节不够,断包			{				ioBuffer.reset();				return false;//			} else {// 内容长度的4个字节数组足够				byte[] bytesLegth = new byte[4];// 内容长度				ioBuffer.get(bytesLegth);// 读取内容长度,int类型,占四个字节				int len = MinaUtil.byteArrayToInt(bytesLegth);// 内容长度有多少				if (ioBuffer.remaining() < len)// 内容不够,断包				{					ioBuffer.reset();					return false;//				} else { // 消息内容足够					byte[] bytes = new byte[len];					ioBuffer.get(bytes, 0, len);					protocolDecoderOutput.write(new String(bytes, charset));// 读取内容,并且发送					if (ioBuffer.remaining() < 4) {// 包尾不够						ioBuffer.reset();						return false;//					} else {// 包尾足够						byte[] tails = new byte[4];						ioBuffer.get(tails);// 读取包尾						if (ioBuffer.remaining() > 0)// 最后如果粘了包,会再次调用doDeocde()方法,把剩余数据给doDeocde()方法处理						{							return true;						}					}				}			}		}		return false;// 断包,或者执行完,	}}复制代码

解编码工厂

package com.my.mina;import org.apache.mina.core.session.IoSession;import org.apache.mina.filter.codec.ProtocolCodecFactory;import org.apache.mina.filter.codec.ProtocolDecoder;import org.apache.mina.filter.codec.ProtocolEncoder;import java.nio.charset.Charset;/** * 自定义解编码器工厂 * */public class ByteArrayCodecFactory implements ProtocolCodecFactory {	private ByteArrayDecoder decoder;	private ByteArrayEncoder encoder;	public ByteArrayCodecFactory() {		this(Charset.defaultCharset());	}	public ByteArrayCodecFactory(Charset charSet) {		encoder = new ByteArrayEncoder(charSet);		decoder = new ByteArrayDecoder(charSet);	}	@Override	public ProtocolDecoder getDecoder(IoSession session) throws Exception {		return decoder;	}	@Override	public ProtocolEncoder getEncoder(IoSession session) throws Exception {		return encoder;	}}复制代码

注意:客户端,服务端需要和服务端有同样的解码器,编码器,解编码工厂这三个类。

客户端核心代码

package com.example.mina.minaapplication.view;import android.app.Activity;import android.os.Bundle;import android.os.Handler;import android.os.Message;import android.util.Log;import android.view.View;import android.widget.TextView;import android.widget.Toast;import com.example.mina.minaapplication.R;import com.example.mina.minaapplication.mina.ByteArrayCodecFactory;import org.apache.mina.core.future.ConnectFuture;import org.apache.mina.core.service.IoHandlerAdapter;import org.apache.mina.core.session.IdleStatus;import org.apache.mina.core.session.IoSession;import org.apache.mina.filter.codec.ProtocolCodecFilter;import org.apache.mina.filter.logging.LoggingFilter;import org.apache.mina.transport.socket.SocketSessionConfig;import org.apache.mina.transport.socket.nio.NioSocketConnector;import java.net.InetSocketAddress;import java.nio.charset.Charset;import java.util.concurrent.Callable;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.FutureTask;/** * Mina客户端 */public class MainActivity extends Activity {    /**     * 线程池,避免阻塞主线程,与服务器建立连接使用,创建一个只有单线程的线程池,尽快执行线程的线程池     */    private static ExecutorService executorService = Executors.newSingleThreadExecutor();    /**     * 连接对象     */    private NioSocketConnector mConnection;    /**     * session对象     */    private IoSession mSession;    /**     * 连接服务器的地址     */    private InetSocketAddress mAddress;    private ConnectFuture mConnectFuture;    public static final int UPADTE_TEXT = 1;    /**     * 服务端返回的信息     */    private TextView tvShow;    @Override    protected void onCreate(Bundle savedInstanceState) {        super.onCreate(savedInstanceState);        setContentView(R.layout.activity_main);        tvShow = findViewById(R.id.tv_show);        initConfig();        connect();        findViewById(R.id.send).setOnClickListener(new View.OnClickListener() {//发送消息数据            @Override            public void onClick(View view) {                if (mConnectFuture != null && mConnectFuture.isConnected()) {//与服务器连接上                    mConnectFuture.getSession().write("{\"id\":11,\"name\":\"ccc\"}");//发送json字符串                }            }        });    }    /**     * 初始化Mina配置信息     */    private void initConfig() {        mAddress = new InetSocketAddress("192.168.0.1", 20000);//连接地址,此数据可改成自己要连接的IP和端口号        mConnection = new NioSocketConnector();// 创建连接        // 设置读取数据的缓存区大小        SocketSessionConfig socketSessionConfig = mConnection.getSessionConfig();        socketSessionConfig.setReadBufferSize(2048);        socketSessionConfig.setIdleTime(IdleStatus.BOTH_IDLE, 4);//设置4秒没有读写操作进入空闲状态        mConnection.getFilterChain().addLast("logging", new LoggingFilter());//logging过滤器        mConnection.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ByteArrayCodecFactory(Charset.forName("UTF-8"))));//自定义解编码器        mConnection.setHandler(new DefaultHandler());//设置handler        mConnection.setDefaultRemoteAddress(mAddress);//设置地址    }    /**     * 创建连接     */    private void connect() {        FutureTask
futureTask = new FutureTask<>(new Callable
() { @Override public Void call() {// try { while (true) { mConnectFuture = mConnection.connect(); mConnectFuture.awaitUninterruptibly();//一直等到他连接为止 mSession = mConnectFuture.getSession();//获取session对象 if (mSession != null && mSession.isConnected()) { Toast.makeText(MainActivity.this, "连接成功", Toast.LENGTH_SHORT).show(); break; } Thread.sleep(3000);//每隔三秒循环一次 } } catch (Exception e) {//连接异常 } return null; } }); executorService.execute(futureTask);//执行连接线程 } /** * Mina处理消息的handler,从服务端返回的消息一般在这里处理 */ private class DefaultHandler extends IoHandlerAdapter { @Override public void sessionOpened(IoSession session) throws Exception { super.sessionOpened(session); } /** * 接收到服务器端消息 * * @param session * @param message * @throws Exception */ @Override public void messageReceived(IoSession session, Object message) throws Exception { Log.e("tag", "接收到服务器端消息:" + message.toString()); Message message1 = new Message(); message1.what = UPADTE_TEXT; message1.obj = message; handler.sendMessage(message1); } @Override public void sessionIdle(IoSession session, IdleStatus status) throws Exception {//客户端进入空闲状态. super.sessionIdle(session, status); } } /** * 更新UI */ private Handler handler = new Handler() { @Override public void handleMessage(Message msg) { super.handleMessage(msg); switch (msg.what) { case UPADTE_TEXT: String message = (String) msg.obj; tvShow.setText(message); break; } } };}复制代码

客户端效果:

服务端效果:

本文完整项目代码地址: https://download.csdn.net/download/lb1207087645/10314510

参考资源:

转载于:https://juejin.im/post/5abc2dfa5188255cb07d2f00

你可能感兴趣的文章
PHP FPM源代码反刍品味之五:信号signal处理
查看>>
5G网速真的有理论上那么高吗?
查看>>
Set添加自定义方法对象如何保证唯一性
查看>>
站在巨人肩膀上的牛顿:Kubernetes和SAP Kyma
查看>>
技术工坊|浅谈区块链的Layer2扩展(北京)
查看>>
SSM框架——详细整合教程(Spring+SpringMVC+MyBatis)
查看>>
Apache和PHP结合 及 Apache默认虚拟主机
查看>>
添加自定义监控项目配置邮件告警测试告警不发邮件的问题处理
查看>>
solidity智能合约的经典设计模式
查看>>
华为交换网络基础、基本配置、STP/RSTP
查看>>
SpringCloud 微服务 (十七) 容器部署 Docker
查看>>
不定项选择题
查看>>
netty 分析博客
查看>>
Spring Cloud构建微服务架构服务注册与发现
查看>>
BCGControlBar教程:如何将MFC控件的BCGControlBarBCGSuite添加到对话框中
查看>>
深入理解Java8 Lambda表达式
查看>>
Java集合框架面试问题集锦
查看>>
Java每天10道面试题,跟我走,offer有!(六)
查看>>
四种途径提高RabbitMQ传输数据的可靠性(二)
查看>>
c语言实现多态
查看>>