/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *
 *   * Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *
 *   * Neither the name of the University of Southampton nor the names of its
 *     contributors may be used to endorse or promote products derived from this
 *     software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */ 030package org.openimaj.hadoop.mapreduce.stage; 031 032import java.io.IOException; 033import java.lang.reflect.Method; 034import java.util.List; 035 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.fs.Path; 038import org.apache.hadoop.io.compress.CompressionCodec; 039import org.apache.hadoop.mapreduce.InputFormat; 040import org.apache.hadoop.mapreduce.Job; 041import org.apache.hadoop.mapreduce.Mapper; 042import org.apache.hadoop.mapreduce.Reducer; 043import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 044import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 045import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper; 046import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 047import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 048import org.openimaj.util.reflection.ReflectionUtils; 049 050/** 051 * A stage in a multi step job. Each step is told where the jobs data will come 052 * from, where the output should be directed and then is expected to produce a 053 * stage. The job is configured and set up based on the generic types assigned 054 * to the stage. For most jobs these generics and providing the mapper/reducer 055 * classes should be enough. If any further settings need to be configured use 056 * the {@link #setup(Job)} which is called before the job is being returned 057 * 058 * @author Sina Samangooei (ss@ecs.soton.ac.uk) 059 * @param <INPUT_FORMAT> 060 * The job's input format. Must be a {@link FileOutputFormat}. Used 061 * to {@link FileInputFormat#setInputPaths(Job, Path...)} with the 062 * stage's input locations 063 * @param <OUTPUT_FORMAT> 064 * The job's output format. Must be a {@link FileOutputFormat}. 
Used 065 * to {@link FileOutputFormat#setOutputPath(Job, Path)} with the 066 * stage's output location 067 * @param <INPUT_KEY> 068 * The key format of the input to the map task 069 * @param <INPUT_VALUE> 070 * The value format of the input to the map task 071 * @param <MAP_OUTPUT_KEY> 072 * The key format of the output of the map task (and therefore the 073 * input of the reduce) 074 * @param <MAP_OUTPUT_VALUE> 075 * The value format of the output of the map task (and therefore the 076 * input of the reduce) 077 * @param <OUTPUT_KEY> 078 * The key format of the output of the reduce task 079 * @param <OUTPUT_VALUE> 080 * The valueformat of the output of the reduce task 081 * 082 */ 083 084@SuppressWarnings({ "unused", "unchecked" }) 085public abstract class Stage<INPUT_FORMAT extends FileInputFormat<INPUT_KEY, INPUT_VALUE>, OUTPUT_FORMAT extends FileOutputFormat<OUTPUT_KEY, OUTPUT_VALUE>, INPUT_KEY, INPUT_VALUE, MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE, OUTPUT_KEY, OUTPUT_VALUE> 086{ 087 private Class<INPUT_FORMAT> inputFormatClass; 088 private Class<OUTPUT_FORMAT> outputFormatClass; 089 090 private Class<INPUT_VALUE> inputValueClass; 091 private Class<INPUT_KEY> inputKeyClass; 092 private Class<MAP_OUTPUT_KEY> mapOutputKeyClass; 093 private Class<MAP_OUTPUT_VALUE> mapOutputValueClass; 094 private Class<OUTPUT_KEY> outputKeyClass; 095 private Class<OUTPUT_VALUE> outputValueClass; 096 private List<Class<?>> genericTypes; 097 098 /** 099 * Inititalise all the classes based on the generics 100 */ 101 public Stage() { 102 this.genericTypes = ReflectionUtils.getTypeArguments(Stage.class, this.getClass()); 103 this.inputFormatClass = (Class<INPUT_FORMAT>) genericTypes.get(0); 104 this.outputFormatClass = (Class<OUTPUT_FORMAT>) genericTypes.get(1); 105 this.inputKeyClass = (Class<INPUT_KEY>) genericTypes.get(2); 106 this.inputValueClass = (Class<INPUT_VALUE>) genericTypes.get(3); 107 this.mapOutputKeyClass = (Class<MAP_OUTPUT_KEY>) genericTypes.get(4); 108 
this.mapOutputValueClass = (Class<MAP_OUTPUT_VALUE>) genericTypes.get(5); 109 this.outputKeyClass = (Class<OUTPUT_KEY>) genericTypes.get(6); 110 this.outputValueClass = (Class<OUTPUT_VALUE>) genericTypes.get(7); 111 } 112 113 /** 114 * @return the name of the output directory of this stage. If the name is 115 * null the directory itself is used. 116 */ 117 public String outname() { 118 return null; 119 } 120 121 /** 122 * @param inputs 123 * the input paths to be expected 124 * @param output 125 * the output location 126 * @param conf 127 * the job configuration 128 * @return the job to be launched in this stage 129 * @throws Exception 130 * @throws IOException 131 */ 132 public Job stage(Path[] inputs, Path output, Configuration conf) throws Exception { 133 134 final Job job = new Job(conf); 135 // A bit of a dirty hack, if any input file is an lzo file sneakily 136 // switch the input format class to LZOTextInput 137 if (inputFormatClass.equals(TextInputFormat.class) && containsLZO(inputs)) { 138 job.setInputFormatClass((Class<? extends InputFormat<?, ?>>) Class 139 .forName("com.hadoop.mapreduce.LzoTextInputFormat")); 140 } 141 else { 142 job.setInputFormatClass(inputFormatClass); 143 } 144 145 job.setMapOutputKeyClass(mapOutputKeyClass); 146 job.setMapOutputValueClass(mapOutputValueClass); 147 job.setOutputKeyClass(outputKeyClass); 148 job.setOutputValueClass(outputValueClass); 149 job.setOutputFormatClass(outputFormatClass); 150 if (outputFormatClass.equals(TextOutputFormat.class) && this.lzoCompress()) { 151 TextOutputFormat.setCompressOutput(job, true); 152 TextOutputFormat.setOutputCompressorClass(job, 153 (Class<? 
extends CompressionCodec>) Class.forName("com.hadoop.compression.lzo.LzopCodec")); 154 } else { 155 TextOutputFormat.setCompressOutput(job, false); 156 } 157 158 job.setJarByClass(this.getClass()); 159 setInputPaths(job, inputs); 160 setOutputPath(job, output); 161 setMapperClass(job, mapper()); 162 setReducerClass(job, reducer()); 163 setCombinerClass(job, combiner()); 164 setup(job); 165 return job; 166 } 167 168 /** 169 * For stages which require more fine grained control of how a job's 170 * combiner is set. This class is called with the job being constructed by 171 * this stage and the result of {@link #combiner()}. 172 * 173 * @param job 174 * @param combiner 175 */ 176 public void setCombinerClass(Job job, 177 Class<? extends Reducer<MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE, MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE>> combiner) 178 { 179 job.setCombinerClass(combiner); 180 } 181 182 /** 183 * For stages which require more fine grained control of how a job's reducer 184 * is set. This class is called with the job being constructed by this stage 185 * and the result of {@link #reducer()}. 186 * 187 * @param job 188 * @param reducer 189 */ 190 public void setReducerClass(Job job, 191 Class<? extends Reducer<MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE, OUTPUT_KEY, OUTPUT_VALUE>> reducer) 192 { 193 job.setReducerClass(reducer); 194 } 195 196 /** 197 * For stages which need more fine grained control of how a job's mapper is 198 * set. For example, {@link MultithreadedMapper}) stages should overwrite 199 * this class, set the job's mapper to {@link MultithreadedMapper} with 200 * {@link Job#setMapperClass(Class)} and set the {@link MultithreadedMapper} 201 * mapper classed with 202 * {@link MultithreadedMapper#setMapperClass(Job, Class)}. 203 * 204 * this function is called with the result of {@link #mapper()} 205 * 206 * @param job 207 * @param mapper 208 */ 209 public void setMapperClass(Job job, 210 Class<? 
extends Mapper<INPUT_KEY, INPUT_VALUE, MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE>> mapper) 211 { 212 job.setMapperClass(mapper); 213 } 214 215 private boolean containsLZO(Path[] inputs) { 216 for (final Path path : inputs) { 217 if (path.getName().endsWith(".lzo")) 218 return true; 219 } 220 return false; 221 } 222 223 /** 224 * Add any final adjustments to the job's config 225 * 226 * @param job 227 * @throws IOException 228 */ 229 public void setup(Job job) throws IOException { 230 } 231 232 /** 233 * By default this method returns the {@link IdentityMapper} class. This 234 * mapper outputs the values handed as they are. 235 * 236 * @return the class of the mapper to use 237 */ 238 public Class<? extends Mapper<INPUT_KEY, INPUT_VALUE, MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE>> mapper() { 239 final IdentityMapper<INPUT_KEY, INPUT_VALUE, MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE> nr = new IdentityMapper<INPUT_KEY, INPUT_VALUE, MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE>(); 240 return (Class<? extends Mapper<INPUT_KEY, INPUT_VALUE, MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE>>) nr.getClass(); 241 } 242 243 /** 244 * By default this method returns the {@link IdentityReducer} class. This 245 * reducer outputs the values handed as they are. 246 * 247 * @return the class of the reducer to use 248 */ 249 public Class<? extends Reducer<MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE, OUTPUT_KEY, OUTPUT_VALUE>> reducer() { 250 final IdentityReducer<MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE, OUTPUT_KEY, OUTPUT_VALUE> nr = new IdentityReducer<MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE, OUTPUT_KEY, OUTPUT_VALUE>(); 251 return (Class<? extends Reducer<MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE, OUTPUT_KEY, OUTPUT_VALUE>>) nr.getClass(); 252 } 253 254 /** 255 * By default this method returns the {@link IdentityReducer} class. This 256 * combiner outputs the values handed as they are. 257 * 258 * @return the class of the reducer to use 259 */ 260 public Class<? 
extends Reducer<MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE, MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE>> combiner() { 261 final IdentityReducer<MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE, MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE> nr = new IdentityReducer<MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE, MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE>(); 262 return (Class<? extends Reducer<MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE, MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE>>) nr 263 .getClass(); 264 } 265 266 private void setOutputPath(Job job, Path output) { 267 try { 268 final Method method = outputFormatClass.getMethod("setOutputPath", Job.class, Path.class); 269 method.invoke(null, job, output); 270 } catch (final Exception e) { 271 System.err.println("Couldn't set output path!"); 272 } 273 } 274 275 private void setInputPaths(Job job, Path[] inputs) { 276 try { 277 final Method method = inputFormatClass.getMethod("setInputPaths", Job.class, Path[].class); 278 method.invoke(null, job, inputs); 279 } catch (final Exception e) { 280 System.err.println("Couldn't set input path!"); 281 } 282 } 283 284 /** 285 * Called when the stage's job is completed. Might never be called in some 286 * cases. For example, when the stagerunner is told specifically not to wait 287 * for the job to finish. 288 * 289 * @param job 290 */ 291 public void finished(Job job) { 292 293 } 294 295 /** 296 * @return Whether this stage should LZO compress its output 297 */ 298 public boolean lzoCompress() { 299 return false; 300 } 301}