001/** 002 * Copyright (c) 2011, The University of Southampton and the individual contributors. 003 * All rights reserved. 004 * 005 * Redistribution and use in source and binary forms, with or without modification, 006 * are permitted provided that the following conditions are met: 007 * 008 * * Redistributions of source code must retain the above copyright notice, 009 * this list of conditions and the following disclaimer. 010 * 011 * * Redistributions in binary form must reproduce the above copyright notice, 012 * this list of conditions and the following disclaimer in the documentation 013 * and/or other materials provided with the distribution. 014 * 015 * * Neither the name of the University of Southampton nor the names of its 016 * contributors may be used to endorse or promote products derived from this 017 * software without specific prior written permission. 018 * 019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR 023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON 026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 029 */ 030package org.openimaj.hadoop.tools.fastkmeans; 031 032import java.io.IOException; 033import java.net.URI; 034import java.util.Arrays; 035import java.util.HashMap; 036 037import org.apache.hadoop.conf.Configured; 038import org.apache.hadoop.fs.FSDataOutputStream; 039import org.apache.hadoop.fs.FileSystem; 040import org.apache.hadoop.fs.Path; 041import org.apache.hadoop.io.BytesWritable; 042import org.apache.hadoop.io.IntWritable; 043import org.apache.hadoop.mapred.JobConf; 044import org.apache.hadoop.mapreduce.Job; 045import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper; 046import org.apache.hadoop.util.Tool; 047import org.apache.hadoop.util.ToolRunner; 048import org.openimaj.hadoop.mapreduce.TextBytesJobUtil; 049import org.openimaj.io.IOUtils; 050import org.openimaj.ml.clustering.ByteCentroidsResult; 051 052/** 053 * Approximate/Exact K-Means over Hadoop 054 * 055 * @author Sina Samangooei (ss@ecs.soton.ac.uk) 056 * 057 */ 058public class HadoopFastKMeans extends Configured implements Tool { 059 public static final String EXTRA_USAGE_INFO = ""; 060 private HadoopFastKMeansOptions options = null; 061 private String[] original_args; 062 063 public HadoopFastKMeans(String[] args) { 064 this.original_args = args; 065 } 066 067 public HadoopFastKMeans() { 068 this.original_args = new String[0]; 069 } 070 071 @Override 072 public int run(String[] args) throws Exception { 073 if (options == null) { 074 options = new HadoopFastKMeansOptions(args, original_args, true); 075 options.prepare(); 076 } 077 078 final String base = options.output; 079 // Select a subset of the features 080 final String inputName = new Path(options.inputs.get(0)).getName(); 081 String selected = options.output + "/" + inputName + "_select_" + options.nsamples; 082 final URI selectOutFileURI = new Path(selected).toUri(); 083 if (!HadoopFastKMeansOptions.getFileSystem(selectOutFileURI).exists(new Path(selected))) { 084 final SequenceFileByteImageFeatureSelector sfbis = new SequenceFileByteImageFeatureSelector(options.inputs, 085 selected, options); 086 selected = sfbis.getFeatures(options.nsamples); 087 } 088 089 if (options.samplesOnly) 090 return 0; 091 if (options.checkSampleEquality) { 092 System.out.println("Checking sample equality"); 093 System.out.println("Using sequence file: " + selected); 094 SampleEqualityChecker.checkSampleEquality(selected + "/part-r-00000", options); 095 return 0; 096 } 097 098 // Select the intital centroids 099 final SequenceFileByteFeatureSelector sfbs = new SequenceFileByteFeatureSelector(selected, options.output 100 + "/init", options); 101 final String initialCentroids = sfbs.getRandomFeatures(options.k); 102 103 ByteCentroidsResult cluster = AKMeans.sequenceFileToCluster(initialCentroids + "/part-r-00000", options.k); 104 105 // at this point there might be fewer centroids than we wanted as a 106 // result of having fewer features than centroids... this should 107 // probably be considered to be an error. 108 cluster.centroids = trimNullClusters(cluster.centroids); 109 replaceSequenceFileWithCluster(initialCentroids, cluster); 110 if (cluster.centroids.length < options.k) { 111 System.err.println("More clusters were requested than there are features. K-Means cannot be performed."); 112 replaceSequenceFileWithCluster(initialCentroids, cluster); 113 replaceSequenceFileWithCluster(base + "/final", cluster); 114 return 1; 115 } 116 117 // Prepare the AKM procedure 118 String currentCompletePath = initialCentroids; 119 for (int i = 0; i < options.iter; i++) { 120 // create job... 121 // set input from previous job if i!=0, otherwise use given input 122 // from args 123 // set output to a file/directory named using i (combined with 124 // something in args) 125 System.out.println("Calling iteration: " + i); 126 String newOutPath = base + "/" + i; 127 if (i == options.iter - 1) 128 newOutPath = base + "/final"; 129 final Job job = TextBytesJobUtil.createJob(new Path(selected), new Path(newOutPath), 130 new HashMap<String, String>(), this.getConf()); 131 job.setJarByClass(this.getClass()); 132 job.setMapperClass(MultithreadedMapper.class); 133 MultithreadedMapper.setNumberOfThreads(job, options.concurrency); 134 MultithreadedMapper.setMapperClass(job, AKMeans.Map.class); 135 136 job.setCombinerClass(AKMeans.Combine.class); 137 job.setReducerClass(AKMeans.Reduce.class); 138 job.setOutputKeyClass(IntWritable.class); 139 job.setOutputValueClass(BytesWritable.class); 140 job.getConfiguration().setStrings(AKMeans.CENTROIDS_PATH, currentCompletePath); 141 job.getConfiguration().setStrings(AKMeans.CENTROIDS_K, options.k + ""); 142 job.getConfiguration().setStrings(AKMeans.CENTROIDS_EXACT, options.exact + ""); 143 ((JobConf) job.getConfiguration()).setNumTasksToExecutePerJvm(-1); 144 job.waitForCompletion(true); 145 146 currentCompletePath = newOutPath; 147 cluster = AKMeans.completeCentroids(currentCompletePath + "/part-r-00000", selected, options); 148 replaceSequenceFileWithCluster(currentCompletePath, cluster); 149 cluster = null; 150 } 151 return 0; 152 } 153 154 static byte[][] trimNullClusters(byte[][] bytes) { 155 int i = 0; 156 while (i < bytes.length && bytes[i] != null) { 157 i++; 158 } 159 160 return Arrays.copyOf(bytes, i); 161 } 162 163 private void replaceSequenceFileWithCluster(String sequenceFile, ByteCentroidsResult cluster) throws IOException { 164 final Path p = new Path(sequenceFile); 165 final FileSystem fs = HadoopFastKMeansOptions.getFileSystem(p.toUri()); 166 167 fs.delete(p, true); // Delete the sequence file of this name 168 169 FSDataOutputStream stream = null; 170 try { 171 stream = fs.create(p); 172 IOUtils.writeBinary(stream, cluster); // Write the cluster 173 } finally { 174 if (stream != null) 175 stream.close(); 176 } 177 } 178 179 /** 180 * Main method 181 * 182 * @param args 183 * @throws Exception 184 */ 185 public static void main(String[] args) throws Exception { 186 ToolRunner.run(new HadoopFastKMeans(args), args); 187 } 188 189 /** 190 * 191 * @param hfkmo 192 */ 193 public void setOptions(HadoopFastKMeansOptions hfkmo) { 194 this.options = hfkmo; 195 } 196}