美文网首页
MapReduce实现词频统计

MapReduce实现词频统计

作者: 扎西的德勒 | 来源:发表于2021-02-22 15:02 被阅读0次

一、MapReduce编程指导思想

MapReduce的开发一共有八个步骤其中map阶段分为2个步骤,shuffle阶段4个步骤,reduce阶段分为2个步骤。

1. Map阶段2个步骤

  • 第一步:设置inputFormat类,将数据切分成key,value对,输入到第二步

  • 第二步:自定义map逻辑,处理我们第一步的输入kv对数据,然后转换成新的key,value对进行输出

2. shuffle阶段4个步骤

  • 第三步:对上一步输出的key,value对进行分区。(相同key的kv对属于同一分区)

  • 第四步:对每个分区的数据按照key进行排序

  • 第五步:对分区中的数据进行规约(combine操作),降低数据的网络拷贝(可选步骤)

  • 第六步:对排序后的kv对数据进行分组;分组的过程中,key相同的kv对为一组;将同一组的kv对的所有value放到一个集合当中(每组数据调用一次reduce方法)

3. reduce阶段2个步骤

  • 第七步:对多个map的任务进行合并,排序,写reduce函数自己的逻辑,对输入的key,value对进行处理,转换成新的key,value对进行输出

  • 第八步:设置将输出的key,value对数据保存到文件中

二、MapReduce编程实现

1. 源数据准备

hello,hello
world,world
hadoop,hadoop
hello,world
hello,flume
hadoop,hive
hive,kafka
flume,storm
hive,oozie

2. 代码实现

创建maven工程导入jar包
 <properties>
        <hadoop.version>3.1.4</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                    <!--   <verbal>true</verbal>-->
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>true</minimizeJar>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
定义WordCountMain主类

固定写法,实现mapreduce8个步骤:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountMain extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        //定义MapReduce的8个步骤 然后通过Job组装

        //继承父类 Configured 获取conf
        Configuration conf = super.getConf();
        Job job = Job.getInstance(conf, "WordCount");

        //如果需要集群运行,需要设置程序运行主类;若本地运行,则不需要该设置
        job.setJarByClass(WordCountMain.class);

        //通过Job组装MpaReduce的8个步骤
        //第一步:读取文件,解析成为key,value对(k1,v1)
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("file:///D:\\input\\数据文件"));
        //第二步:自定义map逻辑,接受(k1,v1) 转换成为新的 (k2,v2)
        job.setMapperClass(MyMapper.class);
        //设置map的输出的key,value对象(k2,v2)的类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        /**
         * 第三步:分区
         * 第四步:排序
         * 第五步:规约
         * 第六步:分组
         */

        //第七步:自定义reduce逻辑
        job.setReducerClass(MyReduce.class);
        //设置reduce的输出key,value对象(k3,v3)的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //第八步:
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job,new Path("file:///D:\\output"));

        //提交整个job任务,等到整个任务结束之后,如果任务执行成功,返回true;如果执行失败,返回false
        boolean b = job.waitForCompletion(true);

        //使用三余运算符
        return b?0:1;

    }
    /**
     * MapReduce进行WordCount的入口类
     */
    public static void main(String[] args) throws Exception {
        //获取配置文件
        Configuration configuration = new Configuration();
        //通过ToolRunner.run 执行程序的入口
        int run = ToolRunner.run(configuration, new WordCountMain(), args);
        //整个系统的退出
        System.exit(run);
    }

}
定义Mapper类
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * 使用TextInputFormat读取文件,一行一行进行读取
 *   key:行偏移量 一般为LongWritable
 *   value: 一行文本记录为 Text
 * 每个单词出现一次,记做一次
 *   key:单词  Text
 *   value: 1 IntWritable
 */

public class MyMapper extends Mapper<LongWritable,Text,Text,IntWritable> {

    //准备好输出的: <Text,IntWritable>
    Text text = new Text();
    IntWritable intWritable = new IntWritable(1);
    /**
     *
     * @param key
     * @param value   一行文本记录
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //Text value: 对文件一行记录进行处理
        String line = value.toString();
        String[] split = line.split(",");

        for (String word : split){
            //context 为 <Text,IntWritable>
            text.set(word);
            context.write(text,intWritable);
        }

    }

}
定义Reducer类
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MyReduce extends Reducer<Text, IntWritable, Text,IntWritable> {

    /**
     *
     * @param key
     * @param values
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int result = 0;
        for(IntWritable value:values){
            int i = value.get();
            result += i;
        }
        IntWritable intWritable = new IntWritable(result);
        context.write(key,intWritable);
    }

}

注意: 切记导入正确的jar包,开发中由于导入了错误的Text包,导致返回code1,也没有报错,切记切记。

相关文章

网友评论

      本文标题:MapReduce实现词频统计

      本文链接:https://www.haomeiwen.com/subject/mwtkfltx.html