001/**
002 * Copyright (c) 2011, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.hadoop.tools.twitter.token.outputmode.sparsecsv;
031
032import java.io.DataInput;
033import java.io.IOException;
034import java.io.StringWriter;
035import java.util.ArrayList;
036import java.util.HashMap;
037
038import org.apache.hadoop.io.BytesWritable;
039import org.apache.hadoop.io.NullWritable;
040import org.apache.hadoop.io.Text;
041import org.apache.hadoop.mapreduce.Mapper;
042import org.openimaj.hadoop.tools.twitter.utils.WordDFIDF;
043import org.openimaj.io.IOUtils;
044import org.openimaj.io.wrappers.ReadableListBinary;
045import org.openimaj.util.pair.IndependentPair;
046
047import com.Ostermiller.util.CSVPrinter;
048
049/**
050 * Emits each word with the total number of times the word was seen
051 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
052 *
053 */
054public class MapValuesByWord extends Mapper<Text,BytesWritable,NullWritable,Text>{
055        
056        private static String[] options;
057        private static HashMap<String, IndependentPair<Long, Long>> wordIndex;
058        private static HashMap<Long, IndependentPair<Long, Long>> timeIndex;
059
060        /**
061         * construct the map instance (do nothing)
062         */
063        public MapValuesByWord() {
064                // TODO Auto-generated constructor stub
065        }
066        
067        protected static synchronized void loadOptions(Mapper<Text,BytesWritable,NullWritable,Text>.Context context) throws IOException {
068                if (options == null) {
069                        try {
070                                options = context.getConfiguration().getStrings(Values.ARGS_KEY);
071                                wordIndex = WordIndex.readWordCountLines(options[0]);
072                                timeIndex = TimeIndex.readTimeCountLines(options[0]);
073                                System.out.println("Wordindex loaded: " + wordIndex.size());
074                                System.out.println("timeindex loaded: " + timeIndex.size());
075                        } catch (Exception e) {
076                                throw new IOException(e);
077                        }
078                }
079        }
080
081        @Override
082        protected void setup(Mapper<Text,BytesWritable,NullWritable,Text>.Context context) throws IOException, InterruptedException {
083                loadOptions(context);
084        }
085
086        @Override
087        public void map(final Text key, BytesWritable value, final Mapper<Text,BytesWritable,NullWritable,Text>.Context context) throws IOException, InterruptedException{
088                final StringWriter swriter = new StringWriter();
089                final CSVPrinter writer = new CSVPrinter(swriter);
090                try {
091                        IndependentPair<Long, Long> wordIndexPair = wordIndex.get(key.toString());
092                        if(key.toString().equals("!")){
093                                System.out.println("The string was: " + key);
094                                System.out.println("The string's pair was" + wordIndexPair);
095                                System.out.println("But the map's value for ! is: " + wordIndex.get("!"));
096                        }
097                        if(wordIndexPair == null) {
098                                
099                                return;
100                        }
101                        final long wordI = wordIndexPair.secondObject();
102                        IOUtils.deserialize(value.getBytes(), new ReadableListBinary<Object>(new ArrayList<Object>()){
103                                @Override
104                                protected Object readValue(DataInput in) throws IOException {
105                                        WordDFIDF idf = new WordDFIDF();
106                                        idf.readBinary(in);
107                                        IndependentPair<Long, Long> timePeriod = timeIndex.get(idf.timeperiod);
108                                        if(timePeriod == null) return new Object();
109                                        long timeI = timeIndex.get(idf.timeperiod).secondObject();
110                                        writer.writeln(new String[]{wordI + "",timeI + "",idf.wf + "",idf.tf + "",idf.Twf + "", idf.Ttf + ""});
111                                        writer.flush();
112                                        swriter.flush();
113                                        return new Object();
114                                }
115                        });
116                        context.write(NullWritable.get(), new Text(swriter.toString()));
117                } catch (IOException e) {
118                        e.printStackTrace();
119                        System.err.println("Couldnt read word or timeperiod from word: " + key);
120                }
121                
122        }
123}