/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   *  Redistributions of source code must retain the above copyright notice,
 *      this list of conditions and the following disclaimer.
 *
 *   *  Redistributions in binary form must reproduce the above copyright notice,
 *      this list of conditions and the following disclaimer in the documentation
 *      and/or other materials provided with the distribution.
 *
 *   *  Neither the name of the University of Southampton nor the names of its
 *      contributors may be used to endorse or promote products derived from this
 *      software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.hadoop.tools.twitter.token.mode.dfidf;

import gnu.trove.map.hash.TObjectIntHashMap;
import gnu.trove.procedure.TLongObjectProcedure;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.joda.time.DateTime;
import org.kohsuke.args4j.CmdLineException;
import org.openimaj.hadoop.mapreduce.stage.IdentityReducer;
import org.openimaj.hadoop.mapreduce.stage.StageProvider;
import org.openimaj.hadoop.mapreduce.stage.helper.TextLongByteStage;
import org.openimaj.hadoop.tools.HadoopToolsUtil;
import org.openimaj.hadoop.tools.twitter.HadoopTwitterTokenToolOptions;
import org.openimaj.hadoop.tools.twitter.JsonPathFilterSet;
import org.openimaj.hadoop.tools.twitter.token.mode.TextEntryType;
import org.openimaj.hadoop.tools.twitter.token.mode.WritableEnumCounter;
import org.openimaj.hadoop.tools.twitter.token.mode.dfidf.TimeFrequencyHolder.TimeFrequency;
import org.openimaj.hadoop.tools.twitter.utils.TweetCountWordMap;
import org.openimaj.io.IOUtils;
import org.openimaj.twitter.USMFStatus;

import com.jayway.jsonpath.JsonPath;

/**
 * A mapper/reducer pair which counts words and tweets in quantised time
 * periods of length timePeriodLength, such that a word in a tweet is counted
 * against the time period spanning t - 1 to t.
 *
 * First task:
 * map input: tweet status # JSON twitter status with a JSONPath to the words
 * map output: <timePeriod: <word:#freq,tweets:#freq>, -1:<word:#freq,tweets:#freq>>
 * reduce input: <timePeriod: [<word:#freq,tweets:#freq>,...,<word:#freq,tweets:#freq>]>
 * reduce output: <timePeriod: <<tweet:#freq>,<word:#freq>,<word:#freq>,...>>
 *
 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
 *
 */
public class CountTweetsInTimeperiod extends StageProvider {

        private String[] nonHadoopArgs;
        private boolean inmemoryCombine;
        private boolean buildTimeIndex = true;
        private long timedelta;
        /**
         * Option for the location of the timecount directory
         */
        public final static String TIMECOUNT_DIR = "timeperiodTweet";

        /**
         * A time index holding tweet totals and cumulative totals for each time
         * period
         */
        public final static String TIMEINDEX_FILE = "timeindex";

        /**
         * Where to find the global stats file
         */
        public final static String GLOBAL_STATS_FILE = "globalstats";
        private static final String TIMEDELTA = "org.openimaj.hadoop.tools.twitter.token.mode.dfidf.timedelta";
        /**
         * The configuration property holding the location to which the time
         * index is written
         */
        public final static String TIMEINDEX_LOCATION_PROP = "org.openimaj.hadoop.tools.twitter.token.mode.dfidf.timeindex";

        /**
         * @param nonHadoopArgs
         *            to be sent to the stage
         * @param timedelta
         *            the time delta between which to quantise time periods
         */
        public CountTweetsInTimeperiod(String[] nonHadoopArgs, long timedelta) {
                this.nonHadoopArgs = nonHadoopArgs;
                this.inmemoryCombine = false;
                this.timedelta = timedelta;
        }

        /**
         * @param nonHadoopArgs
         *            to be sent to the stage
         * @param inMemoryCombine
         *            whether an in-memory combination of word counts should be
         *            performed
         * @param timedelta
         *            the time delta between which to quantise time periods
         */
        public CountTweetsInTimeperiod(String[] nonHadoopArgs, boolean inMemoryCombine,
                        long timedelta)
        {
                this.nonHadoopArgs = nonHadoopArgs;
                this.inmemoryCombine = inMemoryCombine;
                this.timedelta = timedelta;
        }
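
        /*
         * Illustrative usage sketch (not part of the original source; the
         * argument array and delta value are hypothetical):
         *
         *   final String[] args = ...; // HadoopTwitterTokenToolOptions arguments
         *   // count words in hour-long periods, combining counts in memory
         *   final CountTweetsInTimeperiod counter =
         *           new CountTweetsInTimeperiod(args, true, 60);
         *   final TextLongByteStage stage = counter.stage();
         */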

        /**
         *
         * map input: tweet status # JSON twitter status with a JSONPath to the
         * words
         * map output: <timePeriod: <word:#freq,tweets:#freq>,
         * -1:<word:#freq,tweets:#freq> >
         *
         * @author Jonathon Hare (jsh2@ecs.soton.ac.uk), Sina Samangooei
         *         <ss@ecs.soton.ac.uk>
         *
         */
        public static class Map extends Mapper<LongWritable, Text, LongWritable, BytesWritable> {

                /**
                 * Default constructor; the mapper needs no construction-time state
                 */
                public Map() {

                }

                /**
                 * The time used to signify the end, used to count the total number
                 * of times a given word appears
                 */
                public static final LongWritable END_TIME = new LongWritable(-1);
                /**
                 * A total of the number of tweets; must be ignored!
                 */
                public static final LongWritable TOTAL_TIME = new LongWritable(-2);
                private HadoopTwitterTokenToolOptions options;
                private long timeDeltaMillis;
                private JsonPath jsonPath;
                private JsonPathFilterSet filters;

                protected synchronized void loadOptions(Mapper<LongWritable, Text, LongWritable, BytesWritable>.Context context)
                                throws IOException
                {
                        if (options == null) {
                                try {
                                        options = new HadoopTwitterTokenToolOptions(context.getConfiguration().getStrings(
                                                        HadoopTwitterTokenToolOptions.ARGS_KEY));
                                        options.prepare();
                                        filters = options.getFilters();
                                        // the delta is configured in minutes; convert to milliseconds
                                        timeDeltaMillis = context.getConfiguration().getLong(CountTweetsInTimeperiod.TIMEDELTA, 60) * 60 * 1000;
                                        jsonPath = JsonPath.compile(options.getJsonPath());
                                } catch (final CmdLineException e) {
                                        throw new IOException(e);
                                } catch (final Exception e) {
                                        throw new IOException(e);
                                }
                        }
                }

                private HashMap<Long, TweetCountWordMap> tweetWordMap;

                @Override
                protected void setup(Mapper<LongWritable, Text, LongWritable, BytesWritable>.Context context) throws IOException,
                                InterruptedException
                {
                        loadOptions(context);
                        this.tweetWordMap = new HashMap<Long, TweetCountWordMap>();
                }

                @Override
                protected void map(LongWritable key, Text value,
                                Mapper<LongWritable, Text, LongWritable, BytesWritable>.Context context) throws java.io.IOException,
                                InterruptedException
                {
                        List<String> tokens = null;
                        USMFStatus status = null;
                        DateTime time = null;
                        try {
                                final String svalue = value.toString();
                                status = new USMFStatus(options.getStatusType().type());
                                status.fillFromString(svalue);
                                if (status.isInvalid())
                                        return;
                                if (!filters.filter(svalue))
                                        return;
                                tokens = jsonPath.read(svalue);
                                if (tokens == null) {
                                        context.getCounter(TextEntryType.INVALID_JSON).increment(1);
                                        // System.err.println("Couldn't read the tokens from the tweet");
                                        return;
                                }
                                if (tokens.size() == 0) {
                                        context.getCounter(TextEntryType.INVALID_ZEROLENGTH).increment(1);
                                        return; // Quietly quit, value exists but was empty
                                }
                                time = status.createdAt();
                                if (time == null) {
                                        context.getCounter(TextEntryType.INVALID_TIME).increment(1);
                                        // System.err.println("Time was null, this usually means the original tweet had no time. Skip this tweet.");
                                        return;
                                }
                        } catch (final Exception e) {
                                // System.out.println("Couldn't get tokens from:\n" + value +
                                // "\nwith jsonpath:\n" + jsonPath);
                                return;
                        }
                        // Quantise the time to a specific index
                        final long timeIndex = (time.getMillis() / timeDeltaMillis) * timeDeltaMillis;
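                        // e.g. with a 60 minute delta (timeDeltaMillis = 3600000), a
                        // tweet at 12:43 (45780000ms into the epoch day) maps to 12:00:
                        // (45780000 / 3600000) * 3600000 = 43200000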
                        TweetCountWordMap timeWordMap = this.tweetWordMap.get(timeIndex);
                        // System.out.println("Tweet time: " + time.getMillis());
                        // System.out.println("Tweet timeindex: " + timeIndex);
                        if (timeWordMap == null) {
                                this.tweetWordMap.put(timeIndex, timeWordMap = new TweetCountWordMap());
                        }
                        final TObjectIntHashMap<String> tpMap = timeWordMap.getTweetWordMap();
                        timeWordMap.incrementTweetCount(1);
                        // Count each token at most once per tweet, so the counts are
                        // document frequencies; a HashSet makes the seen-check O(1)
                        final Set<String> seen = new HashSet<String>();
                        for (final String token : tokens) {
                                // Apply stop words?
                                // Apply junk words?
                                if (!seen.add(token))
                                        continue; // already seen this token in this tweet
                                tpMap.adjustOrPutValue(token, 1, 1);
                        }
                        context.getCounter(TextEntryType.VALID).increment(1);
                }

                @Override
                protected void cleanup(Mapper<LongWritable, Text, LongWritable, BytesWritable>.Context context)
                                throws IOException, InterruptedException
                {
                        System.out.println("Cleaning up mapper, seen " + this.tweetWordMap.entrySet().size() + " time slots");
                        for (final Entry<Long, TweetCountWordMap> tpMapEntry : this.tweetWordMap.entrySet()) {
                                final Long time = tpMapEntry.getKey();
                                final TweetCountWordMap map = tpMapEntry.getValue();
                                System.out.println("... time( " + time + ") seen " + map.getTweetWordMap().size() + " words");
                                final ByteArrayOutputStream outarr = new ByteArrayOutputStream();
                                IOUtils.writeBinary(outarr, map);
                                final byte[] arr = outarr.toByteArray();
                                final BytesWritable toWrite = new BytesWritable(arr);
                                // each period's counts are emitted twice: once against END_TIME
                                // (for overall totals) and once against the period itself
                                context.write(END_TIME, toWrite);
                                context.write(new LongWritable(time), toWrite);
                                context.getCounter(TextEntryType.ACUAL_EMITS).increment(1);
                        }
                }
        }

        /**
         * Identical to the {@link IdentityReducer} but constructs a time index
         * found in {@link #TIMEINDEX_FILE}
         *
         * @author Sina Samangooei (ss@ecs.soton.ac.uk)
         *
         */
        public static class TimeIndexReducer extends
                        Reducer<LongWritable, BytesWritable, LongWritable, BytesWritable>
        {
                private TimeFrequencyHolder timeMap;

                /**
                 *
                 */
                public TimeIndexReducer() {
                        timeMap = new TimeFrequencyHolder();
                }

                @Override
                protected void reduce(LongWritable time, Iterable<BytesWritable> values, Context context) throws IOException,
                                InterruptedException
                {
                        if (time.get() == Map.END_TIME.get()) {
                                // The end time can be ignored entirely in terms of the time
                                // index, but its values must still be passed on!
                                for (final BytesWritable tweetwordmapbytes : values) {
                                        context.write(time, tweetwordmapbytes);
                                }
                        }
                        else {
                                final TweetCountWordMap accum = new TweetCountWordMap();
                                for (final BytesWritable tweetwordmapbytes : values) {
                                        final TweetCountWordMap tweetwordmap = IOUtils.read(
                                                        new ByteArrayInputStream(tweetwordmapbytes.getBytes()),
                                                        TweetCountWordMap.class);
                                        accum.combine(tweetwordmap);
                                        context.write(time, tweetwordmapbytes);
                                }
                                final TimeFrequency tf = new TimeFrequency(time.get(), accum.getNTweets());
                                timeMap.put(tf.time, tf);
                        }
                }

                @Override
                protected void cleanup(Context context) throws IOException, InterruptedException {
                        final String output = context.getConfiguration().getStrings(TIMEINDEX_LOCATION_PROP)[0];
                        final Path indexOut = new Path(output + "/" + context.getTaskAttemptID());
                        System.out.println("Writing time index to: " + indexOut);
                        System.out.println("Timemap contains: " + this.timeMap.size());
                        CountTweetsInTimeperiod.writeTimeIndex(this.timeMap, indexOut);
                }
        }
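
        /*
         * Illustrative sketch (not part of the original source; the time and
         * count values are invented): each reduced time period contributes one
         * TimeFrequency to the index, keyed by its quantised time:
         *
         *   final TimeFrequencyHolder timeMap = new TimeFrequencyHolder();
         *   final TimeFrequency tf = new TimeFrequency(43200000L, 25); // 25 tweets at 12:00
         *   timeMap.put(tf.time, tf);
         */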

        /**
         * reduce input: <timePeriod:
         * [<word:#freq,tweets:#freq>,...,<word:#freq,tweets:#freq>]>
         * reduce output: <timePeriod: <<tweet:#freq>,<word:#freq>,<word:#freq>,...>>
         *
         * @author Sina Samangooei (ss@ecs.soton.ac.uk)
         *
         */
        public static class InMemoryCombiningReducer extends
                        Reducer<LongWritable, BytesWritable, LongWritable, BytesWritable>
        {

                /**
                 * Default constructor; does nothing
                 */
                public InMemoryCombiningReducer() {

                }

                @Override
                protected void reduce(LongWritable key, Iterable<BytesWritable> values,
                                Reducer<LongWritable, BytesWritable, LongWritable, BytesWritable>.Context context) throws IOException,
                                InterruptedException
                {
                        final TweetCountWordMap accum = new TweetCountWordMap();
                        for (final BytesWritable tweetwordmapbytes : values) {
                                final TweetCountWordMap tweetwordmap = IOUtils.read(
                                                new ByteArrayInputStream(tweetwordmapbytes.getBytes()),
                                                TweetCountWordMap.class);
                                accum.combine(tweetwordmap);
                        }
                        final ByteArrayOutputStream outstream = new ByteArrayOutputStream();
                        IOUtils.writeBinary(outstream, accum);
                        context.write(key, new BytesWritable(outstream.toByteArray()));
                }
        }
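
        /*
         * Illustrative sketch (not part of the original source) of the
         * serialisation round trip the mapper and reducers rely on:
         *
         *   final TweetCountWordMap map = new TweetCountWordMap();
         *   map.incrementTweetCount(1);
         *   final ByteArrayOutputStream baos = new ByteArrayOutputStream();
         *   IOUtils.writeBinary(baos, map);
         *   final TweetCountWordMap copy = IOUtils.read(
         *           new ByteArrayInputStream(baos.toByteArray()), TweetCountWordMap.class);
         */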

        @Override
        public TextLongByteStage stage() {
                final TextLongByteStage s = new TextLongByteStage() {
                        private Path actualOutputLocation;

                        @Override
                        public void setup(Job job) {
                                job.getConfiguration().setStrings(HadoopTwitterTokenToolOptions.ARGS_KEY, nonHadoopArgs);
                                job.getConfiguration().setLong(TIMEDELTA, timedelta);
                                job.getConfiguration().setStrings(TIMEINDEX_LOCATION_PROP,
                                                new Path(actualOutputLocation, TIMEINDEX_FILE).toString());
                                if (!inmemoryCombine) {
                                        if (!buildTimeIndex) {
                                                job.setNumReduceTasks(0);
                                        }
                                        else {
                                                job.setNumReduceTasks(10);
                                        }
                                }
                        }

                        @Override
                        public Class<? extends Mapper<LongWritable, Text, LongWritable, BytesWritable>> mapper() {
                                return CountTweetsInTimeperiod.Map.class;
                        }

                        @Override
                        public Class<? extends Reducer<LongWritable, BytesWritable, LongWritable, BytesWritable>> reducer() {
                                if (inmemoryCombine)
                                        return CountTweetsInTimeperiod.InMemoryCombiningReducer.class;
                                else if (buildTimeIndex)
                                        return CountTweetsInTimeperiod.TimeIndexReducer.class;
                                else
                                        return super.reducer();
                        }

                        @Override
                        public Job stage(Path[] inputs, Path output, Configuration conf) throws Exception {
                                // record the output location first; super.stage() builds the
                                // job and calls setup(job), which reads it
                                this.actualOutputLocation = output;
                                return super.stage(inputs, output, conf);
                        }

                        @Override
                        public String outname() {
                                return TIMECOUNT_DIR;
                        }

                        @Override
                        public void finished(Job job) {
                                Counters counters;
                                try {
                                        counters = job.getCounters();
                                } catch (final IOException e) {
                                        // System.out.println("Counters not found!");
                                        return;
                                }
                                // Prepare a writer to the actual output location
                                final Path out = new Path(actualOutputLocation, GLOBAL_STATS_FILE);

                                FileSystem fs;
                                try {
                                        fs = HadoopToolsUtil.getFileSystem(out);
                                        final FSDataOutputStream os = fs.create(out);
                                        IOUtils.writeASCII(os, new WritableEnumCounter<TextEntryType>(counters, TextEntryType.values()) {
                                                @Override
                                                public TextEntryType valueOf(String str) {
                                                        return TextEntryType.valueOf(str);
                                                }
                                        });
                                } catch (final IOException e) {
                                        // a failure to write the global stats should not abort the job
                                        e.printStackTrace();
                                }
                        }
                };
                return s;
        }

        /**
         * Write a timeindex to a {@link Path}
         *
         * @param timeMap
         * @param indexOut
         * @throws IOException
         */
        public static void writeTimeIndex(TimeFrequencyHolder timeMap, Path indexOut) throws IOException {
                FSDataOutputStream os = null;
                try {
                        final FileSystem fs = HadoopToolsUtil.getFileSystem(indexOut);
                        os = fs.create(indexOut, true);
                        IOUtils.writeBinary(os, timeMap);
                        os.flush();
                } finally {
                        if (os != null) // guard against fs.create failing before os is assigned
                                os.close();
                }
        }
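
        /*
         * Illustrative sketch (not part of the original source; the paths are
         * hypothetical). writeTimeIndex writes a single file, while readTimeIndex
         * expects a directory of such files, mirroring how each TimeIndexReducer
         * task writes its own part:
         *
         *   final Path indexDir = new Path("hdfs:///tmp/timeindex");
         *   CountTweetsInTimeperiod.writeTimeIndex(holder, new Path(indexDir, "part-r-00000"));
         *   final TimeFrequencyHolder loaded = CountTweetsInTimeperiod.readTimeIndex(indexDir);
         */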

        /**
         * Read a {@link TimeFrequencyHolder} from a {@link Path}. The path is
         * assumed to be a directory containing many {@link TimeFrequencyHolder}
         * instances.
         *
         * @param indexOut
         * @return a new {@link TimeFrequencyHolder}
         * @throws IOException
         */
        public static TimeFrequencyHolder readTimeIndex(Path indexOut) throws IOException {
                if (!HadoopToolsUtil.fileExists(indexOut.toString())) {
                        return null;
                }
                System.out.println("Reading time index from: " + indexOut);
                final TimeFrequencyHolder tfh = new TimeFrequencyHolder();

                final FileSystem fs = HadoopToolsUtil.getFileSystem(indexOut);
                final FileStatus[] indexParts = fs.listStatus(indexOut);
                for (final FileStatus fileStatus : indexParts) {
                        System.out.println("Reading index part: " + fileStatus.getPath());
                        FSDataInputStream in = null;
                        try {
                                in = fs.open(fileStatus.getPath());
                                final TimeFrequencyHolder tempTfh = IOUtils.read(in, TimeFrequencyHolder.class);
                                tempTfh.forEachEntry(new TLongObjectProcedure<TimeFrequency>() {
                                        @Override
                                        public boolean execute(long a, TimeFrequency b) {
                                                // safe because each part should contain completely
                                                // unique times!
                                                tfh.put(a, b);
                                                return true;
                                        }
                                });
                        } finally {
                                if (in != null) // guard against fs.open failing before in is assigned
                                        in.close();
                        }
                }
                tfh.recalculateCumulativeFrequencies();
                return tfh;
        }

        /**
         * @param outpath
         * @return the location of the time index under the given output path
         */
        public static Path constructIndexPath(Path outpath) {
                final Path retPath = new Path(new Path(outpath, CountTweetsInTimeperiod.TIMECOUNT_DIR), TIMEINDEX_FILE);
                return retPath;
        }
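
        /*
         * Illustrative sketch (not part of the original source; the path is
         * hypothetical): locate and load the index produced by a prior run
         * rooted at a tool output directory:
         *
         *   final Path toolOutput = new Path("hdfs:///user/me/tokenoutput");
         *   final TimeFrequencyHolder index = CountTweetsInTimeperiod
         *           .readTimeIndex(CountTweetsInTimeperiod.constructIndexPath(toolOutput));
         */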
}