/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *
 *   * Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *
 *   * Neither the name of the University of Southampton nor the names of its
 *     contributors may be used to endorse or promote products derived from this
 *     software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.hadoop.tools.image.indexing;

import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
import org.openimaj.hadoop.mapreduce.TextBytesJobUtil;
import org.openimaj.hadoop.sequencefile.SequenceFileUtility;

/**
 * Finds likely duplicate images from LSH hash-bin data. The input is a
 * sequence file mapping each hash to the space-separated identifiers of the
 * images that fell into that bin; the output is a weighted edge list in which
 * each line pairs two image identifiers with the number of bins they shared.
 */
public class SimpleLSHDuplicatesFinder extends Configured implements Tool {
    private static final String LOWER_THRESH_KEY = "lower.thresh";
    private static final String UPPER_THRESH_KEY = "upper.thresh";

    /**
     * Emits every unordered pair of identifiers sharing a hash bin, provided
     * the bin size lies within the configured thresholds.
     */
    static class Map extends Mapper<IntWritable, Text, Text, IntWritable> {
        private final static IntWritable ONE = new IntWritable(1);

        int lowerThresh;
        int upperThresh;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException
        {
            lowerThresh = context.getConfiguration().getInt(LOWER_THRESH_KEY, 0);
            upperThresh = context.getConfiguration().getInt(UPPER_THRESH_KEY, 100);
        }

        @Override
        protected void map(IntWritable key, Text value, Context context)
                throws IOException, InterruptedException
        {
            // Count the identifiers in the bin by counting separators, so
            // that out-of-range bins can be discarded without the cost of
            // splitting the string.
            int pos = -1;
            int count = 1;
            while ((pos = value.find(" ", pos + 1)) != -1) {
                count++;
            }

            // Keep only bins with lowerThresh < count <= upperThresh.
            if (count > upperThresh || count <= lowerThresh)
                return;

            final String[] ids = value.toString().split(" ");

            // FIXME: edge weights
            // Emit each unordered pair exactly once, with the two ids in
            // lexicographic order so both orderings map to the same key.
            for (int i = 0; i < ids.length; i++)
                for (int j = i + 1; j < ids.length; j++)
                    if (ids[i].compareTo(ids[j]) < 0)
                        context.write(new Text(ids[i] + " " + ids[j]), ONE);
                    else
                        context.write(new Text(ids[j] + " " + ids[i]), ONE);
        }
    }

    /**
     * Sums the per-bin counts for each pair and writes only the pairs that
     * collided in more than one bin.
     */
    static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException
        {
            int count = 0;
            for (final IntWritable v : values) {
                count += v.get();
            }
            if (count > 1) {
                context.write(key, new IntWritable(count));
            }
        }
    }

    /**
     * Map-side partial summing of pair counts to cut shuffle volume. Unlike
     * {@link Reduce}, no {@code count > 1} filter is applied here, because a
     * pair's occurrences may be spread across several map tasks.
     */
    static class Combiner extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException
        {
            int count = 0;
            for (final IntWritable v : values) {
                count += v.get();
            }
            context.write(key, new IntWritable(count));
        }
    }

    @Option(
            name = "--remove",
            aliases = "-rm",
            required = false,
            usage = "Remove the existing output location if it exists.",
            metaVar = "BOOLEAN")
    private boolean replace = false;

    @Option(name = "--input", aliases = "-i", required = true, usage = "Input local features file.", metaVar = "STRING")
    private String input;

    @Option(name = "--output", aliases = "-o", required = true, usage = "Output graph edges file.", metaVar = "STRING")
    private String output;

    @Option(
            name = "--min-threshold",
            aliases = "-min",
            required = true,
            usage = "min threshold for bin size (bin count must be > minThreshold)")
    private int minThreshold;

    @Option(
            name = "--max-threshold",
            aliases = "-max",
            required = true,
            usage = "max threshold for bin size (bin count must be <= maxThreshold)")
    private int maxThreshold;

    @Override
    public int run(String[] args) throws Exception {
        final CmdLineParser parser = new CmdLineParser(this);

        try {
            parser.parseArgument(args);
        } catch (final CmdLineException e) {
            System.err.println(e.getMessage());
            System.err.println("Usage: hadoop jar HadoopImageIndexer.jar [options]");
            parser.printUsage(System.err);
            return -1;
        }

        final Path[] paths = SequenceFileUtility.getFilePaths(input, "part");
        final Path outputPath = new Path(output);

        if (outputPath.getFileSystem(this.getConf()).exists(outputPath) && replace)
            outputPath.getFileSystem(this.getConf()).delete(outputPath, true);

        final Job job = TextBytesJobUtil.createJob(paths, outputPath, null, this.getConf());
        job.setJarByClass(this.getClass());

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setCombinerClass(Combiner.class);
        // A single reducer collects the whole edge list into one part file.
        job.setNumReduceTasks(1);

        FileOutputFormat.setCompressOutput(job, false);

        job.getConfiguration().setInt(LOWER_THRESH_KEY, minThreshold);
        job.getConfiguration().setInt(UPPER_THRESH_KEY, maxThreshold);

        // Report the job's success or failure through the exit code rather
        // than unconditionally returning 0.
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Main method
     *
     * @param args command-line arguments; see the {@link Option} fields above
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new SimpleLSHDuplicatesFinder(), args);
    }
}
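// Example invocation (the jar name follows the usage message above; the paths
// and threshold values are illustrative only):
//
//   hadoop jar HadoopImageIndexer.jar \
//       -i hdfs:///index/lsh-sketches -o hdfs:///index/duplicate-edges \
//       -min 1 -max 100 -rm
//
// Each output line has the form "<id1> <id2><TAB><count>", where count is the
// number of hash bins in which the two images collided (always at least 2).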