001/**
002 * Copyright (c) 2011, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.storm.tools.twitter.bolts;
031
032import java.io.ByteArrayOutputStream;
033import java.io.OutputStreamWriter;
034import java.io.PrintWriter;
035import java.nio.charset.Charset;
036import java.util.Iterator;
037import java.util.List;
038
039import org.kohsuke.args4j.CmdLineException;
040import org.openimaj.kestrel.KestrelServerSpec;
041import org.openimaj.tools.twitter.modes.preprocessing.TwitterPreprocessingMode;
042import org.openimaj.tools.twitter.options.AbstractTwitterPreprocessingToolOptions;
043import org.openimaj.twitter.USMFStatus;
044
045import backtype.storm.spout.KestrelThriftClient;
046
047/**
048 * Instantiate a {@link AbstractTwitterPreprocessingToolOptions} and preprocess
049 * tweets
050 * 
051 * @author Jon Hare (jsh2@ecs.soton.ac.uk), Sina Samangooei (ss@ecs.soton.ac.uk)
052 * 
053 */
054public class TweetPreprocessingBolt extends BaseTwitterRichBolt {
055
056        /**
057         * 
058         */
059        private static final long serialVersionUID = -3251642991777398431L;
060        private String[] args;
061        private AbstractTwitterPreprocessingToolOptions options;
062        private List<String> kestrelHosts;
063        private Iterator<KestrelThriftClient> kestrelServers;
064        private String outputQueue;
065
066        /**
067         * @param outputQueue
068         * @param hosts
069         * @param args
070         */
071        public TweetPreprocessingBolt(String outputQueue, List<String> hosts, String[] args) {
072                this.args = args;
073                this.kestrelHosts = hosts;
074                this.outputQueue = outputQueue;
075        }
076
077        @Override
078        public void prepare() {
079                try {
080                        this.options = new AbstractTwitterPreprocessingToolOptions(args) {
081
082                                @Override
083                                public boolean validate() throws CmdLineException {
084                                        return true;
085                                }
086                        };
087                } catch (CmdLineException e) {
088                        throw new RuntimeException(e);
089                }
090                List<KestrelServerSpec> kestrelSpecList = KestrelServerSpec.parseKestrelAddressList(kestrelHosts);
091                this.kestrelServers = KestrelServerSpec.thriftClientIterator(kestrelSpecList);
092        }
093
094        private int expire = 0;
095
096        public void setExpireTime(int expire) {
097                this.expire = expire;
098        }
099
100        @Override
101        public void processTweet(String statusString) throws Exception {
102                USMFStatus status = new USMFStatus(options.statusType.type());
103                status.fillFromString(statusString);
104                List<TwitterPreprocessingMode<?>> modes = options.modeOptionsOp;
105                if (status.isInvalid())
106                        return;
107
108                if (options.preProcessesSkip(status))
109                        return;
110                for (TwitterPreprocessingMode<?> mode : modes) {
111                        try {
112                                TwitterPreprocessingMode.results(status, mode);
113                        } catch (Exception e) {
114                                logger.error("Failed mode: " + mode);
115                        }
116                }
117                if (options.postProcessesSkip(status))
118                        return;
119
120                ByteArrayOutputStream baos = new ByteArrayOutputStream();
121                OutputStreamWriter ow = new OutputStreamWriter(baos, "UTF-8");
122                PrintWriter outTweetWriter = new PrintWriter(ow);
123                try {
124                        options.ouputMode().output(options.convertToOutputFormat(status), outTweetWriter);
125                        outTweetWriter.flush();
126                        byte[] toEmit = baos.toByteArray();
127                        KestrelThriftClient client = this.kestrelServers.next();
128                        client.put(this.outputQueue, new String(toEmit, Charset.forName("UTF-8")), this.expire);
129                } catch (Exception e) {
130                        logger.error("Failed to write tweet: " + status.text);
131                        logger.error("With error: ");
132                        e.printStackTrace();
133                }
134        }
135}