001/** 002 * Copyright (c) 2011, The University of Southampton and the individual contributors. 003 * All rights reserved. 004 * 005 * Redistribution and use in source and binary forms, with or without modification, 006 * are permitted provided that the following conditions are met: 007 * 008 * * Redistributions of source code must retain the above copyright notice, 009 * this list of conditions and the following disclaimer. 010 * 011 * * Redistributions in binary form must reproduce the above copyright notice, 012 * this list of conditions and the following disclaimer in the documentation 013 * and/or other materials provided with the distribution. 014 * 015 * * Neither the name of the University of Southampton nor the names of its 016 * contributors may be used to endorse or promote products derived from this 017 * software without specific prior written permission. 018 * 019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR 023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON 026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 029 */ 030package org.openimaj.stream.provider.twitter; 031 032import org.openimaj.data.dataset.StreamingDataset; 033import org.openimaj.util.api.auth.common.TwitterAPIToken; 034import org.openimaj.util.concurrent.BlockingDroppingQueue; 035import org.openimaj.util.stream.BlockingDroppingBufferedStream; 036 037import twitter4j.Status; 038import twitter4j.StatusAdapter; 039import twitter4j.TwitterStream; 040import twitter4j.TwitterStreamFactory; 041import twitter4j.conf.Configuration; 042import twitter4j.conf.ConfigurationBuilder; 043import twitter4j.json.DataObjectFactory; 044 045/** 046 * Base class for Live twitter streams based on the live Twitter streaming API. 047 * <p> 048 * This class is abstract in terms of the object the stream consumes. Instances 049 * of this class can construct the specific object they require for each twitter 050 * status using: 051 * <ul> 052 * <li>the status object 053 * <li>the raw json of the status 054 * </ul> 055 * 056 * @author Jonathon Hare (jsh2@ecs.soton.ac.uk) 057 * @param <T> 058 * Type of items in the stream 059 */ 060public abstract class AbstractTwitterStreamDataset<T> extends BlockingDroppingBufferedStream<T> 061 implements 062 StreamingDataset<T> 063{ 064 /** 065 * Construct with the given Twitter API credentials and buffer 066 * 067 * @param token 068 * the Twitter api authentication credentials 069 * @param buffer 070 * the backing buffer for storing data before consumption from 071 * the stream 072 */ 073 public AbstractTwitterStreamDataset(TwitterAPIToken token, BlockingDroppingQueue<T> buffer) { 074 super(buffer); 075 076 final TwitterStream twitterStream = new TwitterStreamFactory(makeConfiguration(token)).getInstance(); 077 078 twitterStream.addListener(new StatusAdapter() { 079 @Override 080 public void onStatus(Status status) { 081 try { 082 registerStatus(status, DataObjectFactory.getRawJSON(status)); 083 } catch (final InterruptedException e) { 084 // ignore 085 } 086 } 087 }); 088 089 twitterStream.sample(); 090 } 091 092 private Configuration makeConfiguration(TwitterAPIToken token) { 093 final ConfigurationBuilder cb = new ConfigurationBuilder() 094 .setOAuthConsumerKey(token.consumerKey) 095 .setOAuthConsumerSecret(token.consumerSecret) 096 .setOAuthAccessToken(token.accessToken) 097 .setOAuthAccessTokenSecret(token.accessSecret); 098 099 return cb.build(); 100 } 101 102 /** 103 * Handle the given incoming status and optionally {@link #register(Object)} 104 * it with the stream. 105 * 106 * @param status 107 * the parsed {@link Status} 108 * @param json 109 * the json string 110 * @throws InterruptedException 111 */ 112 protected abstract void registerStatus(Status status, String json) throws InterruptedException; 113 114 @Override 115 public T getRandomInstance() { 116 return this.next(); 117 } 118 119 @Override 120 public int numInstances() { 121 return Integer.MAX_VALUE; 122 } 123}