/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *
 *   * Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *
 *   * Neither the name of the University of Southampton nor the names of its
 *     contributors may be used to endorse or promote products derived from this
 *     software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.hadoop.tools;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.kohsuke.args4j.CmdLineException;
import org.openimaj.hadoop.sequencefile.SequenceFileUtility;
import org.openimaj.io.FileUtils;
import org.openimaj.tools.InOutToolOptions;

/**
 * Tools for dealing with {@link InOutToolOptions} instances that refer to
 * HDFS files.
 *
 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
 */
public class HadoopToolsUtil {

    /**
     * Validate the output location held by the given tool options, deleting
     * any existing output if the tool requests that it be overwritten.
     *
     * @param tool options to get data from
     * @throws CmdLineException if no output is specified or an existing
     *         output could not be deleted
     */
    public static void validateOutput(InOutToolOptions tool) throws CmdLineException {
        try {
            if (tool.getOutput() == null)
                throw new CmdLineException(null, "No output specified");
            URI outuri = SequenceFileUtility.convertToURI(tool.getOutput());
            FileSystem fs = getFileSystem(outuri);
            Path p = new Path(outuri.toString());
            if (fs.exists(p)) {
                if (tool.overwriteOutput()) {
                    fs.delete(p, true);
                } else {
                    System.out.println("Output exists, trying to use what is there...");
                }
            }
        } catch (IOException e) {
            throw new CmdLineException(null, "Couldn't delete existing output");
        }
    }

    /**
     * Validate the given output location, deleting any existing output if
     * requested.
     *
     * @param outpath the desired output
     * @param replace whether any existing output should be removed
     * @throws CmdLineException if an existing output could not be deleted
     */
    public static void validateOutput(String outpath, boolean replace) throws CmdLineException {
        try {
            URI outuri = SequenceFileUtility.convertToURI(outpath);
            FileSystem fs = getFileSystem(outuri);
            Path p = new Path(outuri.toString());
            if (fs.exists(p)) {
                if (replace) {
                    fs.delete(p, true);
                } else {
                    System.out.println("Output exists, trying to use what is there...");
                }
            }
        } catch (IOException e) {
            throw new CmdLineException(null, "Couldn't delete existing output");
        }
    }
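    /*
     * Usage sketch (illustrative only; the options object is assumed to have
     * been populated by an args4j CmdLineParser beforehand, and the HDFS URI
     * is made up):
     *
     *   InOutToolOptions options = ...; // parsed from the command line
     *   HadoopToolsUtil.validateOutput(options); // deletes existing output if overwriting is enabled
     *
     *   // or, without an options object, forcing replacement:
     *   HadoopToolsUtil.validateOutput("hdfs://namenode/user/me/out", true);
     */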
    /**
     * Get the {@link FileSystem} corresponding to a {@link URI} (usually
     * either HDFS or the local filesystem).
     *
     * @param uri the uri
     * @return the filesystem of the given path
     * @throws IOException
     */
    public static FileSystem getFileSystem(URI uri) throws IOException {
        Configuration config = new Configuration();
        FileSystem fs = FileSystem.get(uri, config);
        if (fs instanceof LocalFileSystem)
            fs = ((LocalFileSystem) fs).getRaw();
        return fs;
    }

    /**
     * Get the {@link FileSystem} corresponding to a {@link Path}.
     *
     * @param p the path.
     * @return the filesystem
     * @throws IOException
     */
    public static FileSystem getFileSystem(Path p) throws IOException {
        return getFileSystem(p.toUri());
    }

    /**
     * Validate that every input held by the given tool options exists.
     *
     * @param tool options to get the inputs from
     * @throws CmdLineException if no inputs are specified or an input cannot
     *         be found
     */
    public static void validateInput(InOutToolOptions tool) throws CmdLineException {
        if (tool.getAllInputs() == null)
            throw new CmdLineException(null, "No input specified");
        try {
            FileSystem fs = null;
            for (String input : tool.getAllInputs()) {
                URI inuri = SequenceFileUtility.convertToURI(input);
                if (fs == null)
                    fs = getFileSystem(inuri);
                if (!fs.exists(new Path(inuri.toString())))
                    throw new CmdLineException(null, "Couldn't find input file");
            }
        } catch (IOException e) {
            throw new CmdLineException(null, "Couldn't access the input file's filesystem");
        }
    }

    /**
     * Delete a file (or a directory, recursively).
     *
     * @param f the file to delete
     * @throws IOException
     */
    public static void removeFile(String f) throws IOException {
        URI outuri = SequenceFileUtility.convertToURI(f);
        FileSystem fs = getFileSystem(outuri);
        Path p = new Path(outuri.toString());
        fs.delete(p, true);
    }

    /**
     * Get the output path from an {@link InOutToolOptions}.
     *
     * @param options the {@link InOutToolOptions}.
     * @return the output path.
     */
    public static Path getOutputPath(InOutToolOptions options) {
        return new Path(options.getOutput());
    }

    /**
     * Get the output path from a String.
     *
     * @param path the path string
     * @return the path
     */
    public static Path getOutputPath(String path) {
        return new Path(path);
    }

    /**
     * Get the input paths from an {@link InOutToolOptions}. This will resolve
     * each input path and return an array containing either a single
     * {@link Path} representing the string or, if the path string is a
     * directory, the {@link Path}s of all the "part" files within it.
     *
     * @param options the {@link InOutToolOptions}.
     * @return the input paths
     * @throws IOException
     */
    public static Path[] getInputPaths(InOutToolOptions options) throws IOException {
        return SequenceFileUtility.getFilePaths(options.getAllInputs(), "part");
    }

    /**
     * Get the input paths from a String. This will resolve the path string
     * and return an array containing either a single {@link Path}
     * representing the string or, if the path string is a directory, the
     * {@link Path}s of all the "part" files within it.
     *
     * @param path the path string
     * @return the paths
     * @throws IOException
     */
    public static Path[] getInputPaths(String path) throws IOException {
        return SequenceFileUtility.getFilePaths(path, "part");
    }

    /**
     * @param paths
     * @return all the files starting with "part" in the requested paths
     * @throws IOException
     */
    public static Path[] getInputPaths(String[] paths) throws IOException {
        return SequenceFileUtility.getFilePaths(paths, "part");
    }

    /**
     * All the files starting with "part" in the paths which look like
     * paths[i]/subdir.
     *
     * @param paths
     * @param subdir
     * @return the paths to the part files
     * @throws IOException
     */
    public static Path[] getInputPaths(String[] paths, String subdir) throws IOException {
        return SequenceFileUtility.getFilePaths(paths, subdir, "part");
    }
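    /*
     * Usage sketch (illustrative; the HDFS directory name is made up):
     * resolve the individual "part" files written by a previous MapReduce
     * job so they can be fed to the next stage as inputs:
     *
     *   Path[] parts = HadoopToolsUtil.getInputPaths("hdfs://namenode/user/me/job-output");
     *   // parts now holds e.g. .../part-r-00000, .../part-r-00001, ...
     */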
    /**
     * Use the Hadoop filesystem to check if the given path exists.
     *
     * @param path the path to the file
     * @return true if the file exists; false otherwise
     * @throws IOException
     */
    public static boolean fileExists(String path) throws IOException {
        URI outuri = SequenceFileUtility.convertToURI(path);
        FileSystem fs = getFileSystem(outuri);
        Path p = new Path(outuri.toString());
        return fs.exists(p);
    }

    /**
     * Read the lines of a whole Hadoop file (or of every "part" file, if the
     * path is a directory) into memory. This is obviously a ridiculous thing
     * to do for all but the SMALLEST Hadoop files, so be very careful.
     *
     * @param p a path
     * @return the lines of the file(s) at path p
     * @throws IOException
     */
    public static String[] readlines(String p) throws IOException {
        Path[] allIn = getInputPaths(p);
        if (allIn.length == 0)
            return new String[0];
        List<String> out = new ArrayList<String>();
        FileSystem fs = getFileSystem(allIn[0]);
        for (Path path : allIn) {
            FSDataInputStream is = fs.open(path);
            try {
                out.addAll(Arrays.asList(FileUtils.readlines(is)));
            } finally {
                is.close(); // don't leak the stream if reading fails
            }
        }
        return out.toArray(new String[out.size()]);
    }

    private static final String COMMA_REPLACE = "#COMMA#";

    /**
     * A horrible hack to work around Hadoop's horrible hack whereby arrays of
     * strings set in a {@link Configuration} are joined with commas.
     *
     * @param args the arguments to encode
     * @return the arguments with each "," replaced by #COMMA#
     */
    public static String[] encodeArgs(String[] args) {
        String[] ret = new String[args.length];
        int i = 0;
        for (String arg : args) {
            ret[i] = arg.replaceAll(",", COMMA_REPLACE);
            i++;
        }
        return ret;
    }

    /**
     * A horrible hack to work around Hadoop's horrible hack whereby arrays of
     * strings set in a {@link Configuration} are joined with commas.
     *
     * @param args the arguments to decode
     * @return the arguments with each #COMMA# replaced by ","
     */
    public static String[] decodeArgs(String[] args) {
        String[] ret = new String[args.length];
        int i = 0;
        for (String arg : args) {
            ret[i] = arg.replaceAll(COMMA_REPLACE, ",");
            i++;
        }
        return ret;
    }
}
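/*
 * Sketch of the encodeArgs/decodeArgs round trip (illustrative;
 * "my.tool.args" is a made-up configuration key). Configuration.setStrings()
 * joins its values with commas, which is why any commas inside the values
 * must be escaped first:
 *
 *   Configuration conf = new Configuration();
 *   conf.setStrings("my.tool.args", HadoopToolsUtil.encodeArgs(args));
 *   // ... later, e.g. inside a Mapper ...
 *   String[] restored = HadoopToolsUtil.decodeArgs(conf.getStrings("my.tool.args"));
 */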