/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *
 *   * Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *
 *   * Neither the name of the University of Southampton nor the names of its
 *     contributors may be used to endorse or promote products derived from this
 *     software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */ 030package org.openimaj.hadoop.tools.clusterquantiser; 031 032import java.io.ByteArrayInputStream; 033import java.io.ByteArrayOutputStream; 034import java.io.IOException; 035import java.io.InputStream; 036import java.io.PrintWriter; 037import java.util.HashMap; 038import java.util.Map; 039 040import org.apache.hadoop.conf.Configured; 041import org.apache.hadoop.fs.Path; 042import org.apache.hadoop.io.BytesWritable; 043import org.apache.hadoop.io.Text; 044import org.apache.hadoop.mapred.JobConf; 045import org.apache.hadoop.mapreduce.Job; 046import org.apache.hadoop.mapreduce.Mapper; 047import org.apache.hadoop.util.Tool; 048import org.apache.hadoop.util.ToolRunner; 049import org.kohsuke.args4j.CmdLineException; 050import org.openimaj.hadoop.mapreduce.TextBytesJobUtil; 051import org.openimaj.hadoop.sequencefile.MetadataConfiguration; 052import org.openimaj.hadoop.sequencefile.TextBytesSequenceFileUtility; 053import org.openimaj.io.IOUtils; 054import org.openimaj.ml.clustering.ByteCentroidsResult; 055import org.openimaj.ml.clustering.IntCentroidsResult; 056import org.openimaj.ml.clustering.SpatialClusters; 057import org.openimaj.ml.clustering.assignment.HardAssigner; 058import org.openimaj.ml.clustering.assignment.hard.KDTreeByteEuclideanAssigner; 059import org.openimaj.ml.clustering.assignment.hard.KDTreeIntEuclideanAssigner; 060import org.openimaj.tools.clusterquantiser.ClusterQuantiser; 061import org.openimaj.tools.clusterquantiser.FeatureFile; 062import org.openimaj.tools.clusterquantiser.FeatureFileFeature; 063import org.openimaj.util.array.ByteArrayConverter; 064 065public class HadoopClusterQuantiserTool extends Configured implements Tool { 066 private static final String ARGS_KEY = "clusterquantiser.args"; 067 068 static class ClusterQuantiserMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> { 069 private static SpatialClusters<?> tree = null; 070 private static HardAssigner<?, ?, ?> assigner = null; 071 private static 
HadoopClusterQuantiserOptions options = null; 072 073 protected static synchronized void loadCluster(Mapper<Text, BytesWritable, Text, BytesWritable>.Context context) 074 throws IOException 075 { 076 if (options == null) { 077 try { 078 options = new HadoopClusterQuantiserOptions(context.getConfiguration().getStrings(ARGS_KEY)); 079 options.prepare(); 080 } catch (final CmdLineException e) { 081 throw new IOException(e); 082 } 083 } 084 085 if (tree == null) { 086 InputStream ios = null; 087 try { 088 System.out.print("Reading quant data. "); 089 ios = options.getClusterInputStream(); 090 tree = IOUtils.read(ios, options.getClusterClass()); 091 092 if (tree instanceof ByteCentroidsResult) 093 assigner = new KDTreeByteEuclideanAssigner((ByteCentroidsResult) tree); 094 else if (tree instanceof IntCentroidsResult) 095 assigner = new KDTreeIntEuclideanAssigner((IntCentroidsResult) tree); 096 else 097 assigner = tree.defaultHardAssigner(); 098 099 System.out.println("Done reading quant data."); 100 } catch (final IOException e) { 101 e.printStackTrace(); 102 throw e; 103 } finally { 104 if (ios != null) 105 ios.close(); 106 } 107 } else { 108 System.out.println("tree already loaded"); 109 } 110 } 111 112 @Override 113 protected void setup(Mapper<Text, BytesWritable, Text, BytesWritable>.Context context) throws IOException, 114 InterruptedException 115 { 116 loadCluster(context); 117 } 118 119 @SuppressWarnings("unchecked") 120 @Override 121 protected void 122 map(Text key, BytesWritable value, Mapper<Text, BytesWritable, Text, BytesWritable>.Context context) 123 throws java.io.IOException, InterruptedException 124 { 125 try { 126 final long t1 = System.currentTimeMillis(); 127 128 System.out.println("[" + Thread.currentThread().getId() + "]" + "Calling map "); 129 ByteArrayOutputStream baos = null; 130 if (options.isInfoMode()) { 131 ClusterQuantiser.do_info(options); 132 } else if (options.isQuantMode()) { 133 134 final FeatureFile input = 
options.getFileType().read(new ByteArrayInputStream(value.getBytes())); 135 136 baos = new ByteArrayOutputStream(); 137 PrintWriter pw = null; 138 try { 139 pw = new PrintWriter(baos); 140 pw.format("%d\n%d\n", input.size(), tree.numClusters()); 141 142 for (final FeatureFileFeature fff : input) { 143 int cluster = -1; 144 145 if (tree.getClass().getName().contains("Byte")) 146 cluster = ((HardAssigner<byte[], ?, ?>) assigner).assign(fff.data); 147 else 148 cluster = ((HardAssigner<int[], ?, ?>) assigner).assign(ByteArrayConverter 149 .byteToInt(fff.data)); 150 151 pw.format("%s %d\n", fff.location.trim(), cluster); 152 } 153 } finally { 154 if (pw != null) { 155 pw.flush(); 156 pw.close(); 157 input.close(); 158 } 159 } 160 161 context.write(key, new BytesWritable(baos.toByteArray())); 162 } 163 final long t2 = System.currentTimeMillis(); 164 System.out.println("[" + Thread.currentThread().getId() + "]" + "Job time taken: " + (t2 - t1) / 1000.0 165 + "s"); 166 } catch (final Exception e) { 167 System.err.println("Failed to quantise features because: " + e.getMessage()); 168 } 169 } 170 } 171 172 @Override 173 public int run(String[] args) throws Exception { 174 final HadoopClusterQuantiserOptions options = new HadoopClusterQuantiserOptions(args, true); 175 options.prepare(); 176 // String clusterFileString = options.getInputFileString(); 177 final Path[] paths = options.getInputPaths(); 178 // DistributedCache.addCacheFile(SequenceFileUtility.convertToURI(clusterFileString), 179 // this.getConf()); 180 final TextBytesSequenceFileUtility util = new TextBytesSequenceFileUtility(paths[0].toUri(), true); 181 final Map<String, String> metadata = new HashMap<String, String>(); 182 if (util.getUUID() != null) 183 metadata.put(MetadataConfiguration.UUID_KEY, util.getUUID()); 184 metadata.put(MetadataConfiguration.CONTENT_TYPE_KEY, "application/quantised-" 185 + options.getClusterType().toString().toLowerCase() + "-" + options.getExtension()); 186 187 
metadata.put("clusterquantiser.clustertype", options.getClusterType().toString()); 188 metadata.put("clusterquantiser.filetype", options.getFileType().toString()); 189 metadata.put("clusterquantiser.countmode", "" + options.getCountMode()); 190 metadata.put("clusterquantiser.extention", "" + options.getExtension()); 191 192 final Job job = TextBytesJobUtil.createJob(options.getInputFileString(), options.getOutputFileString(), metadata, 193 this.getConf()); 194 job.setJarByClass(this.getClass()); 195 // job.setMapperClass(MultithreadedMapper.class); 196 // MultithreadedMapper.setNumberOfThreads(job, 197 // options.getConcurrency()); 198 // MultithreadedMapper.setMapperClass(job, 199 // ClusterQuantiserMapper.class); 200 // System.out.println("NThreads = " + 201 // MultithreadedMapper.getNumberOfThreads(job)); 202 options.mapperModeOp.prepareJobMapper(job, ClusterQuantiserMapper.class, options); 203 204 job.getConfiguration().setStrings(ARGS_KEY, args); 205 job.setNumReduceTasks(0); 206 ((JobConf) job.getConfiguration()).setNumTasksToExecutePerJvm(-1); 207 // job.getConfiguration().set("mapred.child.java.opts", "-Xmx3000M"); 208 job.waitForCompletion(true); 209 return 0; 210 } 211 212 public static void main(String[] args) throws Exception { 213 try { 214 ToolRunner.run(new HadoopClusterQuantiserTool(), args); 215 } catch (final CmdLineException e) { 216 System.err.print(e); 217 } 218 } 219}