/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *
 *   * Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *
 *   * Neither the name of the University of Southampton nor the names of its
 *     contributors may be used to endorse or promote products derived from this
 *     software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.hadoop.tools.twitter.token.outputmode.sparsecsv;

import java.io.BufferedReader;
import java.io.DataInput;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.StringWriter;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map.Entry;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.openimaj.hadoop.mapreduce.MultiStagedJob;
import org.openimaj.hadoop.mapreduce.stage.StageAppender;
import org.openimaj.hadoop.mapreduce.stage.helper.SequenceFileStage;
import org.openimaj.hadoop.mapreduce.stage.helper.SequenceFileTextStage;
import org.openimaj.hadoop.tools.HadoopToolsUtil;
import org.openimaj.hadoop.tools.twitter.utils.WordDFIDF;
import org.openimaj.io.IOUtils;
import org.openimaj.io.wrappers.ReadableListBinary;
import org.openimaj.util.pair.IndependentPair;

import com.Ostermiller.util.CSVParser;
import com.Ostermiller.util.CSVPrinter;
import com.jmatio.io.MatFileWriter;
import com.jmatio.types.MLArray;
import com.jmatio.types.MLCell;
import com.jmatio.types.MLChar;
import com.jmatio.types.MLDouble;

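/**
 * A {@link StageAppender} that builds the word index: a first stage collates a
 * count for each word (applying the configured count and per-time-period
 * thresholds) and a second stage sorts the words by descending count into the
 * final "words" output.
 *
 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
 */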
public class WordIndex extends StageAppender {

	/**
	 * Emits each word with the total number of times the word was seen,
	 * provided the word exceeded the per-time-period count threshold in at
	 * least one time period.
	 *
	 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
	 *
	 */
	public static class Map extends Mapper<Text, BytesWritable, Text, LongWritable> {
		private int wordTimeCountThresh;

		public Map() {
		}

		@Override
		protected void setup(Mapper<Text, BytesWritable, Text, LongWritable>.Context context) throws IOException,
				InterruptedException
		{
			this.wordTimeCountThresh = context.getConfiguration().getInt(WORDCOUNT_TIMETHRESH, 0);
		}

		@Override
		public void map(final Text key, BytesWritable value,
				final Mapper<Text, BytesWritable, Text, LongWritable>.Context context) throws InterruptedException
		{
			try {
				final long[] largest = new long[] { 0 };
				final boolean[] anyDayOverLimit = new boolean[] { false };
				IOUtils.deserialize(value.getBytes(), new ReadableListBinary<Object>(new ArrayList<Object>()) {
					@Override
					protected Object readValue(DataInput in) throws IOException {
						final WordDFIDF idf = new WordDFIDF();
						idf.readBinary(in);
						if (idf.wf > wordTimeCountThresh) {
							anyDayOverLimit[0] = true;
						}
						if (largest[0] < idf.Twf)
							largest[0] = idf.Twf;

						return new Object();
					}
				});
				if (anyDayOverLimit[0]) // one of the days was over the day limit
					context.write(key, new LongWritable(largest[0]));
			} catch (final IOException e) {
				System.err.println("Couldn't read word: " + key);
			}
		}
	}

	/**
	 * Writes each word and its total count as a CSV line keyed by the count,
	 * dropping words whose total count falls below the configured threshold.
	 *
	 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
	 *
	 */
	public static class Reduce extends Reducer<Text, LongWritable, LongWritable, Text> {
		private int wordCountThresh;

		public Reduce() {
		}

		@Override
		protected void setup(Reducer<Text, LongWritable, LongWritable, Text>.Context context) throws IOException,
				InterruptedException
		{
			this.wordCountThresh = context.getConfiguration().getInt(WORDCOUNT_THRESH, 0);
		}

		@Override
		public void reduce(Text word, Iterable<LongWritable> counts,
				final Reducer<Text, LongWritable, LongWritable, Text>.Context context) throws IOException,
				InterruptedException
		{
			long countL = 0;
			for (final LongWritable count : counts) {
				countL += count.get();
			}
			if (countL < this.wordCountThresh)
				return;
			final StringWriter swriter = new StringWriter();
			final CSVPrinter writer = new CSVPrinter(swriter);
			writer.write(new String[] { word.toString(), countL + "" });
			writer.flush();
			context.write(new LongWritable(countL), new Text(swriter.toString()));
		}
	}

	/** Configuration key: minimum total count a word requires to be kept */
	protected static final String WORDCOUNT_THRESH = "org.openimaj.hadoop.tools.twitter.token.outputmode.sparsecsv.wordcountthresh";
	/** Configuration key: number of top words to retain, read by the sorting stage */
	protected static final String WORDCOUNT_TOPN = "org.openimaj.hadoop.tools.twitter.token.outputmode.sparsecsv.wordcounttopn";
	/** Configuration key: count a word must exceed in at least one time period to be emitted by the mapper */
	protected static final String WORDCOUNT_TIMETHRESH = "org.openimaj.hadoop.tools.twitter.token.outputmode.sparsecsv.wordtimecountthresh";

	private int wordCountThreshold;
	private int topNWords;
	private int wordTimeThreshold;

	public WordIndex(int wordCountThreshold, int topNWords) {
		this.wordCountThreshold = wordCountThreshold;
		this.topNWords = topNWords;
	}

	public WordIndex(int wordCountThreshold, int wordTimeThreshold, int topNWords) {
		this.wordCountThreshold = wordCountThreshold;
		this.topNWords = topNWords;
		this.wordTimeThreshold = wordTimeThreshold;
	}

	public WordIndex() {
		this.wordCountThreshold = 0;
		this.topNWords = -1;
	}

	/**
	 * @param path the report output path
	 * @return map of words to counts and index
	 * @throws IOException
	 */
	public static LinkedHashMap<String, IndependentPair<Long, Long>> readWordCountLines(String path) throws IOException {
		return readWordCountLines(path, "/words");
	}

	/**
	 * Read the words from a report output path.
	 *
	 * @param path
	 *            report output path
	 * @param ext
	 *            where the words are in the path
	 * @return map of words to counts and index
	 * @throws IOException
	 */
	public static LinkedHashMap<String, IndependentPair<Long, Long>> readWordCountLines(String path, String ext)
			throws IOException
	{
		final String wordPath = path + ext;
		final Path p = HadoopToolsUtil.getInputPaths(wordPath)[0];
		final FileSystem fs = HadoopToolsUtil.getFileSystem(p);
		final FSDataInputStream toRead = fs.open(p);
		final BufferedReader reader = new BufferedReader(new InputStreamReader(toRead, "UTF-8"));
		final CSVParser csvreader = new CSVParser(reader);
		long lineN = 0;
		String[] next = null;
		final LinkedHashMap<String, IndependentPair<Long, Long>> toRet = new LinkedHashMap<String, IndependentPair<Long, Long>>();
		while ((next = csvreader.getLine()) != null && next.length > 0) {
			if (next.length != 2) {
				System.out.println("PROBLEM READING LINE: " + Arrays.toString(next));
				continue;
			}
			toRet.put(next[0], IndependentPair.pair(Long.parseLong(next[1]), lineN));
			lineN++;
		}
		reader.close(); // close the underlying HDFS stream
		return toRet;
	}

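	/**
	 * Queues the two word-index stages on the given {@link MultiStagedJob}: a
	 * collation stage which sums each word's counts and applies the configured
	 * thresholds ("words-collated"), followed by a sort stage which orders the
	 * words by descending count into the final "words" output.
	 * <p>
	 * A minimal usage sketch (how the {@link MultiStagedJob} is constructed and
	 * executed, and the <code>outputPath</code> variable, are assumed to be
	 * handled by the surrounding driver):
	 *
	 * <pre>
	 * new WordIndex(wordCountThreshold, topNWords).stage(mjob);
	 * // ... run the staged jobs, then read the index back:
	 * LinkedHashMap&lt;String, IndependentPair&lt;Long, Long&gt;&gt; words = WordIndex.readWordCountLines(outputPath);
	 * </pre>
	 */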
	@Override
	public void stage(MultiStagedJob mjob) {
		mjob.removeIntermediate(true);
		final SequenceFileStage<Text, BytesWritable, Text, LongWritable, LongWritable, Text> collateWords = new SequenceFileStage<Text, BytesWritable, Text, LongWritable, LongWritable, Text>()
		{
			@Override
			public void setup(Job job) {
				job.getConfiguration().setInt(WORDCOUNT_THRESH, wordCountThreshold);
				job.getConfiguration().setInt(WORDCOUNT_TIMETHRESH, wordTimeThreshold);
				job.setNumReduceTasks(1);
			}

			@Override
			public Class<? extends Mapper<Text, BytesWritable, Text, LongWritable>> mapper() {
				return WordIndex.Map.class;
			}

			@Override
			public Class<? extends Reducer<Text, LongWritable, LongWritable, Text>> reducer() {
				return WordIndex.Reduce.class;
			}

			@Override
			public String outname() {
				return "words-collated";
			}
		};

		final SequenceFileTextStage<LongWritable, Text, LongWritable, Text, NullWritable, Text> sortedWords = new SequenceFileTextStage<LongWritable, Text, LongWritable, Text, NullWritable, Text>()
		{
			@Override
			public void setup(Job job) {
				job.getConfiguration().setInt(WORDCOUNT_TOPN, topNWords);
				job.setSortComparatorClass(LongWritable.DecreasingComparator.class);
				job.setNumReduceTasks(1);
			}

			@Override
			public Class<? extends Reducer<LongWritable, Text, NullWritable, Text>> reducer() {
				return WordIndexSort.Reduce.class;
			}

			@Override
			public String outname() {
				return "words";
			}
		};

		mjob.queueStage(collateWords);
		mjob.queueStage(sortedWords);
	}

	public static void main(String[] args) throws IOException {
		final LinkedHashMap<String, IndependentPair<Long, Long>> wi = WordIndex
				.readWordCountLines("/Users/ss/Development/data/trendminer/sheffield/2010/09/tweets.2010-09-01.sparsecsv");
		System.out.println("Number of words in the index: " + wi.size());
		for (final Entry<String, IndependentPair<Long, Long>> e : wi.entrySet()) {
			if (e.getValue() == null) {
				System.out.println(e.getKey() + " was null!");
			}
		}
		System.out.println(wi.get("!"));
	}

	/**
	 * Write the CSV word index to a {@link MLCell} stored in a .mat data file
	 *
	 * @param path the report output path
	 * @throws IOException
	 */
	public static void writeToMatlab(String path) throws IOException {
		final Path wordMatPath = new Path(path + "/words/wordIndex.mat");
		final FileSystem fs = HadoopToolsUtil.getFileSystem(wordMatPath);
		final LinkedHashMap<String, IndependentPair<Long, Long>> wordIndex = readWordCountLines(path);
		final MLCell wordCell = new MLCell("words", new int[] { wordIndex.size(), 2 });

		System.out.println("... reading words");
		for (final Entry<String, IndependentPair<Long, Long>> ent : wordIndex.entrySet()) {
			final String word = ent.getKey();
			final int wordCellIndex = (int) (long) ent.getValue().secondObject();
			final long count = ent.getValue().firstObject();
			wordCell.set(new MLChar(null, word), wordCellIndex, 0);
			wordCell.set(new MLDouble(null, new double[][] { new double[] { count } }), wordCellIndex, 1);
		}
		final ArrayList<MLArray> list = new ArrayList<MLArray>();
		list.add(wordCell);
		new MatFileWriter(Channels.newChannel(fs.create(wordMatPath)), list);
	}

}