/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   *  Redistributions of source code must retain the above copyright notice,
 *      this list of conditions and the following disclaimer.
 *
 *   *  Redistributions in binary form must reproduce the above copyright notice,
 *      this list of conditions and the following disclaimer in the documentation
 *      and/or other materials provided with the distribution.
 *
 *   *  Neither the name of the University of Southampton nor the names of its
 *      contributors may be used to endorse or promote products derived from this
 *      software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.hadoop.tools.twitter.token.outputmode.sparsecsv;

import java.io.BufferedReader;
import java.io.DataInput;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.StringWriter;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map.Entry;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.openimaj.hadoop.mapreduce.MultiStagedJob;
import org.openimaj.hadoop.mapreduce.stage.StageAppender;
import org.openimaj.hadoop.mapreduce.stage.helper.SequenceFileStage;
import org.openimaj.hadoop.mapreduce.stage.helper.SequenceFileTextStage;
import org.openimaj.hadoop.tools.HadoopToolsUtil;
import org.openimaj.hadoop.tools.twitter.utils.WordDFIDF;
import org.openimaj.io.IOUtils;
import org.openimaj.io.wrappers.ReadableListBinary;
import org.openimaj.util.pair.IndependentPair;

import com.Ostermiller.util.CSVParser;
import com.Ostermiller.util.CSVPrinter;
import com.jmatio.io.MatFileWriter;
import com.jmatio.types.MLArray;
import com.jmatio.types.MLCell;
import com.jmatio.types.MLChar;
import com.jmatio.types.MLDouble;

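/**
 * Output stage which builds the word index for the sparse CSV output mode. A
 * first map/reduce pass collates each word with its total count, keeping only
 * words that pass the per-time-period and overall count thresholds; a second
 * pass sorts the surviving words by count into the "words" output.
 * 
 * A rough usage sketch (the MultiStagedJob wiring shown here is illustrative
 * only, not the only way to drive this stage):
 * 
 * <pre>
 * {@code
 * MultiStagedJob mjob = ...; // job rooted at the DFIDF sequence-file output
 * new WordIndex(wordCountThreshold, topNWords).stage(mjob);
 * }
 * </pre>
 * 
 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
 */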
public class WordIndex extends StageAppender {

	/**
	 * Emits each word with the total number of times the word was seen, for
	 * words which exceeded the per-time-period count threshold at least once.
	 * 
	 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
	 * 
	 */
	public static class Map extends Mapper<Text, BytesWritable, Text, LongWritable> {
		private int wordTimeCountThresh;

		public Map() {
			// no-arg constructor required so Hadoop can instantiate the mapper
		}

		@Override
		protected void setup(Mapper<Text, BytesWritable, Text, LongWritable>.Context context) throws IOException,
				InterruptedException
		{
			this.wordTimeCountThresh = context.getConfiguration().getInt(WORDCOUNT_TIMETHRESH, 0);
		}

		@Override
		public void map(final Text key, BytesWritable value,
				final Mapper<Text, BytesWritable, Text, LongWritable>.Context context) throws InterruptedException
		{
			try {
				final long[] largest = new long[] { 0 };
				final boolean[] anyDayOverLimit = new boolean[] { false };
				// walk the per-time-period WordDFIDF records serialised in the value
				IOUtils.deserialize(value.getBytes(), new ReadableListBinary<Object>(new ArrayList<Object>()) {
					@Override
					protected Object readValue(DataInput in) throws IOException {
						final WordDFIDF idf = new WordDFIDF();
						idf.readBinary(in);
						if (idf.wf > wordTimeCountThresh) {
							anyDayOverLimit[0] = true;
						}
						// track the largest cumulative count seen for this word
						if (largest[0] < idf.Twf)
							largest[0] = idf.Twf;

						return new Object();
					}
				});
				if (anyDayOverLimit[0]) // one of the time periods was over the limit
					context.write(key, new LongWritable(largest[0]));

			} catch (final IOException e) {
				System.err.println("Couldn't read word: " + key);
			}

		}
	}

	/**
	 * Writes each word and its count as a CSV line, keyed by the count, for
	 * words whose total count meets the overall threshold.
	 * 
	 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
	 * 
	 */
	public static class Reduce extends Reducer<Text, LongWritable, LongWritable, Text> {
		private int wordCountThresh;

		public Reduce() {
			// no-arg constructor required so Hadoop can instantiate the reducer
		}

		@Override
		protected void setup(Reducer<Text, LongWritable, LongWritable, Text>.Context context) throws IOException,
				InterruptedException
		{
			this.wordCountThresh = context.getConfiguration().getInt(WORDCOUNT_THRESH, 0);
		}

		@Override
		public void reduce(Text word, Iterable<LongWritable> counts,
				final Reducer<Text, LongWritable, LongWritable, Text>.Context context) throws IOException,
				InterruptedException
		{
			// sum the counts emitted for this word and drop it if below the threshold
			long countL = 0;
			for (final LongWritable count : counts) {
				countL += count.get();
			}
			if (countL < this.wordCountThresh)
				return;
			// emit the count as the key (for sorting) and a "word,count" CSV line as the value
			final StringWriter swriter = new StringWriter();
			final CSVPrinter writer = new CSVPrinter(swriter);
			writer.write(new String[] { word.toString(), countL + "" });
			writer.flush();
			context.write(new LongWritable(countL), new Text(swriter.toString()));
		}
	}

	protected static final String WORDCOUNT_THRESH = "org.openimaj.hadoop.tools.twitter.token.outputmode.sparsecsv.wordcountthresh";
	protected static final String WORDCOUNT_TOPN = "org.openimaj.hadoop.tools.twitter.token.outputmode.sparsecsv.wordcounttopn";
	protected static final String WORDCOUNT_TIMETHRESH = "org.openimaj.hadoop.tools.twitter.token.outputmode.sparsecsv.wordtimecountthresh";
	private int wordCountThreshold;
	private int topNWords;
	private int wordTimeThreshold;

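	/**
	 * @param wordCountThreshold
	 *            minimum total count a word must have to be included in the index
	 * @param topNWords
	 *            number of top words (by count) to keep; the no-argument
	 *            constructor uses -1
	 */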
	public WordIndex(int wordCountThreshold, int topNWords) {
		this.wordCountThreshold = wordCountThreshold;
		this.topNWords = topNWords;
	}

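	/**
	 * @param wordCountThreshold
	 *            minimum total count a word must have to be included in the index
	 * @param wordTimeThreshold
	 *            per-time-period count a word must exceed at least once
	 * @param topNWords
	 *            number of top words (by count) to keep
	 */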
	public WordIndex(int wordCountThreshold, int wordTimeThreshold, int topNWords) {
		this.wordCountThreshold = wordCountThreshold;
		this.topNWords = topNWords;
		this.wordTimeThreshold = wordTimeThreshold;
	}

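	/**
	 * Construct with no word count threshold (0) and topNWords set to -1.
	 */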
	public WordIndex() {
		this.wordCountThreshold = 0;
		this.topNWords = -1;
	}

	/**
	 * Read the word index (each word with its count and line number) from the
	 * "/words" output of a report path.
	 * 
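	 * A usage sketch (the path and word below are purely illustrative):
	 * 
	 * <pre>
	 * {@code
	 * LinkedHashMap<String, IndependentPair<Long, Long>> words = WordIndex.readWordCountLines("/path/to/output");
	 * long count = words.get("hello").firstObject(); // total count of "hello"
	 * long index = words.get("hello").secondObject(); // its line in the words file
	 * }
	 * </pre>
	 * 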
	 * @param path
	 *            report output path
	 * @return map of words to counts and index
	 * @throws IOException
	 */
	public static LinkedHashMap<String, IndependentPair<Long, Long>> readWordCountLines(String path) throws IOException {
		return readWordCountLines(path, "/words");
	}

	/**
	 * From a report output path, read the words (with their counts and line
	 * numbers) from the given sub-path.
	 * 
	 * @param path
	 *            report output path
	 * @param ext
	 *            where the words are in the path
	 * @return map of words to counts and index
	 * @throws IOException
	 */
	public static LinkedHashMap<String, IndependentPair<Long, Long>> readWordCountLines(String path, String ext)
			throws IOException
	{
		final String wordPath = path + ext;
		final Path p = HadoopToolsUtil.getInputPaths(wordPath)[0];
		final FileSystem fs = HadoopToolsUtil.getFileSystem(p);
		final FSDataInputStream toRead = fs.open(p);
		final BufferedReader reader = new BufferedReader(new InputStreamReader(toRead, "UTF-8"));
		final CSVParser csvreader = new CSVParser(reader);
		long lineN = 0;
		String[] next = null;
		final LinkedHashMap<String, IndependentPair<Long, Long>> toRet = new LinkedHashMap<String, IndependentPair<Long, Long>>();
		while ((next = csvreader.getLine()) != null && next.length > 0) {
			if (next.length != 2) {
				System.out.println("PROBLEM READING LINE: " + Arrays.toString(next));
				continue;
			}
			toRet.put(next[0], IndependentPair.pair(Long.parseLong(next[1]), lineN));
			lineN++;
		}
		reader.close();
		return toRet;
	}

	@Override
	public void stage(MultiStagedJob mjob) {
		mjob.removeIntermediate(true);
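		// First stage: collate words from the DFIDF sequence files, apply the
		// per-time-period and overall count thresholds, and emit (count, "word,count")
		// pairs into a single "words-collated" output.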
		final SequenceFileStage<Text, BytesWritable, Text, LongWritable, LongWritable, Text> collateWords = new SequenceFileStage<Text, BytesWritable, Text, LongWritable, LongWritable, Text>()
		{
			@Override
			public void setup(Job job) {
				job.getConfiguration().setInt(WORDCOUNT_THRESH, wordCountThreshold);
				job.getConfiguration().setInt(WORDCOUNT_TIMETHRESH, wordTimeThreshold);
				job.setNumReduceTasks(1);
			}

			@Override
			public Class<? extends Mapper<Text, BytesWritable, Text, LongWritable>> mapper() {
				return WordIndex.Map.class;
			}

			@Override
			public Class<? extends Reducer<Text, LongWritable, LongWritable, Text>> reducer() {
				return WordIndex.Reduce.class;
			}

			@Override
			public String outname() {
				return "words-collated";
			}
		};

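		// Second stage: sort the collated words by count (descending) and write the
		// final "words" CSV output; the top-N limit is passed via the job configuration.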
		final SequenceFileTextStage<LongWritable, Text, LongWritable, Text, NullWritable, Text> sortedWords = new SequenceFileTextStage<LongWritable, Text, LongWritable, Text, NullWritable, Text>()
		{
			@Override
			public void setup(Job job) {
				job.getConfiguration().setInt(WORDCOUNT_TOPN, topNWords);
				job.setSortComparatorClass(LongWritable.DecreasingComparator.class);
				job.setNumReduceTasks(1);
			}

			@Override
			public Class<? extends Reducer<LongWritable, Text, NullWritable, Text>> reducer() {
				return WordIndexSort.Reduce.class;
			}

			@Override
			public String outname() {
				return "words";
			}
		};

		mjob.queueStage(collateWords);
		mjob.queueStage(sortedWords);
	}

	public static void main(String[] args) throws IOException {
		final LinkedHashMap<String, IndependentPair<Long, Long>> wi = WordIndex
				.readWordCountLines("/Users/ss/Development/data/trendminer/sheffield/2010/09/tweets.2010-09-01.sparsecsv");
		System.out.println("Number of words index: " + wi.size());
		for (final Entry<String, IndependentPair<Long, Long>> e : wi.entrySet()) {
			if (e.getValue() == null) {
				System.out.println(e.getKey() + " was null!");
			}
		}
		System.out.println(wi.get("!"));
	}

	/**
	 * Write a CSV word index to a {@link MLCell} written to a .mat data file.
	 * 
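	 * A usage sketch (the path is purely illustrative); the .mat file is written
	 * to {@code <path>/words/wordIndex.mat}:
	 * 
	 * <pre>
	 * {@code
	 * WordIndex.writeToMatlab("/path/to/sparsecsv/output");
	 * }
	 * </pre>
	 * 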
	 * @param path
	 *            report output path
	 * @throws IOException
	 */
	public static void writeToMatlab(String path) throws IOException {
		final Path wordMatPath = new Path(path + "/words/wordIndex.mat");
		final FileSystem fs = HadoopToolsUtil.getFileSystem(wordMatPath);
		final LinkedHashMap<String, IndependentPair<Long, Long>> wordIndex = readWordCountLines(path);
		final MLCell wordCell = new MLCell("words", new int[] { wordIndex.size(), 2 });

		System.out.println("... reading words");
		// one row per word: column 0 holds the word, column 1 holds its count
		for (final Entry<String, IndependentPair<Long, Long>> ent : wordIndex.entrySet()) {
			final String word = ent.getKey();
			final int wordCellIndex = (int) (long) ent.getValue().secondObject();
			final long count = ent.getValue().firstObject();
			wordCell.set(new MLChar(null, word), wordCellIndex, 0);
			wordCell.set(new MLDouble(null, new double[][] { new double[] { count } }), wordCellIndex, 1);
		}
		final ArrayList<MLArray> list = new ArrayList<MLArray>();
		list.add(wordCell);
		new MatFileWriter(Channels.newChannel(fs.create(wordMatPath)), list);
	}

}