/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *
 *   * Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *
 *   * Neither the name of the University of Southampton nor the names of its
 *     contributors may be used to endorse or promote products derived from this
 *     software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */ 030package org.openimaj.demos.sandbox.ml.linear.learner.stream.recorder; 031 032import java.io.IOException; 033import java.net.MalformedURLException; 034import java.util.ArrayList; 035import java.util.Arrays; 036import java.util.HashMap; 037import java.util.List; 038import java.util.Map; 039import java.util.Map.Entry; 040 041import org.apache.commons.io.IOUtils; 042import org.apache.log4j.Logger; 043import org.openimaj.demos.sandbox.ml.linear.learner.stream.MongoDBOutputOp; 044import org.openimaj.demos.sandbox.ml.linear.learner.stream.YahooFinanceStream; 045import org.openimaj.demos.sandbox.ml.linear.learner.stream.twitter.ContextTwitterStatusAsUSMFStatus; 046import org.openimaj.demos.sandbox.ml.linear.learner.stream.twitter.TwitterPreprocessingFunction; 047import org.openimaj.stream.provider.twitter.ContextRoundRobinTwitterSearchAPIDataset; 048import org.openimaj.tools.twitter.modes.preprocessing.CountryCodeMode; 049import org.openimaj.tools.twitter.modes.preprocessing.LanguageDetectionMode; 050import org.openimaj.tools.twitter.modes.preprocessing.StopwordMode; 051import org.openimaj.tools.twitter.modes.preprocessing.TokeniseMode; 052import org.openimaj.twitter.USMFStatus; 053import org.openimaj.util.api.auth.DefaultTokenFactory; 054import org.openimaj.util.api.auth.common.TwitterAPIToken; 055import org.openimaj.util.concurrent.ArrayBlockingDroppingQueue; 056import org.openimaj.util.data.Context; 057import org.openimaj.util.function.context.ContextFunctionAdaptor; 058import org.openimaj.util.function.context.ContextListFunction; 059import org.openimaj.util.pair.IndependentPair; 060import org.openimaj.util.stream.Stream; 061import org.openimaj.util.stream.combine.StreamCombiner; 062import org.openimaj.util.stream.window.ContextRealTimeWindowFunction; 063 064import twitter4j.GeoLocation; 065import twitter4j.Query; 066 067import com.mongodb.BasicDBObject; 068import com.mongodb.DBObject; 069import com.mongodb.ServerAddress; 070import com.mongodb.util.JSON; 071 
072/** 073 * @author Sina Samangooei (ss@ecs.soton.ac.uk) 074 * 075 */ 076public class GeoFinancialSearchAPIRecorder { 077 static Logger logger = Logger.getLogger(GeoFinancialSearchAPIRecorder.class); 078 079 /** 080 * @param args 081 * @throws MalformedURLException 082 * @throws IOException 083 */ 084 public static void main(String[] args) throws MalformedURLException, IOException { 085 086 // The financial stream 087 final String[] tickers = new String[] { 088 "apple", "google", "virgin", "oracle", "sony", "microsoft" 089 }; 090 final ContextRealTimeWindowFunction<Map<String, Double>> yahooWindow = new ContextRealTimeWindowFunction<Map<String, Double>>( 091 5000); 092 final Stream<Context> yahooAveragePriceStream = new YahooFinanceStream(true, tickers).transform(yahooWindow); 093 094 final List<Map<String, String>> geoLocs = loadGeoLocs("/org/openimaj/demos/sandbox/ml/linear/learner/stream/locations_input_srv_II.txt"); 095 096 // The Twitter Stream 097 final ArrayBlockingDroppingQueue<Context> buffer = new ArrayBlockingDroppingQueue<Context>(1000); 098 final LanguageDetectionMode languageDetectionMode = new LanguageDetectionMode(); 099 final StopwordMode stopwordMode = new StopwordMode(); 100 final TokeniseMode tokeniseMode = new TokeniseMode(); 101 final CountryCodeMode ccm = new CountryCodeMode(); 102 103 final Stream<Context> twitterUserWordCountStream = 104 new ContextRoundRobinTwitterSearchAPIDataset( 105 geoLocQueries(geoLocs), 106 DefaultTokenFactory.get(TwitterAPIToken.class), 107 buffer 108 ) 109 .transform(new ContextRealTimeWindowFunction<Context>(10000)) 110 .map( 111 new ContextListFunction<Context, Context>(new ContextTwitterStatusAsUSMFStatus(), 112 "item" 113 ) 114 ) 115 .map( 116 new ContextListFunction<Context, Context>(new ContextFunctionAdaptor<USMFStatus, USMFStatus>("usmfstatus", 117 new TwitterPreprocessingFunction(languageDetectionMode, tokeniseMode, 118 stopwordMode) 119 ), 120 "item" 121 ) 122 ); 123 // 
twitterUserWordCountStream.forEach(new Operation<Context>() { 124 // 125 // @Override 126 // public void perform(Context objectTime) { 127 // List<Context> object = objectTime.getTyped("item"); 128 // for (Context statusContext : object) { 129 // USMFStatus usmfStatus = statusContext.getTyped("usmfstatus"); 130 // System.out.format("@%s (location: %s): (geo: %s) (place: %s)\n",usmfStatus.user.name,usmfStatus.user.location,usmfStatus.geo,usmfStatus.location); 131 // } 132 // } 133 // 134 // }); 135 final List<ServerAddress> serverList = Arrays.asList( 136 new ServerAddress("rumi", 27017), 137 new ServerAddress("hafez", 27017) 138 ); 139 StreamCombiner.combine(twitterUserWordCountStream, yahooAveragePriceStream) 140 .forEach( 141 new MongoDBOutputOp< 142 IndependentPair< 143 Context, 144 Context 145 >> 146 (serverList) 147 { 148 149 @Override 150 public String getCollectionName() { 151 return "searchapi_yahoo_billgeo"; 152 } 153 154 @Override 155 public DBObject asDBObject(IndependentPair<Context, Context> obj) { 156 final BasicDBObject dbobj = new BasicDBObject(); 157 final List<Context> tweets = obj.firstObject().getTyped("item"); 158 final List<Object> dbtweets = new ArrayList<Object>(); 159 final List<Object> normaltweets = new ArrayList<Object>(); 160 String actualQuery = null; 161 final HashMap<String, List<Integer>> queries = new HashMap<String, List<Integer>>(); 162 int item = 0; 163 for (final Context tweetContext : tweets) { 164 final USMFStatus usmfStatus = tweetContext.getTyped("usmfstatus"); 165 dbtweets.add(JSON.parse(usmfStatus.toJson())); 166 normaltweets.add(JSON.parse((String) tweetContext.getTyped("status_json"))); 167 actualQuery = ((Query) tweetContext.getTyped("query")).getGeocode(); 168 List<Integer> itemMap = queries.get(actualQuery); 169 if (itemMap == null) 170 queries.put(actualQuery, itemMap = new ArrayList<Integer>()); 171 itemMap.add(item++); 172 } 173 if (actualQuery != null) { 174 dbobj.append("twitter_query", prepareQueries(queries)); 
175 } 176 dbobj.append("tweets", dbtweets); 177 dbobj.append("tweets_raw", normaltweets); 178 dbobj.append("search", "bill area code"); 179 final List<?> stockTicks = obj.getSecondObject().getTyped("item"); 180 dbobj.append("tickers", stockTicks); 181 final long timestamp = (Long) obj.getSecondObject().getTyped("windowstart"); 182 dbobj.append("timestamp", timestamp); 183 logger.debug(String.format("Dumping %d tweets and %d stock-ticks at %d with %d queries", 184 dbtweets.size(), stockTicks.size(), timestamp, queries.size())); 185 return dbobj; 186 } 187 188 private List<Map<String, Object>> prepareQueries(HashMap<String, List<Integer>> queries) { 189 final List<Map<String, Object>> ret = new ArrayList<Map<String, Object>>(); 190 for (final Entry<String, List<Integer>> ql : queries.entrySet()) { 191 final Map<String, Object> map = new HashMap<String, Object>(); 192 map.put("geoquery", ql.getKey()); 193 map.put("applies_to", ql.getValue()); 194 ret.add(map); 195 } 196 return ret; 197 } 198 199 @Override 200 public String getDBName() { 201 return "twitterticker"; 202 } 203 } 204 ); 205 } 206 207 private static List<Map<String, String>> loadGeoLocs(String resource) { 208 try { 209 final String rawLocations = IOUtils.toString(GeoFinancialSearchAPIRecorder.class 210 .getResourceAsStream(resource)); 211 final String[] lines = rawLocations.split("\n"); 212 final List<Map<String, String>> ret = new ArrayList<Map<String, String>>(); 213 for (final String line : lines) { 214 final Map<String, String> location = new HashMap<String, String>(); 215 final String[] fileLoc = line.split("/"); 216 final String infoPart = fileLoc[fileLoc.length - 1]; 217 final String[] infoParts = infoPart.split(","); 218 location.put("country", infoParts[0]); 219 location.put("city", infoParts[1]); 220 location.put("lat", infoParts[2]); 221 location.put("lon", infoParts[3]); 222 location.put("rad", infoParts[4]); 223 location.put("lang", infoParts[5]); 224 location.put("group", infoParts[6]); 225 
ret.add(location); 226 } 227 return ret; 228 } catch (final Throwable e) { 229 return null; 230 } 231 } 232 233 private static List<Query> geoLocQueries(List<Map<String, String>> geolocs) { 234 final List<Query> ret = new ArrayList<Query>(); 235 for (final Map<String, String> geoloc : geolocs) { 236 final Query q = new Query(); 237 q.geoCode( 238 new GeoLocation( 239 Double.parseDouble(geoloc.get("lat")), 240 Double.parseDouble(geoloc.get("lon")) 241 ), 242 Double.parseDouble(geoloc.get("rad")), 243 Query.KILOMETERS 244 ); 245 ret.add(q); 246 } 247 return ret; 248 } 249}