/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *
 *   * Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *
 *   * Neither the name of the University of Southampton nor the names of its
 *     contributors may be used to endorse or promote products derived from this
 *     software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.hadoop.tools.twitter.token.mode.dfidf;

import gnu.trove.procedure.TObjectIntProcedure;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.TreeSet;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.kohsuke.args4j.CmdLineException;
import org.openimaj.hadoop.mapreduce.stage.StageProvider;
import org.openimaj.hadoop.mapreduce.stage.helper.SimpleSequenceFileStage;
import org.openimaj.hadoop.tools.HadoopToolsUtil;
import org.openimaj.hadoop.tools.twitter.HadoopTwitterTokenToolOptions;
import org.openimaj.hadoop.tools.twitter.token.mode.TextEntryType;
import org.openimaj.hadoop.tools.twitter.token.mode.WritableEnumCounter;
import org.openimaj.hadoop.tools.twitter.token.mode.dfidf.TimeFrequencyHolder.TimeFrequency;
import org.openimaj.hadoop.tools.twitter.utils.TimeperiodTweetCountWordCount;
import org.openimaj.hadoop.tools.twitter.utils.TweetCountWordMap;
import org.openimaj.hadoop.tools.twitter.utils.WordDFIDF;
import org.openimaj.io.IOUtils;
import org.openimaj.io.wrappers.WriteableListBinary;

/**
 * A Stage provider wrapping the functionality of
 * {@link CountWordsAcrossTimeperiod.Map} and
 * {@link CountWordsAcrossTimeperiod.Reduce}.
 *
 * The Map expects times as keys and maps of word counts as input. The mapper
 * emits each word as a key, paired with the time period and the counts within
 * that time period.
 *
 * This, along with the TimeIndex generated by {@link CountTweetsInTimeperiod},
 * is used to construct a DFIDF score per word per time period.
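 *
 * As a sketch (the exact combination of these terms, e.g. any log scaling, is
 * owned by {@link WordDFIDF}): writing wf for tweets containing the word in a
 * period, tf for tweets in that period, Twf for tweets containing the word
 * overall and Ttf for tweets overall, the reducers below compute quantities of
 * the form:
 *
 * <pre>
 * DF = wf / tf    // prominence of the word within the time period
 * IDF = Ttf / Twf // rarity of the word across all tweets
 * </pre>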
 *
 * One interpretation of the DFIDF score is a weighting based on counts up to a
 * particular time period. This functionality is encoded in the
 * {@link NonCombinedTimesReducer}.
 *
 * Another interpretation is that the DFIDF score weights words based on word
 * occurrences at the end of some period of time, i.e. the count at some END
 * time.
 *
 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
 *
 */
public class CountWordsAcrossTimeperiod extends StageProvider {
    private String[] nonHadoopArgs;
    private boolean combinedTimes = false;

    /**
     * @param nonHadoopArgs
     *            the tool arguments, passed on to the mappers and reducers via
     *            the job configuration
     */
    public CountWordsAcrossTimeperiod(String[] nonHadoopArgs) {
        this.nonHadoopArgs = nonHadoopArgs;
    }

    /**
     * @param nonHadoopArgs
     *            the tool arguments, passed on to the mappers and reducers via
     *            the job configuration
     * @param combinedTimes
     *            whether the mapper expects time entries with values for each
     *            word, i.e. combined times
     */
    public CountWordsAcrossTimeperiod(String[] nonHadoopArgs, boolean combinedTimes) {
        this.nonHadoopArgs = nonHadoopArgs;
        this.combinedTimes = combinedTimes;
    }

    /**
     * the configuration key under which the tool arguments are stored
     */
    public static final String ARGS_KEY = "TOKEN_ARGS";
    private static final LongWritable END_TIME = new LongWritable(-1);
    /**
     * where the intermediate word counts should be stored
     */
    public final static String WORDCOUNT_DIR = "wordtimeperiodDFIDF";

    /**
     * function(timePeriodLength)
     *
     * map input: <timePeriod: <<tweet:#freq>, <word:#freq>, <word:#freq>, ...>>
     *
     * map output: [
     *   word: <timeperiod, tweet:#freq, word:#freq>,
     *   word: <timeperiod, tweet:#freq, word:#freq>,
     *   ...
     * ]
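     *
     * For illustration (values invented): a single input record
     * <t: <<tweet:100>, <"cheese":5>, <"horse":2>>> would emit
     * "cheese": <t, tweet:100, word:5> and "horse": <t, tweet:100, word:2>.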
     *
     * @author Sina Samangooei (ss@ecs.soton.ac.uk)
     *
     */
    public static class Map extends Mapper<LongWritable, BytesWritable, Text, BytesWritable> {

        /**
         * Mapper constructor doesn't do anything; Hadoop constructs mappers
         * reflectively, so initialisation happens in setup
         */
        public Map() {

        }

        private static HadoopTwitterTokenToolOptions options;

        protected static synchronized void loadOptions(
                Mapper<LongWritable, BytesWritable, Text, BytesWritable>.Context context) throws IOException
        {
            if (options == null) {
                try {
                    options = new HadoopTwitterTokenToolOptions(context.getConfiguration().getStrings(ARGS_KEY));
                    options.prepare();
                } catch (final CmdLineException e) {
                    throw new IOException(e);
                } catch (final Exception e) {
                    throw new IOException(e);
                }
            }
        }

        @Override
        protected void setup(Mapper<LongWritable, BytesWritable, Text, BytesWritable>.Context context)
                throws IOException, InterruptedException
        {
            loadOptions(context);
        }

        @Override
        protected void map(final LongWritable key, BytesWritable value,
                final Mapper<LongWritable, BytesWritable, Text, BytesWritable>.Context context)
                throws java.io.IOException, InterruptedException
        {
            // deserialise the per-timeperiod counts held in this value
            final TweetCountWordMap periodCountWordCount = IOUtils.read(new ByteArrayInputStream(value.getBytes()),
                    TweetCountWordMap.class);
            // emit one <timeperiod, tweet count, word count> record per word
            final boolean written = periodCountWordCount.getTweetWordMap().forEachEntry(new TObjectIntProcedure<String>()
            {

                @Override
                public boolean execute(String word, int wordCount) {
                    final TimeperiodTweetCountWordCount timeMap = new TimeperiodTweetCountWordCount(key.get(), wordCount,
                            periodCountWordCount.getNTweets());
                    final ByteArrayOutputStream os = new ByteArrayOutputStream();
                    try {
                        IOUtils.writeBinary(os, timeMap);
                        final BytesWritable writeable = new BytesWritable(os.toByteArray());
                        context.write(new Text(word), writeable);
                    } catch (final IOException e) {
                        return false;
                    } catch (final InterruptedException e) {
                        return false;
                    }

                    return true;
                }
            });
            if (!written) {
                throw new IOException("Couldn't write the TimeperiodTweetCountWordCount object");
            }
        }
    }

    /**
     * reduce input:
     * <word: [ <timeperiod, tweet:#freq, word:#freq>, <timeperiod, tweet:#freq, word:#freq>, ... ]>
     *
     * reduce output:
     * <word: <timePeriod, DFIDF>, ...>
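     *
     * For example (values invented): if the END_TIME entry carries Ttf = 1000
     * total tweets and Twf = 50 tweets containing the word, a period with
     * tf = 100 tweets of which wf = 5 contain the word yields DF = 5/100 and
     * IDF = 1000/50; {@link WordDFIDF} combines these into the emitted score.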
     *
     * @author Sina Samangooei (ss@ecs.soton.ac.uk)
     *
     */
    public static class Reduce extends Reducer<Text, BytesWritable, Text, BytesWritable> {

        /**
         * default constructor does nothing
         */
        public Reduce() {

        }

        @Override
        protected void reduce(Text word, Iterable<BytesWritable> values,
                Reducer<Text, BytesWritable, Text, BytesWritable>.Context context) throws IOException,
                InterruptedException
        {
            // read all timeperiods to objects, find the END_TIME instance,
            // hold the rest
            /*
             * # read total tweet frequency from timeperiod -1: Ttf
             * # read total word tweet frequency from timeperiod -1: Twf
             * # read time period tweet frequency from entry: tf
             * # read time period word frequency from entry: wf
             */
            TimeperiodTweetCountWordCount endTime = null;
            final List<TimeperiodTweetCountWordCount> otherTimes = new ArrayList<TimeperiodTweetCountWordCount>();
            for (final BytesWritable inputArr : values) {
                final ByteArrayInputStream stream = new ByteArrayInputStream(inputArr.getBytes());
                final TimeperiodTweetCountWordCount instance = IOUtils.read(stream, TimeperiodTweetCountWordCount.class);
                if (instance.timeperiod == END_TIME.get())
                    endTime = instance;
                else
                    otherTimes.add(instance);
            }
            /*
             * # for entry in input:
             * #   DF = wf/tf
             * #   IDF = Ttf/Twf
             */
            // Total number of tweets in all timeperiods (taken from the
            // END_TIME entry, which is assumed to be present)
            final long Ttf = endTime.tweetcount;
            // Number of tweets containing this word in all timeperiods
            final long Twf = endTime.wordcount;
            final TreeSet<WordDFIDF> allDFIDF = new TreeSet<WordDFIDF>();
            for (final TimeperiodTweetCountWordCount tcwc : otherTimes) {
                // Number of tweets in this timeperiod
                final long tf = tcwc.tweetcount;
                // Number of tweets containing this word in this time period
                final long wf = tcwc.wordcount;

                final WordDFIDF dfidf = new WordDFIDF(tcwc.timeperiod, wf, tf, Twf, Ttf);
                allDFIDF.add(dfidf);
            }
            final List<WordDFIDF> listVersion = new ArrayList<WordDFIDF>();
            listVersion.addAll(allDFIDF);
            final WriteableListBinary<WordDFIDF> writeableCollection = new WriteableListBinary<WordDFIDF>(listVersion) {
                @Override
                protected void writeValue(WordDFIDF v, DataOutput out) throws IOException {
                    v.writeBinary(out);
                }

            };
            context.write(word, new BytesWritable(IOUtils.serialize(writeableCollection)));
        }
    }

    /**
     * reduce input:
     * <word: [ <timeperiod, tweet:#freq, word:#freq>, <timeperiod, tweet:#freq, word:#freq>, ... ]>
     * but unlike {@link Reduce} this reducer expects that each timeperiod may
     * appear multiple times (i.e. the timeperiods were not combined!)
     *
     * reduce output:
     * <word: <timePeriod, DFIDF>, ...>
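     *
     * For illustration (values invented): entries <60, tweet:10, word:1> and
     * <60, tweet:20, word:2> for the same period are first merged into
     * <60, tweet:30, word:3>; the merged periods are then walked in time
     * order, accumulating Twf, so each period's DFIDF reflects only the
     * counts seen up to that point.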
     *
     * @author Sina Samangooei (ss@ecs.soton.ac.uk)
     */
    public static class NonCombinedTimesReducer extends
            Reducer<Text, BytesWritable, Text, BytesWritable>
    {

        private HadoopTwitterTokenToolOptions options;
        private WritableEnumCounter<TextEntryType> tgs;
        private TimeFrequencyHolder timeIndex;

        /**
         * default constructor does nothing
         */
        public NonCombinedTimesReducer() {

        }

        protected synchronized void loadOptions(Reducer<Text, BytesWritable, Text, BytesWritable>.Context context)
                throws IOException
        {
            try {
                options = new HadoopTwitterTokenToolOptions(context.getConfiguration().getStrings(ARGS_KEY));
                options.prepare();
                final Path outpath = HadoopToolsUtil.getOutputPath(options);

                // load the time index and the global tweet statistics written
                // by the CountTweetsInTimeperiod stage
                timeIndex = CountTweetsInTimeperiod.readTimeIndex(CountTweetsInTimeperiod.constructIndexPath(outpath));
                final Path timecountOut = new Path(outpath, CountTweetsInTimeperiod.TIMECOUNT_DIR);
                final Path statsout = new Path(timecountOut, CountTweetsInTimeperiod.GLOBAL_STATS_FILE);
                final FileSystem fs = HadoopToolsUtil.getFileSystem(statsout);
                final WritableEnumCounter<TextEntryType> et = new WritableEnumCounter<TextEntryType>() {
                    @Override
                    public TextEntryType valueOf(String str) {
                        return TextEntryType.valueOf(str);
                    }

                };
                tgs = IOUtils.read(fs.open(statsout), et);
            } catch (final CmdLineException e) {
                e.printStackTrace();
                throw new IOException(e);
            } catch (final Exception e) {
                e.printStackTrace();
                throw new IOException(e);
            }
        }

        @Override
        protected void setup(Reducer<Text, BytesWritable, Text, BytesWritable>.Context context) throws IOException,
                InterruptedException
        {
            loadOptions(context);
        }

        @Override
        protected void reduce(Text word, Iterable<BytesWritable> values,
                Reducer<Text, BytesWritable, Text, BytesWritable>.Context context) throws IOException,
                InterruptedException
        {
            // read all timeperiods to objects, find the END_TIME instance,
            // hold the rest
            /*
             * read time period tweet frequency from entry: tf
             * read time period word frequency from entry: wf
             * construct a single count for all time periods of the above
             *
             * Go through the compiled timeperiod counts in order, keeping a
             * count of total tweets and total words up to each time period;
             * use the current totals to construct a WordDFIDF object per
             * time period
             */
            TimeperiodTweetCountWordCount endTime = null;
            final TreeSet<Long> times = new TreeSet<Long>();
            final HashMap<Long, TimeperiodTweetCountWordCount> otherTimes = new HashMap<Long, TimeperiodTweetCountWordCount>();
            System.out.println("STARTING WORD: " + word);
            for (final BytesWritable inputArr : values) {
                final ByteArrayInputStream stream = new ByteArrayInputStream(inputArr.getBytes());
                final TimeperiodTweetCountWordCount instance = IOUtils.read(stream, TimeperiodTweetCountWordCount.class);
                System.out.println("... FOUND TIME INSTANCE:" + instance.timeperiod);
                if (instance.timeperiod == END_TIME.get())
                {
                    if (endTime == null)
                    {
                        // System.out.println("... end time CREATED");
                        endTime = instance;
                        endTime.tweetcount = tgs.getValue(TextEntryType.VALID);
                    }
                    else
                    {
                        // System.out.println("... end time INCREMENTED");
                        endTime.wordcount += instance.wordcount;
                    }
                    // do not add the END_TIME entry to the set of real time
                    // periods
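                    // Note: the global tweet count for END_TIME comes from
                    // the precomputed stats file (tgs) rather than from the
                    // entries themselves; only the per-word counts are
                    // accumulated across duplicate END_TIME entries.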
                }
                else
                {
                    times.add(instance.timeperiod);
                    final TimeperiodTweetCountWordCount currentTimeCounter = otherTimes.get(instance.timeperiod);
                    System.out.println("Instance tweet count: " + instance.tweetcount);
                    System.out.println("Instance word count: " + instance.wordcount);
                    if (currentTimeCounter == null) {
                        System.out.println("... time CREATED");
                        otherTimes.put(instance.timeperiod, instance);
                    }
                    else {
                        System.out.println("... time INCREMENTED");
                        currentTimeCounter.tweetcount += instance.tweetcount;
                        currentTimeCounter.wordcount += instance.wordcount;
                    }
                }
            }
            // System.out.println("... TOTAL tweets = " + endTime.tweetcount);
            // System.out.println("... TOTAL tweets with THIS word = " +
            // endTime.wordcount);
            /*
             * # for entry in input:
             * #   DF = wf/tf
             * #   IDF = Ttf/Twf
             */
            // Number of tweets containing this word seen so far (the total
            // number of tweets seen so far, Ttf, is read from the time index
            // inside the loop)
            long Twf = 0;
            final TreeSet<WordDFIDF> allDFIDF = new TreeSet<WordDFIDF>();
            for (final Long time : times) {
                final TimeperiodTweetCountWordCount tcwc = otherTimes.get(time);
                final TimeFrequency indexEntry = timeIndex.get(time);

                // Total number of tweets up to and including this time period
                final long Ttf = indexEntry.cumulativeFrequency;
                // Number of tweets in this time period
                final long tf = indexEntry.periodFrequency;
                // Number of tweets containing this word in this time period
                final long wf = tcwc.wordcount;
                Twf += wf;
                final WordDFIDF dfidf = new WordDFIDF(tcwc.timeperiod, wf, tf, Twf, Ttf);
                allDFIDF.add(dfidf);
            }
            final List<WordDFIDF> listVersion = new ArrayList<WordDFIDF>();
            listVersion.addAll(allDFIDF);
            final WriteableListBinary<WordDFIDF> writeableCollection = new WriteableListBinary<WordDFIDF>(listVersion) {
                @Override
                protected void writeValue(WordDFIDF v, DataOutput out) throws IOException {
                    v.writeBinary(out);
                }

            };
            context.write(word, new BytesWritable(IOUtils.serialize(writeableCollection)));
        }
    }

    @Override
    public SimpleSequenceFileStage<LongWritable, BytesWritable, Text, BytesWritable> stage() {
        return new SimpleSequenceFileStage<LongWritable, BytesWritable, Text, BytesWritable>() {
            @Override
            public void setup(Job job) {
                job.getConfiguration().setStrings(CountWordsAcrossTimeperiod.ARGS_KEY, nonHadoopArgs);

                // If times are not combined, each reducer has to do a bit
                // more work than usual, so spread the load over more reduce
                // tasks
                if (!CountWordsAcrossTimeperiod.this.combinedTimes)
                    job.setNumReduceTasks(26);
            }

            @Override
            public Class<? extends Mapper<LongWritable, BytesWritable, Text, BytesWritable>> mapper() {
                return CountWordsAcrossTimeperiod.Map.class;
            }

            @Override
            public Class<? extends Reducer<Text, BytesWritable, Text, BytesWritable>> reducer() {
                if (CountWordsAcrossTimeperiod.this.combinedTimes)
                    return CountWordsAcrossTimeperiod.Reduce.class;
                else
                    return CountWordsAcrossTimeperiod.NonCombinedTimesReducer.class;
            }

            @Override
            public String outname() {
                return WORDCOUNT_DIR;
            }
        };
    }
}
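/*
 * Usage sketch (illustrative only; in the real tool this provider is driven by
 * the surrounding multi-stage job machinery). Only members defined above are
 * used:
 *
 *   CountWordsAcrossTimeperiod provider =
 *       new CountWordsAcrossTimeperiod(args, false); // args: the tool arguments
 *   SimpleSequenceFileStage<LongWritable, BytesWritable, Text, BytesWritable> stage =
 *       provider.stage();
 *   // stage.mapper()  -> CountWordsAcrossTimeperiod.Map.class
 *   // stage.reducer() -> NonCombinedTimesReducer.class (since combinedTimes == false)
 *   // stage.outname() -> "wordtimeperiodDFIDF"
 */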