/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *
 *   * Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *
 *   * Neither the name of the University of Southampton nor the names of its
 *     contributors may be used to endorse or promote products derived from this
 *     software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.hadoop.tools.fastkmeans;

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.openimaj.tools.clusterquantiser.FileType;

/**
 * Map-Reduce (random) feature selection.
 * <p>
 * The {@link Map} stage pushes every incoming record into a bounded
 * {@link IndexedByteArrayPriorityQueue} and emits the queue contents when the
 * task finishes. The {@link Reduce} stage forwards at most
 * {@value #NFEATURE_KEY} records, re-keyed with a running counter.
 *
 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
 *
 */
public class FeatureSelect {
    /** Configuration key holding the name of a {@link FileType} constant. */
    static final String FILETYPE_KEY = "clusterquantiser.FileType";
    /** Configuration key holding the number of features to select. */
    static final String NFEATURE_KEY = "fastkmeans.nfeatures";

    /**
     * Parse the configured number of features.
     * <p>
     * Shared by the mapper and the reducer so that both stages interpret the
     * {@link #NFEATURE_KEY} setting identically and fail with the same
     * diagnostic when it is absent.
     *
     * @param raw
     *            the raw configuration value; may be {@code null} if unset
     * @return the parsed feature count
     * @throws IllegalArgumentException
     *             if {@code raw} is {@code null}; a
     *             {@link NumberFormatException} (a subclass) is thrown if it
     *             is not a valid integer
     */
    private static int parseFeatureCount(String raw) {
        if (raw == null) {
            throw new IllegalArgumentException("Missing required configuration value: " + NFEATURE_KEY);
        }
        return Integer.parseInt(raw.trim());
    }

    /**
     * Mapper. Buffers input records in a priority queue sized by the
     * configured feature count and writes whatever the queue holds once all
     * input for this task has been consumed.
     *
     * @author Sina Samangooei (ss@ecs.soton.ac.uk)
     *
     */
    public static class Map extends Mapper<Text, BytesWritable, IntWritable, BytesWritable>
    {
        private int nfeatures = -1;
        // Resolved in setup() so that a bad or missing FILETYPE_KEY fails the
        // task early; it is not otherwise read in this class. Deliberately an
        // instance field: a static cache would leak one task's configuration
        // into every later mapper instance created in the same JVM.
        private FileType fileType = null;
        private IndexedByteArrayPriorityQueue queue;

        /**
         * Read the file type and feature count from the job configuration and
         * allocate the selection queue.
         *
         * @param context
         *            the task context supplying the job configuration
         * @throws IllegalArgumentException
         *             if either configuration value is missing or invalid
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException
        {
            final String typeName = context.getConfiguration().get(FILETYPE_KEY);
            if (typeName == null) {
                throw new IllegalArgumentException("Missing required configuration value: " + FILETYPE_KEY);
            }
            fileType = FileType.valueOf(typeName);
            nfeatures = parseFeatureCount(context.getConfiguration().get(NFEATURE_KEY));

            // NOTE(review): presumably the queue retains at most nfeatures
            // items - confirm against IndexedByteArrayPriorityQueue.
            queue = new IndexedByteArrayPriorityQueue(nfeatures);
        }

        /**
         * Copy the valid portion of the incoming value and offer it to the
         * queue. Nothing is written to the context here; output is deferred
         * to {@link #cleanup(Context)}.
         *
         * @param key
         *            the record key (unused)
         * @param value
         *            the record payload
         * @param context
         *            the task context (unused)
         */
        @Override
        public void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
            // The backing array of a BytesWritable can be longer than its
            // logical length, so only the first getLength() bytes are copied.
            final byte[] validBytes = new byte[value.getLength()];
            System.arraycopy(value.getBytes(), 0, validBytes, 0, validBytes.length);
            final IndexedByteArray indexedItem = new IndexedByteArray(validBytes);
            queue.insert(indexedItem);
        }

        /**
         * Drain the queue, emitting each remaining item keyed by its index.
         *
         * @param context
         *            the task context to write to
         */
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            while (this.queue.size() > 0) {
                final IndexedByteArray item = this.queue.pop();
                context.write(new IntWritable(item.index), new BytesWritable(item.array));
            }
        }
    }

    /**
     * Reducer. Forwards values in the order they arrive, replacing each key
     * with a running counter, and stops once the configured number of
     * features has been written by this reducer instance.
     * <p>
     * NOTE(review): the counter is per reducer instance, so the overall limit
     * presumably relies on the job running a single reduce task - confirm in
     * the job driver.
     *
     * @author Sina Samangooei (ss@ecs.soton.ac.uk)
     *
     */
    public static class Reduce extends Reducer<IntWritable, BytesWritable, IntWritable, BytesWritable> {
        private int nfeatures = -1;
        // Number of records this instance has written so far, across all keys.
        private int seen = 0;

        /**
         * Read the feature count from the job configuration.
         *
         * @param context
         *            the task context supplying the job configuration
         * @throws IllegalArgumentException
         *             if the feature count is missing or not an integer
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            nfeatures = parseFeatureCount(context.getConfiguration().get(NFEATURE_KEY));
        }

        /**
         * Write the values for this key until the overall quota is reached.
         *
         * @param key
         *            the incoming key (discarded; output is keyed by the
         *            running counter)
         * @param values
         *            the values grouped under {@code key}
         * @param context
         *            the task context to write to
         */
        @Override
        public void reduce(IntWritable key, Iterable<BytesWritable> values, Context context) throws IOException,
                InterruptedException
        {
            if (seen >= nfeatures) {
                return;
            }
            for (final BytesWritable val : values) {
                context.write(new IntWritable(seen), val);
                seen++;
                // Stop immediately once the quota is met rather than pulling
                // further values from the iterator.
                if (seen >= nfeatures) {
                    return;
                }
            }
        }
    }

}