001/** 002 * Copyright (c) 2011, The University of Southampton and the individual contributors. 003 * All rights reserved. 004 * 005 * Redistribution and use in source and binary forms, with or without modification, 006 * are permitted provided that the following conditions are met: 007 * 008 * * Redistributions of source code must retain the above copyright notice, 009 * this list of conditions and the following disclaimer. 010 * 011 * * Redistributions in binary form must reproduce the above copyright notice, 012 * this list of conditions and the following disclaimer in the documentation 013 * and/or other materials provided with the distribution. 014 * 015 * * Neither the name of the University of Southampton nor the names of its 016 * contributors may be used to endorse or promote products derived from this 017 * software without specific prior written permission. 018 * 019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR 023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON 026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 029 */ 030package org.openimaj.hadoop.tools.localfeature; 031 032import java.io.IOException; 033import java.net.URI; 034 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.fs.FileSystem; 037import org.apache.hadoop.fs.LocalFileSystem; 038import org.apache.hadoop.fs.Path; 039import org.apache.hadoop.mapreduce.Job; 040import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper; 041import org.kohsuke.args4j.CmdLineException; 042import org.kohsuke.args4j.CmdLineOptionsProvider; 043import org.kohsuke.args4j.CmdLineParser; 044import org.kohsuke.args4j.Option; 045import org.kohsuke.args4j.ProxyOptionHandler; 046import org.openimaj.hadoop.sequencefile.SequenceFileUtility; 047import org.openimaj.hadoop.tools.localfeature.HadoopLocalFeaturesTool.LocalFeaturesMapper; 048import org.openimaj.hadoop.tools.localfeature.HadoopLocalFeaturesToolOptions.MapperMode.MapperModeOp; 049import org.openimaj.tools.localfeature.options.ExtractorOptions; 050 051/** 052 * Options for the {@link HadoopLocalFeaturesTool}. 053 * 054 * @author Jonathon Hare (jsh2@ecs.soton.ac.uk) 055 */ 056public class HadoopLocalFeaturesToolOptions extends ExtractorOptions { 057 static enum MapperMode implements CmdLineOptionsProvider { 058 STANDARD { 059 @Override 060 public MapperModeOp getOptions() { 061 return new MapperModeOp() { 062 @Override 063 public void prepareJobMapper(Job job, Class<LocalFeaturesMapper> mapperClass) { 064 job.setMapperClass(mapperClass); 065 } 066 }; 067 } 068 }, 069 MULTITHREAD { 070 @Override 071 public MapperModeOp getOptions() { 072 return new MapperModeOp() { 073 @Option( 074 name = "--threads", 075 aliases = "-j", 076 required = false, 077 usage = "Use NUMBER threads per mapper. defaults n processors.", 078 metaVar = "NUMBER") 079 private int concurrency = Runtime.getRuntime().availableProcessors(); 080 081 @Override 082 public void prepareJobMapper(Job job, Class<LocalFeaturesMapper> mapperClass) { 083 if (concurrency <= 0) 084 concurrency = Runtime.getRuntime().availableProcessors(); 085 086 job.setMapperClass(MultithreadedMapper.class); 087 MultithreadedMapper.setNumberOfThreads(job, concurrency); 088 MultithreadedMapper.setMapperClass(job, mapperClass); 089 System.out.println("Using multithreaded mapper"); 090 } 091 }; 092 } 093 }; 094 095 @Override 096 public abstract MapperModeOp getOptions(); 097 098 public interface MapperModeOp { 099 public abstract void prepareJobMapper(Job job, Class<LocalFeaturesMapper> mapperClass); 100 } 101 } 102 103 private String[] args; 104 105 @Option( 106 name = "--remove", 107 aliases = "-rm", 108 required = false, 109 usage = "Remove the existing output location if it exists.", 110 metaVar = "BOOLEAN") 111 private boolean replace = false; 112 113 @Option( 114 name = "--mapper-mode", 115 aliases = "-mm", 116 required = false, 117 usage = "Choose a mapper mode.", 118 handler = ProxyOptionHandler.class) 119 MapperMode mapperMode = MapperMode.STANDARD; 120 MapperModeOp mapperModeOp; 121 122 @Option( 123 name = "--dont-write", 124 aliases = "-dr", 125 required = false, 126 usage = "Don't actually emmit. Only useful for testing.", 127 metaVar = "BOOLEAN") 128 boolean dontwrite = false; 129 130 @Option( 131 name = "--dont-compress-output", 132 required = false, 133 usage = "Don't compress sequencefile records.", 134 metaVar = "BOOLEAN") 135 boolean dontcompress = false; 136 137 private boolean beforeMap; 138 139 /** 140 * Construct with the given arguments string 141 * 142 * @param args 143 */ 144 public HadoopLocalFeaturesToolOptions(String[] args) { 145 this(args, false); 146 } 147 148 /** 149 * Construct with the given arguments string 150 * 151 * @param args 152 * @param beforeMap 153 */ 154 public HadoopLocalFeaturesToolOptions(String[] args, boolean beforeMap) { 155 this.args = args; 156 this.beforeMap = beforeMap; 157 } 158 159 /** 160 * Prepare the options 161 */ 162 public void prepare() { 163 final CmdLineParser parser = new CmdLineParser(this); 164 try { 165 parser.parseArgument(args); 166 this.validate(); 167 } catch (final CmdLineException e) { 168 System.err.println(e.getMessage()); 169 System.err.println("Usage: hadoop jar HadoopLocalFeaturesTool.jar [options...] [files...]"); 170 parser.printUsage(System.err); 171 172 System.exit(1); 173 } 174 } 175 176 private void validate() { 177 if (replace && beforeMap) { 178 try { 179 final URI outuri = SequenceFileUtility.convertToURI(this.getOutput()); 180 final FileSystem fs = getFileSystem(outuri); 181 fs.delete(new Path(outuri.toString()), true); 182 } catch (final IOException e) { 183 184 } 185 } 186 } 187 188 static FileSystem getFileSystem(URI uri) throws IOException { 189 final Configuration config = new Configuration(); 190 FileSystem fs = FileSystem.get(uri, config); 191 if (fs instanceof LocalFileSystem) 192 fs = ((LocalFileSystem) fs).getRaw(); 193 return fs; 194 } 195 196 /** 197 * @return the input paths 198 * @throws IOException 199 */ 200 public Path[] getInputPaths() throws IOException { 201 final Path[] sequenceFiles = SequenceFileUtility.getFilePaths(this.getInput(), "part"); 202 return sequenceFiles; 203 } 204 205 /** 206 * @return the output path 207 */ 208 public Path getOutputPath() { 209 return new Path(SequenceFileUtility.convertToURI(this.getOutput()).toString()); 210 } 211}