001/** 002 * Copyright (c) 2011, The University of Southampton and the individual contributors. 003 * All rights reserved. 004 * 005 * Redistribution and use in source and binary forms, with or without modification, 006 * are permitted provided that the following conditions are met: 007 * 008 * * Redistributions of source code must retain the above copyright notice, 009 * this list of conditions and the following disclaimer. 010 * 011 * * Redistributions in binary form must reproduce the above copyright notice, 012 * this list of conditions and the following disclaimer in the documentation 013 * and/or other materials provided with the distribution. 014 * 015 * * Neither the name of the University of Southampton nor the names of its 016 * contributors may be used to endorse or promote products derived from this 017 * software without specific prior written permission. 018 * 019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR 023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON 026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 029 */ 030package org.openimaj.hadoop.mapreduce; 031 032import java.io.IOException; 033import java.util.ArrayList; 034import java.util.Collection; 035import java.util.List; 036import java.util.Map; 037 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.fs.Path; 040import org.apache.hadoop.io.BytesWritable; 041import org.apache.hadoop.io.SequenceFile.CompressionType; 042import org.apache.hadoop.io.Text; 043import org.apache.hadoop.io.compress.DefaultCodec; 044import org.apache.hadoop.mapreduce.Job; 045import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; 046import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; 047import org.openimaj.hadoop.sequencefile.MetadataConfiguration; 048import org.openimaj.hadoop.sequencefile.MetadataSequenceFileOutputFormat; 049 050/** 051 * Utility methods for creating {@link Job}s that injest and output {@link Text} 052 * keys and {@link BytesWritable} values. 053 * 054 * @author Jonathon Hare (jsh2@ecs.soton.ac.uk) 055 */ 056public class TextBytesJobUtil { 057 public static Job createJob(String inputPath, String outputPath, Map<String, String> metadata, Configuration config) 058 throws IOException 059 { 060 return createJob(new Path(inputPath), new Path(outputPath), metadata, config); 061 } 062 063 public static Job createJob(Collection<String> inputPaths, String outputPath, Map<String, String> metadata, 064 Configuration config) throws IOException 065 { 066 final List<Path> paths = new ArrayList<Path>(); 067 068 for (final String s : inputPaths) 069 paths.add(new Path(s)); 070 071 return createJob(paths, new Path(outputPath), metadata, config); 072 } 073 074 public static Job createJob(Path inputPath, Path outputPath, Map<String, String> metadata, Configuration config) 075 throws IOException 076 { 077 return createJob(new Path[] { inputPath }, outputPath, metadata, config); 078 } 079 080 public static Job createJob(Collection<Path> inputPaths, Path outputPath, Map<String, String> metadata, 081 Configuration config) throws IOException 082 { 083 return createJob(inputPaths.toArray(new Path[inputPaths.size()]), outputPath, metadata, config); 084 } 085 086 public static Job createJob(Path[] inputPaths, Path outputPath, Map<String, String> metadata, Configuration config) 087 throws IOException 088 { 089 final Job job = new Job(config); 090 091 job.setInputFormatClass(SequenceFileInputFormat.class); 092 job.setOutputKeyClass(Text.class); 093 job.setOutputValueClass(BytesWritable.class); 094 job.setOutputFormatClass(MetadataSequenceFileOutputFormat.class); 095 096 SequenceFileInputFormat.setInputPaths(job, inputPaths); 097 SequenceFileOutputFormat.setOutputPath(job, outputPath); 098 SequenceFileOutputFormat.setCompressOutput(job, true); 099 SequenceFileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class); 100 SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK); 101 102 if (metadata != null) 103 MetadataConfiguration.setMetadata(metadata, job.getConfiguration()); 104 105 return job; 106 } 107}