001/** 002 * Copyright (c) 2012, The University of Southampton and the individual contributors. 003 * All rights reserved. 004 * 005 * Redistribution and use in source and binary forms, with or without modification, 006 * are permitted provided that the following conditions are met: 007 * 008 * * Redistributions of source code must retain the above copyright notice, 009 * this list of conditions and the following disclaimer. 010 * 011 * * Redistributions in binary form must reproduce the above copyright notice, 012 * this list of conditions and the following disclaimer in the documentation 013 * and/or other materials provided with the distribution. 014 * 015 * * Neither the name of the University of Southampton nor the names of its 016 * contributors may be used to endorse or promote products derived from this 017 * software without specific prior written permission. 018 * 019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR 023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON 026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 029 */ 030package org.openimaj.storm.util.graph; 031 032import java.util.HashMap; 033import java.util.Map; 034import java.util.Map.Entry; 035 036import org.jgrapht.DirectedGraph; 037import org.jgrapht.graph.DefaultEdge; 038import org.jgrapht.graph.ListenableDirectedGraph; 039import org.openimaj.storm.util.graph.StormGraphCreator.NamingStrategy.DefaultNamingStrategy; 040 041import backtype.storm.generated.Bolt; 042import backtype.storm.generated.ComponentCommon; 043import backtype.storm.generated.GlobalStreamId; 044import backtype.storm.generated.Grouping; 045import backtype.storm.generated.SpoutSpec; 046import backtype.storm.generated.StormTopology; 047 048/** 049 * Create {@link DirectedGraph} instances from {@link StormTopology} instances 050 * 051 * @author Sina Samangooei (ss@ecs.soton.ac.uk) 052 * 053 */ 054public class StormGraphCreator { 055 /** 056 * @author Sina Samangooei (ss@ecs.soton.ac.uk) 057 * 058 */ 059 public interface NamingStrategy { 060 /** 061 * @param name 062 * the id of a {@link ComponentCommon} 063 * @return the name to render 064 */ 065 public String name(String name); 066 067 /** 068 * Uses id as name 069 * 070 * @author Sina Samangooei (ss@ecs.soton.ac.uk) 071 * 072 */ 073 public static class DefaultNamingStrategy implements NamingStrategy { 074 075 @Override 076 public String name(String name) { 077 return name; 078 } 079 080 } 081 082 /** 083 * Names {@link ComponentCommon} as A, B, C... and provides a lookup map 084 * 085 * @author Sina Samangooei (ss@ecs.soton.ac.uk) 086 * 087 */ 088 public static class AlphabeticNamingStrategy implements NamingStrategy { 089 char currentLetter = 'A'; 090 /** 091 * maps names to their original name 092 */ 093 public Map<String, String> lookup = new HashMap<String, String>(); 094 private Map<String, String> seen = new HashMap<String, String>(); 095 096 @Override 097 public String name(String name) { 098 if (!seen.containsKey(name)) { 099 final String newName = new String(new char[] { currentLetter++ }); 100 seen.put(name, newName); 101 lookup.put(newName, name); 102 } 103 return seen.get(name); 104 } 105 106 } 107 } 108 109 enum Type { 110 SPOUT, BOLT; 111 } 112 113 private HashMap<String, NamedNode> nns; 114 private NamingStrategy namingStrategy; 115 116 private StormGraphCreator() { 117 nns = new HashMap<String, NamedNode>(); 118 namingStrategy = new DefaultNamingStrategy(); 119 } 120 121 private StormGraphCreator(NamingStrategy strat) { 122 nns = new HashMap<String, NamedNode>(); 123 namingStrategy = strat; 124 } 125 126 /** 127 * A name and type node 128 * 129 * @author Sina Samangooei (ss@ecs.soton.ac.uk) 130 * 131 */ 132 public class NamedNode { 133 /** 134 * name of the node 135 */ 136 public String name; 137 /** 138 * the node's type 139 */ 140 public Type node; 141 142 /** 143 * @param name 144 * @param node 145 */ 146 public NamedNode(String name, Type node) { 147 this.name = namingStrategy.name(name); 148 this.node = node; 149 } 150 151 @Override 152 public boolean equals(Object other) { 153 if (!(other instanceof NamedNode)) 154 return false; 155 final NamedNode that = (NamedNode) other; 156 if (that.node != this.node) 157 return false; 158 return this.hashCode() == that.hashCode(); 159 } 160 161 @Override 162 public int hashCode() { 163 return name.hashCode(); 164 } 165 166 @Override 167 public String toString() { 168 return "<" + name + ">"; 169 } 170 } 171 172 private ListenableDirectedGraph<NamedNode, DefaultEdge> _asGraph(StormTopology t) { 173 final Map<String, Bolt> bolts = t.get_bolts(); 174 final Map<String, SpoutSpec> spouts = t.get_spouts(); 175 final ListenableDirectedGraph<NamedNode, DefaultEdge> ret = new ListenableDirectedGraph<NamedNode, DefaultEdge>( 176 DefaultEdge.class); 177 178 createSpouts(spouts, ret); 179 createBolts(bolts, ret); 180 createConnections(bolts, ret); 181 return ret; 182 } 183 184 private void createConnections(Map<String, Bolt> bolts, ListenableDirectedGraph<NamedNode, DefaultEdge> ret) { 185 for (final Entry<String, Bolt> boltspec : bolts.entrySet()) { 186 final Bolt bolt = boltspec.getValue(); 187 final String id = boltspec.getKey(); 188 final Map<GlobalStreamId, Grouping> inputs = bolt.get_common().get_inputs(); 189 for (final Entry<GlobalStreamId, Grouping> input : inputs.entrySet()) { 190 final GlobalStreamId from = input.getKey(); 191 // Grouping grouping = input.getValue(); 192 final String fromId = from.get_componentId(); 193 // String streamId = from.get_streamId(); 194 ret.addEdge(nns.get(fromId), nns.get(id)); 195 } 196 197 } 198 } 199 200 private void createSpouts(Map<String, SpoutSpec> spouts, ListenableDirectedGraph<NamedNode, DefaultEdge> ret) { 201 for (final Entry<String, SpoutSpec> spoutEntries : spouts.entrySet()) { 202 final String name = spoutEntries.getKey(); 203 if (!nns.containsKey(name)) 204 nns.put(name, new NamedNode(name, Type.SPOUT)); 205 ret.addVertex(nns.get(name)); 206 } 207 } 208 209 private void createBolts(Map<String, Bolt> bolts, ListenableDirectedGraph<NamedNode, DefaultEdge> ret) { 210 for (final Entry<String, Bolt> boltEntries : bolts.entrySet()) { 211 final String name = boltEntries.getKey(); 212 if (!nns.containsKey(name)) 213 nns.put(name, new NamedNode(name, Type.BOLT)); 214 ret.addVertex(nns.get(name)); 215 } 216 } 217 218 /** 219 * @param t 220 * {@link StormTopology} to graph 221 * @return a {@link ListenableDirectedGraph} usable with JGraph 222 */ 223 public static ListenableDirectedGraph<NamedNode, DefaultEdge> asGraph(StormTopology t) { 224 final StormGraphCreator creator = new StormGraphCreator(); 225 return creator._asGraph(t); 226 } 227 228 /** 229 * @param t 230 * {@link StormTopology} to graph 231 * @param strat 232 * the naming strategy 233 * @return a {@link ListenableDirectedGraph} usable with JGraph 234 */ 235 public static ListenableDirectedGraph<NamedNode, DefaultEdge> asGraph(StormTopology t, NamingStrategy strat) { 236 final StormGraphCreator creator = new StormGraphCreator(strat); 237 return creator._asGraph(t); 238 } 239}