
MapReduce Study Notes

These are course notes written after completing the 尚硅谷 MapReduce course.

I. Introduction to MapReduce

MapReduce is a programming model for parallel processing of large data sets (larger than 1 TB). Its central ideas, "Map" and "Reduce", are borrowed from functional programming languages, together with features taken from vector programming languages. It lets developers run their programs on a distributed system without having to master distributed parallel programming. In current implementations, the user specifies a Map function that transforms a set of key/value pairs into a set of intermediate key/value pairs, and a Reduce function that merges all the intermediate values associated with the same intermediate key.


II. MapReduce Processes

When a complete MapReduce program runs in distributed mode, there are three kinds of instance processes:
| Process type | Responsibility |
| --- | --- |
| MrAppMaster | Schedules the whole job and coordinates its state |
| MapTask | Handles the entire data-processing flow of the Map phase |
| ReduceTask | Handles the entire data-processing flow of the Reduce phase |


III. Common Data Serialization Types

| Java type | Hadoop Writable type |
| --- | --- |
| boolean | BooleanWritable |
| byte | ByteWritable |
| int | IntWritable |
| float | FloatWritable |
| long | LongWritable |
| double | DoubleWritable |
| String | Text |
| map | MapWritable |
| array | ArrayWritable |
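
As a quick illustration (a minimal, hypothetical snippet; the class and variable names are arbitrary), each Writable type simply wraps the corresponding Java value and can be unwrapped again:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class WritableTypesDemo {
    public static void main(String[] args) {
        // Wrap plain Java values in their Hadoop Writable counterparts
        IntWritable count = new IntWritable(1);      // int    -> IntWritable
        LongWritable offset = new LongWritable(0L);  // long   -> LongWritable
        Text word = new Text("hello");               // String -> Text
        // Unwrap them back into plain Java values
        System.out.println(count.get() + " " + offset.get() + " " + word.toString());
    }
}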

IV. Maven Setup

1. Add the following dependencies to the project's pom.xml

<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
</dependencies>

2. Add the logging configuration file log4j.properties under the resources directory

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

V. Implementing WordCount

1. The input data is as follows

hello hadoop
hello hdfs
hello scala
mapreduce
word count

2. Expected output:

count	1
hadoop 1
hdfs 1
hello 3
mapreduce 1
scala 1
word 1

3. Write the Mapper class

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private Text k = new Text();
private IntWritable v = new IntWritable(1);

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] lineArr = line.split(" ");
for (String s : lineArr) {
k.set(s);
context.write(k, v);
}
}
}

4. Write the Reducer class

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
context.write(key, new IntWritable(sum));
}
}

5. Write the Driver class

public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
args = new String[]{"/input_path", "/output/path"};
Configuration conf = new Configuration();
// Get the Job instance
Job job = Job.getInstance(conf);
// Set the jar location by the driver class
job.setJarByClass(WordCountDriver.class);
// Wire up the Mapper and Reducer
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// Set the Mapper output key/value types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// Set the final output key/value types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// Set the input and output paths
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// Submit the job and wait for completion
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}

6. Run the Driver

The files under the output path match the expected output.
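To run the same job on a cluster rather than with the hard-coded paths above, the two paths would normally be taken from args and the packaged jar submitted with the hadoop jar command; the jar name and HDFS paths below are placeholders:

hadoop jar wordcount.jar WordCountDriver /input_path /output_path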


VI. Custom Serializable Objects

1. What is serialization?

Serialization converts an in-memory object into a sequence of bytes (or another transferable representation) so that it can be persisted to disk or sent over the network.
Deserialization converts a received byte sequence (or another transferable representation), or data persisted on disk, back into an in-memory object.
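As a small illustration of the idea (a hypothetical sketch using Hadoop's built-in IntWritable; the class and variable names are arbitrary), serialization writes an object's fields to a byte stream, and deserialization reads them back in the same order:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;

public class SerializationDemo {
    public static void main(String[] args) throws IOException {
        // Serialize: in-memory object -> byte sequence
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        new IntWritable(42).write(out);
        out.flush();

        // Deserialize: byte sequence -> in-memory object
        IntWritable restored = new IntWritable();
        restored.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(restored.get()); // prints 42
    }
}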

2. Implementing the serialization interface (Writable) in a custom bean

Making a bean object serializable takes the following 7 steps:

  1. The class must implement the Writable interface
  2. Deserialization instantiates the object reflectively through the no-argument constructor, so the class must have one

    public WritableBean(){
    super();
    }
  3. Override the serialization method

    @Override
    public void write(DataOutput dataOutput) throws IOException {
    // todo
    }
  4. Override the deserialization method

    @Override
    public void readFields(DataInput dataInput) throws IOException {
    // todo
    }
  5. The deserialization order must exactly match the serialization order; otherwise deserialization will fail

  6. To make the result readable in the output file, override toString(); separating the fields with "\t" makes later processing easier
  7. If the custom bean is transferred as a key, it must also implement the Comparable interface, because the Shuffle phase of MapReduce requires keys to be sortable
    @Override
    public int compareTo(WritableBean o) {
    // todo
    }

3. Hands-on example

1. Requirement:

For the data below, compute the total upstream traffic, total downstream traffic, and total traffic of each phone number.

1	13736230513	192.196.100.1	www.atguigu.com	2481	24681	200
2 13846544121 192.196.100.2 264 0 200
3 13956435636 192.196.100.3 132 1512 200
4 13966251146 192.168.100.1 240 0 404
5 18271575951 192.168.100.2 www.atguigu.com 1527 2106 200
6 84188413 192.168.100.3 www.atguigu.com 4116 1432 200
7 13590439668 192.168.100.4 1116 954 200
8 15910133277 192.168.100.5 www.hao123.com 3156 2936 200
9 13729199489 192.168.100.6 240 0 200
10 13630577991 192.168.100.7 www.shouhu.com 6960 690 200
11 15043685818 192.168.100.8 www.baidu.com 3659 3538 200
12 15959002129 192.168.100.9 www.atguigu.com 1938 180 500
13 13560439638 192.168.100.10 918 4938 200
14 13470253144 192.168.100.11 180 180 200
15 13682846555 192.168.100.12 www.qq.com 1938 2910 200
16 13992314666 192.168.100.13 www.gaga.com 3008 3720 200
17 13509468723 192.168.100.14 www.qinghua.com 7335 110349 404
18 18390173782 192.168.100.15 www.sogou.com 9531 2412 200
19 13975057813 192.168.100.16 www.baidu.com 11058 48243 200
20 13768778790 192.168.100.17 120 120 200
21 13568436656 192.168.100.18 www.alibaba.com 2481 24681 200
22 13568436656 192.168.100.19 1116 954 200

2. Expected output:
13470253144	180	180	360
13509468723 7335 110349 117684
13560439638 918 4938 5856
13568436656 3597 25635 29232
13590439668 1116 954 2070
13630577991 6960 690 7650
13682846555 1938 2910 4848
13729199489 240 0 240
13736230513 2481 24681 27162
13768778790 120 120 240
13846544121 264 0 264
13956435636 132 1512 1644
13966251146 240 0 240
13975057813 11058 48243 59301
13992314666 3008 3720 6728
15043685818 3659 3538 7197
15910133277 3156 2936 6092
15959002129 1938 180 2118
18271575951 1527 2106 3633
18390173782 9531 2412 11943
84188413 4116 1432 5548
3. Create the traffic-statistics bean (FlowBean)
public class FlowBean implements Writable {

// Upstream traffic
private long upFlow;
// Downstream traffic
private long downFlow;
// Total traffic
private long sumFlow;

public FlowBean() {
super();
}

public FlowBean(long upFlow, long downFlow) {
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = upFlow + downFlow;
}

@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(upFlow);
dataOutput.writeLong(downFlow);
dataOutput.writeLong(sumFlow);
}

@Override
public void readFields(DataInput dataInput) throws IOException {
upFlow = dataInput.readLong();
downFlow = dataInput.readLong();
sumFlow = dataInput.readLong();
}

public long getUpFlow() {
return upFlow;
}

public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}

public long getDownFlow() {
return downFlow;
}

public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}

public long getSumFlow() {
return sumFlow;
}

public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}

@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
}
4. Write the Mapper class
public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
private Text k = new Text();
private FlowBean flowBean = new FlowBean();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// Read one line
String line = value.toString();

// Split the line
String[] lineArr = line.split("\t");

// Populate the key and the FlowBean
k.set(lineArr[1]);
long upFLow = Long.parseLong(lineArr[lineArr.length - 3]);
long downFLow = Long.parseLong(lineArr[lineArr.length - 2]);
flowBean.setUpFlow(upFLow);
flowBean.setDownFlow(downFLow);
flowBean.setSumFlow(upFLow + downFLow);

// Emit
context.write(k, flowBean);
}
}
5. Write the Reducer class
public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

private FlowBean flowBean = new FlowBean();

@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
long sumUpFlow = 0;
long sumDownFlow = 0;
// Sum up the flows
for (FlowBean value : values) {
sumDownFlow += value.getDownFlow();
sumUpFlow += value.getUpFlow();
}
flowBean.setUpFlow(sumUpFlow);
flowBean.setDownFlow(sumDownFlow);
flowBean.setSumFlow(sumDownFlow + sumUpFlow);
// Emit
context.write(key, flowBean);
}
}
6. Write the Driver class
public class FlowCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
args = new String[]{"/input_path", "/output_path"};
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(FlowCountDriver.class);
job.setMapperClass(FlowCountMapper.class);
job.setReducerClass(FlowCountReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
7. Run the Driver

The computed statistics match the expected output.


VII. FileInputFormat Implementations

Common FileInputFormat implementations include TextInputFormat, KeyValueTextInputFormat, NLineInputFormat, CombineTextInputFormat, and custom InputFormats.

1. TextInputFormat

TextInputFormat is the default FileInputFormat implementation. It reads records line by line. The key is the starting byte offset of the line within the file, of type LongWritable; the value is the content of the line, excluding any line terminators (newline and carriage return), of type Text.
For example, suppose a split contains the following 4 text records.

Rich learning form
Intelligent learning engine
Learning more convenient
From the real demand for more close to the enterprise

Each record is represented by the following key/value pairs:

(0,Rich learning form)
(19,Intelligent learning engine)
(47,Learning more convenient)
(72,From the real demand for more close to the enterprise)

2. KeyValueTextInputFormat

Each line is one record, split by a separator into a key and a value. The separator can be configured by adding the setting below in the driver class; the default separator is a tab (\t).

conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t");

For example, the input is a split containing 4 records, where ——> denotes a (horizontal) tab character.

line1 ——>Rich learning form
line2 ——>Intelligent learning engine
line3 ——>Learning more convenient
line4 ——>From the real demand for more close to the enterprise

Each record is represented by the following key/value pairs:

(line1,Rich learning form)
(line2,Intelligent learning engine)
(line3,Learning more convenient)
(line4,From the real demand for more close to the enterprise)

Here the key is the Text sequence that precedes the tab on each line.

3. NLineInputFormat

With NLineInputFormat, the InputSplit handled by each map task is no longer determined by HDFS blocks but by the number of lines N configured for NLineInputFormat. That is, number of splits = total lines of the input file / N; if the division is not exact, number of splits = quotient + 1.
For example, take the same 4-line input as above.

Rich learning form
Intelligent learning engine
Learning more convenient
From the real demand for more close to the enterprise

If N is 2, each input split contains two lines and 2 MapTasks are started. The first mapper receives the first two lines:

(0,Rich learning form)
(19,Intelligent learning engine)

The other mapper receives the last two lines:

(47,Learning more convenient)
(72,From the real demand for more close to the enterprise)

The keys and values here are the same as those produced by TextInputFormat.

4. CombineTextInputFormat

The framework's default TextInputFormat splitting mechanism plans splits per file: no matter how small a file is, it becomes its own split and is handed to its own MapTask. With a large number of small files this creates a large number of MapTasks, which is extremely inefficient. CombineTextInputFormat instead packs many small files into fewer splits.
How to set the maximum virtual-storage split size:

// Note: the virtual-storage split maximum should be chosen according to the actual sizes of the small files.
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4 MB

(1) Virtual storage phase:
Each file in the input directory is compared, in turn, against the configured setMaxInputSplitSize value. If the file is not larger than the maximum, it logically forms one block. If the file is larger than the maximum and more than twice as large, a block of the maximum size is cut off; when the remaining size is larger than the maximum but not larger than twice the maximum, the remainder is split evenly into 2 virtual-storage blocks (to avoid creating splits that are too small).
For example, if setMaxInputSplitSize is 4 MB and the input file is 8.02 MB, a 4 MB block is logically cut off first. The remaining 4.02 MB, if split again at 4 MB, would leave a tiny 0.02 MB virtual-storage file, so the remaining 4.02 MB is instead split evenly into two 2.01 MB files.
(2) Split phase:
(a) If a virtual-storage file is greater than or equal to the setMaxInputSplitSize value, it forms a split by itself.
(b) Otherwise it is merged with the next virtual-storage file, and together they form one split.
(c) Example: given 4 small files of 1.7 MB, 5.1 MB, 3.4 MB, and 6.8 MB, virtual storage produces 6 blocks of sizes 1.7 MB, (2.55 MB, 2.55 MB), 3.4 MB, and (3.4 MB, 3.4 MB).
These finally form 3 splits of sizes (1.7 + 2.55) MB, (2.55 + 3.4) MB, and (3.4 + 3.4) MB.
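In the driver, switching to CombineTextInputFormat then only takes two extra lines; a minimal sketch (to be added to a driver such as the WordCount one above; the 4 MB value is illustrative):

// org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat
job.setInputFormatClass(CombineTextInputFormat.class);
// Virtual-storage split maximum: 4 MB
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);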


VIII. KeyValueTextInputFormat in Practice

1. Requirement:

Count, for each distinct first word, how many lines of the input file start with that word.

banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang

2. Expected output:

banzhang	2
xihuan 2

3. Write the Mapper class

public class KVTextMapper extends Mapper<Text, Text, Text, IntWritable> {
IntWritable intWritable = new IntWritable(1);
@Override
protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
context.write(key, intWritable);
}
}

4. Write the Reducer class

public class KVTextReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
IntWritable intWritable = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
intWritable.set(sum);
context.write(key, intWritable);
}
}

5. Write the Driver class

public class KVTextDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
args = new String[]{"/input_path", "/output_path"};
Configuration conf = new Configuration();
// Set the key/value separator
conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");
Job job = Job.getInstance(conf);
job.setJarByClass(KVTextDriver.class);
job.setMapperClass(KVTextMapper.class);
job.setReducerClass(KVTextReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}

6. Run the Driver

The output matches the expected data.


IX. NLineInputFormat in Practice

1. Requirement:

Count the occurrences of each word, with the number of splits determined by the number of lines in the input file; in this case, every three lines go into one split.

banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang banzhang ni hao
xihuan hadoop banzhang

2. Expected result:

The number of splits printed in the log is 4 (the input has 11 lines, and 11 / 3 rounded up is 4).

Number of splits:4

3. Write the Mapper class

public class NLineMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
Text k = new Text();
IntWritable v = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] strArr = line.split(" ");
for (String s : strArr) {
k.set(s);
context.write(k, v);
}
}
}

4. Write the Reducer class

public class NLineReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
IntWritable v = new IntWritable(1);
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
v.set(sum);
context.write(key, v);
}
}

5. Write the Driver class

public class NLineDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
args = new String[]{"/input_path", "/output_path"};
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// Put three records into each InputSplit
NLineInputFormat.setNumLinesPerSplit(job, 3);
// Use NLineInputFormat to control how records are split
job.setInputFormatClass(NLineInputFormat.class);
job.setJarByClass(NLineDriver.class);
job.setMapperClass(NLineMapper.class);
job.setReducerClass(NLineReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}

Custom InputFormat

Both HDFS and MapReduce are very inefficient when handling small files, yet scenarios with large numbers of small files are hard to avoid, so a suitable solution is needed. One option is a custom InputFormat that merges small files.

1. Requirement

Merge multiple small files into one SequenceFile (a SequenceFile is Hadoop's file format for storing key-value pairs in binary form). The SequenceFile holds several files, using each file's path plus name as the key and the file's contents as the value.
The input consists of three files with the following contents.
1.txt

yongpeng weidong weinan
sanfeng luozong xiaoming

2.txt

longlong fanfan
mazong kailun yuhang yixin
longlong fanfan
mazong kailun yuhang yixin

3.txt

shuaige changmo zhenqiang 
dongli lingu xuanxuan

2. Custom InputFormat workflow

  1. Define a class that extends FileInputFormat
    (1) Override isSplitable() to return false so that files are not split
    (2) Override createRecordReader() to create and initialize the custom RecordReader
  2. Write the RecordReader so that it reads one whole file at a time and wraps it as a key/value pair
    (1) Read the entire file into the value with an IO stream; since the file is not splittable, the whole file ends up in the value
    (2) Use the file path plus name as the key
  3. Configure the Driver
// (1) Set the custom InputFormat for the input
    job.setInputFormatClass(XXXFileInputformat.class);
// (2) Set the OutputFormat for the output
    job.setOutputFormatClass(SequenceFileOutputFormat.class);

3. Implement WholeFileInputFormat

public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {

@Override
protected boolean isSplitable(JobContext context, Path filename) {
// Each small file must be read as a whole, so it is never split
return false;
}

@Override
public RecordReader<Text, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
WholeRecordReader recordReader = new WholeRecordReader();
recordReader.initialize(inputSplit, taskAttemptContext);
return recordReader;
}
}

4. Implement the WholeRecordReader class

public class WholeRecordReader extends RecordReader<Text, BytesWritable> {

FileSplit split;
Configuration configuration;
Text k = new Text();
BytesWritable v = new BytesWritable();
boolean isProgress = true;

@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
// Initialization: keep references to the split and the configuration
this.split = (FileSplit) inputSplit;
this.configuration = taskAttemptContext.getConfiguration();
}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
// Core logic: read the whole file exactly once
if (isProgress){
// 1. Get the FileSystem object
Path path = split.getPath();
FileSystem fileSystem = path.getFileSystem(configuration);
// 2. Open an input stream
FSDataInputStream fsDataInputStream = fileSystem.open(path);
// 3. Read the whole split into a buffer
byte[] buf = new byte[(int) split.getLength()];
IOUtils.readFully(fsDataInputStream, buf, 0, buf.length);
// 4. Fill in the key (file path) and the value (file bytes)
k.set(path.toString());
v.set(buf, 0, buf.length);
// 5. Close the stream
IOUtils.closeStream(fsDataInputStream);
isProgress = false;
return true;
}
return false;
}

@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return k;
}

@Override
public BytesWritable getCurrentValue() throws IOException, InterruptedException {
return v;
}

@Override
public float getProgress() throws IOException, InterruptedException {
return 0;
}

@Override
public void close() throws IOException {

}
}

5. Write the Mapper class

public class SequenceFileMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {
@Override
protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
context.write(key, value);
}
}

6. Write the Reducer class

public class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {
@Override
protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
for (BytesWritable value : values) {
context.write(key, value);
}
}
}

7. Write the Driver class

public class SequenceFileDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
args = new String[]{"/input_path", "/output_path"};
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(SequenceFileDriver.class);
job.setMapperClass(SequenceFileMapper.class);
job.setReducerClass(SequenceFileReducer.class);
// Set the custom InputFormat for the input
job.setInputFormatClass(WholeFileInputFormat.class);
// Set the OutputFormat to SequenceFileOutputFormat
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(BytesWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
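
To verify the merged output, the resulting SequenceFile can be read back with Hadoop's SequenceFile.Reader. The sketch below is a hypothetical helper; the output path is a placeholder, and the key/value types (Text and BytesWritable) follow from the job above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileInspector {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Path to one output file of the job above (placeholder)
        Path path = new Path("/output_path/part-r-00000");
        try (SequenceFile.Reader reader =
                     new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
            Text key = new Text();
            BytesWritable value = new BytesWritable();
            // Iterate over every (file path, file bytes) record in the SequenceFile
            while (reader.next(key, value)) {
                System.out.println(key + " -> " + value.getLength() + " bytes");
            }
        }
    }
}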

X. Custom Partitioner

1. Requirement:

Building on the earlier traffic statistics, write the results to different files according to the province that each phone number belongs to.

2. Add a custom partitioner class to the earlier example

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

@Override
public int getPartition(Text text, FlowBean flowBean, int i) {
int partition = 4;
String prePhone = text.toString().substring(0, 3);
if ("136".equals(prePhone)) {
partition = 0;
}else if ("137".equals(prePhone)) {
partition = 1;
}else if ("138".equals(prePhone)) {
partition = 2;
}else if ("139".equals(prePhone)) {
partition = 3;
}
return partition;
}
}

3. Add the partition settings to the Driver of the original example

public class FlowCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
args = new String[]{"/Users/zhangjia/Desktop/phone.txt", "/Users/zhangjia/Desktop/result_phone"};
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(FlowCountDriver.class);
job.setMapperClass(FlowCountMapper.class);
job.setReducerClass(FlowCountReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// Register the custom Partitioner
job.setPartitionerClass(ProvincePartitioner.class);
// Set the number of ReduceTasks (one per partition)
job.setNumReduceTasks(5);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}

4. Run

The output is now partitioned by the first three digits of the phone number.

5. Notes

About the number of ReduceTasks configured in the Driver:
(1) If the number of ReduceTasks is greater than the number of partitions getPartition can return, a few extra empty output files part-r-000xx are produced.
(2) If 1 < number of ReduceTasks < number of getPartition results, some partitioned data has nowhere to go and an Exception is thrown.
(3) If the number of ReduceTasks is 1, then no matter how many partition files the MapTask side produces, everything goes to this single ReduceTask and only one result file, part-r-00000, is produced.
(4) Partition numbers must start at zero and increase one by one.
For example, assuming the custom partitioner defines 5 partitions:

| ReduceTasks set in the Driver | Result |
| --- | --- |
| 1 | Runs normally, but produces only one output file |
| 2 | Throws an error |
| 6 | Greater than 5; the program runs normally, but empty output files are produced |

XI. Custom WritableComparable Sorting

1. How it works

When a bean object is transferred as the key, it needs to implement the WritableComparable interface and override the compareTo method; sorting then follows the order compareTo defines.

@Override
public int compareTo(FlowBean o) {
// todo
}

2. Hands-on WritableComparable sorting (total order sort)

1. The input is the output of the earlier traffic-statistics job
13470253144	180	180	360
13509468723 7335 110349 117684
13560439638 918 4938 5856
13568436656 3597 25635 29232
13590439668 1116 954 2070
13630577991 6960 690 7650
13682846555 1938 2910 4848
13729199489 240 0 240
13736230513 2481 24681 27162
13768778790 120 120 240
13846544121 264 0 264
13956435636 132 1512 1644
13966251146 240 0 240
13975057813 11058 48243 59301
13992314666 3008 3720 6728
15043685818 3659 3538 7197
15910133277 3156 2936 6092
15959002129 1938 180 2118
18271575951 1527 2106 3633
18390173782 9531 2412 11943
84188413 4116 1432 5548
2. Expected output
13509468723	7335	110349	117684
13975057813 11058 48243 59301
13568436656 3597 25635 29232
13736230513 2481 24681 27162
18390173782 9531 2412 11943
13630577991 6960 690 7650
15043685818 3659 3538 7197
13992314666 3008 3720 6728
15910133277 3156 2936 6092
13560439638 918 4938 5856
84188413 4116 1432 5548
13682846555 1938 2910 4848
18271575951 1527 2106 3633
15959002129 1938 180 2118
13590439668 1116 954 2070
13956435636 132 1512 1644
13470253144 180 180 360
13846544121 264 0 264
13966251146 240 0 240
13768778790 120 120 240
13729199489 240 0 240
3. Write the custom FlowBean class
public class FlowBean implements WritableComparable<FlowBean> {

// Upstream traffic
private long upFlow;
// Downstream traffic
private long downFlow;
// Total traffic
private long sumFlow;

public FlowBean() {
super();
}

public FlowBean(long upFlow, long downFlow) {
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = upFlow + downFlow;
}

@Override
public int compareTo(FlowBean o) {
return this.sumFlow > o.getSumFlow() ? -1 : 1;
}

@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(upFlow);
dataOutput.writeLong(downFlow);
dataOutput.writeLong(sumFlow);
}

@Override
public void readFields(DataInput dataInput) throws IOException {
upFlow = dataInput.readLong();
downFlow = dataInput.readLong();
sumFlow = dataInput.readLong();
}

public long getUpFlow() {
return upFlow;
}

public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}

public long getDownFlow() {
return downFlow;
}

public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}

public long getSumFlow() {
return sumFlow;
}

public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}

@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
}
4. Write the Mapper class
public class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

FlowBean flowBean = new FlowBean();
Text v = new Text();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] strArr = line.split("\t");
String phoneNum = strArr[0];
long upFlow = Long.parseLong(strArr[1]);
long downFlow = Long.parseLong(strArr[2]);
long sumFlow = Long.parseLong(strArr[3]);
flowBean.setUpFlow(upFlow);
flowBean.setDownFlow(downFlow);
flowBean.setSumFlow(sumFlow);
v.set(phoneNum);
context.write(flowBean, v);
}
}
5. Write the Reducer class
public class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
@Override
protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text value : values) {
context.write(value, key);
}
}
}
6. Write the Driver class
public class FlowCountSortDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// Replace the input/output paths with the actual paths on your machine
args = new String[]{"/input_path","/output_path"};

// 1 Get the configuration and the Job instance
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);

// 2 Specify the jar of this program by the driver class
job.setJarByClass(FlowCountSortDriver.class);

// 3 Specify the Mapper/Reducer classes used by this job
job.setMapperClass(FlowCountSortMapper.class);
job.setReducerClass(FlowCountSortReducer.class);

// 4 Specify the Mapper output key/value types
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);

// 5 Specify the final output key/value types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);

// 6 Specify the job's input and output directories
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 7 Submit the job configuration and the jar containing the job's classes to YARN and wait for completion
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
7. Run

The result matches the expected output.

Note: to sort within each partition (partition sort), simply add a custom Partitioner class on top of the previous requirement, as sketched below.
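
A minimal sketch of that idea (an assumption-laden example: it reuses the province rules from ProvincePartitioner above; note that in the sort job the FlowBean is the key and the phone number travels as the Text value, so the prefix is taken from the value):

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvinceSortPartitioner extends Partitioner<FlowBean, Text> {

    @Override
    public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
        // In the sort job the phone number is the value, so take the prefix from the Text
        String prePhone = text.toString().substring(0, 3);
        int partition = 4;
        if ("136".equals(prePhone)) {
            partition = 0;
        } else if ("137".equals(prePhone)) {
            partition = 1;
        } else if ("138".equals(prePhone)) {
            partition = 2;
        } else if ("139".equals(prePhone)) {
            partition = 3;
        }
        return partition;
    }
}

The FlowCountSortDriver would then register it with job.setPartitionerClass(ProvinceSortPartitioner.class) and job.setNumReduceTasks(5), just as in the partitioning example above.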