001/**
002 * Copyright (c) 2011, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.hadoop.tools.twitter.token.outputmode.sparsecsv;
031
032import java.io.ByteArrayInputStream;
033import java.io.DataInputStream;
034import java.io.IOException;
035import java.io.StringWriter;
036import java.nio.channels.Channels;
037import java.util.ArrayList;
038import java.util.HashMap;
039
040import org.apache.hadoop.fs.FSDataOutputStream;
041import org.apache.hadoop.fs.FileSystem;
042import org.apache.hadoop.fs.Path;
043import org.apache.hadoop.io.BytesWritable;
044import org.apache.hadoop.io.LongWritable;
045import org.apache.hadoop.io.NullWritable;
046import org.apache.hadoop.io.Text;
047import org.apache.hadoop.mapreduce.Reducer;
048import org.openimaj.hadoop.tools.HadoopToolsUtil;
049import org.openimaj.hadoop.tools.twitter.utils.WordDFIDF;
050import org.openimaj.util.pair.IndependentPair;
051
052import com.Ostermiller.util.CSVPrinter;
053import com.jmatio.io.MatFileWriter;
054import com.jmatio.types.MLArray;
055import com.jmatio.types.MLInt64;
056import com.jmatio.types.MLSparse;
057
058/**
059 * Writes each word,count
060 *
061 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
062 *
063 */
064public class ReduceValuesByTime extends Reducer<LongWritable, BytesWritable, NullWritable, Text> {
065        /**
066         * construct the reduce instance, do nothing
067         */
068        public ReduceValuesByTime() {
069                // TODO Auto-generated constructor stub
070        }
071
072        @Override
073        public void setup(Reducer<LongWritable, BytesWritable, NullWritable, Text>.Context context) throws IOException, InterruptedException {
074                loadOptions(context);
075        }
076
077        private static String[] options;
078        private static HashMap<String, IndependentPair<Long, Long>> wordIndex;
079        private static HashMap<Long, IndependentPair<Long, Long>> timeIndex;
080        private static String valuesLocation;
081        private static boolean matlabOut;
082
083        protected static synchronized void loadOptions(Reducer<LongWritable, BytesWritable, NullWritable, Text>.Context context) throws IOException {
084                if (options == null) {
085                        try {
086                                options = context.getConfiguration().getStrings(Values.ARGS_KEY);
087                                matlabOut = context.getConfiguration().getBoolean(Values.MATLAB_OUT, false);
088                                timeIndex = TimeIndex.readTimeCountLines(options[0]);
089                                if (matlabOut) {
090                                        wordIndex = WordIndex.readWordCountLines(options[0]);
091                                        valuesLocation = options[0] + "/values/values.%d.mat";
092                                }
093                                System.out.println("timeindex loaded: " + timeIndex.size());
094                        } catch (Exception e) {
095                                throw new IOException(e);
096                        }
097                }
098        }
099        @Override
100        public void reduce(LongWritable timeslot, Iterable<BytesWritable> manylines, Reducer<LongWritable, BytesWritable, NullWritable, Text>.Context context) throws IOException, InterruptedException {
101                try {
102                        if (matlabOut) {
103                                System.out.println("Creating matlab file for timeslot: " + timeslot);
104                                createWriteToMatlab(timeslot, manylines);
105                        }
106                        else {
107                                final StringWriter swriter = new StringWriter();
108                                final CSVPrinter writer = new CSVPrinter(swriter);
109                                for (BytesWritable word : manylines) {
110                                        ByteArrayInputStream bais = new ByteArrayInputStream(word.getBytes());
111                                        DataInputStream dis = new DataInputStream(bais);
112                                        WordDFIDF idf = new WordDFIDF();
113                                        idf.readBinary(dis);
114                                        int timeI = (int) ((long) timeIndex.get(idf.timeperiod).secondObject());
115                                        int wordI = dis.readInt();
116                                        writer.writeln(new String[] { wordI + "", timeI + "", idf.wf + "", idf.tf + "", idf.Twf + "", idf.Ttf + "" });
117                                        writer.flush();
118                                        swriter.flush();
119                                }
120                                context.write(NullWritable.get(), new Text(swriter.toString()));
121                        }
122
123                } catch (Exception e) {
124                        e.printStackTrace();
125                        System.err.println("Couldn't reduce to final file");
126                        throw new IOException(e);
127                }
128        }
129
130        private void createWriteToMatlab(LongWritable timeslot, Iterable<BytesWritable> manylines) throws IOException {
131                System.out.println("Creating matlab file for timeslot: " + timeslot);
132                MLSparse matarr = new MLSparse(String.format("values_%d", timeslot.get()), new int[] { wordIndex.size(), 2 }, 0, wordIndex.size() * 2);
133                long Ttf = 0;
134                long tf = 0;
135                boolean set = false;
136                for (BytesWritable word : manylines) {
137                        ByteArrayInputStream bais = new ByteArrayInputStream(word.getBytes());
138                        DataInputStream dis = new DataInputStream(bais);
139                        WordDFIDF idf = new WordDFIDF();
140                        idf.readBinary(dis);
141                        int wordI = dis.readInt();
142                        // writer.writeln(new String[]{wordI + "",timeI + "",idf.wf +
143                        // "",idf.tf + "",idf.Twf + "", idf.Ttf + ""});
144                        // writer.flush();
145                        // swriter.flush();
146                        if (!set) {
147                                tf = idf.tf;
148                                Ttf = idf.Ttf;
149                                set = true;
150                        }
151                        else {
152                                if (tf != idf.tf)
153                                        throw new IOException("Error writing matlab file, tf doesn't match");
154                                if (Ttf != idf.Ttf)
155                                        throw new IOException("Error writing matlab file, Ttf doesn't match");
156                        }
157                        matarr.set((double) idf.wf, wordI, 0);
158                        matarr.set((double) idf.Twf, wordI, 1);
159                }
160                MLInt64 tfMat = new MLInt64(String.format("tf_%d", timeslot.get()), new long[][] { new long[] { tf } });
161                MLInt64 TtfMat = new MLInt64(String.format("Ttf_%d", timeslot.get()), new long[][] { new long[] { Ttf } });
162                ArrayList<MLArray> list = new ArrayList<MLArray>();
163                list.add(tfMat);
164                list.add(TtfMat);
165                list.add(matarr);
166                Path outLoc = new Path(String.format(valuesLocation, timeslot.get()));
167                FileSystem fs = HadoopToolsUtil.getFileSystem(outLoc);
168                FSDataOutputStream os = fs.create(outLoc);
169                new MatFileWriter(Channels.newChannel(os), list);
170        }
171}