最近公司搞了一個寫程序的比賽,要求從2G的文件里統計出出現頻率最高的10個單詞。
最開始的想法是使用字典樹,后來發現字典樹更適合用在找前綴上,在查找沒有hash表效率高。
之后使用Hash表+DataFlow完成了功能,2G的文件處理在20秒以內(其實我有信心優化到10秒以內,但是太折騰了)。
這是我的設計圖:
為什么要形成那么多結果?因為我不想寫鎖,寫鎖會降低很多效率,而且也失去了線程的意義,每個線程做自己的工作,
最后在把每個線程處理的結果匯總起來,這樣也符合fork join 的設計。
而且我也試過,如果寫鎖的話,效率會降低10秒以上,我也嘗試過微軟提供的ConcurrentDictionary 原子哈希表,但是效果都不是
很理想,而且,在并行的年代,在寫鎖這個東西,感覺很惡心,好像在代碼里加了一坨屎一樣,我以前就很討厭鎖,也出現過代碼死鎖的情況。
最后我選擇了使用微軟的TPL 庫來解決并行的問題。
使用DataFlow解決了我處理時多線程管理的問題,還有線程等待消息隊列的問題,
使用BufferBlock 進行主控與工作線程之間消息傳遞,這是我的設計圖:
讀取文件之后使用BufferBlock.Post發送給工作線程,工作線程使用TryReceive接收消息,并且處理。
在MSDNhttps://msdn.microsoft.com/zh-cn/library/hh228601(v=vs.110).aspx 里有詳細的介紹。
這是典型的單生產者,多使用者的列子。
代碼方面首先是讀取文件:
public class FileBufferBlock { PRivate string _fileName; BufferBlock<WordStream> _buffer = null; public FileBufferBlock(BufferBlock<WordStream> buffer,string fileName) { this._fileName = fileName; this._buffer = buffer; } /// <summary> /// 按32M讀取文件,循環發送給WordBufferBlock /// </summary> public void ReadFile() { using (FileStream fs = new FileStream(_fileName, FileMode.Open, Fileaccess.Read)) { using (StreamReader sr = new StreamReader(fs)) { while (!sr.EndOfStream) { char[] charBuffer = new char[32 * 1024 * 1024]; sr.ReadBlock(charBuffer, 0, charBuffer.Length); _buffer.Post(new WordStream(charBuffer)); } } } _buffer.Complete(); }
在這里使用BufferBlock.Post 發送消息給工作線程,如果不用它,你得去找個能阻塞的消息隊列。
下面是我的接收方的代碼,使用BufferBlock.TryReceive 接收消息,然后處理,在這里可以開多個個線程去處理。
而且線程是它幫你管理的:
// --------------------------------------------------------------------------------------------------------------------// <copyright file="WordProcessBufferBlock.cs" company="yada">// Copyright (c) yada Corporation. All rights reserved.// </copyright>// change by qugang 2015.4.18// 描述:用于截取單詞的工作線程// --------------------------------------------------------------------------------------------------------------------using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Threading.Tasks;using System.Threading.Tasks.Dataflow;namespace WordStatistics{ public class WordProcessBufferBlock { private int _taskCount = 1; BufferBlock<WordStream> _buffer = null; private List<Task<Dictionary<string, int>>> _list = new List<Task<Dictionary<string, int>>>(); /// <summary> /// 單詞處理類 /// </summary> /// <param name="taskCount">工作線程數</param> /// <param name="buffer">DataFlow的BufferBlock</param> public WordProcessBufferBlock(int taskCount, BufferBlock<WordStream> buffer) { _taskCount = taskCount; this._buffer = buffer; } public void StartWord() { for (int i = 0; i < _taskCount; i++) { _list.Add(Process()); } } /// <summary> /// 等待所有工作完成 /// </summary> /// <param name="f">完成后的工作函數</param> public void WaitAll(Action<Dictionary<string,int>> f) { Task.WaitAll(_list.ToArray()); foreach (var row in _list) { f(row.Result); } } /// <summary> /// 使用BufferBlock.TryReceive循環從消息里取從FileBufferBlock發送的buffer /// </summary> /// <returns>工作結果</returns> private async Task<Dictionary<string, int>> Process() { Dictionary<string, int> dic = new Dictionary<string, int>(); while (await _buffer.OutputAvailableAsync()) { WordStream ws; while (_buffer.TryReceive(out ws)) { foreach (string value in ws) { if (dic.ContainsKey(value)) { dic[value]++; } else { dic.Add(value, 1); } } } } return dic; } }}
WordStrem是我自己寫的一個單詞枚舉流,繼承了IEnumerable接口,將找單詞的算法寫到枚舉器里面,實現流化。
// --------------------------------------------------------------------------------------------------------------------// <copyright file="WordStatistics.cs" company="yada">// Copyright (c) yada Corporation. All rights reserved.// </copyright>// change by qugang 2015.4.18// 單詞枚舉器:算法從開始找字母,如果不是字母,則返回從pos 到end 的組成單詞// --------------------------------------------------------------------------------------------------------------------using System;using System.Collections;using System.Collections.Generic;using System.Linq;using System.Text;using System.Threading.Tasks;namespace WordStatistics{ /// <summary> /// 單詞枚舉器 /// </summary> public class WordStream : IEnumerable { private char[] buffer; public WordStream(char[] buffer) { this.buffer = buffer; } IEnumerator IEnumerable.GetEnumerator() { return (IEnumerator)GetEnumerator(); } public WordStreamEnum GetEnumerator() { return new WordStreamEnum(this.buffer); } } public class WordStreamEnum : IEnumerator { private char[] buffer; int pos = 0; int endCount = 0; int index = -1; public WordStreamEnum(char[] buffer) { this.buffer = buffer; } public bool MoveNext() { while (index < buffer.Length - 1) { index++; char buff = buffer[index]; if ((buff >= 'a' && buff <= 'z') || (buff >= 'A' && buff <= 'Z')) { if (endCount == 0) { pos = index; endCount++; } else { endCount++; } } else { if (endCount != 0) return true; } if (buff == '/0') { return false; } } return false; } public object Current { get { int tempInt = endCount; endCount = 0; return new string(buffer, pos, tempInt); } } public void Reset() { index = -1; } }}
到這里就完成了,然后再Main函數里添加調用
static void Main(string[] args) { DateTime dt = DateTime.Now; var buffer = new BufferBlock<WordStream>(); //創建工作BufferBlock WordProcessBufferBlock wb = new WordProcessBufferBlock(8, buffer); wb.StartWord(); //創建讀取文件,發送的BufferBlock FileBufferBlock fb = new FileBufferBlock(buffer, @"D:/content.txt"); fb.ReadFile(); Dictionary<string,int> dic = new Dictionary<string,int>(); //等待工作完成匯總結果 wb.WaitAll(p => { foreach (var row in p) { if (!dic.ContainsKey(row.Key)) dic.Add(row.Key, row.Value); else { dic[row.K
新聞熱點
疑難解答