WordCount in Hadoop's Many Languages (3): Apache Crunch

 

Permanent link to this article: https://www.askmac.cn/archives/hadoop-wordcount-3.html

 

 

 


This is the last installment of the WordCount series. Today we run wordcount with Crunch, on both MapReduce and Spark.

 

Crunch (MapReduce)

Apache Crunch is open-source software under the Apache project, based on Google's FlumeJava. It is a library that makes it simple to describe MapReduce (and now also Spark) pipelines — in short, a Java library for composing MapReduce-style jobs with very little code.

Crunch was developed and is maintained by Josh Wills, Cloudera's chief data scientist. Data scientists like him build the tools they need themselves (Cloudera Oryx, Impyla, and others). Impressive.

 

Crunch reference:

http://crunch.apache.org/getting-started.html

The WordCount code follows Crunch's getting-started page; you can clone the demo project and run it:

git clone http://github.com/jwills/crunch-demo

 

src/main/java/com/example/WordCount.java

 

package com.example;

import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.impl.spark.SparkPipeline;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * A word count example for Apache Crunch, based on Crunch's example projects.
 */
public class WordCount extends Configured implements Tool {

  public static void main(String[] args) throws Exception {
    ToolRunner.run(new Configuration(), new WordCount(), args);
  }

  public int run(String[] args) throws Exception {

    if (args.length != 2) {
      System.err.println("Usage: hadoop jar crunch-demo-1.0-SNAPSHOT-job.jar"
          + " [generic options] input output");
      System.err.println();
      GenericOptionsParser.printGenericCommandUsage(System.err);
      return 1;
    }

    String inputPath = args[0];
    String outputPath = args[1];

    // Create an object to coordinate pipeline creation and execution.
    Pipeline pipeline = new MRPipeline(WordCount.class, getConf());
    // Pipeline pipeline = new SparkPipeline("local", "sort");

    // Reference a given text file as a collection of Strings.
    PCollection<String> lines = pipeline.readTextFile(inputPath);

    // Define a function that splits each line in a PCollection of Strings into
    // a PCollection made up of the individual words in the file.
    // The second argument sets the serialization format.
    PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());

    // Take the collection of words and remove known stop words.
    PCollection<String> noStopWords = words.filter(new StopWordFilter());

    // The count method applies a series of Crunch primitives and returns
    // a map of the unique words in the input PCollection to their counts.
    PTable<String, Long> counts = noStopWords.count();

    // Instruct the pipeline to write the resulting counts to a text file.
    pipeline.writeTextFile(counts, outputPath);

    // Execute the pipeline as a MapReduce.
    PipelineResult result = pipeline.done();

    return result.succeeded() ? 0 : 1;
  }
}

 

src/main/java/com/example/StopWordFilter.java

package com.example;

import java.util.Set;

import org.apache.crunch.FilterFn;

import com.google.common.collect.ImmutableSet;

/**
 * A filter that removes known stop words.
 */
public class StopWordFilter extends FilterFn<String> {

  // English stop words, borrowed from Lucene.
  private static final Set<String> STOP_WORDS = ImmutableSet.copyOf(new String[] {
      "a", "and", "are", "as", "at", "be", "but", "by",
      "for", "if", "in", "into", "is", "it",
      "no", "not", "of", "on", "or", "s", "such",
      "t", "that", "the", "their", "then", "there", "these",
      "they", "this", "to", "was", "will", "with"
  });

  @Override
  public boolean accept(String word) {
    return !STOP_WORDS.contains(word);
  }
}
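Because a FilterFn is an ordinary Java class, it can be unit-tested without a cluster; the demo project ships StopWordFilterTest and TokenizerTest, which the Maven build below runs. A minimal sketch of such a test (hypothetical — the demo's own test may differ; JUnit 4 assumed) could look like this:

package com.example;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.junit.Test;

// Hypothetical sketch of a unit test for StopWordFilter; accept() can be
// called directly, with no Hadoop or Crunch runtime needed.
public class StopWordFilterSketchTest {

  @Test
  public void stopWordsAreRejected() {
    StopWordFilter filter = new StopWordFilter();
    assertFalse(filter.accept("the"));   // stop word is filtered out
    assertTrue(filter.accept("Hadoop")); // ordinary word passes through
  }
}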

 

 

 

src/main/java/com/example/Tokenizer.java

 

 

package com.example;

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;

import com.google.common.base.Splitter;

/**
 * Splits a line of text, filtering known stop words.
 */
public class Tokenizer extends DoFn<String, String> {

  // Note the escaped backslash: the regex is \s+ (one or more whitespace characters).
  private static final Splitter SPLITTER = Splitter.onPattern("\\s+").omitEmptyStrings();

  @Override
  public void process(String line, Emitter<String> emitter) {
    for (String word : SPLITTER.split(line)) {
      emitter.emit(word);
    }
  }
}
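As a quick aside, the Guava Splitter above splits on runs of whitespace and drops empty tokens. A small standalone illustration (my own example, not part of the demo; it only needs Guava on the classpath, which the demo already depends on):

import com.google.common.base.Splitter;

public class SplitterDemo {
  public static void main(String[] args) {
    Splitter splitter = Splitter.onPattern("\\s+").omitEmptyStrings();
    // Prints Hello, Hadoop, Hello, World on separate lines; the double space,
    // the tab, and the trailing space do not produce empty tokens.
    for (String word : splitter.split("Hello  Hadoop\tHello World ")) {
      System.out.println(word);
    }
  }
}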

 

 

Build

The Crunch sample code is built with Maven, so it can be built as-is:

$ mvn package

[INFO] Scanning for projects...
[INFO]

[INFO] ------------------------------------------------------------------------

[INFO] Building crunch-demo 1.0-SNAPSHOT

[INFO] ------------------------------------------------------------------------

[INFO]

[INFO] --- maven-resources-plugin:2.5:resources (default-resources) @ crunch-demo ---

[debug] execute contextualize

[INFO] Using 'UTF-8' encoding to copy filtered resources.

[INFO] skip non existing resourceDirectory /home/cloudera/work/crunch/crunch-demo/src/main/resources

[INFO]

[INFO] --- maven-compiler-plugin:2.5.1:compile (default-compile) @ crunch-demo ---

[INFO] Compiling 3 source files to /home/cloudera/work/crunch/crunch-demo/target/classes

[INFO]

[INFO] --- maven-resources-plugin:2.5:testResources (default-testResources) @ crunch-demo ---

[debug] execute contextualize

[INFO] Using 'UTF-8' encoding to copy filtered resources.

[INFO] skip non existing resourceDirectory /home/cloudera/work/crunch/crunch-demo/src/test/resources

[INFO]

[INFO] --- maven-compiler-plugin:2.5.1:testCompile (default-testCompile) @ crunch-demo ---

[INFO] Compiling 2 source files to /home/cloudera/work/crunch/crunch-demo/target/test-classes

[INFO]

[INFO] --- maven-surefire-plugin:2.10:test (default-test) @ crunch-demo ---

(omitted)



[INFO] ------------------------------------------------------------------------

[INFO] BUILD SUCCESS

[INFO] ------------------------------------------------------------------------

[INFO] Total time: 12.738s

[INFO] Finished at: Sun Dec 14 20:28:27 PST 2014

[INFO] Final Memory: 33M/311M

[INFO] ------------------------------------------------------------------------

 

 

Run

hadoop jar target/crunch-demo-1.0-SNAPSHOT-job.jar  input crunch.output
14/12/14 20:30:53 INFO impl.FileTargetImpl: Will write output files to new path: crunch.output

14/12/14 20:30:54 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address

14/12/14 20:30:55 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/127.0.0.1:8032

14/12/14 20:30:56 INFO Configuration.deprecation: dfs.block.size is deprecated. Instead, use dfs.blocksize

14/12/14 20:30:56 INFO input.FileInputFormat: Total input paths to process : 2

14/12/14 20:30:56 INFO input.CombineFileInputFormat: DEBUG: Terminated node allocation with : CompletedNodes: 1, size left: 50

14/12/14 20:30:56 INFO mapreduce.JobSubmitter: number of splits:1

14/12/14 20:30:56 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1418545807639_0027

14/12/14 20:30:57 INFO impl.YarnClientImpl: Submitted application application_1418545807639_0027

14/12/14 20:30:57 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1418545807639_0027/

14/12/14 20:30:57 INFO jobcontrol.CrunchControlledJob: Running job "com.example.WordCount: Text(input)+S0+S1+Aggregate.count+GBK+combine+asText+Text... (1/1)"

14/12/14 20:30:57 INFO jobcontrol.CrunchControlledJob: Job status available at: http://quickstart.cloudera:8088/proxy/application_1418545807639_0027/

 

Result

hadoop fs -cat crunch.output/part-r-00000
[Bye,1]
[Goodbye,1]

[Hadoop,2]

[Hello,2]

[World,2]

 

Crunch (Spark)

Finally, Crunch on Spark. The source code is almost identical to the MapReduce version; only one line differs (MRPipeline -> SparkPipeline).

-        Pipeline pipeline = new MRPipeline(WordCount.class, getConf());
+        Pipeline pipeline = new SparkPipeline("spark://quickstart.cloudera:7077", "CrunchWordCount");
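As a variation (my own sketch, not something the demo does), the engine could also be chosen at runtime rather than by editing the source, for example from an extra command-line argument. The hypothetical helper below would live inside WordCount and assumes the same imports as WordCount.java above:

  // Hypothetical helper inside WordCount: pick the execution engine from a flag
  // so the same jar can run on MapReduce or Spark without recompiling.
  private Pipeline createPipeline(String engine) {
    if ("spark".equalsIgnoreCase(engine)) {
      // The master URL ("local", "spark://host:7077", ...) would normally come
      // from configuration rather than being hard-coded.
      return new SparkPipeline("spark://quickstart.cloudera:7077", "CrunchWordCount");
    }
    return new MRPipeline(WordCount.class, getConf());
  }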

There is very little information out there about running a Spark application with Crunch, and it took me some effort to get it to start successfully.

All files other than the one below are unchanged.

 

src/main/java/com/example/WordCount.java

package com.example;

import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.impl.spark.SparkPipeline;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * A word count example for Apache Crunch, based on Crunch's example projects.
 */
public class WordCount extends Configured implements Tool {

  public static void main(String[] args) throws Exception {
    ToolRunner.run(new Configuration(), new WordCount(), args);
  }

  public int run(String[] args) throws Exception {

    if (args.length != 2) {
      System.err.println("Usage: hadoop jar crunch-demo-1.0-SNAPSHOT-job.jar"
          + " [generic options] input output");
      System.err.println();
      GenericOptionsParser.printGenericCommandUsage(System.err);
      return 1;
    }

    String inputPath = args[0];
    String outputPath = args[1];

    // Create an object to coordinate pipeline creation and execution.
    // Pipeline pipeline = new MRPipeline(WordCount.class, getConf());
    // Pipeline pipeline = new SparkPipeline("local", "CrunchWordCount");
    Pipeline pipeline = new SparkPipeline("spark://quickstart.cloudera:7077", "CrunchWordCount");

    // Reference a given text file as a collection of Strings.
    PCollection<String> lines = pipeline.readTextFile(inputPath);

    // Define a function that splits each line in a PCollection of Strings into
    // a PCollection made up of the individual words in the file.
    // The second argument sets the serialization format.
    PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());

    // Take the collection of words and remove known stop words.
    PCollection<String> noStopWords = words.filter(new StopWordFilter());

    // The count method applies a series of Crunch primitives and returns
    // a map of the unique words in the input PCollection to their counts.
    PTable<String, Long> counts = noStopWords.count();

    // Instruct the pipeline to write the resulting counts to a text file.
    pipeline.writeTextFile(counts, outputPath);

    // Execute the pipeline.
    PipelineResult result = pipeline.done();

    return result.succeeded() ? 0 : 1;
  }
}

 

Build

The build also uses Maven and is almost exactly the same.

mvn package

[INFO] Scanning for projects...

[INFO]

[INFO] ------------------------------------------------------------------------

[INFO] Building crunch-demo 1.0-SNAPSHOT

[INFO] ------------------------------------------------------------------------

[INFO]

[INFO] --- maven-resources-plugin:2.5:resources (default-resources) @ crunch-demo ---

[debug] execute contextualize

[INFO] Using 'UTF-8' encoding to copy filtered resources.

[INFO] skip non existing resourceDirectory /home/cloudera/work/crunch/crunch-demo/src/main/resources

[INFO]

[INFO] --- maven-compiler-plugin:2.5.1:compile (default-compile) @ crunch-demo ---

[INFO] Compiling 1 source file to /home/cloudera/work/crunch/crunch-demo/target/classes

[INFO]

[INFO] --- maven-resources-plugin:2.5:testResources (default-testResources) @ crunch-demo ---

[debug] execute contextualize

[INFO] Using 'UTF-8' encoding to copy filtered resources.

[INFO] skip non existing resourceDirectory /home/cloudera/work/crunch/crunch-demo/src/test/resources

[INFO]

[INFO] --- maven-compiler-plugin:2.5.1:testCompile (default-testCompile) @ crunch-demo ---

[INFO] Nothing to compile - all classes are up to date

[INFO]

[INFO] --- maven-surefire-plugin:2.10:test (default-test) @ crunch-demo ---

[INFO] Surefire report directory: /home/cloudera/work/crunch/crunch-demo/target/surefire-reports



-------------------------------------------------------

T E S T S

-------------------------------------------------------

Running com.example.StopWordFilterTest

Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.139 sec

Running com.example.TokenizerTest

Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.233 sec



Results :



Tests run: 2, Failures: 0, Errors: 0, Skipped: 0



[INFO]

[INFO] --- maven-jar-plugin:2.3.2:jar (default-jar) @ crunch-demo ---

[INFO] Building jar: /home/cloudera/work/crunch/crunch-demo/target/crunch-demo-1.0-SNAPSHOT.jar

[INFO]

[INFO] --- maven-assembly-plugin:2.3:single (make-assembly) @ crunch-demo ---

[INFO] Reading assembly descriptor: src/main/assembly/hadoop-job.xml

[INFO] Building jar: /home/cloudera/work/crunch/crunch-demo/target/crunch-demo-1.0-SNAPSHOT-job.jar

[INFO] ------------------------------------------------------------------------

[INFO] BUILD SUCCESS

[INFO] ------------------------------------------------------------------------

[INFO] Total time: 12.243s

[INFO] Finished at: Mon Dec 15 07:23:26 PST 2014

[INFO] Final Memory: 32M/297M

[INFO] ------------------------------------------------------------------------

 

 

Run

 

Before running, set the SPARK_CLASSPATH environment variable. Otherwise the job fails at runtime with a ClassNotFoundException.
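Note that SPARK_CLASSPATH is deprecated in Spark 1.0+ (the SparkConf warning further down in the log says as much). An equivalent way to pass the Crunch jars, untested here, would be the spark-submit options the warning recommends:

# Untested alternative to SPARK_CLASSPATH: supply the extra classpath directly
# to the driver via --driver-class-path and to the executors via
# spark.executor.extraClassPath.
spark-submit \
  --class com.example.WordCount \
  --driver-class-path "/usr/lib/crunch/*" \
  --conf spark.executor.extraClassPath="/usr/lib/crunch/*" \
  target/crunch-demo-1.0-SNAPSHOT-job.jar input crunch.output

For the run below, the SPARK_CLASSPATH approach was used as-is.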

 

export SPARK_CLASSPATH=/usr/lib/crunch/*
spark-submit --class com.example.WordCount target/crunch-demo-1.0-SNAPSHOT-job.jar  input crunch.output

 

SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in [jar:file:/usr/lib/spark/assembly/lib/spark-assembly-1.1.0-cdh5.2.0-hadoop2.5.0-cdh5.2.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

14/12/15 07:28:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

14/12/15 07:28:47 INFO FileTargetImpl: Will write output files to new path: crunch.output

14/12/15 07:28:47 WARN SparkConf:

SPARK_CLASSPATH was detected (set to '/usr/lib/crunch/*').

This is deprecated in Spark 1.0+.

Please instead use:
 - ./spark-submit with --driver-class-path to augment the driver classpath
 - spark.executor.extraClassPath to augment the executor classpath


14/12/15 07:28:47 WARN SparkConf: Setting 'spark.executor.extraClassPath' to '/usr/lib/crunch/*' as a work-around.

14/12/15 07:28:47 WARN SparkConf: Setting 'spark.driver.extraClassPath' to '/usr/lib/crunch/*' as a work-around.

14/12/15 07:28:47 WARN Utils: Your hostname, quickstart.cloudera resolves to a loopback address: 127.0.0.1; using 192.168.2.220 instead (on interface eth1)

14/12/15 07:28:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address

14/12/15 07:28:47 INFO SecurityManager: Changing view acls to: cloudera

14/12/15 07:28:47 INFO SecurityManager: Changing modify acls to: cloudera

14/12/15 07:28:47 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(cloudera); users with modify permissions: Set(cloudera)

14/12/15 07:28:47 INFO Slf4jLogger: Slf4jLogger started

14/12/15 07:28:47 INFO Remoting: Starting remoting

14/12/15 07:28:47 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.2.220:46973]

14/12/15 07:28:47 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@192.168.2.220:46973]

14/12/15 07:28:47 INFO Utils: Successfully started service 'sparkDriver' on port 46973.

14/12/15 07:28:48 INFO SparkEnv: Registering MapOutputTracker

14/12/15 07:28:48 INFO SparkEnv: Registering BlockManagerMaster

14/12/15 07:28:48 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20141215072848-a8ab

14/12/15 07:28:48 INFO Utils: Successfully started service 'Connection manager for block manager' on port 38854.

14/12/15 07:28:48 INFO ConnectionManager: Bound socket to port 38854 with id = ConnectionManagerId(192.168.2.220,38854)

14/12/15 07:28:48 INFO MemoryStore: MemoryStore started with capacity 265.4 MB

14/12/15 07:28:48 INFO BlockManagerMaster: Trying to register BlockManager

14/12/15 07:28:48 INFO BlockManagerMasterActor: Registering block manager 192.168.2.220:38854 with 265.4 MB RAM

14/12/15 07:28:48 INFO BlockManagerMaster: Registered BlockManager

14/12/15 07:28:48 INFO HttpFileServer: HTTP File server directory is /tmp/spark-a04d9dc5-d340-4237-bb12-cf45e390bbd5

14/12/15 07:28:48 INFO HttpServer: Starting HTTP Server

14/12/15 07:28:48 INFO Utils: Successfully started service 'HTTP file server' on port 46908.

14/12/15 07:28:48 INFO Utils: Successfully started service 'SparkUI' on port 4040.

14/12/15 07:28:48 INFO SparkUI: Started SparkUI at http://192.168.2.220:4040

14/12/15 07:28:48 INFO EventLoggingListener: Logging events to hdfs://quickstart.cloudera:8020/user/spark/applicationHistory/crunchwordcount-1418657328679

14/12/15 07:28:49 INFO SparkContext: Added JAR file:/home/cloudera/work/crunch/crunch-demo/target/crunch-demo-1.0-SNAPSHOT-job.jar at http://192.168.2.220:46908/jars/crunch-demo-1.0-SNAPSHOT-job.jar with timestamp 1418657329490

14/12/15 07:28:49 INFO AppClient$ClientActor: Connecting to master spark://quickstart.cloudera:7077...

14/12/15 07:28:49 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0

14/12/15 07:28:49 INFO MemoryStore: ensureFreeSpace(65576) called with curMem=0, maxMem=278302556

14/12/15 07:28:49 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 64.0 KB, free 265.3 MB)

14/12/15 07:28:49 INFO MemoryStore: ensureFreeSpace(20557) called with curMem=65576, maxMem=278302556

14/12/15 07:28:49 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 20.1 KB, free 265.3 MB)

14/12/15 07:28:49 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.2.220:38854 (size: 20.1 KB, free: 265.4 MB)

14/12/15 07:28:49 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0

14/12/15 07:28:50 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20141215072850-0001

14/12/15 07:28:50 INFO AppClient$ClientActor: Executor added: app-20141215072850-0001/0 on worker-20141215063206-192.168.2.220-7078 (192.168.2.220:7078) with 2 cores

14/12/15 07:28:50 INFO SparkDeploySchedulerBackend: Granted executor ID app-20141215072850-0001/0 on hostPort 192.168.2.220:7078 with 2 cores, 512.0 MB RAM

14/12/15 07:28:50 INFO MemoryStore: ensureFreeSpace(260900) called with curMem=86133, maxMem=278302556

14/12/15 07:28:50 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 254.8 KB, free 265.1 MB)

14/12/15 07:28:50 INFO AppClient$ClientActor: Executor updated: app-20141215072850-0001/0 is now LOADING

14/12/15 07:28:50 INFO MemoryStore: ensureFreeSpace(21040) called with curMem=347033, maxMem=278302556

14/12/15 07:28:50 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 20.5 KB, free 265.1 MB)

14/12/15 07:28:50 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.2.220:38854 (size: 20.5 KB, free: 265.4 MB)

14/12/15 07:28:50 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0

14/12/15 07:28:50 INFO AppClient$ClientActor: Executor updated: app-20141215072850-0001/0 is now RUNNING

14/12/15 07:28:50 INFO MemoryStore: ensureFreeSpace(65576) called with curMem=368073, maxMem=278302556

14/12/15 07:28:50 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 64.0 KB, free 265.0 MB)

14/12/15 07:28:50 INFO MemoryStore: ensureFreeSpace(20557) called with curMem=433649, maxMem=278302556

14/12/15 07:28:50 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 20.1 KB, free 265.0 MB)

14/12/15 07:28:50 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.2.220:38854 (size: 20.1 KB, free: 265.4 MB)

14/12/15 07:28:50 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0

14/12/15 07:28:50 INFO deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir

14/12/15 07:28:50 INFO SparkContext: Starting job: saveAsNewAPIHadoopFile at SparkRuntime.java:321

14/12/15 07:28:50 INFO deprecation: dfs.block.size is deprecated. Instead, use dfs.blocksize

14/12/15 07:28:50 INFO FileInputFormat: Total input paths to process : 2

14/12/15 07:28:50 INFO CombineFileInputFormat: DEBUG: Terminated node allocation with : CompletedNodes: 1, size left: 50

14/12/15 07:28:50 INFO DAGScheduler: Registering RDD 9 (mapToPair at PGroupedTableImpl.java:115)

14/12/15 07:28:50 INFO DAGScheduler: Got job 0 (saveAsNewAPIHadoopFile at SparkRuntime.java:321) with 1 output partitions (allowLocal=false)

14/12/15 07:28:50 INFO DAGScheduler: Final stage: Stage 0(saveAsNewAPIHadoopFile at SparkRuntime.java:321)

14/12/15 07:28:50 INFO DAGScheduler: Parents of final stage: List(Stage 1)

14/12/15 07:28:50 INFO DAGScheduler: Missing parents: List(Stage 1)

14/12/15 07:28:50 INFO DAGScheduler: Submitting Stage 1 (MappedRDD[9] at mapToPair at PGroupedTableImpl.java:115), which has no missing parents

14/12/15 07:28:50 INFO MemoryStore: ensureFreeSpace(6984) called with curMem=454206, maxMem=278302556

14/12/15 07:28:50 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 6.8 KB, free 265.0 MB)

14/12/15 07:28:50 INFO MemoryStore: ensureFreeSpace(3636) called with curMem=461190, maxMem=278302556

14/12/15 07:28:50 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 3.6 KB, free 265.0 MB)

14/12/15 07:28:50 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.2.220:38854 (size: 3.6 KB, free: 265.3 MB)

14/12/15 07:28:50 INFO BlockManagerMaster: Updated info of block broadcast_3_piece0

14/12/15 07:28:50 INFO DAGScheduler: Submitting 1 missing tasks from Stage 1 (MappedRDD[9] at mapToPair at PGroupedTableImpl.java:115)

14/12/15 07:28:50 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks

14/12/15 07:28:53 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@192.168.2.220:46808/user/Executor#-1997953566] with ID 0

14/12/15 07:28:53 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 0, 192.168.2.220, ANY, 1556 bytes)

14/12/15 07:28:53 INFO BlockManagerMasterActor: Registering block manager 192.168.2.220:38205 with 265.4 MB RAM

14/12/15 07:28:55 INFO ConnectionManager: Accepted connection from [192.168.2.220/192.168.2.220:47191]

14/12/15 07:28:55 INFO SendingConnection: Initiating connection to [/192.168.2.220:38205]

14/12/15 07:28:55 INFO SendingConnection: Connected to [/192.168.2.220:38205], 1 messages pending

14/12/15 07:28:55 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.2.220:38205 (size: 3.6 KB, free: 265.4 MB)

14/12/15 07:28:55 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.2.220:38205 (size: 20.5 KB, free: 265.4 MB)

14/12/15 07:28:55 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.2.220:38205 (size: 20.1 KB, free: 265.4 MB)

14/12/15 07:28:57 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 0) in 4136 ms on 192.168.2.220 (1/1)

14/12/15 07:28:57 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool

14/12/15 07:28:57 INFO DAGScheduler: Stage 1 (mapToPair at PGroupedTableImpl.java:115) finished in 6.832 s

14/12/15 07:28:57 INFO DAGScheduler: looking for newly runnable stages

14/12/15 07:28:57 INFO DAGScheduler: running: Set()

14/12/15 07:28:57 INFO DAGScheduler: waiting: Set(Stage 0)

14/12/15 07:28:57 INFO DAGScheduler: failed: Set()

14/12/15 07:28:57 INFO DAGScheduler: Missing parents for Stage 0: List()

14/12/15 07:28:57 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[18] at mapToPair at SparkRuntime.java:307), which is now runnable

14/12/15 07:28:57 INFO MemoryStore: ensureFreeSpace(69448) called with curMem=464826, maxMem=278302556

14/12/15 07:28:57 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 67.8 KB, free 264.9 MB)

14/12/15 07:28:57 INFO MemoryStore: ensureFreeSpace(25227) called with curMem=534274, maxMem=278302556

14/12/15 07:28:57 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 24.6 KB, free 264.9 MB)

14/12/15 07:28:57 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on 192.168.2.220:38854 (size: 24.6 KB, free: 265.3 MB)

14/12/15 07:28:57 INFO BlockManagerMaster: Updated info of block broadcast_4_piece0

14/12/15 07:28:57 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[18] at mapToPair at SparkRuntime.java:307)

14/12/15 07:28:57 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks

14/12/15 07:28:57 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 1, 192.168.2.220, PROCESS_LOCAL, 1022 bytes)

14/12/15 07:28:57 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on 192.168.2.220:38205 (size: 24.6 KB, free: 265.3 MB)

14/12/15 07:28:57 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to sparkExecutor@192.168.2.220:41851

14/12/15 07:28:57 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 131 bytes

14/12/15 07:28:58 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 1) in 534 ms on 192.168.2.220 (1/1)

14/12/15 07:28:58 INFO DAGScheduler: Stage 0 (saveAsNewAPIHadoopFile at SparkRuntime.java:321) finished in 0.534 s

14/12/15 07:28:58 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool

14/12/15 07:28:58 INFO SparkContext: Job finished: saveAsNewAPIHadoopFile at SparkRuntime.java:321, took 7.709089524 s

14/12/15 07:28:58 INFO SparkUI: Stopped Spark web UI at http://192.168.2.220:4040

14/12/15 07:28:58 INFO DAGScheduler: Stopping DAGScheduler

14/12/15 07:28:58 INFO SparkDeploySchedulerBackend: Shutting down all executors

14/12/15 07:28:58 INFO SparkDeploySchedulerBackend: Asking each executor to shut down

14/12/15 07:28:58 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(192.168.2.220,38205)

14/12/15 07:28:58 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(192.168.2.220,38205)

14/12/15 07:28:58 ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(192.168.2.220,38205) not found

14/12/15 07:28:59 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor stopped!

14/12/15 07:28:59 INFO ConnectionManager: Selector thread was interrupted!

14/12/15 07:28:59 INFO ConnectionManager: ConnectionManager stopped

14/12/15 07:28:59 INFO MemoryStore: MemoryStore cleared

14/12/15 07:28:59 INFO BlockManager: BlockManager stopped

14/12/15 07:28:59 INFO BlockManagerMaster: BlockManagerMaster stopped

14/12/15 07:28:59 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.

14/12/15 07:28:59 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.

14/12/15 07:28:59 INFO SparkContext: Successfully stopped SparkContext

 

Result

hadoop fs -cat crunch.output/part-r-00000
[Hello,2]
[Bye,1]

[World,2]

[Goodbye,1]

[Hadoop,2]

 

With that, Apache Crunch lets essentially the same source code run on both MapReduce and Spark. The scarce documentation makes it somewhat hard to work with, but it is still a powerful capability.

 

 
