/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *
 *   * Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *
 *   * Neither the name of the University of Southampton nor the names of its
 *     contributors may be used to endorse or promote products derived from this
 *     software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.hadoop.tools.fastkmeans;

import java.io.ByteArrayInputStream;
import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.openimaj.tools.clusterquantiser.FeatureFile;
import org.openimaj.tools.clusterquantiser.FeatureFileFeature;
import org.openimaj.tools.clusterquantiser.FileType;

/**
 * Map/Reduce classes for selecting a sample of local features from a
 * collection of feature files, part of the Hadoop fast-kmeans tools.
 */
public class ImageFeatureSelect {
    /** Configuration key for the {@link FileType} of the input feature files. */
    public static final String FILETYPE_KEY = "uk.ac.soton.ecs.jsh2.clusterquantiser.FileType";
    /** Configuration key for the number of features to select; -1 means no limit. */
    public static final String NFEATURE_KEY = "uk.ac.soton.ecs.ss.hadoop.fastkmeans.nfeatures";

    /**
     * Mapper that parses each feature file and either emits every feature
     * directly, or (when a feature limit is configured) collects features in an
     * {@link IndexedByteArrayPriorityQueue} sized by that limit and emits the
     * retained features during cleanup.
     */
    public static class Map extends Mapper<Text, BytesWritable, Text, BytesWritable> {
        private int nfeatures = -1;
        private static FileType fileType = null;
        private IndexedByteArrayPriorityQueue queue = null;

        @Override
        protected void setup(Mapper<Text, BytesWritable, Text, BytesWritable>.Context context)
                throws IOException, InterruptedException
        {
            System.out.println("Setting up mapper");
            if (fileType == null) {
                fileType = FileType.valueOf(context.getConfiguration().getStrings(FILETYPE_KEY)[0]);
            }
            nfeatures = Integer.parseInt(context.getConfiguration().getStrings(NFEATURE_KEY)[0]);
            if (nfeatures != -1)
                queue = new IndexedByteArrayPriorityQueue(nfeatures);
        }

        @Override
        public void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
            System.out.println("Reading from file: " + key);
            try {
                final FeatureFile input = fileType.read(new ByteArrayInputStream(value.getBytes()));
                for (final FeatureFileFeature ff : input) {
                    final IndexedByteArray iba = new IndexedByteArray(ff.data);
                    if (queue != null) {
                        queue.insert(iba);
                    } else {
                        context.write(new Text(iba.index + ""), new BytesWritable(iba.array));
                    }
                }
            } catch (final Throwable t) {
                System.out.println("There was an error while reading the features in the map phase");
                t.printStackTrace();
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            System.out.println("Cleaning up (emitting)");
            try {
                // Emit the features retained by the priority queue, if a limit was set.
                if (queue != null)
                    while (this.queue.size() > 0) {
                        final IndexedByteArray item = this.queue.pop();
                        context.write(new Text("" + item.index), new BytesWritable(item.array));
                    }
            } catch (final Throwable e) {
                System.out.println("There was an error reading features in the cleanup phase");
                e.printStackTrace();
            }
        }
    }

    /**
     * Reducer that writes features through, re-keying them with a running
     * counter and stopping once the configured number of features has been
     * written.
     */
    public static class Reduce extends Reducer<Text, BytesWritable, Text, BytesWritable> {
        private int nfeatures = -1;
        private int seen = 0;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            if (nfeatures == -1) {
                nfeatures = Integer.parseInt(context.getConfiguration().getStrings(NFEATURE_KEY)[0]);
            }
        }

        @Override
        public void reduce(Text key, Iterable<BytesWritable> values, Context context)
                throws IOException, InterruptedException
        {
            if (nfeatures != -1 && seen >= nfeatures) {
                return;
            }
            for (final BytesWritable val : values) {
                context.write(new Text(seen + ""), val);
                seen++;
                // Mirror the guard above: -1 means no limit on the number of features.
                if (nfeatures != -1 && seen >= nfeatures) {
                    return;
                }
            }
        }
    }
}
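
/*
 * A minimal driver sketch (not part of the original source) showing how the
 * Map/Reduce pair above might be wired into a Hadoop job over a SequenceFile
 * of (filename, feature-file bytes) pairs. The input/output paths, the
 * "BINARY_KEYPOINT" file type and the feature count are illustrative
 * assumptions; the real job configuration lives elsewhere in the fastkmeans
 * tool.
 */
class ImageFeatureSelectJobSketch {
    public static void main(String[] args) throws Exception {
        final org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
        conf.setStrings(ImageFeatureSelect.FILETYPE_KEY, "BINARY_KEYPOINT"); // assumed FileType name
        conf.setStrings(ImageFeatureSelect.NFEATURE_KEY, "10000"); // number of features to keep

        final org.apache.hadoop.mapreduce.Job job = new org.apache.hadoop.mapreduce.Job(conf, "image-feature-select");
        job.setJarByClass(ImageFeatureSelect.class);
        job.setMapperClass(ImageFeatureSelect.Map.class);
        job.setReducerClass(ImageFeatureSelect.Reduce.class);
        job.setOutputKeyClass(org.apache.hadoop.io.Text.class);
        job.setOutputValueClass(org.apache.hadoop.io.BytesWritable.class);
        job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class);
        job.setOutputFormatClass(org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
        org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.addInputPath(job,
                new org.apache.hadoop.fs.Path(args[0]));
        org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.setOutputPath(job,
                new org.apache.hadoop.fs.Path(args[1]));
        job.waitForCompletion(true);
    }
}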