MapReduce WordCount

Create a new Maven project
Add the dependencies

<!--pom.xml-->
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.8.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.7.2</version>
    </dependency>
</dependencies>
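A note on versions: junit's RELEASE is a Maven metaversion that resolves to whatever the newest release is at build time, so pinning an explicit version (for example 4.12) makes the build reproducible. The three Hadoop artifacts should all use the same version, as they do here (2.7.2).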

The log4j.properties file

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
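One caveat: the logfile appender above is defined but never attached to any logger, so target/spring.log will not actually be written; only the console output is active. To enable the file as well, list it on the root logger: log4j.rootLogger=INFO, stdout, logfile.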

//WcDriver.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WcDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Get a Job instance
        Job job = Job.getInstance(new Configuration());

        // 2. Set the jar by class, so the framework can locate the job classes
        job.setJarByClass(WcDriver.class);

        // 3. Set the Mapper and Reducer classes
        job.setMapperClass(WcMapper.class);
        job.setReducerClass(WcReducer.class);

        // 4. Set the Mapper output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Set the Reducer (final) output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 5. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 6. Submit the job and wait for it to finish
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
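The driver takes the input path as args[0] and the output path as args[1], so set these as program arguments when running from the IDE. One pitfall: FileOutputFormat fails the job if the output directory already exists. Below is a minimal, optional guard for main(), a sketch rather than part of the original code; it assumes the extra import org.apache.hadoop.fs.FileSystem.

// Optional snippet for WcDriver.main(): delete a leftover output directory
// before submitting, so re-runs don't fail with "output directory already exists".
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// ... mapper/reducer/type setup as above ...
Path output = new Path(args[1]);
FileSystem fs = FileSystem.get(conf);  // org.apache.hadoop.fs.FileSystem
if (fs.exists(output)) {
    fs.delete(output, true);           // true = delete recursively
}
FileOutputFormat.setOutputPath(job, output);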
//WcMapper.java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// The framework feeds the mapper one line at a time:
// the key is the byte offset of the line, the value is the line itself.
public class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the input line on spaces
        String[] words = value.toString().split(" ");

        // Emit a (word, 1) pair for every word in the line
        for (String word : words) {
            this.word.set(word);
            context.write(this.word, this.one);
        }
    }
}
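To make the data flow concrete, here is a tiny standalone illustration (not part of the job) of what map() emits for the first line of the input file:

// MapDemo.java - illustration only
public class MapDemo {
    public static void main(String[] args) {
        String line = "我爱 北京 天安门";            // first line of the input file
        for (String w : line.split(" ")) {
            System.out.println("(" + w + ", 1)");  // the (key, value) pair map() writes
        }
        // Prints: (我爱, 1) (北京, 1) (天安门, 1), one per line
    }
}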
//WcReducer.java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WcReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable total = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        // Sum up all the 1s collected for this word
        for (IntWritable value : values) {
            sum += value.get();
        }
        total.set(sum);
        // Emit the word together with its total count
        context.write(key, total);
    }
}
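Because this reducer only sums, and summation is commutative and associative, the same class can optionally be reused as a map-side combiner. One extra line in WcDriver would do it (not in the original code; note that the log below shows Combine input records=0 precisely because no combiner is set):

// Optional, in WcDriver after job.setReducerClass(...):
job.setCombinerClass(WcReducer.class); // pre-aggregates (word, 1) pairs on the map side, shrinking the shuffle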

Input file
hello.txt

我爱 北京 天安门
天安门 太阳
我爱 小红
小红 爱我
我 是 一个 好人
天安门 前 太阳 升起 厉害
天安门 北京 你好 世界

Output log

File System Counters
    FILE: Number of bytes read=1240
    FILE: Number of bytes written=561726
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
Map-Reduce Framework
    Map input records=7
    Map output records=22
    Map output bytes=245
    Map output materialized bytes=295
    Input split bytes=89
    Combine input records=0
    Combine output records=0
    Reduce input groups=15
    Reduce shuffle bytes=295
    Reduce input records=22
    Reduce output records=15
    Spilled Records=44
    Shuffled Maps =1
    Failed Shuffles=0
    Merged Map outputs=1
    GC time elapsed (ms)=0
    Total committed heap usage (bytes)=768606208
Shuffle Errors
    BAD_ID=0
    CONNECTION=0
    IO_ERROR=0
    WRONG_LENGTH=0
    WRONG_MAP=0
    WRONG_REDUCE=0
File Input Format Counters
    Bytes Read=165
File Output Format Counters
    Bytes Written=141

Process finished with exit code 0
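The counters line up with the data: Map input records=7 is the 7 lines of the input file, Map output records=22 is the total number of words (3+2+2+2+4+5+4), and Reduce input groups=15 equals Reduce output records=15, the number of distinct words. Combine input records=0 confirms that no combiner was configured.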

Output file

// part-r-00000
一个 1
世界 1
你好 1
前 1
北京 2
升起 1
厉害 1
天安门 4
太阳 2
好人 1
小红 2
我 1
我爱 2
是 1
爱我 1

天安门 appears 4 times; 太阳, 北京, 我爱, and 小红 each appear twice; every other word appears once.