/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *
 *   * Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *
 *   * Neither the name of the University of Southampton nor the names of its
 *     contributors may be used to endorse or promote products derived from this
 *     software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.hadoop.tools.image.indexing;

import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
import org.openimaj.feature.local.list.MemoryLocalFeatureList;
import org.openimaj.hadoop.mapreduce.TextBytesJobUtil;
import org.openimaj.hadoop.sequencefile.SequenceFileUtility;
import org.openimaj.image.feature.local.keypoints.Keypoint;
import org.openimaj.lsh.functions.DoubleGaussianFactory;
import org.openimaj.lsh.sketch.IntLSHSketcher;
import org.openimaj.util.hash.HashFunction;
import org.openimaj.util.hash.HashFunctionFactory;
import org.openimaj.util.hash.modifier.LSBModifier;

import cern.jet.random.engine.MersenneTwister;

/**
 * Tool to convert SIFT features to LSH sketch form.
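 * <p>
 * For illustration, an invocation might look like the following (the jar name
 * matches the usage message printed by {@link #run(String[])}; the HDFS paths
 * are hypothetical):
 *
 * <pre>
 * hadoop jar HadoopImageIndexer.jar -i hdfs:///data/sift-features -o hdfs:///data/lsh-sketches
 * </pre>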
 * <p>
 * The mapper transforms {@code <key, [sift_features]>} pairs into
 * {@code <(index, hash), key>} pairs, one for each element of each feature's
 * sketch.
 * <p>
 * The reducer output is {@code <hash, [keys]>} for each index.
 *
 * @author Jonathon Hare (jsh2@ecs.soton.ac.uk)
 *
 */
public class HadoopSiftLSHExtractor extends Configured implements Tool {
    /**
     * Composite map-output key pairing a sketch index with the hash value at
     * that index. Comparison is on the hash alone; the index is handled by
     * the {@link LSHPartitioner}.
     */
    public static class MapperOut implements WritableComparable<MapperOut> {
        public byte index;
        public int hash;

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeByte(index);
            out.writeInt(hash);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            index = in.readByte();
            hash = in.readInt();
        }

        @Override
        public int compareTo(MapperOut o) {
            final int thisVal = this.hash;
            final int anotherVal = o.hash;
            return (thisVal < anotherVal ? -1 : (thisVal == anotherVal ? 0 : 1));
        }
    }

    /**
     * Sketches log-scaled SIFT descriptors with Gaussian LSH functions,
     * packing the 128 hash bits into four 32-bit ints.
     */
    public static class Sketcher {
        private static final int nbits = 128;
        private static final int ndims = 128;
        private static final int seed = 1;
        private static final double w = 6.0;
        final float LOG_BASE = 0.001f;
        private IntLSHSketcher<double[]> sketcher;

        public Sketcher() {
            final MersenneTwister rng = new MersenneTwister(seed);

            final DoubleGaussianFactory gauss = new DoubleGaussianFactory(ndims, rng, w);
            final HashFunctionFactory<double[]> factory = new HashFunctionFactory<double[]>() {
                @Override
                public HashFunction<double[]> create() {
                    // keep only the least-significant bit of each Gaussian hash
                    return new LSBModifier<double[]>(gauss.create());
                }
            };

            sketcher = new IntLSHSketcher<double[]>(factory, nbits);
        }

        public int[] sketch(Keypoint k) {
            return sketcher.createSketch(logScale(k.ivec, LOG_BASE));
        }

        /**
         * Map the byte-valued descriptor elements to [0, 1] and compress
         * their dynamic range logarithmically, clamping the result to [l, 1].
         */
        double[] logScale(byte[] v, float l) {
            final double[] dfv = new double[v.length];
            final double s = -Math.log(l);

            for (int i = 0; i < v.length; i++) {
                double d = (v[i] + 128.0) / 256.0;

                if (d < l)
                    d = l;
                d = (Math.log(d) + s) / s;
                if (d > 1.0)
                    d = 1.0;

                dfv[i] = d;
            }
            return dfv;
        }
    }

    /**
     * Mapper that sketches the SIFT features of each image and emits a
     * {@code (index, hash) -> key} pair for every element of every sketch.
     */
    public static class LSHMapper extends Mapper<Text, BytesWritable, MapperOut, Text> {
        Sketcher sketcher = new Sketcher();

        private void process(List<Keypoint> features, Text key, Context context)
                throws IOException, InterruptedException
        {
            for (final Keypoint k : features) {
                final int[] sketch = sketcher.sketch(k);

                for (byte i = 0; i < sketch.length; i++) {
                    final MapperOut mo = new MapperOut();
                    mo.index = i;
                    mo.hash = sketch[i];

                    context.write(mo, key);
                }
            }
        }

        @Override
        public void map(Text key, BytesWritable data, Context context) throws IOException, InterruptedException {
            final List<Keypoint> features = MemoryLocalFeatureList.read(
                    new ByteArrayInputStream(data.getBytes()), Keypoint.class);

            process(features, key, context);
        }
    }

    /**
     * Partitioner that sends the data to a reducer based on the
     * {@link MapperOut#index}.
     *
     * @author Jonathon Hare (jsh2@ecs.soton.ac.uk)
     */
    public static class LSHPartitioner extends Partitioner<MapperOut, Text> {
        @Override
        public int getPartition(MapperOut key, Text value, int numPartitions) {
            return key.index;
        }
    }

    /**
     * Comparator to group the map outputs by their {@link MapperOut#hash}.
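     * <p>
     * Together with {@link LSHPartitioner} this forms the usual secondary-sort
     * setup: records are partitioned by sketch index, so each reducer owns one
     * hash table, and grouped by hash value, so a single reduce call sees every
     * image key sharing a given sketch hash.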
     *
     * @author Jonathon Hare (jsh2@ecs.soton.ac.uk)
     *
     */
    public static class LSHGroupingComparator extends WritableComparator {
        public LSHGroupingComparator() {
            super(MapperOut.class, true);
        }

        @SuppressWarnings("rawtypes")
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            final int aVal = ((MapperOut) a).hash;
            final int bVal = ((MapperOut) b).hash;
            return (aVal < bVal ? -1 : (aVal == bVal ? 0 : 1));
        }
    }

    /**
     * Reducer that writes, for each sketch hash, the de-duplicated,
     * naturally-ordered, space-separated list of image keys with that hash.
     */
    public static class LSHReducer extends Reducer<MapperOut, Text, IntWritable, Text> {
        @Override
        protected void reduce(MapperOut key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException
        {
            // System.out.println("Task: " + context.getTaskAttemptID() +
            // " -> index: " + key.index);

            // de-duplicate the image keys
            final Set<String> valSet = new HashSet<String>();
            for (final Text t : values) {
                valSet.add(t.toString());
            }

            final List<String> list = new ArrayList<String>(valSet);
            Collections.sort(list); // order naturally

            // build the space-separated key list
            final StringBuilder sb = new StringBuilder(list.get(0));
            for (int i = 1; i < list.size(); i++)
                sb.append(' ').append(list.get(i));

            context.write(new IntWritable(key.hash), new Text(sb.toString()));
        }
    }

    @Option(
            name = "--dont-compress-output",
            required = false,
            usage = "Don't compress sequencefile records.",
            metaVar = "BOOLEAN")
    private boolean dontcompress = false;

    @Option(
            name = "--remove",
            aliases = "-rm",
            required = false,
            usage = "Remove the existing output location if it exists.",
            metaVar = "BOOLEAN")
    private boolean replace = false;

    @Option(name = "--input", aliases = "-i", required = true, usage = "Input local features file.", metaVar = "STRING")
    private String input;

    @Option(name = "--output", aliases = "-o", required = true, usage = "Output file for the LSH sketches.", metaVar = "STRING")
    private String output;

    @Override
    public int run(String[] args) throws Exception {
        final CmdLineParser parser = new CmdLineParser(this);

        try {
            parser.parseArgument(args);
        } catch (final CmdLineException e) {
            System.err.println(e.getMessage());
            System.err.println("Usage: hadoop jar HadoopImageIndexer.jar [options]");
            parser.printUsage(System.err);
            return -1;
        }

        final Path[] paths = SequenceFileUtility.getFilePaths(input, "part");
        final Path outputPath = new Path(output);

        if (outputPath.getFileSystem(this.getConf()).exists(outputPath) && replace)
            outputPath.getFileSystem(this.getConf()).delete(outputPath, true);

        final Job job = TextBytesJobUtil.createJob(paths, outputPath, null, this.getConf());

        job.setMapOutputKeyClass(MapperOut.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        job.setJarByClass(this.getClass());

        job.setMapperClass(LSHMapper.class);
        job.setReducerClass(LSHReducer.class);

        // one reducer per sketch element: the 128 hash bits pack into four
        // 32-bit ints, and the partitioner routes each index to its own reducer
        job.setNumReduceTasks(4);
        job.setPartitionerClass(LSHPartitioner.class);
        job.setGroupingComparatorClass(LSHGroupingComparator.class);

        SequenceFileOutputFormat.setCompressOutput(job, !dontcompress);
        job.waitForCompletion(true);

        return 0;
    }

    /**
     * Main method
     *
     * @param args command-line arguments
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new HadoopSiftLSHExtractor(), args);
    }
}