/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   *  Redistributions of source code must retain the above copyright notice,
 *      this list of conditions and the following disclaimer.
 *
 *   *  Redistributions in binary form must reproduce the above copyright notice,
 *      this list of conditions and the following disclaimer in the documentation
 *      and/or other materials provided with the distribution.
 *
 *   *  Neither the name of the University of Southampton nor the names of its
 *      contributors may be used to endorse or promote products derived from this
 *      software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.hadoop.tools.twitter.token.mode.dfidf;

import gnu.trove.procedure.TObjectIntProcedure;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.TreeSet;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.kohsuke.args4j.CmdLineException;
import org.openimaj.hadoop.mapreduce.stage.StageProvider;
import org.openimaj.hadoop.mapreduce.stage.helper.SimpleSequenceFileStage;
import org.openimaj.hadoop.tools.HadoopToolsUtil;
import org.openimaj.hadoop.tools.twitter.HadoopTwitterTokenToolOptions;
import org.openimaj.hadoop.tools.twitter.token.mode.TextEntryType;
import org.openimaj.hadoop.tools.twitter.token.mode.WritableEnumCounter;
import org.openimaj.hadoop.tools.twitter.token.mode.dfidf.TimeFrequencyHolder.TimeFrequency;
import org.openimaj.hadoop.tools.twitter.utils.TimeperiodTweetCountWordCount;
import org.openimaj.hadoop.tools.twitter.utils.TweetCountWordMap;
import org.openimaj.hadoop.tools.twitter.utils.WordDFIDF;
import org.openimaj.io.IOUtils;
import org.openimaj.io.wrappers.WriteableListBinary;

/**
 * A stage provider wrapping the functionality of
 * {@link CountWordsAcrossTimeperiod.Map} and
 * {@link CountWordsAcrossTimeperiod.Reduce}.
 *
 * The Map expects time periods as keys and maps of word counts as values. For
 * each word, the mapper emits the word as the key with a pair of the time
 * period and the counts within that time period as the value.
 *
 * This, along with the TimeIndex generated by {@link CountTweetsInTimeperiod},
 * is used to construct a DFIDF score per word per time period.
 *
 * One interpretation of the DFIDF score is a weighting based on counts up to a
 * particular time period. This functionality is encoded in the
 * {@link NonCombinedTimesReducer}.
 *
 * Another interpretation is that the DFIDF score weights words based on word
 * occurrences at the end of some period of time, i.e. the count at some END.
 *
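 * As a sketch of the scoring ({@link WordDFIDF} holds the exact expression
 * and should be treated as authoritative): with wf and tf the word and tweet
 * counts within a time period, and Twf and Ttf the corresponding overall
 * counts, DF = wf / tf and IDF = Ttf / Twf; the DFIDF score combines the two.
 *
 * A rough usage sketch (job wiring elided; the stage is normally queued by
 * the surrounding token mode):
 *
 * <pre>
 * final StageProvider provider = new CountWordsAcrossTimeperiod(nonHadoopArgs, false);
 * </pre>
 *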
 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
 *
 */
public class CountWordsAcrossTimeperiod extends StageProvider {
        private String[] nonHadoopArgs;
        private boolean combinedTimes = false;

        /**
         * @param nonHadoopArgs
         *            the arguments for the {@link HadoopTwitterTokenToolOptions}
         */
        public CountWordsAcrossTimeperiod(String[] nonHadoopArgs) {
                this.nonHadoopArgs = nonHadoopArgs;
        }

        /**
         * @param nonHadoopArgs
         *            the arguments for the {@link HadoopTwitterTokenToolOptions}
         * @param combinedTimes
         *            whether the mapper expects time entries with values for each
         *            word, i.e. combined times
         */
        public CountWordsAcrossTimeperiod(String[] nonHadoopArgs, boolean combinedTimes) {
                this.nonHadoopArgs = nonHadoopArgs;
                this.combinedTimes = combinedTimes;
        }

        /**
         * the configuration key under which the tool arguments are stored
         */
        public static final String ARGS_KEY = "TOKEN_ARGS";
        private static final LongWritable END_TIME = new LongWritable(-1);
        /**
         * where the intermediate word counts should be stored
         */
        public final static String WORDCOUNT_DIR = "wordtimeperiodDFIDF";

        /**
         * function(timePeriodLength)
         *
         * map input: <timePeriod: <<tweet:#freq>,<word:#freq>,<word:#freq>,...>>
         *
         * map output: [ word: <timeperiod, tweet:#freq, word:#freq>, word:
         * <timeperiod, tweet:#freq, word:#freq>, ... ]
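         *
         * For example (purely illustrative counts): given the input <t1:
         * <<tweet:100>,<dog:5>,<cat:2>>>, the mapper emits dog: <t1,
         * tweet:100, dog:5> and cat: <t1, tweet:100, cat:2>.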
         *
         * @author Sina Samangooei (ss@ecs.soton.ac.uk)
         *
         */
        public static class Map extends Mapper<LongWritable, BytesWritable, Text, BytesWritable> {

                /**
                 * Default constructor; does nothing.
                 */
                public Map() {

                }

                private static HadoopTwitterTokenToolOptions options;

                protected static synchronized void loadOptions(
                                Mapper<LongWritable, BytesWritable, Text, BytesWritable>.Context context) throws IOException
                {
                        if (options == null) {
                                try {
                                        options = new HadoopTwitterTokenToolOptions(context.getConfiguration().getStrings(ARGS_KEY));
                                        options.prepare();
                                } catch (final CmdLineException e) {
                                        throw new IOException(e);
                                } catch (final Exception e) {
                                        throw new IOException(e);
                                }
                        }
                }

                @Override
                protected void setup(Mapper<LongWritable, BytesWritable, Text, BytesWritable>.Context context)
                                throws IOException, InterruptedException
                {
                        loadOptions(context);
                }

                @Override
                protected void map(final LongWritable key, BytesWritable value,
                                final Mapper<LongWritable, BytesWritable, Text, BytesWritable>.Context context)
                                                throws java.io.IOException, InterruptedException
                {

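                        // Deserialize the per-time-period counts (number of tweets and
                        // per-word frequencies) from the raw bytes of the input value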
                        final TweetCountWordMap periodCountWordCount = IOUtils.read(new ByteArrayInputStream(value.getBytes()),
                                        TweetCountWordMap.class);
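                        // For each word in this time period, emit the word as the key and a
                        // serialized (timeperiod, tweet count, word count) triple as the value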
                        final boolean written = periodCountWordCount.getTweetWordMap().forEachEntry(new TObjectIntProcedure<String>()
                        {
                                @Override
                                public boolean execute(String word, int wordCount) {
                                        final TimeperiodTweetCountWordCount timeMap = new TimeperiodTweetCountWordCount(key.get(), wordCount,
                                                        periodCountWordCount.getNTweets());
                                        final ByteArrayOutputStream os = new ByteArrayOutputStream();
                                        try {
                                                IOUtils.writeBinary(os, timeMap);
                                                final BytesWritable writeable = new BytesWritable(os.toByteArray());
                                                context.write(new Text(word), writeable);
                                        } catch (final IOException e) {
                                                return false;
                                        } catch (final InterruptedException e) {
                                                return false;
                                        }

                                        return true;
                                }
                        });
                        if (!written) {
                                throw new IOException("Couldn't write the TimeperiodTweetCountWordCount object");
                        }
                }
        }

        /**
         * reduce input: <word: [ <timeperiod, tweet:#freq, word:#freq>,
         * <timeperiod, tweet:#freq, word:#freq>, ... ]>
         *
         * reduce output: <word: <timePeriod, DFIDF>, ...>
         *
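         * Note: this reducer requires that the grouped values contain an
         * END_TIME entry (timeperiod == -1) holding the overall totals; if no
         * such entry arrives for a word, the reduce fails with a
         * NullPointerException.
         *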
         * @author Sina Samangooei (ss@ecs.soton.ac.uk)
         *
         */
        public static class Reduce extends Reducer<Text, BytesWritable, Text, BytesWritable> {

                /**
                 * Default constructor; does nothing.
                 */
                public Reduce() {

                }

                @Override
                protected void reduce(Text word, Iterable<BytesWritable> values,
                                Reducer<Text, BytesWritable, Text, BytesWritable>.Context context) throws IOException,
                                InterruptedException
                {
                        // Read all timeperiods to objects, find the END_TIME instance,
                        // hold the rest
                        /*
                         * Read the total tweet frequency (Ttf) and the total word tweet
                         * frequency (Twf) from the timeperiod -1 entry; read the time
                         * period tweet frequency (tf) and word frequency (wf) from each
                         * other entry.
                         */
                        TimeperiodTweetCountWordCount endTime = null;
                        final List<TimeperiodTweetCountWordCount> otherTimes = new ArrayList<TimeperiodTweetCountWordCount>();
                        for (final BytesWritable inputArr : values) {
                                final ByteArrayInputStream stream = new ByteArrayInputStream(inputArr.getBytes());
                                final TimeperiodTweetCountWordCount instance = IOUtils.read(stream, TimeperiodTweetCountWordCount.class);
                                if (instance.timeperiod == END_TIME.get())
                                        endTime = instance;
                                else
                                        otherTimes.add(instance);
                        }
                        /*
                         * For each entry: DF = wf/tf, IDF = Ttf/Twf.
                         */
                        // Total number of tweets in all timeperiods
                        final long Ttf = endTime.tweetcount;
                        // Number of tweets containing this word in all timeperiods
                        final long Twf = endTime.wordcount;
                        final TreeSet<WordDFIDF> allDFIDF = new TreeSet<WordDFIDF>();
                        for (final TimeperiodTweetCountWordCount tcwc : otherTimes) {
                                // Number of tweets in this timeperiod
                                final long tf = tcwc.tweetcount;
                                // Number of tweets containing this word in this time period
                                final long wf = tcwc.wordcount;

                                final WordDFIDF dfidf = new WordDFIDF(tcwc.timeperiod, wf, tf, Twf, Ttf);
                                allDFIDF.add(dfidf);
                        }
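                        // WordDFIDF entries sort naturally inside the TreeSet (it requires
                        // comparable elements), so the per-period scores come out ordered;
                        // copy them into a list and serialize that as the output value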
                        final List<WordDFIDF> listVersion = new ArrayList<WordDFIDF>();
                        listVersion.addAll(allDFIDF);
                        final WriteableListBinary<WordDFIDF> writeableCollection = new WriteableListBinary<WordDFIDF>(listVersion) {
                                @Override
                                protected void writeValue(WordDFIDF v, DataOutput out) throws IOException {
                                        v.writeBinary(out);
                                }

                        };
                        context.write(word, new BytesWritable(IOUtils.serialize(writeableCollection)));
                }
        }

        /**
         * reduce input: <word: [ <timeperiod, tweet:#freq, word:#freq>,
         * <timeperiod, tweet:#freq, word:#freq>, ... ]>, but unlike
         * {@link Reduce} this reducer expects that each timeperiod may appear
         * multiple times (i.e. the timeperiods were not combined!)
         *
         * reduce output: <word: <timePeriod, DFIDF>, ...>
         *
         * @author Sina Samangooei (ss@ecs.soton.ac.uk)
         */
        public static class NonCombinedTimesReducer extends
                        Reducer<Text, BytesWritable, Text, BytesWritable>
        {

                private HadoopTwitterTokenToolOptions options;
                private WritableEnumCounter<TextEntryType> tgs;
                private TimeFrequencyHolder timeIndex;

                /**
                 * Default constructor; does nothing.
                 */
                public NonCombinedTimesReducer() {

                }

                protected synchronized void loadOptions(Reducer<Text, BytesWritable, Text, BytesWritable>.Context context)
                                throws IOException
                {
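                        // Rebuild the tool options from the job configuration, then load the
                        // time index and the global stats file written by
                        // CountTweetsInTimeperiod; the VALID entry of the stats is used
                        // later as the total tweet count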
                        try {
                                options = new HadoopTwitterTokenToolOptions(context.getConfiguration().getStrings(ARGS_KEY));
                                options.prepare();
                                final Path outpath = HadoopToolsUtil.getOutputPath(options);

                                timeIndex = CountTweetsInTimeperiod.readTimeIndex(CountTweetsInTimeperiod.constructIndexPath(outpath));
                                final Path timecountOut = new Path(outpath, CountTweetsInTimeperiod.TIMECOUNT_DIR);
                                final Path statsout = new Path(timecountOut, CountTweetsInTimeperiod.GLOBAL_STATS_FILE);
                                final FileSystem fs = HadoopToolsUtil.getFileSystem(statsout);
                                final WritableEnumCounter<TextEntryType> et = new WritableEnumCounter<TextEntryType>() {
                                        @Override
                                        public TextEntryType valueOf(String str) {
                                                return TextEntryType.valueOf(str);
                                        }

                                };
                                tgs = IOUtils.read(fs.open(statsout), et);
                        } catch (final CmdLineException e) {
                                e.printStackTrace();
                                throw new IOException(e);
                        } catch (final Exception e) {
                                e.printStackTrace();
                                throw new IOException(e);
                        }
                }

                @Override
                protected void setup(Reducer<Text, BytesWritable, Text, BytesWritable>.Context context) throws IOException,
                                InterruptedException
                {
                        loadOptions(context);
                }

                @Override
                protected void reduce(Text word, Iterable<BytesWritable> values,
                                Reducer<Text, BytesWritable, Text, BytesWritable>.Context context) throws IOException,
                                InterruptedException
                {
                        // Read all timeperiods to objects, find the END_TIME instance,
                        // hold the rest
                        /*
                         * Read the time period tweet frequency (tf) and the time period
                         * word frequency (wf) from each entry, and construct a single
                         * count for each time period.
                         *
                         * Then go through the compiled timeperiod counts in order,
                         * keeping a running total of tweets and of tweets containing
                         * this word up to each time period; use the running totals to
                         * construct a WordDFIDF object per time period.
                         */
                        TimeperiodTweetCountWordCount endTime = null;
                        final TreeSet<Long> times = new TreeSet<Long>();
                        final HashMap<Long, TimeperiodTweetCountWordCount> otherTimes = new HashMap<Long, TimeperiodTweetCountWordCount>();
                        System.out.println("STARTING WORD: " + word);
                        for (final BytesWritable inputArr : values) {
                                final ByteArrayInputStream stream = new ByteArrayInputStream(inputArr.getBytes());
                                final TimeperiodTweetCountWordCount instance = IOUtils.read(stream, TimeperiodTweetCountWordCount.class);
                                System.out.println("... FOUND TIME INSTANCE:" + instance.timeperiod);
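                                // Entries with timeperiod == -1 (END_TIME) carry this word's
                                // overall counts: merge their word counts, and take the global
                                // tweet count from the stats loaded in loadOptions()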
                                if (instance.timeperiod == END_TIME.get())
                                {
                                        if (endTime == null)
                                        {
                                                // System.out.println("... end time CREATED");
                                                endTime = instance;
                                                endTime.tweetcount = tgs.getValue(TextEntryType.VALID);
                                        }
                                        else
                                        {
                                                // System.out.println("... end time INCREMENTED");
                                                endTime.wordcount += instance.wordcount;
                                        }
                                }
                                else
                                {
                                        times.add(instance.timeperiod);
                                        final TimeperiodTweetCountWordCount currentTimeCounter = otherTimes.get(instance.timeperiod);
                                        System.out.println("Instance tweet count: " + instance.tweetcount);
                                        System.out.println("Instance word count: " + instance.wordcount);
                                        if (currentTimeCounter == null) {
                                                System.out.println("... time CREATED");
                                                otherTimes.put(instance.timeperiod, instance);
                                        }
                                        else {
                                                System.out.println("... time INCREMENTED");
                                                currentTimeCounter.tweetcount += instance.tweetcount;
                                                currentTimeCounter.wordcount += instance.wordcount;
                                        }
                                }
                        }
                        // System.out.println("... TOTAL tweets = " + endTime.tweetcount);
                        // System.out.println("... TOTAL tweets with THIS word = " +
                        // endTime.wordcount);
                        /*
                         * For each entry: DF = wf/tf, IDF = Ttf/Twf.
                         */
                        // Running total of tweets containing this word seen so far (the
                        // per-period tweet totals come from the time index instead)
                        long Twf = 0;
                        final TreeSet<WordDFIDF> allDFIDF = new TreeSet<WordDFIDF>();
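                        // Walk the time periods in ascending order (the TreeSet iterates
                        // sorted); the time index supplies the cumulative (Ttf) and
                        // per-period (tf) tweet counts, while Twf accumulates this word's
                        // counts so each score reflects everything seen up to that period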
                        for (final Long time : times) {
                                final TimeperiodTweetCountWordCount tcwc = otherTimes.get(time);
                                final TimeFrequency indexEntry = timeIndex.get(time);

                                final long Ttf = indexEntry.cumulativeFrequency;
                                final long tf = indexEntry.periodFrequency;
                                // Number of tweets containing this word in this time period
                                final long wf = tcwc.wordcount;
                                Twf += wf;
                                final WordDFIDF dfidf = new WordDFIDF(tcwc.timeperiod, wf, tf, Twf, Ttf);
                                allDFIDF.add(dfidf);
                        }
                        final List<WordDFIDF> listVersion = new ArrayList<WordDFIDF>();
                        listVersion.addAll(allDFIDF);
                        final WriteableListBinary<WordDFIDF> writeableCollection = new WriteableListBinary<WordDFIDF>(listVersion) {
                                @Override
                                protected void writeValue(WordDFIDF v, DataOutput out) throws IOException {
                                        v.writeBinary(out);
                                }

                        };
                        context.write(word, new BytesWritable(IOUtils.serialize(writeableCollection)));
                }
        }

        @Override
        public SimpleSequenceFileStage<LongWritable, BytesWritable, Text, BytesWritable> stage() {
                return new SimpleSequenceFileStage<LongWritable, BytesWritable, Text, BytesWritable>() {
                        @Override
                        public void setup(Job job) {
                                job.getConfiguration().setStrings(CountWordsAcrossTimeperiod.ARGS_KEY, nonHadoopArgs);

                                // If times are not combined, each reducer has to do a bit more
                                // work than usual, so spread the load across more reduce tasks
                                if (!CountWordsAcrossTimeperiod.this.combinedTimes)
                                        job.setNumReduceTasks(26);
                        }

                        @Override
                        public Class<? extends Mapper<LongWritable, BytesWritable, Text, BytesWritable>> mapper() {
                                return CountWordsAcrossTimeperiod.Map.class;
                        }

                        @Override
                        public Class<? extends Reducer<Text, BytesWritable, Text, BytesWritable>> reducer() {
                                if (CountWordsAcrossTimeperiod.this.combinedTimes)
                                        return CountWordsAcrossTimeperiod.Reduce.class;
                                else
                                        return CountWordsAcrossTimeperiod.NonCombinedTimesReducer.class;
                        }

                        @Override
                        public String outname() {
                                return WORDCOUNT_DIR;
                        }
                };
        }

}