/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *
 *   * Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *
 *   * Neither the name of the University of Southampton nor the names of its
 *     contributors may be used to endorse or promote products derived from this
 *     software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.hadoop.tools.image.indexing;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
import org.openimaj.feature.local.list.MemoryLocalFeatureList;
import org.openimaj.hadoop.mapreduce.TextBytesJobUtil;
import org.openimaj.hadoop.sequencefile.SequenceFileUtility;
import org.openimaj.image.feature.local.keypoints.Keypoint;
import org.openimaj.image.indexing.vlad.VLADIndexerData;

/**
 * Extractor for VLAD-PCA features. Consumes existing local-features and
 * requires a {@link VLADIndexerData} to provide the data.
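 * <p>
 * Typical invocation (a sketch: the jar name is taken from this tool's own
 * usage message, but the input, output and indexer-data paths below are
 * illustrative assumptions, not values from this source):
 *
 * <pre>
 * hadoop jar HadoopImageIndexer.jar -i /path/to/local-features -o /path/to/pca-vlad-output -id /path/to/indexer-data
 * </pre>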
 *
 * @author Jonathon Hare (jsh2@ecs.soton.ac.uk)
 */
public class HadoopPcaVladExtractor extends Configured implements Tool {
	private static final String VLAD_INDEXER_DATA_PATH_KEY = "openimaj.vlad.indexer.data";

	/**
	 * {@link Mapper} for extracting PCA-VLAD features from sets of local
	 * features
	 *
	 * @author Jonathon Hare (jsh2@ecs.soton.ac.uk)
	 */
	static class PcaVladMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {
		static enum COUNTERS {
			EMIT, NULL;
		}

		private VLADIndexerData indexer;

		@Override
		protected void setup(Context context) throws IOException, InterruptedException
		{
			// The indexer data file is shipped to each task via the
			// DistributedCache (see run() below), so it can be read from the
			// task's local working directory.
			indexer = VLADIndexerData.read(new File("./" + context.getConfiguration().get(VLAD_INDEXER_DATA_PATH_KEY)));
		}

		@Override
		protected void map(Text key, BytesWritable value, Context context)
				throws IOException, InterruptedException
		{
			// Deserialise the local features held in this record's value
			final List<Keypoint> keys = MemoryLocalFeatureList.read(new ByteArrayInputStream(value.getBytes()),
					Keypoint.class);

			// Aggregate the local features into a single PCA-projected VLAD vector
			final float[] vladData = indexer.extractPcaVlad(keys);

			if (vladData == null) {
				context.getCounter(COUNTERS.NULL).increment(1L);
				return;
			}

			// Serialise the float vector to raw bytes for the output sequencefile
			final ByteArrayOutputStream baos = new ByteArrayOutputStream();
			final DataOutputStream dos = new DataOutputStream(baos);
			for (final float f : vladData)
				dos.writeFloat(f);

			context.write(key, new BytesWritable(baos.toByteArray()));
			context.getCounter(COUNTERS.EMIT).increment(1L);
		}
	}

	@Option(
			name = "--dont-compress-output",
			required = false,
			usage = "Don't compress sequencefile records.",
			metaVar = "BOOLEAN")
	private boolean dontcompress = false;

	@Option(
			name = "--remove",
			aliases = "-rm",
			required = false,
			usage = "Remove the existing output location if it exists.",
			metaVar = "BOOLEAN")
	private boolean replace = false;

	@Option(name = "--input", aliases = "-i", required = true, usage = "Input local features file.", metaVar = "STRING")
	private String input;

	@Option(name = "--output", aliases = "-o", required = true, usage = "Output pca-vlad file.", metaVar = "STRING")
	private String output;

	@Option(name = "--indexer-data", aliases = "-id", required = true, usage = "Indexer data file.", metaVar = "STRING")
	private String indexerData;

	@Override
	public int run(String[] args) throws Exception {
		final CmdLineParser parser = new CmdLineParser(this);

		try {
			parser.parseArgument(args);
		} catch (final CmdLineException e) {
			System.err.println(e.getMessage());
			System.err.println("Usage: hadoop jar HadoopImageIndexer.jar [options]");
			parser.printUsage(System.err);
			return -1;
		}

		final Path[] paths = SequenceFileUtility.getFilePaths(input, "part");
		final Path outputPath = new Path(output);

		if (outputPath.getFileSystem(this.getConf()).exists(outputPath) && replace)
			outputPath.getFileSystem(this.getConf()).delete(outputPath, true);

		final Job job = TextBytesJobUtil.createJob(paths, outputPath, null, this.getConf());
		job.setJarByClass(this.getClass());
		job.setMapperClass(PcaVladMapper.class);
		job.setNumReduceTasks(0);

		// Ship the indexer data with the job, and record its filename so the
		// mappers can locate it in their working directory
		DistributedCache.addFileToClassPath(new Path(indexerData), job.getConfiguration());
		job.getConfiguration().set(VLAD_INDEXER_DATA_PATH_KEY, new Path(indexerData).getName());

		SequenceFileOutputFormat.setCompressOutput(job, !dontcompress);

		// Propagate job success/failure to the tool's exit status
		return job.waitForCompletion(true) ? 0 : 1;
	}

	/**
	 * Main method
	 *
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		ToolRunner.run(new HadoopPcaVladExtractor(), args);
	}
}