Apache MINA --- [ProtocolCodecFilter]

为何使用它:java

  • TCP协议保证全部的包有正确的顺序,可是不保证发送端的一个写操做只致使接收端的一个读事件发生,用 MINA术语来描述    就是:没有ProtocolCodecFilter,发送端一个IoSession.write(...)可以致使接收端多个 messageReceived(...),多个write(...)也能被引导到一个messageReceived(...),也许在单机测试时咱们 不会碰到这样的状况,可是咱们的应用应该有能力处理这种问题.安全

  • 大多数网络应用须要一种方式来找到当前信息的结束点和下一条信息的开始点.网络

  • 咱们可以在IoHandler中实现全部业务逻辑,可是添加ProtocolCodecFilter将使你的代码更加容易,清晰的维护.session

  • 它能帮助咱们分离业务逻辑和协议逻辑.app

怎么使用:性能

  • 应用基本上仅仅接收字节流并且咱们须要将它们转化成高层对象(message).测试

  • 这里有三种通用技术来分割字节流到message:this

        1.使用固定长度的信息.编码

        2.使用固定长度的消息头来指定的消息体的长度.线程

        3.使用定界符(如:许多基于文本的协议会在每条消息末尾加上换行符).

例子:

    本例中,咱们将开发一个无用的图形字符服务来阐明如何实现本身的协议编解码器

    Request:

//一个简单的POJO表明一个请求
public class ImageRequest {
 
    private int width;
    private int height;
    private int numberOfCharacters;
 
    public ImageRequest(int width, int height, int numberOfCharacters) {
        this.width = width;
        this.height = height;
        this.numberOfCharacters = numberOfCharacters;
    }
 
    public int getWidth() {
        return width;
    }
 
    public int getHeight() {
        return height;
    }
 
    public int getNumberOfCharacters() {
        return numberOfCharacters;
    }
}
 
//将ImageRequest对象编码成特定协议数据(客户端使用)
public class ImageRequestEncoder implements ProtocolEncoder {
 
    public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
        ImageRequest request = (ImageRequest) message;
        IoBuffer buffer = IoBuffer.allocate(12, false);
        buffer.putInt(request.getWidth());
        buffer.putInt(request.getHeight());
        buffer.putInt(request.getNumberOfCharacters());
        buffer.flip();
        out.write(buffer);
    }
 
    public void dispose(IoSession session) throws Exception {
        // 释放相关资源,若是没有,直接继承自ProtocolEncoderAdapter
    }
    
    /**
     * MINA会为IoSession写入队列中的全部消息调用encode方法.由于咱们的客户端只写入ImageRequest,咱们能够安全强转.
     * 咱们从堆中分配一个新的IoBuffer,咱们最好避免使用直接缓冲区,由于一般来说,堆缓冲性能更好.
     * 你不须要手动释放缓冲区,MINA会帮你作.
     * 在dispose()方法中释放全部编码期间使用到的资源,若是没有须要处理的,能够直接让他继承ProtocolEncoderAdapter.
     */
}
 
//将特定协议数据解码成ImageRequest对象(服务端使用)
//CumulativeProtocolDecoder是很是有用的,他会缓冲全部的传入数据直到你的解码器能断定数据已经准备就绪
public class ImageRequestDecoder extends CumulativeProtocolDecoder {
 
    protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
        if (in.remaining() >= 12) {
            int width = in.getInt();
            int height = in.getInt();
            int numberOfCharachters = in.getInt();
            ImageRequest request = new ImageRequest(width, height, numberOfCharachters);
            out.write(request);
            return true;
        } else {
            return false;
        }
    }
    
    /**
     * 每当一条完整的消息被解码,你应该把它写入ProtocolDecoderOutput;这些消息会沿着过滤器链传递并最终抵达IoHandler.messageReceived()方法
     * 你不须要手动释放缓冲区,MINA会帮你作.
     * 当数据还没接收彻底时,只须要return false. 
     */
}

    Response:

//一个见到的POJO表明一个响应
public class ImageResponse {
 
    private BufferedImage image1;
 
    private BufferedImage image2;
 
    public ImageResponse(BufferedImage image1, BufferedImage image2) {
        this.image1 = image1;
        this.image2 = image2;
    }
 
    public BufferedImage getImage1() {
        return image1;
    }
 
    public BufferedImage getImage2() {
        return image2;
    }
}
 
//将ImageResponse对象编码成特定协议数据(服务端使用)
public class ImageResponseEncoder extends ProtocolEncoderAdapter {
 
    public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
        ImageResponse imageResponse = (ImageResponse) message;
        byte[] bytes1 = getBytes(imageResponse.getImage1());
        byte[] bytes2 = getBytes(imageResponse.getImage2());
        int capacity = bytes1.length + bytes2.length + 8;
        IoBuffer buffer = IoBuffer.allocate(capacity, false);
        buffer.setAutoExpand(true);
        buffer.putInt(bytes1.length);
        buffer.put(bytes1);
        buffer.putInt(bytes2.length);
        buffer.put(bytes2);
        buffer.flip();
        out.write(buffer);
    }
 
    private byte[] getBytes(BufferedImage image) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ImageIO.write(image, "PNG", baos);
        return baos.toByteArray();
    }
    
    /**
     * 若是IoBuffer大小不能被事先计算时,可使用自动扩容setAutoExpand(true).
     */
}

//将特定协议数据解码成ImageResponse对象(客户端使用)
public class ImageResponseDecoder extends CumulativeProtocolDecoder {
 
    private static final String DECODER_STATE_KEY = ImageResponseDecoder.class.getName() + ".STATE";
 
    public static final int MAX_IMAGE_SIZE = 5 * 1024 * 1024;
 
    private static class DecoderState {
        BufferedImage image1;
    }
 
    protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
        DecoderState decoderState = (DecoderState) session.getAttribute(DECODER_STATE_KEY);
        if (decoderState == null) {
            decoderState = new DecoderState();
            session.setAttribute(DECODER_STATE_KEY, decoderState);
        }
        if (decoderState.image1 == null) {
            // try to read first image
            if (in.prefixedDataAvailable(4, MAX_IMAGE_SIZE)) {
                decoderState.image1 = readImage(in);
            } else {
                // not enough data available to read first image
                return false;
            }
        }
        if (decoderState.image1 != null) {
            // try to read second image
            if (in.prefixedDataAvailable(4, MAX_IMAGE_SIZE)) {
                BufferedImage image2 = readImage(in);
                ImageResponse imageResponse = new ImageResponse(decoderState.image1, image2);
                out.write(imageResponse);
                decoderState.image1 = null;
                return true;
            } else {
                // not enough data available to read second image
                return false;
            }
        }
        return false;
    }
 
    private BufferedImage readImage(IoBuffer in) throws IOException {
        int length = in.getInt();
        byte[] bytes = new byte[length];
        in.get(bytes);
        ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
        return ImageIO.read(bais);
    }
    
    /**
     * 咱们在session属性中保存了状态里记录解码进度,这个状态也能够被保存在Decoder自己属性中,但那样作有几个缺点:        
     *     每一个IoSession都须要本身的Decoder实例.
     *     MINA确保在同一个IoSession中不会有超过一个线程同时在执行decode(),可是它不保证永远是再一个线程中执行.假设第一个数据片被线程1处理,
     *     线程1断定这些数据还不足以被解码,当下一个数据片到达时,多是被另外一个线程处理.那么为了不可见性问题,咱们必须明确的对Decoder中的状态属
     *     性进行同步声明(IoSession自己就具备这一特性).
     * IoBuffer.prefixedDataAvailable() 是很是方便的当你使用长度前缀,它支持的1/2/4字节的前缀.
     * 不要忘了重置解码状态当一个响应被解码完成时(另外一种方式是:移除session中的状态属性).
     */
     
     //若是只须要处理一张图片,则不须要保存状态
}

    Factory:

public class ImageCodecFactory implements ProtocolCodecFactory {
    private ProtocolEncoder encoder;
    private ProtocolDecoder decoder;
 
    public ImageCodecFactory(boolean client) {
        if (client) {
            encoder = new ImageRequestEncoder();
            decoder = new ImageResponseDecoder();
        } else {
            encoder = new ImageResponseEncoder();
            decoder = new ImageRequestDecoder();
        }
    }
 
    public ProtocolEncoder getEncoder(IoSession ioSession) throws Exception {
        return encoder;
    }
 
    public ProtocolDecoder getDecoder(IoSession ioSession) throws Exception {
        return decoder;
    }
    
    /**
     * 针对每个新的IoSession,MINA会要求ProtocolCodecFactory建立编解码器.
     * 由于咱们的编解码器没有存储交互式的状态,因此让全部session共享一个实例是安全的.
     */
}

    服务端:

public class ImageServer {
    public static final int PORT = 33789;
 
    public static void main(String[] args) throws IOException {
        ImageServerIoHandler handler = new ImageServerIoHandler();
        NioSocketAcceptor acceptor = new NioSocketAcceptor();
        acceptor.getFilterChain().addLast("protocol", new ProtocolCodecFilter(new ImageCodecFactory(false)));
        acceptor.setLocalAddress(new InetSocketAddress(PORT));
        acceptor.setHandler(handler);
        acceptor.bind();
        System.out.println("server is listenig at port " + PORT);
    }
}
 
public class ImageServerIoHandler extends IoHandlerAdapter {
 
    private final static String characters = "mina rocks abcdefghijklmnopqrstuvwxyz0123456789";
 
    public static final String INDEX_KEY = ImageServerIoHandler.class.getName() + ".INDEX";
 
    private Logger logger = LoggerFactory.getLogger(this.getClass());
 
    public void sessionOpened(IoSession session) throws Exception {
        session.setAttribute(INDEX_KEY, 0);
    }
 
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        IoSessionLogger sessionLogger = IoSessionLogger.getLogger(session, logger);
        sessionLogger.warn(cause.getMessage(), cause);
    }
 
    public void messageReceived(IoSession session, Object message) throws Exception {
        ImageRequest request = (ImageRequest) message;
        String text1 = generateString(session, request.getNumberOfCharacters());
        String text2 = generateString(session, request.getNumberOfCharacters());
        BufferedImage image1 = createImage(request, text1);
        BufferedImage image2 = createImage(request, text2);
        ImageResponse response = new ImageResponse(image1, image2);
        session.write(response);
    }
 
    private BufferedImage createImage(ImageRequest request, String text) {
        BufferedImage image = new BufferedImage(request.getWidth(), request.getHeight(), BufferedImage.TYPE_BYTE_INDEXED);
        Graphics graphics = image.createGraphics();
        graphics.setColor(Color.YELLOW);
        graphics.fillRect(0, 0, image.getWidth(), image.getHeight());
        Font serif = new Font("serif", Font.PLAIN, 30);
        graphics.setFont(serif);
        graphics.setColor(Color.BLUE);
        graphics.drawString(text, 10, 50);
        return image;
    }
 
    private String generateString(IoSession session, int length) {
        Integer index = (Integer) session.getAttribute(INDEX_KEY);
        StringBuffer buffer = new StringBuffer(length);
 
        while (buffer.length() < length) {
            buffer.append(characters.charAt(index));
            index++;
            if (index >= characters.length()) {
                index = 0;
            }
        }
        session.setAttribute(INDEX_KEY, index);
        return buffer.toString();
    }
}

    客户端:

public class ImageClient extends IoHandlerAdapter {
    public static final int CONNECT_TIMEOUT = 3000;
 
    private String host;
    private int port;
    private SocketConnector connector;
    private IoSession session;
    private ImageListener imageListener;
 
    public ImageClient(String host, int port, ImageListener imageListener) {
        this.host = host;
        this.port = port;
        this.imageListener = imageListener;
        connector = new NioSocketConnector();
        connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ImageCodecFactory(true)));
        connector.setHandler(this);
    }
 
    public void messageReceived(IoSession session, Object message) throws Exception {
        ImageResponse response = (ImageResponse) message;
        imageListener.onImages(response.getImage1(), response.getImage2());
    }
    ...
}
相关文章
相关标签/搜索