001/**
002 * Copyright (c) 2011, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.hadoop.tools.fastkmeans;
031
032import java.io.ByteArrayInputStream;
033import java.io.IOException;
034
035import org.apache.hadoop.io.BytesWritable;
036import org.apache.hadoop.io.Text;
037import org.apache.hadoop.mapreduce.Mapper;
038import org.apache.hadoop.mapreduce.Reducer;
039import org.openimaj.tools.clusterquantiser.FeatureFile;
040import org.openimaj.tools.clusterquantiser.FeatureFileFeature;
041import org.openimaj.tools.clusterquantiser.FileType;
042
043public class ImageFeatureSelect {
044        public static final String FILETYPE_KEY = "uk.ac.soton.ecs.jsh2.clusterquantiser.FileType";
045        public static final String NFEATURE_KEY = "uk.ac.soton.ecs.ss.hadoop.fastkmeans.nfeatures";
046
047        public static class Map extends Mapper<Text, BytesWritable, Text, BytesWritable>
048        {
049                private int nfeatures = -1;
050                private static FileType fileType = null;
051                private IndexedByteArrayPriorityQueue queue = null;
052
053                @Override
054                protected void setup(Mapper<Text, BytesWritable, Text, BytesWritable>.Context context) throws IOException,
055                                InterruptedException
056                {
057                        System.out.println("Setting up mapper");
058                        if (fileType == null) {
059                                fileType = FileType.valueOf(context.getConfiguration().getStrings(FILETYPE_KEY)[0]);
060                        }
061                        nfeatures = Integer.parseInt(context.getConfiguration().getStrings(NFEATURE_KEY)[0]);
062                        if (nfeatures != -1)
063                                queue = new IndexedByteArrayPriorityQueue(nfeatures);
064                }
065
066                @Override
067                public void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
068                        System.out.println("Reading from file: " + key);
069                        try {
070                                final FeatureFile input = fileType.read(new ByteArrayInputStream(value.getBytes()));
071                                for (final FeatureFileFeature ff : input) {
072                                        final IndexedByteArray iba = new IndexedByteArray(ff.data);
073                                        if (queue != null) {
074                                                queue.insert(iba);
075                                        }
076                                        else {
077                                                context.write(new Text(iba.index + ""), new BytesWritable(iba.array));
078                                        }
079
080                                }
081                        } catch (final Throwable t) {
082                                System.out.println("There was an error while reading the features in the map phase");
083                                t.printStackTrace();
084                        }
085                        //
086                }
087
088                @Override
089                protected void cleanup(Context context) throws IOException, InterruptedException {
090                        System.out.println("Cleaning up (emitting)");
091                        try {
092                                if (queue != null)
093                                        while (this.queue.size() > 0) {
094                                                final IndexedByteArray item = this.queue.pop();
095                                                context.write(new Text("" + item.index), new BytesWritable(item.array));
096                                        }
097                        } catch (final Throwable e) {
098                                System.out.println("There was an error reading features in the cleanup phase");
099                                e.printStackTrace();
100                        }
101                }
102
103        }
104
105        public static class Reduce extends Reducer<Text, BytesWritable, Text, BytesWritable> {
106                private int nfeatures = -1;
107                private int seen = 0;
108
109                @Override
110                protected void setup(Context context) throws IOException, InterruptedException {
111                        if (nfeatures == -1) {
112                                nfeatures = Integer.parseInt(context.getConfiguration().getStrings(NFEATURE_KEY)[0]);
113                        }
114                }
115
116                @Override
117                public void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException,
118                                InterruptedException
119                {
120                        if (nfeatures != -1 && seen >= nfeatures) {
121                                return;
122                        }
123                        for (final BytesWritable val : values) {
124                                context.write(new Text(seen + ""), val);
125                                seen++;
126                                if (seen >= nfeatures) {
127                                        return;
128                                }
129                        }
130                }
131        }
132}