001/**
002 * Copyright (c) 2012, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.storm.util.graph;
031
032import java.util.HashMap;
033import java.util.Map;
034import java.util.Map.Entry;
035
036import org.jgrapht.DirectedGraph;
037import org.jgrapht.graph.DefaultEdge;
038import org.jgrapht.graph.ListenableDirectedGraph;
039import org.openimaj.storm.util.graph.StormGraphCreator.NamingStrategy.DefaultNamingStrategy;
040
041import backtype.storm.generated.Bolt;
042import backtype.storm.generated.ComponentCommon;
043import backtype.storm.generated.GlobalStreamId;
044import backtype.storm.generated.Grouping;
045import backtype.storm.generated.SpoutSpec;
046import backtype.storm.generated.StormTopology;
047
048/**
049 * Create {@link DirectedGraph} instances from {@link StormTopology} instances
050 *
051 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
052 *
053 */
054public class StormGraphCreator {
055        /**
056         * @author Sina Samangooei (ss@ecs.soton.ac.uk)
057         *
058         */
059        public interface NamingStrategy {
060                /**
061                 * @param name
062                 *            the id of a {@link ComponentCommon}
063                 * @return the name to render
064                 */
065                public String name(String name);
066
067                /**
068                 * Uses id as name
069                 *
070                 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
071                 *
072                 */
073                public static class DefaultNamingStrategy implements NamingStrategy {
074
075                        @Override
076                        public String name(String name) {
077                                return name;
078                        }
079
080                }
081
082                /**
083                 * Names {@link ComponentCommon} as A, B, C... and provides a lookup map
084                 *
085                 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
086                 *
087                 */
088                public static class AlphabeticNamingStrategy implements NamingStrategy {
089                        char currentLetter = 'A';
090                        /**
091                         * maps names to their original name
092                         */
093                        public Map<String, String> lookup = new HashMap<String, String>();
094                        private Map<String, String> seen = new HashMap<String, String>();
095
096                        @Override
097                        public String name(String name) {
098                                if (!seen.containsKey(name)) {
099                                        final String newName = new String(new char[] { currentLetter++ });
100                                        seen.put(name, newName);
101                                        lookup.put(newName, name);
102                                }
103                                return seen.get(name);
104                        }
105
106                }
107        }
108
109        enum Type {
110                SPOUT, BOLT;
111        }
112
113        private HashMap<String, NamedNode> nns;
114        private NamingStrategy namingStrategy;
115
116        private StormGraphCreator() {
117                nns = new HashMap<String, NamedNode>();
118                namingStrategy = new DefaultNamingStrategy();
119        }
120
121        private StormGraphCreator(NamingStrategy strat) {
122                nns = new HashMap<String, NamedNode>();
123                namingStrategy = strat;
124        }
125
126        /**
127         * A name and type node
128         *
129         * @author Sina Samangooei (ss@ecs.soton.ac.uk)
130         *
131         */
132        public class NamedNode {
133                /**
134                 * name of the node
135                 */
136                public String name;
137                /**
138                 * the node's type
139                 */
140                public Type node;
141
142                /**
143                 * @param name
144                 * @param node
145                 */
146                public NamedNode(String name, Type node) {
147                        this.name = namingStrategy.name(name);
148                        this.node = node;
149                }
150
151                @Override
152                public boolean equals(Object other) {
153                        if (!(other instanceof NamedNode))
154                                return false;
155                        final NamedNode that = (NamedNode) other;
156                        if (that.node != this.node)
157                                return false;
158                        return this.hashCode() == that.hashCode();
159                }
160
161                @Override
162                public int hashCode() {
163                        return name.hashCode();
164                }
165
166                @Override
167                public String toString() {
168                        return "<" + name + ">";
169                }
170        }
171
172        private ListenableDirectedGraph<NamedNode, DefaultEdge> _asGraph(StormTopology t) {
173                final Map<String, Bolt> bolts = t.get_bolts();
174                final Map<String, SpoutSpec> spouts = t.get_spouts();
175                final ListenableDirectedGraph<NamedNode, DefaultEdge> ret = new ListenableDirectedGraph<NamedNode, DefaultEdge>(
176                                DefaultEdge.class);
177
178                createSpouts(spouts, ret);
179                createBolts(bolts, ret);
180                createConnections(bolts, ret);
181                return ret;
182        }
183
184        private void createConnections(Map<String, Bolt> bolts, ListenableDirectedGraph<NamedNode, DefaultEdge> ret) {
185                for (final Entry<String, Bolt> boltspec : bolts.entrySet()) {
186                        final Bolt bolt = boltspec.getValue();
187                        final String id = boltspec.getKey();
188                        final Map<GlobalStreamId, Grouping> inputs = bolt.get_common().get_inputs();
189                        for (final Entry<GlobalStreamId, Grouping> input : inputs.entrySet()) {
190                                final GlobalStreamId from = input.getKey();
191                                // Grouping grouping = input.getValue();
192                                final String fromId = from.get_componentId();
193                                // String streamId = from.get_streamId();
194                                ret.addEdge(nns.get(fromId), nns.get(id));
195                        }
196
197                }
198        }
199
200        private void createSpouts(Map<String, SpoutSpec> spouts, ListenableDirectedGraph<NamedNode, DefaultEdge> ret) {
201                for (final Entry<String, SpoutSpec> spoutEntries : spouts.entrySet()) {
202                        final String name = spoutEntries.getKey();
203                        if (!nns.containsKey(name))
204                                nns.put(name, new NamedNode(name, Type.SPOUT));
205                        ret.addVertex(nns.get(name));
206                }
207        }
208
209        private void createBolts(Map<String, Bolt> bolts, ListenableDirectedGraph<NamedNode, DefaultEdge> ret) {
210                for (final Entry<String, Bolt> boltEntries : bolts.entrySet()) {
211                        final String name = boltEntries.getKey();
212                        if (!nns.containsKey(name))
213                                nns.put(name, new NamedNode(name, Type.BOLT));
214                        ret.addVertex(nns.get(name));
215                }
216        }
217
218        /**
219         * @param t
220         *            {@link StormTopology} to graph
221         * @return a {@link ListenableDirectedGraph} usable with JGraph
222         */
223        public static ListenableDirectedGraph<NamedNode, DefaultEdge> asGraph(StormTopology t) {
224                final StormGraphCreator creator = new StormGraphCreator();
225                return creator._asGraph(t);
226        }
227
228        /**
229         * @param t
230         *            {@link StormTopology} to graph
231         * @param strat
232         *            the naming strategy
233         * @return a {@link ListenableDirectedGraph} usable with JGraph
234         */
235        public static ListenableDirectedGraph<NamedNode, DefaultEdge> asGraph(StormTopology t, NamingStrategy strat) {
236                final StormGraphCreator creator = new StormGraphCreator(strat);
237                return creator._asGraph(t);
238        }
239}