001/** 002 * Copyright (c) 2011, The University of Southampton and the individual contributors. 003 * All rights reserved. 004 * 005 * Redistribution and use in source and binary forms, with or without modification, 006 * are permitted provided that the following conditions are met: 007 * 008 * * Redistributions of source code must retain the above copyright notice, 009 * this list of conditions and the following disclaimer. 010 * 011 * * Redistributions in binary form must reproduce the above copyright notice, 012 * this list of conditions and the following disclaimer in the documentation 013 * and/or other materials provided with the distribution. 014 * 015 * * Neither the name of the University of Southampton nor the names of its 016 * contributors may be used to endorse or promote products derived from this 017 * software without specific prior written permission. 018 * 019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR 023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON 026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 029 */ 030package org.openimaj.stream.provider.twitter; 031 032import java.util.HashMap; 033import java.util.Map; 034 035import org.apache.log4j.Logger; 036import org.openimaj.data.dataset.StreamingDataset; 037import org.openimaj.twitter.utils.Twitter4jUtil; 038import org.openimaj.util.api.auth.common.TwitterAPIToken; 039import org.openimaj.util.concurrent.BlockingDroppingQueue; 040import org.openimaj.util.parallel.GlobalExecutorPool; 041import org.openimaj.util.stream.BlockingDroppingBufferedStream; 042import org.openimaj.util.stream.Stream; 043 044import twitter4j.Query; 045import twitter4j.QueryResult; 046import twitter4j.Status; 047import twitter4j.Twitter; 048import twitter4j.TwitterException; 049import twitter4j.TwitterFactory; 050import twitter4j.conf.Configuration; 051import twitter4j.conf.ConfigurationBuilder; 052import twitter4j.json.DataObjectFactory; 053 054/** 055 * Calls the {@link Twitter#search(Query)} function periodically and offers all 056 * discovered {@link Status} instances with the underlying 057 * {@link BlockingDroppingBufferedStream}. This stream initially takes all 058 * search results available and then attempts to retrieve results which occurred 059 * after the latest Tweet seen. Apart form this, this {@link Stream} makes no 060 * special efforts to: 061 * 062 * <ul> 063 * <li>stop duplicate items 064 * <li>stop when a search returns nothing 065 * <li>remove retweets or spam. 066 * </ul> 067 * These must all be handled externally! 068 * <p> 069 * This class is abstract in terms of the object the stream consumes. Instances 070 * of this class can construct the specific object they require for each twitter 071 * status using: 072 * <ul> 073 * <li>the query that created it 074 * <li>the status object 075 * <li>the raw json of the status 076 * </ul> 077 * 078 * @author Sina Samangooei (ss@ecs.soton.ac.uk) 079 * 080 * @param <T> 081 * Type of items in the stream 082 */ 083public abstract class AbstractTwitterSearchDataset<T> extends BlockingDroppingBufferedStream<T> 084implements 085StreamingDataset<T> 086{ 087 088 final class TwitterAPIRunnable implements Runnable { 089 private final Twitter twitter; 090 091 private class QueryMetaInfo { 092 private long newestID = -1; 093 private long newest = -1; 094 private int backoff = 0; 095 } 096 097 private Map<Query, QueryMetaInfo> metaInfoMap; 098 099 private TwitterAPIRunnable(Twitter twitter) { 100 this.twitter = twitter; 101 metaInfoMap = new HashMap<Query, QueryMetaInfo>(); 102 } 103 104 @Override 105 public void run() { 106 while (true) { 107 try { 108 logger.debug("Querying..."); 109 final Query query = AbstractTwitterSearchDataset.this.getQuery(); 110 111 QueryMetaInfo metaInfo = metaInfoMap.get(query); 112 if (metaInfo == null) 113 metaInfoMap.put(query, metaInfo = new QueryMetaInfo()); 114 115 final QueryResult res = twitter.search(query); 116 if (metaInfo.newest != -1) { 117 query.sinceId(metaInfo.newestID + 1); 118 } 119 120 if (res.getCount() == 0) { 121 metaInfo.backoff++; 122 Thread.sleep(ZERO_RESULT_BACKOFF * metaInfo.backoff); 123 logger.error("Backing off"); 124 } else { 125 metaInfo.backoff = 0; 126 } 127 128 for (final Status status : res.getTweets()) { 129 String rawjson = DataObjectFactory.getRawJSON(status); 130 rawjson = DataObjectFactory.getRawJSON(status); 131 132 AbstractTwitterSearchDataset.this.registerStatus(query, status, rawjson); 133 final long tweetTime = status.getCreatedAt().getTime(); 134 135 if (tweetTime > metaInfo.newest) { 136 metaInfo.newest = tweetTime; 137 metaInfo.newestID = status.getId(); 138 } 139 } 140 141 Thread.sleep(SLEEP_PER_SEARCH); 142 } catch (final InterruptedException e) { 143 logger.error("Thread interuppted!", e); 144 close(); 145 } catch (final TwitterException e) { 146 final long waitTime = Twitter4jUtil.handleTwitterException(e, DEFAULT_ERROR_BUT_NO_WAIT_TIME); 147 try { 148 Thread.sleep(waitTime); 149 } catch (final InterruptedException e1) { 150 } 151 } 152 } 153 } 154 } 155 156 private static final long DEFAULT_ERROR_BUT_NO_WAIT_TIME = 5000; 157 private static final long SLEEP_PER_SEARCH = (1000 * 60) / 30l; 158 private static final int ZERO_RESULT_BACKOFF = 1000; 159 160 protected Query query; 161 protected Logger logger = Logger.getLogger(TwitterSearchDataset.class); 162 protected Configuration config; 163 protected Twitter twitter; 164 165 /** 166 * @param token 167 * @param buffer 168 */ 169 protected AbstractTwitterSearchDataset(TwitterAPIToken token, BlockingDroppingQueue<T> buffer, Query query) { 170 super(buffer); 171 172 this.config = makeConfiguration(token); 173 this.twitter = new TwitterFactory(config).getInstance(); 174 this.query = query; 175 176 startSearch(); 177 } 178 179 /** 180 * Handle the given incoming status and optionally {@link #register(Object)} 181 * it with the stream. 182 * 183 * @param query 184 * the query 185 * @param status 186 * the parsed {@link Status} 187 * @param rawjson 188 * the raw json string 189 * @throws InterruptedException 190 */ 191 protected abstract void registerStatus(Query query, Status status, String rawjson) throws InterruptedException; 192 193 private void startSearch() { 194 GlobalExecutorPool.getPool().execute(new TwitterAPIRunnable(twitter)); 195 } 196 197 /** 198 * Get the current query 199 * 200 * @return the query 201 */ 202 public abstract Query getQuery(); 203 204 protected Configuration makeConfiguration(TwitterAPIToken token) { 205 final ConfigurationBuilder cb = new ConfigurationBuilder() 206 .setOAuthConsumerKey(token.consumerKey) 207 .setOAuthConsumerSecret(token.consumerSecret) 208 .setOAuthAccessToken(token.accessToken) 209 .setOAuthAccessTokenSecret(token.accessSecret); 210 cb.setJSONStoreEnabled(true); 211 212 return cb.build(); 213 } 214 215 @Override 216 public T getRandomInstance() { 217 return this.next(); 218 } 219 220 @Override 221 public int numInstances() { 222 return Integer.MAX_VALUE; 223 } 224}