001/**
002 * Copyright (c) 2011, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.demos.sandbox.ml.linear.learner.stream.recorder;
031
032import java.io.IOException;
033import java.net.MalformedURLException;
034import java.util.ArrayList;
035import java.util.Arrays;
036import java.util.List;
037import java.util.Map;
038
039import org.apache.commons.lang.StringUtils;
040import org.apache.log4j.Logger;
041import org.openimaj.demos.sandbox.ml.linear.learner.stream.MongoDBOutputOp;
042import org.openimaj.demos.sandbox.ml.linear.learner.stream.YahooFinanceStream;
043import org.openimaj.demos.sandbox.ml.linear.learner.stream.twitter.TwitterPredicateFunction;
044import org.openimaj.demos.sandbox.ml.linear.learner.stream.twitter.TwitterPreprocessingFunction;
045import org.openimaj.demos.sandbox.ml.linear.learner.stream.twitter.TwitterStatusAsUSMFStatus;
046import org.openimaj.stream.provider.twitter.TwitterSearchDataset;
047import org.openimaj.tools.twitter.modes.filter.LanguageFilter;
048import org.openimaj.tools.twitter.modes.preprocessing.LanguageDetectionMode;
049import org.openimaj.tools.twitter.modes.preprocessing.StopwordMode;
050import org.openimaj.tools.twitter.modes.preprocessing.TokeniseMode;
051import org.openimaj.twitter.USMFStatus;
052import org.openimaj.util.api.auth.DefaultTokenFactory;
053import org.openimaj.util.api.auth.common.TwitterAPIToken;
054import org.openimaj.util.concurrent.ArrayBlockingDroppingQueue;
055import org.openimaj.util.pair.IndependentPair;
056import org.openimaj.util.stream.Stream;
057import org.openimaj.util.stream.combine.StreamCombiner;
058import org.openimaj.util.stream.window.RealTimeWindowFunction;
059import org.openimaj.util.stream.window.Window;
060import org.openimaj.util.stream.window.WindowFilter;
061import org.openimaj.util.stream.window.WindowFunction;
062
063import twitter4j.Query;
064import twitter4j.Status;
065
066import com.mongodb.BasicDBObject;
067import com.mongodb.DBObject;
068import com.mongodb.ServerAddress;
069import com.mongodb.util.JSON;
070
071/**
072 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
073 *
074 */
075public class FinancialSearchAPIRecorder {
076        static Logger logger = Logger.getLogger(FinancialSearchAPIRecorder.class);
077        /**
078         * @param args
079         * @throws MalformedURLException
080         * @throws IOException
081         */
082        public static void main(String[] args) throws MalformedURLException, IOException {
083
084                // The financial stream
085                String[] tickers = new String[]{
086                        "AAPL","GOOG","GE","GM","TWX"
087                };
088                RealTimeWindowFunction<Map<String,Double>> yahooWindow = new RealTimeWindowFunction<Map<String,Double>>(5000);
089                Stream<Window<Map<String, Double>,Long>> yahooAveragePriceStream = new YahooFinanceStream(tickers).transform(yahooWindow);
090
091                // The Twitter Stream
092                final ArrayBlockingDroppingQueue<Status> buffer = new ArrayBlockingDroppingQueue<Status>(1);
093                final LanguageDetectionMode languageDetectionMode = new LanguageDetectionMode();
094                final StopwordMode stopwordMode = new StopwordMode();
095                final TokeniseMode tokeniseMode = new TokeniseMode();
096
097                final String queryStr = StringUtils.join(dollar(tickers), " OR ");
098                Stream<Window<USMFStatus,Long>> twitterUserWordCountStream = new TwitterSearchDataset(
099                        new Query(queryStr),DefaultTokenFactory.get(TwitterAPIToken.class),buffer
100                )
101                .transform(new RealTimeWindowFunction<Status>(10000))
102                .map(new WindowFunction<Status,USMFStatus,Long>(new TwitterStatusAsUSMFStatus()))
103                .map(new WindowFunction<USMFStatus,USMFStatus,Long>(new TwitterPreprocessingFunction(languageDetectionMode,tokeniseMode,stopwordMode)))
104                .map(new WindowFilter<USMFStatus,Long>(new TwitterPredicateFunction(new LanguageFilter("en"))));
105
106//              twitterUserWordCountStream.forEach(new Operation<List<USMFStatus>>() {
107//
108//                      @Override
109//                      public void perform(List<USMFStatus> object) {
110//                              for (USMFStatus usmfStatus : object) {
111//                                      System.out.format("@%s: %s\n",usmfStatus.user.name,usmfStatus.text);
112//                              }
113//                      }
114//              });
115                List<ServerAddress> serverList = Arrays.asList(
116                        new ServerAddress("rumi",27017),
117                        new ServerAddress("hafez",27017)
118                );
119                StreamCombiner.combine(twitterUserWordCountStream,yahooAveragePriceStream)
120                .forEach(
121                        new MongoDBOutputOp<
122                                IndependentPair<
123                                        Window<USMFStatus,Long>,
124                                        Window<Map<String,Double>,Long>
125                        >>
126                        (serverList) {
127
128                                @Override
129                                public String getCollectionName() {
130                                        return "searchapi_yahoo";
131                                }
132
133                                @Override
134                                public DBObject asDBObject(IndependentPair<Window<USMFStatus,Long>,Window<Map<String,Double>,Long>> obj) {
135                                        BasicDBObject dbobj = new BasicDBObject();
136                                        List<USMFStatus> tweets = obj.firstObject().getPayload();
137                                        List<Object> dbtweets = new ArrayList<Object>();
138                                        for (USMFStatus usmfStatus : tweets) {
139                                                dbtweets.add(JSON.parse(usmfStatus.toJson()));
140                                        }
141                                        dbobj.append("tweets", dbtweets);
142                                        dbobj.append("search", queryStr);
143                                        dbobj.append("tickers", obj.secondObject().getPayload());
144                                        long timestamp = obj.secondObject().getMeta();
145                                        dbobj.append("timestamp", timestamp);
146                                        logger.debug(String.format("Dumping %d tweets and %d stock-ticks at %d",dbtweets.size(),obj.secondObject().getPayload().size(),timestamp));
147                                        return dbobj;
148                                }
149
150                                @Override
151                                public String getDBName() {
152                                        return "twitterticker";
153                                }
154                        }
155                );
156        }
157        private static String[] dollar(String[] tickers) {
158                String[] ret = new String[tickers.length];
159                for (int i = 0; i < tickers.length; i++) {
160                        ret[i] = "$" + tickers[i];
161                }
162                return ret;
163        }
164}