001/**
002 * Copyright (c) 2011, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.hadoop.mapreduce.stage;
031
032import java.io.IOException;
033import java.lang.reflect.Method;
034import java.util.List;
035
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.fs.Path;
038import org.apache.hadoop.io.compress.CompressionCodec;
039import org.apache.hadoop.mapreduce.InputFormat;
040import org.apache.hadoop.mapreduce.Job;
041import org.apache.hadoop.mapreduce.Mapper;
042import org.apache.hadoop.mapreduce.Reducer;
043import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
044import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
045import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
046import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
047import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
048import org.openimaj.util.reflection.ReflectionUtils;
049
050/**
051 * A stage in a multi step job. Each step is told where the jobs data will come
052 * from, where the output should be directed and then is expected to produce a
053 * stage. The job is configured and set up based on the generic types assigned
054 * to the stage. For most jobs these generics and providing the mapper/reducer
055 * classes should be enough. If any further settings need to be configured use
 * the {@link #setup(Job)} method, which is called just before the job is returned
057 *
058 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
 * @param <INPUT_FORMAT>
 *            The job's input format. Must be a {@link FileInputFormat}. Used
 *            to {@link FileInputFormat#setInputPaths(Job, Path...)} with the
 *            stage's input locations
063 * @param <OUTPUT_FORMAT>
064 *            The job's output format. Must be a {@link FileOutputFormat}. Used
065 *            to {@link FileOutputFormat#setOutputPath(Job, Path)} with the
066 *            stage's output location
067 * @param <INPUT_KEY>
068 *            The key format of the input to the map task
069 * @param <INPUT_VALUE>
070 *            The value format of the input to the map task
071 * @param <MAP_OUTPUT_KEY>
072 *            The key format of the output of the map task (and therefore the
073 *            input of the reduce)
074 * @param <MAP_OUTPUT_VALUE>
075 *            The value format of the output of the map task (and therefore the
076 *            input of the reduce)
077 * @param <OUTPUT_KEY>
078 *            The key format of the output of the reduce task
079 * @param <OUTPUT_VALUE>
 *            The value format of the output of the reduce task
081 *
082 */
083
084@SuppressWarnings({ "unused", "unchecked" })
085public abstract class Stage<INPUT_FORMAT extends FileInputFormat<INPUT_KEY, INPUT_VALUE>, OUTPUT_FORMAT extends FileOutputFormat<OUTPUT_KEY, OUTPUT_VALUE>, INPUT_KEY, INPUT_VALUE, MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE, OUTPUT_KEY, OUTPUT_VALUE>
086{
087        private Class<INPUT_FORMAT> inputFormatClass;
088        private Class<OUTPUT_FORMAT> outputFormatClass;
089
090        private Class<INPUT_VALUE> inputValueClass;
091        private Class<INPUT_KEY> inputKeyClass;
092        private Class<MAP_OUTPUT_KEY> mapOutputKeyClass;
093        private Class<MAP_OUTPUT_VALUE> mapOutputValueClass;
094        private Class<OUTPUT_KEY> outputKeyClass;
095        private Class<OUTPUT_VALUE> outputValueClass;
096        private List<Class<?>> genericTypes;
097
098        /**
099         * Inititalise all the classes based on the generics
100         */
101        public Stage() {
102                this.genericTypes = ReflectionUtils.getTypeArguments(Stage.class, this.getClass());
103                this.inputFormatClass = (Class<INPUT_FORMAT>) genericTypes.get(0);
104                this.outputFormatClass = (Class<OUTPUT_FORMAT>) genericTypes.get(1);
105                this.inputKeyClass = (Class<INPUT_KEY>) genericTypes.get(2);
106                this.inputValueClass = (Class<INPUT_VALUE>) genericTypes.get(3);
107                this.mapOutputKeyClass = (Class<MAP_OUTPUT_KEY>) genericTypes.get(4);
108                this.mapOutputValueClass = (Class<MAP_OUTPUT_VALUE>) genericTypes.get(5);
109                this.outputKeyClass = (Class<OUTPUT_KEY>) genericTypes.get(6);
110                this.outputValueClass = (Class<OUTPUT_VALUE>) genericTypes.get(7);
111        }
112
113        /**
114         * @return the name of the output directory of this stage. If the name is
115         *         null the directory itself is used.
116         */
117        public String outname() {
118                return null;
119        }
120
121        /**
122         * @param inputs
123         *            the input paths to be expected
124         * @param output
125         *            the output location
126         * @param conf
127         *            the job configuration
128         * @return the job to be launched in this stage
129         * @throws Exception
130         * @throws IOException
131         */
132        public Job stage(Path[] inputs, Path output, Configuration conf) throws Exception {
133
134                final Job job = new Job(conf);
135                // A bit of a dirty hack, if any input file is an lzo file sneakily
136                // switch the input format class to LZOTextInput
137                if (inputFormatClass.equals(TextInputFormat.class) && containsLZO(inputs)) {
138                        job.setInputFormatClass((Class<? extends InputFormat<?, ?>>) Class
139                                        .forName("com.hadoop.mapreduce.LzoTextInputFormat"));
140                }
141                else {
142                        job.setInputFormatClass(inputFormatClass);
143                }
144
145                job.setMapOutputKeyClass(mapOutputKeyClass);
146                job.setMapOutputValueClass(mapOutputValueClass);
147                job.setOutputKeyClass(outputKeyClass);
148                job.setOutputValueClass(outputValueClass);
149                job.setOutputFormatClass(outputFormatClass);
150                if (outputFormatClass.equals(TextOutputFormat.class) && this.lzoCompress()) {
151                        TextOutputFormat.setCompressOutput(job, true);
152                        TextOutputFormat.setOutputCompressorClass(job,
153                                        (Class<? extends CompressionCodec>) Class.forName("com.hadoop.compression.lzo.LzopCodec"));
154                } else {
155                        TextOutputFormat.setCompressOutput(job, false);
156                }
157
158                job.setJarByClass(this.getClass());
159                setInputPaths(job, inputs);
160                setOutputPath(job, output);
161                setMapperClass(job, mapper());
162                setReducerClass(job, reducer());
163                setCombinerClass(job, combiner());
164                setup(job);
165                return job;
166        }
167
168        /**
169         * For stages which require more fine grained control of how a job's
170         * combiner is set. This class is called with the job being constructed by
171         * this stage and the result of {@link #combiner()}.
172         *
173         * @param job
174         * @param combiner
175         */
176        public void setCombinerClass(Job job,
177                        Class<? extends Reducer<MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE, MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE>> combiner)
178        {
179                job.setCombinerClass(combiner);
180        }
181
182        /**
183         * For stages which require more fine grained control of how a job's reducer
184         * is set. This class is called with the job being constructed by this stage
185         * and the result of {@link #reducer()}.
186         *
187         * @param job
188         * @param reducer
189         */
190        public void setReducerClass(Job job,
191                        Class<? extends Reducer<MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE, OUTPUT_KEY, OUTPUT_VALUE>> reducer)
192        {
193                job.setReducerClass(reducer);
194        }
195
196        /**
197         * For stages which need more fine grained control of how a job's mapper is
198         * set. For example, {@link MultithreadedMapper}) stages should overwrite
199         * this class, set the job's mapper to {@link MultithreadedMapper} with
200         * {@link Job#setMapperClass(Class)} and set the {@link MultithreadedMapper}
201         * mapper classed with
202         * {@link MultithreadedMapper#setMapperClass(Job, Class)}.
203         *
204         * this function is called with the result of {@link #mapper()}
205         *
206         * @param job
207         * @param mapper
208         */
209        public void setMapperClass(Job job,
210                        Class<? extends Mapper<INPUT_KEY, INPUT_VALUE, MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE>> mapper)
211        {
212                job.setMapperClass(mapper);
213        }
214
215        private boolean containsLZO(Path[] inputs) {
216                for (final Path path : inputs) {
217                        if (path.getName().endsWith(".lzo"))
218                                return true;
219                }
220                return false;
221        }
222
223        /**
224         * Add any final adjustments to the job's config
225         *
226         * @param job
227         * @throws IOException
228         */
229        public void setup(Job job) throws IOException {
230        }
231
232        /**
233         * By default this method returns the {@link IdentityMapper} class. This
234         * mapper outputs the values handed as they are.
235         *
236         * @return the class of the mapper to use
237         */
238        public Class<? extends Mapper<INPUT_KEY, INPUT_VALUE, MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE>> mapper() {
239                final IdentityMapper<INPUT_KEY, INPUT_VALUE, MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE> nr = new IdentityMapper<INPUT_KEY, INPUT_VALUE, MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE>();
240                return (Class<? extends Mapper<INPUT_KEY, INPUT_VALUE, MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE>>) nr.getClass();
241        }
242
243        /**
244         * By default this method returns the {@link IdentityReducer} class. This
245         * reducer outputs the values handed as they are.
246         *
247         * @return the class of the reducer to use
248         */
249        public Class<? extends Reducer<MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE, OUTPUT_KEY, OUTPUT_VALUE>> reducer() {
250                final IdentityReducer<MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE, OUTPUT_KEY, OUTPUT_VALUE> nr = new IdentityReducer<MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE, OUTPUT_KEY, OUTPUT_VALUE>();
251                return (Class<? extends Reducer<MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE, OUTPUT_KEY, OUTPUT_VALUE>>) nr.getClass();
252        }
253
254        /**
255         * By default this method returns the {@link IdentityReducer} class. This
256         * combiner outputs the values handed as they are.
257         *
258         * @return the class of the reducer to use
259         */
260        public Class<? extends Reducer<MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE, MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE>> combiner() {
261                final IdentityReducer<MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE, MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE> nr = new IdentityReducer<MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE, MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE>();
262                return (Class<? extends Reducer<MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE, MAP_OUTPUT_KEY, MAP_OUTPUT_VALUE>>) nr
263                                .getClass();
264        }
265
266        private void setOutputPath(Job job, Path output) {
267                try {
268                        final Method method = outputFormatClass.getMethod("setOutputPath", Job.class, Path.class);
269                        method.invoke(null, job, output);
270                } catch (final Exception e) {
271                        System.err.println("Couldn't set output path!");
272                }
273        }
274
275        private void setInputPaths(Job job, Path[] inputs) {
276                try {
277                        final Method method = inputFormatClass.getMethod("setInputPaths", Job.class, Path[].class);
278                        method.invoke(null, job, inputs);
279                } catch (final Exception e) {
280                        System.err.println("Couldn't set input path!");
281                }
282        }
283
284        /**
285         * Called when the stage's job is completed. Might never be called in some
286         * cases. For example, when the stagerunner is told specifically not to wait
287         * for the job to finish.
288         *
289         * @param job
290         */
291        public void finished(Job job) {
292
293        }
294
295        /**
296         * @return Whether this stage should LZO compress its output
297         */
298        public boolean lzoCompress() {
299                return false;
300        }
301}