
Storm Series (2): Creating Your First Storm Topology in C# (WordCount)

2019-11-17 02:19:23


WordCount is to big data what hello world is to learning a programming language. Thanks to Storm being open source and to Storm.Net.Adapter, we can now build natively supported Storm topologies in C#, just as in Java or Python. Below I use WordCount to show how to develop a Storm topology in C#.

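The previous post covered how to set up a Storm development environment. The demo described here is included in Storm.Net.Adapter. If you find it helpful, please Star and Fork it so that more people see the project and help improve it.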

First, create a console application named StormSimple (a console application is convenient to invoke), then use NuGet to add Storm.Net.Adapter (the library's namespace is Storm).

[Image: wordcount project]

STEP 1: Create a spout named Generator by implementing ISpout and its four methods:

void Open(Config stormConf, TopologyContext context);
void NextTuple();
void Ack(long seqId);
void Fail(long seqId);

Before implementing these four methods, we need a field and a constructor to initialize the class:

private Context ctx;

public Generator(Context ctx)
{
    Context.Logger.Info("Generator constructor called");
    this.ctx = ctx;

    // Declare Output schema
    Dictionary<string, List<Type>> outputSchema = new Dictionary<string, List<Type>>();
    outputSchema.Add("default", new List<Type>() { typeof(string) });
    this.ctx.DeclareComponentSchema(new ComponentStreamSchema(null, outputSchema));
}

  

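The private field ctx holds the Context object passed in at construction. Context exposes a static Logger for sending log messages, so we can use it without creating an instance. It supports five levels: Trace, Debug, Info, Warn, and Error. The constructor must also declare the number and types of the input and output fields; in this example the input is null and the output is a single string. We also add a method that directly returns a new instance of the class: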

/// <summary>
/// Implements the delegate "newPlugin", which is used to create an instance of this spout/bolt
/// </summary>
/// <param name="ctx">Context instance</param>
/// <returns></returns>
public static Generator Get(Context ctx)
{
    return new Generator(ctx);
}

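Of these methods, Open runs before the task is first invoked and is mainly used for preprocessing and for receiving configuration; in most cases it does not need to do anything. NextTuple generates tuples and is called continuously, so when there is nothing to send downstream you can call Thread.Sleep(50); to reduce CPU usage (the exact sleep time depends on the Topology settings; anything below the message timeout is fine).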

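In this example NextTuple picks a random sentence from an array of English sentences and sends it to the next stage. To make sure every tuple is eventually processed successfully, we cache each message we send and limit the number of in-flight tuples to 20.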

// Requires: using System.Threading; (for Thread.Sleep)
private const int MAX_PENDING_TUPLE_NUM = 20;
private long lastSeqId = 0;
private Dictionary<long, string> cachedTuples = new Dictionary<long, string>();
private Random rand = new Random();

string[] sentences = new string[] {
    "the cow jumped over the moon",
    "an apple a day keeps the doctor away",
    "four score and seven years ago",
    "snow white and the seven dwarfs",
    "i am at two with nature"};

/// <summary>
/// This method is used to emit one or more tuples. If there is nothing to emit, this method should return without emitting anything.
/// It should be noted that NextTuple(), Ack(), and Fail() are all called in a tight loop in a single thread in C# process.
/// When there are no tuples to emit, it is courteous to have NextTuple sleep for a short amount of time (such as 10 milliseconds), so as not to waste too much CPU.
/// </summary>
public void NextTuple()
{
    Context.Logger.Info("NextTuple enter");
    string sentence;

    // Strict "<" keeps at most MAX_PENDING_TUPLE_NUM tuples in flight.
    if (cachedTuples.Count < MAX_PENDING_TUPLE_NUM)
    {
        lastSeqId++;
        // The upper bound of Random.Next is exclusive, so pass Length to reach every sentence.
        sentence = sentences[rand.Next(0, sentences.Length)];
        Context.Logger.Info("Generator Emit: {0}, seqId: {1}", sentence, lastSeqId);
        this.ctx.Emit("default", new List<object>() { sentence }, lastSeqId);
        cachedTuples[lastSeqId] = sentence;
    }
    else
    {
        // Nothing can be emitted right now, so sleep briefly to release the CPU
        Thread.Sleep(50);
    }

    Context.Logger.Info("cached tuple num: {0}", cachedTuples.Count);
    Context.Logger.Info("Generator NextTuple exit");
}

this.ctx.Emit is what sends the tuple to the next bolt.

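Ack() is called when a tuple has been processed successfully by the whole topology, and Fail() is called when its processing fails. In this example Ack removes the tuple from the cache, while Fail looks up the cached data and re-emits the tuple.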

/// <summary>
/// Ack() will be called only when ack mechanism is enabled in spec file.
/// If ack is not supported in non-transactional topology, the Ack() can be left as empty function.
/// </summary>
/// <param name="seqId">Sequence Id of the tuple which is acked.</param>
public void Ack(long seqId)
{
    Context.Logger.Info("Ack, seqId: {0}", seqId);
    bool result = cachedTuples.Remove(seqId);
    if (!result)
    {
        Context.Logger.Warn("Ack(), remove cached tuple for seqId {0} fail!", seqId);
    }
}

/// <summary>
/// Fail() will be called only when ack mechanism is enabled in spec file.
/// If ack is not supported in non-transactional topology, the Fail() can be left as empty function.
/// </summary>
/// <param name="seqId">Sequence Id of the tuple which is failed.</param>
public void Fail(long seqId)
{
    Context.Logger.Info("Fail, seqId: {0}", seqId);
    if (cachedTuples.ContainsKey(seqId))
    {
        string sentence = cachedTuples[seqId];
        Context.Logger.Info("Re-Emit: {0}, seqId: {1}", sentence, seqId);
        this.ctx.Emit("default", new List<object>() { sentence }, seqId);
    }
    else
    {
        Context.Logger.Warn("Fail(), can't find cached tuple for seqId {0}!", seqId);
    }
}
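The caching scheme behind NextTuple, Ack, and Fail can be tried outside Storm. The following is a minimal sketch, not code from Storm.Net.Adapter: PendingTupleCache and its members are hypothetical names introduced here, and the class only models the bookkeeping (bounded in-flight set, removal on ack, lookup for replay on fail).

```csharp
using System;
using System.Collections.Generic;

// Stand-alone sketch of the spout's replay cache; no Storm types are used.
// PendingTupleCache and its members are hypothetical names for illustration.
public class PendingTupleCache
{
    private readonly int maxPending;
    private readonly Dictionary<long, string> pending = new Dictionary<long, string>();
    private long lastSeqId = 0;

    public PendingTupleCache(int maxPending)
    {
        this.maxPending = maxPending;
    }

    public int Count { get { return pending.Count; } }

    // Returns the new sequence id, or null when the cache is full
    // (a spout would sleep briefly instead of emitting).
    public long? TryAdd(string payload)
    {
        if (pending.Count >= maxPending)
        {
            return null;
        }
        lastSeqId++;
        pending[lastSeqId] = payload;
        return lastSeqId;
    }

    // Ack: the tuple was fully processed, so forget it.
    // Returns false if the id was not in the cache.
    public bool Ack(long seqId)
    {
        return pending.Remove(seqId);
    }

    // Fail: hand back the cached payload so the caller can re-emit it
    // under the same sequence id. Returns false for unknown ids.
    public bool TryGetForReplay(long seqId, out string payload)
    {
        return pending.TryGetValue(seqId, out payload);
    }
}
```

Keeping the payload in the cache until it is acked is what makes replay possible: a failed sequence id can always be mapped back to the sentence that was originally emitted.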

That completes the spout. Next, we turn to the bolts.

STEP 2: Create the bolts Splitter and Counter by implementing IBasicBolt.

Splitter splits an English sentence into individual words on spaces, and Counter counts how many times each word occurs. We analyze only Splitter in detail; for Counter, only the source code is listed.

As with Generator, we first write a constructor and a factory method so that callers can pass in the context and create an instance easily:

private Context ctx;
private int msgTimeoutSecs;

public Splitter(Context ctx)
{
    Context.Logger.Info("Splitter constructor called");
    this.ctx = ctx;

    // Declare Input and Output schemas
    Dictionary<string, List<Type>> inputSchema = new Dictionary<string, List<Type>>();
    inputSchema.Add("default", new List<Type>() { typeof(string) });
    Dictionary<string, List<Type>> outputSchema = new Dictionary<string, List<Type>>();
    outputSchema.Add("default", new List<Type>() { typeof(string), typeof(char) });
    this.ctx.DeclareComponentSchema(new ComponentStreamSchema(inputSchema, outputSchema));

    // Demo how to get stormConf info
    if (Context.Config.StormConf.ContainsKey("topology.message.timeout.secs"))
    {
        msgTimeoutSecs = Convert.ToInt32(Context.Config.StormConf["topology.message.timeout.secs"]);
    }
    Context.Logger.Info("msgTimeoutSecs: {0}", msgTimeoutSecs);
}

/// <summary>
/// Implements the delegate "newPlugin", which is used to create an instance of this spout/bolt
/// </summary>
/// <param name="ctx">Context instance</param>
/// <returns></returns>
public static Splitter Get(Context ctx)
{
    return new Splitter(ctx);
}

In this constructor we added a field, msgTimeoutSecs, that is not otherwise used; it is there only to show how to read the Topology configuration.

Because the class implements IBasicBolt, we need to implement the following two methods:

void Prepare(Config stormConf, TopologyContext context);
void Execute(StormTuple tuple);

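These are the same as in IBolt. The only difference between IBasicBolt and IBolt is that with IBolt you must decide yourself when to send Ack or Fail to Storm, whereas with IBasicBolt you do not have to: if your Execute does not throw an exception, an Ack is sent to Storm at the end; otherwise a Fail is sent. Prepare is for setup before execution; in this example it also does nothing.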
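The ack-on-success, fail-on-exception rule can be sketched in a few lines. This is only an illustration of the behavior described above, not the adapter's actual implementation: BasicBoltRunner and its Run method are hypothetical, and plain delegates stand in for the bolt and for Storm's ack and fail calls.

```csharp
using System;

// Hypothetical illustration only: BasicBoltRunner is not part of Storm.Net.Adapter.
public static class BasicBoltRunner
{
    // Runs execute(tuple); calls ack on normal return and fail if it throws.
    // Returns true when the tuple was acked.
    public static bool Run<T>(T tuple, Action<T> execute, Action<T> ack, Action<T> fail)
    {
        try
        {
            execute(tuple);
        }
        catch (Exception)
        {
            // Any exception from the bolt's logic marks the tuple as failed.
            fail(tuple);
            return false;
        }

        // Ack is outside the try block so that an error while acking
        // is not mistaken for a processing failure.
        ack(tuple);
        return true;
    }
}
```

With an IBolt-style component, the equivalent of the ack and fail calls would have to be made explicitly inside Execute.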

/// <summary>
/// The Execute() function will be called, when a new tuple is available.
/// </summary>
/// <param name="tuple"></param>
public void Execute(StormTuple tuple)
{
    Context.Logger.Info("Execute enter");
    string sentence = tuple.GetString(0);

    foreach (string word in sentence.Split(' '))
    {
        Context.Logger.Info("Splitter Emit: {0}", word);
        this.ctx.Emit("default", new List<StormTuple> { tuple }, new List<object> { word, word[0] });
    }

    Context.Logger.Info("Splitter Execute exit");
}

public void Prepare(Config stormConf, TopologyContext context)
{
    return;
}
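The splitting step itself does not depend on Storm and can be checked in isolation. The sketch below uses a hypothetical SentenceSplitter helper (not part of the original demo) that produces the same (word, first character) pairs Splitter emits. As an assumption beyond the original, it drops empty entries so that repeated spaces cannot produce an empty word, for which word[0] would throw.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration; the demo does this inline in Execute().
public static class SentenceSplitter
{
    // Returns (word, first character) pairs for a space-separated sentence.
    public static List<KeyValuePair<string, char>> Split(string sentence)
    {
        var result = new List<KeyValuePair<string, char>>();

        // RemoveEmptyEntries skips the empty strings that consecutive
        // spaces would otherwise yield.
        string[] words = sentence.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);

        foreach (string word in words)
        {
            result.Add(new KeyValuePair<string, char>(word, word[0]));
        }
        return result;
    }
}
```

For the input "the cow jumped", this yields three pairs whose keys are the words and whose values are 't', 'c', and 'j'.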

Counter is similar to the code above:

using Storm;
using System;
using System.Collections.Generic;

namespace StormSample
{
    /// <summary>
    /// The bolt "counter" uses a dictionary to record the occurrence number of each word.
    /// </summary>
    public class Counter : IBasicBolt
    {
        private Context ctx;
        private Dictionary<string, int> counts = new Dictionary<string, int>();
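The counts dictionary declared in Counter holds one running total per word. A minimal sketch of that tallying step, independent of Storm, might look like the following. WordTally and Increment are hypothetical names, and this is an assumption about how such a dictionary is typically updated rather than the Counter source itself.

```csharp
using System.Collections.Generic;

// Hypothetical helper for illustration; not taken from the Counter bolt.
public static class WordTally
{
    // Adds one occurrence of word to counts and returns the new total.
    public static int Increment(Dictionary<string, int> counts, string word)
    {
        int current;
        // TryGetValue leaves current at 0 when the word has not been seen yet.
        counts.TryGetValue(word, out current);
        counts[word] = current + 1;
        return current + 1;
    }
}
```

A bolt built this way would call Increment once per incoming word and could emit the returned total downstream.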