001/**
002 * Copyright (c) 2011, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.hadoop.tools.localfeature;
031
032import java.io.IOException;
033import java.net.URI;
034
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.FileSystem;
037import org.apache.hadoop.fs.LocalFileSystem;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.mapreduce.Job;
040import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
041import org.kohsuke.args4j.CmdLineException;
042import org.kohsuke.args4j.CmdLineOptionsProvider;
043import org.kohsuke.args4j.CmdLineParser;
044import org.kohsuke.args4j.Option;
045import org.kohsuke.args4j.ProxyOptionHandler;
046import org.openimaj.hadoop.sequencefile.SequenceFileUtility;
047import org.openimaj.hadoop.tools.localfeature.HadoopLocalFeaturesTool.LocalFeaturesMapper;
048import org.openimaj.hadoop.tools.localfeature.HadoopLocalFeaturesToolOptions.MapperMode.MapperModeOp;
049import org.openimaj.tools.localfeature.options.ExtractorOptions;
050
051/**
052 * Options for the {@link HadoopLocalFeaturesTool}.
053 * 
054 * @author Jonathon Hare (jsh2@ecs.soton.ac.uk)
055 */
056public class HadoopLocalFeaturesToolOptions extends ExtractorOptions {
057        static enum MapperMode implements CmdLineOptionsProvider {
058                STANDARD {
059                        @Override
060                        public MapperModeOp getOptions() {
061                                return new MapperModeOp() {
062                                        @Override
063                                        public void prepareJobMapper(Job job, Class<LocalFeaturesMapper> mapperClass) {
064                                                job.setMapperClass(mapperClass);
065                                        }
066                                };
067                        }
068                },
069                MULTITHREAD {
070                        @Override
071                        public MapperModeOp getOptions() {
072                                return new MapperModeOp() {
073                                        @Option(
074                                                        name = "--threads",
075                                                        aliases = "-j",
076                                                        required = false,
077                                                        usage = "Use NUMBER threads per mapper. defaults n processors.",
078                                                        metaVar = "NUMBER")
079                                        private int concurrency = Runtime.getRuntime().availableProcessors();
080
081                                        @Override
082                                        public void prepareJobMapper(Job job, Class<LocalFeaturesMapper> mapperClass) {
083                                                if (concurrency <= 0)
084                                                        concurrency = Runtime.getRuntime().availableProcessors();
085
086                                                job.setMapperClass(MultithreadedMapper.class);
087                                                MultithreadedMapper.setNumberOfThreads(job, concurrency);
088                                                MultithreadedMapper.setMapperClass(job, mapperClass);
089                                                System.out.println("Using multithreaded mapper");
090                                        }
091                                };
092                        }
093                };
094
095                @Override
096                public abstract MapperModeOp getOptions();
097
098                public interface MapperModeOp {
099                        public abstract void prepareJobMapper(Job job, Class<LocalFeaturesMapper> mapperClass);
100                }
101        }
102
103        private String[] args;
104
105        @Option(
106                        name = "--remove",
107                        aliases = "-rm",
108                        required = false,
109                        usage = "Remove the existing output location if it exists.",
110                        metaVar = "BOOLEAN")
111        private boolean replace = false;
112
113        @Option(
114                        name = "--mapper-mode",
115                        aliases = "-mm",
116                        required = false,
117                        usage = "Choose a mapper mode.",
118                        handler = ProxyOptionHandler.class)
119        MapperMode mapperMode = MapperMode.STANDARD;
120        MapperModeOp mapperModeOp;
121
122        @Option(
123                        name = "--dont-write",
124                        aliases = "-dr",
125                        required = false,
126                        usage = "Don't actually emmit. Only useful for testing.",
127                        metaVar = "BOOLEAN")
128        boolean dontwrite = false;
129
130        @Option(
131                        name = "--dont-compress-output",
132                        required = false,
133                        usage = "Don't compress sequencefile records.",
134                        metaVar = "BOOLEAN")
135        boolean dontcompress = false;
136
137        private boolean beforeMap;
138
139        /**
140         * Construct with the given arguments string
141         * 
142         * @param args
143         */
144        public HadoopLocalFeaturesToolOptions(String[] args) {
145                this(args, false);
146        }
147
148        /**
149         * Construct with the given arguments string
150         * 
151         * @param args
152         * @param beforeMap
153         */
154        public HadoopLocalFeaturesToolOptions(String[] args, boolean beforeMap) {
155                this.args = args;
156                this.beforeMap = beforeMap;
157        }
158
159        /**
160         * Prepare the options
161         */
162        public void prepare() {
163                final CmdLineParser parser = new CmdLineParser(this);
164                try {
165                        parser.parseArgument(args);
166                        this.validate();
167                } catch (final CmdLineException e) {
168                        System.err.println(e.getMessage());
169                        System.err.println("Usage: hadoop jar HadoopLocalFeaturesTool.jar [options...] [files...]");
170                        parser.printUsage(System.err);
171
172                        System.exit(1);
173                }
174        }
175
176        private void validate() {
177                if (replace && beforeMap) {
178                        try {
179                                final URI outuri = SequenceFileUtility.convertToURI(this.getOutput());
180                                final FileSystem fs = getFileSystem(outuri);
181                                fs.delete(new Path(outuri.toString()), true);
182                        } catch (final IOException e) {
183
184                        }
185                }
186        }
187
188        static FileSystem getFileSystem(URI uri) throws IOException {
189                final Configuration config = new Configuration();
190                FileSystem fs = FileSystem.get(uri, config);
191                if (fs instanceof LocalFileSystem)
192                        fs = ((LocalFileSystem) fs).getRaw();
193                return fs;
194        }
195
196        /**
197         * @return the input paths
198         * @throws IOException
199         */
200        public Path[] getInputPaths() throws IOException {
201                final Path[] sequenceFiles = SequenceFileUtility.getFilePaths(this.getInput(), "part");
202                return sequenceFiles;
203        }
204
205        /**
206         * @return the output path
207         */
208        public Path getOutputPath() {
209                return new Path(SequenceFileUtility.convertToURI(this.getOutput()).toString());
210        }
211}