001/**
002 * Copyright (c) 2011, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.stream.provider.twitter;
031
032import java.util.HashMap;
033import java.util.Map;
034
035import org.apache.log4j.Logger;
036import org.openimaj.data.dataset.StreamingDataset;
037import org.openimaj.twitter.utils.Twitter4jUtil;
038import org.openimaj.util.api.auth.common.TwitterAPIToken;
039import org.openimaj.util.concurrent.BlockingDroppingQueue;
040import org.openimaj.util.parallel.GlobalExecutorPool;
041import org.openimaj.util.stream.BlockingDroppingBufferedStream;
042import org.openimaj.util.stream.Stream;
043
044import twitter4j.Query;
045import twitter4j.QueryResult;
046import twitter4j.Status;
047import twitter4j.Twitter;
048import twitter4j.TwitterException;
049import twitter4j.TwitterFactory;
050import twitter4j.conf.Configuration;
051import twitter4j.conf.ConfigurationBuilder;
052import twitter4j.json.DataObjectFactory;
053
054/**
055 * Calls the {@link Twitter#search(Query)} function periodically and offers all
056 * discovered {@link Status} instances with the underlying
057 * {@link BlockingDroppingBufferedStream}. This stream initially takes all
058 * search results available and then attempts to retrieve results which occurred
059 * after the latest Tweet seen. Apart form this, this {@link Stream} makes no
060 * special efforts to:
061 *
062 * <ul>
063 * <li>stop duplicate items
064 * <li>stop when a search returns nothing
065 * <li>remove retweets or spam.
066 * </ul>
067 * These must all be handled externally!
068 * <p>
069 * This class is abstract in terms of the object the stream consumes. Instances
070 * of this class can construct the specific object they require for each twitter
071 * status using:
072 * <ul>
073 * <li>the query that created it
074 * <li>the status object
075 * <li>the raw json of the status
076 * </ul>
077 *
078 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
079 *
080 * @param <T>
081 *            Type of items in the stream
082 */
083public abstract class AbstractTwitterSearchDataset<T> extends BlockingDroppingBufferedStream<T>
084implements
085StreamingDataset<T>
086{
087
088        final class TwitterAPIRunnable implements Runnable {
089                private final Twitter twitter;
090
091                private class QueryMetaInfo {
092                        private long newestID = -1;
093                        private long newest = -1;
094                        private int backoff = 0;
095                }
096
097                private Map<Query, QueryMetaInfo> metaInfoMap;
098
099                private TwitterAPIRunnable(Twitter twitter) {
100                        this.twitter = twitter;
101                        metaInfoMap = new HashMap<Query, QueryMetaInfo>();
102                }
103
104                @Override
105                public void run() {
106                        while (true) {
107                                try {
108                                        logger.debug("Querying...");
109                                        final Query query = AbstractTwitterSearchDataset.this.getQuery();
110
111                                        QueryMetaInfo metaInfo = metaInfoMap.get(query);
112                                        if (metaInfo == null)
113                                                metaInfoMap.put(query, metaInfo = new QueryMetaInfo());
114
115                                        final QueryResult res = twitter.search(query);
116                                        if (metaInfo.newest != -1) {
117                                                query.sinceId(metaInfo.newestID + 1);
118                                        }
119
120                                        if (res.getCount() == 0) {
121                                                metaInfo.backoff++;
122                                                Thread.sleep(ZERO_RESULT_BACKOFF * metaInfo.backoff);
123                                                logger.error("Backing off");
124                                        } else {
125                                                metaInfo.backoff = 0;
126                                        }
127
128                                        for (final Status status : res.getTweets()) {
129                                                String rawjson = DataObjectFactory.getRawJSON(status);
130                                                rawjson = DataObjectFactory.getRawJSON(status);
131
132                                                AbstractTwitterSearchDataset.this.registerStatus(query, status, rawjson);
133                                                final long tweetTime = status.getCreatedAt().getTime();
134
135                                                if (tweetTime > metaInfo.newest) {
136                                                        metaInfo.newest = tweetTime;
137                                                        metaInfo.newestID = status.getId();
138                                                }
139                                        }
140
141                                        Thread.sleep(SLEEP_PER_SEARCH);
142                                } catch (final InterruptedException e) {
143                                        logger.error("Thread interuppted!", e);
144                                        close();
145                                } catch (final TwitterException e) {
146                                        final long waitTime = Twitter4jUtil.handleTwitterException(e, DEFAULT_ERROR_BUT_NO_WAIT_TIME);
147                                        try {
148                                                Thread.sleep(waitTime);
149                                        } catch (final InterruptedException e1) {
150                                        }
151                                }
152                        }
153                }
154        }
155
156        private static final long DEFAULT_ERROR_BUT_NO_WAIT_TIME = 5000;
157        private static final long SLEEP_PER_SEARCH = (1000 * 60) / 30l;
158        private static final int ZERO_RESULT_BACKOFF = 1000;
159
160        protected Query query;
161        protected Logger logger = Logger.getLogger(TwitterSearchDataset.class);
162        protected Configuration config;
163        protected Twitter twitter;
164
165        /**
166         * @param token
167         * @param buffer
168         */
169        protected AbstractTwitterSearchDataset(TwitterAPIToken token, BlockingDroppingQueue<T> buffer, Query query) {
170                super(buffer);
171
172                this.config = makeConfiguration(token);
173                this.twitter = new TwitterFactory(config).getInstance();
174                this.query = query;
175
176                startSearch();
177        }
178
179        /**
180         * Handle the given incoming status and optionally {@link #register(Object)}
181         * it with the stream.
182         *
183         * @param query
184         *            the query
185         * @param status
186         *            the parsed {@link Status}
187         * @param rawjson
188         *            the raw json string
189         * @throws InterruptedException
190         */
191        protected abstract void registerStatus(Query query, Status status, String rawjson) throws InterruptedException;
192
193        private void startSearch() {
194                GlobalExecutorPool.getPool().execute(new TwitterAPIRunnable(twitter));
195        }
196
197        /**
198         * Get the current query
199         *
200         * @return the query
201         */
202        public abstract Query getQuery();
203
204        protected Configuration makeConfiguration(TwitterAPIToken token) {
205                final ConfigurationBuilder cb = new ConfigurationBuilder()
206                .setOAuthConsumerKey(token.consumerKey)
207                .setOAuthConsumerSecret(token.consumerSecret)
208                .setOAuthAccessToken(token.accessToken)
209                .setOAuthAccessTokenSecret(token.accessSecret);
210                cb.setJSONStoreEnabled(true);
211
212                return cb.build();
213        }
214
215        @Override
216        public T getRandomInstance() {
217                return this.next();
218        }
219
220        @Override
221        public int numInstances() {
222                return Integer.MAX_VALUE;
223        }
224}