/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *
 *   * Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *
 *   * Neither the name of the University of Southampton nor the names of its
 *     contributors may be used to endorse or promote products derived from this
 *     software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */ 030package org.openimaj.hadoop.tools.twitter.token.outputmode.sparsecsv; 031 032import org.apache.hadoop.fs.Path; 033import org.kohsuke.args4j.Option; 034import org.openimaj.hadoop.mapreduce.MultiStagedJob; 035import org.openimaj.hadoop.tools.HadoopToolsUtil; 036import org.openimaj.hadoop.tools.twitter.HadoopTwitterTokenToolOptions; 037import org.openimaj.hadoop.tools.twitter.token.mode.TwitterTokenMode; 038import org.openimaj.hadoop.tools.twitter.token.mode.dfidf.CountTweetsInTimeperiod; 039import org.openimaj.hadoop.tools.twitter.token.mode.dfidf.CountWordsAcrossTimeperiod; 040import org.openimaj.hadoop.tools.twitter.token.outputmode.TwitterTokenOutputMode; 041 042/** 043 * Create a sparse CSV token output. The directory created contains 3 files: 044 * words/ : contains a list of words ordered by count across all time. 045 * times/ : contains a list of times ordered by count of all tweets 046 * values/ : a list of (wordIndex,timeIndex,wordTimeCount,tweetTimeCount,tweetCount,wordCount) 047 * 048 * @author Sina Samangooei (ss@ecs.soton.ac.uk) 049 * 050 */ 051public class SparseCSVTokenOutputMode extends TwitterTokenOutputMode { 052 053 private MultiStagedJob stages; 054 @Option(name="--value-reduce-split", aliases="-vrs", required=false, usage="The number of reducers to use when spitting out the DFIDF values") 055 int valueSplitReduce = 1; 056 057 @Option(name="--word-occurence-threshold", aliases="-wot", required=false, usage="The number of times a given word must appear total throughout the time period before it is involved in the count and index") 058 int wordCountThreshold = 0; 059 060 @Option(name="--word-time-occurence-threshold", aliases="-wtot", required=false, usage="The number of times a given word must appear in one or more time period before the word is chosen for indexing") 061 int wordTimeCountThreshold = 0; 062 063 @Option(name="--top-n-words", aliases="-tnw", required=false, usage="Select only the top n words (as ordered by total occurence in 
the time period)") 064 int topNWords = -1; 065 066 @Option(name="--sort-value-by-time", aliases="-svbt", required=false, usage="This flag sorts value by time instead of word") 067 boolean sortValueByTime = false; 068 069 @Option(name="--matlab-output", aliases="-matlab", required=false, usage="This flag sorts value by time instead of word") 070 boolean matlabOutput = false; 071 @Override 072 public void write( 073 HadoopTwitterTokenToolOptions opts, 074 TwitterTokenMode completedMode) throws Exception{ 075 076 HadoopToolsUtil.validateOutput(outputPath,replace); 077 078 this.stages = new MultiStagedJob( 079 HadoopToolsUtil.getInputPaths(completedMode.finalOutput(opts) , CountWordsAcrossTimeperiod.WORDCOUNT_DIR), 080 HadoopToolsUtil.getOutputPath(outputPath), 081 opts.getArgs() 082 ); 083 matlabOutput = matlabOutput && sortValueByTime; 084 // Three stage process 085 // 1a. Write all the words (word per line) 086// stages.queueStage(new WordIndex().stage()); 087 new WordIndex(wordCountThreshold,wordTimeCountThreshold,topNWords).stage(stages); 088 final Path wordIndex = stages.runAll(); 089 // 1b. Write all the times (time per line) 090 this.stages = new MultiStagedJob( 091 HadoopToolsUtil.getInputPaths(completedMode.finalOutput(opts) , CountTweetsInTimeperiod.TIMECOUNT_DIR), 092 HadoopToolsUtil.getOutputPath(outputPath), 093 opts.getArgs() 094 ); 095 stages.queueStage(new TimeIndex().stage()); 096 final Path timeIndex = stages.runAll(); 097 // 3. Write all the values (loading in the words and times) 098 099 this.stages = new MultiStagedJob( 100 HadoopToolsUtil.getInputPaths(completedMode.finalOutput(opts) , CountWordsAcrossTimeperiod.WORDCOUNT_DIR), 101 HadoopToolsUtil.getOutputPath(outputPath), 102 opts.getArgs() 103 ); 104 stages.queueStage(new Values(outputPath,valueSplitReduce,sortValueByTime,matlabOutput).stage()); 105 stages.runAll(); 106 } 107 108 109}