001/** 002 * Copyright (c) 2011, The University of Southampton and the individual contributors. 003 * All rights reserved. 004 * 005 * Redistribution and use in source and binary forms, with or without modification, 006 * are permitted provided that the following conditions are met: 007 * 008 * * Redistributions of source code must retain the above copyright notice, 009 * this list of conditions and the following disclaimer. 010 * 011 * * Redistributions in binary form must reproduce the above copyright notice, 012 * this list of conditions and the following disclaimer in the documentation 013 * and/or other materials provided with the distribution. 014 * 015 * * Neither the name of the University of Southampton nor the names of its 016 * contributors may be used to endorse or promote products derived from this 017 * software without specific prior written permission. 018 * 019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR 023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON 026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 029 */ 030package org.openimaj.storm.tools.twitter.bolts; 031 032import java.io.ByteArrayOutputStream; 033import java.io.OutputStreamWriter; 034import java.io.PrintWriter; 035import java.nio.charset.Charset; 036import java.util.Iterator; 037import java.util.List; 038 039import org.kohsuke.args4j.CmdLineException; 040import org.openimaj.kestrel.KestrelServerSpec; 041import org.openimaj.tools.twitter.modes.preprocessing.TwitterPreprocessingMode; 042import org.openimaj.tools.twitter.options.AbstractTwitterPreprocessingToolOptions; 043import org.openimaj.twitter.USMFStatus; 044 045import backtype.storm.spout.KestrelThriftClient; 046 047/** 048 * Instantiate a {@link AbstractTwitterPreprocessingToolOptions} and preprocess 049 * tweets 050 * 051 * @author Jon Hare (jsh2@ecs.soton.ac.uk), Sina Samangooei (ss@ecs.soton.ac.uk) 052 * 053 */ 054public class TweetPreprocessingBolt extends BaseTwitterRichBolt { 055 056 /** 057 * 058 */ 059 private static final long serialVersionUID = -3251642991777398431L; 060 private String[] args; 061 private AbstractTwitterPreprocessingToolOptions options; 062 private List<String> kestrelHosts; 063 private Iterator<KestrelThriftClient> kestrelServers; 064 private String outputQueue; 065 066 /** 067 * @param outputQueue 068 * @param hosts 069 * @param args 070 */ 071 public TweetPreprocessingBolt(String outputQueue, List<String> hosts, String[] args) { 072 this.args = args; 073 this.kestrelHosts = hosts; 074 this.outputQueue = outputQueue; 075 } 076 077 @Override 078 public void prepare() { 079 try { 080 this.options = new AbstractTwitterPreprocessingToolOptions(args) { 081 082 @Override 083 public boolean validate() throws CmdLineException { 084 return true; 085 } 086 }; 087 } catch (CmdLineException e) { 088 throw new RuntimeException(e); 089 } 090 List<KestrelServerSpec> kestrelSpecList = KestrelServerSpec.parseKestrelAddressList(kestrelHosts); 091 this.kestrelServers = KestrelServerSpec.thriftClientIterator(kestrelSpecList); 092 } 093 094 private int expire = 0; 095 096 public void setExpireTime(int expire) { 097 this.expire = expire; 098 } 099 100 @Override 101 public void processTweet(String statusString) throws Exception { 102 USMFStatus status = new USMFStatus(options.statusType.type()); 103 status.fillFromString(statusString); 104 List<TwitterPreprocessingMode<?>> modes = options.modeOptionsOp; 105 if (status.isInvalid()) 106 return; 107 108 if (options.preProcessesSkip(status)) 109 return; 110 for (TwitterPreprocessingMode<?> mode : modes) { 111 try { 112 TwitterPreprocessingMode.results(status, mode); 113 } catch (Exception e) { 114 logger.error("Failed mode: " + mode); 115 } 116 } 117 if (options.postProcessesSkip(status)) 118 return; 119 120 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 121 OutputStreamWriter ow = new OutputStreamWriter(baos, "UTF-8"); 122 PrintWriter outTweetWriter = new PrintWriter(ow); 123 try { 124 options.ouputMode().output(options.convertToOutputFormat(status), outTweetWriter); 125 outTweetWriter.flush(); 126 byte[] toEmit = baos.toByteArray(); 127 KestrelThriftClient client = this.kestrelServers.next(); 128 client.put(this.outputQueue, new String(toEmit, Charset.forName("UTF-8")), this.expire); 129 } catch (Exception e) { 130 logger.error("Failed to write tweet: " + status.text); 131 logger.error("With error: "); 132 e.printStackTrace(); 133 } 134 } 135}