001/** 002 * Copyright (c) 2011, The University of Southampton and the individual contributors. 003 * All rights reserved. 004 * 005 * Redistribution and use in source and binary forms, with or without modification, 006 * are permitted provided that the following conditions are met: 007 * 008 * * Redistributions of source code must retain the above copyright notice, 009 * this list of conditions and the following disclaimer. 010 * 011 * * Redistributions in binary form must reproduce the above copyright notice, 012 * this list of conditions and the following disclaimer in the documentation 013 * and/or other materials provided with the distribution. 014 * 015 * * Neither the name of the University of Southampton nor the names of its 016 * contributors may be used to endorse or promote products derived from this 017 * software without specific prior written permission. 018 * 019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR 023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON 026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
029 */ 030package org.openimaj.demos.sandbox.ml.linear.learner.stream.recorder; 031 032import java.io.IOException; 033import java.net.MalformedURLException; 034import java.util.ArrayList; 035import java.util.Arrays; 036import java.util.List; 037import java.util.Map; 038 039import org.apache.log4j.Logger; 040import org.mortbay.util.ajax.JSON; 041import org.openimaj.demos.sandbox.ml.linear.learner.stream.MongoDBOutputOp; 042import org.openimaj.demos.sandbox.ml.linear.learner.stream.YahooFinanceStream; 043import org.openimaj.demos.sandbox.ml.linear.learner.stream.twitter.TwitterPredicateFunction; 044import org.openimaj.demos.sandbox.ml.linear.learner.stream.twitter.TwitterPreprocessingFunction; 045import org.openimaj.demos.sandbox.ml.linear.learner.stream.twitter.TwitterStatusAsUSMFStatus; 046import org.openimaj.stream.provider.twitter.TwitterStreamDataset; 047import org.openimaj.tools.twitter.modes.filter.LanguageFilter; 048import org.openimaj.tools.twitter.modes.preprocessing.LanguageDetectionMode; 049import org.openimaj.tools.twitter.modes.preprocessing.StopwordMode; 050import org.openimaj.tools.twitter.modes.preprocessing.TokeniseMode; 051import org.openimaj.twitter.USMFStatus; 052import org.openimaj.util.api.auth.DefaultTokenFactory; 053import org.openimaj.util.api.auth.common.TwitterAPIToken; 054import org.openimaj.util.concurrent.ArrayBlockingDroppingQueue; 055import org.openimaj.util.pair.IndependentPair; 056import org.openimaj.util.stream.Stream; 057import org.openimaj.util.stream.window.MetaPayload; 058import org.openimaj.util.stream.window.MetaPayloadStreamCombiner; 059import org.openimaj.util.stream.window.RealTimeWindowFunction; 060import org.openimaj.util.stream.window.Window; 061import org.openimaj.util.stream.window.WindowFilter; 062import org.openimaj.util.stream.window.WindowFunction; 063 064import twitter4j.Status; 065 066import com.mongodb.BasicDBObject; 067import com.mongodb.DBObject; 068import com.mongodb.ServerAddress; 069 070/** 071 * @author 
Sina Samangooei (ss@ecs.soton.ac.uk)
 */
public class FinancialStreamRecorder {
	static final Logger logger = Logger.getLogger(FinancialStreamRecorder.class);

	/**
	 * Length of the real-time window applied to BOTH the Yahoo Finance stream
	 * and the Twitter stream, so that the two streams are windowed identically
	 * before being combined. NOTE(review): presumably milliseconds — confirm
	 * against {@link RealTimeWindowFunction}.
	 */
	private static final int WINDOW_LENGTH = 5000;

	/** Name of the MongoDB database the combined windows are written to. */
	private static final String DB_NAME = "twitterticker";

	/** Name of the MongoDB collection the combined windows are written to. */
	private static final String COLLECTION_NAME = "streamapi_yahoo";

	/**
	 * Records windowed tweets alongside windowed Yahoo Finance ticker data into
	 * MongoDB.
	 * <p>
	 * Two streams are built:
	 * <ul>
	 * <li>a Yahoo Finance stream for AAPL, GOOG, GE, GM and TWX, cut into
	 * real-time windows of {@link #WINDOW_LENGTH};</li>
	 * <li>a Twitter stream cut into windows of the same length. Each status is
	 * converted to a {@link USMFStatus}, preprocessed (language detection,
	 * tokenisation, stopwords) and filtered to those passing the {@code "en"}
	 * {@link LanguageFilter}.</li>
	 * </ul>
	 * The two streams are combined with {@link MetaPayloadStreamCombiner}. Each
	 * combined window is written as one document to the
	 * {@value #COLLECTION_NAME} collection of the {@value #DB_NAME} database,
	 * on the servers {@code rumi:27017} and {@code hafez:27017}.
	 *
	 * @param args
	 *            ignored
	 * @throws MalformedURLException
	 *             part of the original signature; see {@link IOException}
	 * @throws IOException
	 *             if setting up one of the streams or MongoDB server addresses
	 *             fails with an I/O error
	 */
	public static void main(String[] args) throws MalformedURLException, IOException {
		// The financial stream: windowed quotes for a fixed set of tickers.
		final RealTimeWindowFunction<Map<String, Double>> yahooWindow =
				new RealTimeWindowFunction<Map<String, Double>>(WINDOW_LENGTH);
		final Stream<Window<Map<String, Double>, Long>> yahooAveragePriceStream =
				new YahooFinanceStream("AAPL", "GOOG", "GE", "GM", "TWX").transform(yahooWindow);

		// The Twitter stream.
		// NOTE(review): a queue capacity of 1 presumably means statuses are dropped
		// whenever the consumer falls behind — confirm this is intended.
		final ArrayBlockingDroppingQueue<Status> buffer = new ArrayBlockingDroppingQueue<Status>(1);
		final LanguageDetectionMode languageDetectionMode = new LanguageDetectionMode();
		final StopwordMode stopwordMode = new StopwordMode();
		final TokeniseMode tokeniseMode = new TokeniseMode();

		// Pipeline: window statuses, convert to USMF, preprocess (language,
		// tokens, stopwords), then keep only statuses passing the "en" filter.
		final Stream<Window<USMFStatus, Long>> englishTweetStream = new TwitterStreamDataset(
				DefaultTokenFactory.get(TwitterAPIToken.class), buffer)
				.transform(new RealTimeWindowFunction<Status>(WINDOW_LENGTH))
				.map(new WindowFunction<Status, USMFStatus, Long>(new TwitterStatusAsUSMFStatus()))
				.map(new WindowFunction<USMFStatus, USMFStatus, Long>(new TwitterPreprocessingFunction(
						languageDetectionMode, tokeniseMode, stopwordMode)))
				.map(new WindowFilter<USMFStatus, Long>(new TwitterPredicateFunction(new LanguageFilter("en"))));

		final List<ServerAddress> serverList = Arrays.asList(
				new ServerAddress("rumi", 27017),
				new ServerAddress("hafez", 27017));

		MetaPayloadStreamCombiner
				.combine(englishTweetStream, yahooAveragePriceStream)
				.forEach(
						new MongoDBOutputOp<MetaPayload<IndependentPair<List<USMFStatus>, List<Map<String, Double>>>, IndependentPair<Long, Long>>>(
								serverList)
						{
							@Override
							public String getCollectionName() {
								return COLLECTION_NAME;
							}

							@Override
							public DBObject asDBObject(
									MetaPayload<IndependentPair<List<USMFStatus>, List<Map<String, Double>>>, IndependentPair<Long, Long>> aggr)
							{
								// Qualified call so no inherited member can shadow the helper.
								return FinancialStreamRecorder.windowToDBObject(aggr);
							}

							@Override
							public String getDBName() {
								return DB_NAME;
							}
						});
	}

	/**
	 * Converts one combined tweet/ticker window into the MongoDB document that
	 * is stored for it.
	 * <p>
	 * The document has three fields:
	 * <ul>
	 * <li>{@code tweets}: each status's {@code toJson()} output parsed back
	 * into objects with {@code JSON.parse};</li>
	 * <li>{@code tickers}: the ticker maps exactly as received;</li>
	 * <li>{@code timestamp}: the first element of the window's meta pair.</li>
	 * </ul>
	 *
	 * @param aggr
	 *            the combined window. Its payload pairs the window's tweets
	 *            with its ticker readings.
	 * @return the document to insert
	 */
	private static DBObject windowToDBObject(
			MetaPayload<IndependentPair<List<USMFStatus>, List<Map<String, Double>>>, IndependentPair<Long, Long>> aggr)
	{
		final IndependentPair<List<USMFStatus>, List<Map<String, Double>>> payload = aggr.getPayload();
		final List<USMFStatus> tweets = payload.firstObject();
		final List<Map<String, Double>> tickers = payload.secondObject();
		// NOTE(review): only the first element of the meta pair is stored. It is
		// presumably the window time of the Twitter stream, which is the first
		// argument to combine(...) — confirm.
		final long timestamp = aggr.getMeta().firstObject();

		final List<Object> dbtweets = new ArrayList<Object>(tweets.size());
		for (final USMFStatus status : tweets) {
			dbtweets.add(JSON.parse(status.toJson()));
		}

		final BasicDBObject dbobj = new BasicDBObject();
		dbobj.append("tweets", dbtweets);
		dbobj.append("tickers", tickers);
		dbobj.append("timestamp", timestamp);

		// Guarded so the message is only formatted when debug logging is enabled.
		if (logger.isDebugEnabled()) {
			logger.debug(String.format("Dumping %d tweets and %d stock-ticks at %d", dbtweets.size(),
					tickers.size(), timestamp));
		}
		return dbobj;
	}
}