001/**
002 * Copyright (c) 2011, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.hadoop.tools.fastkmeans;
031
032import java.io.IOException;
033import java.net.URI;
034import java.util.Arrays;
035import java.util.HashMap;
036
037import org.apache.hadoop.conf.Configured;
038import org.apache.hadoop.fs.FSDataOutputStream;
039import org.apache.hadoop.fs.FileSystem;
040import org.apache.hadoop.fs.Path;
041import org.apache.hadoop.io.BytesWritable;
042import org.apache.hadoop.io.IntWritable;
043import org.apache.hadoop.mapred.JobConf;
044import org.apache.hadoop.mapreduce.Job;
045import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
046import org.apache.hadoop.util.Tool;
047import org.apache.hadoop.util.ToolRunner;
048import org.openimaj.hadoop.mapreduce.TextBytesJobUtil;
049import org.openimaj.io.IOUtils;
050import org.openimaj.ml.clustering.ByteCentroidsResult;
051
052/**
053 * Approximate/Exact K-Means over Hadoop
054 * 
055 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
056 * 
057 */
058public class HadoopFastKMeans extends Configured implements Tool {
059        public static final String EXTRA_USAGE_INFO = "";
060        private HadoopFastKMeansOptions options = null;
061        private String[] original_args;
062
063        public HadoopFastKMeans(String[] args) {
064                this.original_args = args;
065        }
066
067        public HadoopFastKMeans() {
068                this.original_args = new String[0];
069        }
070
071        @Override
072        public int run(String[] args) throws Exception {
073                if (options == null) {
074                        options = new HadoopFastKMeansOptions(args, original_args, true);
075                        options.prepare();
076                }
077
078                final String base = options.output;
079                // Select a subset of the features
080                final String inputName = new Path(options.inputs.get(0)).getName();
081                String selected = options.output + "/" + inputName + "_select_" + options.nsamples;
082                final URI selectOutFileURI = new Path(selected).toUri();
083                if (!HadoopFastKMeansOptions.getFileSystem(selectOutFileURI).exists(new Path(selected))) {
084                        final SequenceFileByteImageFeatureSelector sfbis = new SequenceFileByteImageFeatureSelector(options.inputs,
085                                        selected, options);
086                        selected = sfbis.getFeatures(options.nsamples);
087                }
088
089                if (options.samplesOnly)
090                        return 0;
091                if (options.checkSampleEquality) {
092                        System.out.println("Checking sample equality");
093                        System.out.println("Using sequence file: " + selected);
094                        SampleEqualityChecker.checkSampleEquality(selected + "/part-r-00000", options);
095                        return 0;
096                }
097
098                // Select the intital centroids
099                final SequenceFileByteFeatureSelector sfbs = new SequenceFileByteFeatureSelector(selected, options.output
100                                + "/init", options);
101                final String initialCentroids = sfbs.getRandomFeatures(options.k);
102
103                ByteCentroidsResult cluster = AKMeans.sequenceFileToCluster(initialCentroids + "/part-r-00000", options.k);
104
105                // at this point there might be fewer centroids than we wanted as a
106                // result of having fewer features than centroids... this should
107                // probably be considered to be an error.
108                cluster.centroids = trimNullClusters(cluster.centroids);
109                replaceSequenceFileWithCluster(initialCentroids, cluster);
110                if (cluster.centroids.length < options.k) {
111                        System.err.println("More clusters were requested than there are features. K-Means cannot be performed.");
112                        replaceSequenceFileWithCluster(initialCentroids, cluster);
113                        replaceSequenceFileWithCluster(base + "/final", cluster);
114                        return 1;
115                }
116
117                // Prepare the AKM procedure
118                String currentCompletePath = initialCentroids;
119                for (int i = 0; i < options.iter; i++) {
120                        // create job...
121                        // set input from previous job if i!=0, otherwise use given input
122                        // from args
123                        // set output to a file/directory named using i (combined with
124                        // something in args)
125                        System.out.println("Calling iteration: " + i);
126                        String newOutPath = base + "/" + i;
127                        if (i == options.iter - 1)
128                                newOutPath = base + "/final";
129                        final Job job = TextBytesJobUtil.createJob(new Path(selected), new Path(newOutPath),
130                                        new HashMap<String, String>(), this.getConf());
131                        job.setJarByClass(this.getClass());
132                        job.setMapperClass(MultithreadedMapper.class);
133                        MultithreadedMapper.setNumberOfThreads(job, options.concurrency);
134                        MultithreadedMapper.setMapperClass(job, AKMeans.Map.class);
135
136                        job.setCombinerClass(AKMeans.Combine.class);
137                        job.setReducerClass(AKMeans.Reduce.class);
138                        job.setOutputKeyClass(IntWritable.class);
139                        job.setOutputValueClass(BytesWritable.class);
140                        job.getConfiguration().setStrings(AKMeans.CENTROIDS_PATH, currentCompletePath);
141                        job.getConfiguration().setStrings(AKMeans.CENTROIDS_K, options.k + "");
142                        job.getConfiguration().setStrings(AKMeans.CENTROIDS_EXACT, options.exact + "");
143                        ((JobConf) job.getConfiguration()).setNumTasksToExecutePerJvm(-1);
144                        job.waitForCompletion(true);
145
146                        currentCompletePath = newOutPath;
147                        cluster = AKMeans.completeCentroids(currentCompletePath + "/part-r-00000", selected, options);
148                        replaceSequenceFileWithCluster(currentCompletePath, cluster);
149                        cluster = null;
150                }
151                return 0;
152        }
153
154        static byte[][] trimNullClusters(byte[][] bytes) {
155                int i = 0;
156                while (i < bytes.length && bytes[i] != null) {
157                        i++;
158                }
159
160                return Arrays.copyOf(bytes, i);
161        }
162
163        private void replaceSequenceFileWithCluster(String sequenceFile, ByteCentroidsResult cluster) throws IOException {
164                final Path p = new Path(sequenceFile);
165                final FileSystem fs = HadoopFastKMeansOptions.getFileSystem(p.toUri());
166
167                fs.delete(p, true); // Delete the sequence file of this name
168
169                FSDataOutputStream stream = null;
170                try {
171                        stream = fs.create(p);
172                        IOUtils.writeBinary(stream, cluster); // Write the cluster
173                } finally {
174                        if (stream != null)
175                                stream.close();
176                }
177        }
178
179        /**
180         * Main method
181         * 
182         * @param args
183         * @throws Exception
184         */
185        public static void main(String[] args) throws Exception {
186                ToolRunner.run(new HadoopFastKMeans(args), args);
187        }
188
189        /**
190         * 
191         * @param hfkmo
192         */
193        public void setOptions(HadoopFastKMeansOptions hfkmo) {
194                this.options = hfkmo;
195        }
196}