001/**
002 * Copyright (c) 2011, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.hadoop.tools.twitter.token.outputmode.sparsecsv;
031
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map.Entry;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.openimaj.hadoop.mapreduce.stage.StageProvider;
import org.openimaj.hadoop.mapreduce.stage.helper.SequenceFileTextStage;
import org.openimaj.hadoop.mapreduce.stage.helper.SimpleSequenceFileTextStage;
import org.openimaj.hadoop.tools.HadoopToolsUtil;
import org.openimaj.hadoop.tools.twitter.utils.WordDFIDF;
import org.openimaj.hadoop.tools.twitter.utils.WordDFIDFTimeSeries;
import org.openimaj.util.pair.IndependentPair;

import com.Ostermiller.util.CSVParser;
057
058/**
059 * Output the word/time values for each word
060 *
061 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
062 *
063 */
064public class Values extends StageProvider {
065        private String outputPath;
066        private int valueReduceSplit;
067        private boolean sortValueByTime;
068        private boolean matlabOutput;
069
070        /**
071         * Assign the output path for the stage
072         *
073         * @param outputPath
074         * @param valueReduceSplit
075         * @param sortValueByTime
076         * @param matlabOutput
077         */
078        public Values(String outputPath, int valueReduceSplit, boolean sortValueByTime, boolean matlabOutput) {
079                this.outputPath = outputPath;
080                this.valueReduceSplit = valueReduceSplit;
081                this.sortValueByTime = sortValueByTime;
082                this.matlabOutput = matlabOutput;
083        }
084
085        /**
086         * The index location config option
087         */
088        public static final String ARGS_KEY = "INDEX_ARGS";
089        public static final String MATLAB_OUT = "org.openimaj.hadoop.tools.twitter.token.outputmode.sparsecsv.matlab_out";
090
091        @Override
092        public SequenceFileTextStage<?, ?, ?, ?, ?, ?> stage() {
093                if (this.sortValueByTime) {
094                        return new SequenceFileTextStage<Text, BytesWritable, LongWritable, BytesWritable, NullWritable, Text>() {
095                                @Override
096                                public void setup(Job job) {
097                                        job.setNumReduceTasks(valueReduceSplit);
098                                        job.getConfiguration().setStrings(Values.ARGS_KEY, new String[] { outputPath.toString() });
099                                        job.getConfiguration().setBoolean(MATLAB_OUT, matlabOutput);
100                                }
101
102                                @Override
103                                public Class<? extends Mapper<Text, BytesWritable, LongWritable, BytesWritable>> mapper() {
104                                        return MapValuesByTime.class;
105                                }
106
107                                @Override
108                                public Class<? extends Reducer<LongWritable, BytesWritable, NullWritable, Text>> reducer() {
109                                        return ReduceValuesByTime.class;
110                                }
111
112                                @Override
113                                public String outname() {
114                                        return "values";
115                                }
116
117                                @Override
118                                public void finished(Job job) {
119                                        if (matlabOutput) {
120                                                try {
121                                                        WordIndex.writeToMatlab(outputPath.toString());
122                                                        TimeIndex.writeToMatlab(outputPath.toString());
123                                                        System.out.println("Done writing the word and time index files to matlab");
124                                                } catch (final IOException e) {
125                                                        System.out.println("Failed to write the word and time index files");
126                                                }
127                                        }
128                                }
129                        };
130                }
131                else {
132                        return new SimpleSequenceFileTextStage<Text, BytesWritable, NullWritable, Text>() {
133                                @Override
134                                public void setup(Job job) {
135                                        job.setNumReduceTasks(valueReduceSplit);
136                                        job.getConfiguration().setStrings(Values.ARGS_KEY, new String[] { outputPath.toString() });
137                                }
138
139                                @Override
140                                public Class<? extends Mapper<Text, BytesWritable, NullWritable, Text>> mapper() {
141                                        return MapValuesByWord.class;
142                                }
143
144                                @Override
145                                public Class<? extends Reducer<NullWritable, Text, NullWritable, Text>> reducer() {
146                                        return ReduceValuesByWord.class;
147                                }
148
149                                @Override
150                                public String outname() {
151                                        return "values";
152                                }
153                        };
154                }
155        }
156
157        /**
158         * Construct a time series per word
159         *
160         * @param path
161         * @param timeIndex
162         * @param wordIndex
163         * @return hashmap containing a {@link WordDFIDFTimeSeries} instance per
164         *         word
165         * @throws IOException
166         */
167        public static LinkedHashMap<String, WordDFIDFTimeSeries> readWordDFIDF(String path,
168                        LinkedHashMap<Long, IndependentPair<Long, Long>> timeIndex,
169                        LinkedHashMap<String, IndependentPair<Long, Long>> wordIndex) throws IOException
170        {
171                final LinkedHashMap<String, WordDFIDFTimeSeries> tsMap = new LinkedHashMap<String, WordDFIDFTimeSeries>();
172
173                final long[] timeReverseIndex = new long[timeIndex.size()];
174                for (final Entry<Long, IndependentPair<Long, Long>> l : timeIndex.entrySet()) {
175                        final long lineNum = l.getValue().secondObject();
176                        timeReverseIndex[(int) lineNum] = l.getKey();
177                }
178
179                final String[] wordReverseIndex = new String[wordIndex.size()];
180                for (final Entry<String, IndependentPair<Long, Long>> w : wordIndex.entrySet()) {
181                        final long lineNum = w.getValue().secondObject();
182                        wordReverseIndex[(int) lineNum] = w.getKey();
183                }
184                final String wordPath = path + "/values";
185                final Path p = HadoopToolsUtil.getInputPaths(wordPath)[0];
186                final FileSystem fs = HadoopToolsUtil.getFileSystem(p);
187                final FSDataInputStream toRead = fs.open(p);
188                final BufferedReader reader = new BufferedReader(new InputStreamReader(toRead));
189                final CSVParser csvreader = new CSVParser(reader);
190                String[] next = null;
191
192                while ((next = csvreader.getLine()) != null && next.length > 0) {
193                        // writer.writeln(new String[]{wordI + "",timeI + "",idf.wf +
194                        // "",idf.tf + "",idf.Twf + "", idf.Ttf + ""});
195                        final int wordI = Integer.parseInt(next[0]);
196                        final int timeI = Integer.parseInt(next[1]);
197                        final long wf = Long.parseLong(next[2]);
198                        final long tf = Long.parseLong(next[3]);
199                        final long Twf = Long.parseLong(next[4]);
200                        final long Ttf = Long.parseLong(next[5]);
201                        final long time = timeReverseIndex[timeI];
202                        final WordDFIDF wordDFIDF = new WordDFIDF(time, wf, tf, Twf, Ttf);
203                        final String word = wordReverseIndex[wordI];
204                        WordDFIDFTimeSeries current = tsMap.get(word);
205                        if (current == null) {
206                                tsMap.put(word, current = new WordDFIDFTimeSeries());
207                        }
208                        current.add(time, wordDFIDF);
209                }
210
211                return tsMap;
212        }
213}