/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *
 *   * Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *
 *   * Neither the name of the University of Southampton nor the names of its
 *     contributors may be used to endorse or promote products derived from this
 *     software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */ 030package org.openimaj.hadoop.tools.image.indexing; 031 032import java.io.ByteArrayInputStream; 033import java.io.ByteArrayOutputStream; 034import java.io.DataOutputStream; 035import java.io.File; 036import java.io.IOException; 037import java.net.URI; 038import java.util.List; 039 040import org.apache.hadoop.conf.Configured; 041import org.apache.hadoop.filecache.DistributedCache; 042import org.apache.hadoop.fs.Path; 043import org.apache.hadoop.io.BytesWritable; 044import org.apache.hadoop.io.Text; 045import org.apache.hadoop.mapreduce.Job; 046import org.apache.hadoop.mapreduce.Mapper; 047import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; 048import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; 049import org.apache.hadoop.util.Tool; 050import org.apache.hadoop.util.ToolRunner; 051import org.kohsuke.args4j.CmdLineException; 052import org.kohsuke.args4j.CmdLineParser; 053import org.kohsuke.args4j.Option; 054import org.openimaj.feature.local.list.MemoryLocalFeatureList; 055import org.openimaj.hadoop.mapreduce.TextBytesJobUtil; 056import org.openimaj.hadoop.sequencefile.SequenceFileUtility; 057import org.openimaj.image.feature.local.keypoints.Keypoint; 058import org.openimaj.image.indexing.vlad.VLADIndexerData; 059 060/** 061 * Indexer for Product-quantised VLAD-PCA features. Consumes existing 062 * local-features and requires a {@link VLADIndexerData} to provide the data. 063 * 064 * @author Jonathon Hare (jsh2@ecs.soton.ac.uk) 065 */ 066public class HadoopPqPcaVladIndexer extends Configured implements Tool { 067 /** 068 * {@link Mapper} for extracting PQ-PCA-VLAD features from sets of local 069 * features. Also outputs the raw PCA-VLAD features. 
070 * 071 * @author Jonathon Hare (jsh2@ecs.soton.ac.uk) 072 */ 073 static class PqPcaVladMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> { 074 static enum COUNTERS { 075 EMIT, NULL; 076 } 077 078 private VLADIndexerData indexer; 079 private MultipleOutputs<Text, BytesWritable> mos; 080 081 @Override 082 protected void setup(Context context) throws IOException, InterruptedException 083 { 084 indexer = VLADIndexerData.read(new File("vlad-data.bin")); 085 mos = new MultipleOutputs<Text, BytesWritable>(context); 086 } 087 088 @Override 089 protected void map(Text key, BytesWritable value, Context context) 090 throws IOException, InterruptedException 091 { 092 final List<Keypoint> keys = MemoryLocalFeatureList.read(new ByteArrayInputStream(value.getBytes()), 093 Keypoint.class); 094 095 final float[] vladData = indexer.extractPcaVlad(keys); 096 097 if (vladData == null) { 098 context.getCounter(COUNTERS.NULL).increment(1L); 099 System.out.println("VLAD is null; keys has length " + keys.size()); 100 return; 101 } 102 103 final byte[] pqVladData = indexer.getProductQuantiser().quantise(vladData); 104 105 mos.write("pcavlad", key, floatToBytes(vladData)); 106 107 context.write(key, new BytesWritable(pqVladData)); 108 context.getCounter(COUNTERS.EMIT).increment(1L); 109 } 110 111 BytesWritable floatToBytes(float[] arr) throws IOException { 112 final ByteArrayOutputStream baos = new ByteArrayOutputStream(); 113 final DataOutputStream dos = new DataOutputStream(baos); 114 for (final float f : arr) 115 dos.writeFloat(f); 116 117 return new BytesWritable(baos.toByteArray()); 118 } 119 120 @Override 121 protected void cleanup(Context context) throws IOException, 122 InterruptedException 123 { 124 super.cleanup(context); 125 mos.close(); 126 } 127 } 128 129 @Option( 130 name = "--dont-compress-output", 131 required = false, 132 usage = "Don't compress sequencefile records.", 133 metaVar = "BOOLEAN") 134 private boolean dontcompress = false; 135 136 @Option( 137 
name = "--remove", 138 aliases = "-rm", 139 required = false, 140 usage = "Remove the existing output location if it exists.", 141 metaVar = "BOOLEAN") 142 private boolean replace = false; 143 144 @Option(name = "--input", aliases = "-i", required = true, usage = "Input local features file.", metaVar = "STRING") 145 private String input; 146 147 @Option(name = "--output", aliases = "-o", required = true, usage = "Output pca-vlad file.", metaVar = "STRING") 148 private String output; 149 150 @Option(name = "--indexer-data", aliases = "-id", required = true, usage = "Indexer data file.", metaVar = "STRING") 151 private String indexerData; 152 153 @Override 154 public int run(String[] args) throws Exception { 155 final CmdLineParser parser = new CmdLineParser(this); 156 157 try { 158 parser.parseArgument(args); 159 } catch (final CmdLineException e) { 160 System.err.println(e.getMessage()); 161 System.err.println("Usage: hadoop jar HadoopImageIndexer.jar [options]"); 162 parser.printUsage(System.err); 163 return -1; 164 } 165 166 final Path[] paths = SequenceFileUtility.getFilePaths(input, "part"); 167 final Path outputPath = new Path(output); 168 169 if (outputPath.getFileSystem(this.getConf()).exists(outputPath) && replace) 170 outputPath.getFileSystem(this.getConf()).delete(outputPath, true); 171 172 final Job job = TextBytesJobUtil.createJob(paths, outputPath, null, this.getConf()); 173 job.setJarByClass(this.getClass()); 174 job.setMapperClass(PqPcaVladMapper.class); 175 job.setNumReduceTasks(0); 176 177 MultipleOutputs.addNamedOutput(job, "pcavlad", SequenceFileOutputFormat.class, Text.class, BytesWritable.class); 178 179 DistributedCache.createSymlink(job.getConfiguration()); 180 DistributedCache.addCacheFile(new URI(indexerData + "#vlad-data.bin"), job.getConfiguration()); 181 182 SequenceFileOutputFormat.setCompressOutput(job, !dontcompress); 183 job.waitForCompletion(true); 184 185 return 0; 186 } 187 188 /** 189 * Main method 190 * 191 * @param args 192 * 
@throws Exception 193 */ 194 public static void main(String[] args) throws Exception { 195 ToolRunner.run(new HadoopPqPcaVladIndexer(), args); 196 } 197}