/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *
 *   * Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *
 *   * Neither the name of the University of Southampton nor the names of its
 *     contributors may be used to endorse or promote products derived from this
 *     software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.hadoop.tools.twitter.token.mode.dfidf;

import gnu.trove.map.hash.TObjectIntHashMap;
import gnu.trove.procedure.TLongObjectProcedure;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.joda.time.DateTime;
import org.openimaj.hadoop.mapreduce.stage.IdentityReducer;
import org.openimaj.hadoop.mapreduce.stage.StageProvider;
import org.openimaj.hadoop.mapreduce.stage.helper.TextLongByteStage;
import org.openimaj.hadoop.tools.HadoopToolsUtil;
import org.openimaj.hadoop.tools.twitter.HadoopTwitterTokenToolOptions;
import org.openimaj.hadoop.tools.twitter.JsonPathFilterSet;
import org.openimaj.hadoop.tools.twitter.token.mode.TextEntryType;
import org.openimaj.hadoop.tools.twitter.token.mode.WritableEnumCounter;
import org.openimaj.hadoop.tools.twitter.token.mode.dfidf.TimeFrequencyHolder.TimeFrequency;
import org.openimaj.hadoop.tools.twitter.utils.TweetCountWordMap;
import org.openimaj.io.IOUtils;
import org.openimaj.twitter.USMFStatus;

import com.jayway.jsonpath.JsonPath;

/**
 * A mapper/reducer pair that counts words and tweets per quantised time
 * period, so a word in a tweet is attributed to the time period between t - 1
 * and t.
 *
 * map input: tweetstatus # JSON twitter status with JSONPath to words
 * map output: <timePeriod: <word:#freq,tweets:#freq>,
 *              -1: <word:#freq,tweets:#freq>>
 * reduce input: <timePeriod: [<word:#freq,tweets:#freq>,...,<word:#freq,tweets:#freq>]>
 * reduce output: <timePeriod: <<tweet:#freq>,<word:#freq>,<word:#freq>,...>>
 *
 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
 *
 */
public class CountTweetsInTimeperiod extends StageProvider {

	private String[] nonHadoopArgs;
	private boolean inmemoryCombine;
	private boolean buildTimeIndex = true;
	private long timedelta;
	/**
	 * Option for the timecount directory location
	 */
	public final static String TIMECOUNT_DIR = "timeperiodTweet";

	/**
	 * A time index holding tweet totals and cumulative totals for each time
	 * period
	 */
	public final static String TIMEINDEX_FILE = "timeindex";

	/**
	 * Where to find the global stats file
	 */
	public final static String GLOBAL_STATS_FILE = "globalstats";
	private static final String TIMEDELTA = "org.openimaj.hadoop.tools.twitter.token.mode.dfidf.timedelta";
	/**
	 * The configuration property holding the location of the time index
	 */
	public final static String TIMEINDEX_LOCATION_PROP = "org.openimaj.hadoop.tools.twitter.token.mode.dfidf.timeindex";

	/**
	 * @param nonHadoopArgs
	 *            to be sent to the stage
	 * @param timedelta
	 *            the time delta (in minutes) by which time periods are
	 *            quantised
	 */
	public CountTweetsInTimeperiod(String[] nonHadoopArgs, long timedelta) {
		this.nonHadoopArgs = nonHadoopArgs;
		this.inmemoryCombine = false;
		this.timedelta = timedelta;
	}

	/**
	 * @param nonHadoopArgs
	 *            to be sent to the stage
	 * @param inMemoryCombine
	 *            whether an in-memory combination of word counts should be
	 *            performed
	 * @param timedelta
	 *            the time delta (in minutes) by which time periods are
	 *            quantised
	 */
	public CountTweetsInTimeperiod(String[] nonHadoopArgs, boolean inMemoryCombine, long timedelta) {
		this.nonHadoopArgs = nonHadoopArgs;
		this.inmemoryCombine = inMemoryCombine;
		this.timedelta = timedelta;
	}
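
	/*
	 * Usage sketch (argument values invented for illustration): the provider
	 * is constructed with the tool's remaining CLI arguments and a
	 * quantisation step in minutes, and hands back a stage for the
	 * surrounding multi-stage job (not shown here):
	 *
	 *   CountTweetsInTimeperiod counter = new CountTweetsInTimeperiod(args, 60);
	 *   TextLongByteStage stage = counter.stage();
	 */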

	/**
	 * map input: tweetstatus # JSON twitter status with JSONPath to words
	 * map output: <timePeriod: <word:#freq,tweets:#freq>,
	 *              -1: <word:#freq,tweets:#freq>>
	 *
	 * @author Jonathon Hare (jsh2@ecs.soton.ac.uk), Sina Samangooei
	 *         (ss@ecs.soton.ac.uk)
	 *
	 */
	public static class Map extends Mapper<LongWritable, Text, LongWritable, BytesWritable> {

		/**
		 * Default constructor, required by Hadoop; all state is initialised
		 * lazily in {@link #setup}
		 */
		public Map() {

		}

		/**
		 * The time used to signify the end; values under this key count the
		 * total number of times a given word appears across all periods
		 */
		public static final LongWritable END_TIME = new LongWritable(-1);
		/**
		 * A total of the number of tweets; must be ignored downstream!
		 */
		public static final LongWritable TOTAL_TIME = new LongWritable(-2);
		private HadoopTwitterTokenToolOptions options;
		private long timeDeltaMillis;
		private JsonPath jsonPath;
		private JsonPathFilterSet filters;

		protected synchronized void loadOptions(Mapper<LongWritable, Text, LongWritable, BytesWritable>.Context context)
				throws IOException
		{
			if (options == null) {
				try {
					options = new HadoopTwitterTokenToolOptions(context.getConfiguration().getStrings(
							HadoopTwitterTokenToolOptions.ARGS_KEY));
					options.prepare();
					filters = options.getFilters();
					// The time delta is stored in minutes (default 60) and
					// converted to milliseconds here
					timeDeltaMillis = context.getConfiguration().getLong(CountTweetsInTimeperiod.TIMEDELTA, 60) * 60 * 1000;
					jsonPath = JsonPath.compile(options.getJsonPath());
				} catch (final Exception e) {
					// Wrap any preparation failure (including CmdLineException)
					throw new IOException(e);
				}
			}
		}

		private HashMap<Long, TweetCountWordMap> tweetWordMap;

		@Override
		protected void setup(Mapper<LongWritable, Text, LongWritable, BytesWritable>.Context context) throws IOException,
				InterruptedException
		{
			loadOptions(context);
			this.tweetWordMap = new HashMap<Long, TweetCountWordMap>();
		}

		@Override
		protected void map(LongWritable key, Text value,
				Mapper<LongWritable, Text, LongWritable, BytesWritable>.Context context) throws java.io.IOException,
				InterruptedException
		{
			List<String> tokens = null;
			USMFStatus status = null;
			DateTime time = null;
			try {
				final String svalue = value.toString();
				status = new USMFStatus(options.getStatusType().type());
				status.fillFromString(svalue);
				if (status.isInvalid())
					return;
				if (!filters.filter(svalue))
					return;
				tokens = jsonPath.read(svalue);
				if (tokens == null) {
					// The tokens couldn't be read from the tweet
					context.getCounter(TextEntryType.INVALID_JSON).increment(1);
					return;
				}
				if (tokens.size() == 0) {
					context.getCounter(TextEntryType.INVALID_ZEROLENGTH).increment(1);
					return; // Quietly quit, value exists but was empty
				}
				time = status.createdAt();
				if (time == null) {
					// This usually means the original tweet had no time; skip it
					context.getCounter(TextEntryType.INVALID_TIME).increment(1);
					return;
				}
			} catch (final Exception e) {
				// Couldn't get tokens from the tweet with the JSONPath; skip it
				return;
			}
			// Quantise the time to a specific index
			final long timeIndex = (time.getMillis() / timeDeltaMillis) * timeDeltaMillis;
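			// Worked example of the quantisation above (values invented):
			// with the default timedelta of 60 minutes, timeDeltaMillis is
			// 3600000, so a tweet created at epoch time 1300000000000 ms maps
			// to timeIndex = (1300000000000 / 3600000) * 3600000
			//             = 1299999600000,
			// and every tweet within the same hour shares that index.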
			TweetCountWordMap timeWordMap = this.tweetWordMap.get(timeIndex);
			if (timeWordMap == null) {
				this.tweetWordMap.put(timeIndex, timeWordMap = new TweetCountWordMap());
			}
			final TObjectIntHashMap<String> tpMap = timeWordMap.getTweetWordMap();
			timeWordMap.incrementTweetCount(1);
			final Set<String> seen = new HashSet<String>();
			for (final String token : tokens) {
				// Apply stop words? Apply junk words?
				// Count each token only once per tweet
				if (!seen.add(token))
					continue;
				tpMap.adjustOrPutValue(token, 1, 1);
			}
			context.getCounter(TextEntryType.VALID).increment(1);
		}

		@Override
		protected void cleanup(Mapper<LongWritable, Text, LongWritable, BytesWritable>.Context context)
				throws IOException, InterruptedException
		{
			System.out.println("Cleaning up mapper, seen " + this.tweetWordMap.entrySet().size() + " time slots");
			for (final Entry<Long, TweetCountWordMap> tpMapEntry : this.tweetWordMap.entrySet()) {
				final Long time = tpMapEntry.getKey();
				final TweetCountWordMap map = tpMapEntry.getValue();
				System.out.println("... time( " + time + ") seen " + map.getTweetWordMap().size() + " words");
				final ByteArrayOutputStream outarr = new ByteArrayOutputStream();
				IOUtils.writeBinary(outarr, map);
				final byte[] arr = outarr.toByteArray();
				final BytesWritable toWrite = new BytesWritable(arr);
				context.write(END_TIME, toWrite);
				context.write(new LongWritable(time), toWrite);
				context.getCounter(TextEntryType.ACUAL_EMITS).increment(1);
			}
		}
	}
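
	/*
	 * Illustration of the mapper's output (slot values invented): a mapper
	 * that saw two hourly slots t1 and t2 emits four records in cleanup():
	 *
	 *   (END_TIME, bytes(map for t1)), (t1, bytes(map for t1)),
	 *   (END_TIME, bytes(map for t2)), (t2, bytes(map for t2))
	 *
	 * so the END_TIME key (-1) gathers every per-slot map and lets a reducer
	 * compute corpus-wide totals without a second pass.
	 */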

	/**
	 * Identical to the {@link IdentityReducer}, but additionally constructs a
	 * time index written to {@link CountTweetsInTimeperiod#TIMEINDEX_FILE}
	 *
	 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
	 *
	 */
	public static class TimeIndexReducer extends
			Reducer<LongWritable, BytesWritable, LongWritable, BytesWritable>
	{
		private TimeFrequencyHolder timeMap;

		/**
		 * Default constructor
		 */
		public TimeIndexReducer() {
			timeMap = new TimeFrequencyHolder();
		}

		@Override
		protected void reduce(LongWritable time, Iterable<BytesWritable> values, Context context) throws IOException,
				InterruptedException
		{
			if (time.get() == Map.END_TIME.get()) {
				// End times can be ignored entirely in terms of the time
				// index, but must still be passed on!
				for (final BytesWritable tweetwordmapbytes : values) {
					context.write(time, tweetwordmapbytes);
				}
			}
			else {
				final TweetCountWordMap accum = new TweetCountWordMap();
				for (final BytesWritable tweetwordmapbytes : values) {
					final TweetCountWordMap tweetwordmap = IOUtils.read(
							new ByteArrayInputStream(tweetwordmapbytes.getBytes()),
							TweetCountWordMap.class);
					accum.combine(tweetwordmap);
					context.write(time, tweetwordmapbytes);
				}
				final TimeFrequency tf = new TimeFrequency(time.get(), accum.getNTweets());
				timeMap.put(tf.time, tf);
			}
		}

		@Override
		protected void cleanup(Context context) throws IOException, InterruptedException {
			final String output = context.getConfiguration().getStrings(TIMEINDEX_LOCATION_PROP)[0];
			final Path indexOut = new Path(output + "/" + context.getTaskAttemptID());
			System.out.println("Writing time index to: " + indexOut);
			System.out.println("Timemap contains: " + this.timeMap.size());
			CountTweetsInTimeperiod.writeTimeIndex(this.timeMap, indexOut);
		}
	}

	/**
	 * reduce input: <timePeriod: [<word:#freq,tweets:#freq>,...,<word:#freq,tweets:#freq>]>
	 * reduce output: <timePeriod: <<tweet:#freq>,<word:#freq>,<word:#freq>,...>>
	 *
	 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
	 *
	 */
	public static class InMemoryCombiningReducer extends
			Reducer<LongWritable, BytesWritable, LongWritable, BytesWritable>
	{

		/**
		 * Default constructor; does nothing
		 */
		public InMemoryCombiningReducer() {

		}

		@Override
		protected void reduce(LongWritable key, Iterable<BytesWritable> values,
				Reducer<LongWritable, BytesWritable, LongWritable, BytesWritable>.Context context) throws IOException,
				InterruptedException
		{
			// Deserialise and merge every per-mapper word map for this time
			// period into a single accumulator
			final TweetCountWordMap accum = new TweetCountWordMap();
			for (final BytesWritable tweetwordmapbytes : values) {
				final TweetCountWordMap tweetwordmap = IOUtils.read(
						new ByteArrayInputStream(tweetwordmapbytes.getBytes()),
						TweetCountWordMap.class);
				accum.combine(tweetwordmap);
			}
			final ByteArrayOutputStream outstream = new ByteArrayOutputStream();
			IOUtils.writeBinary(outstream, accum);
			context.write(key, new BytesWritable(outstream.toByteArray()));
		}
	}
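
	/*
	 * Reducer wiring in the stage below, summarised from the code:
	 *
	 *   inmemoryCombine                      -> InMemoryCombiningReducer
	 *   !inmemoryCombine && buildTimeIndex   -> TimeIndexReducer (10 reduce tasks)
	 *   !inmemoryCombine && !buildTimeIndex  -> map-only (0 reduce tasks)
	 */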

	@Override
	public TextLongByteStage stage() {
		final TextLongByteStage s = new TextLongByteStage() {
			private Path actualOutputLocation;

			@Override
			public void setup(Job job) {
				job.getConfiguration().setStrings(HadoopTwitterTokenToolOptions.ARGS_KEY, nonHadoopArgs);
				job.getConfiguration().setLong(TIMEDELTA, timedelta);
				// actualOutputLocation is set by stage(Path[], Path,
				// Configuration) below before setup is called
				job.getConfiguration().setStrings(TIMEINDEX_LOCATION_PROP,
						new Path(actualOutputLocation, TIMEINDEX_FILE).toString());
				if (!inmemoryCombine) {
					if (!buildTimeIndex) {
						job.setNumReduceTasks(0);
					}
					else {
						job.setNumReduceTasks(10);
					}
				}
			}

			@Override
			public Class<? extends Mapper<LongWritable, Text, LongWritable, BytesWritable>> mapper() {
				return CountTweetsInTimeperiod.Map.class;
			}

			@Override
			public Class<? extends Reducer<LongWritable, BytesWritable, LongWritable, BytesWritable>> reducer() {
				if (inmemoryCombine)
					return CountTweetsInTimeperiod.InMemoryCombiningReducer.class;
				else if (buildTimeIndex)
					return CountTweetsInTimeperiod.TimeIndexReducer.class;
				else
					return super.reducer();
			}

			@Override
			public Job stage(Path[] inputs, Path output, Configuration conf) throws Exception {
				this.actualOutputLocation = output;
				return super.stage(inputs, output, conf);
			}

			@Override
			public String outname() {
				return TIMECOUNT_DIR;
			}

			@Override
			public void finished(Job job) {
				Counters counters;
				try {
					counters = job.getCounters();
				} catch (final IOException e) {
					// Counters not found; nothing to write
					return;
				}
				// Prepare a writer to the actual output location
				final Path out = new Path(actualOutputLocation, GLOBAL_STATS_FILE);

				try {
					final FileSystem fs = HadoopToolsUtil.getFileSystem(out);
					final FSDataOutputStream os = fs.create(out);
					IOUtils.writeASCII(os, new WritableEnumCounter<TextEntryType>(counters, TextEntryType.values()) {
						@Override
						public TextEntryType valueOf(String str) {
							return TextEntryType.valueOf(str);
						}
					});
					os.close();
				} catch (final IOException e) {
					// Failing to write the global stats file is not fatal
				}
			}
		};
		return s;
	}

	/**
	 * Write a time index to a {@link Path}
	 *
	 * @param timeMap
	 * @param indexOut
	 * @throws IOException
	 */
	public static void writeTimeIndex(TimeFrequencyHolder timeMap, Path indexOut) throws IOException {
		FSDataOutputStream os = null;
		try {
			final FileSystem fs = HadoopToolsUtil.getFileSystem(indexOut);
			os = fs.create(indexOut, true);
			IOUtils.writeBinary(os, timeMap);
			os.flush();
		} finally {
			// Guard against fs.create having failed before os was assigned
			if (os != null)
				os.close();
		}
	}

	/**
	 * Read a {@link TimeFrequencyHolder} from a {@link Path}. The path is
	 * assumed to be a directory containing many {@link TimeFrequencyHolder}
	 * instances.
	 *
	 * @param indexOut
	 * @return a new {@link TimeFrequencyHolder}
	 * @throws IOException
	 */
	public static TimeFrequencyHolder readTimeIndex(Path indexOut) throws IOException {
		if (!HadoopToolsUtil.fileExists(indexOut.toString())) {
			return null;
		}
		System.out.println("Reading time index from: " + indexOut);
		final TimeFrequencyHolder tfh = new TimeFrequencyHolder();

		final FileSystem fs = HadoopToolsUtil.getFileSystem(indexOut);
		final FileStatus[] indexParts = fs.listStatus(indexOut);
		for (final FileStatus fileStatus : indexParts) {
			System.out.println("Reading index part: " + fileStatus.getPath());
			FSDataInputStream in = null;
			try {
				in = fs.open(fileStatus.getPath());
				final TimeFrequencyHolder tempTfh = IOUtils.read(in, TimeFrequencyHolder.class);
				tempTfh.forEachEntry(new TLongObjectProcedure<TimeFrequency>() {
					@Override
					public boolean execute(long a, TimeFrequency b) {
						// This is safe because each part should contain
						// completely unique times!
						tfh.put(a, b);
						return true;
					}
				});
			} finally {
				if (in != null)
					in.close();
			}
		}
		tfh.recalculateCumulativeFrequencies();
		return tfh;
	}

	/**
	 * @param outpath
	 * @return the location of the time index under the given output path
	 */
	public static Path constructIndexPath(Path outpath) {
		final Path retPath = new Path(new Path(outpath, CountTweetsInTimeperiod.TIMECOUNT_DIR), TIMEINDEX_FILE);
		return retPath;
	}
}
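
/*
 * Round-trip sketch for the index helpers above (paths invented for
 * illustration):
 *
 *   TimeFrequencyHolder idx = ...;
 *   Path part = new Path("out/timeperiodTweet/timeindex/attempt_001");
 *   CountTweetsInTimeperiod.writeTimeIndex(idx, part);
 *   TimeFrequencyHolder merged = CountTweetsInTimeperiod.readTimeIndex(part.getParent());
 *
 * Note that readTimeIndex expects the directory holding one part file per
 * reduce attempt; it merges all parts and recalculates the cumulative
 * frequencies.
 */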