并行 LINQ (PLINQ) 是 LINQ to Objects 的并行實現(xiàn)。 PLINQ 實現(xiàn)完整的 LINQ 標準查詢運算符集作為 T:System.Linq 命名空間的擴展方法,并具有用于并行運算的其他運算符。 PLINQ 將 LINQ 語法的簡潔和可靠性與并行編程的強大功能結(jié)合在一起。 就像面向任務(wù)并行庫的代碼一樣,PLINQ 查詢會根據(jù)主計算機的能力按比例調(diào)整并發(fā)程度。
在許多情況下,PLINQ 可通過更有效地使用主計算機上的所有可用內(nèi)核來顯著提高 LINQ to Objects 查詢的速度。 這一性能提升將使桌面具備高性能計算能力。
為了實現(xiàn)加速,PLINQ 查詢必須具有足夠的適合并行工作來彌補開銷。 工作可表示為每個委托的計算開銷與源集合中元素數(shù)量的乘積。 假定某個操作可并行化,則它的計算開銷越高,加速的可能性就越大。 例如,如果某個函數(shù)執(zhí)行花費的時間為 1 毫秒,則針對 1000 個元素進行的順序查詢將花費 1 秒來執(zhí)行該操作,而在四核計算機上進行的并行查詢可能只花費 250 毫秒。 這樣就產(chǎn)生了 750 毫秒的加速。 如果該函數(shù)對于每個元素需要花費 1 秒來執(zhí)行,則加速將為 750 秒。 如果委托的開銷很大,則對于源集合中的很少幾個項,PLINQ 可能會提供明顯的加速。 相反,包含無關(guān)緊要委托的小型源集合通常不適合于 PLINQ。 ——MSDN
1.普通示例
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
namespace PLinq
{
class PRogram
{
static IList<Person> _persons = new List<Person>();
static void Main(string[] args)
{
try
{
CreateTextDB();
PLINQ_Where();
}
catch (Exception ex)
{
Console.WriteLine(string.Format("Exception Message:{0}", ex.Message.Trim()));
}
finally
{
Console.ReadLine();
}
}
/// <summary>
/// 篩選
/// </summary>
private static void PLINQ_Where()
{
Stopwatch _wacth = new Stopwatch();
_wacth.Start();
IList<Person> _personWhereNormal = _persons.Where(p => p.Age > 10 && p.Age < 50).ToList();
_wacth.Stop();
Console.WriteLine(string.Format("Normal LINQ Where Cost Time:{0}", _wacth.ElapsedMilliseconds));
_wacth.Restart();
//WithDegreeOfParallelism
IList<Person> _personWhereParallel = _persons.asparallel<Person>().Where(p => p.Age > 10 && p.Age < 50).ToList();
_wacth.Stop();
Console.WriteLine(string.Format("PLINQ Where Cost Time:{0}", _wacth.ElapsedMilliseconds));
}
/// <summary>
/// 創(chuàng)建測試數(shù)據(jù)
/// </summary>
private static void CreateTextDB()
{
Stopwatch _wacth = new Stopwatch();
_wacth.Start();
Parallel.For(0, 10000000, (i, loopstatus) =>
{
Person _person = new Person();
_person.FristName = "Yan";
_person.LastName = string.Format("Zhiwei {0}", i);
_person.RegisterTime = DateTime.Now;
_person.Age = i;
lock (((ICollection)_persons).SyncRoot)
{
_persons.Add(_person);
}
});
_wacth.Stop();
Console.WriteLine(string.Format("Create TextDB Cost Time:{0},Count:{1}", _wacth.ElapsedMilliseconds, _persons.Count));
}
}
class Person
{
public string FristName { get; set; }
public string LastName { get; set; }
public int Age { get; set; }
public DateTime RegisterTime { get; set; }
}
}
2.異常處理
using System;using System.Collections;using System.Collections.Generic;using System.Diagnostics;using System.Linq;using System.Threading.Tasks;namespace PLinq{ class Program { static IList<Person> _persons = new List<Person>(); static void Main(string[] args) { try { CreateTextDB(); PINQ_Exception(); } catch (Exception ex) { Console.WriteLine(string.Format("Exception Message:{0}", ex.Message.Trim())); } finally { Console.ReadLine(); } } private static void PINQ_Exception() { /* * 這樣處理,當出現(xiàn)異常的時候不會影響下次foralll遍歷 */ Func<int, bool> isTure = (age) => { try { if (age > 1989222) throw new Exception("PLINQ Text"); Console.WriteLine(age); return true; } catch (Exception ex) { Console.WriteLine(ex.Message); return false; } }; _persons.AsParallel<Person>().ForAll(p => isTure(p.Age)); } /// <summary> /// 創(chuàng)建測試數(shù)據(jù) /// </summary> private static void CreateTextDB() { Stopwatch _wacth = new Stopwatch(); _wacth.Start(); Parallel.For(0, 10000000, (i, loopstatus) => { Person _person = new Person(); _person.FristName = "Yan"; _person.LastName = string.Format("Zhiwei {0}", i); _person.RegisterTime = DateTime.Now; _person.Age = i; lock (((ICollection)_persons).SyncRoot) { _persons.Add(_person); } }); _wacth.Stop(); Console.WriteLine(string.Format("Create TextDB Cost Time:{0},Count:{1}", _wacth.ElapsedMilliseconds, _persons.Count)); } } class Person { public string FristName { get; set; } public string LastName { get; set; } public int Age { get; set; } public DateTime RegisterTime { get; set; } }}
代碼效果
3.取消 PLINQ 查詢
using System;using System.Collections;using System.Collections.Generic;using System.Diagnostics;using System.Linq;using System.Threading;using System.Threading.Tasks;namespace PLinq{ class Program { static IList<Person> _persons = new List<Person>(); static void Main(string[] args) { try { CreateTextDB(); PINQ_Cancellation(); } catch (Exception ex) { Console.WriteLine(string.Format("Exception Message:{0}", ex.Message.Trim())); } finally { Console.ReadLine(); } } private static void PINQ_Cancellation() { try { /* * 當您在用戶代碼中處理取消時,不必在查詢定義中使用 WithCancellation<TSource>。 但是,我們建議您這樣做,原因是 WithCancellation<TSource> 對查詢性能沒有影響, * 并且它使取消能夠由查詢運算符和用戶代碼進行處理。 為了確保系統(tǒng)響應(yīng)能力,我們建議您大約每毫秒檢查是否存在取消一次; * 不過,任何 10 毫秒以下的期間都被視為可接受。 此頻率對代碼的性能應(yīng)沒有負面影響。 ——MSDN */ CancellationTokenSource _cancel = new CancellationTokenSource(); Func<int, bool> isTure = (age) => { if (age > 1989222) _cancel.Cancel(); Console.WriteLine(age); return true; }; _persons.AsParallel<Person>().WithCancellation(_cancel.Token).ForAll(p => isTure(p.Age)); } catch (OperationCanceledException ex) { Console.WriteLine(string.Format("OperationCanceledException Message:{0}", ex.Message)); } } /// <summary> /// 創(chuàng)建測試數(shù)據(jù) /// </summary> private static void CreateTextDB() { Stopwatch _wacth = new Stopwatch(); _wacth.Start(); Parallel.For(0, 10000000, (i, loopstatus) => { Person _person = new Person(); _person.FristName = "Yan"; _person.LastName = string.Format("Zhiwei {0}", i); _person.RegisterTime = DateTime.Now; _person.Age = i; lock (((ICollection)_persons).SyncRoot) { _persons.Add(_person); } }); _wacth.Stop(); Console.WriteLine(string.Format("Create TextDB Cost Time:{0},Count:{1}", _wacth.ElapsedMilliseconds, _persons.Count)); } } class Person { public string FristName { get; set; } public string LastName { get; set; } public int Age { get; set; } public DateTime RegisterTime { get; set; } }}
代碼效果
|
新聞熱點
疑難解答