001/**
002 * Copyright (c) 2011, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.hadoop.mapreduce;
031
032import java.io.IOException;
033import java.net.URI;
034import java.util.ArrayList;
035import java.util.HashMap;
036import java.util.LinkedList;
037import java.util.List;
038import java.util.Map;
039
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.fs.FileSystem;
042import org.apache.hadoop.fs.LocalFileSystem;
043import org.apache.hadoop.fs.Path;
044import org.openimaj.hadoop.mapreduce.stage.Stage;
045import org.openimaj.hadoop.sequencefile.SequenceFileUtility;
046
047/**
048 * A set of hadoop jobs done in series. The final output (directory) of the nth job is used
049 * as the map input path of the (n+1)th job.
050 * 
051 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
052 *
053 */
054public class MultiStagedJob {
055        private Path outputRoot;
056        private boolean removePreliminary;
057        private LinkedList<Stage<?,?,?,?,?,?,?,?>> stages;
058        private Path[] initial;
059        private String[] toolArgs;
060        private Map<String,Path[]> completedJobs;
061
062        /**
063         * Start a multistaged job specification. The root path holds the final
064         * and all preliminary steps
065         * 
066         * @param initialInput the initial input given to the first stage
067         * @param root the final output location
068         * @param args the arguments with which to start the job
069         */
070        public MultiStagedJob(Path[] initialInput, Path root, String[] args) {
071                this(initialInput, root,false, args);
072        }
073        
074        /**
075         * 
076         * Start a multistaged job specification. The root path holds the final.
077         * and all preliminary steps
078         * 
079         * @param initialInput the initial input given to the first stage
080         * @param removePreliminary whether all intermediate steps should be removed
081         * @param root the final output location
082         * @param args the arguments with which to start the job
083         */
084        public MultiStagedJob(Path[] initialInput, Path root, boolean removePreliminary, String args[]) {
085                this.outputRoot = root;
086                this.initial = initialInput;
087                this.removePreliminary = removePreliminary;
088                this.stages = new LinkedList<Stage<?,?,?,?,?,?,?,?>>();
089                this.toolArgs = args;
090                this.completedJobs = new HashMap<String,Path[]>();
091        }
092        
093        /**
094         * Conveniance function. Finds the input paths using #SequenceFileUtility.getFilePaths 
095         * and uses Path(outpath)
096         * @param inpath
097         * @param outpath
098         * @param args the arguments with which to start the job
099         * @throws IOException
100         */
101        public MultiStagedJob(String inpath, String outpath, String[] args) throws IOException {
102                this(SequenceFileUtility.getFilePaths(inpath, "path"),new Path(outpath),args);
103        }
104
105        /**
106         * Add a stage to the end of the queue of stages.
107         * @param s
108         */
109        public void queueStage(Stage<?,?,?,?,?,?,?,?> s){
110                this.stages.offer(s);
111        }
112        
113        /**
114         * Run all the staged jobs. The input/output of each job is at follows:
115         * initial -> stage1 -> output1
116         * output1 -> stage2 -> output2
117         * ...
118         * output(N-1) -> stageN -> final
119         * 
120         * for each output, the directory created is scanned for part files matching the regex "part.*"
121         * @return The path to the final output for convenience (##base##/final by convention)
122         * @throws Exception 
123         */
124        public Path runAll() throws Exception{
125                Stage<?,?,?,?,?,?,?,?> s = null;
126                Path[] currentInputs = initial;
127                Path constructedOutputPath = null;
128                List<String> toRemove = new ArrayList<String>();
129                // Check if the final output exists, and if so that it is not empty, if so, we're done here! continue!
130                constructedOutputPath = constructOutputPath(this.stages.getLast().outname());
131                boolean finalOutputExists = fileExists(constructedOutputPath.toString());
132                if(
133                                finalOutputExists && 
134                                SequenceFileUtility.getFilePaths(constructedOutputPath.toString(), "part").length != 0
135                ) return constructedOutputPath; // we're done, the output exists and it isn't empty!
136                
137                while((s = this.stages.pollFirst()) != null){
138                        constructedOutputPath = constructOutputPath(s.outname());
139                        boolean fExists = fileExists(constructedOutputPath.toString());
140                        if(
141                                !fExists || // if the file doesn't exist
142                                SequenceFileUtility.getFilePaths(constructedOutputPath.toString(), "part").length == 0 // or the file exists but the partfile does not
143                        ){
144                                // At this point the file either doesn't exist or if it exists it had no part file, it should be deleted!
145                                if(fExists){
146                                        System.out.println("File exists but was empty, removing");
147                                        FileSystem fs = getFileSystem(constructedOutputPath.toUri());
148                                        fs.delete(constructedOutputPath, true);
149                                }
150                                SingleStagedJob runner = new SingleStagedJob(s, currentInputs, constructedOutputPath );
151                                runner.runMain(this.toolArgs);
152                        }
153                        currentInputs = SequenceFileUtility.getFilePaths(constructedOutputPath.toString(), "part");
154                        // add the output of this stage to the list of stages to be removed
155                        if(this.removePreliminary && this.stages.size() > 0){
156                                toRemove.add(constructedOutputPath.toString());
157                        }
158                        completedJobs.put(s.outname(),currentInputs);
159                }
160                for (String toremove : toRemove) {
161                        System.out.println("Removing intermediate output: " + toremove);
162                        Path ptoremove = new Path(toremove);
163                        FileSystem fs = getFileSystem(ptoremove.toUri());
164                        fs.delete(ptoremove, true);
165                }
166                return constructedOutputPath;
167        }
168        
169        private static boolean fileExists(String path) throws IOException{
170                URI outuri = SequenceFileUtility.convertToURI(path);
171                FileSystem fs = getFileSystem(outuri);
172                Path p = new Path(outuri.toString());
173                return fs.exists(p);
174        }
175        
176        private static FileSystem getFileSystem(URI uri) throws IOException {
177                Configuration config = new Configuration();
178                FileSystem fs = FileSystem.get(uri, config);
179                if (fs instanceof LocalFileSystem) fs = ((LocalFileSystem)fs).getRaw();
180                return fs;
181        }
182        
183        private Path constructOutputPath(String outname) {
184                String newOutPath = this.outputRoot.toString();
185                if(outname != null) newOutPath +=  "/" + outname;
186                
187                return new Path(newOutPath);
188        }
189
190        /**
191         * @param completedJobId
192         * @return the path to the output of the completed job
193         */
194        public Path[] getStagePaths(String completedJobId) {
195                return this.completedJobs.get(completedJobId);
196        }
197
198        /**
199         * @param b if true all but the final output and input for this multi staged job are true
200         */
201        public void removeIntermediate(boolean b) {
202                this.removePreliminary = b;
203        }
204}