麻豆小视频在线观看_中文黄色一级片_久久久成人精品_成片免费观看视频大全_午夜精品久久久久久久99热浪潮_成人一区二区三区四区

首頁 > 學院 > 開發設計 > 正文

基于Netty的RPC架構筆記3之線程模型源碼分析(1)

2019-11-14 09:02:40
字體:
來源:轉載
供稿:網友

      隨著用戶量上升,項目的架構也在不斷的升級,由最開始的MVC的垂直架構(傳統項目)到RPC架構(webservice,rest,netty,mina),再到SOA模型(dubbo),再到最近的微服務,又比如Tomcat6之前的IO模型都是BIO 也就是阻塞IO,到后來變成多路復用,也是阻塞IO。到非阻塞NIO,再到異步非阻塞AIO,

     言歸正傳,接著談netty,傳統IO是一個線程服務一個客戶,后來通過netty,可以一個線程服務多個客戶,下面的那個圖展示的是netty的NIO通過引入多線程來提高性能,既一個線程負責一片用戶

直接上代碼

package com.cn;import java.net.InetSocketAddress;import java.util.concurrent.Executors;import com.cn.pool.NioSelectorRunnablePool;/** * 啟動函數 * */public class Start {	public static void main(String[] args) {				//初始化線程		NioSelectorRunnablePool nioSelectorRunnablePool = new NioSelectorRunnablePool(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());				//獲取服務類		ServerBootstrap bootstrap = new ServerBootstrap(nioSelectorRunnablePool);				//綁定端口		bootstrap.bind(new InetSocketAddress(10101));				System.out.PRintln("start");	}}
package com.cn.pool;import java.util.concurrent.Executor;import java.util.concurrent.atomic.AtomicInteger;import com.cn.NioServerBoss;import com.cn.NioServerWorker;/** * selector線程管理者 * */public class NioSelectorRunnablePool {	/**	 * boss線程數組	 */	private final AtomicInteger bossIndex = new AtomicInteger();	private Boss[] bosses;	/**	 * worker線程數組	 */	private final AtomicInteger workerIndex = new AtomicInteger();	private Worker[] workeres;		public NioSelectorRunnablePool(Executor boss, Executor worker) {		initBoss(boss, 1);		initWorker(worker, Runtime.getRuntime().availableProcessors() * 2);	}	/**	 * 初始化boss線程	 * @param boss	 * @param count	 */	private void initBoss(Executor boss, int count) {		this.bosses = new NioServerBoss[count];		for (int i = 0; i < bosses.length; i++) {			bosses[i] = new NioServerBoss(boss, "boss thread " + (i+1), this);		}	}	/**	 * 初始化worker線程	 * @param worker	 * @param count	 */	private void initWorker(Executor worker, int count) {		this.workeres = new NioServerWorker[count];		for (int i = 0; i < workeres.length; i++) {			workeres[i] = new NioServerWorker(worker, "worker thread " + (i+1), this);		}	}	/**	 * 獲取一個worker	 * @return	 */	public Worker nextWorker() {		 return workeres[Math.abs(workerIndex.getAndIncrement() % workeres.length)];	}	/**	 * 獲取一個boss	 * @return	 */	public Boss nextBoss() {		 return bosses[Math.abs(bossIndex.getAndIncrement() % bosses.length)];	}}
package com.cn;import java.net.SocketAddress;import java.nio.channels.ServerSocketChannel;import com.cn.pool.Boss;import com.cn.pool.NioSelectorRunnablePool;/** * 服務類 * */public class ServerBootstrap {private NioSelectorRunnablePool selectorRunnablePool;		public ServerBootstrap(NioSelectorRunnablePool selectorRunnablePool) {		this.selectorRunnablePool = selectorRunnablePool;	}		/**	 * 綁定端口	 * @param localAddress	 */	public void bind(final SocketAddress localAddress){		try {			// 獲得一個ServerSocket通道			ServerSocketChannel serverChannel = ServerSocketChannel.open();			// 設置通道為非阻塞			serverChannel.configureBlocking(false);			// 將該通道對應的ServerSocket綁定到port端口			serverChannel.socket().bind(localAddress);						//獲取一個boss線程			Boss nextBoss = selectorRunnablePool.nextBoss();			//向boss注冊一個ServerSocket通道			nextBoss.registerAcceptChannelTask(serverChannel);		} catch (Exception e) {			e.printStackTrace();		}	}}
package com.cn.pool;import java.nio.channels.SocketChannel;/** * worker接口 * */public interface Worker {		/**	 * 加入一個新的客戶端會話	 * @param channel	 */	public void registerNewChannelTask(SocketChannel channel);}
package com.cn.pool;import java.nio.channels.ServerSocketChannel;/** * boss接口 * */public interface Boss {		/**	 * 加入一個新的ServerSocket	 * @param serverChannel	 */	public void registerAcceptChannelTask(ServerSocketChannel serverChannel);}
package com.cn;import java.io.IOException;import java.nio.channels.Selector;import java.util.Queue;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.Executor;import java.util.concurrent.atomic.AtomicBoolean;import com.cn.pool.NioSelectorRunnablePool;/** * 抽象selector線程類 *  *  */public abstract class AbstractNioSelector implements Runnable {	/**	 * 線程池	 */	private final Executor executor;	/**	 * 選擇器	 */	protected Selector selector;	/**	 * 選擇器wakenUp狀態標記	 */	protected final AtomicBoolean wakenUp = new AtomicBoolean();	/**	 * 任務隊列	 */	private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();	/**	 * 線程名稱	 */	private String threadName;		/**	 * 線程管理對象	 */	protected NioSelectorRunnablePool selectorRunnablePool;	AbstractNioSelector(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {		this.executor = executor;		this.threadName = threadName;		this.selectorRunnablePool = selectorRunnablePool;		openSelector();	}	/**	 * 獲取selector并啟動線程	 */	private void openSelector() {		try {			this.selector = Selector.open();		} catch (IOException e) {			throw new RuntimeException("Failed to create a selector.");		}		executor.execute(this);	}	@Override	public void run() {				Thread.currentThread().setName(this.threadName);		while (true) {			try {				wakenUp.set(false);				select(selector);				processTaskQueue();				process(selector);			} catch (Exception e) {				// ignore			}		}	}	/**	 * 注冊一個任務并激活selector	 * 	 * @param task	 */	protected final void registerTask(Runnable task) {		taskQueue.add(task);		Selector selector = this.selector;		if (selector != null) {			if (wakenUp.compareAndSet(false, true)) {				selector.wakeup();			}		} else {			taskQueue.remove(task);		}	}	/**	 * 執行隊列里的任務	 */	private void processTaskQueue() {		for (;;) {			final Runnable task = taskQueue.poll();			if (task == null) {				break;			}			task.run();		}	}		/**	 * 獲取線程管理對象	 * @return	 */	public NioSelectorRunnablePool getSelectorRunnablePool() {		return selectorRunnablePool;	}	/**	 * select抽象方法	 * 	 * @param selector	 * @return	 * @throws IOException	 */	protected abstract int select(Selector selector) throws IOException;	/**	 * selector的業務處理	 * 	 * @param selector	 * @throws IOException	 */	protected abstract void process(Selector selector) throws IOException;}
package com.cn;import java.io.IOException;import java.nio.channels.ClosedChannelException;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;import java.util.concurrent.Executor;import com.cn.pool.Boss;import com.cn.pool.NioSelectorRunnablePool;import com.cn.pool.Worker;/** * boss實現類 * */public class NioServerBoss extends AbstractNioSelector implements Boss{	public NioServerBoss(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {		super(executor, threadName, selectorRunnablePool);	}	@Override	protected void process(Selector selector) throws IOException {		Set<SelectionKey> selectedKeys = selector.selectedKeys();        if (selectedKeys.isEmpty()) {            return;        }                for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {            SelectionKey key = i.next();            i.remove();            ServerSocketChannel server = (ServerSocketChannel) key.channel();    		// 新客戶端    		SocketChannel channel = server.accept();    		// 設置為非阻塞    		channel.configureBlocking(false);    		// 獲取一個worker    		Worker nextworker = getSelectorRunnablePool().nextWorker();    		// 注冊新客戶端接入任務    		nextworker.registerNewChannelTask(channel);    		    		System.out.println("新客戶端鏈接");        }	}			public void registerAcceptChannelTask(final ServerSocketChannel serverChannel){		 final Selector selector = this.selector;		 registerTask(new Runnable() {			@Override			public void run() {				try {					//注冊serverChannel到selector					serverChannel.register(selector, SelectionKey.OP_ACCEPT);				} catch (ClosedChannelException e) {					e.printStackTrace();				}			}		});	}		@Override	protected int select(Selector selector) throws IOException {		return selector.select();	}}
package com.cn;import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.ClosedChannelException;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;import java.util.concurrent.Executor;import com.cn.pool.NioSelectorRunnablePool;import com.cn.pool.Worker;/** * worker實現類 * */public class NioServerWorker extends AbstractNioSelector implements Worker{	public NioServerWorker(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {		super(executor, threadName, selectorRunnablePool);	}	@Override	protected void process(Selector selector) throws IOException {		Set<SelectionKey> selectedKeys = selector.selectedKeys();        if (selectedKeys.isEmpty()) {            return;        }        Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();		while (ite.hasNext()) {			SelectionKey key = (SelectionKey) ite.next();			// 移除,防止重復處理			ite.remove();						// 得到事件發生的Socket通道			SocketChannel channel = (SocketChannel) key.channel();						// 數據總長度			int ret = 0;			boolean failure = true;			ByteBuffer buffer = ByteBuffer.allocate(1024);			//讀取數據			try {				ret = channel.read(buffer);				failure = false;			} catch (Exception e) {				// ignore			}			//判斷是否連接已斷開			if (ret <= 0 || failure) {				key.cancel();				System.out.println("客戶端斷開連接");	        }else{	        	 System.out.println("收到數據:" + new String(buffer.array()));	        	 	     		//回寫數據	     		ByteBuffer outBuffer = ByteBuffer.wrap("收到/n".getBytes());	     		channel.write(outBuffer);// 將消息回送給客戶端	        }		}	}	/**	 * 加入一個新的socket客戶端	 */	public void registerNewChannelTask(final SocketChannel channel){		 final Selector selector = this.selector;		 registerTask(new Runnable() {			@Override			public void run() {				try {					//將客戶端注冊到selector中					channel.register(selector, SelectionKey.OP_READ);				} catch (ClosedChannelException e) {					e.printStackTrace();				}			}		});	}	@Override	protected int select(Selector selector) throws IOException {		return selector.select(500);	}	}


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 成人国产精品久久久 | 久久午夜免费视频 | 欧美一区黄色 | 成年人高清视频在线观看 | 日本一区二区免费在线观看 | 国产精品自拍av | 久久17| 成年人视频免费 | 日韩精品a在线观看 | 日本人乱人乱亲乱色视频观看 | 中文字幕在线观看视频一区 | 在线高清中文字幕 | 天天都色视频 | 亚洲福利视 | 成人免费网站在线观看视频 | 在火车上摸两乳爽的大叫 | 在线播放黄色网址 | 丰满年轻岳中文字幕一区二区 | 久久综合久久综合久久综合 | 日韩一级免费毛片 | 黄色av网站免费 | 少妇的肉体的满足毛片 | 免费视频xxxx | 日本在线视频免费观看 | 成人片免费视频 | 免费国产羞羞网站视频 | 九色一区二区 | www国产成人免费观看视频 | 欧洲精品久久久久69精品 | av手机在线免费播放 | 日韩黄色一级视频 | 日韩精品| 欧美成人精品一区二区男人小说 | 久久恋 | 久久久久电影网站 | 成人羞羞视频在线观看免费 | 极品国产91在线网站 | 亚洲精品一区二区三区在线看 | 日韩色电影 | 国产精品久久久久久婷婷天堂 | 日本免费不卡一区二区 |