麻豆小视频在线观看_中文黄色一级片_久久久成人精品_成片免费观看视频大全_午夜精品久久久久久久99热浪潮_成人一区二区三区四区

首頁 > 學院 > 開發設計 > 正文

ParallelProgramming-多消費者,多生產者同時運行并行

2019-11-14 13:46:49
字體:
來源:轉載
供稿:網友

在上一篇文章演示了并行的流水線操作(生產者和消費者并行同時執行),C#是通過BlockingCollection這個線程安全的對象作為Buffer,并且結合Task來實現的。但是上一篇文章有個缺陷,在整個流水線上,生產者和消費者是唯一的。本文將演示多個消費者多個生產者同時并行執行。

一、多消費者、多生產者示意圖

 與前一篇文章演示的流水線思想類似,不同之處就是本文的topic:消費者和生產者有多個,以buffer1為例,起生產者有兩個,消費者有兩個,現在有三個緯度的并行:

  1. Action1和Action2并行(消費者和生產者并行)
  2. 消費者并行(Action2.1和Action2.2并行)
  3. 生產者并行(Action1.1和Action1.2并行)

二、實現

2.1 代碼

 class PiplelineDemo    {        PRivate int seed;        public PiplelineDemo()        {            seed = 10;        }        public void Action11(BlockingCollection<string> output)        {            for (var i = 0; i < seed; i++)            {                output.Add(i.ToString());//initialize data to buffer1            }        }        public void Action12(BlockingCollection<string> output)        {            for (var i = 0; i < seed; i++)            {                output.Add(i.ToString());//initialize data to buffer1            }        }        public void Action21(BlockingCollection<string> input, BlockingCollection<string> output)        {            foreach (var item in input.GetConsumingEnumerable())            {                var itemToInt = int.Parse(item);                output.Add((itemToInt * itemToInt).ToString());// add new data to buffer2            }        }        public void Action22(BlockingCollection<string> input, BlockingCollection<string> output)        {            foreach (var item in input.GetConsumingEnumerable())            {                var itemToInt = int.Parse(item);                output.Add((itemToInt * itemToInt).ToString());// add new data to buffer2            }        }        public void Action31(BlockingCollection<string> input, BlockingCollection<string> output)        {            foreach (var item in input.GetConsumingEnumerable())            {                output.Add((item));// add new data to buffer3            }        }        public void Action32(BlockingCollection<string> input, BlockingCollection<string> output)        {            foreach (var item in input.GetConsumingEnumerable())            {                output.Add((item));// add new data to buffer3            }        }        public void Pipeline()        {            var buffer1 = new BlockingCollection<string>(seed * 2);            var buffer2 = new BlockingCollection<string>(seed * 2);            var buffer3 = new BlockingCollection<string>(seed * 2);            var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);            var stage11 = taskFactory.StartNew(() => Action11(buffer1));            var stage12 = taskFactory.StartNew(() => Action12(buffer1));            Task.Factory.ContinueWhenAll(new Task[] { stage11, stage12 }, (tasks) =>            {                buffer1.CompleteAdding();            });            var stage21 = taskFactory.StartNew(() => Action21(buffer1, buffer2));            var stage22 = taskFactory.StartNew(() => Action22(buffer1, buffer2));            Task.Factory.ContinueWhenAll(new Task[] { stage21, stage22 }, (tasks) =>            {                buffer2.CompleteAdding();            });            var stage31 = taskFactory.StartNew(() => Action31(buffer2, buffer3));            var stage32 = taskFactory.StartNew(() => Action32(buffer2, buffer3));            Task.Factory.ContinueWhenAll(new Task[] { stage31, stage32 }, (tasks) =>            {                buffer3.CompleteAdding();            });            Task.WaitAll(stage11, stage12, stage21, stage22, stage31, stage32);            foreach (var item in buffer3.GetConsumingEnumerable())//print data in buffer3            {                Console.WriteLine(item);            }        }    }

2.2 運行結果

2.3 代碼解釋

  1. Action11和Action12相對比較好理解。初始化數據到buffer1。
  2. Action2.1和Action2.2相對比較費解,他們同時接受buffer1作為輸入,為什么最終的結果Buffer2沒有產生重復? 最后由Action21,action22同時產生的buffer3為什么也沒有重復?這就是GetConsumingEnumerable這個方法的功勞。這個方法會將buffer的數據分成多份給多個消費者,如果一個value已經被一個消費者獲取,那么其他消費者將不會再拿到這個值。這就回答了為什么沒有重復這個問題。
  3. 上面方法同時使用了多任務延續(ContinueWhenAll)對buffer的調用CompleteAdding方法:該方法非常重要,如果沒有調用這個方法,程序會進入死鎖,因為消費者(consumer)會處于一直的等待狀態。

 


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 天天黄色片 | 黄色特级片黄色特级片 | 国产精品99久久久久久宅女 | 综合精品一区 | 污污短视频| 亚洲91精品 | 国产高潮国产高潮久久久91 | 毛片电影网址 | 99精品国产视频 | 蜜桃久久一区二区三区 | 黄色免费电影网址 | 亚洲欧美在线视频免费 | 午夜精品在线视频 | 媚药按摩痉挛w中文字幕 | 久久亚洲精品国产一区 | 国产女厕一区二区三区在线视 | av免费在线观看免费 | 久久久久久久久成人 | 鲁丝片一区二区三区免费入口 | 91精品国 | 国产电影av在线 | 羞羞答答xxdd在线播放 | 青青草华人在线 | av性色全交蜜桃成熟时 | 国产精品99久久久久久久 | 久久艹艹艹 | 国产成人精品日本亚洲语音 | 综合精品一区 | 男女生羞羞视频网站在线观看 | 欧美69free性videos | 久久久青 | av国语 | 在线a毛片免费视频观看 | 久久精品视频网站 | 操操操操操 | 成人男女激情免费视频 | 久久综合久久综合久久综合 | 国产精品视频一区二区噜噜 | 国产成人羞羞视频在线 | 国产精品久久久久影院老司 | 成人偷拍片视频在线观看 |