/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *
 *   * Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *
 *   * Neither the name of the University of Southampton nor the names of its
 *     contributors may be used to endorse or promote products derived from this
 *     software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.hadoop.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.openimaj.hadoop.mapreduce.stage.Stage;
import org.openimaj.hadoop.sequencefile.SequenceFileUtility;

/**
 * A set of hadoop jobs done in series. The final output (directory) of the nth job is used
 * as the map input path of the (n+1)th job.
 *
 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
 *
 */
public class MultiStagedJob {
	/**
	 * Name prefix used whenever a directory is scanned for part files. Shared by
	 * every listing in this class so that inputs and stage outputs are discovered
	 * the same way.
	 */
	private static final String PART_FILE_PREFIX = "part";

	private final Path outputRoot;
	private boolean removePreliminary;
	private final LinkedList<Stage<?,?,?,?,?,?,?,?>> stages;
	private final Path[] initial;
	private final String[] toolArgs;
	private final Map<String,Path[]> completedJobs;

	/**
	 * Start a multistaged job specification. The root path holds the final
	 * and all preliminary steps. Intermediate outputs are kept.
	 *
	 * @param initialInput the initial input given to the first stage
	 * @param root the final output location
	 * @param args the arguments with which to start the job
	 */
	public MultiStagedJob(Path[] initialInput, Path root, String[] args) {
		this(initialInput, root, false, args);
	}

	/**
	 * Start a multistaged job specification. The root path holds the final
	 * and all preliminary steps.
	 *
	 * @param initialInput the initial input given to the first stage
	 * @param root the final output location
	 * @param removePreliminary whether all intermediate steps should be removed
	 * @param args the arguments with which to start the job
	 */
	public MultiStagedJob(Path[] initialInput, Path root, boolean removePreliminary, String[] args) {
		this.outputRoot = root;
		this.initial = initialInput;
		this.removePreliminary = removePreliminary;
		this.stages = new LinkedList<Stage<?,?,?,?,?,?,?,?>>();
		this.toolArgs = args;
		this.completedJobs = new HashMap<String,Path[]>();
	}

	/**
	 * Convenience constructor. Finds the input paths by calling
	 * {@code SequenceFileUtility.getFilePaths(inpath, "part")} and uses
	 * {@code new Path(outpath)} as the output root.
	 *
	 * @param inpath the location of the initial input
	 * @param outpath the final output location
	 * @param args the arguments with which to start the job
	 * @throws IOException if the input paths cannot be listed
	 */
	public MultiStagedJob(String inpath, String outpath, String[] args) throws IOException {
		// Uses the same "part" prefix as every other listing in this class; the
		// previous "path" prefix was inconsistent with how stage outputs are scanned.
		this(SequenceFileUtility.getFilePaths(inpath, PART_FILE_PREFIX), new Path(outpath), args);
	}

	/**
	 * Add a stage to the end of the queue of stages.
	 *
	 * @param s the stage to append; must not be null
	 * @throws NullPointerException if {@code s} is null
	 */
	public void queueStage(Stage<?,?,?,?,?,?,?,?> s){
		// A null element would make pollFirst() in runAll() return null mid-queue,
		// which ends the run loop early and silently skips the remaining stages.
		if (s == null) {
			throw new NullPointerException("Cannot queue a null stage");
		}
		this.stages.offer(s);
	}

	/**
	 * Run all the staged jobs. The input/output of each job is as follows:
	 * <pre>
	 * initial     -> stage1 -> output1
	 * output1     -> stage2 -> output2
	 * ...
	 * output(N-1) -> stageN -> final
	 * </pre>
	 *
	 * For each output, the directory created is scanned for part files matching the regex "part.*".
	 * <p>
	 * If the output of the last queued stage already exists and contains part files, no stage is
	 * run, the queue is left untouched and nothing is recorded for {@link #getStagePaths(String)}.
	 * Otherwise the queue is drained as stages are processed. A stage whose output already exists
	 * with part files is not re-run; an existing output without part files is deleted and the stage
	 * is run again. Intermediate outputs are only deleted once every stage has been processed.
	 *
	 * @return The path to the final output for convenience (##base##/final by convention)
	 * @throws NoSuchElementException if no stages are queued
	 * @throws Exception if a stage or a file system operation fails
	 */
	public Path runAll() throws Exception{
		if (this.stages.isEmpty()) {
			// Same exception type that LinkedList.getLast() used to raise here, now with a reason.
			throw new NoSuchElementException("No stages queued: call queueStage() before runAll()");
		}

		// Check if the final output exists and is not empty; if so there is nothing left to do.
		Path constructedOutputPath = constructOutputPath(this.stages.getLast().outname());
		if (hasPartFiles(constructedOutputPath)) {
			return constructedOutputPath;
		}

		Path[] currentInputs = this.initial;
		final List<String> toRemove = new ArrayList<String>();
		Stage<?,?,?,?,?,?,?,?> s;
		while ((s = this.stages.pollFirst()) != null) {
			constructedOutputPath = constructOutputPath(s.outname());
			final boolean fExists = fileExists(constructedOutputPath.toString());
			final boolean needsRun = !fExists
					|| SequenceFileUtility.getFilePaths(constructedOutputPath.toString(), PART_FILE_PREFIX).length == 0;
			if (needsRun) {
				// The output either doesn't exist, or exists without any part file and must be deleted first.
				if (fExists) {
					System.out.println("File exists but was empty, removing");
					deleteRecursively(constructedOutputPath);
				}
				SingleStagedJob runner = new SingleStagedJob(s, currentInputs, constructedOutputPath);
				runner.runMain(this.toolArgs);
			}

			// The part files of this stage feed the next stage.
			currentInputs = SequenceFileUtility.getFilePaths(constructedOutputPath.toString(), PART_FILE_PREFIX);

			// Every output except that of the last stage is a candidate for removal.
			if (this.removePreliminary && !this.stages.isEmpty()) {
				toRemove.add(constructedOutputPath.toString());
			}
			this.completedJobs.put(s.outname(), currentInputs);
		}

		for (String toremove : toRemove) {
			System.out.println("Removing intermediate output: " + toremove);
			deleteRecursively(new Path(toremove));
		}
		return constructedOutputPath;
	}

	/**
	 * Test whether a location exists and contains at least one part file.
	 *
	 * @param location the location to inspect
	 * @return true if the location exists and the part file listing is not empty
	 * @throws IOException if the file system cannot be queried
	 */
	private static boolean hasPartFiles(Path location) throws IOException {
		return fileExists(location.toString())
				&& SequenceFileUtility.getFilePaths(location.toString(), PART_FILE_PREFIX).length != 0;
	}

	/**
	 * Recursively delete a location on the file system that owns it.
	 *
	 * @param location the location to delete
	 * @throws IOException if the file system cannot be obtained or the delete fails
	 */
	private static void deleteRecursively(Path location) throws IOException {
		final FileSystem fs = getFileSystem(location.toUri());
		fs.delete(location, true);
	}

	/**
	 * Test whether a path exists on the file system that owns it.
	 *
	 * @param path the path, in a form accepted by {@code SequenceFileUtility.convertToURI}
	 * @return true if the path exists
	 * @throws IOException if the file system cannot be queried
	 */
	private static boolean fileExists(String path) throws IOException{
		final URI outuri = SequenceFileUtility.convertToURI(path);
		final FileSystem fs = getFileSystem(outuri);
		return fs.exists(new Path(outuri.toString()));
	}

	/**
	 * Get the file system for a URI using a fresh default {@link Configuration}.
	 *
	 * @param uri the URI whose file system is required
	 * @return the file system; for a {@link LocalFileSystem} the raw underlying file system is returned
	 * @throws IOException if the file system cannot be obtained
	 */
	private static FileSystem getFileSystem(URI uri) throws IOException {
		final Configuration config = new Configuration();
		FileSystem fs = FileSystem.get(uri, config);
		// NOTE(review): unwrapping presumably bypasses the local checksum layer - confirm.
		if (fs instanceof LocalFileSystem) {
			fs = ((LocalFileSystem)fs).getRaw();
		}
		return fs;
	}

	/**
	 * Build the output location of a stage beneath the output root.
	 *
	 * @param outname the name of the stage output, or null to use the output root itself
	 * @return the output path
	 */
	private Path constructOutputPath(String outname) {
		String newOutPath = this.outputRoot.toString();
		if (outname != null) {
			newOutPath += "/" + outname;
		}
		return new Path(newOutPath);
	}

	/**
	 * Look up the part files recorded for a stage processed by {@link #runAll()}.
	 *
	 * @param completedJobId the output name of the stage
	 * @return the paths to the output of the completed job, or null if nothing was
	 *         recorded under that name
	 */
	public Path[] getStagePaths(String completedJobId) {
		return this.completedJobs.get(completedJobId);
	}

	/**
	 * @param b if true all but the final output and input for this multi staged job are
	 *          removed
	 */
	public void removeIntermediate(boolean b) {
		this.removePreliminary = b;
	}
}