001/** 002 * Copyright (c) 2011, The University of Southampton and the individual contributors. 003 * All rights reserved. 004 * 005 * Redistribution and use in source and binary forms, with or without modification, 006 * are permitted provided that the following conditions are met: 007 * 008 * * Redistributions of source code must retain the above copyright notice, 009 * this list of conditions and the following disclaimer. 010 * 011 * * Redistributions in binary form must reproduce the above copyright notice, 012 * this list of conditions and the following disclaimer in the documentation 013 * and/or other materials provided with the distribution. 014 * 015 * * Neither the name of the University of Southampton nor the names of its 016 * contributors may be used to endorse or promote products derived from this 017 * software without specific prior written permission. 018 * 019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR 023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON 026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 029 */ 030package org.openimaj.stream.provider.twitter; 031 032import org.openimaj.data.dataset.StreamingDataset; 033import org.openimaj.util.api.auth.common.TwitterAPIToken; 034import org.openimaj.util.concurrent.BlockingDroppingQueue; 035import org.openimaj.util.stream.BlockingDroppingBufferedStream; 036 037import twitter4j.FilterQuery; 038import twitter4j.Status; 039import twitter4j.StatusAdapter; 040import twitter4j.TwitterStream; 041import twitter4j.TwitterStreamFactory; 042import twitter4j.conf.Configuration; 043import twitter4j.conf.ConfigurationBuilder; 044import twitter4j.json.DataObjectFactory; 045 046/** 047 * Base class for Live twitter streams based on the live Twitter streaming API. 048 * <p> 049 * This class is abstract in terms of the object the stream consumes. Instances 050 * of this class can construct the specific object they require for each twitter 051 * status using: 052 * <ul> 053 * <li>the status object 054 * <li>the raw json of the status 055 * </ul> 056 * 057 * @author Jonathon Hare (jsh2@ecs.soton.ac.uk) 058 * @param <T> 059 * Type of items in the stream 060 */ 061public abstract class AbstractTwitterStatusesFilterDataset<T> extends BlockingDroppingBufferedStream<T> 062 implements 063 StreamingDataset<T> 064{ 065 /** 066 * Construct with the given Twitter API credentials and buffer 067 * 068 * @param query 069 * the query 070 * @param token 071 * the Twitter api authentication credentials 072 * @param buffer 073 * the backing buffer for storing data before consumption from 074 * the stream 075 */ 076 public AbstractTwitterStatusesFilterDataset(FilterQuery query, TwitterAPIToken token, BlockingDroppingQueue<T> buffer) 077 { 078 super(buffer); 079 080 final TwitterStream twitterStream = new TwitterStreamFactory(makeConfiguration(token)).getInstance(); 081 082 twitterStream.addListener(new StatusAdapter() { 083 @Override 084 public void onStatus(Status status) { 085 try { 086 registerStatus(status, DataObjectFactory.getRawJSON(status)); 087 } catch (final InterruptedException e) { 088 // ignore 089 } 090 } 091 }); 092 093 twitterStream.filter(query); 094 } 095 096 private Configuration makeConfiguration(TwitterAPIToken token) { 097 final ConfigurationBuilder cb = new ConfigurationBuilder() 098 .setOAuthConsumerKey(token.consumerKey) 099 .setOAuthConsumerSecret(token.consumerSecret) 100 .setOAuthAccessToken(token.accessToken) 101 .setOAuthAccessTokenSecret(token.accessSecret); 102 103 return cb.build(); 104 } 105 106 /** 107 * Handle the given incoming status and optionally {@link #register(Object)} 108 * it with the stream. 109 * 110 * @param status 111 * the parsed {@link Status} 112 * @param json 113 * the json string 114 * @throws InterruptedException 115 */ 116 protected abstract void registerStatus(Status status, String json) throws InterruptedException; 117 118 @Override 119 public T getRandomInstance() { 120 return this.next(); 121 } 122 123 @Override 124 public int numInstances() { 125 return Integer.MAX_VALUE; 126 } 127}