Thursday, September 08, 2011

Using Distributed Cache in Hadoop (Hadoop 0.20.2)

Quite often we encounter situations where certain files, such as configuration files, jar libraries, XML files or properties files, need to be present on Hadoop's processing nodes at execution time. Hadoop has a feature called Distributed Cache that ships such read-only files to the task nodes. In a Hadoop environment, jobs are MapReduce jobs, and the required read-only files are copied to the TaskTracker nodes at the beginning of job execution. The default size of the distributed cache is about 10 GB, but it can be controlled by setting the configuration property local.cache.size.
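
For illustration, here is a minimal sketch (assuming the local.cache.size property name and its 10 GB default, expressed in bytes) of reading and overriding that limit through a Configuration object; on a real cluster the value is normally set in the TaskTracker's own configuration files, not per job:

import org.apache.hadoop.conf.Configuration;

public class CacheSizeCheck {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // local.cache.size is expressed in bytes; 10737418240 (10 GB)
        // is the default in Hadoop 0.20.x.
        long cacheSize = conf.getLong("local.cache.size", 10737418240L);
        System.out.println("Distributed cache limit (bytes): " + cacheSize);

        // Overriding it here only affects this Configuration instance; the
        // TaskTrackers read the value from their own configuration files.
        conf.setLong("local.cache.size", 20L * 1024 * 1024 * 1024);
    }
}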


Thus, the Distributed Cache is a mechanism for caching read-only data across the Hadoop cluster. The read-only files are distributed at job submission time, and the framework makes the cached files available to the cluster nodes when they run their tasks.


The following Java program uses the Distributed Cache to send the necessary XML file to the task-executing nodes before the job runs.


Java Program/Tutorial of Distributed Cache usage in Hadoop
Hadoop Version : 0.20.2
Java Version: Java-SE-1.6



The program below consists of two classes: DcacheMapper and its parent class, ParentMapper. The job is created in the main method of DcacheMapper, and the cache file is registered on the job configuration with the HDFS location of the file that should be shipped to all nodes. When the setup method of the parent class runs on a task node, we retrieve the localized copy of the configuration file and read it for our own use.

The API documentation for DistributedCache can be found at the following URL:

http://hadoop.apache.org/common/docs/r0.20.2/api/org/apache/hadoop/filecache/DistributedCache.html
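
Besides plain files, the same API can be used to put jar libraries and archives on the cache. Below is a minimal sketch of the submission-side calls, assuming the addFileToClassPath and addCacheArchive methods of the 0.20.x API and hypothetical HDFS paths:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;

public class CacheJarExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Hypothetical HDFS paths, used purely for illustration.
        // Adds a jar to the classpath of the map and reduce tasks.
        DistributedCache.addFileToClassPath(new Path("/user/root/libs/mylib.jar"), conf);

        // Ships a zipped archive; the framework unpacks it on the task node.
        DistributedCache.addCacheArchive(new URI("/user/root/archives/resources.zip"), conf);
    }
}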


Class to create a MapReduce job using the Distributed Cache


package com.bishal.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Mapper that registers a file on the Distributed Cache and submits the job.
 *
 * @author Bishal Acharya
 */
public class DcacheMapper extends ParentMapper {

    public DcacheMapper() {
        super();
    }

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        /**
         * Write your map implementation here.
         */
    }

    public static void main(String args[]) throws URISyntaxException,
            IOException, InterruptedException, ClassNotFoundException {

        Configuration conf = new Configuration();
        final String NAME_NODE = "hdfs://localhost:9000";

        Job job = new Job(conf);

        // Register the XML file on the Distributed Cache; it will be copied
        // to every task node before the mappers start.
        DistributedCache.addCacheFile(new URI(NAME_NODE
                + "/user/root/input/Configuration/layout.xml"),
                job.getConfiguration());

        job.setMapperClass(DcacheMapper.class);
        job.setJarByClass(DcacheMapper.class);
        job.setNumReduceTasks(0);

        FileInputFormat.addInputPath(job,
                new Path(NAME_NODE + "/user/root/input/test.txt"));
        FileOutputFormat.setOutputPath(job,
                new Path(NAME_NODE + "/user/root/output/importOutput"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
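
A variation worth noting: by adding a fragment to the cache URI and enabling symlinks, the cached file appears under a fixed name in each task's working directory. Here is a minimal sketch of that submission-side change, assuming the createSymlink method and the #name fragment convention of the 0.20.x API:

import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.Job;

public class SymlinkCacheExample {

    /**
     * Registers layout.xml on the cache with a "#layout.xml" fragment and
     * switches symlinking on, so a task can open the file simply as
     * new FileReader("layout.xml") in its working directory.
     */
    public static void addLayoutToCache(Job job, String nameNode)
            throws URISyntaxException {
        DistributedCache.addCacheFile(new URI(nameNode
                + "/user/root/input/Configuration/layout.xml#layout.xml"),
                job.getConfiguration());
        DistributedCache.createSymlink(job.getConfiguration());
    }
}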




Parent Class to read data from Distributed Cache


package com.bishal.mapreduce;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Parent mapper class that performs setup and cleanup tasks using the
 * Distributed Cache for a MapReduce job.
 *
 * @author Bishal Acharya
 */
public class ParentMapper extends Mapper<Object, Text, Object, Text> {
    protected Configuration conf;

    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {

        conf = context.getConfiguration();

        // Localized copies of the cached files on this task node.
        Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);

        if (cacheFiles == null) {
            return; // nothing was added to the cache
        }

        /**
         * Prepare objects from the layout XML.
         */
        for (Path cacheFile : cacheFiles) {
            if (cacheFile.toString().contains("layout")) {
                BufferedReader fis = new BufferedReader(new FileReader(
                        cacheFile.toString()));
                try {
                    String chunk;
                    StringBuilder records = new StringBuilder();
                    while ((chunk = fis.readLine()) != null) {
                        records.append(chunk);
                    }
                    // do whatever you like with the XML using a parser
                    System.out.println("Records :" + records);
                } finally {
                    fis.close();
                }
            }
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException,
            InterruptedException {
        // Remove cache entries that are no longer needed.
        DistributedCache.purgeCache(conf);
    }
}


In the parent class we read the cached file in the setup method of the mapper, and the purgeCache operation is performed in the cleanup phase of the MapReduce job.
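
Since the setup method above leaves the actual XML handling open ("do whatever you like with the XML using a parser"), here is a minimal sketch of parsing the localized file with the standard javax.xml DOM API, assuming a hypothetical layout.xml that contains column elements:

import java.io.File;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;

import org.w3c.dom.Document;
import org.w3c.dom.NodeList;

public class LayoutParser {

    /**
     * Parses the localized layout.xml and prints the text of each
     * "column" element. The element name is purely illustrative; adjust
     * it to whatever your layout file actually contains.
     */
    public static void parse(String localPath) throws Exception {
        DocumentBuilder builder = DocumentBuilderFactory.newInstance()
                .newDocumentBuilder();
        Document doc = builder.parse(new File(localPath));

        NodeList columns = doc.getElementsByTagName("column");
        for (int i = 0; i < columns.getLength(); i++) {
            System.out.println("column: " + columns.item(i).getTextContent());
        }
    }
}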

4 comments:

Unknown said...

Hi, thanks for your detailed tutorial. However based on your code, I tried to run it on 0.20.2 the getLocalCacheFile return the null. I run it in pseudo-distributed mode. Can you provide some help?

Unknown said...

Did you include layout.xml file in the path ? The only guess I can make for it to throw Null.

DistributedCache.addCacheFile(new URI(NAME_NODE
+ "/user/root/input/Configuration/layout.xml"),
job.getConfiguration());

Kaniyarasu said...

we have to use DistributedCache.getCacheFiles to get list of URIs from Distributed Cache

phurba said...

Thanks Bishal , it helped me use a file in distributed cache