本文固定链接:https://www.askmac.cn/archives/hadoop-inputoutput.html
Hadoop Input/Output
前几章具体概述了编程模型MapReduce的概念,在第五章末我们开始进一步研究实施Hadoop的方式。本章内容在那一理念上进行延伸。首先,我们会解释压缩方案,接着会对Hadoop的I/O展开详尽的讨论。我们会讲述各种类型的文件,例如Sequence文件和Avro文件。在此过程中,你们会对MapReduce是如何在Hadoop引擎内部运行,有更进一步的理解。
压缩方案
目前为止,你已经了解了MapReduce的基本原理。
MapReduce是一种I/O密集化的过程。降低或优化I/O是提高MapReduce程序运行的关键。
Hadoop Framework提供了几种可行性来减少I/O。第六章我们通过使用Combiner来减少Mapper和Reducer之间的I/O。本章我们将探讨能够最大程度优化MapReduce的I/O运行的压缩方案。
首先我们先快速浏览一下涉及到I/O的各种MapReduce步骤:
- 读取HDFS中的文件时,输入到Mapper中。
- 从Mapper中输出的文件会放到本地磁盘。
- 由于Reducer从Mapper节点中接收文件,Reducer 和 Mapper之间会有网络I/O。
- 从Mapper节点接收分区然排序后,合并到本地磁盘存储到Reducer节点。
- 回读本地磁盘的文件作为记录,可供Reducer中的reduce方法使用。
- 从Reducer输出——这是写回HDFS。
在任何MapReduce程序中,I/O是最昂贵的操作,任何能减少磁盘或者网络I/O,都可以获取更佳的总体吞吐量。Hadoop架构让你可以压缩从Mapper和Reducer中输出的文件,但是却需要权衡一下:压缩会大量占用CPU资源,而且会消耗CPU的周期。任何用于压缩的CPU的周期都将损耗其他程序,比方说Mapper, Reducer, Partitioner, Combiner, Sort 和Shuffle等等。就像在Hadoop中的大多数决定一样,在设计的时候必须要认真地把资源权衡考虑在内(www.askmac.cn)。
压缩也间接影响,能够启动来处理输入数据的Mapper数量。HDFS一个重要的方面就是一个单一文件可以储存在多个块中。设想使用TextInputFormat来消耗的大型文本文件,使用TextInputFormat格式时,文件的每一行都可视为一个记录。但是行不遵从块界限,一行可以随意越过块界限。所以TextInputFormat读取文件时,为了提取剩余的行,也许会对随后的块进行远程调用。相反,处理下一个块的Mapper必须跳到行末,那是因为它知道另一个Mapper会在开头处理一部分记录。这样就提高了效率,因为一个单一的文件可以分配到多个Mappers中。只要Mappers了解它在消耗的块,以及是否第一个块就是起始块还有紧随最后一个块之后的块地址,Mapper继续处理起来都绰绰有余。
但是使用压缩文件的时候会发生什么呢?即使我们知道要压缩的文件是由什么块组成的,只是简单地给Mappers指定块还是不够的。在压缩文件中,是不可能在任何任意点启动的。
一些压缩方案是可拆分的,意思就是说很可能只读取部分文件,而且那些部分还没办法压缩。对那些不可拆分的压缩文件,除了用一个Mapper来对应消耗一个文件,没有其他的选择。最有可能的情况是这个文件的大多数块不是服务于本地有两种含义的Mapper结点。首先,如果一个文件很大的话,一个单个的Mapper 会控制map-side阶段的运行。这个单个长期运行的Mapper会延缓Reduce阶段,因此会影响整个任务运行。第二,如果输入的文件大,数量少,那么只有少数的Mappers可以启动。所以即使文件差不多大,但是对应的数量是有限定的,那么整个工作也会拖延。这样的情况下,使用未压缩的文件可能最好不过了。
其他文件输入格式,例如会在本章节随后探讨的SequenceFileInputFormat在未压缩和压缩形式下也是可以拆分的。文件格式和压缩方案的选择对运行性能的好坏有着至关重要的影响。
那么什么可以被压缩呢?
可以根据下列标准执行压缩:
- 输入文件压缩:如果输入的文件是可以压缩的,在Mapper里读取输入的块时,就会执行比较少的I/O。然而,不压缩文件的时候,就会拓展CPU的周期。
- 压缩中间Mapper输出:在MapReduce任务中,Mapper在Mapper节点上产生了中间文件。最终的Reducer将这些文件分隔开,经Reducer密钥排序,最终通过HTTP协议,被Reducer下载。当文件从Mapper读取到本地磁盘,还有分区从Mapper节点转移到Reducer节点的时候,压缩这个输出就减少了文件的I/O。
- 压缩MapReduce任务输出:无论这项工作是否采用Reduce阶段都是可以适用的,它适用于针对Map-only任务的Mapper输出以及使用针对reducer阶段的任务而采用的Reducer输出(www.askmac.cn)。
一般说来,在每个阶段使用压缩都是有效的。对于Mapper和Reducer来说。一般不会消耗大量CPU,可以有足够的周期来压缩(或者解压缩)。
压缩方案
压缩方案可以在三个标准的基础上进行评估。
- 压缩文件的大小;越小越好,因为这样在执行到硬盘和网络上的读或者写时,能产生更少的I/O。
- 压缩文件花费的时间;时间越快越好。
- 压缩文件的格式是否是可拆分的。通过许可一个单个文件由多个Mappers处理,一个可拆分的格式可以帮助实现更好的并行运算。
这里要权衡一下。你能获得好的压缩,但是(当使用更快的压缩方案时),会消耗更多的CPU周期。你应该作出一个务实的选择:选择一个快速而又不占用CPU的压缩方案来实现高质量的压缩。这个方案能在不占用Mapper和 Reducer的CPU的情况下启用空闲的CPU周期。
表7-1比较了各种比较流行的压缩方案。需要注意的是大小和速度是想关联的。对于大小和速度来说,GZIP是一个好的折中方案,但是却是不可拆分的;BZIP2会产生更小,而且可以拆分的文件,但是速度相对来说却比较缓慢。LZO和Snappy都可以拆分,速度也快,但是却无法像其他格式一样尽可能多地压缩(www.askmac.cn)。
表7-1. 流行压缩方案的比较
Scheme | Compressed Size | Speed | Splittable |
GZIPBZIP2LZOSnappy | ModerateSmallLargeLarge | MediumSlowFastFast | NoYesYesYes |
我们提供一些数字来进行权衡,尽管Snappy在数量级上比GZIP压缩方法快很多,但是输出的文件却比GZIP大20-100%。Snappy是谷歌发明的,现在广泛应用于它内部的MapReduce和派生架构。
Hadoop 框架拥有编解码器来执行文件压缩和解压。(一个编解码器是用来执行CompressionCodec类)。获得支持的编解码器的列表优先由io.compression.codecs,以core-site.xml.格式提供。它的价值就在于这是一个完全合格的逗号分隔列表。
CompressionCodec 实施类。例如,CompressionCodec实施类是来操控GZIP,方案是org.apache.hadoop.io.compress.GzipCodec.
Snappy 方案同样的等级是org.apache.hadoop.io.compress.SnappyCodec.
压缩在默认情况下是不启用的。在mapred-site.xml中设置的下列属性的值为true值来启用(www.askmac.cn):
- output.fileoutputformat.compress 是用来执行输出工作的。
- map.output.compress用来中间mapper输出。
如果压缩得到授权,没有详细说明的编解码器的话,使用的默认编解码器是org.apache.hadoop.io.compress.
DefaultCodec使用的是DEFLATE压缩格式。这与GZIP包括的没有附加标题的GZIP相同。默认的编解码器可以通过下面的属性,在mapred-site.xml中更新和配置:
- output.fileoutputformat.compress.codec,为任务输出配置默认的解编码器。
- map.output.compress.codec 为中间mapper输出配置默认的解编码器。
启用压缩
我们想启用压缩或者使用专门的压缩工具来读取压缩文件时,该如何向the Hadoop 框架指出呢?对于Mapper输入来说,我们什么都不需要做,只要按照要求,将CompressionCodec implementation class以io.compression.codecs的性质配置,然后以类路径的结构呈现,从文件拓展名推断出压缩方案,并做需要做的事情。至于阶段的其他部分,我们需要明确告诉框架使用压缩(www.askmac.cn)。
为了对输出的文件进行压缩(不是中间Mapper输出),在配置作业的时候我们添加了下面几行。
Configuration conf = new Configuration(); Job job=new Job(conf); FileOutputFormat.setCompressOutput(job, true); FileOutputFormat.setOutputCompressorClass(job,GzipCodec.class);
这个任务是你曾见过的Job类的实例。FileOutputFormat是输出格式的基本类,可以把文件写入HDFS中,包括会在本章后面遇到的TextOutputFormat和SequenceFileOutput。我们在以前的例子中用过GzipCodec。
为了使用Snappy codec,我们用下面这一行:
FileOutputFormat.setOutputCompressorClass(job,SnappyCodec.class);
为了对从the Mapper输出的中间文件进行压缩。我们用接下来的几行:
Configuration conf = new Configuration(); //Compress the Mapper Intermediate outoput //Use this only for jobs with Reduce phase //or mapred.compress.map.output which is deprecated but widely used conf. setBoolean("mapreduce.map.output.compress ",true); //or mapred.map.output.compression.codec is deprecated but widely used conf. setClass("mapreduce.map.output.compress.codec ",GzipCodec.class, CompressionCodec.class); //Compress job output FileOutputFormat.setCompressOutput(job, true); FileOutputFormat.setOutputCompressorClass(job,GzipCodec,class); Job job=new Job(conf);
Hadoop I/O内部处理
这部分内容主要检查InputFormat和OutputFormat类别。但是在我们展开详细讨论之前,我们要解释一下这些类是如何读写数据的。我们用以前用过的两个类别来解释:
- TextInputFormat:这一类别可以作为输入格式来读取航空公司数据集(www.askmac.cn)。
- TextOutputFormat: 这一类一直是为所有的MapReduce程序输出。
InputFormat
InputFormat在MapReduce任务中执行下列作用:
- 验证工作规范,确保输入的持续配置。
- InputFormat完成与他们各自的RecordReader实施紧密相关的任务,把由InputSplit呈现的字节转换为代表Mapper类输入的键值类实例。
InputSplit的分析
InputFormat把输入文件分割成具有逻辑性的InputSplit,其中一个就是给每个Mapper服务的。这里我们需要讲一下InputSplit和HDFS block的区别。后者代表磁盘上输入数据的物理分区。前者是由Mapper消耗的数据逻辑分区。在某种程度上,InputSplit包含多个块,可以对客户端代码透明。以TextInputFormat为例:不能保证每行只跨越一个块,还有一些在块边界的行可以跨越多个块。RecordReader用来执行TextInputFormat格式的LineRecordReader,为客户端管理这个。InputSplit也许能从不同于主块的一个块中捕捉一个片段记录。这个块可以存放在不同的节点上,也可能带来没有必要的通信费用,由于很少发生,成本可以忽略不计。从Mapper的角度来说,这种远程通信可以抽象出来。Mapper只能看到整个的记录。
InputSplit有下列两种方法:
- int getLength():这种方法根据InputSplit的字节数返回大小。
- String[] getLocations():这种方法提供了存放InputSplit数据的主机名称的名单。这些存储单元是MapReduce 框架用来提示分隔开来的Mappers,从而使它的数据获取最大的局域性(www.askmac.cn)。
InputFormat的分析
可以用两种方法来解释InputFormat:
- List<InputSplit> getSplits():这个方法是被客户端程序调用来获得分片列表,然后写入这些列表到应用主服务,从HDFS中接收这些文件来创建基于每个InputSplit位置的资源请求。
- RecordReader<K,V> createRecordReader():这个方法是由在容器中执行的Mapper任务调用,为the Mapper处理的InputSplit返回RecordReader实例。
作为InputFormat 子类的FileInputFormat类,有isSplittable()方法,能够指明文件是否是可以拆分的。默认是可以的,每个文件可以产出多个部分。回想一下一个Mapper实例处理InputSplit的实例。因此一个可拆分的InputFormat许可多个Mappers来处理每个输入的文件。可拆分的输入格式要想展开是比较复杂的,但是却是很有效率的,因为他们允许在map阶段更高程度的并行性。如果isSplittable()方法的返回值是错误的,那么整个的文件就会作为一个单一的InputSplit,也就意味着所有组成文件的块被相同的Mapper处理。一些快在本地mapper,但是大多数块在文件太大的时候不会在本地。一个不可拆分的InputFileFormat更易于展开,但是相较可以拆分的InputFileFormat来说,却是没有效率的。
然而,有时无法生成一个可拆分的InputFileFormat格式。例如,即使优先的InputFormat是可拆分的(比方说TextInputFormat),如果需要处理没有可拆分压缩方案的的压缩文件,InputFormat也默认为不可拆分模式。在可拆分的条件下,输入的文件通常可以分成分片,那些分片建立在组成文件的块的基础之上。每个输入切片的块的数量可以经由mapred-site.xml中下列属性来设置。
- input.fileinputformat.split.minsize – 默认值是1
- input.fileinputformat.split.maxsize – 通常设置为Long.MAX_VALUE
- blocksize – 默认是128 MB
三个内容一起来定义每个输入切片的数量,所以必须要满足下列三个约束条件:
mapreduce.input.fileinputformat.split.minsize< dfs.blocksize< mapreduce.input.fileinputformat.split.maxsize
大多数情况下会致使一个切片为1个块。设置mapreduce.input. fileinputformat.split.minsize的值比dfs.blocksize大的话,会导致每个切片不止一个块。这样的话就不是最理想的效果,因为这样在输入切片的时候,会增加非本地块的数量。mapreduce.input.fileinputformat.split.max比the dfs.blocksize小的话,会产生一定的影响。会迫使切片小于一个块。回想在运行时可以通过作业配置来设置的最小和最大的切片大小,以及the dfs.blocksize的默认值(128 MB),它可以为每个文件配置。因此每个切片的块数量可以为每个运行的作业而设置。
TextInputFormat
TextInputFormat类启用一种InputFormat格式,可以把简单的文本文件作为文本行来处理。行被假设被换行符和回车字符隔开。The TextInputFormat拓展了抽象类FileInputFormat(反过来可以启用InputFormat),它是建立在输入模式上的所有文件的基本类。使用TextInputFormat方法的TextInputFormat启用程序比较复杂。对于未经压缩的文件来说,TextInputFormat是可拆分的,对于可压缩文件来说,只有底层压缩方案可以拆分,那这个压缩文件才可以拆分(www.askmac.cn)。
对于TextInputFormat来说,相应的RecordReader子类是LineRecordReader。输入文件的每一行文本都可以视为一个记录。LineRecordReader负责生成一个密钥,是文件开头每行的偏移(LongWritable的实例),是整行文本的值(Text的实例)。就像之前讨论过的,一行很有可能越过它起初的块边界。相关联的LineRecordReader类足够强大,可以处理这种经常会涉及网络调用的状况,原因在于完成这一行的数据块可能不在同一个节点上。
OutputFormat
OutputFormat 实施角色是双重的
- 它验证作业输出的规范性,如果不存在输出文件夹,它就会校验。
- 提供一个RecordWriter启用的实例,用来写到输出文件。RecordWriter负责转换输出密钥的java实例和将值转化为原始字节。
TextOutputFormat
TextOutputFormat类把作业的输出规范性称为由文本行组成的文本文件。就像一个InputFormat 类与一个RecordReader类关联, OutputFormat也与一个RecordWriter有关。RecordWriter 被TextOutputFormat的LineRecordWriter实现利用,转化作业键值对输出为一行文本,此时键和值是第一次被转化为字符串,然后写出到一行以tab分隔。如果密钥是NullWritable,就只需要写出值部分即可(www.askmac.cn)。
随后的部分讨论如何创建自定义的OutputFormat,然后再讨论InputFormat 类。因为我们想用自定义的输出格式写一个MapReduce 作业结果,所以我们就依照这个顺序进行。然后我们会创建一个自定义的输入格式来从另一个MapReduce 作业中消耗这个专门的输出格式。这样,你无需用专门的自定义输入格式,来创建用于消耗的输入文件。
像往常一样,我们创建一个案例,然后使用Hadoop APIs来解决。
自定义输出格式:从Text到XML的转变
这一部分探讨如何把详细的文本文件转换成一个仅由延误信息组成的大型XML文件。样本XML文件展示在列表7-1.。
Listing 7-1. XML 样例文件
<recs> <rec> <key>0</key> <year>1988</year> <month>01</month> <date>18</date> <dayofweek>1</dayofweek> <depdelay>89</depdelay> <arrdelay>89</arrdelay> <origin>STL</origin> <destination>MSP</destination> <carrier>NW</carrier> </rec> <rec> <key>98</key> <year>1988</year> <month>01</month> <date>18</date> <dayofweek>1</dayofweek> <depdelay>0</depdelay> <arrdelay>6</arrdelay> <origin>SNA</origin> <destination>SFO</destination> <carrier>AA</carrier> </rec> <!- More rec tags --> </recs>
注意<key>的值是记录的关键,因为它是由TextInputForma返回的,碰巧是文件起始行的字节偏移。这是文本文件内部记录开始的字节偏移(www.askmac.cn)。
常用的OutputFormat被org.apress.prohadoop.c7.XMLOutputFormat类定义。
使用这种常用的OutputFormat格式的 MapReduce程序,被org.apress.prohadoop. c7.TextToXMLConversionJob类定义。
列表7-2显示这种类别的run()方法。只有OutputFormat的定义有所区别。
Listing 7-2. TextToXMLConversionJob.run()
public int run(String[] allArgs) throws Exception { Job job = Job.getInstance(getConf()); job.setJarByClass(TextToXMLConversionJob.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(XMLOutputFormat.class); job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(Text.class); job.setMapperClass(TextToXMLConversionMapper.class); job.setNumReduceTasks(0); String[] args = new GenericOptionsParser(getConf(), allArgs).getRemainingArgs(); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); return 0; }
这是Map-only的作业,the Mapper 类列在了列表7-3.
注意 Mapper 类完全忽略了所使用的OutputFormat。我们可以不去管使用的OutputFormat,重新使用Mapper 和Reducer类
Listing 7-3. TextToXMLConversionMapper
public static class TextToXMLConversionMapper extends Mapper<LongWritable, Text, LongWritable, Text> { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { if (!AirlineDataUtils.isHeader(value)) { int month = Integer.parseInt( AirlineDataUtils.getMonth(value.toString().split(","))); context.write(key, value); } } }
最终,我们可以执行常用的OutputFormat类。这一类的名称是org.apress.prohadoop. c7.XMLOutputFormat。
列表7-4显示了这一类的实现框架。我们可以注意到实际上它使用一个RecordWriter来实现输出。The XMLRecordWriter类具备所有必要的写到XML格式的文件的回调。初始的<recs>标签在构造函数中写出,t close()方法关闭最外层的<recs>标签。为了响应Mapper(或者Reducer) 中的context.write(),writeTag()方法反复被调用(www.askmac.cn)。
Listing 7-4. XMLOutputFormat and XMLRecordWriter
package org.apress.prohadoop.c7; /*Import Statements*/ public class XMLOutputFormat extends FileOutputFormat<LongWritable,Text> { protected static class XMLRecordWriter extends RecordWriter<LongWritable, Text> { private DataOutputStream out; public XMLRecordWriter(DataOutputStream out) throws IOException { this.out = out; out.writeBytes("<recs>\n"); } private void writeTag(String tag,String value) throws IOException{ out.writeBytes("<"+tag+">"+value+"</"+tag+">"); } public synchronized void write(LongWritable key, Text value) throws IOException { out.writeBytes("<rec>"); this.writeTag("key", Long.toString(key.get())); String[] contents = value.toString().split(","); String year = AirlineDataUtils.getYear(contents); this.writeTag("year", year); //Remaining this.write for various tags out.writeBytes("</rec>\n"); } public synchronized void close(TaskAttemptContext job) throws IOException { try { out.writeBytes("</recs>\n"); } finally { out.close(); } } } public RecordWriter<LongWritable,Text> getRecordWriter( TaskAttemptContext job) throws IOException { String extension = ".xml"; Path file = getDefaultWorkFile(job, extension); FileSystem fs = file.getFileSystem(job.getConfiguration()); FSDataOutputStream fileOut = fs.create(file, false); return new XMLRecordWriter(fileOut); } }
输出写过程如下:
- Hadoop框架通过调用XMLOutputFormat 类(OutputFormat的一种实现方式)上的getRecordWriter()方法,关联XMLRecordWriter(RecordWriter的子类)。
- The RecordWriter配置输出的特征,包括文件的前缀名和拓展名。代码列表中提供了扩展名。文件默认的前缀是part, 可以通过提供一个具有mapreduce.output.basename属性的自定义值来重写。而且可以通过下列命令行上的通用参数得到配置:
-D mapreduce.output.basename=airlinedata
要注意getRecordWriter方法是如何配置文件扩展的。到目前为止,我们执行过的输出文件没有扩展。这个例子中我们用XMLOutputFormat格式的getRecordWriter方法,调用下面两行来定制自定义的扩展(www.askmac.cn):
String extension = ".xml"; Path file = getDefaultWorkFile(job, extension);
这些设置会使得输出的文件有下列命名格式:
airlinedata-m-nnnnn.xml
在这个地方 m 表示 Mapper产生文件,n 表示0-9中的一个数字。如果任务有一个reduce 阶段,输出文件名的格式就是下面这样:
airlinedata-r-nnnnn.xml
这里r 表示 Reducer产生文件,n 表示0-9间的一个数字。
- 实际输出是框架不断调用 RecordWriter实例的write()方法而产生的,例如在Mapper中的一个Map-only作业和一个作业的Reducer中reduce阶段调用wirte()。
在集群上运行Text-to-XML作业
最后,我们使用Maven重建并在Hadoop环境下执行作业:
hadoop jar prohadoop-0.0.1-SNAPSHOT.jar \ org.apress.prohadoop.c7. TextToXMLConversionMRJob \ -D mapreduce.output.basename=airlinedata \ /user/hdfs/sampledata \ /user/hdfshdfs/output/c7/xml
输出目录有我们定义的自定义命名的文件。一个样本文件的命名是airlinedata-m-00000.xml
定制的Input Format:消费一个定制的XML文件
这部分会演示输入的时候该如何来消费一个XML文件。TextInputFormat只支持一行中包含一个记录的文件,但是XML文件不能依靠只有一行,一个记录的语义。我们需要写一个自定义的InputFormat类;给它定义的类是org.apress.prohadoop.c7.XMLDelaysInputFormat。(方便起见,我们自定义的类XMLDelaysInputFormat实现了FileInputFormat。)
InputFormat类依靠RecordReader来读取输入的字节数,并且将它们反序为键值对类的实例(www.askmac.cn)。
我们自定义的Record Reader类是org.apress.prohadoop.c7.XMLDelaysRecordReader。
列表7-5显示了XMLDelaysInputFormat类。
Listing 7-5. XMLDelaysInputFormat
public class XMLDelaysInputFormat extends FileInputFormat<LongWritable, DelaysWritable> { @Override protected boolean isSplitable(JobContext context, Path filename) { return false; } @Override public RecordReader<LongWritable, DelaysWritable> createRecordReader(InputSplit split, TaskAttemptContext context) { return new XMLDelaysRecordReader(); } }
前面讲的实现方式意味着两点
- 为了说明这个观点 ,我们把常用的InputFormat定义为不可拆分,所以一个Mapper处理一个文件。但是我们前面已经探讨过这不是处理文件最有效的方式。没有必要把InputFormat设置为不可拆分,TextInputFormat也是如此,它会在出现换行符的时候确定每一行,XML文件出现特定的起始和结束标记时,可以发现记录。一个可拆分的InputFormat版本需要识别XML文件通过块拆分和获取到一个可能的远程块(或多个),来完成XML文件,从InputSplit的主块开始。因为XML解析操作只能在完整的XML文件上进行,所以处理InputSplit的Mapper必须处理整个XML文件。
- 对HDFS (InputSplit)中每个文件来说,有一个XMLDelaysRecordReader实例(www.askmac.cn)。
列表7-6显示了XMLDelaysRecordReader类的骨架。
Listing 7-6. XMLDelaysRecordReader
/*Package and import declaration*/ class XMLDelaysRecordReader extends RecordReader<LongWritable, DelaysWritable> { /*Configure the XML Reader with the file level tag and the record level tag as Job configuration properties */ public static final String FILE_START_TAG_KEY ="xmlfile.start.tag"; public static final String RECORD_START_TAG_KEY ="record.start.tag"; /*XML specific state variables*/ private byte[] xmlFileTag; private byte[] recordTag; private byte[] recordEndTag; private String recordTagName = ""; private long start; private long end; /*Hadoop API specific variables*/ private FileSplit fileSplit; private Configuration conf; private FSDataInputStream in = null; /*Mapper input key/value instances*/ private LongWritable key = new LongWritable(-1); private DelaysWritable value = new DelaysWritable(); private DataOutputBuffer buffer = new DataOutputBuffer(); /*Initialize the process of XML Parsing. *Set the file pointer to the start *of the first record */ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException,InterruptedException { xmlFileTag = taskAttemptContext.getConfiguration().get(FILE_START_TAG_KEY).getBytes(); recordTag=("<"+ taskAttemptContext.getConfiguration().get(RECORD_START_TAG_KEY) + ">").getBytes(); recordTagName = taskAttemptContext.getConfiguration().get(RECORD_START_TAG_KEY); recordEndTag = ("</"+ taskAttemptContext.getConfiguration().get(RECORD_START_TAG_KEY) + ">").getBytes(); this.fileSplit = (FileSplit) inputSplit; this.conf = taskAttemptContext.getConfiguration(); start = fileSplit.getStart(); end = start + fileSplit.getLength(); FileSystem fs = fileSplit.getPath().getFileSystem(conf); this.in = fs.open(fileSplit.getPath()); this.in.seek(start); //Read until start of the first record. Skip the <recs> tag readUntilMatch(xmlFileTag, false); } /*Method is invoked by the Mapper classes run() method *to fetch the next key value class */ public boolean nextKeyValue() throws IOException { if (this.in.getPos() < this.end && readUntilMatch(recordTag, false)) { buffer.write(this.recordTag); if (readUntilMatch(this.recordEndTag, true)) { key.set(key.get() + 1); DelaysWritabledw = this.parseDelaysWritable(this. createInputStream(buffer.getData())); value.setDelaysWritable(dw); this.buffer = new DataOutputBuffer(); return true; } } return false; } /*The Mapper run() method will call getCurrentKey() *and getCurrentValue() if nextKeyValue( method returns true. *This indicates that there are more records and the *record has been deserialized */ @Override public LongWritable getCurrentKey() throws IOException, InterruptedException { return key; } @Override public DelaysWritable getCurrentValue() throws IOException, InterruptedException { return value; } /*Used to calculate the progress of the Mapper which is indicated *in the MR console */ @Override public float getProgress() throws IOException, InterruptedException { float f = (float)(this.in.getPos()-this.start)/(float)(this.end-this.start); return f; } /*When the nextKeyValue() returns false the run() method the mapper *will close the RecordReader and call the close() method *of the Mapper class */ @Override public void close() throws IOException { this.in.close(); } /*Logic to parse an XML record from the XML file. *Read until the closing tag is encountered. */ private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException { //Skipping implementation. Populates the DataOutputBuffer } /*Convert an XML record <rec>...</rec> into a *DelaysWritable instance */ private DelaysWritable parseDelaysWritable(InputStreamdelaysXML) { DelaysWritable dw = new DelaysWritable(); //Parse into DelaysWritable instance Return dw; } /*Create an InputStream from a byte array representing *an XML record string */ private InputStream createInputStream(byte[] bytes) throws java.io.UnsupportedEncodingException { String xml = (new String(bytes)).trim(); return new ByteArrayInputStream(xml.getBytes()); } }
现在我们准备讨论XMLDelaysRecordReader类的运行。它的特点如下:
- 1.配置参数的时候,需要通过文件级标签和记录级标签。这种情况下,他们的值如下:
xmlfile.start.tag=recs record.start.tag=rec
- 初始化the Mapper之前,XMLDelaysRecordReader 上的initialize()方法被the MapReduce架构调用。由XMLDelaysRecordReader实例处理的InputSplit实例被传到initialize()方法。既然自定义的InputFormat是FileInputFormat的子类,那么InputSplit实现利用的是FileSplit类。因为我们定义的InputFormat是不可拆分的, FileSplit实例就代表一个完整的输入文件。如果定义的InputFormat是可以拆分的,,那么FileSplit实例就只代表输入文件的一部分,但是还是需要完整的XML文件集合。初始化进程开始消耗the InputSplit中的字节数,并会带着文件指针穿过文件级标签(<recs>),在字节处代表第一个<rec>标签。
- The Hadoop实时引擎现在会创建一个org.apache. hadoop.mapreduce.Mapper.Context类的实例,然后用前面初始过的XMLDelaysRecordReader对它进行配置。
- .在Mapper 上的run()方法被前一步已经配置完成的Hadoop 架构org.apache.hadoop.mapreduce.Mapper.Context调用。检索到XMLDelaysRecordReader的记录后,Mapper上的map方法将会在定义的Mapper上反复被调用。接下来会讲述这一调用的内部工作(www.askmac.cn)。
分析 org.apache.hadoop.mapreduce.Mapper.run() 方法
org.apache.hadoop.mapreduce.Mapper类上的run() method有下列执行方式。要强调的方法是通过用户定义的Mapper类实现的方法。
public void run(Context context) throws IOException, InterruptedException { setup(context); try { while (context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } } finally { cleanup(context); } }
运行方法执行下列步骤
- setup() 方法受Mapper调用. setup() method的实现的是用户在自定义Mapper类中的用户定义。
- RecordReader实例上的这些方法通过Context实例被调用,来检索成功被传递到map中的输入键值对。
- XMLDelaysRecordReader的nextKeyValue()方法负责解析文件中的字节,并把它们转换成LongWritable实例 (key 实例)和DelaysWritable实例 (value 实例)。如果下一个记录可以成功解析,这种方法就可以施行。如果没有更多的<rec>标签实例,其返回失败。
- 如果nextKeyValue()方法返回成功,会调用the getCurrentKey() 和 getCurrentValue()方法,来获取随后会被用来调用用户定义map()方法的键值对的实例。
- If the nextKeyValue() method returns false, the cleanup() method is invoked on the This is also a user-defined method.
- Finally, the Hadoop Framework invokes the close() method on the RecordReader that completes the Mapper life
- 如果nextKeyValue()方法行不通,就会调用Mapper上的cleanup()方法。这也是一种用户定义的方法。
- 最后,Hadoop Framework调用负责完成Mapper生命周期的RecordReader上的cleanup()方法。
如果run()方法被覆盖的话,就没有必要启用map()方法。回想一下Aggregation使用案例的执行(第6章讨论过Aggregation),在这种情况下,输入是要依照“GroupBy”标准进行排序。通过处理多个输入记录,run()方法可以执行中间聚合操作。可以完全忽略Map方法调用。The run()方法准确知道何时再没有更多的记录来处理,而且它可以输出的最终聚合结果。列表7-7中展示了这样的执行例子,就是假设输入的是一个XML文件,文件中的记录是按月来排序的。我们每个月计算记录的总数,我们从基本的org.apache.hadoop.mapreduce.Mapper类上覆盖run方法,而不是覆盖map方法。这是一个关于多个记录是如何在Mapper中被一起处理的例子(www.askmac.cn)。
需要注意的是,我们已经使用这种技巧模拟了一个更有成效的Combiner实现。前面的章节已经了解到Combiner是应用于the Partitioner的输出。用这种更高效的版本,每个Mapper的输出是聚合操作,应用于文件中的每月条目。这里不需要map()调用。要注意可以用Mapper来达到这个目的——通过拓展map()方法,特定的实例也是多变的,就像我们目前在书中所做的那样。但是使用map()方法,建立在实现基础上,必须采用cleanup()方法来进行最后的写,因为处理最后一个记录时,是不知道使用map()方法的。这种执行可以作为一个例子来展示如何覆盖run()方法。也要注意我们还需要一个Reducer来按月产生最终计数。如果我们用一个可拆分的InputFormat或者如果属于单月的记录穿过文件边界,由Mapper计算的聚合就只是起媒介作用。Reducer负责确保这些中间聚合在输出程序中完全聚合。
Listing 7-7. AdvancedGroupBySumMapper
public class AdvancedGroupBySumMapper extends Mapper<LongWritable, DelaysWritable, IntWritable, LongWritable> { public void map(LongWritable key, DelaysWritable value, Context context) throws IOException, InterruptedException { //Do nothing } @Override public void run(Context context) throws IOException, InterruptedException { setup(context); IntWritable month = new IntWritable(0); LongWritable total = new LongWritable(0); while (context.nextKeyValue()) { LongWritable key = context.getCurrentKey(); DelaysWritable dw = context.getCurrentValue(); if(month.get()==dw.month.get()){ total.set(total.get()+1); } else{ if(month.get()>0){//Skip the first iteration context.write(month, total);//Write intermediate total } month = new IntWritable(dw.month.get()); //Reset total to one as it is first record total = new LongWritable(1); } } //Write the last aggregation for this file if there were any //records which is indicated by month.get() being > 0 if(month.get()>0){ context.write(month, total); } cleanup(context); } }
CompositeInputFormat 和大型连接
这部分我们来探讨InputFormat的一种特殊类型,可以用来联接大表。CompositeInputFormat可以联接数据集,使用map-side联接,根据联接关键字来进行排序的。就像第六章讲过的那样,map-side joins比map-reduce joins更快,因为它避免了Reducer阶段相伴产生的昂贵的Sort 和Shuffle阶段。map-side操作通过扫描输入文件,可以使用很高的并行来达到很好的性能。
首先,我们以航线数据集为基础,创建一个问题。我们想创建一个每天从出发地到目的地的转机列表。假设转机在同一天出发和到达。我们创建了下列两个数据集:
- 以YYYY-MM-DD的格式对出发地机场代码和出发日期进行排序;例如:SFO-2014-12-01。
- 以YYYY-MM-DD格式对目的地机场代码和到达日期进行排序;例如:SFO-2014-12-01.
每个经过排序的文件可以使用KeyValueInputFormat来读取。这也就暗示一个分隔符和一个值都跟随着一个关键字。默认的分隔符是一个标签(\t),但是它可以选择是任何字符。这个作业需要通过设置mapreduce.input.keyvaluelinerecordreader.key.value.separator属性来配置一个自定义的分隔符。在这种情况下,我们假设这个关键字就是联接关键字,可以把机场代码和出发或者到达日期结合在一起(www.askmac.cn)。
接下来,我们就来配置列表7-8中显示的run()方法。
Listing 7-8. LargeMapSideJoin.run()
public int run(String[] allArgs) throws Exception { String[] args = new GenericOptionsParser(getConf(),allArgs).getRemainingArgs(); Job job = Job.getInstance(getConf()); job.setJarByClass(LargeMapSideJoin.class); //Input Paths Path flightByOriginAndDt = new Path(args[0]); Path flightByDestinationAndDt = new Path(args[1]); job.setInputFormatClass(CompositeInputFormat.class); //Configure join String strJoinStmt = CompositeInputFormat.compose("inner",KeyValueTextInputFormat.class, flightByOriginAndDt, flightByDestinationAndDt); job.getConfiguration().set("mapred.join.expr", strJoinStmt); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setOutputFormatClass(TextOutputFormat.class); job.setMapperClass(LargeMapSideJoinMapper.class); job.setNumReduceTasks(0); FileOutputFormat.setOutputPath(job, new Path(args[2])); job.waitForCompletion(true); return 0; }
这种方法的关键特点如下:
- 首先我们给两个需要联接的数据集定义输入路径。我们还要假定每个路径代表一个由TextOutputFormat产生的单个文件。每个文件有许多行,每一行包含以 tab(“\t”)分隔的一个键值对,这些行是根据关键字来排序的。我们的目标就是在这个关键字的基础上,把这两个输入文件联接在一起。
- 接下来,我们把CompositeInputFormat定义为输入格式,并调用compose方法来对它进行配置。配置参数如下:
Type of join: The “inner”值配置了一个内部联接。“outer”值应该用来配置一个完整的外部联接。注意使用mapred.join.expr使用来配置在作业实例中下一行的连接标准(www.askmac.cn)
Delegate InputFormat to consume inputs: CompositeInputFormat 代表把每一个输入路径读取到另一个适合消耗输出的InputFormat。我们利用KeyValueTextInputFormat。这种InputFormat假定输入的是文本的行。每一行包含由mapreduce.input.keyvaluelinerecordreader.key.value.separator属性来定义的字符分隔开的键值对。默认的分隔符是tab(“\t”)。因为我们输入的文件有被分隔符隔开的键值对,我们就利用这种默认设置。如果分隔符字节不在一行,整行就会返回,这样键值对将为空值。底层的RecordReader实现被KeyValueTextInputFormat,为KeyValueLineRecordReader。其返回一个底层的键值对作为文本实例的输入。
Input Path instances: compose()的最后两个参数是需要联接在一起的数据集的路径
- 备注:作为代表利用InputFormat(我们所举例子中的KeyValueTextInputFormat)。我们假定CompositeInputFormat设置一个文件集合(一共包含两个文件,分别来自两个路径)被进行最小化配置的一个mapper处理,等同于联接在一起的一个文件Long.MAX_VALUE。然而,如果在联接的每一边的路径都有多个文件,每个路径就必须包含特定的文件例如其中的记录是全部记录安装关键字总排序后的结果。第六章我们探讨过总排序了。意思就是根据字母顺序,对文件名进行排序,然后添加进一个单独的文件,因子创建的必须任然保证按它们的键排序。另外一个需要使用CompositeInputFormat来排序的约束就是,代表联接每边的路径有相同的文件数量,在一个路径中的每一个文件中,必须包含在另一个路径中的相应文件的相同的键范围。
使用的CompositeInputFormat的一个独有特点是我们配置的Mapper。列表7-9显示了LargeMapSideJoinMapper类。 Mapper被键值对类配置:
- Text:这是一种常用的连接关键字,底层的KeyValueLineRecordReader实例消费各自的输入路径,将返回一个关键字的Text实例。既然我们在例子中使用内部联接,CompositeInputFormat就利用InnerJoinRecordReader类作为它的RecordReader。InnerJoinRecordReader是执行连接功能的JoinRecordReader的子类。InnerJoinRecordReader消耗由两个底层的KeyValueLineRecordReader返回的关键字实例。如果它们是相同的,服务这个实例作为关键字来从map调用LargeMapSideJoinMapper类(www.askmac.cn)。
- TupleWritable: InnerJoinRecordReader会返回这类的实例,作为map调用的价值参数。这里有两个Writable实例的列表,分别来自两个不同的输入路径。表中Writable实例的排序顺序对compose()调用中使用run()方法的输入路径的顺序至关重要。既然我们下面的输入模式是KeyValueTextInputFormat,这里也有Text类的例子。每个Text实例是KeyValueLineRecordReader所返回的记录值。
Listing 7-9. LargeMapSideJoinMapper
public static class LargeMapSideJoinMapper extends Mapper<Text, TupleWritable, Text, Text> { public void map(Text key, TupleWritable value, Context context) throws IOException, InterruptedException { Text connectingFlight = new Text(""); if (value.toString().length() > 0) { String originAtts[] = value.get(0).toString().split(","); String destAtts[] = value.get(1).toString().split(","); String oFN = AirlineDataUtils.getFlightNum(originAtts); String oDT = AirlineDataUtils.getDepartureTime(destAtts); String dFN = AirlineDataUtils.getFlightNum(destAtts); String dAT = AirlineDataUtils.getArrivalTime(destAtts); //Add logic to ensure originDepTime > depArrTime connectingFlight.set(key+","+oFN+","+oDT+","+dFN+","+dAT); context.write(key, connectingFlight); } } }
注意下面这些在TupleWritable实例上的调用在Mapper内部:
- get(0): 获取从出发地出发地起始航班的航行数据。
- get(1): 获取到达起始地(或者从到达航班的角度来说是目的地)的到达航班的航行数据。
注: 为了简化代码,我们略去逻辑操作,来确保出发的航班时间(以起始地来说)必须比到达的的航班时间(以起飞时刻来说)早。
最终,我们把这两个数据集的航线整合在一起,然后输出。要注意的是Mapper会从两个数据集中获取航线,这样所有的排列都可以计算到。例如,如果我们接收同样的机场和日期结合的三个到达航班和两个出发航班,我们就有六个配对,map()方法会通过六个单独调用 Mapper 来接收所有六个对(www.askmac.cn)。
Map-Side是怎样把工作和排序过的数据集连接在一起的呢
这部分着重讨论数据集怎样通过关键字排序,被连接到map side上。
为了执行内部连接,我们需要从头开始浏览两个数据集。无论是否在2个数据集中遇到相同的关键字。按照前者的情况,我们会用同样的关键字产生一对记录。如果是后者那种情况,我们开始跳过数据集上的记录,它现在的关键字在其他数据集密钥的后面(从排序顺序数据图来看),如果跳过去的数据集的关键字最终以比其他数据集的关键字高而结束的话,我们开始跳过其他数据集。在两个关键字再次相同之前,我们会一直这么做,然后我们开始进行排列。
前面讨论的主要是内部连接,但是它也可以很容易地应用到全外连接,左外连接,或者右外连接。在那些情况下,我们不会跳过,而是会用空的连接一边产生输出记录,这取决于连接要求的类型。
Hadoop Files
这一部分主要探讨特殊文件的格式。我们讨论了Sequence文件,这是一种存储二进制键值对的二进制格式。接下来我们简单地讨论一下Map 文件,它是一种经过排序和索引的Sequence文件。最后,我们会探讨Avro格式,一种依赖于方案支持多种语言的紧凑和快速二进制格式。因此尽管Sequence和Avro文件格式都是二进制的,但是前者仅支持Java语言,而后者可以使用Avro所绑定的任何一种语言(www.askmac.cn)。
SequenceFile
我们经常使用TextInputFormat,这种格式要求每个记录为文本的一行。最后两章,我们一直在处理在延迟航班记录下运行的数据集。我们创建了一个名为DelaysWritable的自定义的Writable类,使用这个类的时候,我们必须使用TextOutputFormat写出为Text,然后用TextInputFormat读回,最后在Mapper类中将它重新转换成DelaysWritable实例。如果我们使用一个SequenceFile,我们不需要这样做。我们只需要在文件中简单地存储一个DelaysWritable,然后从SequenceFile中将它作为一个DelaysWritable实例读回。用来读取SequenceFile的InputFormat是SequenceFileInputFormat,相应的OutputFormat是SequenceFileOutputFormat。SequenceFileInputFormat拓展了FileInputFormat,SequenceFileOutputFormat则拓展了FileOutputFormat。
SequenceFile是一种二进制格式,可以存储二进制键值对。键和值没有必要成为Writable实例,但是他们至少要成为Serializable。然而在实践操作中,Sequence文件的键和值通常是Writable实例。尽管Sequence文件是键-值对存储,但它没有map-like的语义。这些key没必要是唯一的,如果你想不出明显的key,就可以用NullWritable的实例。这种情况下,它就发挥收集Writable (或者 Serializable)实例的功能。(www.askmac.cn)
SequenceFileOutputFormat: 创建一个SequenceFile
我们写一个列表7-1中把XML格式的文件转化为SequenceFile的程序,来举例创建一个SequenceFile。SequenceFile的key是一个包含<key>标签值的LongWritable的实例。值是DelaysWritabl实例e,而且执行这个会话的类是org.apress.prohadoop.c7.XML ToSequenceFileConversionJob。
列表7-10展示了这一类的run()方法。这一方法的主要特点如下:
- XMLDelaysInputFormat类的使用定义
- 使用SequenceFileOutputFormat,来保存LongWritable实例的key和DelaysWritable实例的值。SequenceFile的值不是自定义的Writable实例。事实上它也根本不需要成为Writable;它也可以是一种io.Serializable实例。然后,你在Hadoop遇到SequenceFile文件时,值部分通常是一个Writable的实例。
Listing 7-10. XMLToSequenceFileConversionJob.run() public int run(String[] allArgs) throws Exception { String[] args = new GenericOptionsParser(getConf(), allArgs).getRemainingArgs(); Job job = Job.getInstance(getConf()); job.setInputFormatClass(XMLDelaysInputFormat.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(DelaysWritable.class); job.setMapperClass(XMLToSequenceFileConversionMapper.class); job.setNumReduceTasks(0); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //Could also use SequenceFileOutputFormat.setOutputPath job.waitForCompletion(true); return 0; }
列表7-11展示了The Mapper。要注意它是很简洁的,因为我们能直接写出键和值得实例,无需进行所有Text的会话(www.askmac.cn)
Listing 7-11. XMLToSequenceFileConversionMapper
public static class XMLToSequenceFileConversionMapper extends Mapper<LongWritable, DelaysWritable,LongWritable, DelaysWritable> { public void map(LongWritable key,DelaysWritable value, Context context) throws IOException, InterruptedException { context.write(key, value); } }
你也可以从Java程序中创建一个SequenceFile。列表7-12展示了创建一个未压缩SequenceFile文件的操作方法。创建压缩文件的方法是相似的。
Listing 7-12. SequenceFileWriter.writeFile() Method
public static void writeFile(String fName) throws IOException { SequenceFile.Writer writer = null; try { Configuration conf = new Configuration(); FileSystem fs = FileSystem.getLocal(conf); Path seqFilePath = new Path(fName); LongWritable key = new LongWritable(); DelaysWritable val = new DelaysWritable(); writer = SequenceFile.createWriter(fs,conf,seqFilePath, key.getClass(), val.getClass()); for (int i = 1; i <= 12; i++) { key.set(i); //We just set the month for this illustration val.month = new IntWritable(i); writer.append(key, val); } } finally { writer.close(); } }
注:参照SequenceFile上的apache文档,从一个单独的Java程序中创建压缩过的SequenceFile的技术。创建未压缩SequenceFile文件的程序作为SequenceFileWriter包含在书的源代码中,(网址是www.apress.com)
SequenceFileInputFormat:从SequenceFile中读取数据
这部分讨论使用MapReduce程序应如何来读取SequenceFile。上一部分我们使用SequenceFileInputFormat,创建了SequenceFile,然后把它转成了使用TextOutputFormat的文本文件。执行这一任务的程序是org.apress.prohadoop.c7.SequenceToTextFileConversionJob
列表7-13显示了这个程序。这个程序唯一的特点就是使用SequenceInputFormat(www.askmac.cn)。
Listing 7-13. SequenceToTextFileConversionJob
/*Package and import declarations*/ public class SequenceToTextFileConversionJob extends Configured implements Tool { public static class SequenceToTextConversionMapper extends Mapper<LongWritable, DelaysWritable, NullWritable, Text> { public void map(LongWritable key, DelaysWritable value, Context context) throws IOException, InterruptedException { Text line = AirlineDataUtils.parseDelaysWritableToText(value); context.write(NullWritable.get(), line); } } public int run(String[] allArgs) throws Exception { Job job = Job.getInstance(getConf()); job.setInputFormatClass(SequenceFileInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); job.setMapperClass(SequenceToTextConversionMapper.class); job.setNumReduceTasks(0); String[] args = new GenericOptionsParser(getConf(), allArgs).getRemainingArgs(); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); return 0; } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); ToolRunner.run(new SequenceToTextFileConversionJob(), args); } }
你也可以从Java程序中读取一个SequenceFile。列表7-14展示了这一操作的执行方法。SequenceFile.Reader处理压缩和非压缩SequenceFiles文件。(这个程序作为SequenceFileReader.java.包含在书的源代码中)。注意我们创建了空的键和值得实例,来填充在每个迭代中调用的ereader.next()
Listing 7-14. SequenceFileReader.readFile()
public static void readFile(String fName) throws IOException { SequenceFile.Reader reader = null; try { Configuration conf = new Configuration(); FileSystem fs = FileSystem.getLocal(conf); Path seqFilePath = new Path(fName); reader = new SequenceFile.Reader(conf, Reader.file(seqFilePath)); LongWritable key = new LongWritable(); DelaysWritable val = new DelaysWritable(); while (reader.next(key, val)) { System.out.println(key + "\t" + val.year + "/" + val.month + "/" + val.date + " from/to==" + val.originAirportCode + " to " + val.destAirportCode); } } finally { reader.close(); } }
压缩和SequenceFiles
这部分讨论SequenceFile的结构和它支持的压缩类型。我们从概念的角度来检查这个结构。(这个格式的详细说明可以在Apache网站查找,网址是http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/SequenceFile.html)
Sequence File Header
每个SequenceFile都有一个头部。这个头部是由下列信息组成的(www.askmac.cn):
- 非SequenceFile格式的版本
- 键和值的类名称。在被利用的Job实例上,调用setOutputKeyClass和setOutputValueclass方法来配置输出的键和值得类名称
- 一个 布尔值 表明 键和值是否经过压缩。
- 一个 布尔值 表明 是否启用BLOCK压缩。启用的时候,多个记录可以一起被压缩。
- 额外的元数据以及最后一个同步标记意味着头部的结束。跳过头部部分后,需要开始处理记录时,这个标记就是SequenceFile所搜寻的。
同步标记和SequenceFiles的可拆分特性
一个SequenceFile每几秒就会有一个同步标记,这可以使SequenceFile成为拆分文件。在这个标记的基础上,SequenceFileInputFormat可以把SequenceFile分成多个InputSplits。只有整个的记录(键-值对)包含在同步的记录中才可以得到保障。文本文件也是如此,记录可以穿越块界限,出现这种情况的时候,底层的SequenceFileInputFormat和它对应的读取器一定要能够解释这些跨越块的字节。
记录压缩
一个具备压缩RECORD等级的SequenceFile是由下列部分组成的:
- 头部
- 被同步标记分开,具有每个结构型记录的记录群如下:
- 记录长度
- key 长度
- key
- 压缩值(当压缩未开启时不压缩)
- 每几秒的同步标记
注意值长度是不存储在记录里的,其可以从记录长度和key长度中推断出来。为了在RECORD等级上,对一个使用SequenceFileOutputFormat的作业启用压缩,要进行下面的步骤来配置任务(www.askmac.cn):
SequenceFileOutputFormat.setCompressOutput(job, true); SequenceFileOutputFormat.setOutputCompressorClass(job,SnappyCodec.class); SequenceFileOutputFormat.setOutputCompressionType(job,CompressionType.RECORD);
如果压缩经由SequenceFileOutputFormat启用,那么每个记录的值部分都会被压缩。
块压缩
具有BLOCK压缩等级的SequenceFile由下列步骤组成:
- 头部
- 同步的标记-分离块,每个都由下列几个具有结构化的记录组成:
- 记录的数量
- 压缩key的长度
- 压缩的key
- 压缩的值长度
- 压缩的值
- 在块之间的同步标记
针对使用SequenceFileOutputFormat的作业,为了启动BLOCK等级的压缩,用下列指令来配置任务:
SequenceFileOutputFormat.setCompressOutput(job, true); SequenceFileOutputFormat.setOutputCompressorClass(job,SnappyCodec.class); SequenceFileOutputFormat.setOutputCompressionType(job,CompressionType.BLOCK);
MapFiles
MapFile,作为SequenceFile的特殊形式,它的目标是支持随机访问存储在排序过的SequenceFile中的数据。这一想法是为了给从排过序的SequenceFile中的key样本提供索引。这个索引标记着在排序过的SequenceFile中,存储在索引的key字节偏移。MapFile发出一个特定的key请求时,就会从索引中搜索最近的key,接下来就会从那个字节偏移中进行连续的搜寻,直到发现请求的key或者发现比所请求的key更高的key(www.askmac.cn)。
MapFile是储存在一个文件中的拥有map 文件名称的两个SequenceFiles集合(由如下多样化的${map.file.name}来表示):
- ${map.file.name}/data是一个SequenceFile,其中包含经key排序的数据。使用Writer.append()调用创建MapFile的过程中,要检查这种情况。如果违背了这种情况,就要抛出一个IOException。
- ${map.file.name}/index是一个比较小的SequenceFile,其中包含一部分key。针对索引文件的每个key,数据文件内的字节偏移都要经过排序。当在MapFile上执行查找key时,优先考虑的就是使用索引文件来查找最近的key,而不是正在查找的key。字节偏移能够使MapFile阅读器从key记录的起点进行。从那一点开始,在找到所需求的key或者碰到比要查找的key大的key之前,这些key会依照次序进行移动。索引文件中排序过的key的数量可以通过调用MapFile.Writer上的setIndexInterval()方法来设置。在索引文件设置key之前,间隔的时间决定了要跳过的key数量。间隔时间越短,索引文件中就会储存更多的key。因此,MapFile变得更随机访问。要记住索引文件完全是在内存中加载,所以读取MapFile时,如果间隔太短的话(转向大的索引文件)会出现OutOfMemoryException的情况。因此我们需要保持MapFile中可获取的大小和随机性的平衡。
MapFile的两个限制条件是键类必须是WritableComparable的实例(这样是为了确保键排序),再就是没有办法执行重复键。如果使用同一个键进行了多个搜索路径,那么这些路径都可以接受。如果重复MapFile,就应该使用重复键检索所有的值。然而,如果你在MapFile. Reader调用get()指令,就只能检索到第一个。MapFile.Reader 有一个seek(WritableComparable key)方法。如果你用一个有副本的键调用它,它就会使用第一个键实例,存放在记录末端。所以,如果MapFile中每一个键都复制两次(同一个键有三个实例),就会在第一个键的末端产生寻找指令。开始复制MapFile.Reader会返回两个键(用来调用seek)和随后的三个键。本书源代码中有相关从Java程序中读写MapFile的代码例子。这些类如下:
org.apress.prohadoop.utils.MapFileReader org.apress.prohadoop.utils.MapFileWriter
MapFile可以用来输入MapReduce程序。InputFormat类使用的是SequenceFileInputFormat,在处理输入文件时,这一类能很灵巧地忽略索引文件,只考虑数据文件(www.askmac.cn)。
产生一个MapFile来作为MapReduce的输出程序也是可行的。在一个Reduce过程中时很自然的事情,因为输入到一个Reducer 中自然的会安装key实例排序。每个Reducer会产生一个单独的MapFile。所以即使使用多个Reducers,这也可以施行。使用的OutputFormat格式是MapFileOutputFormat。只要我们知道Mapper的输出会经过MapFile key的排序,针对Map-only任务来使用这种OutputFormat也是可行的。只有输入到Mapper中的文件是根据属性进行排序,将会用来作为MapFile使用的时候,这种方式通常可以实现。
map 文件和分布式缓存
最后一章探讨Distributed Cache(典型的例子代表是DistributedCache,主要是由第六章展示的Job例子来管理)是如何把比较小的主数据文件传送给每个作业节点。但是Distributed Cache不仅能处理小文件,大MapFile也可以使用Distributed Cache来分布式。
可以运用单个的Mappers和Reducers来在MapFile上查找所需要的记录。
当每个Mapper 和 Reducer需要比较接近的键时,就很适合使用这种方法。我们可以用这种方法做一个大的表连接,来模拟CompositeInputFormat功能。(www.askmac.cn)
如果输入的SequenceFile文件是由连接键进行排序(或者只是数据集上建立在连接键基础上,具有排序标准的MapFile文件),另一个数据集是MapFile文件(储存在Distributed Cache),它经过一个连接键的排序,我们可以有一个Map-only任务,扫描第一个SequenceFile(或MapFile)。我们在每个Mapper中,从输入的SequenceFile(或者MapFile)中收集了一堆键,然后从Distributed Cache中的第二个MapFile上的那些键上执行Mapper中的搜索。由于MapFile的排序特性,获取第一个记录之后,可以迅速地浏览这些键。可以使用Map-only任务来执行另一个有大型表连接的方法。
Avro 文件
Avro是一种允许多语言支持Hadoop文件的二进制格式。这一点与SequenceFile相似,但是SequenceFile仅限于Java程序,然而Avro可以被Java和其他程序语言支持。使用Avro,数据可以在指定模式的基础上序列化成一个很紧凑的二进制格式。Avro含有大量的数据类型。所有主要的原始数据类型都可以得到支持,也可以支持复杂的数据类型,例如records, enums, arrays, 和 maps。我们这部分不是来详细介绍Avro,而是要论证如何使用Avro文件格式来处理MapReduce任务。
FlightDelay.avsc: Avro Schema File
列表7-15展示了模仿DelaysWritable类的Avro文件模式的一个例子。我们把这个文件储存在Maven项目中的src/main/avro文件夹中。
Listing 7-15. FlightDelay.avsc
{"namespace": "org.apress.prohadoop.avro", "type": "record", "name": "FlightDelay", "fields": [ {"name": "year", "type": "int"}, {"name": "month", "type": "int"}, {"name": "date", "type": "int"}, {"name": "dayOfWeek", "type": "int"}, {"name": "depDelay", "type": ["int", "null"]}, {"name": "arrDelay", "type": ["int", "null"]}, {"name": "origin", "type": "string"}, {"name": "destination", "type": "string"}, {"name": "carrier", "type": "string"} ] }
尽管没有必要执行任何代码产生,助手库会借助Avro模式,使用强类型类。我们想通过代码生成转换之前的方案为一个java类;列表7-16展示了POM文件需要更新的地方(www.askmac.cn)。
Listing 7-16. pom.xml中的构建配置
<dependencies> ... <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.7.6</version> </dependency> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro-mapred</artifactId> <version>1.7.6</version> <classifier>hadoop2</classifier> </dependency> ... </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> </plugin> <plugin> <groupId>org.apache.avro</groupId> <artifactId>avro-maven-plugin</artifactId> <version>1.7.6</version> <executions> <execution> <phase>generate-sources</phase> <goals> <goal>schema</goal> </goals> <configuration> <sourceDirectory> ${project.basedir}/src/main/resources/avro/ </sourceDirectory> <outputDirectory> ${project.basedir}/src/main/java/ </outputDirectory> </configuration> </execution> </executions> </plugin> </plugins> </build>
我们进行Maven组建时,这个类可以创建为:org.apress.prohadoop.avro.FlightDelay。它可以从列表7-15中取一个模式的实例,作为助手类来执行工厂运作。
从Text格式转换到Avro格式的作业
我们现在来看一下如何在MapReduce程序中使用Avro格式,特别是把航班输入文件转换为Avro文件。列表7-17展示了这个程序(www.askmac.cn)。
Listing 7-17. TextToAvroFileConversionJob
/*Package and Import declarations*/ public class TextToAvroFileConversionJob extends Configured implements Tool { public static class TextToAvroFileConversionMapper extends Mapper<LongWritable, Text, AvroKey<NullWritable>, AvroValue<FlightDelay>> { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { if(!AirlineDataUtils.isHeader(value)){ String[] contents = value.toString().split(","); FlightDelay.Builder fd = FlightDelay.newBuilder(); int year = Integer.parseInt AirlineDataUtils.getYear(contents)); fd.setYear(year); int month = Integer.parseInt( AirlineDataUtils.getMonth(contents)); fd.setMonth(month); intdate = Integer.parseInt( AirlineDataUtils.getDateOfMonth(contents)); fd.setDate(date); intdow = Integer.parseInt( AirlineDataUtils.getDayOfTheWeek(contents)); fd.setDayOfWeek(dow); intarrDelay = AirlineDataUtils.parseMinutes( AirlineDataUtils.getArrivalDelay(contents),0); fd.setArrDelay(arrDelay); intdepDelay = AirlineDataUtils.parseMinutes( AirlineDataUtils.getDepartureDelay(contents),0); fd.setDepDelay(depDelay); String origin = AirlineDataUtils.getOrigin(contents); fd.setOrigin(origin); String dest = AirlineDataUtils.getDestination(contents); fd.setDestination(dest); String carrier = AirlineDataUtils.getUniqueCarrier(contents); fd.setCarrier(carrier); context.write( new AvroKey<NullWritable>(NullWritable.get()), new AvroValue<FlightDelay>(fd.build())); } } } public int run(String[] allArgs) throws Exception { Job job = Job.getInstance(getConf()); job.setJarByClass(TextToAvroFileConversionJob.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(AvroKeyValueOutputFormat.class); AvroJob.setOutputKeySchema(job, Schema.create(org.apache.avro.Schema.Type.NULL)); AvroJob.setOutputValueSchema(job, FlightDelay.SCHEMA$); job.setMapperClass(TextToAvroFileConversionMapper.class); job.setNumReduceTasks(0); String[] args = new GenericOptionsParser(getConf(), allArgs).getRemainingArgs(); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); return 0; } public static void main(String[] args) throws Exception { ToolRunner.run(new TextToAvroFileConversionJob(), args); } }
下面这些就是列表7-17的关键特性:
- Mapper 签名中的输出键和值类是一个NullWritable 类和覆盖在AvroKey 和 AvroValue中的常用的Avro类FlightDelay。
底层的OutputFormat执行标准的Hadoop Writable类和用于标准的原始Avro类型的基本的Avro模式之间的映射。
- OutputFormat格式是一个AvroOutputFormat类。但是除了指定的输出格式,我们也要指定模式。我们直接从FlightDelay帮助类中访问Schema,就是FlightDelay如何作为帮助类来起作用。如果没有它的话,我们就需要建造一个Schema实例。因为输出的Avro文件包含了有数据的模式,能够让任何一个没有附加信息的程序使用它,所以很有必要提供这个模式。数据和模式包含在文件中。
- 我们从产生的类中使用Builder来充当factory方法来创建Avro模式基础上的对象实例。我们可以用其他任何Java类来处理这些对象。FlightDelay类又会把所有特定的Avro细节隐藏起来。没有它的话,我们必须要处理org.apache.avro.generic.GenericRecord实例。可以参考Avro使用说明来获取更多详细内容。
- write()调用书写AvroKey和 AvroValue实例,他们是NullWritable 和 FlightDelay实例的封装(www.askmac.cn)。
从Avro Format 到Text Format的任务转换。
我们现在检查如何使用AvroInputFormat来消耗Avro文件。列表7-18展示了一个执行Avro格式到text格式反向转换的程序。
Listing 7-18. AvroToTextFileConversionJob
/*Package and import declarations*/ public class AvroToTextFileConversionJob extends Configured implements Tool { public static class AvroToTextFileConversionMapper extends Mapper<AvroKey<NullWritable>,AvroValue<FlightDelay>, NullWritable, Text> { public void map(AvroKey<NullWritable> key,AvroValue<FlightDelay> value, Context context) throws IOException, InterruptedException { FlightDelay fd = value.datum(); StringBuilder output = new StringBuilder(""); output.append(fd.getYear().toString()).append(",").append(fd.getMonth().toString()).append(",").append(fd.getDate().toString()).append(",").append(fd.getDayOfWeek().toString()).append(",").append(fd.getArrDelay().toString()).append(",").append(fd.getDepDelay().toString()).append(",").append(fd.getOrigin().toString()).append(",").append(fd.getDestination().toString()); context.write(NullWritable.get(),new Text(output.toString())); } } public int run(String[] allArgs) throws Exception { Job job = Job.getInstance(getConf()); job.setJarByClass(AvroToTextFileConversionJob.class); job.setInputFormatClass(AvroKeyValueInputFormat.class); AvroJob.setInputKeySchema(job,Schema.create(org.apache.avro.Schema.Type.NULL)); AvroJob.setInputValueSchema(job,FlightDelay.SCHEMA$); job.setOutputFormatClass(TextOutputFormat.class); job.setMapperClass(AvroToTextFileConversionMapper.class); job.setNumReduceTasks(0); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); String[] args = new GenericOptionsParser(getConf(), allArgs).getRemainingArgs(); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); return 0; } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); ToolRunner.run(new AvroToTextFileConversionJob(), args); } }
7-18列表中关键特性如下:
- 要注意AvroInputFormat是如何配置的,也需要为键类的Avro模式和值类的Avro模式进行配置。以往的例子中我们使用NullWritable作为key,但是现在我们用apache.avro.Schema.Type.NULL的例子取而代之,因为当底层AvroOutputFormat的RecordWriter类写出Avro文件中Avro记录的key时,NullWritable就会被映射到这个模式上(www.askmac.cn)。
- 注意Mapper 中输入的键和值类。我们在AvroKey类中再次封装了在AvroKey中的NullWritable实例,和在AvroValue 类中生成了FlightDelay实例。底层的RecordReader类中的AvroInputFormat假设承担相从字节到相应的类实例的转换。
小结
我们在本章概括了MapReduce架构内部的过程。第5章和第6章展示了如何在MapReduce执行各种密集型数据编程模式。本章从基本的执行方面详细检查了MapReduce。
本书的前7章讲述的是基本内容。掌握了这些知识,你们就能够理解MapReduce顶端的更高等级的架构,例如Hive, Pig, HBase还有其他更多的内容。我们会在第10章开始检查那些架构。
接下来两章我们会探讨单位测试和MapReduce程序的监控。
Comment