/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *
 *   * Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *
 *   * Neither the name of the University of Southampton nor the names of its
 *     contributors may be used to endorse or promote products derived from this
 *     software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.hadoop.tools.image.indexing;

import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
import org.openimaj.hadoop.mapreduce.TextBytesJobUtil;
import org.openimaj.hadoop.sequencefile.SequenceFileUtility;

/**
 * MapReduce tool that filters a text file of weighted graph edges (one
 * "id1 id2 count" triple per line), keeping only edges with a count greater
 * than 10. Each retained edge is canonicalised so the lexicographically
 * smaller vertex comes first, and the counts of edges that become identical
 * after canonicalisation are summed by the reducer.
 */
public class DupsEdgeFilter extends Configured implements Tool {
	static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException
		{
			final String[] parts = value.toString().split("\\s");

			if (parts.length == 3) {
				final int count = Integer.parseInt(parts[2]);

				// Drop weak edges; emit the rest with the vertices in
				// lexicographic order so that (a, b) and (b, a) map to the
				// same key.
				if (count > 10) {
					if (parts[0].compareTo(parts[1]) < 0)
						context.write(new Text(parts[0] + " " + parts[1]), new IntWritable(count));
					else
						context.write(new Text(parts[1] + " " + parts[0]), new IntWritable(count));
				}
			}
		}
	}

	static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values, Context context)
				throws IOException, InterruptedException
		{
			// Sum the counts of edges that collapsed to the same canonical key
			int count = 0;
			for (final IntWritable v : values) {
				count += v.get();
			}
			context.write(key, new IntWritable(count));
		}
	}

	@Option(
			name = "--remove",
			aliases = "-rm",
			required = false,
			usage = "Remove the existing output location if it exists.",
			metaVar = "BOOLEAN")
	private boolean replace = false;

	@Option(name = "--input", aliases = "-i", required = true, usage = "Input graph edges file.", metaVar = "STRING")
	private String input;

	@Option(name = "--output", aliases = "-o", required = true, usage = "Output graph edges file.", metaVar = "STRING")
	private String output;

	@Override
	public int run(String[] args) throws Exception {
		final CmdLineParser parser = new CmdLineParser(this);

		try {
			parser.parseArgument(args);
		} catch (final CmdLineException e) {
			System.err.println(e.getMessage());
			System.err.println("Usage: hadoop jar HadoopImageIndexer.jar [options]");
			parser.printUsage(System.err);
			return -1;
		}

		final Path[] paths = SequenceFileUtility.getFilePaths(input, "part");
		final Path outputPath = new Path(output);

		if (outputPath.getFileSystem(this.getConf()).exists(outputPath) && replace)
			outputPath.getFileSystem(this.getConf()).delete(outputPath, true);

		final Job job = TextBytesJobUtil.createJob(paths, outputPath, null, this.getConf());
		job.setJarByClass(this.getClass());

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		job.setMapperClass(Map.class);
		job.setReducerClass(Reduce.class);

		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		// Summing is associative and commutative, so the reducer doubles as a
		// combiner; a single reduce task produces one output file.
		job.setCombinerClass(Reduce.class);
		job.setNumReduceTasks(1);

		FileOutputFormat.setCompressOutput(job, false);

		// Propagate the job status as the tool's exit code
		return job.waitForCompletion(true) ? 0 : 1;
	}

	/**
	 * Main method
	 *
	 * @param args command-line arguments
	 * @throws Exception if the job fails to run
	 */
	public static void main(String[] args) throws Exception {
		ToolRunner.run(new DupsEdgeFilter(), args);
	}
}