001/**
002 * Copyright (c) 2011, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.hadoop.tools.twitter.token.outputmode.correlation;
031
import java.io.DataInput;
import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.math.linear.BlockRealMatrix;
import org.apache.commons.math.stat.correlation.PearsonsCorrelation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.openimaj.hadoop.tools.HadoopToolsUtil;
import org.openimaj.hadoop.tools.twitter.utils.TweetCountWordMap;
import org.openimaj.hadoop.tools.twitter.utils.WordDFIDF;
import org.openimaj.hadoop.tools.twitter.utils.WordDFIDFTimeSeries;
import org.openimaj.io.IOUtils;
import org.openimaj.io.wrappers.ReadableListBinary;
import org.openimaj.ml.timeseries.processor.IntervalSummationProcessor;
import org.openimaj.ml.timeseries.series.DoubleTimeSeries;
import org.openimaj.twitter.finance.YahooFinanceData;

import com.Ostermiller.util.CSVPrinter;
058
059/**
060 * Separate WordDFIDF entries for each word
061 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
062 *
063 */
064public class WordTimeperiodValueMapper extends Mapper<Text, BytesWritable, NullWritable, Text> {
065        
066        private static final long SINGLE_DAY = 60 * 60 * 24 * 1000;
067        static YahooFinanceData finance;
068        static Map<String, DoubleTimeSeries> financeSeries;
069        private static IntervalSummationProcessor<WordDFIDF[],WordDFIDF, WordDFIDFTimeSeries> interp;
070        protected static synchronized void loadOptions(Mapper<Text, BytesWritable, NullWritable, Text>.Context context) throws IOException {
071                if (finance == null) {
072                        Path financeLoc = new Path(context.getConfiguration().getStrings(CorrelateWordTimeSeries.FINANCE_DATA)[0]);
073                        FileSystem fs = HadoopToolsUtil.getFileSystem(financeLoc);
074                        finance = IOUtils.read(fs.open(financeLoc),YahooFinanceData.class);
075                        financeSeries = finance.seriesMapInerp(SINGLE_DAY);
076                        long[] times = financeSeries.get("High").getTimes();
077                        interp = new IntervalSummationProcessor<WordDFIDF[],WordDFIDF, WordDFIDFTimeSeries>(times);
078                }
079        }
080        
081        /**
082         * 
083         */
084        public WordTimeperiodValueMapper() {
085        }
086        
087
088        private HashMap<Long, TweetCountWordMap> tweetWordMap;
089
090        @Override
091        protected void setup(Mapper<Text, BytesWritable, NullWritable, Text>.Context context) throws IOException, InterruptedException {
092                loadOptions(context);
093        }
094        
095        /**
096         * for each word, read its time period and quantised to a finance time period 
097         * emit for each word a quantised time period, the data needed to calculate DF-IDF at that time and the value from finance
098         */
099        @Override
100        protected void map(final Text word, BytesWritable value, final Mapper<Text, BytesWritable, NullWritable, Text>.Context context)
101                throws IOException ,InterruptedException {
102                final WordDFIDFTimeSeries wts = new WordDFIDFTimeSeries();
103                IOUtils.deserialize(value.getBytes(), new ReadableListBinary<Object>(new ArrayList<Object>()){
104                        WordDFIDF idf = new WordDFIDF();
105                        @Override
106                        protected Object readValue(DataInput in) throws IOException {
107                                idf.readBinary(in);
108                                wts.add(idf.timeperiod, idf);
109                                return new Object();
110                        }
111                });
112                interp.process(wts);
113                double[][] tocorr = new double[2][];
114                tocorr[0] = wts.doubleTimeSeries().getData();
115                
116                for (String ticker : finance.labels()) {
117                        try{
118                                if(!financeSeries.containsKey(ticker))continue;
119                                tocorr[1] = financeSeries.get(ticker).getData();
120                                BlockRealMatrix m = new BlockRealMatrix(tocorr);
121                                // Calculate and write pearsons correlation
122                                PearsonsCorrelation pcorr = new PearsonsCorrelation(m.transpose());
123                                double corr = pcorr.getCorrelationMatrix().getEntry(0, 1);
124                                double pval = pcorr.getCorrelationPValues().getEntry(0, 1);
125                                StringWriter swrit = new StringWriter();
126                                CSVPrinter csvp = new CSVPrinter(swrit);
127                                csvp.write(new String[]{word.toString(),ticker,""+corr,""+pval});
128                                csvp.flush();
129                                context.write(NullWritable.get(), new Text(swrit.toString()));
130                        }
131                        catch (Exception e) {
132                                System.out.println(e.getMessage());
133                        }
134                }
135        };
136        
137}