001/**
002 * Copyright (c) 2011, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.hadoop.tools.clusterquantiser;
031
032import java.io.BufferedInputStream;
033import java.io.IOException;
034import java.io.InputStream;
035import java.net.URI;
036
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.fs.FileSystem;
039import org.apache.hadoop.fs.LocalFileSystem;
040import org.apache.hadoop.fs.Path;
041import org.apache.hadoop.mapreduce.Job;
042import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
043import org.kohsuke.args4j.CmdLineException;
044import org.kohsuke.args4j.CmdLineOptionsProvider;
045import org.kohsuke.args4j.Option;
046import org.kohsuke.args4j.ProxyOptionHandler;
047import org.openimaj.hadoop.sequencefile.SequenceFileUtility;
048import org.openimaj.hadoop.tools.clusterquantiser.HadoopClusterQuantiserOptions.MapperMode.MapperModeOp;
049import org.openimaj.hadoop.tools.clusterquantiser.HadoopClusterQuantiserTool.ClusterQuantiserMapper;
050import org.openimaj.ml.clustering.SpatialClusters;
051import org.openimaj.tools.clusterquantiser.AbstractClusterQuantiserOptions;
052import org.openimaj.tools.clusterquantiser.ClusterType;
053import org.openimaj.tools.clusterquantiser.ClusterType.ClusterTypeOp;
054
055public class HadoopClusterQuantiserOptions extends AbstractClusterQuantiserOptions {
056
057        enum MapperMode implements CmdLineOptionsProvider {
058                STANDARD {
059                        @Override
060                        public MapperModeOp getOptions() {
061                                return new StandardOp();
062                        }
063                },
064                MULTITHREAD {
065                        @Override
066                        public MapperModeOp getOptions() {
067                                return new MultithreadOp();
068                        }
069                };
070
071                public static abstract class MapperModeOp {
072                        public abstract void prepareJobMapper(Job job, Class<ClusterQuantiserMapper> mapperClass,
073                                        AbstractClusterQuantiserOptions opts);
074                }
075
076                private static class StandardOp extends MapperModeOp {
077                        @Override
078                        public void prepareJobMapper(Job job, Class<ClusterQuantiserMapper> mapperClass,
079                                        AbstractClusterQuantiserOptions opts)
080                        {
081                                job.setMapperClass(mapperClass);
082                        }
083                }
084
085                private static class MultithreadOp extends MapperModeOp {
086
087                        @Override
088                        public void prepareJobMapper(Job job, Class<ClusterQuantiserMapper> mapperClass,
089                                        AbstractClusterQuantiserOptions opts)
090                        {
091                                int concurrency = opts.getConcurrency();
092                                if (opts.getConcurrency() <= 0)
093                                        concurrency = Runtime.getRuntime().availableProcessors();
094
095                                job.setMapperClass(MultithreadedMapper.class);
096                                MultithreadedMapper.setNumberOfThreads(job, concurrency);
097                                MultithreadedMapper.setMapperClass(job, mapperClass);
098                                System.out.println("NThreads = " + MultithreadedMapper.getNumberOfThreads(job));
099                        }
100                }
101        }
102
103        private boolean beforeMaps;
104
105        public HadoopClusterQuantiserOptions(String[] args) throws CmdLineException {
106                this(args, false);
107        }
108
109        public HadoopClusterQuantiserOptions(String[] args, boolean beforeMaps) throws CmdLineException {
110                super(args);
111                this.beforeMaps = beforeMaps;
112        }
113
114        /*
115         * IO args
116         */
117        @Option(name = "--input", aliases = "-i", required = true, usage = "set the input sequencefile")
118        private String input = null;
119
120        @Option(name = "--output", aliases = "-o", required = true, usage = "set the output directory")
121        private String output = null;
122
123        @Option(
124                        name = "--force-delete",
125                        aliases = "-rm",
126                        required = false,
127                        usage = "If it exists, remove the output directory before starting")
128        private boolean forceRM = false;
129
130        @Option(
131                        name = "--mapper-mode",
132                        aliases = "-mm",
133                        required = false,
134                        usage = "Choose a mapper mode.",
135                        handler = ProxyOptionHandler.class)
136        MapperMode mapperMode = MapperMode.STANDARD;
137        protected MapperModeOp mapperModeOp = (MapperModeOp) MapperMode.STANDARD.getOptions();
138
139        private ClusterTypeOp clusterTypeOp;
140
141        private Class<? extends SpatialClusters<?>> clusterClass;
142
143        @Override
144        public String getInputFileString() {
145                return input;
146        }
147
148        @Override
149        public String getOutputFileString() {
150                return output;
151        }
152
153        @Override
154        public void validate() throws CmdLineException {
155
156                if (infoFile != null) {
157                        info_mode = true;
158                        try {
159                                this.clusterTypeOp = sniffClusterType(infoFile);
160                                if (this.clusterTypeOp == null)
161                                        throw new CmdLineException(null, "Could not identify the clustertype");
162
163                                this.clusterClass = this.clusterTypeOp.getClusterClass();
164                        } catch (final IOException e) {
165                                throw new CmdLineException(null, "Could not identify the clustertype. File: " + infoFile, e);
166                        }
167
168                }
169                if (quantLocation != null) {
170                        if (info_mode)
171                                throw new CmdLineException(null,
172                                                "--quant and --info are mutually exclusive.");
173                        quant_mode = true;
174                        try {
175                                this.clusterTypeOp = sniffClusterType(quantLocation);
176                                if (this.clusterTypeOp == null)
177                                        throw new CmdLineException(null, "Could not identify the clustertype");
178
179                                this.clusterClass = this.clusterTypeOp.getClusterClass();
180                        } catch (final Exception e) {
181                                e.printStackTrace();
182                                throw new CmdLineException(null, "Could not identify the clustertype. File: " + quantLocation, e);
183                        }
184                }
185
186                if (this.getCountMode()) {
187                        if (this.extension.equals(".loc"))
188                                this.extension = ".counts";
189                }
190                if (forceRM && this.beforeMaps) {
191
192                        try {
193                                final URI outuri = SequenceFileUtility.convertToURI(this.output);
194                                final FileSystem fs = getFileSystem(outuri);
195                                fs.delete(new Path(outuri.toString()), true);
196                        } catch (final IOException e) {
197
198                        }
199                }
200        }
201
202        public static FileSystem getFileSystem(URI uri) throws IOException {
203                final Configuration config = new Configuration();
204                FileSystem fs = FileSystem.get(uri, config);
205                if (fs instanceof LocalFileSystem)
206                        fs = ((LocalFileSystem) fs).getRaw();
207                return fs;
208        }
209
210        public static ClusterTypeOp sniffClusterType(String quantFile) throws IOException {
211                InputStream fios = null;
212                try {
213                        fios = getClusterInputStream(quantFile);
214                        return ClusterType.sniffClusterType(new BufferedInputStream(fios));
215                } finally {
216                        if (fios != null)
217                                try {
218                                        fios.close();
219                                } catch (final IOException e) { /* don't care */
220                                }
221                }
222        }
223
224        @Override
225        public ClusterTypeOp getClusterType() {
226                return this.clusterTypeOp;
227        }
228
229        public static InputStream getClusterInputStream(String uriStr) throws IOException {
230                final URI uri = SequenceFileUtility.convertToURI(uriStr);
231                final FileSystem fs = getFileSystem(uri);
232                final Path p = new Path(uri.toString());
233                return fs.open(p);
234        }
235
236        public InputStream getClusterInputStream() throws IOException {
237                return getClusterInputStream(this.quantLocation);
238        }
239
240        public String getClusterInputString() {
241                return this.quantLocation;
242        }
243
244        public Path[] getInputPaths() throws IOException {
245                final Path[] sequenceFiles = SequenceFileUtility.getFilePaths(this.getInputFileString(), "part");
246                return sequenceFiles;
247        }
248
249        @Override
250        public ClusterTypeOp getOtherInfoType() {
251                return null;
252        }
253
254        @Override
255        public Class<? extends SpatialClusters<?>> getClusterClass() {
256                return this.clusterClass;
257        }
258
259        @Override
260        public Class<SpatialClusters<?>> getOtherInfoClass() {
261                return null;
262        }
263}