隨著用戶量上升,項目的架構也在不斷地升級:由最開始MVC的垂直架構(傳統項目),到RPC架構(webservice、rest、netty、mina),再到SOA模型(dubbo),再到最近的微服務。IO模型也是如此:Tomcat6之前的IO模型都是BIO,也就是阻塞IO;到後來變成多路復用,也是阻塞IO;再到非阻塞NIO,再到異步非阻塞AIO。
言歸正傳,接著談netty。傳統IO是一個線程服務一個客戶,後來通過netty,可以一個線程服務多個客戶。下面的那個圖展示的是netty的NIO通過引入多線程來提高性能,即一個線程負責一片用戶。
直接上代碼
package com.cn;import java.net.InetSocketAddress;import java.util.concurrent.Executors;import com.cn.pool.NioSelectorRunnablePool;/** * 啟動函數 * */public class Start { public static void main(String[] args) { //初始化線程 NioSelectorRunnablePool nioSelectorRunnablePool = new NioSelectorRunnablePool(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); //獲取服務類 ServerBootstrap bootstrap = new ServerBootstrap(nioSelectorRunnablePool); //綁定端口 bootstrap.bind(new InetSocketAddress(10101)); System.out.PRintln("start"); }}package com.cn.pool;import java.util.concurrent.Executor;import java.util.concurrent.atomic.AtomicInteger;import com.cn.NioServerBoss;import com.cn.NioServerWorker;/** * selector線程管理者 * */public class NioSelectorRunnablePool { /** * boss線程數組 */ private final AtomicInteger bossIndex = new AtomicInteger(); private Boss[] bosses; /** * worker線程數組 */ private final AtomicInteger workerIndex = new AtomicInteger(); private Worker[] workeres; public NioSelectorRunnablePool(Executor boss, Executor worker) { initBoss(boss, 1); initWorker(worker, Runtime.getRuntime().availableProcessors() * 2); } /** * 初始化boss線程 * @param boss * @param count */ private void initBoss(Executor boss, int count) { this.bosses = new NioServerBoss[count]; for (int i = 0; i < bosses.length; i++) { bosses[i] = new NioServerBoss(boss, "boss thread " + (i+1), this); } } /** * 初始化worker線程 * @param worker * @param count */ private void initWorker(Executor worker, int count) { this.workeres = new NioServerWorker[count]; for (int i = 0; i < workeres.length; i++) { workeres[i] = new NioServerWorker(worker, "worker thread " + (i+1), this); } } /** * 獲取一個worker * @return */ public Worker nextWorker() { return workeres[Math.abs(workerIndex.getAndIncrement() % workeres.length)]; } /** * 獲取一個boss * @return */ public Boss nextBoss() { return bosses[Math.abs(bossIndex.getAndIncrement() % bosses.length)]; }}package com.cn;import java.net.SocketAddress;import 
java.nio.channels.ServerSocketChannel;import com.cn.pool.Boss;import com.cn.pool.NioSelectorRunnablePool;/** * 服務類 * */public class ServerBootstrap {private NioSelectorRunnablePool selectorRunnablePool; public ServerBootstrap(NioSelectorRunnablePool selectorRunnablePool) { this.selectorRunnablePool = selectorRunnablePool; } /** * 綁定端口 * @param localAddress */ public void bind(final SocketAddress localAddress){ try { // 獲得一個ServerSocket通道 ServerSocketChannel serverChannel = ServerSocketChannel.open(); // 設置通道為非阻塞 serverChannel.configureBlocking(false); // 將該通道對應的ServerSocket綁定到port端口 serverChannel.socket().bind(localAddress); //獲取一個boss線程 Boss nextBoss = selectorRunnablePool.nextBoss(); //向boss注冊一個ServerSocket通道 nextBoss.registerAcceptChannelTask(serverChannel); } catch (Exception e) { e.printStackTrace(); } }}package com.cn.pool;import java.nio.channels.SocketChannel;/** * worker接口 * */public interface Worker { /** * 加入一個新的客戶端會話 * @param channel */ public void registerNewChannelTask(SocketChannel channel);}package com.cn.pool;import java.nio.channels.ServerSocketChannel;/** * boss接口 * */public interface Boss { /** * 加入一個新的ServerSocket * @param serverChannel */ public void registerAcceptChannelTask(ServerSocketChannel serverChannel);}package com.cn;import java.io.IOException;import java.nio.channels.Selector;import java.util.Queue;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.Executor;import java.util.concurrent.atomic.AtomicBoolean;import com.cn.pool.NioSelectorRunnablePool;/** * 抽象selector線程類 * * */public abstract class AbstractNioSelector implements Runnable { /** * 線程池 */ private final Executor executor; /** * 選擇器 */ protected Selector selector; /** * 選擇器wakenUp狀態標記 */ protected final AtomicBoolean wakenUp = new AtomicBoolean(); /** * 任務隊列 */ private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>(); /** * 線程名稱 */ private String threadName; /** * 線程管理對象 */ protected NioSelectorRunnablePool 
selectorRunnablePool; AbstractNioSelector(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) { this.executor = executor; this.threadName = threadName; this.selectorRunnablePool = selectorRunnablePool; openSelector(); } /** * 獲取selector并啟動線程 */ private void openSelector() { try { this.selector = Selector.open(); } catch (IOException e) { throw new RuntimeException("Failed to create a selector."); } executor.execute(this); } @Override public void run() { Thread.currentThread().setName(this.threadName); while (true) { try { wakenUp.set(false); select(selector); processTaskQueue(); process(selector); } catch (Exception e) { // ignore } } } /** * 注冊一個任務并激活selector * * @param task */ protected final void registerTask(Runnable task) { taskQueue.add(task); Selector selector = this.selector; if (selector != null) { if (wakenUp.compareAndSet(false, true)) { selector.wakeup(); } } else { taskQueue.remove(task); } } /** * 執行隊列里的任務 */ private void processTaskQueue() { for (;;) { final Runnable task = taskQueue.poll(); if (task == null) { break; } task.run(); } } /** * 獲取線程管理對象 * @return */ public NioSelectorRunnablePool getSelectorRunnablePool() { return selectorRunnablePool; } /** * select抽象方法 * * @param selector * @return * @throws IOException */ protected abstract int select(Selector selector) throws IOException; /** * selector的業務處理 * * @param selector * @throws IOException */ protected abstract void process(Selector selector) throws IOException;}package com.cn;import java.io.IOException;import java.nio.channels.ClosedChannelException;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;import java.util.concurrent.Executor;import com.cn.pool.Boss;import com.cn.pool.NioSelectorRunnablePool;import com.cn.pool.Worker;/** * boss實現類 * */public class NioServerBoss extends AbstractNioSelector 
implements Boss{ public NioServerBoss(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) { super(executor, threadName, selectorRunnablePool); } @Override protected void process(Selector selector) throws IOException { Set<SelectionKey> selectedKeys = selector.selectedKeys(); if (selectedKeys.isEmpty()) { return; } for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) { SelectionKey key = i.next(); i.remove(); ServerSocketChannel server = (ServerSocketChannel) key.channel(); // 新客戶端 SocketChannel channel = server.accept(); // 設置為非阻塞 channel.configureBlocking(false); // 獲取一個worker Worker nextworker = getSelectorRunnablePool().nextWorker(); // 注冊新客戶端接入任務 nextworker.registerNewChannelTask(channel); System.out.println("新客戶端鏈接"); } } public void registerAcceptChannelTask(final ServerSocketChannel serverChannel){ final Selector selector = this.selector; registerTask(new Runnable() { @Override public void run() { try { //注冊serverChannel到selector serverChannel.register(selector, SelectionKey.OP_ACCEPT); } catch (ClosedChannelException e) { e.printStackTrace(); } } }); } @Override protected int select(Selector selector) throws IOException { return selector.select(); }}package com.cn;import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.ClosedChannelException;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;import java.util.concurrent.Executor;import com.cn.pool.NioSelectorRunnablePool;import com.cn.pool.Worker;/** * worker實現類 * */public class NioServerWorker extends AbstractNioSelector implements Worker{ public NioServerWorker(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) { super(executor, threadName, selectorRunnablePool); } @Override protected void process(Selector selector) throws IOException { Set<SelectionKey> selectedKeys = selector.selectedKeys(); if 
(selectedKeys.isEmpty()) { return; } Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator(); while (ite.hasNext()) { SelectionKey key = (SelectionKey) ite.next(); // 移除,防止重復處理 ite.remove(); // 得到事件發生的Socket通道 SocketChannel channel = (SocketChannel) key.channel(); // 數據總長度 int ret = 0; boolean failure = true; ByteBuffer buffer = ByteBuffer.allocate(1024); //讀取數據 try { ret = channel.read(buffer); failure = false; } catch (Exception e) { // ignore } //判斷是否連接已斷開 if (ret <= 0 || failure) { key.cancel(); System.out.println("客戶端斷開連接"); }else{ System.out.println("收到數據:" + new String(buffer.array())); //回寫數據 ByteBuffer outBuffer = ByteBuffer.wrap("收到/n".getBytes()); channel.write(outBuffer);// 將消息回送給客戶端 } } } /** * 加入一個新的socket客戶端 */ public void registerNewChannelTask(final SocketChannel channel){ final Selector selector = this.selector; registerTask(new Runnable() { @Override public void run() { try { //將客戶端注冊到selector中 channel.register(selector, SelectionKey.OP_READ); } catch (ClosedChannelException e) { e.printStackTrace(); } } }); } @Override protected int select(Selector selector) throws IOException { return selector.select(500); } }
新聞熱點
疑難解答