001/**
002 * Copyright (c) 2011, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.demos.sandbox.ml.linear.learner.stream.recorder;
031
032import java.io.IOException;
033import java.net.MalformedURLException;
034import java.util.ArrayList;
035import java.util.Arrays;
036import java.util.List;
037import java.util.Map;
038
039import org.apache.log4j.Logger;
040import org.mortbay.util.ajax.JSON;
041import org.openimaj.demos.sandbox.ml.linear.learner.stream.MongoDBOutputOp;
042import org.openimaj.demos.sandbox.ml.linear.learner.stream.YahooFinanceStream;
043import org.openimaj.demos.sandbox.ml.linear.learner.stream.twitter.TwitterPredicateFunction;
044import org.openimaj.demos.sandbox.ml.linear.learner.stream.twitter.TwitterPreprocessingFunction;
045import org.openimaj.demos.sandbox.ml.linear.learner.stream.twitter.TwitterStatusAsUSMFStatus;
046import org.openimaj.stream.provider.twitter.TwitterStreamDataset;
047import org.openimaj.tools.twitter.modes.filter.LanguageFilter;
048import org.openimaj.tools.twitter.modes.preprocessing.LanguageDetectionMode;
049import org.openimaj.tools.twitter.modes.preprocessing.StopwordMode;
050import org.openimaj.tools.twitter.modes.preprocessing.TokeniseMode;
051import org.openimaj.twitter.USMFStatus;
052import org.openimaj.util.api.auth.DefaultTokenFactory;
053import org.openimaj.util.api.auth.common.TwitterAPIToken;
054import org.openimaj.util.concurrent.ArrayBlockingDroppingQueue;
055import org.openimaj.util.pair.IndependentPair;
056import org.openimaj.util.stream.Stream;
057import org.openimaj.util.stream.window.MetaPayload;
058import org.openimaj.util.stream.window.MetaPayloadStreamCombiner;
059import org.openimaj.util.stream.window.RealTimeWindowFunction;
060import org.openimaj.util.stream.window.Window;
061import org.openimaj.util.stream.window.WindowFilter;
062import org.openimaj.util.stream.window.WindowFunction;
063
064import twitter4j.Status;
065
066import com.mongodb.BasicDBObject;
067import com.mongodb.DBObject;
068import com.mongodb.ServerAddress;
069
070/**
071 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
072 * 
073 */
074public class FinancialStreamRecorder {
075        static Logger logger = Logger.getLogger(FinancialStreamRecorder.class);
076
077        /**
078         * @param args
079         * @throws MalformedURLException
080         * @throws IOException
081         */
082        public static void main(String[] args) throws MalformedURLException, IOException {
083
084                // The financial stream
085                final RealTimeWindowFunction<Map<String, Double>> yahooWindow = new RealTimeWindowFunction<Map<String, Double>>(
086                                5000);
087                final Stream<Window<Map<String, Double>, Long>> yahooAveragePriceStream = new YahooFinanceStream("AAPL", "GOOG",
088                                "GE", "GM", "TWX")
089                                .transform(yahooWindow);
090
091                // The Twitter Stream
092                final ArrayBlockingDroppingQueue<Status> buffer = new ArrayBlockingDroppingQueue<Status>(1);
093                final LanguageDetectionMode languageDetectionMode = new LanguageDetectionMode();
094                final StopwordMode stopwordMode = new StopwordMode();
095                final TokeniseMode tokeniseMode = new TokeniseMode();
096
097                final Stream<Window<USMFStatus, Long>> twitterUserWordCountStream = new TwitterStreamDataset(
098                                DefaultTokenFactory.get(TwitterAPIToken.class), buffer
099                                )
100                                                .transform(new RealTimeWindowFunction<Status>(5000))
101                                                .map(new WindowFunction<Status, USMFStatus, Long>(new TwitterStatusAsUSMFStatus()))
102                                                .map(new WindowFunction<USMFStatus, USMFStatus, Long>(new TwitterPreprocessingFunction(
103                                                                languageDetectionMode, tokeniseMode, stopwordMode)))
104                                                .map(new WindowFilter<USMFStatus, Long>(new TwitterPredicateFunction(new LanguageFilter("en"))));
105
106                final List<ServerAddress> serverList = Arrays.asList(
107                                new ServerAddress("rumi", 27017),
108                                new ServerAddress("hafez", 27017)
109                                );
110                MetaPayloadStreamCombiner
111                                .combine(twitterUserWordCountStream, yahooAveragePriceStream)
112                                .forEach(
113                                                new MongoDBOutputOp<
114                                                MetaPayload<IndependentPair<List<USMFStatus>, List<Map<String, Double>>>, IndependentPair<Long, Long>>
115                                                >
116                                                (serverList)
117                                                {
118
119                                                        @Override
120                                                        public String getCollectionName() {
121                                                                return "streamapi_yahoo";
122                                                        }
123
124                                                        @Override
125                                                        public DBObject
126                                                                        asDBObject(
127                                                                                        MetaPayload<IndependentPair<List<USMFStatus>, List<Map<String, Double>>>, IndependentPair<Long, Long>> aggr)
128                                                        {
129                                                                final BasicDBObject dbobj = new BasicDBObject();
130                                                                final IndependentPair<List<USMFStatus>, List<Map<String, Double>>> obj = aggr
131                                                                                .getPayload();
132                                                                final IndependentPair<Long, Long> times = aggr.getMeta();
133                                                                final List<USMFStatus> tweets = obj.firstObject();
134                                                                final List<Object> dbtweets = new ArrayList<Object>();
135                                                                for (final USMFStatus usmfStatus : tweets) {
136                                                                        dbtweets.add(JSON.parse(usmfStatus.toJson()));
137                                                                }
138                                                                dbobj.append("tweets", dbtweets);
139                                                                dbobj.append("tickers", obj.secondObject());
140                                                                final long timestamp = times.firstObject();
141                                                                dbobj.append("timestamp", timestamp);
142                                                                logger.debug(String.format("Dumping %d tweets and %d stock-ticks at %d", dbtweets.size(),
143                                                                                obj.secondObject().size(), timestamp));
144                                                                return dbobj;
145                                                        }
146
147                                                        @Override
148                                                        public String getDBName() {
149                                                                return "twitterticker";
150                                                        }
151                                                }
152                                );
153        }
154}