001/**
002 * Copyright (c) 2011, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.demos.sandbox.ml.linear.learner.stream;
031
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
040
041import org.openimaj.util.data.Context;
042import org.openimaj.util.function.Operation;
043import org.openimaj.util.stream.CollectionStream;
044import org.openimaj.util.stream.Stream;
045import org.openimaj.util.stream.window.SequentialStreamAggregator;
046
047/**
048 * Given a paired stream of user-word-counts and ticker-price, implement a
049 * {@link SequentialStreamAggregator} with a comparator which tests stock
050 * prices. Identical stock prices (i.e. the same for each ticker) are combined
051 * 
052 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
053 * 
054 */
055public final class StockPriceAggregator
056                extends
057                SequentialStreamAggregator<Context>
058{
059        private static final class StockPriceComparator
060                        implements
061                        Comparator<Context>
062        {
063                private double thresh;
064
065                public StockPriceComparator(double thresh) {
066                        this.thresh = thresh;
067                }
068
069                @Override
070                public int compare(Context o1, Context o2)
071                {
072                        final Map<String, Double> a = o1.getTyped("averageticks");
073                        final Map<String, Double> b = o2.getTyped("averageticks");
074
075                        final Set<String> sharedKeys = new HashSet<String>(a.keySet());
076                        sharedKeys.addAll(b.keySet());
077                        double diff = 0;
078                        for (final String ticker : sharedKeys) {
079                                diff += a.get(ticker) - b.get(ticker);
080                        }
081                        if (Math.abs(diff) < this.thresh)
082                                return 0;
083                        else if (diff < 0)
084                                return -1;
085                        else if (diff > 0)
086                                return 1;
087                        else
088                                return 0;
089                }
090        }
091
092        public StockPriceAggregator(double thresh)
093        {
094                super(new StockPriceComparator(thresh));
095        }
096
097        @Override
098        public Context combine(List<Context> window)
099        {
100                final Map<String, Map<String, Double>> combinedUserWords = new HashMap<String, Map<String, Double>>();
101                final int nItems = window.size();
102                Map<String, Double> stocks = null;
103                final long timestamp = (Long) window.get(0).getTyped("timestamp");
104                for (final Context itemAggr : window) {
105                        final Map<String, Map<String, Double>> bagofwords = itemAggr.getTyped("bagofwords");
106                        stocks = itemAggr.getTyped("averageticks"); // They should all be
107                                                                                                                // the same
108                        for (final Entry<String, Map<String, Double>> es : bagofwords.entrySet()) {
109                                addUsers(combinedUserWords, es, nItems);
110                        }
111                }
112                final Context newC = window.get(0).clone();
113                newC.put("timestamp", timestamp);
114                newC.put("bagofwords", combinedUserWords);
115                newC.put("averageticks", stocks);
116                return newC;
117        }
118
119        private void addUsers(Map<String, Map<String, Double>> combinedUserWords, Entry<String, Map<String, Double>> es,
120                        int nItems)
121        {
122                Map<String, Double> userWords = combinedUserWords.get(es.getKey());
123                if (userWords == null)
124                        combinedUserWords.put(es.getKey(), userWords = new HashMap<String, Double>());
125                for (final Entry<String, Double> wordTotals : es.getValue().entrySet()) {
126                        addWord(userWords, wordTotals, nItems);
127                }
128        }
129
130        private void addWord(Map<String, Double> userWords, Entry<String, Double> wordTotals, int nItems) {
131                final String word = wordTotals.getKey();
132                Double wordTotal = userWords.get(word);
133                if (wordTotal == null)
134                        wordTotal = 0d;
135                userWords.put(word, wordTotal + (wordTotals.getValue() / nItems));
136        }
137
138        /**
139         * tests
140         * 
141         * @param args
142         */
143        public static void main(String[] args) {
144                final List<Context> coll = new ArrayList<Context>();
145
146                final Map<String, Map<String, Double>> t1 = sparcify("u1: this is cheese", "u2: cheese is good",
147                                "u3: i like cheese");
148                final Map<String, Map<String, Double>> t2 = sparcify("u1: it is still cheese", "u3: cheese is good",
149                                "u4: i like cheese");
150                final Map<String, Map<String, Double>> t3 = sparcify("u2: this is cheese");
151                final Map<String, Map<String, Double>> t4 = sparcify("u1: this is cheese", "u2: cheese is good",
152                                "u3: i like cheese");
153                final Map<String, Double> price1 = sparcifyPrice("p1: 100", "p2: 200");
154                final Map<String, Double> price2 = sparcifyPrice("p1: 100", "p2: 201");
155
156                Context c = new Context();
157                c.put("timestamp", 0l);
158                c.put("bagofwords", t1);
159                c.put("averageticks", price1);
160                coll.add(c);
161                c = new Context();
162                c.put("timestamp", 1l);
163                c.put("bagofwords", t2);
164                c.put("averageticks", price1);
165                coll.add(c);
166                c = new Context();
167                c.put("timestamp", 2l);
168                c.put("bagofwords", t3);
169                c.put("averageticks", price2);
170                coll.add(c);
171                c = new Context();
172                c.put("timestamp", 3l);
173                c.put("bagofwords", t4);
174                c.put("averageticks", price1);
175                coll.add(c);
176                final Stream<Context> stream = new CollectionStream<Context>(coll);
177                stream.transform(new StockPriceAggregator(0.0001)).forEach(
178                                new Operation<Context>() {
179
180                                        @Override
181                                        public void perform(Context object) {
182                                                System.out.println(object.getTyped("timestamp"));
183                                                System.out.println(object.getTyped("bagofwords"));
184                                                System.out.println(object.getTyped("averageticks"));
185                                        }
186                                });
187
188        }
189
190        private static Map<String, Double> sparcifyPrice(String... strings) {
191
192                final Map<String, Double> ret = new HashMap<String, Double>();
193                for (final String string : strings) {
194                        final String[] tickPrice = string.split(":");
195                        ret.put(tickPrice[0].trim(), Double.parseDouble(tickPrice[1].trim()));
196                }
197                return ret;
198        }
199
200        private static Map<String, Map<String, Double>> sparcify(String... strings) {
201                final Map<String, Map<String, Double>> ret = new HashMap<String, Map<String, Double>>();
202                for (final String string : strings) {
203                        final String[] userWords = string.split(":");
204                        final String user = userWords[0].trim();
205                        final String[] words = userWords[1].trim().split(" ");
206
207                        Map<String, Double> wordCounts = ret.get(user);
208                        if (wordCounts == null)
209                                ret.put(user, wordCounts = new HashMap<String, Double>());
210                        ret.put(user, wordCounts);
211                        for (final String word : words) {
212                                Double count = wordCounts.get(word);
213                                if (count == null)
214                                        count = 0d;
215                                wordCounts.put(word, count + 1);
216                        }
217                }
218                return ret;
219        }
220
221}