001/**
002 * Copyright (c) 2011, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.hadoop.tools.clusterquantiser;
031
032import java.io.ByteArrayInputStream;
033import java.io.ByteArrayOutputStream;
034import java.io.IOException;
035import java.io.InputStream;
036import java.io.PrintWriter;
037import java.util.HashMap;
038import java.util.Map;
039
040import org.apache.hadoop.conf.Configured;
041import org.apache.hadoop.fs.Path;
042import org.apache.hadoop.io.BytesWritable;
043import org.apache.hadoop.io.Text;
044import org.apache.hadoop.mapred.JobConf;
045import org.apache.hadoop.mapreduce.Job;
046import org.apache.hadoop.mapreduce.Mapper;
047import org.apache.hadoop.util.Tool;
048import org.apache.hadoop.util.ToolRunner;
049import org.kohsuke.args4j.CmdLineException;
050import org.openimaj.hadoop.mapreduce.TextBytesJobUtil;
051import org.openimaj.hadoop.sequencefile.MetadataConfiguration;
052import org.openimaj.hadoop.sequencefile.TextBytesSequenceFileUtility;
053import org.openimaj.io.IOUtils;
054import org.openimaj.ml.clustering.ByteCentroidsResult;
055import org.openimaj.ml.clustering.IntCentroidsResult;
056import org.openimaj.ml.clustering.SpatialClusters;
057import org.openimaj.ml.clustering.assignment.HardAssigner;
058import org.openimaj.ml.clustering.assignment.hard.KDTreeByteEuclideanAssigner;
059import org.openimaj.ml.clustering.assignment.hard.KDTreeIntEuclideanAssigner;
060import org.openimaj.tools.clusterquantiser.ClusterQuantiser;
061import org.openimaj.tools.clusterquantiser.FeatureFile;
062import org.openimaj.tools.clusterquantiser.FeatureFileFeature;
063import org.openimaj.util.array.ByteArrayConverter;
064
065public class HadoopClusterQuantiserTool extends Configured implements Tool {
066        private static final String ARGS_KEY = "clusterquantiser.args";
067
068        static class ClusterQuantiserMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {
069                private static SpatialClusters<?> tree = null;
070                private static HardAssigner<?, ?, ?> assigner = null;
071                private static HadoopClusterQuantiserOptions options = null;
072
073                protected static synchronized void loadCluster(Mapper<Text, BytesWritable, Text, BytesWritable>.Context context)
074                                throws IOException
075                {
076                        if (options == null) {
077                                try {
078                                        options = new HadoopClusterQuantiserOptions(context.getConfiguration().getStrings(ARGS_KEY));
079                                        options.prepare();
080                                } catch (final CmdLineException e) {
081                                        throw new IOException(e);
082                                }
083                        }
084
085                        if (tree == null) {
086                                InputStream ios = null;
087                                try {
088                                        System.out.print("Reading quant data. ");
089                                        ios = options.getClusterInputStream();
090                                        tree = IOUtils.read(ios, options.getClusterClass());
091
092                                        if (tree instanceof ByteCentroidsResult)
093                                                assigner = new KDTreeByteEuclideanAssigner((ByteCentroidsResult) tree);
094                                        else if (tree instanceof IntCentroidsResult)
095                                                assigner = new KDTreeIntEuclideanAssigner((IntCentroidsResult) tree);
096                                        else
097                                                assigner = tree.defaultHardAssigner();
098
099                                        System.out.println("Done reading quant data.");
100                                } catch (final IOException e) {
101                                        e.printStackTrace();
102                                        throw e;
103                                } finally {
104                                        if (ios != null)
105                                                ios.close();
106                                }
107                        } else {
108                                System.out.println("tree already loaded");
109                        }
110                }
111
112                @Override
113                protected void setup(Mapper<Text, BytesWritable, Text, BytesWritable>.Context context) throws IOException,
114                                InterruptedException
115                {
116                        loadCluster(context);
117                }
118
119                @SuppressWarnings("unchecked")
120                @Override
121                protected void
122                                map(Text key, BytesWritable value, Mapper<Text, BytesWritable, Text, BytesWritable>.Context context)
123                                                throws java.io.IOException, InterruptedException
124                {
125                        try {
126                                final long t1 = System.currentTimeMillis();
127
128                                System.out.println("[" + Thread.currentThread().getId() + "]" + "Calling map ");
129                                ByteArrayOutputStream baos = null;
130                                if (options.isInfoMode()) {
131                                        ClusterQuantiser.do_info(options);
132                                } else if (options.isQuantMode()) {
133
134                                        final FeatureFile input = options.getFileType().read(new ByteArrayInputStream(value.getBytes()));
135
136                                        baos = new ByteArrayOutputStream();
137                                        PrintWriter pw = null;
138                                        try {
139                                                pw = new PrintWriter(baos);
140                                                pw.format("%d\n%d\n", input.size(), tree.numClusters());
141
142                                                for (final FeatureFileFeature fff : input) {
143                                                        int cluster = -1;
144
145                                                        if (tree.getClass().getName().contains("Byte"))
146                                                                cluster = ((HardAssigner<byte[], ?, ?>) assigner).assign(fff.data);
147                                                        else
148                                                                cluster = ((HardAssigner<int[], ?, ?>) assigner).assign(ByteArrayConverter
149                                                                                .byteToInt(fff.data));
150
151                                                        pw.format("%s %d\n", fff.location.trim(), cluster);
152                                                }
153                                        } finally {
154                                                if (pw != null) {
155                                                        pw.flush();
156                                                        pw.close();
157                                                        input.close();
158                                                }
159                                        }
160
161                                        context.write(key, new BytesWritable(baos.toByteArray()));
162                                }
163                                final long t2 = System.currentTimeMillis();
164                                System.out.println("[" + Thread.currentThread().getId() + "]" + "Job time taken: " + (t2 - t1) / 1000.0
165                                                + "s");
166                        } catch (final Exception e) {
167                                System.err.println("Failed to quantise features because: " + e.getMessage());
168                        }
169                }
170        }
171
172        @Override
173        public int run(String[] args) throws Exception {
174                final HadoopClusterQuantiserOptions options = new HadoopClusterQuantiserOptions(args, true);
175                options.prepare();
176                // String clusterFileString = options.getInputFileString();
177                final Path[] paths = options.getInputPaths();
178                // DistributedCache.addCacheFile(SequenceFileUtility.convertToURI(clusterFileString),
179                // this.getConf());
180                final TextBytesSequenceFileUtility util = new TextBytesSequenceFileUtility(paths[0].toUri(), true);
181                final Map<String, String> metadata = new HashMap<String, String>();
182                if (util.getUUID() != null)
183                        metadata.put(MetadataConfiguration.UUID_KEY, util.getUUID());
184                metadata.put(MetadataConfiguration.CONTENT_TYPE_KEY, "application/quantised-"
185                                + options.getClusterType().toString().toLowerCase() + "-" + options.getExtension());
186
187                metadata.put("clusterquantiser.clustertype", options.getClusterType().toString());
188                metadata.put("clusterquantiser.filetype", options.getFileType().toString());
189                metadata.put("clusterquantiser.countmode", "" + options.getCountMode());
190                metadata.put("clusterquantiser.extention", "" + options.getExtension());
191
192                final Job job = TextBytesJobUtil.createJob(options.getInputFileString(), options.getOutputFileString(), metadata,
193                                this.getConf());
194                job.setJarByClass(this.getClass());
195                // job.setMapperClass(MultithreadedMapper.class);
196                // MultithreadedMapper.setNumberOfThreads(job,
197                // options.getConcurrency());
198                // MultithreadedMapper.setMapperClass(job,
199                // ClusterQuantiserMapper.class);
200                // System.out.println("NThreads = " +
201                // MultithreadedMapper.getNumberOfThreads(job));
202                options.mapperModeOp.prepareJobMapper(job, ClusterQuantiserMapper.class, options);
203
204                job.getConfiguration().setStrings(ARGS_KEY, args);
205                job.setNumReduceTasks(0);
206                ((JobConf) job.getConfiguration()).setNumTasksToExecutePerJvm(-1);
207                // job.getConfiguration().set("mapred.child.java.opts", "-Xmx3000M");
208                job.waitForCompletion(true);
209                return 0;
210        }
211
212        public static void main(String[] args) throws Exception {
213                try {
214                        ToolRunner.run(new HadoopClusterQuantiserTool(), args);
215                } catch (final CmdLineException e) {
216                        System.err.print(e);
217                }
218        }
219}