/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *
 *   * Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *
 *   * Neither the name of the University of Southampton nor the names of its
 *     contributors may be used to endorse or promote products derived from this
 *     software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */ 030package org.openimaj.hadoop.tools.twitter.token.outputmode.correlation; 031 032import java.io.IOException; 033import java.io.StringWriter; 034import java.util.HashMap; 035import java.util.Map; 036import org.apache.commons.math.linear.BlockRealMatrix; 037import org.apache.commons.math.stat.correlation.PearsonsCorrelation; 038import org.apache.hadoop.fs.FileSystem; 039import org.apache.hadoop.fs.Path; 040import org.apache.hadoop.io.BytesWritable; 041import org.apache.hadoop.io.NullWritable; 042import org.apache.hadoop.io.Text; 043import org.apache.hadoop.mapreduce.Reducer; 044import org.openimaj.hadoop.tools.HadoopToolsUtil; 045import org.openimaj.hadoop.tools.twitter.utils.TweetCountWordMap; 046import org.openimaj.hadoop.tools.twitter.utils.WordDFIDF; 047import org.openimaj.hadoop.tools.twitter.utils.WordDFIDFTimeSeries; 048import org.openimaj.io.IOUtils; 049import org.openimaj.ml.timeseries.processor.IntervalSummationProcessor; 050import org.openimaj.ml.timeseries.series.DoubleTimeSeries; 051import org.openimaj.twitter.finance.YahooFinanceData; 052 053import com.Ostermiller.util.CSVPrinter; 054 055public class WordValueCorrelationReducer extends Reducer<Text, BytesWritable, NullWritable, Text>{ 056 057 private static final long SINGLE_DAY = 60 * 60 * 24 * 1000; 058 static YahooFinanceData finance; 059 static Map<String, DoubleTimeSeries> financeSeries; 060 private static IntervalSummationProcessor<WordDFIDF[],WordDFIDF, WordDFIDFTimeSeries> interp; 061 protected static synchronized void loadOptions(Reducer<Text,BytesWritable,NullWritable,Text>.Context context) throws IOException { 062 if (finance == null) { 063 Path financeLoc = new Path(context.getConfiguration().getStrings(CorrelateWordTimeSeries.FINANCE_DATA)[0]); 064 FileSystem fs = HadoopToolsUtil.getFileSystem(financeLoc); 065 finance = IOUtils.read(fs.open(financeLoc),YahooFinanceData.class); 066 financeSeries = finance.seriesMapInerp(SINGLE_DAY); 067 long[] times = 
financeSeries.get("High").getTimes(); 068 interp = new IntervalSummationProcessor<WordDFIDF[],WordDFIDF, WordDFIDFTimeSeries>(times); 069 } 070 } 071 072 private HashMap<Long, TweetCountWordMap> tweetWordMap; 073 074 @Override 075 protected void setup(Reducer<Text,BytesWritable,NullWritable,Text>.Context context) throws IOException, InterruptedException { 076 loadOptions(context); 077 } 078 079 080 /** 081 * For each word, 082 */ 083 @Override 084 protected void reduce(Text word, Iterable<BytesWritable> idfvalues, Reducer<Text,BytesWritable,NullWritable,Text>.Context context) throws IOException ,InterruptedException 085 { 086 WordDFIDFTimeSeries wts = new WordDFIDFTimeSeries(); 087 for (BytesWritable bytesWritable : idfvalues) { 088 WordDFIDF instance = IOUtils.deserialize(bytesWritable.getBytes(), WordDFIDF.class); 089 wts.add(instance.timeperiod, instance); 090 } 091 interp.process(wts); 092 093 double[][] tocorr = new double[2][]; 094 tocorr[0] = wts.doubleTimeSeries().getData(); 095 096 for (String ticker : finance.labels()) { 097 try{ 098 if(!financeSeries.containsKey(ticker))continue; 099 tocorr[1] = financeSeries.get(ticker).getData(); 100 BlockRealMatrix m = new BlockRealMatrix(tocorr); 101 // Calculate and write pearsons correlation 102 PearsonsCorrelation pcorr = new PearsonsCorrelation(m.transpose()); 103 double corr = pcorr.getCorrelationMatrix().getEntry(0, 1); 104 double pval = pcorr.getCorrelationPValues().getEntry(0, 1); 105 StringWriter swrit = new StringWriter(); 106 CSVPrinter csvp = new CSVPrinter(swrit); 107 csvp.write(new String[]{word.toString(),ticker,""+corr,""+pval}); 108 csvp.flush(); 109 context.write(NullWritable.get(), new Text(swrit.toString())); 110 } 111 catch (Exception e) { 112 System.out.println(e.getMessage()); 113 } 114 } 115 }; 116}