/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   *  Redistributions of source code must retain the above copyright notice,
 *      this list of conditions and the following disclaimer.
 *
 *   *  Redistributions in binary form must reproduce the above copyright notice,
 *      this list of conditions and the following disclaimer in the documentation
 *      and/or other materials provided with the distribution.
 *
 *   *  Neither the name of the University of Southampton nor the names of its
 *      contributors may be used to endorse or promote products derived from this
 *      software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.hadoop.tools.twitter;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.openimaj.hadoop.tools.HadoopToolsUtil;
import org.openimaj.text.nlp.TweetTokeniser;
import org.openimaj.text.nlp.TweetTokeniserException;
import org.openimaj.twitter.GeneralJSONTwitter;
import org.openimaj.twitter.USMFStatus;
/**
 * A simple {@link Tool} for checking that LZO-compressed tweets can be read by
 * a MapReduce job: the mapper tokenises each tweet's text with
 * {@link TweetTokeniser} and the reducer writes out the comma-separated tokens,
 * exercising Hadoop counters along the way.
 *
 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
 */
public class HadoopLZOTest extends Configured implements Tool {
        enum CounterEnum {
                CHEESE, FLEES;
        }

        public static class CounterMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
                @Override
                protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, LongWritable, Text>.Context context)
                                throws IOException, InterruptedException
                {
                        // Parse the raw JSON tweet into a USMF status object
                        final USMFStatus status = new USMFStatus(GeneralJSONTwitter.class);
                        status.fillFromString(value.toString());

                        context.getCounter(CounterEnum.CHEESE).increment(10);
                        context.getCounter(CounterEnum.FLEES).increment(20);
                        if (status.isInvalid())
                                return;
                        try {
                                final TweetTokeniser tok = new TweetTokeniser(status.text);
                                context.write(key, new Text(StringUtils.join(tok.getTokens(), ",")));
                        } catch (final TweetTokeniserException e) {
                                // Tweets that cannot be tokenised are skipped
                        }
                }
        }

        public static class CounterReducer extends Reducer<LongWritable, Text, NullWritable, Text> {
                @Override
                protected void reduce(LongWritable key, Iterable<Text> values,
                                Reducer<LongWritable, Text, NullWritable, Text>.Context context)
                                throws IOException, InterruptedException
                {
                        // Note: counters read here are task-local; the values incremented by
                        // the map tasks are only aggregated at job completion
                        final Counter cheeseCounter = context.getCounter(CounterEnum.CHEESE);
                        final Counter fleesCounter = context.getCounter(CounterEnum.FLEES);
                        System.out.println(cheeseCounter.getName() + ": " + cheeseCounter.getValue());
                        System.out.println(fleesCounter.getName() + ": " + fleesCounter.getValue());
                        for (final Text text : values) {
                                context.write(NullWritable.get(), text);
                        }
                }
        }

        @SuppressWarnings("unchecked")
        @Override
        public int run(String[] args) throws Exception {
                // Load the LZO input format reflectively so this tool compiles and runs
                // even when hadoop-lzo is not on the classpath
                Class<? extends InputFormat<?, ?>> lzoClass = null;
                try {
                        lzoClass = (Class<? extends InputFormat<?, ?>>) Class.forName("com.hadoop.mapreduce.LzoTextInputFormat");
                } catch (final ClassNotFoundException nfe) {
                        System.err.println("LZO not installed; skipping");
                        return -1;
                }

                final Path[] paths = new Path[] { new Path(args[0]) };
                final Path out = new Path(args[1]);
                HadoopToolsUtil.validateOutput(args[1], true);
                final Job job = new Job(this.getConf());

                job.setInputFormatClass(lzoClass);
                // The mapper emits <LongWritable, Text> while the reducer emits
                // <NullWritable, Text>, so the map output types must be set explicitly
                job.setMapOutputKeyClass(LongWritable.class);
                job.setMapOutputValueClass(Text.class);
                job.setOutputKeyClass(NullWritable.class);
                job.setOutputValueClass(Text.class);
                job.setOutputFormatClass(TextOutputFormat.class);
                job.setJarByClass(this.getClass());

                // FileInputFormat#setInputPaths(Job, Path...) is invoked reflectively as
                // lzoClass is only resolved at runtime; note the method takes the Job as
                // its first argument
                lzoClass.getMethod("setInputPaths", Job.class, Path[].class).invoke(null, new Object[] { job, paths });
                TextOutputFormat.setOutputPath(job, out);
                job.setMapperClass(CounterMapper.class);
                job.setReducerClass(CounterReducer.class);

                final long start = System.currentTimeMillis();
                final boolean success = job.waitForCompletion(true);
                final long end = System.currentTimeMillis();
                System.out.println("Took: " + (end - start) + "ms");
                return success ? 0 : 1;
        }
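
        /**
         * Entry point sketch (not part of the original class), assuming the
         * standard {@code ToolRunner} driver pattern: generic Hadoop options
         * (e.g. -D, -files) are parsed before {@link #run(String[])} is invoked
         * with the remaining arguments.
         */
        public static void main(String[] args) throws Exception {
                System.exit(org.apache.hadoop.util.ToolRunner.run(new HadoopLZOTest(), args));
        }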
}
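
// Example invocation sketch: the jar name and HDFS paths below are hypothetical,
// and the hadoop-lzo jar plus its native libraries are assumed to be installed
// on the cluster:
//
//   hadoop jar openimaj-hadoop-tools.jar \
//       org.openimaj.hadoop.tools.twitter.HadoopLZOTest \
//       /data/tweets.json.lzo /data/tokens-out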