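The version used here is 0.19.2. I've heard that MultipleOutputFormat no longer works properly from 0.20 onward, but I can't confirm whether that's true.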
The API documentation is a useful reference. Honestly, though, the API docs alone can be confusing: what exactly does each method affect?
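| Modifier and type | Method and description |
|---|---|
| `protected K` | `generateActualKey(K key, V value)` Generate the actual key from the given key/value. |
| `protected V` | `generateActualValue(K key, V value)` Generate the actual value from the given key and value. |
| `protected String` | `generateFileNameForKeyValue(K key, V value, String name)` Generate the output file name based on the given key and the leaf file name. |
| `protected String` | `generateLeafFileName(String name)` Generate the leaf name for the output file name. |
| `protected abstract RecordWriter<K,V>` | `getBaseRecordWriter(FileSystem fs, JobConf job, String name, Progressable arg3)` |
| `protected String` | `getInputFileBasedOutputFileName(JobConf job, String name)` Generate the output file name based on a given name and the input file name. |
| `RecordWriter<K,V>` | `getRecordWriter(FileSystem fs, JobConf job, String name, Progressable arg3)` Create a composite record writer that can write key/value data to different output files. |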
Here is a brief walkthrough of the call sequence.
In ReduceTask.java:
```java
public void run(JobConf job, final TaskUmbilicalProtocol umbilical) throws IOException {
    // ..........

    // getOutputName returns "part-" + NUMBER_FORMAT.format(partition),
    // i.e. a file name such as part-00000 derived from the task's partition number
    String finalName = getOutputName(getPartition());

    FileSystem fs = FileSystem.get(job);

    // finalName = part-00000
    final RecordWriter out = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);

    // .............
}
```
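The `name` that eventually reaches MultipleOutputFormat is just this `part-NNNNN` string. Below is a minimal sketch of the formatting step on its own. It assumes NUMBER_FORMAT is configured with at least five integer digits and no grouping separators, which matches the file names Hadoop produces. `PartNameDemo` and the standalone `getOutputName` are illustrative stand-ins, not Hadoop classes. `Locale.US` is pinned so the digits are ASCII regardless of the JVM's default locale.

```java
import java.text.NumberFormat;
import java.util.Locale;

public class PartNameDemo {
    // Assumption: configured the way Hadoop's task code sets up NUMBER_FORMAT
    // (at least 5 integer digits, no thousands separators).
    private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance(Locale.US);
    static {
        NUMBER_FORMAT.setMinimumIntegerDigits(5);
        NUMBER_FORMAT.setGroupingUsed(false);
    }

    // Hypothetical stand-in for the task's getOutputName(partition).
    static String getOutputName(int partition) {
        return "part-" + NUMBER_FORMAT.format(partition);
    }

    public static void main(String[] args) {
        System.out.println(getOutputName(0));  // part-00000
        System.out.println(getOutputName(42)); // part-00042
    }
}
```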
In MultipleOutputFormat.java, pay attention to the order in which these methods are called:
```java
public RecordWriter<K, V> getRecordWriter(FileSystem fs, JobConf job, String name,
        Progressable arg3) throws IOException {
    final FileSystem myFS = fs;
    final String myName = generateLeafFileName(name); // override this to hard-code the file name
    final JobConf myJob = job;
    final Progressable myProgressable = arg3;

    return new RecordWriter<K, V>() {
        // a cache storing the record writers for different output files.
        TreeMap<String, RecordWriter<K, V>> recordWriters = new TreeMap<String, RecordWriter<K, V>>();

        public void write(K key, V value) throws IOException {
            // get the file name based on the key
            // (override this when the output file should be chosen by the key)
            String keyBasedPath = generateFileNameForKeyValue(key, value, myName);

            // get the file name based on the input file name
            // (override this to derive the name from JobConf settings; finalPath is the final file name)
            String finalPath = getInputFileBasedOutputFileName(myJob, keyBasedPath);

            // get the actual key
            K actualKey = generateActualKey(key, value);
            V actualValue = generateActualValue(key, value);

            RecordWriter<K, V> rw = this.recordWriters.get(finalPath);
            if (rw == null) {
                // if we don't have the record writer yet for the final path, create one and add it to the cache
                rw = getBaseRecordWriter(myFS, myJob, finalPath, myProgressable); // abstract: you must implement this yourself
                this.recordWriters.put(finalPath, rw);
            }
            rw.write(actualKey, actualValue);
        };

        // .......
    };
}
```
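To see the call order and the per-path writer cache in isolation, here is a Hadoop-free model of the same dispatch logic. It is a simplified sketch, not the real class. `SimpleRecordWriter`, `MultipleOutputModel`, `ByKeyOutput` and the in-memory `FILES` map are hypothetical names invented for illustration. The real class works with Hadoop's `RecordWriter`, `FileSystem` and `JobConf`. Its `getInputFileBasedOutputFileName` also consults the job configuration, which this model reduces to a pass-through.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Hadoop-free model of MultipleOutputFormat.getRecordWriter().
public class CompositeWriterModel {

    // Simplified stand-in for Hadoop's RecordWriter<K, V>.
    interface SimpleRecordWriter<K, V> {
        void write(K key, V value);
    }

    static abstract class MultipleOutputModel<K, V> {
        // Default hooks: each returns its input unchanged.
        protected String generateLeafFileName(String name) { return name; }
        protected String generateFileNameForKeyValue(K key, V value, String name) { return name; }
        // Simplified: the real method takes a JobConf and may rewrite the name.
        protected String getInputFileBasedOutputFileName(String name) { return name; }
        protected K generateActualKey(K key, V value) { return key; }
        protected V generateActualValue(K key, V value) { return value; }

        // Must be supplied by a subclass, as in the real class.
        protected abstract SimpleRecordWriter<K, V> getBaseRecordWriter(String name);

        public SimpleRecordWriter<K, V> getRecordWriter(String name) {
            final String myName = generateLeafFileName(name);
            // One cached base writer per final output path.
            final Map<String, SimpleRecordWriter<K, V>> writers = new TreeMap<>();
            return (key, value) -> {
                String keyBasedPath = generateFileNameForKeyValue(key, value, myName);
                String finalPath = getInputFileBasedOutputFileName(keyBasedPath);
                K actualKey = generateActualKey(key, value);
                V actualValue = generateActualValue(key, value);
                SimpleRecordWriter<K, V> rw = writers.get(finalPath);
                if (rw == null) {
                    rw = getBaseRecordWriter(finalPath);
                    writers.put(finalPath, rw);
                }
                rw.write(actualKey, actualValue);
            };
        }
    }

    // In-memory "file system": output path -> lines written to it.
    static final Map<String, List<String>> FILES = new TreeMap<>();
    static int baseWritersCreated = 0;

    // Example subclass: put each record under a directory named after its key.
    static class ByKeyOutput extends MultipleOutputModel<String, Integer> {
        @Override
        protected String generateFileNameForKeyValue(String key, Integer value, String name) {
            return key + "/" + name;
        }

        @Override
        protected SimpleRecordWriter<String, Integer> getBaseRecordWriter(String name) {
            baseWritersCreated++;
            List<String> lines = FILES.computeIfAbsent(name, p -> new ArrayList<>());
            return (k, v) -> lines.add(k + "\t" + v);
        }
    }

    public static void main(String[] args) {
        SimpleRecordWriter<String, Integer> out = new ByKeyOutput().getRecordWriter("part-00000");
        out.write("apple", 1);
        out.write("banana", 2);
        out.write("apple", 3);
        System.out.println(FILES.keySet());                                 // [apple/part-00000, banana/part-00000]
        System.out.println("base writers created: " + baseWritersCreated); // base writers created: 2
    }
}
```

Only `generateFileNameForKeyValue` and `getBaseRecordWriter` are overridden here. That is usually enough to split reducer output by key. The second "apple" record reuses the cached writer instead of opening a new file.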
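Of the methods above, the default implementations of generateLeafFileName, generateFileNameForKeyValue, generateActualKey and generateActualValue basically just return their input unchanged; getInputFileBasedOutputFileName is the exception.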
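The Javadoc describes what getInputFileBasedOutputFileName does. If there is no map input file (i.e. not a map-only job), or the configured number of trailing legs N is unset, 0 or negative, the name is returned unchanged. Otherwise the result is built from the last N path components of the input file. Below is a hedged, Hadoop-free sketch of that rule using plain string paths. The input file and N are passed as parameters instead of being read from the JobConf. `TrailingLegsDemo` and `inputFileBasedName` are hypothetical names. Check your version's source for the exact configuration key and edge-case handling.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class TrailingLegsDemo {
    // Sketch of the documented rule: keep the last `trailingLegs` components
    // of the map input file path, or return `name` unchanged.
    static String inputFileBasedName(String inputFile, int trailingLegs, String name) {
        if (inputFile == null || trailingLegs <= 0) {
            return name; // no map input file, or the feature is disabled
        }
        String[] legs = inputFile.split("/");
        Deque<String> kept = new ArrayDeque<>();
        for (int i = legs.length - 1; i >= 0 && kept.size() < trailingLegs; i--) {
            if (legs[i].isEmpty()) {
                break; // reached the root of the path
            }
            kept.addFirst(legs[i]);
        }
        return String.join("/", kept);
    }

    public static void main(String[] args) {
        System.out.println(inputFileBasedName("/data/2009/01/log.txt", 2, "part-00000")); // 01/log.txt
        System.out.println(inputFileBasedName("/data/2009/01/log.txt", 0, "part-00000")); // part-00000
        System.out.println(inputFileBasedName(null, 3, "part-00000"));                    // part-00000
    }
}
```

In the reduce-side flow shown above there is normally no map input file, so in that case this hook also falls through and returns the key-based name.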