/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *
 *   * Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *
 *   * Neither the name of the University of Southampton nor the names of its
 *     contributors may be used to endorse or promote products derived from this
 *     software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */ 030package org.openimaj.hadoop.tools.twitter.token.outputmode.correlation; 031 032import java.io.DataInput; 033import java.io.IOException; 034import java.io.StringWriter; 035import java.util.ArrayList; 036import java.util.HashMap; 037import java.util.Map; 038 039import org.apache.commons.math.linear.BlockRealMatrix; 040import org.apache.commons.math.stat.correlation.PearsonsCorrelation; 041import org.apache.hadoop.fs.FileSystem; 042import org.apache.hadoop.fs.Path; 043import org.apache.hadoop.io.BytesWritable; 044import org.apache.hadoop.io.NullWritable; 045import org.apache.hadoop.io.Text; 046import org.apache.hadoop.mapreduce.Mapper; 047import org.openimaj.hadoop.tools.HadoopToolsUtil; 048import org.openimaj.hadoop.tools.twitter.utils.TweetCountWordMap; 049import org.openimaj.hadoop.tools.twitter.utils.WordDFIDF; 050import org.openimaj.hadoop.tools.twitter.utils.WordDFIDFTimeSeries; 051import org.openimaj.io.IOUtils; 052import org.openimaj.io.wrappers.ReadableListBinary; 053import org.openimaj.ml.timeseries.processor.IntervalSummationProcessor; 054import org.openimaj.ml.timeseries.series.DoubleTimeSeries; 055import org.openimaj.twitter.finance.YahooFinanceData; 056 057import com.Ostermiller.util.CSVPrinter; 058 059/** 060 * Separate WordDFIDF entries for each word 061 * @author Sina Samangooei (ss@ecs.soton.ac.uk) 062 * 063 */ 064public class WordTimeperiodValueMapper extends Mapper<Text, BytesWritable, NullWritable, Text> { 065 066 private static final long SINGLE_DAY = 60 * 60 * 24 * 1000; 067 static YahooFinanceData finance; 068 static Map<String, DoubleTimeSeries> financeSeries; 069 private static IntervalSummationProcessor<WordDFIDF[],WordDFIDF, WordDFIDFTimeSeries> interp; 070 protected static synchronized void loadOptions(Mapper<Text, BytesWritable, NullWritable, Text>.Context context) throws IOException { 071 if (finance == null) { 072 Path financeLoc = new Path(context.getConfiguration().getStrings(CorrelateWordTimeSeries.FINANCE_DATA)[0]); 073 
FileSystem fs = HadoopToolsUtil.getFileSystem(financeLoc); 074 finance = IOUtils.read(fs.open(financeLoc),YahooFinanceData.class); 075 financeSeries = finance.seriesMapInerp(SINGLE_DAY); 076 long[] times = financeSeries.get("High").getTimes(); 077 interp = new IntervalSummationProcessor<WordDFIDF[],WordDFIDF, WordDFIDFTimeSeries>(times); 078 } 079 } 080 081 /** 082 * 083 */ 084 public WordTimeperiodValueMapper() { 085 } 086 087 088 private HashMap<Long, TweetCountWordMap> tweetWordMap; 089 090 @Override 091 protected void setup(Mapper<Text, BytesWritable, NullWritable, Text>.Context context) throws IOException, InterruptedException { 092 loadOptions(context); 093 } 094 095 /** 096 * for each word, read its time period and quantised to a finance time period 097 * emit for each word a quantised time period, the data needed to calculate DF-IDF at that time and the value from finance 098 */ 099 @Override 100 protected void map(final Text word, BytesWritable value, final Mapper<Text, BytesWritable, NullWritable, Text>.Context context) 101 throws IOException ,InterruptedException { 102 final WordDFIDFTimeSeries wts = new WordDFIDFTimeSeries(); 103 IOUtils.deserialize(value.getBytes(), new ReadableListBinary<Object>(new ArrayList<Object>()){ 104 WordDFIDF idf = new WordDFIDF(); 105 @Override 106 protected Object readValue(DataInput in) throws IOException { 107 idf.readBinary(in); 108 wts.add(idf.timeperiod, idf); 109 return new Object(); 110 } 111 }); 112 interp.process(wts); 113 double[][] tocorr = new double[2][]; 114 tocorr[0] = wts.doubleTimeSeries().getData(); 115 116 for (String ticker : finance.labels()) { 117 try{ 118 if(!financeSeries.containsKey(ticker))continue; 119 tocorr[1] = financeSeries.get(ticker).getData(); 120 BlockRealMatrix m = new BlockRealMatrix(tocorr); 121 // Calculate and write pearsons correlation 122 PearsonsCorrelation pcorr = new PearsonsCorrelation(m.transpose()); 123 double corr = pcorr.getCorrelationMatrix().getEntry(0, 1); 124 double pval 
= pcorr.getCorrelationPValues().getEntry(0, 1); 125 StringWriter swrit = new StringWriter(); 126 CSVPrinter csvp = new CSVPrinter(swrit); 127 csvp.write(new String[]{word.toString(),ticker,""+corr,""+pval}); 128 csvp.flush(); 129 context.write(NullWritable.get(), new Text(swrit.toString())); 130 } 131 catch (Exception e) { 132 System.out.println(e.getMessage()); 133 } 134 } 135 }; 136 137}