/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   *  Redistributions of source code must retain the above copyright notice,
 *      this list of conditions and the following disclaimer.
 *
 *   *  Redistributions in binary form must reproduce the above copyright notice,
 *      this list of conditions and the following disclaimer in the documentation
 *      and/or other materials provided with the distribution.
 *
 *   *  Neither the name of the University of Southampton nor the names of its
 *      contributors may be used to endorse or promote products derived from this
 *      software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.hadoop.tools.twitter.token.outputmode.sparsecsv;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.StringWriter;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map.Entry;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.openimaj.hadoop.mapreduce.stage.StageProvider;
import org.openimaj.hadoop.mapreduce.stage.helper.SequenceFileTextStage;
import org.openimaj.hadoop.tools.HadoopToolsUtil;
import org.openimaj.hadoop.tools.twitter.token.mode.dfidf.CountTweetsInTimeperiod;
import org.openimaj.hadoop.tools.twitter.utils.TweetCountWordMap;
import org.openimaj.io.IOUtils;
import org.openimaj.util.pair.IndependentPair;

import com.Ostermiller.util.CSVParser;
import com.Ostermiller.util.CSVPrinter;
import com.jmatio.io.MatFileWriter;
import com.jmatio.types.MLArray;
import com.jmatio.types.MLCell;
import com.jmatio.types.MLDouble;

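/**
 * Outputs the time index: for each time period, the total number of tweets seen
 * in that period, written as CSV lines of the form "time,count" to a "times"
 * output. Also provides helpers to read the index back
 * ({@link #readTimeCountLines(String)}) and to export it to a MATLAB .mat file
 * ({@link #writeToMatlab(String)}).
 *
 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
 */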
public class TimeIndex extends StageProvider {

    /**
     * Emits each time period with the number of tweets seen in that period
     *
     * @author Sina Samangooei (ss@ecs.soton.ac.uk)
     */
    public static class Map extends Mapper<LongWritable,BytesWritable,LongWritable,LongWritable>{
        public Map() {
            // explicit no-arg constructor (Hadoop instantiates mappers reflectively)
        }
        @Override
        public void map(final LongWritable key, BytesWritable value, final Mapper<LongWritable,BytesWritable,LongWritable,LongWritable>.Context context){
            try {
                final TweetCountWordMap periodCountWordCount = IOUtils.read(new ByteArrayInputStream(value.getBytes()), TweetCountWordMap.class);
                // skip the special END_TIME marker key emitted by the counting stage
                if(!key.equals(CountTweetsInTimeperiod.Map.END_TIME)){
                    context.write(key, new LongWritable(periodCountWordCount.getNTweets()));
                }
            } catch (Exception e) {
                System.err.println("Couldn't read timeperiod: " + key);
            }
        }
    }
    /**
     * Sums the per-period counts and writes each time period as a CSV line of the form "time,count"
     *
     * @author Sina Samangooei (ss@ecs.soton.ac.uk)
     */
    public static class Reduce extends Reducer<LongWritable,LongWritable,NullWritable,Text>{
        public Reduce() {
            // explicit no-arg constructor (Hadoop instantiates reducers reflectively)
        }
        @Override
        public void reduce(LongWritable timeslot, Iterable<LongWritable> counts, Reducer<LongWritable,LongWritable,NullWritable,Text>.Context context){
            try {
                String timeStr = timeslot.toString();
                long total = 0;
                for (LongWritable count : counts) {
                    total += count.get();
                }
                // write a single CSV line: time,count
                StringWriter swriter = new StringWriter();
                CSVPrinter writer = new CSVPrinter(swriter);
                writer.write(new String[]{timeStr, total + ""});
                writer.flush();
                context.write(NullWritable.get(), new Text(swriter.toString()));
            } catch (Exception e) {
                System.err.println("Couldn't reduce to final file");
            }
        }
    }

    /**
     * From a report output path, read the time index (the "times" CSV output).
     *
     * @param path report output path
     * @return map of time period to a pair of (tweet count, line index)
     * @throws IOException
     */
    public static LinkedHashMap<Long, IndependentPair<Long, Long>> readTimeCountLines(String path) throws IOException {
        String timePath = path + "/times";
        Path p = HadoopToolsUtil.getInputPaths(timePath)[0];
        FileSystem fs = HadoopToolsUtil.getFileSystem(p);
        FSDataInputStream toRead = fs.open(p);
        BufferedReader reader = new BufferedReader(new InputStreamReader(toRead));
        CSVParser csvreader = new CSVParser(reader);
        long lineN = 0;
        String[] next = null;
        LinkedHashMap<Long, IndependentPair<Long, Long>> toRet = new LinkedHashMap<Long, IndependentPair<Long, Long>>();
        // each CSV line is "time,count"; the line index is recorded alongside the count
        while ((next = csvreader.getLine()) != null && next.length > 0) {
            toRet.put(Long.parseLong(next[0]), IndependentPair.pair(Long.parseLong(next[1]), lineN));
            lineN++;
        }
        return toRet;
    }
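
    /*
     * A minimal usage sketch (the report path below is hypothetical; it is expected to
     * contain the "times" output produced by this stage):
     *
     *   LinkedHashMap<Long, IndependentPair<Long, Long>> index =
     *           TimeIndex.readTimeCountLines("/path/to/report");
     *   for (Entry<Long, IndependentPair<Long, Long>> e : index.entrySet()) {
     *       long time = e.getKey();                       // start of the time period
     *       long count = e.getValue().firstObject();      // tweets in that period
     *       long line = e.getValue().secondObject();      // line index in the CSV
     *   }
     */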

    @Override
    public SequenceFileTextStage<LongWritable,BytesWritable, LongWritable,LongWritable,NullWritable,Text> stage() {
        return new SequenceFileTextStage<LongWritable,BytesWritable, LongWritable,LongWritable,NullWritable,Text>() {

            @Override
            public void setup(Job job) {
                // sort keys by time and use a single reducer so the output is one time-ordered CSV file
                job.setSortComparatorClass(LongWritable.Comparator.class);
                job.setNumReduceTasks(1);
            }

            @Override
            public Class<? extends Mapper<LongWritable, BytesWritable, LongWritable, LongWritable>> mapper() {
                return TimeIndex.Map.class;
            }

            @Override
            public Class<? extends Reducer<LongWritable, LongWritable, NullWritable, Text>> reducer() {
                return TimeIndex.Reduce.class;
            }

            @Override
            public String outname() {
                return "times";
            }
        };
    }

    /**
     * Write the CSV time index as a {@link MLCell} stored in a MATLAB .mat data file
     * ("times/timeIndex.mat" under the given path).
     *
     * @param path report output path
     * @throws IOException
     */
    public static void writeToMatlab(String path) throws IOException {
        Path timeMatPath = new Path(path + "/times/timeIndex.mat");
        FileSystem fs = HadoopToolsUtil.getFileSystem(timeMatPath);
        LinkedHashMap<Long, IndependentPair<Long, Long>> timeIndex = readTimeCountLines(path);
        MLCell timeCell = new MLCell("times", new int[]{timeIndex.size(), 2});

        System.out.println("... reading times");
        for (Entry<Long, IndependentPair<Long, Long>> ent : timeIndex.entrySet()) {
            long time = ent.getKey();
            int timeCellIndex = (int)(long)ent.getValue().secondObject();
            long count = ent.getValue().firstObject();
            // one row per time period: column 0 holds the time, column 1 the tweet count
            timeCell.set(new MLDouble(null, new double[][]{new double[]{time}}), timeCellIndex, 0);
            timeCell.set(new MLDouble(null, new double[][]{new double[]{count}}), timeCellIndex, 1);
        }
        ArrayList<MLArray> list = new ArrayList<MLArray>();
        list.add(timeCell);
        new MatFileWriter(Channels.newChannel(fs.create(timeMatPath)), list);
    }
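
    /*
     * A minimal usage sketch (the report path below is hypothetical; readTimeCountLines
     * must be able to find the "times" output under it):
     *
     *   TimeIndex.writeToMatlab("/path/to/report");
     *   // writes /path/to/report/times/timeIndex.mat containing an n x 2 "times" cell array
     */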

}