001/**
002 * Copyright (c) 2011, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.demos.sandbox.ml.linear.learner.stream.recorder;
031
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
import org.openimaj.demos.sandbox.ml.linear.learner.stream.MongoDBOutputOp;
import org.openimaj.demos.sandbox.ml.linear.learner.stream.YahooFinanceStream;
import org.openimaj.demos.sandbox.ml.linear.learner.stream.twitter.ContextTwitterStatusAsUSMFStatus;
import org.openimaj.demos.sandbox.ml.linear.learner.stream.twitter.TwitterPreprocessingFunction;
import org.openimaj.stream.provider.twitter.ContextRoundRobinTwitterSearchAPIDataset;
import org.openimaj.tools.twitter.modes.preprocessing.CountryCodeMode;
import org.openimaj.tools.twitter.modes.preprocessing.LanguageDetectionMode;
import org.openimaj.tools.twitter.modes.preprocessing.StopwordMode;
import org.openimaj.tools.twitter.modes.preprocessing.TokeniseMode;
import org.openimaj.twitter.USMFStatus;
import org.openimaj.util.api.auth.DefaultTokenFactory;
import org.openimaj.util.api.auth.common.TwitterAPIToken;
import org.openimaj.util.concurrent.ArrayBlockingDroppingQueue;
import org.openimaj.util.data.Context;
import org.openimaj.util.function.context.ContextFunctionAdaptor;
import org.openimaj.util.function.context.ContextListFunction;
import org.openimaj.util.pair.IndependentPair;
import org.openimaj.util.stream.Stream;
import org.openimaj.util.stream.combine.StreamCombiner;
import org.openimaj.util.stream.window.ContextRealTimeWindowFunction;

import twitter4j.GeoLocation;
import twitter4j.Query;

import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;
import com.mongodb.ServerAddress;
import com.mongodb.util.JSON;
071
072/**
073 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
074 * 
075 */
076public class GeoFinancialSearchAPIRecorder {
077        static Logger logger = Logger.getLogger(GeoFinancialSearchAPIRecorder.class);
078
079        /**
080         * @param args
081         * @throws MalformedURLException
082         * @throws IOException
083         */
084        public static void main(String[] args) throws MalformedURLException, IOException {
085
086                // The financial stream
087                final String[] tickers = new String[] {
088                                "apple", "google", "virgin", "oracle", "sony", "microsoft"
089                };
090                final ContextRealTimeWindowFunction<Map<String, Double>> yahooWindow = new ContextRealTimeWindowFunction<Map<String, Double>>(
091                                5000);
092                final Stream<Context> yahooAveragePriceStream = new YahooFinanceStream(true, tickers).transform(yahooWindow);
093
094                final List<Map<String, String>> geoLocs = loadGeoLocs("/org/openimaj/demos/sandbox/ml/linear/learner/stream/locations_input_srv_II.txt");
095
096                // The Twitter Stream
097                final ArrayBlockingDroppingQueue<Context> buffer = new ArrayBlockingDroppingQueue<Context>(1000);
098                final LanguageDetectionMode languageDetectionMode = new LanguageDetectionMode();
099                final StopwordMode stopwordMode = new StopwordMode();
100                final TokeniseMode tokeniseMode = new TokeniseMode();
101                final CountryCodeMode ccm = new CountryCodeMode();
102
103                final Stream<Context> twitterUserWordCountStream =
104                                new ContextRoundRobinTwitterSearchAPIDataset(
105                                                geoLocQueries(geoLocs),
106                                                DefaultTokenFactory.get(TwitterAPIToken.class),
107                                                buffer
108                                )
109                                                .transform(new ContextRealTimeWindowFunction<Context>(10000))
110                                                .map(
111                                                                new ContextListFunction<Context, Context>(new ContextTwitterStatusAsUSMFStatus(),
112                                                                                "item"
113                                                                )
114                                                )
115                                                .map(
116                                                                new ContextListFunction<Context, Context>(new ContextFunctionAdaptor<USMFStatus, USMFStatus>("usmfstatus",
117                                                                                new TwitterPreprocessingFunction(languageDetectionMode, tokeniseMode,
118                                                                                                stopwordMode)
119                                                                ),
120                                                                                "item"
121                                                                )
122                                                );
123                // twitterUserWordCountStream.forEach(new Operation<Context>() {
124                //
125                // @Override
126                // public void perform(Context objectTime) {
127                // List<Context> object = objectTime.getTyped("item");
128                // for (Context statusContext : object) {
129                // USMFStatus usmfStatus = statusContext.getTyped("usmfstatus");
130                // System.out.format("@%s (location: %s): (geo: %s) (place: %s)\n",usmfStatus.user.name,usmfStatus.user.location,usmfStatus.geo,usmfStatus.location);
131                // }
132                // }
133                //
134                // });
135                final List<ServerAddress> serverList = Arrays.asList(
136                                new ServerAddress("rumi", 27017),
137                                new ServerAddress("hafez", 27017)
138                                );
139                StreamCombiner.combine(twitterUserWordCountStream, yahooAveragePriceStream)
140                                .forEach(
141                                                new MongoDBOutputOp<
142                                                IndependentPair<
143                                                Context,
144                                                Context
145                                                >>
146                                                (serverList)
147                                                {
148
149                                                        @Override
150                                                        public String getCollectionName() {
151                                                                return "searchapi_yahoo_billgeo";
152                                                        }
153
154                                                        @Override
155                                                        public DBObject asDBObject(IndependentPair<Context, Context> obj) {
156                                                                final BasicDBObject dbobj = new BasicDBObject();
157                                                                final List<Context> tweets = obj.firstObject().getTyped("item");
158                                                                final List<Object> dbtweets = new ArrayList<Object>();
159                                                                final List<Object> normaltweets = new ArrayList<Object>();
160                                                                String actualQuery = null;
161                                                                final HashMap<String, List<Integer>> queries = new HashMap<String, List<Integer>>();
162                                                                int item = 0;
163                                                                for (final Context tweetContext : tweets) {
164                                                                        final USMFStatus usmfStatus = tweetContext.getTyped("usmfstatus");
165                                                                        dbtweets.add(JSON.parse(usmfStatus.toJson()));
166                                                                        normaltweets.add(JSON.parse((String) tweetContext.getTyped("status_json")));
167                                                                        actualQuery = ((Query) tweetContext.getTyped("query")).getGeocode();
168                                                                        List<Integer> itemMap = queries.get(actualQuery);
169                                                                        if (itemMap == null)
170                                                                                queries.put(actualQuery, itemMap = new ArrayList<Integer>());
171                                                                        itemMap.add(item++);
172                                                                }
173                                                                if (actualQuery != null) {
174                                                                        dbobj.append("twitter_query", prepareQueries(queries));
175                                                                }
176                                                                dbobj.append("tweets", dbtweets);
177                                                                dbobj.append("tweets_raw", normaltweets);
178                                                                dbobj.append("search", "bill area code");
179                                                                final List<?> stockTicks = obj.getSecondObject().getTyped("item");
180                                                                dbobj.append("tickers", stockTicks);
181                                                                final long timestamp = (Long) obj.getSecondObject().getTyped("windowstart");
182                                                                dbobj.append("timestamp", timestamp);
183                                                                logger.debug(String.format("Dumping %d tweets and %d stock-ticks at %d with %d queries",
184                                                                                dbtweets.size(), stockTicks.size(), timestamp, queries.size()));
185                                                                return dbobj;
186                                                        }
187
188                                                        private List<Map<String, Object>> prepareQueries(HashMap<String, List<Integer>> queries) {
189                                                                final List<Map<String, Object>> ret = new ArrayList<Map<String, Object>>();
190                                                                for (final Entry<String, List<Integer>> ql : queries.entrySet()) {
191                                                                        final Map<String, Object> map = new HashMap<String, Object>();
192                                                                        map.put("geoquery", ql.getKey());
193                                                                        map.put("applies_to", ql.getValue());
194                                                                        ret.add(map);
195                                                                }
196                                                                return ret;
197                                                        }
198
199                                                        @Override
200                                                        public String getDBName() {
201                                                                return "twitterticker";
202                                                        }
203                                                }
204                                );
205        }
206
207        private static List<Map<String, String>> loadGeoLocs(String resource) {
208                try {
209                        final String rawLocations = IOUtils.toString(GeoFinancialSearchAPIRecorder.class
210                                        .getResourceAsStream(resource));
211                        final String[] lines = rawLocations.split("\n");
212                        final List<Map<String, String>> ret = new ArrayList<Map<String, String>>();
213                        for (final String line : lines) {
214                                final Map<String, String> location = new HashMap<String, String>();
215                                final String[] fileLoc = line.split("/");
216                                final String infoPart = fileLoc[fileLoc.length - 1];
217                                final String[] infoParts = infoPart.split(",");
218                                location.put("country", infoParts[0]);
219                                location.put("city", infoParts[1]);
220                                location.put("lat", infoParts[2]);
221                                location.put("lon", infoParts[3]);
222                                location.put("rad", infoParts[4]);
223                                location.put("lang", infoParts[5]);
224                                location.put("group", infoParts[6]);
225                                ret.add(location);
226                        }
227                        return ret;
228                } catch (final Throwable e) {
229                        return null;
230                }
231        }
232
233        private static List<Query> geoLocQueries(List<Map<String, String>> geolocs) {
234                final List<Query> ret = new ArrayList<Query>();
235                for (final Map<String, String> geoloc : geolocs) {
236                        final Query q = new Query();
237                        q.geoCode(
238                                        new GeoLocation(
239                                                        Double.parseDouble(geoloc.get("lat")),
240                                                        Double.parseDouble(geoloc.get("lon"))
241                                        ),
242                                        Double.parseDouble(geoloc.get("rad")),
243                                        Query.KILOMETERS
244                                        );
245                        ret.add(q);
246                }
247                return ret;
248        }
249}