001/**
002 * Copyright (c) 2011, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.hadoop.tools.twitter.token.outputmode.correlation;
031
import java.io.IOException;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.math.linear.BlockRealMatrix;
import org.apache.commons.math.stat.correlation.PearsonsCorrelation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.openimaj.hadoop.tools.HadoopToolsUtil;
import org.openimaj.hadoop.tools.twitter.utils.TweetCountWordMap;
import org.openimaj.hadoop.tools.twitter.utils.WordDFIDF;
import org.openimaj.hadoop.tools.twitter.utils.WordDFIDFTimeSeries;
import org.openimaj.io.IOUtils;
import org.openimaj.ml.timeseries.processor.IntervalSummationProcessor;
import org.openimaj.ml.timeseries.series.DoubleTimeSeries;
import org.openimaj.twitter.finance.YahooFinanceData;

import com.Ostermiller.util.CSVPrinter;
054
055public class WordValueCorrelationReducer extends Reducer<Text, BytesWritable, NullWritable, Text>{
056        
057        private static final long SINGLE_DAY = 60 * 60 * 24 * 1000;
058        static YahooFinanceData finance;
059        static Map<String, DoubleTimeSeries> financeSeries;
060        private static IntervalSummationProcessor<WordDFIDF[],WordDFIDF, WordDFIDFTimeSeries> interp;
061        protected static synchronized void loadOptions(Reducer<Text,BytesWritable,NullWritable,Text>.Context context) throws IOException {
062                if (finance == null) {
063                        Path financeLoc = new Path(context.getConfiguration().getStrings(CorrelateWordTimeSeries.FINANCE_DATA)[0]);
064                        FileSystem fs = HadoopToolsUtil.getFileSystem(financeLoc);
065                        finance = IOUtils.read(fs.open(financeLoc),YahooFinanceData.class);
066                        financeSeries = finance.seriesMapInerp(SINGLE_DAY);
067                        long[] times = financeSeries.get("High").getTimes();
068                        interp = new IntervalSummationProcessor<WordDFIDF[],WordDFIDF, WordDFIDFTimeSeries>(times);
069                }
070        }
071
072        private HashMap<Long, TweetCountWordMap> tweetWordMap;
073
074        @Override
075        protected void setup(Reducer<Text,BytesWritable,NullWritable,Text>.Context context) throws IOException, InterruptedException {
076                loadOptions(context);
077        }
078        
079        
080        /**
081         * For each word,
082         */
083        @Override
084        protected void reduce(Text word, Iterable<BytesWritable> idfvalues, Reducer<Text,BytesWritable,NullWritable,Text>.Context context) throws IOException ,InterruptedException 
085        {
086                WordDFIDFTimeSeries wts = new WordDFIDFTimeSeries();
087                for (BytesWritable bytesWritable : idfvalues) {
088                        WordDFIDF instance = IOUtils.deserialize(bytesWritable.getBytes(), WordDFIDF.class);
089                        wts.add(instance.timeperiod, instance);
090                }
091                interp.process(wts);
092                
093                double[][] tocorr = new double[2][];
094                tocorr[0] = wts.doubleTimeSeries().getData();
095                
096                for (String ticker : finance.labels()) {
097                        try{
098                                if(!financeSeries.containsKey(ticker))continue;
099                                tocorr[1] = financeSeries.get(ticker).getData();
100                                BlockRealMatrix m = new BlockRealMatrix(tocorr);
101                                // Calculate and write pearsons correlation
102                                PearsonsCorrelation pcorr = new PearsonsCorrelation(m.transpose());
103                                double corr = pcorr.getCorrelationMatrix().getEntry(0, 1);
104                                double pval = pcorr.getCorrelationPValues().getEntry(0, 1);
105                                StringWriter swrit = new StringWriter();
106                                CSVPrinter csvp = new CSVPrinter(swrit);
107                                csvp.write(new String[]{word.toString(),ticker,""+corr,""+pval});
108                                csvp.flush();
109                                context.write(NullWritable.get(), new Text(swrit.toString()));
110                        }
111                        catch (Exception e) {
112                                System.out.println(e.getMessage());
113                        }
114                }
115        };
116}