/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *
 *   * Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *
 *   * Neither the name of the University of Southampton nor the names of its
 *     contributors may be used to endorse or promote products derived from this
 *     software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.hadoop.tools.twitter.token.outputmode.sparsecsv;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.LinkedHashMap;
import java.util.Map.Entry;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.openimaj.hadoop.mapreduce.stage.StageProvider;
import org.openimaj.hadoop.mapreduce.stage.helper.SequenceFileTextStage;
import org.openimaj.hadoop.mapreduce.stage.helper.SimpleSequenceFileTextStage;
import org.openimaj.hadoop.tools.HadoopToolsUtil;
import org.openimaj.hadoop.tools.twitter.utils.WordDFIDF;
import org.openimaj.hadoop.tools.twitter.utils.WordDFIDFTimeSeries;
import org.openimaj.util.pair.IndependentPair;

import com.Ostermiller.util.CSVParser;

/**
 * Output the DF-IDF values for each word at each time period.
 *
 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
 */
public class Values extends StageProvider {
	private String outputPath;
	private int valueReduceSplit;
	private boolean sortValueByTime;
	private boolean matlabOutput;

	/**
	 * Configure the values output stage
	 *
	 * @param outputPath
	 *            the directory the stage writes to
	 * @param valueReduceSplit
	 *            the number of reduce tasks to use
	 * @param sortValueByTime
	 *            if true, values are keyed (and therefore sorted) by time
	 *            rather than by word
	 * @param matlabOutput
	 *            if true, MATLAB-readable word and time index files are also
	 *            written when the job finishes
	 */
	public Values(String outputPath, int valueReduceSplit, boolean sortValueByTime, boolean matlabOutput) {
		this.outputPath = outputPath;
		this.valueReduceSplit = valueReduceSplit;
		this.sortValueByTime = sortValueByTime;
		this.matlabOutput = matlabOutput;
	}
	/**
	 * The index location config option
	 */
	public static final String ARGS_KEY = "INDEX_ARGS";

	/**
	 * Config option controlling whether MATLAB-readable output is written
	 */
	public static final String MATLAB_OUT = "org.openimaj.hadoop.tools.twitter.token.outputmode.sparsecsv.matlab_out";

	@Override
	public SequenceFileTextStage<?, ?, ?, ?, ?, ?> stage() {
		if (this.sortValueByTime) {
			// Key the values by time so the reducer output is time-ordered
			return new SequenceFileTextStage<Text, BytesWritable, LongWritable, BytesWritable, NullWritable, Text>() {
				@Override
				public void setup(Job job) {
					job.setNumReduceTasks(valueReduceSplit);
					job.getConfiguration().setStrings(Values.ARGS_KEY, outputPath);
					job.getConfiguration().setBoolean(MATLAB_OUT, matlabOutput);
				}

				@Override
				public Class<? extends Mapper<Text, BytesWritable, LongWritable, BytesWritable>> mapper() {
					return MapValuesByTime.class;
				}

				@Override
				public Class<? extends Reducer<LongWritable, BytesWritable, NullWritable, Text>> reducer() {
					return ReduceValuesByTime.class;
				}

				@Override
				public String outname() {
					return "values";
				}

				@Override
				public void finished(Job job) {
					if (matlabOutput) {
						try {
							WordIndex.writeToMatlab(outputPath);
							TimeIndex.writeToMatlab(outputPath);
							System.out.println("Done writing the word and time index files to matlab");
						} catch (final IOException e) {
							System.out.println("Failed to write the word and time index files");
						}
					}
				}
			};
		}
		else {
			// Key the values by word
			return new SimpleSequenceFileTextStage<Text, BytesWritable, NullWritable, Text>() {
				@Override
				public void setup(Job job) {
					job.setNumReduceTasks(valueReduceSplit);
					job.getConfiguration().setStrings(Values.ARGS_KEY, outputPath);
				}

				@Override
				public Class<? extends Mapper<Text, BytesWritable, NullWritable, Text>> mapper() {
					return MapValuesByWord.class;
				}

				@Override
				public Class<? extends Reducer<NullWritable, Text, NullWritable, Text>> reducer() {
					return ReduceValuesByWord.class;
				}

				@Override
				public String outname() {
					return "values";
				}
			};
		}
	}
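	/*
	 * Illustrative sketch (not part of the original source): a StageProvider
	 * such as this is typically consumed by queueing its stage on a
	 * multi-staged job. The MultiStagedJob constructor arguments and the
	 * queueStage/runAll methods shown here are assumptions about the
	 * org.openimaj.hadoop.mapreduce API and may differ:
	 *
	 *   MultiStagedJob mjob = new MultiStagedJob(inputPath, outputPath, args);
	 *   mjob.queueStage(new Values(outputPath, 1, true, false).stage());
	 *   mjob.runAll();
	 */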
	/**
	 * Construct a time series per word from the "values" CSV written by this
	 * stage
	 *
	 * @param path
	 *            the root output directory containing the "values" output
	 * @param timeIndex
	 *            the time index; the second element of each pair is the line
	 *            number of that time period in the index
	 * @param wordIndex
	 *            the word index; the second element of each pair is the line
	 *            number of that word in the index
	 * @return hashmap containing a {@link WordDFIDFTimeSeries} instance per
	 *         word
	 * @throws IOException
	 */
	public static LinkedHashMap<String, WordDFIDFTimeSeries> readWordDFIDF(String path,
			LinkedHashMap<Long, IndependentPair<Long, Long>> timeIndex,
			LinkedHashMap<String, IndependentPair<Long, Long>> wordIndex) throws IOException
	{
		final LinkedHashMap<String, WordDFIDFTimeSeries> tsMap = new LinkedHashMap<String, WordDFIDFTimeSeries>();

		// Invert the indexes: map index line numbers back to times and words
		final long[] timeReverseIndex = new long[timeIndex.size()];
		for (final Entry<Long, IndependentPair<Long, Long>> l : timeIndex.entrySet()) {
			final long lineNum = l.getValue().secondObject();
			timeReverseIndex[(int) lineNum] = l.getKey();
		}

		final String[] wordReverseIndex = new String[wordIndex.size()];
		for (final Entry<String, IndependentPair<Long, Long>> w : wordIndex.entrySet()) {
			final long lineNum = w.getValue().secondObject();
			wordReverseIndex[(int) lineNum] = w.getKey();
		}

		final String wordPath = path + "/values";
		final Path p = HadoopToolsUtil.getInputPaths(wordPath)[0];
		final FileSystem fs = HadoopToolsUtil.getFileSystem(p);
		final FSDataInputStream toRead = fs.open(p);
		final BufferedReader reader = new BufferedReader(new InputStreamReader(toRead));
		final CSVParser csvreader = new CSVParser(reader);
		String[] next = null;

		while ((next = csvreader.getLine()) != null && next.length > 0) {
			// Columns as written by the reducers: wordI, timeI, wf, tf, Twf, Ttf
			final int wordI = Integer.parseInt(next[0]);
			final int timeI = Integer.parseInt(next[1]);
			final long wf = Long.parseLong(next[2]);
			final long tf = Long.parseLong(next[3]);
			final long Twf = Long.parseLong(next[4]);
			final long Ttf = Long.parseLong(next[5]);
			final long time = timeReverseIndex[timeI];
			final WordDFIDF wordDFIDF = new WordDFIDF(time, wf, tf, Twf, Ttf);
			final String word = wordReverseIndex[wordI];
			WordDFIDFTimeSeries current = tsMap.get(word);
			if (current == null) {
				tsMap.put(word, current = new WordDFIDFTimeSeries());
			}
			current.add(time, wordDFIDF);
		}
		reader.close();

		return tsMap;
	}
}
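/*
 * Usage sketch (illustrative): rebuilding the per-word DF-IDF time series from
 * a completed run rooted at `out`. The readWordCountLines/readTimeCountLines
 * helpers are assumed to be the index readers on WordIndex and TimeIndex in
 * this package, returning maps keyed as readWordDFIDF expects:
 *
 *   LinkedHashMap<String, IndependentPair<Long, Long>> wordIndex = WordIndex.readWordCountLines(out);
 *   LinkedHashMap<Long, IndependentPair<Long, Long>> timeIndex = TimeIndex.readTimeCountLines(out);
 *   LinkedHashMap<String, WordDFIDFTimeSeries> series = Values.readWordDFIDF(out, timeIndex, wordIndex);
 *   WordDFIDFTimeSeries ts = series.get("someword");
 */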