001/**
002 * Copyright (c) 2012, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.storm;
031
032import java.util.ArrayList;
033import java.util.List;
034import java.util.Map;
035
036import org.apache.log4j.Level;
037import org.apache.log4j.Logger;
038
039import backtype.storm.Config;
040import backtype.storm.LocalCluster;
041import backtype.storm.generated.StormTopology;
042import backtype.storm.spout.SpoutOutputCollector;
043import backtype.storm.task.OutputCollector;
044import backtype.storm.task.TopologyContext;
045import backtype.storm.topology.OutputFieldsDeclarer;
046import backtype.storm.topology.TopologyBuilder;
047import backtype.storm.topology.base.BaseRichBolt;
048import backtype.storm.topology.base.BaseRichSpout;
049import backtype.storm.tuple.Fields;
050import backtype.storm.tuple.Tuple;
051import backtype.storm.tuple.Values;
052import backtype.storm.utils.Utils;
053import cern.jet.random.Uniform;
054import cern.jet.random.engine.MersenneTwister;
055
056/**
057 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
058 * 
059 */
060
061public class StormPlayground {
062        static {
063                Logger.getRootLogger().setLevel(Level.FATAL);
064        }
065
066        /**
067         * @author Sina Samangooei (ss@ecs.soton.ac.uk)
068         * 
069         */
070        public static class RandomFieldSpout extends BaseRichSpout {
071                private static final long serialVersionUID = 1L;
072
073                private static final String FIELD_TEMPLATE = "field_%d";
074                private int nFields;
075                private int rootRandomSeed;
076                private ArrayList<Uniform> randomGenerators;
077                private SpoutOutputCollector collector;
078                private int min;
079                private int max;
080
081                public RandomFieldSpout(int nFields, int rootRandomSeed, int min, int max) {
082                        this.nFields = nFields;
083                        this.rootRandomSeed = rootRandomSeed;
084                        this.min = min;
085                        this.max = max;
086                }
087
088                public RandomFieldSpout(int nFields, int min, int max) {
089                        this(nFields, 0, min, max);
090                }
091
092                @Override
093                public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, SpoutOutputCollector collector)
094                {
095                        this.randomGenerators = new ArrayList<Uniform>();
096                        for (int i = 0; i < nFields; i++) {
097                                this.randomGenerators.add(new Uniform(min, max, new MersenneTwister(rootRandomSeed + i)));
098                        }
099                        this.collector = collector;
100                }
101
102                @Override
103                public void nextTuple() {
104                        this.collector.emit(generate());
105                        try {
106                                Thread.sleep(100);
107                        } catch (final InterruptedException e) {
108                        }
109                }
110
111                private Values generate() {
112                        final Values ret = new Values();
113                        for (final Uniform r : this.randomGenerators) {
114                                ret.add(r.nextIntFromTo(min, max));
115                        }
116                        return ret;
117                }
118
119                @Override
120                public void declareOutputFields(OutputFieldsDeclarer declarer) {
121                        declarer.declare(getFields());
122                }
123
124                public Fields getFields() {
125                        final List<String> fieldNames = new ArrayList<String>();
126                        for (int i = 0; i < nFields; i++) {
127                                fieldNames.add(String.format(FIELD_TEMPLATE, i));
128                        }
129                        return new Fields(fieldNames);
130                }
131
132        }
133
134        public static class JoinBolt extends BaseRichBolt {
135                private static final long serialVersionUID = 1L;
136
137                @Override
138                public void prepare(@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context,
139                                OutputCollector collector)
140                {
141                }
142
143                @Override
144                public void execute(Tuple input) {
145                        System.out.println(this.toString() + ": " + input);
146                }
147
148                @Override
149                public void declareOutputFields(OutputFieldsDeclarer declarer) {
150                        declarer.declare(new Fields());
151
152                }
153
154                public static void connectNewBolt(TopologyBuilder builder) {
155                        final JoinBolt b = new JoinBolt();
156                        builder.setBolt("joinBolt", b, 2).fieldsGrouping("randomSpout1", new Fields("field_0"))
157                                        .fieldsGrouping("randomSpout2", new Fields("field_1"));
158                }
159
160        }
161
162        public static void main(String[] args) {
163                final Config conf = new Config();
164                conf.setDebug(false);
165                conf.setNumWorkers(2);
166                conf.setMaxSpoutPending(1);
167                conf.setFallBackOnJavaSerialization(false);
168                conf.setSkipMissingKryoRegistrations(false);
169                final LocalCluster cluster = new LocalCluster();
170                final TopologyBuilder builder = new TopologyBuilder();
171                builder.setSpout("randomSpout1", new RandomFieldSpout(2, 0, 0, 1)); // (nfields,seed,min,max)
172                builder.setSpout("randomSpout2", new RandomFieldSpout(2, 10, 0, 1)); // (nfields,seed,min,max)
173                JoinBolt.connectNewBolt(builder);
174                final StormTopology topology = builder.createTopology();
175                cluster.submitTopology("playTopology", conf, topology);
176                Utils.sleep(10000);
177                cluster.killTopology("playTopology");
178                cluster.shutdown();
179
180        }
181}