1.什么是断包,粘包?
在讲断包,粘包之前,先说下消息保护边界和无消息保护边界。 1.保护消息边界,就是指传输协议把数据当作一条独立的消息在网上传输,接收端只能接收独立的消息.也就是说存在保护消息边界,接收端一次只能接收发送端发出的一个数据包. 2.而面向流则是无消息保护边界的,如果发送端连续发送数据, 接收端有可能在一次接收动作中,会接收两个或者更多的数据包。
而tcp是面向流的,需要在消息接收端处理消息边界问题。
接收端在接受数据时有可能会遇到下面四种情况
A.先接收到dataA然后接收到dataB. B.先接收到dataA的部分数据,然后接收到dataA余下的部分以及dataB的全部. C.先接收到了dataA的全部数据和dataB的部分数据,然后接收到了dataB的余下的数据. D.一次性接收到了dataA和dataB的全部数据.
A为正常情况,无粘包或断包。 B为断包+粘包。 C为粘包+断包。 D为粘包。
2.如何处理Mina中遇到的粘包和断包问题
在Mina框架中有个CumulativeProtocolDecoder 累积性的协议解码器,专门用来处理粘包和断包问题。doDecode()的返回值有重要作用。
A.你的doDecode()方法返回true 时,CumulativeProtocolDecoder 的decode()方法会首先判断你是否在doDecode()方法中从内部的IoBuffer 缓冲区读取了数据,如果没有,则会抛出非法的状态异常,也就是你的doDecode()方法返回true 就表示你已经消费了本次数据(相当于聊天室中一个完整的消息已经读取完毕),进一步说,也就是此时你必须已经消费过内部的IoBuffer 缓冲区的数据(哪怕是消费了一个字节的数据)。如果验证过通过,那么CumulativeProtocolDecoder 会检查缓冲区内是否还有数据未读取,如果有就继续调用doDecode()方法,没有就停止对doDecode()方法的调用,直到有新的数据被缓冲。
B. 当你的doDecode()方法返回false 时,CumulativeProtocolDecoder 会停止对doDecode()方法的调用,但此时如果本次数据还有未读取完的,就将含有剩余数据的IoBuffer 缓冲区保存到IoSession 中,以便下一次数据到来时可以从IoSession 中提取合并。如果发现本次数据全都读取完毕,则清空IoBuffer 缓冲区(让父类进行接收下一个包)。简而言之,当你认为读取到的数据已经够解码了,那么就返回true,否则就返回false。这个CumulativeProtocolDecoder 其实最重要的工作就是帮你完成了数据的累积,因为这个工作是很烦琐的。也就是说返回true,那么CumulativeProtocolDecoder会再次调用decoder,并把剩余的数据发下来;(意思就是会把剩余数据给doDecode()处理,剩余数据就是remaining()的数据),返回false就不处理剩余的,(不把剩余数据给doDecode()处理)当有新数据包来的时候就把剩余的数据和新的数据拼接在一起,然后再调用decoder。
下面附上一个完整的实例 1.消息的格式 包头+消息长度(int)+消息内容(json字符串)+包尾,包头包尾是十六进制字符串00 aa bb cc,转化成字节数组0, -86, -69, -52四个字节,下面的完整实例有客户端,服务端,将会解析数据,获取其中的消息内容(Json字符串)并且打印处理,消息以字节数组的方式在服务端,客户端之间传递。
服务端代码
package com.my.mina;import java.net.InetSocketAddress;import java.nio.charset.Charset;import java.util.Date;import org.apache.mina.core.service.IoAcceptor;import org.apache.mina.core.service.IoHandlerAdapter;import org.apache.mina.core.session.IdleStatus;import org.apache.mina.core.session.IoSession;import org.apache.mina.filter.codec.ProtocolCodecFilter;import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;import org.apache.mina.filter.logging.LoggingFilter;import org.apache.mina.transport.socket.nio.NioSocketAcceptor;/** * mina的Service端 * * @author linbin * */public class MinaService { public static void main(String[] args) { // 创建一个非阻塞的server端的Socket IoAcceptor acceptor = new NioSocketAcceptor(); // 添加日志过滤器 acceptor.getFilterChain().addLast("logger", new LoggingFilter()); acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ByteArrayCodecFactory(Charset.forName("UTF-8"))));// 自定义解编码器 // 设置Handler acceptor.setHandler(new DemoServerHandler()); // 设置读取数据的缓存区大小 acceptor.getSessionConfig().setReadBufferSize(2048); // 读写通道10秒内无操作进入空闲状态 acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10); try { // 绑定端口 acceptor.bind(new InetSocketAddress(20000)); } catch (Exception e) { e.printStackTrace(); } System.out.println("启动服务"); } /** * @ClassName: DemoServerHandler * @Description: 负责session对象的创建和监听以及消息的创建和接收监听 * @author chenzheng * @date 2016-12-9 下午3:57:11 */ private static class DemoServerHandler extends IoHandlerAdapter { // 服务器与客户端创建连接 @Override public void sessionCreated(IoSession session) throws Exception { System.out.println("服务器与客户端创建连接..."); super.sessionCreated(session); } @Override public void sessionOpened(IoSession session) throws Exception { System.out.println("服务器与客户端连接打开..."); super.sessionOpened(session); } // 消息的接收处理 @Override public void messageReceived(IoSession session, Object message) throws Exception { // TODO Auto-generated method stub super.messageReceived(session, message);// 消息的接受 // 传递自定义解编码器传递数组和解析数组丢包断包的 String a = (String) message; System.out.println("接收到的数据:" + a); session.write(a); } // 消息发送后调用 @Override public void messageSent(IoSession session, Object message) throws Exception { // TODO Auto-generated method stub super.messageSent(session, message); System.out.println("服务器发送消息成功..."); } // session关闭 @Override public void sessionClosed(IoSession session) throws Exception { // TODO Auto-generated method stub super.sessionClosed(session); System.out.println("断开连接:"); } }}复制代码
编码器
package com.my.mina;import org.apache.mina.core.buffer.IoBuffer;import org.apache.mina.core.session.IoSession;import org.apache.mina.filter.codec.ProtocolEncoderAdapter;import org.apache.mina.filter.codec.ProtocolEncoderOutput;import java.nio.charset.Charset;/** * 编码器 * */public class ByteArrayEncoder extends ProtocolEncoderAdapter { private final Charset charset; public ByteArrayEncoder(Charset charset) { this.charset = charset; } /** * 直接将数据发出去,数据格式,包头+消息长度(int)+消息内容(json字符串)+包尾 包头包尾是十六进制字符串00 aa bb cc,转化成字节数组0, * -86, -69, -52四个字节 * * @param session * @param message * @param out * @throws Exception */ @Override public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception { // 仿项目,解决断包,粘包问题 String value = (message == null ? "" : message.toString());// 消息值 byte[] content = value.getBytes(charset);// 消息内容,字节数组 IoBuffer buf = IoBuffer.allocate(38 + content.length).setAutoExpand(true);// 缓冲区容量大小38字节加上字符长度 buf.put(new byte[] { 0, -86, -69, -52 });// 输入包开头固定值十六进制00 aa bb cc,转化成字节数组 buf.putUnsignedInt(content.length);// int为4字节,一个字节等于2个16进制字符,所以有八位 00 00 00 0c,内容长度。 buf.put(content);// 消息内容 buf.put(new byte[] { 0, -86, -69, -52 });// 包尾 buf.flip(); out.write(buf);// 写入 }}复制代码
解码器,重点,解决Mina断包,丢包问题
package com.my.mina;import org.apache.mina.core.buffer.IoBuffer;import org.apache.mina.core.session.IoSession;import org.apache.mina.filter.codec.CumulativeProtocolDecoder;import org.apache.mina.filter.codec.ProtocolDecoderOutput;import java.nio.charset.Charset;/** * 自定义解码器,确保能读到完整的包 */public class ByteArrayDecoder extends CumulativeProtocolDecoder { private final Charset charset; public ByteArrayDecoder(Charset charset) { this.charset = charset; } @Override protected boolean doDecode(IoSession ioSession, IoBuffer ioBuffer, ProtocolDecoderOutput protocolDecoderOutput) throws Exception { // 丢包,断包处理 if (ioBuffer.remaining() > 4)// 有包头,包头足够 { ioBuffer.mark();// 标记当前position的快照标记mark,以便后继的reset操作能恢复position位置,开始是0 byte[] l = new byte[4]; ioBuffer.get(l);// 读取包头,占4个字节 if (ioBuffer.remaining() < 4)// 内容长度的4个字节不够,断包 { ioBuffer.reset(); return false;// } else {// 内容长度的4个字节数组足够 byte[] bytesLegth = new byte[4];// 内容长度 ioBuffer.get(bytesLegth);// 读取内容长度,int类型,占四个字节 int len = MinaUtil.byteArrayToInt(bytesLegth);// 内容长度有多少 if (ioBuffer.remaining() < len)// 内容不够,断包 { ioBuffer.reset(); return false;// } else { // 消息内容足够 byte[] bytes = new byte[len]; ioBuffer.get(bytes, 0, len); protocolDecoderOutput.write(new String(bytes, charset));// 读取内容,并且发送 if (ioBuffer.remaining() < 4) {// 包尾不够 ioBuffer.reset(); return false;// } else {// 包尾足够 byte[] tails = new byte[4]; ioBuffer.get(tails);// 读取包尾 if (ioBuffer.remaining() > 0)// 最后如果粘了包,会再次调用doDeocde()方法,把剩余数据给doDeocde()方法处理 { return true; } } } } } return false;// 断包,或者执行完, }}复制代码
解编码工厂
package com.my.mina;import org.apache.mina.core.session.IoSession;import org.apache.mina.filter.codec.ProtocolCodecFactory;import org.apache.mina.filter.codec.ProtocolDecoder;import org.apache.mina.filter.codec.ProtocolEncoder;import java.nio.charset.Charset;/** * 自定义解编码器工厂 * */public class ByteArrayCodecFactory implements ProtocolCodecFactory { private ByteArrayDecoder decoder; private ByteArrayEncoder encoder; public ByteArrayCodecFactory() { this(Charset.defaultCharset()); } public ByteArrayCodecFactory(Charset charSet) { encoder = new ByteArrayEncoder(charSet); decoder = new ByteArrayDecoder(charSet); } @Override public ProtocolDecoder getDecoder(IoSession session) throws Exception { return decoder; } @Override public ProtocolEncoder getEncoder(IoSession session) throws Exception { return encoder; }}复制代码
注意:客户端,服务端需要和服务端有同样的解码器,编码器,解编码工厂这三个类。
客户端核心代码
package com.example.mina.minaapplication.view;import android.app.Activity;import android.os.Bundle;import android.os.Handler;import android.os.Message;import android.util.Log;import android.view.View;import android.widget.TextView;import android.widget.Toast;import com.example.mina.minaapplication.R;import com.example.mina.minaapplication.mina.ByteArrayCodecFactory;import org.apache.mina.core.future.ConnectFuture;import org.apache.mina.core.service.IoHandlerAdapter;import org.apache.mina.core.session.IdleStatus;import org.apache.mina.core.session.IoSession;import org.apache.mina.filter.codec.ProtocolCodecFilter;import org.apache.mina.filter.logging.LoggingFilter;import org.apache.mina.transport.socket.SocketSessionConfig;import org.apache.mina.transport.socket.nio.NioSocketConnector;import java.net.InetSocketAddress;import java.nio.charset.Charset;import java.util.concurrent.Callable;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.FutureTask;/** * Mina客户端 */public class MainActivity extends Activity { /** * 线程池,避免阻塞主线程,与服务器建立连接使用,创建一个只有单线程的线程池,尽快执行线程的线程池 */ private static ExecutorService executorService = Executors.newSingleThreadExecutor(); /** * 连接对象 */ private NioSocketConnector mConnection; /** * session对象 */ private IoSession mSession; /** * 连接服务器的地址 */ private InetSocketAddress mAddress; private ConnectFuture mConnectFuture; public static final int UPADTE_TEXT = 1; /** * 服务端返回的信息 */ private TextView tvShow; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); tvShow = findViewById(R.id.tv_show); initConfig(); connect(); findViewById(R.id.send).setOnClickListener(new View.OnClickListener() {//发送消息数据 @Override public void onClick(View view) { if (mConnectFuture != null && mConnectFuture.isConnected()) {//与服务器连接上 mConnectFuture.getSession().write("{\"id\":11,\"name\":\"ccc\"}");//发送json字符串 } } }); } /** * 初始化Mina配置信息 */ private void initConfig() { mAddress = new InetSocketAddress("192.168.0.1", 20000);//连接地址,此数据可改成自己要连接的IP和端口号 mConnection = new NioSocketConnector();// 创建连接 // 设置读取数据的缓存区大小 SocketSessionConfig socketSessionConfig = mConnection.getSessionConfig(); socketSessionConfig.setReadBufferSize(2048); socketSessionConfig.setIdleTime(IdleStatus.BOTH_IDLE, 4);//设置4秒没有读写操作进入空闲状态 mConnection.getFilterChain().addLast("logging", new LoggingFilter());//logging过滤器 mConnection.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ByteArrayCodecFactory(Charset.forName("UTF-8"))));//自定义解编码器 mConnection.setHandler(new DefaultHandler());//设置handler mConnection.setDefaultRemoteAddress(mAddress);//设置地址 } /** * 创建连接 */ private void connect() { FutureTaskfutureTask = new FutureTask<>(new Callable () { @Override public Void call() {// try { while (true) { mConnectFuture = mConnection.connect(); mConnectFuture.awaitUninterruptibly();//一直等到他连接为止 mSession = mConnectFuture.getSession();//获取session对象 if (mSession != null && mSession.isConnected()) { Toast.makeText(MainActivity.this, "连接成功", Toast.LENGTH_SHORT).show(); break; } Thread.sleep(3000);//每隔三秒循环一次 } } catch (Exception e) {//连接异常 } return null; } }); executorService.execute(futureTask);//执行连接线程 } /** * Mina处理消息的handler,从服务端返回的消息一般在这里处理 */ private class DefaultHandler extends IoHandlerAdapter { @Override public void sessionOpened(IoSession session) throws Exception { super.sessionOpened(session); } /** * 接收到服务器端消息 * * @param session * @param message * @throws Exception */ @Override public void messageReceived(IoSession session, Object message) throws Exception { Log.e("tag", "接收到服务器端消息:" + message.toString()); Message message1 = new Message(); message1.what = UPADTE_TEXT; message1.obj = message; handler.sendMessage(message1); } @Override public void sessionIdle(IoSession session, IdleStatus status) throws Exception {//客户端进入空闲状态. super.sessionIdle(session, status); } } /** * 更新UI */ private Handler handler = new Handler() { @Override public void handleMessage(Message msg) { super.handleMessage(msg); switch (msg.what) { case UPADTE_TEXT: String message = (String) msg.obj; tvShow.setText(message); break; } } };}复制代码
客户端效果:
服务端效果:
本文完整项目代码地址: https://download.csdn.net/download/lb1207087645/10314510
参考资源: