/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *
 *   * Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *
 *   * Neither the name of the University of Southampton nor the names of its
 *     contributors may be used to endorse or promote products derived from this
 *     software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */ 030package org.openimaj.demos.sandbox.ml.linear.learner.stream.recorder; 031 032import java.io.IOException; 033import java.net.MalformedURLException; 034import java.util.ArrayList; 035import java.util.Arrays; 036import java.util.List; 037import java.util.Map; 038 039import org.apache.commons.lang.StringUtils; 040import org.apache.log4j.Logger; 041import org.openimaj.demos.sandbox.ml.linear.learner.stream.MongoDBOutputOp; 042import org.openimaj.demos.sandbox.ml.linear.learner.stream.YahooFinanceStream; 043import org.openimaj.demos.sandbox.ml.linear.learner.stream.twitter.TwitterPredicateFunction; 044import org.openimaj.demos.sandbox.ml.linear.learner.stream.twitter.TwitterPreprocessingFunction; 045import org.openimaj.demos.sandbox.ml.linear.learner.stream.twitter.TwitterStatusAsUSMFStatus; 046import org.openimaj.stream.provider.twitter.TwitterSearchDataset; 047import org.openimaj.tools.twitter.modes.filter.LanguageFilter; 048import org.openimaj.tools.twitter.modes.preprocessing.LanguageDetectionMode; 049import org.openimaj.tools.twitter.modes.preprocessing.StopwordMode; 050import org.openimaj.tools.twitter.modes.preprocessing.TokeniseMode; 051import org.openimaj.twitter.USMFStatus; 052import org.openimaj.util.api.auth.DefaultTokenFactory; 053import org.openimaj.util.api.auth.common.TwitterAPIToken; 054import org.openimaj.util.concurrent.ArrayBlockingDroppingQueue; 055import org.openimaj.util.pair.IndependentPair; 056import org.openimaj.util.stream.Stream; 057import org.openimaj.util.stream.combine.StreamCombiner; 058import org.openimaj.util.stream.window.RealTimeWindowFunction; 059import org.openimaj.util.stream.window.Window; 060import org.openimaj.util.stream.window.WindowFilter; 061import org.openimaj.util.stream.window.WindowFunction; 062 063import twitter4j.Query; 064import twitter4j.Status; 065 066import com.mongodb.BasicDBObject; 067import com.mongodb.DBObject; 068import com.mongodb.ServerAddress; 069import com.mongodb.util.JSON; 070 071/** 072 * 
@author Sina Samangooei (ss@ecs.soton.ac.uk) 073 * 074 */ 075public class FinancialSearchAPIRecorder { 076 static Logger logger = Logger.getLogger(FinancialSearchAPIRecorder.class); 077 /** 078 * @param args 079 * @throws MalformedURLException 080 * @throws IOException 081 */ 082 public static void main(String[] args) throws MalformedURLException, IOException { 083 084 // The financial stream 085 String[] tickers = new String[]{ 086 "AAPL","GOOG","GE","GM","TWX" 087 }; 088 RealTimeWindowFunction<Map<String,Double>> yahooWindow = new RealTimeWindowFunction<Map<String,Double>>(5000); 089 Stream<Window<Map<String, Double>,Long>> yahooAveragePriceStream = new YahooFinanceStream(tickers).transform(yahooWindow); 090 091 // The Twitter Stream 092 final ArrayBlockingDroppingQueue<Status> buffer = new ArrayBlockingDroppingQueue<Status>(1); 093 final LanguageDetectionMode languageDetectionMode = new LanguageDetectionMode(); 094 final StopwordMode stopwordMode = new StopwordMode(); 095 final TokeniseMode tokeniseMode = new TokeniseMode(); 096 097 final String queryStr = StringUtils.join(dollar(tickers), " OR "); 098 Stream<Window<USMFStatus,Long>> twitterUserWordCountStream = new TwitterSearchDataset( 099 new Query(queryStr),DefaultTokenFactory.get(TwitterAPIToken.class),buffer 100 ) 101 .transform(new RealTimeWindowFunction<Status>(10000)) 102 .map(new WindowFunction<Status,USMFStatus,Long>(new TwitterStatusAsUSMFStatus())) 103 .map(new WindowFunction<USMFStatus,USMFStatus,Long>(new TwitterPreprocessingFunction(languageDetectionMode,tokeniseMode,stopwordMode))) 104 .map(new WindowFilter<USMFStatus,Long>(new TwitterPredicateFunction(new LanguageFilter("en")))); 105 106// twitterUserWordCountStream.forEach(new Operation<List<USMFStatus>>() { 107// 108// @Override 109// public void perform(List<USMFStatus> object) { 110// for (USMFStatus usmfStatus : object) { 111// System.out.format("@%s: %s\n",usmfStatus.user.name,usmfStatus.text); 112// } 113// } 114// }); 115 
List<ServerAddress> serverList = Arrays.asList( 116 new ServerAddress("rumi",27017), 117 new ServerAddress("hafez",27017) 118 ); 119 StreamCombiner.combine(twitterUserWordCountStream,yahooAveragePriceStream) 120 .forEach( 121 new MongoDBOutputOp< 122 IndependentPair< 123 Window<USMFStatus,Long>, 124 Window<Map<String,Double>,Long> 125 >> 126 (serverList) { 127 128 @Override 129 public String getCollectionName() { 130 return "searchapi_yahoo"; 131 } 132 133 @Override 134 public DBObject asDBObject(IndependentPair<Window<USMFStatus,Long>,Window<Map<String,Double>,Long>> obj) { 135 BasicDBObject dbobj = new BasicDBObject(); 136 List<USMFStatus> tweets = obj.firstObject().getPayload(); 137 List<Object> dbtweets = new ArrayList<Object>(); 138 for (USMFStatus usmfStatus : tweets) { 139 dbtweets.add(JSON.parse(usmfStatus.toJson())); 140 } 141 dbobj.append("tweets", dbtweets); 142 dbobj.append("search", queryStr); 143 dbobj.append("tickers", obj.secondObject().getPayload()); 144 long timestamp = obj.secondObject().getMeta(); 145 dbobj.append("timestamp", timestamp); 146 logger.debug(String.format("Dumping %d tweets and %d stock-ticks at %d",dbtweets.size(),obj.secondObject().getPayload().size(),timestamp)); 147 return dbobj; 148 } 149 150 @Override 151 public String getDBName() { 152 return "twitterticker"; 153 } 154 } 155 ); 156 } 157 private static String[] dollar(String[] tickers) { 158 String[] ret = new String[tickers.length]; 159 for (int i = 0; i < tickers.length; i++) { 160 ret[i] = "$" + tickers[i]; 161 } 162 return ret; 163 } 164}