麻豆小视频在线观看_中文黄色一级片_久久久成人精品_成片免费观看视频大全_午夜精品久久久久久久99热浪潮_成人一区二区三区四区

首頁 > 學(xué)院 > 開發(fā)設(shè)計(jì) > 正文

Hadoop mapreduce自定義分組RawComparator

2019-11-14 22:41:39
字體:
供稿:網(wǎng)友
Hadoop maPReduce自定義分組RawComparator

本文發(fā)表于本人博客。

今天接著上次【Hadoop mapreduce自定義排序WritableComparable】文章寫,按照順序那么這次應(yīng)該是講解自定義分組如何實(shí)現(xiàn),關(guān)于操作順序在這里不多說了,需要了解的可以看看我在博客園的評(píng)論,現(xiàn)在開始。

首先我們查看下Job這個(gè)類,發(fā)現(xiàn)有setGroupingComparatorClass()這個(gè)方法,具體源碼如下:

  /**   * Define the comparator that controls which keys are grouped together   * for a single call to    * {@link Reducer#reduce(Object, Iterable,    *                       org.apache.hadoop.mapreduce.Reducer.Context)}   * @param cls the raw comparator to use   * @throws IllegalStateException if the job is submitted   */  public void setGroupingComparatorClass(Class<? extends RawComparator> cls                                         ) throws IllegalStateException {    ensureState(JobState.DEFINE);    conf.setOutputValueGroupingComparator(cls);  }

從方法的源碼可以看出這個(gè)方法是定義自定義鍵分組功能。設(shè)置這個(gè)自定義分組類必須滿足extends RawComparator,那我們可以看下這個(gè)類的源碼:

/** * <p> * A {@link Comparator} that Operates directly on byte representations of * objects. * </p> * @param <T> * @see DeserializerComparator */public interface RawComparator<T> extends Comparator<T> {  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);}

然而這個(gè)RawComparator是泛型繼承Comparator接口的,簡(jiǎn)單看了下那我們來自定義一個(gè)類繼承RawComparator,代碼如下:

public class MyGrouper implements RawComparator<SortAPI> {    @Override    public int compare(SortAPI o1, SortAPI o2) {        return (int)(o1.first - o2.first);    }    @Override    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {        int compareBytes = WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);        return compareBytes;    }    }

源碼中SortAPI是上節(jié)自定義排序中的定義對(duì)象,第一個(gè)方法從注釋可以看出是比較2個(gè)參數(shù)的大小,返回的是自然整數(shù);第二個(gè)方法是在反序列化時(shí)比較,所以需要是用字節(jié)比較。接下來我們繼續(xù)看看自定義MyMapper類:

public class MyMapper extends Mapper<LongWritable, Text, SortAPI, LongWritable> {        @Override    protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {        String[] splied = value.toString().split("/t");        try {            long first = Long.parseLong(splied[0]);            long second = Long.parseLong(splied[1]);            context.write(new SortAPI(first,second), new LongWritable(1));        } catch (Exception e) {            System.out.println(e.getMessage());        }    }    }

自定義MyReduce類:

public class MyReduce extends Reducer<SortAPI, LongWritable, LongWritable, LongWritable> {    @Override    protected void reduce(SortAPI key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {        context.write(new LongWritable(key.first), new LongWritable(key.second));    }    }

自定義SortAPI類:

public class SortAPI implements WritableComparable<SortAPI> {    public Long first;    public Long second;    public SortAPI(){            }    public SortAPI(long first,long second){        this.first = first;        this.second = second;    }    @Override    public int compareTo(SortAPI o) {        return (int) (this.first - o.first);    }    @Override    public void write(DataOutput out) throws IOException {        out.writeLong(first);        out.writeLong(second);    }    @Override    public void readFields(DataInput in) throws IOException {        this.first = in.readLong();        this.second = in.readLong();            }    @Override    public int hashCode() {        return this.first.hashCode() + this.second.hashCode();    }    @Override    public boolean equals(Object obj) {        if(obj instanceof SortAPI){            SortAPI o = (SortAPI)obj;            return this.first == o.first && this.second == o.second;        }        return false;    }        @Override    public String toString() {        return "輸出:" + this.first + ";" + this.second;    }    }

接下來準(zhǔn)備數(shù)據(jù),數(shù)據(jù)如下:

1       21       13       03       22       21       2

上傳至hdfs://hadoop-master:9000/grouper/input/test.txt,main代碼如下:

public class Test {    static final String OUTPUT_DIR = "hdfs://hadoop-master:9000/grouper/output/";    static final String INPUT_DIR = "hdfs://hadoop-master:9000/grouper/input/test.txt";    public static void main(String[] args) throws Exception {        Configuration conf = new Configuration();        Job job = new Job(conf, Test.class.getSimpleName());            job.setJarByClass(Test.class);        deleteOutputFile(OUTPUT_DIR);        //1設(shè)置輸入目錄        FileInputFormat.setInputPaths(job, INPUT_DIR);        //2設(shè)置輸入格式化類        job.setInputFormatClass(TextInputFormat.class);        //3設(shè)置自定義Mapper以及鍵值類型        job.setMapperClass(MyMapper.class);        job.setMapOutputKeyClass(SortAPI.class);        job.setMapOutputValueClass(LongWritable.class);        //4分區(qū)        job.setPartitionerClass(HashPartitioner.class);        job.setNumReduceTasks(1);        //5排序分組        job.setGroupingComparatorClass(MyGrouper.class);        //6設(shè)置在一定Reduce以及鍵值類型        job.setReducerClass(MyReduce.class);        job.setOutputKeyClass(LongWritable.class);        job.setOutputValueClass(LongWritable.class);        //7設(shè)置輸出目錄        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_DIR));        //8提交job        job.waitForCompletion(true);    }        static void deleteOutputFile(String path) throws Exception{        Configuration conf = new Configuration();        FileSystem fs = FileSystem.get(new URI(INPUT_DIR),conf);        if(fs.exists(new Path(path))){            fs.delete(new Path(path));        }    }}

執(zhí)行代碼,然后在節(jié)點(diǎn)上用終端輸入:hadoop fs -text /grouper/output/part-r-00000查看結(jié)果:

1       22       23       0

接下來我們修改下SortAPI類的compareTo()方法:

    @Override    public int compareTo(SortAPI o) {        long mis = (this.first - o.first) * -1;        if(mis != 0 ){            return (int)mis;        }        else{            return (int)(this.second - o.second);        }    }

再次執(zhí)行并查看/grouper/output/part-r-00000文件:

3       02       21       1

這樣我們就得出了同樣的數(shù)據(jù)分組結(jié)果會(huì)受到排序算法的影響,比如排序是倒序那么分組也是先按照倒序數(shù)據(jù)源進(jìn)行分組輸出。我們還可以在map函數(shù)以及reduce函數(shù)中打印記錄(過程省略)這樣經(jīng)過對(duì)比也得出分組階段:鍵值對(duì)中key相同(即compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)方法返回0)的則為一組,當(dāng)前組再按照順序選擇第一個(gè)往緩沖區(qū)輸出(也許會(huì)存儲(chǔ)到硬盤)。其它的相同key的鍵值對(duì)就不會(huì)再往緩沖區(qū)輸出了。在百度上檢索到這邊文章,其中它的分組是把map函數(shù)輸出的value全部迭代到同一個(gè)key中,就相當(dāng)于上面{key,value}:{1,{2,1,2}},這個(gè)結(jié)果跟最開始沒有自定義分組時(shí)是一樣的,我們可以在reduce函數(shù)輸出Iterable<LongWritable> values進(jìn)行查看,其實(shí)我覺得這樣的才算是分組吧就像數(shù)據(jù)查詢一樣。

在這里我們應(yīng)該要弄懂分組與分區(qū)的區(qū)別。分區(qū)是對(duì)輸出結(jié)果文件進(jìn)行分類拆分文件以便更好查看,比如一個(gè)輸出文件包含所有狀態(tài)的http請(qǐng)求,那么為了方便查看通過分區(qū)把請(qǐng)求狀態(tài)分成幾個(gè)結(jié)果文件。分組就是把一些相同鍵的鍵值對(duì)進(jìn)行計(jì)算減少輸出;分區(qū)之后數(shù)據(jù)全部還是照樣輸出到reduce端,而分組的話就有所減少了;當(dāng)然這2個(gè)步驟也是不同的階段執(zhí)行。

這次先到這里。堅(jiān)持記錄點(diǎn)點(diǎn)滴滴!


發(fā)表評(píng)論 共有條評(píng)論
用戶名: 密碼:
驗(yàn)證碼: 匿名發(fā)表
主站蜘蛛池模板: 叉逼视频| 日韩精品中文字幕在线播放 | 日本精品久久久一区二区三区 | 国产视频在线观看一区二区三区 | 高清做爰免费无遮网站挡 | 精品视频在线免费看 | 亚洲成人精品区 | 欧美三级毛片 | 国产成人在线网址 | 亚洲国产超高清a毛毛片 | 国产精品成人免费一区久久羞羞 | 一级看片免费视频 | 视频一区二区不卡 | 动漫孕妇被羞羞视频 | 国产成年人视频 | 玖玖视频精品 | 国产91在线高潮白浆在线观看 | 黄网站在线免费 | 日本a∨精品中文字幕在线 狠狠干精品视频 | 91九色网 | 国产一区二区成人在线 | 九九热在线免费观看视频 | 姑娘第四集免费看视频 | 午夜视频国产 | 伊人在线视频 | 一级电影在线观看 | 久久91久久久久麻豆精品 | 国产一区二区在线免费观看 | 91在线精品亚洲一区二区 | 国产一级淫片免费看 | 国内精品久久久久久2021浪潮 | 日产精品久久久一区二区开放时间 | 久久久三区 | 91精品国产免费久久 | 国产18成人免费视频 | 福利免费在线观看 | 销魂美女一区二区 | 日本黄色免费观看视频 | 91短视频网页版 | 天天夜干 | 99r国产精品|