001/** 002 * Copyright (c) 2011, The University of Southampton and the individual contributors. 003 * All rights reserved. 004 * 005 * Redistribution and use in source and binary forms, with or without modification, 006 * are permitted provided that the following conditions are met: 007 * 008 * * Redistributions of source code must retain the above copyright notice, 009 * this list of conditions and the following disclaimer. 010 * 011 * * Redistributions in binary form must reproduce the above copyright notice, 012 * this list of conditions and the following disclaimer in the documentation 013 * and/or other materials provided with the distribution. 014 * 015 * * Neither the name of the University of Southampton nor the names of its 016 * contributors may be used to endorse or promote products derived from this 017 * software without specific prior written permission. 018 * 019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR 023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON 026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 029 */ 030package org.openimaj.demos.sandbox.ml.linear.learner.stream; 031 032import java.util.ArrayList; 033import java.util.Comparator; 034import java.util.HashMap; 035import java.util.HashSet; 036import java.util.List; 037import java.util.Map; 038import java.util.Map.Entry; 039import java.util.Set; 040 041import org.openimaj.util.data.Context; 042import org.openimaj.util.function.Operation; 043import org.openimaj.util.stream.CollectionStream; 044import org.openimaj.util.stream.Stream; 045import org.openimaj.util.stream.window.SequentialStreamAggregator; 046 047/** 048 * Given a paired stream of user-word-counts and ticker-price, implement a 049 * {@link SequentialStreamAggregator} with a comparator which tests stock 050 * prices. Identical stock prices (i.e. the same for each ticker) are combined 051 * 052 * @author Sina Samangooei (ss@ecs.soton.ac.uk) 053 * 054 */ 055public final class StockPriceAggregator 056 extends 057 SequentialStreamAggregator<Context> 058{ 059 private static final class StockPriceComparator 060 implements 061 Comparator<Context> 062 { 063 private double thresh; 064 065 public StockPriceComparator(double thresh) { 066 this.thresh = thresh; 067 } 068 069 @Override 070 public int compare(Context o1, Context o2) 071 { 072 final Map<String, Double> a = o1.getTyped("averageticks"); 073 final Map<String, Double> b = o2.getTyped("averageticks"); 074 075 final Set<String> sharedKeys = new HashSet<String>(a.keySet()); 076 sharedKeys.addAll(b.keySet()); 077 double diff = 0; 078 for (final String ticker : sharedKeys) { 079 diff += a.get(ticker) - b.get(ticker); 080 } 081 if (Math.abs(diff) < this.thresh) 082 return 0; 083 else if (diff < 0) 084 return -1; 085 else if (diff > 0) 086 return 1; 087 else 088 return 0; 089 } 090 } 091 092 public StockPriceAggregator(double thresh) 093 { 094 super(new StockPriceComparator(thresh)); 095 } 096 097 @Override 098 public Context combine(List<Context> window) 099 { 100 final Map<String, Map<String, Double>> combinedUserWords = new HashMap<String, Map<String, Double>>(); 101 final int nItems = window.size(); 102 Map<String, Double> stocks = null; 103 final long timestamp = (Long) window.get(0).getTyped("timestamp"); 104 for (final Context itemAggr : window) { 105 final Map<String, Map<String, Double>> bagofwords = itemAggr.getTyped("bagofwords"); 106 stocks = itemAggr.getTyped("averageticks"); // They should all be 107 // the same 108 for (final Entry<String, Map<String, Double>> es : bagofwords.entrySet()) { 109 addUsers(combinedUserWords, es, nItems); 110 } 111 } 112 final Context newC = window.get(0).clone(); 113 newC.put("timestamp", timestamp); 114 newC.put("bagofwords", combinedUserWords); 115 newC.put("averageticks", stocks); 116 return newC; 117 } 118 119 private void addUsers(Map<String, Map<String, Double>> combinedUserWords, Entry<String, Map<String, Double>> es, 120 int nItems) 121 { 122 Map<String, Double> userWords = combinedUserWords.get(es.getKey()); 123 if (userWords == null) 124 combinedUserWords.put(es.getKey(), userWords = new HashMap<String, Double>()); 125 for (final Entry<String, Double> wordTotals : es.getValue().entrySet()) { 126 addWord(userWords, wordTotals, nItems); 127 } 128 } 129 130 private void addWord(Map<String, Double> userWords, Entry<String, Double> wordTotals, int nItems) { 131 final String word = wordTotals.getKey(); 132 Double wordTotal = userWords.get(word); 133 if (wordTotal == null) 134 wordTotal = 0d; 135 userWords.put(word, wordTotal + (wordTotals.getValue() / nItems)); 136 } 137 138 /** 139 * tests 140 * 141 * @param args 142 */ 143 public static void main(String[] args) { 144 final List<Context> coll = new ArrayList<Context>(); 145 146 final Map<String, Map<String, Double>> t1 = sparcify("u1: this is cheese", "u2: cheese is good", 147 "u3: i like cheese"); 148 final Map<String, Map<String, Double>> t2 = sparcify("u1: it is still cheese", "u3: cheese is good", 149 "u4: i like cheese"); 150 final Map<String, Map<String, Double>> t3 = sparcify("u2: this is cheese"); 151 final Map<String, Map<String, Double>> t4 = sparcify("u1: this is cheese", "u2: cheese is good", 152 "u3: i like cheese"); 153 final Map<String, Double> price1 = sparcifyPrice("p1: 100", "p2: 200"); 154 final Map<String, Double> price2 = sparcifyPrice("p1: 100", "p2: 201"); 155 156 Context c = new Context(); 157 c.put("timestamp", 0l); 158 c.put("bagofwords", t1); 159 c.put("averageticks", price1); 160 coll.add(c); 161 c = new Context(); 162 c.put("timestamp", 1l); 163 c.put("bagofwords", t2); 164 c.put("averageticks", price1); 165 coll.add(c); 166 c = new Context(); 167 c.put("timestamp", 2l); 168 c.put("bagofwords", t3); 169 c.put("averageticks", price2); 170 coll.add(c); 171 c = new Context(); 172 c.put("timestamp", 3l); 173 c.put("bagofwords", t4); 174 c.put("averageticks", price1); 175 coll.add(c); 176 final Stream<Context> stream = new CollectionStream<Context>(coll); 177 stream.transform(new StockPriceAggregator(0.0001)).forEach( 178 new Operation<Context>() { 179 180 @Override 181 public void perform(Context object) { 182 System.out.println(object.getTyped("timestamp")); 183 System.out.println(object.getTyped("bagofwords")); 184 System.out.println(object.getTyped("averageticks")); 185 } 186 }); 187 188 } 189 190 private static Map<String, Double> sparcifyPrice(String... strings) { 191 192 final Map<String, Double> ret = new HashMap<String, Double>(); 193 for (final String string : strings) { 194 final String[] tickPrice = string.split(":"); 195 ret.put(tickPrice[0].trim(), Double.parseDouble(tickPrice[1].trim())); 196 } 197 return ret; 198 } 199 200 private static Map<String, Map<String, Double>> sparcify(String... strings) { 201 final Map<String, Map<String, Double>> ret = new HashMap<String, Map<String, Double>>(); 202 for (final String string : strings) { 203 final String[] userWords = string.split(":"); 204 final String user = userWords[0].trim(); 205 final String[] words = userWords[1].trim().split(" "); 206 207 Map<String, Double> wordCounts = ret.get(user); 208 if (wordCounts == null) 209 ret.put(user, wordCounts = new HashMap<String, Double>()); 210 ret.put(user, wordCounts); 211 for (final String word : words) { 212 Double count = wordCounts.get(word); 213 if (count == null) 214 count = 0d; 215 wordCounts.put(word, count + 1); 216 } 217 } 218 return ret; 219 } 220 221}