NIO如何多线程操作

由于NIO本身是非阻塞的,所以它的消息选择器Selector可以在单线程下处理多台客户端的连接访问。

为了加强NIO的性能,我们加入多线程的操作,当然NIO并不能简单地把Selector.select()放入Executor.execute(Runnable)的run方法中。

为完成NIO的多线程,我们应该有一个调度类,一个服务类。

调度类的目的是初始化一定数量的线程,以及线程交接。

package com.netty.nionetty.pool;

import com.netty.nionetty.NioServerBoss;
import com.netty.nionetty.NioServerWorker;

import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by Administrator on 2018-05-17.
 */
public class NioSelectorRunnablePool {
    private final AtomicInteger bossIndex = new AtomicInteger();
    //欢迎线程数组
    private Boss[] bosses;
    private final AtomicInteger workerIndex = new AtomicInteger();
    //工做线程数组
    private Worker[] workers;
    public NioSelectorRunnablePool(Executor boss,Executor worker) {
        initBoss(boss,1);
        initWorker(worker,Runtime.getRuntime().availableProcessors() * 2);
    }
    //初始化1个欢迎线程
    private void initBoss(Executor boss,int count) {
        this.bosses = new NioServerBoss[count];
        for (int i = 0;i < bosses.length;i++) {
            bosses[i] = new NioServerBoss(boss,"boss thread " + (i + 1),this);
        }
    }
    //初始化2倍计算机核数的工做线程
    private void initWorker(Executor worker,int count) {
        this.workers = new NioServerWorker[count];
        for (int i = 0; i < workers.length;i++) {
            workers[i] = new NioServerWorker(worker,"worker thread" + (i + 1),this);
        }
    }
    //交接工做线程(从工做线程数组中挑出)
    public Worker nextWorker() {
        return workers[Math.abs(workerIndex.getAndIncrement() % workers.length)];
    }
    //交接欢迎线程(从欢迎线程数组中挑出)
    public Boss nextBoss() {
        return bosses[Math.abs(bossIndex.getAndIncrement() % bosses.length)];
    }
}

另外带一个欢迎线程接口,一个工作线程接口

package com.netty.nionetty.pool;

import java.nio.channels.ServerSocketChannel;

/**
 * Created by Administrator on 2018-05-17.
 *
 * Contract for a boss (accept-side) selector loop. In this file it is
 * implemented by {@code NioServerBoss} and obtained through
 * {@code NioSelectorRunnablePool.nextBoss()}.
 */
public interface Boss {
    /**
     * Asks this boss to start handling the given server channel.
     * {@code NioServerBoss} implements this by queueing a task that registers
     * the channel with its selector for {@code OP_ACCEPT}.
     *
     * @param serverChannel the server channel to hand over; {@code ServerBootstap.bind}
     *                      passes a bound, non-blocking channel
     */
    void registerAcceptChannelTask(ServerSocketChannel serverChannel);
}
package com.netty.nionetty.pool;

import java.nio.channels.SocketChannel;

/**
 * Created by Administrator on 2018-05-17.
 *
 * Contract for a worker (read-side) selector loop. In this file it is
 * implemented by {@code NioServerWorker} and obtained through
 * {@code NioSelectorRunnablePool.nextWorker()}.
 */
public interface Worker {
    /**
     * Asks this worker to start handling the given client channel.
     * {@code NioServerWorker} implements this by queueing a task that registers
     * the channel with its selector for {@code OP_READ}.
     *
     * @param channel the client channel to hand over; {@code NioServerBoss.process}
     *                passes a freshly accepted, non-blocking channel
     */
    void registerNewChannelTask(SocketChannel channel);
}

有两种线程(欢迎线程和工作线程),所以我们有一个抽象线程类

package com.netty.nionetty;

import com.netty.nionetty.pool.NioSelectorRunnablePool;

import java.io.IOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Created by Administrator on 2018-05-17.
 */
public abstract class AbstractNioSelector implements Runnable {
    //线程池
    private final Executor executor;
    //NIO消息选择器
    protected Selector selector;
    protected final AtomicBoolean wakeUp = new AtomicBoolean();
    //线程任务队列
    private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();
    //线程名
    private String threadName;
    //线程调度器
    protected NioSelectorRunnablePool selectorRunnablePool;
    AbstractNioSelector(Executor executor,String threadName,NioSelectorRunnablePool selectorRunnablePool) {
        this.executor = executor;
        this.threadName = threadName;
        this.selectorRunnablePool = selectorRunnablePool;
        openSelector();
    }
    private void openSelector() {
        try {
            this.selector = Selector.open(); //打开消息选择器
        } catch (IOException e) {
            e.printStackTrace();
        }
        //把线程放入线程池,开始执行run方法
        executor.execute(this);
    }

    public void run() {
        Thread.currentThread().setName(this.threadName);
        while (true) {
            try {
                wakeUp.set(false);  //把消息选择器的状态定为未唤醒状态
                select(selector);   //消息选择器选择消息方式
                processTaskQueue(); //由于在主程序中绑定端口的时候已经注册了接收通道任务线程,因此这里是读出任务。
                process(selector);  //任务处理,欢迎线程跟工做线程各不相同
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    //欢迎线程跟工做线程各自添加不一样的线程,再把消息选择器唤醒
    protected final void registerTask(Runnable task) {
        taskQueue.add(task);
        Selector selector = this.selector;
        if (selector != null) {
            if (wakeUp.compareAndSet(false,true)) {
                selector.wakeup();
            }
        }else {
            taskQueue.remove(task);
        }
    }
    public NioSelectorRunnablePool getSelectorRunnablePool() {
        return selectorRunnablePool;
    }

    private void processTaskQueue() {
        for (;;) {
            final Runnable task = taskQueue.poll();
            if (task == null) {
                break;
            }
            task.run();
        }
    }
    protected abstract int select(Selector selector) throws IOException;
    protected abstract void process(Selector selector) throws IOException;
}

欢迎线程跟工作线程的具体实现

package com.netty.nionetty;

import com.netty.nionetty.pool.Boss;
import com.netty.nionetty.pool.NioSelectorRunnablePool;
import com.netty.nionetty.pool.Worker;

import java.io.IOException;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor;

/**
 * Created by Administrator on 2018-05-17.
 */
public class NioServerBoss extends AbstractNioSelector implements Boss {
    public NioServerBoss(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {
        super(executor,threadName,selectorRunnablePool);
    }
    //注册接收任务,会先调用抽象类,把任务线程先添加到任务队列,再注册接收消息类型
    public void registerAcceptChannelTask(final ServerSocketChannel serverChannel) {
        final Selector selector = this.selector;
        registerTask(new Runnable() {
            public void run() {
                try {
                    serverChannel.register(selector,SelectionKey.OP_ACCEPT);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    @Override
    protected int select(Selector selector) throws IOException {
        return selector.select();
    }
    //NIO操做,开始接收,接收后再启用工做线程,接收线程依然存在,并且工做线程也不断给到线程池未使用线程
    //具体看初始化的时候初始了多少工做线程,可是是几个链接对应一个工做线程。
    @Override
    protected void process(Selector selector) throws IOException {
        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        if (selectedKeys.isEmpty()) {
            return;
        }
        for (Iterator<SelectionKey> i = selectedKeys.iterator();i.hasNext();) {
            SelectionKey key = i.next();
            i.remove();
            ServerSocketChannel server = (ServerSocketChannel)key.channel();
            SocketChannel channel = server.accept();
            channel.configureBlocking(false);
            Worker nextWorker = getSelectorRunnablePool().nextWorker();
            nextWorker.registerNewChannelTask(channel);
            System.out.println("新客户端链接");
        }
    }
}
package com.netty.nionetty;

import com.netty.nionetty.pool.NioSelectorRunnablePool;
import com.netty.nionetty.pool.Worker;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor;

/**
 * Created by Administrator on 2018-05-17.
 *
 * Worker (read) loop: reads data from the client channels registered with it,
 * prints what was received and writes a short acknowledgement back.
 */
public class NioServerWorker extends AbstractNioSelector implements Worker {
    /**
     * @param executor             executor that runs this loop
     * @param threadName           name assigned to the loop thread
     * @param selectorRunnablePool pool this loop belongs to
     */
    public NioServerWorker(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {
        super(executor,threadName,selectorRunnablePool);
    }

    /**
     * Queues a task that registers {@code channel} with this loop's selector
     * for {@code OP_READ}; the task runs on the loop thread.
     *
     * @param channel client channel to read from
     */
    @Override
    public void registerNewChannelTask(final SocketChannel channel) {
        final Selector selector = this.selector;
        registerTask(new Runnable() {
            public void run() {
                try {
                    channel.register(selector,SelectionKey.OP_READ);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    /** Waits at most 500 ms for a key to become ready. */
    @Override
    protected int select(Selector selector) throws IOException {
        return selector.select(500);
    }

    /**
     * Reads once from every ready channel. End-of-stream or an I/O error closes
     * the connection; otherwise the received bytes are printed and an
     * acknowledgement is written back.
     *
     * @param selector selector whose selected keys are handled
     * @throws IOException declared by the base class; I/O failures on a single
     *                     channel are handled here by closing that channel
     */
    @Override
    protected void process(Selector selector) throws IOException {
        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        if (selectedKeys.isEmpty()) {
            return;
        }
        Iterator<SelectionKey> ite = selectedKeys.iterator();
        while (ite.hasNext()) {
            SelectionKey key = ite.next();
            ite.remove();
            // A key may have been cancelled since it was selected.
            if (!key.isValid() || !key.isReadable()) {
                continue;
            }
            SocketChannel channel = (SocketChannel)key.channel();
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int ret;
            try {
                ret = channel.read(buffer);
            } catch (IOException e) {
                e.printStackTrace();
                disconnect(key, channel);
                continue;
            }
            if (ret < 0) {
                // -1 means the peer closed the connection.
                disconnect(key, channel);
                continue;
            }
            if (ret == 0) {
                // Nothing was available right now; the connection is still open.
                continue;
            }
            // Decode only the bytes actually read, not the whole zero-padded array.
            System.out.println("收到数据:" + new String(buffer.array(), 0, ret));
            ByteBuffer outBuffer = ByteBuffer.wrap("收到\n".getBytes());
            try {
                channel.write(outBuffer);
            } catch (IOException e) {
                // A broken client must not abort the handling of the remaining keys.
                e.printStackTrace();
                disconnect(key, channel);
            }
        }
    }

    // Cancels the key and closes the channel so the socket is released.
    private void disconnect(SelectionKey key, SocketChannel channel) {
        key.cancel();
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("客户端断开链接");
    }
}

服务类

package com.netty.nionetty;

import com.netty.nionetty.pool.Boss;
import com.netty.nionetty.pool.NioSelectorRunnablePool;

import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.ServerSocketChannel;

/**
 * Created by Administrator on 2018-05-17.
 *
 * Server entry point: opens a non-blocking server channel, binds it and hands
 * it to a boss loop taken from the pool.
 */
public class ServerBootstap {
    // Pool that supplies the boss loop for each bound channel.
    private final NioSelectorRunnablePool selectorRunnablePool;

    /**
     * @param selectorRunnablePool pool that supplies the boss loops
     */
    public ServerBootstap(NioSelectorRunnablePool selectorRunnablePool) {
        this.selectorRunnablePool = selectorRunnablePool;
    }

    /**
     * Opens a non-blocking server channel bound to {@code localAddress} and
     * registers it with the next boss of the pool. An {@link IOException} is
     * printed, not thrown; the channel is closed in that case.
     *
     * @param localAddress address to listen on
     */
    public void bind(final SocketAddress localAddress) {
        ServerSocketChannel serverChannel = null;
        try {
            serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking(false);
            serverChannel.socket().bind(localAddress);
            Boss nextBoss = selectorRunnablePool.nextBoss();
            nextBoss.registerAcceptChannelTask(serverChannel);
        } catch (IOException e) {
            e.printStackTrace();
            // Only open/configure/bind can throw here, i.e. before the channel was
            // handed to a boss, so closing it cannot affect a registered channel.
            closeQuietly(serverChannel);
        }
    }

    // Closes the channel if it was opened, printing (not propagating) any failure.
    private static void closeQuietly(ServerSocketChannel serverChannel) {
        if (serverChannel == null) {
            return;
        }
        try {
            serverChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

主程序

package com.netty.nionetty;

import com.netty.nionetty.pool.NioSelectorRunnablePool;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

/**
 * Created by Administrator on 2018-05-17.
 *
 * Program entry point: builds the selector pool on two cached thread pools and
 * binds the server to port 10101.
 */
public class Start {
    // Port the server listens on.
    private static final int PORT = 10101;

    /**
     * Starts the server and prints "start" once the bind call has returned.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // First executor runs the boss loop, second one the worker loops.
        final NioSelectorRunnablePool pool = new NioSelectorRunnablePool(
                Executors.newCachedThreadPool(),
                Executors.newCachedThreadPool());
        final ServerBootstap server = new ServerBootstap(pool);
        server.bind(new InetSocketAddress(PORT));
        System.out.println("start");
    }
}

其实最主要的就是在线程调度器中,各种线程已经被初始化存在于线程池内存中了,所以后面只是把这些线程拿出来,并注册消息类型,进行处理,这就是NIO的多线程处理了。

相关文章
相关标签/搜索