001/**
002 * Copyright (c) 2011, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.hadoop.tools.globalfeature;
031
032import java.io.ByteArrayInputStream;
033import java.io.ByteArrayOutputStream;
034import java.util.ArrayList;
035import java.util.Arrays;
036import java.util.HashMap;
037import java.util.List;
038import java.util.Map;
039
040import org.apache.hadoop.conf.Configured;
041import org.apache.hadoop.fs.Path;
042import org.apache.hadoop.io.BytesWritable;
043import org.apache.hadoop.io.SequenceFile;
044import org.apache.hadoop.io.Text;
045import org.apache.hadoop.mapreduce.Job;
046import org.apache.hadoop.mapreduce.Mapper;
047import org.apache.hadoop.util.Tool;
048import org.apache.hadoop.util.ToolRunner;
049import org.apache.log4j.Logger;
050import org.openimaj.feature.FeatureVector;
051import org.openimaj.hadoop.mapreduce.TextBytesJobUtil;
052import org.openimaj.hadoop.sequencefile.MetadataConfiguration;
053import org.openimaj.hadoop.tools.HadoopToolsUtil;
054import org.openimaj.image.ImageUtilities;
055import org.openimaj.image.MBFImage;
056import org.openimaj.io.IOUtils;
057
058/**
059 * A Hadoop version of the GlobalFeaturesTool. Capable of extracting global
060 * image features on very large scale corpora from images stored in
061 * {@link SequenceFile}s.
062 *
063 * @author Jonathon Hare (jsh2@ecs.soton.ac.uk)
064 */
065public class HadoopGlobalFeaturesTool extends Configured implements Tool
066{
067        private static final String ARGS_KEY = "globalfeatures.args";
068        private static Logger logger = Logger.getLogger(HadoopGlobalFeaturesTool.class);
069
070        static class GlobalFeaturesMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {
071                private HadoopGlobalFeaturesOptions options;
072
073                public GlobalFeaturesMapper() {
074                }
075
076                @Override
077                protected void setup(Mapper<Text, BytesWritable, Text, BytesWritable>.Context context) {
078                        options = new HadoopGlobalFeaturesOptions(context.getConfiguration().getStrings(ARGS_KEY));
079                }
080
081                @Override
082                protected void
083                                map(Text key, BytesWritable value, Mapper<Text, BytesWritable, Text, BytesWritable>.Context context)
084                                                throws InterruptedException
085                {
086                        try {
087                                final MBFImage img = ImageUtilities.readMBF(new ByteArrayInputStream(value.getBytes()));
088                                final FeatureVector fv = options.featureOp.extract(img);
089
090                                final ByteArrayOutputStream baos = new ByteArrayOutputStream();
091                                if (options.binary)
092                                        IOUtils.writeBinary(baos, fv);
093                                else
094                                        IOUtils.writeASCII(baos, fv);
095
096                                context.write(key, new BytesWritable(baos.toByteArray()));
097                        } catch (final Exception e) {
098                                logger.warn("Problem processing image " + key + " (" + e + ")");
099                        }
100                }
101        }
102
103        @Override
104        public int run(String[] args) throws Exception {
105                final HadoopGlobalFeaturesOptions options = new HadoopGlobalFeaturesOptions(args, true);
106
107                final Map<String, String> metadata = new HashMap<String, String>();
108                metadata.put(MetadataConfiguration.CONTENT_TYPE_KEY, "application/globalfeature-" + options.feature + "-"
109                                + (options.binary ? "bin" : "ascii"));
110
111                metadata.put("clusterquantiser.filetype", (options.binary ? "bin" : "ascii"));
112
113                final List<Path> allPaths = new ArrayList<Path>();
114                for (final String p : options.input) {
115                        allPaths.addAll(Arrays.asList(HadoopToolsUtil.getInputPaths(p)));
116                }
117
118                final Job job = TextBytesJobUtil.createJob(allPaths, new Path(options.output), metadata, this.getConf());
119                job.setJarByClass(this.getClass());
120                job.setMapperClass(GlobalFeaturesMapper.class);
121                job.getConfiguration().setStrings(ARGS_KEY, args);
122                job.setNumReduceTasks(0);
123
124                job.waitForCompletion(true);
125
126                return 0;
127        }
128
129        /**
130         * The main method for the tool.
131         * 
132         * @param args
133         *            the command-line arguments
134         * @throws Exception
135         *             if an error occurs
136         */
137        public static void main(String[] args) throws Exception
138        {
139                ToolRunner.run(new HadoopGlobalFeaturesTool(), args);
140        }
141}