001package org.openimaj.picslurper; 002 003import java.io.InputStream; 004import java.util.Arrays; 005import java.util.HashSet; 006import java.util.Iterator; 007import java.util.Map; 008import java.util.Set; 009 010import org.openimaj.twitter.collection.StreamJSONStatusList; 011import org.openimaj.twitter.collection.StreamJSONStatusList.ReadableWritableJSON; 012 013import backtype.storm.spout.SpoutOutputCollector; 014import backtype.storm.task.TopologyContext; 015import backtype.storm.topology.IRichSpout; 016import backtype.storm.topology.OutputFieldsDeclarer; 017import backtype.storm.tuple.Fields; 018 019public abstract class LocalTweetSpout implements IRichSpout{ 020 /** 021 * 022 */ 023 private static final long serialVersionUID = 202196766956713428L; 024 private static boolean isStreamsFinished = false; 025 private static boolean isProcessingFinished = false; 026 private Iterator<ReadableWritableJSON> iterator; 027 private Set<Object> waitingFor = new HashSet<Object>(); 028 029 030protected SpoutOutputCollector collector; 031 032 @Override 033 public void open(@SuppressWarnings("rawtypes") 034 Map conf, TopologyContext context,SpoutOutputCollector collector) { 035 this.collector = collector; 036 } 037 038 @Override 039 public void close() {} 040 041 @Override 042 public void activate() {} 043 044 @Override 045 public void deactivate() {} 046 047 @Override 048 public Map<String, Object> getComponentConfiguration() { 049 // TODO Auto-generated method stub 050 return null; 051 } 052 053 @Override 054 public void nextTuple() { 055 ReadableWritableJSON next = nextTweet(); 056 if(next==null) return; 057 Object messageId = next.get("id"); 058 this.collector.emit(Arrays.asList((Object)next), messageId); 059 addWaitingFor(messageId); 060 } 061 062 @Override 063 public void declareOutputFields(OutputFieldsDeclarer declarer) { 064 declarer.declare(new Fields("tweet")); 065 } 066 067 protected ReadableWritableJSON nextTweet() { 068 if(this.iterator != null && this.iterator.hasNext()){ 069 return this.iterator.next(); 070 } 071 072 try { 073 InputStream nextInputStream = nextInputStream(); 074 if(nextInputStream == null) { 075 // Also, finished! 076 setStreamFinished(true); 077 // For the situation where nothing was emitted 078 if(this.waitingFor.size() == 0){ 079 setProcessingFinished(true); 080 } 081 this.iterator = null; 082 return null; 083 } 084 StreamJSONStatusList l = StreamJSONStatusList.read(nextInputStream); 085 this.iterator = l.iterator(); 086 return this.iterator.next(); 087 } catch (Exception e) { 088 // Something went wrong trying to get the next stream, try again! 089 return nextTweet(); 090 } 091 } 092 093 private static synchronized void setStreamFinished(boolean b) { 094 isStreamsFinished = b; 095 } 096 097 private static synchronized void setProcessingFinished(boolean b) { 098 isProcessingFinished = b; 099 } 100 101 protected abstract InputStream nextInputStream() throws Exception; 102 103 public static synchronized boolean isFinished() { 104 return isStreamsFinished && isProcessingFinished; 105 } 106 107 protected void addWaitingFor(Object messageId) { 108 this.waitingFor .add(messageId); 109 setProcessingFinished(false); 110 } 111 112 113 114 @Override 115 public void ack(Object msgId) { 116 this.waitingFor.remove(msgId); 117 if(this.waitingFor.size() == 0){ 118 setProcessingFinished(true); 119 } 120 } 121 122 @Override 123 public void fail(Object msgId) { 124 ack(msgId); 125 } 126}