package org.openimaj.picslurper;

import java.util.Comparator;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

import org.apache.log4j.Logger;
import org.openimaj.util.function.Operation;
import org.openimaj.util.parallel.GlobalExecutorPool.DaemonThreadFactory;
import org.openimaj.util.parallel.Parallel;
import org.openimaj.util.parallel.partition.FixedSizeBlockingChunkPartitioner;
import org.openimaj.util.queue.BoundedPriorityQueue;

import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterException;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.auth.AccessToken;

/**
 * A {@link StatusFeeder} that uses the {@link TwitterStreamFactory} of
 * twitter4j to read the sample stream, authenticating with an OAuth
 * {@link AccessToken} built from the system properties
 * <code>twitter4j.oauth.accessKey</code> and
 * <code>twitter4j.oauth.accessSecret</code>.
 * <p>
 * Statuses that carry URL entities are placed on a priority queue, which is
 * drained on a separate thread and handed to the {@link PicSlurper}.
 *
 * @author Jonathan Hare (jsh2@ecs.soton.ac.uk), Sina Samangooei
 *         (ss@ecs.soton.ac.uk), David Duplaw (dpd@ecs.soton.ac.uk)
 *
 */
public class Twitter4JStreamFeeder implements StatusFeeder {
    Logger logger = Logger.getLogger(Twitter4JStreamFeeder.class);

    /**
     * Orders statuses by ascending {@link Status#getId()}. A <code>null</code>
     * status sorts after every non-null status; two <code>null</code>s compare
     * as equal.
     * <p>
     * NOTE(review): despite the name, the comparison is on the status id, not
     * on a timestamp - presumably ids are used as a proxy for time; confirm.
     */
    private static final class StatusTimeComparator implements Comparator<Status> {

        @Override
        public int compare(Status o1, Status o2) {
            if (o1 == null) {
                // null == null, otherwise a null sorts after a non-null value
                return o2 == null ? 0 : 1;
            }
            if (o2 == null) {
                return -1;
            }

            final long id1 = o1.getId();
            final long id2 = o2.getId();
            // Explicit comparison instead of subtraction to avoid overflow
            if (id1 < id2) {
                return -1;
            }
            return id1 > id2 ? 1 : 0;
        }

    }

    /**
     * Stream listener that queues every status containing at least one URL
     * entity and starts a thread that feeds the queued statuses to a
     * {@link PicSlurper}.
     */
    private final class PriorityQueueStatusListener implements StatusListener {
        /** Shared between the stream callback and the consumer; additions are made under its monitor. */
        private final BoundedPriorityQueue<Status> queue;

        /**
         * Creates the queue and starts the thread that feeds the slurper from
         * it.
         *
         * @param slurper
         *            receives each queued status through
         *            {@link PicSlurper#handleStatus(Status)}; its
         *            <code>nThreads</code> field sizes the worker pool
         */
        public PriorityQueueStatusListener(final PicSlurper slurper) {
            // 1000 is presumably the maximum number of queued statuses - confirm
            // against BoundedPriorityQueue
            this.queue = new BoundedPriorityQueue<Status>(1000, new StatusTimeComparator());

            // A dedicated pool of daemon threads is created here rather than
            // taking the pool from GlobalExecutorPool
            final ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(slurper.nThreads,
                    new DaemonThreadFactory());
            final FixedSizeBlockingChunkPartitioner<Status> partitioner = new FixedSizeBlockingChunkPartitioner<Status>(
                    this.queue);
            final Operation<Status> handleOperation = new Operation<Status>() {

                @Override
                public void perform(Status s) {
                    slurper.handleStatus(s);
                }

            };

            // Start a thread which feeds the slurper from the queue
            new Thread(new Runnable() {

                @Override
                public void run() {
                    Parallel.forEach(partitioner, handleOperation, pool);
                }
            }).start();
        }

        /**
         * Queues the status if it has at least one URL entity; statuses
         * without URL entities are ignored.
         */
        @Override
        public void onStatus(Status status) {
            if (status.getURLEntities() == null || status.getURLEntities().length == 0) {
                return;
            }

            // Read the size while still holding the lock so the logged value
            // reflects the state right after this add
            final int queueSize;
            synchronized (queue) {
                queue.add(status);
                queueSize = queue.size();
            }

            if (logger.isDebugEnabled()) {
                logger.debug("Adding status to queue, current queue size: " + queueSize);
            }
        }

        @Override
        public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
            // deletion notices are ignored
        }

        @Override
        public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
            // track limitation notices are ignored
        }

        /**
         * Reports the exception through the class logger so that it ends up
         * with the rest of the feeder's log output.
         */
        @Override
        public void onException(Exception ex) {
            logger.error("Exception reported by the twitter stream", ex);
        }

        @Override
        public void onScrubGeo(long userId, long upToStatusId) {
            // scrub-geo notices are ignored
        }

        @Override
        public void onStallWarning(StallWarning warning) {
            // stall warnings are ignored
        }
    }

    private AccessToken accessToken;

    /**
     * Builds the OAuth access token from the system properties
     * <code>twitter4j.oauth.accessKey</code> and
     * <code>twitter4j.oauth.accessSecret</code>.
     *
     * @throws TwitterException
     *             declared for callers; nothing in this constructor body
     *             throws it directly
     */
    public Twitter4JStreamFeeder() throws TwitterException {
        this.accessToken = new AccessToken(
                System.getProperty("twitter4j.oauth.accessKey"),
                System.getProperty("twitter4j.oauth.accessSecret"));
    }

    /**
     * Opens a twitter stream with the configured access token, registers a
     * queueing listener for the given slurper and starts the sample stream.
     *
     * @param slurper
     *            the slurper that will handle the received statuses
     */
    @Override
    public void feedStatus(final PicSlurper slurper) {
        final StatusListener listener = new PriorityQueueStatusListener(slurper);
        final TwitterStream twitterStream = new TwitterStreamFactory().getInstance(accessToken);
        twitterStream.addListener(listener);
        twitterStream.sample();
    }
}