001package org.openimaj.picslurper;
002
003import java.util.Comparator;
004import java.util.concurrent.Executors;
005import java.util.concurrent.ThreadPoolExecutor;
006
007import org.apache.log4j.Logger;
008import org.openimaj.util.function.Operation;
009import org.openimaj.util.parallel.GlobalExecutorPool.DaemonThreadFactory;
010import org.openimaj.util.parallel.Parallel;
011import org.openimaj.util.parallel.partition.FixedSizeBlockingChunkPartitioner;
012import org.openimaj.util.queue.BoundedPriorityQueue;
013
014import twitter4j.StallWarning;
015import twitter4j.Status;
016import twitter4j.StatusDeletionNotice;
017import twitter4j.StatusListener;
018import twitter4j.TwitterException;
019import twitter4j.TwitterStream;
020import twitter4j.TwitterStreamFactory;
021import twitter4j.auth.AccessToken;
022
023/**
024 * Uses the {@link TwitterStreamFactory} of twitter4j and oAuth using
025 * 
026 * @author Jonathan Hare (jsh2@ecs.soton.ac.uk), Sina Samangooei
027 *         (ss@ecs.soton.ac.uk), David Duplaw (dpd@ecs.soton.ac.uk)
028 * 
029 */
030public class Twitter4JStreamFeeder implements StatusFeeder {
031        Logger logger = Logger.getLogger(Twitter4JStreamFeeder.class);
032
033        private final class StatusTimeComparator implements Comparator<Status> {
034
035                @Override
036                public int compare(Status o1, Status o2) {
037                        if (o2 == null && o1 == null) {
038                                return 0;
039                        }
040                        if (o2 == null) {
041                                return -1;
042                        }
043                        if (o1 == null) {
044                                return 1;
045                        }
046
047                        final long id1 = o1.getId();
048                        final long id2 = o2.getId();
049                        if (id1 < id2) {
050                                return -1;
051                        } else if (id1 > id2) {
052                                return 1;
053                        }
054
055                        return 0;
056                }
057
058        }
059
060        private final class PriorityQueueStatusListener implements StatusListener {
061                private BoundedPriorityQueue<Status> queue;
062
063                public PriorityQueueStatusListener(final PicSlurper slurper) {
064                        this.queue = new BoundedPriorityQueue<Status>(1000, new StatusTimeComparator());
065                        // Start a thread which feeds the slurper from the queue
066                        // final ThreadPoolExecutor pool = GlobalExecutorPool.getPool();
067                        final ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(slurper.nThreads,
068                                        new DaemonThreadFactory());
069                        final FixedSizeBlockingChunkPartitioner<Status> partitioner = new FixedSizeBlockingChunkPartitioner<Status>(
070                                        this.queue);
071                        final Operation<Status> r = new Operation<Status>() {
072
073                                @Override
074                                public void perform(Status s) {
075                                        slurper.handleStatus(s);
076                                }
077
078                        };
079                        new Thread(new Runnable() {
080
081                                @Override
082                                public void run() {
083                                        Parallel.forEach(partitioner, r, pool);
084                                }
085                        }).start();
086
087                }
088
089                @Override
090                public void onStatus(Status status) {
091                        if (status.getURLEntities() != null && status.getURLEntities().length != 0) {
092                                synchronized (queue) {
093                                        queue.add(status);
094                                }
095                                logger.debug("Adding status to queue, current queue size: " + queue.size());
096                                // try {
097                                // Thread.sleep(100);
098                                // } catch (InterruptedException e) {
099                                // }
100                        }
101
102                }
103
104                @Override
105                public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
106                }
107
108                @Override
109                public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
110                }
111
112                @Override
113                public void onException(Exception ex) {
114                        ex.printStackTrace();
115                }
116
117                @Override
118                public void onScrubGeo(long userId, long upToStatusId) {
119                        // TODO Auto-generated method stub
120
121                }
122
123                @Override
124                public void onStallWarning(StallWarning warning) {
125                        // TODO Auto-generated method stub
126
127                }
128        }
129
130        private AccessToken accessToken;
131
132        /**
133         * @throws TwitterException
134         * 
135         */
136        public Twitter4JStreamFeeder() throws TwitterException {
137                // Twitter twitter = new TwitterFactory().getInstance();
138                // twitter.setOAuthConsumer(
139                // System.getProperty("twitter4j.oauth.consumerKey"),
140                // System.getProperty("twitter4j.oauth.consumerSecret")
141                // );
142                // RequestToken requestToken = twitter.getOAuthRequestToken();
143                this.accessToken = new AccessToken(
144                                System.getProperty("twitter4j.oauth.accessKey"),
145                                System.getProperty("twitter4j.oauth.accessSecret")
146                                );
147        }
148
149        @Override
150        public void feedStatus(final PicSlurper slurper) {
151                final StatusListener listener = new PriorityQueueStatusListener(slurper);
152                final TwitterStream twitterStream = new TwitterStreamFactory().getInstance(accessToken);
153                twitterStream.addListener(listener);
154                twitterStream.sample();
155        }
156}