001package org.openimaj.picslurper;
002
003import java.io.InputStream;
004import java.util.Arrays;
005import java.util.HashSet;
006import java.util.Iterator;
007import java.util.Map;
008import java.util.Set;
009
010import org.openimaj.twitter.collection.StreamJSONStatusList;
011import org.openimaj.twitter.collection.StreamJSONStatusList.ReadableWritableJSON;
012
013import backtype.storm.spout.SpoutOutputCollector;
014import backtype.storm.task.TopologyContext;
015import backtype.storm.topology.IRichSpout;
016import backtype.storm.topology.OutputFieldsDeclarer;
017import backtype.storm.tuple.Fields;
018
019public abstract class LocalTweetSpout implements IRichSpout{
020        /**
021         * 
022         */
023        private static final long serialVersionUID = 202196766956713428L;
024        private static boolean isStreamsFinished = false;
025        private static boolean isProcessingFinished = false;
026        private Iterator<ReadableWritableJSON> iterator;
027        private Set<Object> waitingFor = new HashSet<Object>();
028        
029        
030protected SpoutOutputCollector collector;
031        
032        @Override
033        public void open(@SuppressWarnings("rawtypes")
034        Map conf, TopologyContext context,SpoutOutputCollector collector) {
035                this.collector = collector;
036        }
037        
038        @Override
039        public void close() {}
040
041        @Override
042        public void activate() {}
043
044        @Override
045        public void deactivate() {}
046        
047        @Override
048        public Map<String, Object> getComponentConfiguration() {
049                // TODO Auto-generated method stub
050                return null;
051        }
052        
053        @Override
054        public void nextTuple() {
055                ReadableWritableJSON next = nextTweet();
056                if(next==null) return;
057                Object messageId = next.get("id");
058                this.collector.emit(Arrays.asList((Object)next), messageId);
059                addWaitingFor(messageId);
060        }
061        
062        @Override
063        public void declareOutputFields(OutputFieldsDeclarer declarer) {
064                declarer.declare(new Fields("tweet"));
065        }
066        
067        protected ReadableWritableJSON nextTweet() {
068                if(this.iterator != null && this.iterator.hasNext()){
069                        return this.iterator.next();
070                }
071                
072                try {
073                        InputStream nextInputStream = nextInputStream();
074                        if(nextInputStream == null) {
075                                // Also, finished!
076                                setStreamFinished(true);
077                                // For the situation where nothing was emitted 
078                                if(this.waitingFor.size() == 0){
079                                        setProcessingFinished(true);
080                                }
081                                this.iterator = null;
082                                return null;
083                        }
084                        StreamJSONStatusList l = StreamJSONStatusList.read(nextInputStream);
085                        this.iterator = l.iterator();
086                        return this.iterator.next();
087                } catch (Exception e) {
088                        // Something went wrong trying to get the next stream, try again!
089                        return nextTweet();
090                }
091        }
092
093        private static synchronized void setStreamFinished(boolean b) {
094                isStreamsFinished = b;          
095        }
096        
097        private static synchronized void setProcessingFinished(boolean b) {
098                isProcessingFinished = b;               
099        }
100
101        protected abstract InputStream nextInputStream() throws Exception;
102
103        public static synchronized boolean isFinished() {
104                return isStreamsFinished && isProcessingFinished;
105        }
106        
107        protected void addWaitingFor(Object messageId) {
108                this.waitingFor .add(messageId);
109                setProcessingFinished(false);
110        }
111        
112
113
114        @Override
115        public void ack(Object msgId) {
116                this.waitingFor.remove(msgId);
117                if(this.waitingFor.size() == 0){
118                        setProcessingFinished(true);
119                }
120        }
121
122        @Override
123        public void fail(Object msgId) {
124                ack(msgId);
125        }
126}