/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *
 *   * Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *
 *   * Neither the name of the University of Southampton nor the names of its
 *     contributors may be used to endorse or promote products derived from this
 *     software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.hadoop.tools.image.indexing;

import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
import org.openimaj.hadoop.mapreduce.TextBytesJobUtil;
import org.openimaj.hadoop.sequencefile.SequenceFileUtility;

/**
 * Hadoop {@link Tool} that computes the distribution of LSH bucket sizes. The
 * input is a sequence file mapping each hash key to a space-separated list of
 * entries; the output is a histogram recording how many buckets contain each
 * number of entries.
 */
public class ComputeLSHDistribution extends Configured implements Tool {
	/**
	 * Maps each (hash, entries) record to (bucket size, 1), where the bucket
	 * size is the number of space-separated tokens in the value.
	 */
	static class CountMapper extends Mapper<IntWritable, Text, IntWritable, IntWritable> {
		final static IntWritable ONE = new IntWritable(1);

		@Override
		protected void map(IntWritable key, Text value, Context context)
				throws IOException, InterruptedException
		{
			// the number of tokens is one more than the number of spaces
			int pos = -1;
			int count = 1;
			while ((pos = value.find(" ", pos + 1)) != -1) {
				count++;
			}

			context.write(new IntWritable(count), ONE);
		}
	}

	/**
	 * Combiner that partially sums the per-size counts. Unlike
	 * {@link CountReducer}, it must emit {@link IntWritable} values to match
	 * the map output value class; the final long-valued totals are produced by
	 * the reducer.
	 */
	static class CountCombiner extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
		@Override
		protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
				throws IOException, InterruptedException
		{
			int count = 0;

			for (final IntWritable v : values) {
				count += v.get();
			}

			context.write(key, new IntWritable(count));
		}
	}

	/**
	 * Sums the counts for each bucket size to produce the final histogram.
	 */
	static class CountReducer extends Reducer<IntWritable, IntWritable, IntWritable, LongWritable> {
		@Override
		protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
				throws IOException, InterruptedException
		{
			long count = 0;

			for (final IntWritable v : values) {
				count += v.get();
			}

			context.write(key, new LongWritable(count));
		}
	}

	@Option(
			name = "--remove",
			aliases = "-rm",
			required = false,
			usage = "Remove the existing output location if it exists.",
			metaVar = "BOOLEAN")
	private boolean replace = false;

	@Option(name = "--input", aliases = "-i", required = true, usage = "Input hash data file.", metaVar = "STRING")
	private String input;

	@Option(name = "--output", aliases = "-o", required = true, usage = "Output distribution file.", metaVar = "STRING")
	private String output;

	@Override
	public int run(String[] args) throws Exception {
		final CmdLineParser parser = new CmdLineParser(this);

		try {
			parser.parseArgument(args);
		} catch (final CmdLineException e) {
			System.err.println(e.getMessage());
			System.err.println("Usage: hadoop jar HadoopImageIndexer.jar [options]");
			parser.printUsage(System.err);
			return -1;
		}

		final Path[] paths = SequenceFileUtility.getFilePaths(input, "part");
		final Path outputPath = new Path(output);

		if (outputPath.getFileSystem(this.getConf()).exists(outputPath) && replace)
			outputPath.getFileSystem(this.getConf()).delete(outputPath, true);

		final Job job = TextBytesJobUtil.createJob(paths, outputPath, null, this.getConf());
		job.setJarByClass(this.getClass());

		job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(IntWritable.class);

		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(LongWritable.class);

		job.setMapperClass(CountMapper.class);
		job.setReducerClass(CountReducer.class);

		job.setInputFormatClass(SequenceFileInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		// the combiner must emit the map output value type (IntWritable), so
		// the reducer (which emits LongWritable) cannot be reused here
		job.setCombinerClass(CountCombiner.class);
		job.setNumReduceTasks(1);

		FileOutputFormat.setCompressOutput(job, false);

		return job.waitForCompletion(true) ? 0 : 1;
	}

	/**
	 * Main method
	 *
	 * @param args
	 *            command-line arguments
	 * @throws Exception
	 *             if an error occurs
	 */
	public static void main(String[] args) throws Exception {
		ToolRunner.run(new ComputeLSHDistribution(), args);
	}
}
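
/*
 * Example invocation (a sketch, not a tested command: the HDFS paths below are
 * illustrative, the jar name is taken from the usage message above, and whether
 * the class name must be given explicitly depends on the jar's manifest):
 *
 *   hadoop jar HadoopImageIndexer.jar \
 *       org.openimaj.hadoop.tools.image.indexing.ComputeLSHDistribution \
 *       -i hdfs:///indexes/lsh-tables -o hdfs:///indexes/lsh-distribution -rm
 *
 * Because the job uses a single reduce task and TextOutputFormat with
 * compression disabled, the histogram is written as one plain-text part file
 * with a tab-separated (bucket size, number of buckets) pair per line.
 */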