001/**
002 * Copyright (c) 2012, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.rdf.storm.utils;
031
032import java.util.ArrayList;
033import java.util.Iterator;
034import java.util.List;
035
036import org.openimaj.kestrel.KestrelServerSpec;
037
038import backtype.storm.Config;
039
040import com.esotericsoftware.kryo.Kryo;
041import com.esotericsoftware.kryo.Serializer;
042import com.esotericsoftware.kryo.io.Input;
043import com.esotericsoftware.kryo.io.Output;
044import com.hp.hpl.jena.datatypes.RDFDatatype;
045import com.hp.hpl.jena.datatypes.TypeMapper;
046import com.hp.hpl.jena.graph.Graph;
047import com.hp.hpl.jena.graph.Node;
048import com.hp.hpl.jena.graph.Node_Blank;
049import com.hp.hpl.jena.graph.Node_Literal;
050import com.hp.hpl.jena.graph.Node_URI;
051import com.hp.hpl.jena.graph.Node_Variable;
052import com.hp.hpl.jena.graph.Triple;
053import com.hp.hpl.jena.graph.compose.MultiUnion;
054import com.hp.hpl.jena.graph.impl.LiteralLabel;
055import com.hp.hpl.jena.mem.GraphMem;
056import com.hp.hpl.jena.rdf.model.AnonId;
057import com.hp.hpl.jena.reasoner.rulesys.Rule;
058import com.hp.hpl.jena.shared.AddDeniedException;
059import com.hp.hpl.jena.sparql.core.BasicPattern;
060import com.hp.hpl.jena.sparql.syntax.ElementFilter;
061import com.hp.hpl.jena.sparql.syntax.Template;
062
063/**
064 * A collections to tools for letting Jena play nicely with Storm
065 * 
066 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
067 * @author David Monks (dm11g08@ecs.soton.ac.uk)
068 * 
069 */
070public class JenaStormUtils {
071
072        /**
073         * @author Sina Samangooei (ss@ecs.soton.ac.uk)
074         * 
075         */
076        public static class NodeSerialiser_URI extends Serializer<Node_URI> {
077
078                @Override
079                public void write(Kryo kryo, Output output, Node_URI object) {
080                        output.writeString(object.getURI());
081                }
082
083                @Override
084                public Node_URI read(Kryo kryo, Input input, Class<Node_URI> type) {
085                        return (Node_URI) Node.createURI(input.readString());
086                }
087
088        }
089
090        /**
091         * @author Sina Samangooei (ss@ecs.soton.ac.uk)
092         * 
093         */
094        public static class TemplateSerialiser extends Serializer<Template> {
095
096                @Override
097                public void write(Kryo kryo, Output output, Template object) {
098                        final BasicPattern bgp = object.getBGP();
099                        output.writeInt(bgp.size());
100                        for (final Triple triple : bgp) {
101                                kryo.writeClassAndObject(output, triple);
102                        }
103                }
104
105                @Override
106                public Template read(Kryo kryo, Input input, Class<Template> type) {
107                        final BasicPattern bgp = new BasicPattern();
108                        final int count = input.readInt();
109                        for (int i = 0; i < count; i++) {
110                                bgp.add((Triple) kryo.readClassAndObject(input));
111                        }
112                        return new Template(bgp);
113                }
114
115        }
116
117        /**
118         * @author Sina Samangooei (ss@ecs.soton.ac.uk)
119         * 
120         */
121        public static class NodeSerialiser_Literal extends Serializer<Node_Literal> {
122
123                @Override
124                public void write(Kryo kryo, Output output, Node_Literal object) {
125                        final LiteralLabel label = object.getLiteral();
126                        output.writeString(label.getLexicalForm());
127                        output.writeString(label.language());
128                        output.writeString(label.getDatatypeURI());
129                }
130
131                @Override
132                public Node_Literal read(Kryo kryo, Input input, Class<Node_Literal> type) {
133                        final String lexicalForm = input.readString();
134                        final String langauge = input.readString();
135                        final String datatypeURI = input.readString();
136                        final RDFDatatype dtype = TypeMapper.getInstance().getSafeTypeByName(datatypeURI);
137                        return (Node_Literal) Node.createLiteral(lexicalForm, langauge, dtype);
138
139                }
140
141        }
142
143        /**
144         * @author Sina Samangooei (ss@ecs.soton.ac.uk)
145         * 
146         */
147        public static class NodeSerialiser_Blank extends Serializer<Node_Blank> {
148
149                @Override
150                public void write(Kryo kryo, Output output, Node_Blank object) {
151                        final String blankNodeString = object.toString();
152                        output.writeString(blankNodeString);
153                }
154
155                @Override
156                public Node_Blank read(Kryo kryo, Input input, Class<Node_Blank> type) {
157                        final String label = input.readString();
158                        final Node_Blank retNode = (Node_Blank) Node.createAnon(AnonId.create(label));
159                        return retNode;
160                }
161
162        }
163
164        /**
165         * @author David Monks<dm11g08@ecs.soton.ac.uk>
166         * 
167         */
168        public static class NodeSerialiser_Variable extends Serializer<Node_Variable> {
169
170                @Override
171                public void write(Kryo kryo, Output output, Node_Variable object) {
172                        final String blankNodeString = object.toString();
173                        output.writeString(blankNodeString);
174                }
175
176                @Override
177                public Node_Variable read(Kryo kryo, Input input, Class<Node_Variable> type) {
178                        final String label = input.readString();
179                        final Node_Variable retNode = (Node_Variable) Node.createVariable(label.replaceFirst("\\?", ""));
180                        return retNode;
181                }
182
183        }
184
185        /**
186         * @author Sina Samangooei (ss@ecs.soton.ac.uk)
187         * 
188         */
189        public static class TripleSerialiser extends Serializer<Triple> {
190
191                @Override
192                public void write(Kryo kryo, Output output, Triple object) {
193                        final Node s = object.getSubject();
194                        final Node p = object.getPredicate();
195                        final Node o = object.getObject();
196                        kryo.writeClassAndObject(output, s);
197                        kryo.writeClassAndObject(output, p);
198                        kryo.writeClassAndObject(output, o);
199                }
200
201                @Override
202                public Triple read(Kryo kryo, Input input, Class<Triple> type) {
203                        final Node s = (Node) kryo.readClassAndObject(input);
204                        final Node p = (Node) kryo.readClassAndObject(input);
205                        final Node o = (Node) kryo.readClassAndObject(input);
206                        return new Triple(s, p, o);
207                }
208
209        }
210
211        /**
212         * 
213         * @author David Monks <dm11g08@ecs.soton.ac.uk>
214         */
215        public static class GraphSerialiser extends Serializer<Graph> {
216
217                @Override
218                public void write(Kryo kryo, Output output, Graph object) {
219                        output.writeInt(object.size());
220                        final Iterator<Triple> it = object.find(null, null, null);
221                        while (it.hasNext()) {
222                                final Triple next = it.next();
223                                kryo.writeClassAndObject(output, next);
224                        }
225                }
226
227                @Override
228                public Graph read(Kryo kryo, Input input, Class<Graph> type) {
229                        final int size = input.readInt();
230                        Graph graph = null;
231                        graph = new GraphMem();
232                        final List<Triple> overflow = new ArrayList<Triple>();
233                        for (int i = 0; i < size; i++) {
234                                final Object obj = kryo.readClassAndObject(input);
235                                try {
236                                        graph.add((Triple) obj);
237                                } catch (final AddDeniedException ex) {
238                                        overflow.add((Triple) obj);
239                                }
240                        }
241                        Iterator<Triple> it = overflow.iterator();
242                        while (!overflow.isEmpty()) {
243                                if (!it.hasNext())
244                                        it = overflow.iterator();
245                                try {
246                                        graph.add(it.next());
247                                        it.remove();
248                                } catch (final AddDeniedException ex) {
249                                }
250                        }
251                        return graph;
252                }
253
254        }
255
256        /**
257         * @author Sina Samangooei (ss@ecs.soton.ac.uk)
258         * 
259         */
260        public static class NodeSerialiser_ARRAY extends Serializer<Node[]> {
261
262                @Override
263                public void write(Kryo kryo, Output output, Node[] object) {
264                        output.writeInt(object.length);
265                        for (final Node node : object) {
266                                kryo.writeClassAndObject(output, node);
267                        }
268                }
269
270                @Override
271                public Node[] read(Kryo kryo, Input input, Class<Node[]> type) {
272                        final Node[] out = new Node[input.readInt()];
273                        for (int i = 0; i < out.length; i++) {
274                                out[i] = (Node) kryo.readClassAndObject(input);
275                        }
276                        return out;
277                }
278
279        }
280
281        /**
282         * @author Sina Samangooei (ss@ecs.soton.ac.uk)
283         * 
284         */
285        public static class KestrelServerSpec_Serializer extends Serializer<KestrelServerSpec> {
286
287                @Override
288                public void write(Kryo kryo, Output output, KestrelServerSpec object) {
289                        output.writeString(object.host);
290                        output.writeInt(object.port);
291                }
292
293                @Override
294                public KestrelServerSpec read(Kryo kryo, Input input, Class<KestrelServerSpec> type) {
295                        return new KestrelServerSpec(input.readString(), input.readInt());
296                }
297
298        }
299
300        /**
301         * @author Sina Samangooei (ss@ecs.soton.ac.uk)
302         * 
303         */
304        public static class RuleSerializer extends Serializer<Rule> {
305
306                @Override
307                public void write(Kryo kryo, Output output, Rule object) {
308                        output.writeString(object.toString());
309                }
310
311                @Override
312                public Rule read(Kryo kryo, Input input, Class<Rule> type) {
313                        return Rule.parseRule(input.readString());
314                }
315
316        }
317
318        /**
319         * @param conf
320         *            register some Jena serialisers to this configuration
321         */
322        public static void registerSerializers(Config conf) {
323                conf.registerSerialization(Node[].class, NodeSerialiser_ARRAY.class);
324                conf.registerSerialization(Node_URI.class, NodeSerialiser_URI.class);
325                conf.registerSerialization(Node_Literal.class, NodeSerialiser_Literal.class);
326                conf.registerSerialization(Node_Blank.class, NodeSerialiser_Blank.class);
327                conf.registerSerialization(Node_Variable.class, NodeSerialiser_Variable.class);
328                conf.registerSerialization(Triple.class, TripleSerialiser.class);
329                conf.registerSerialization(ArrayList.class);
330                conf.registerSerialization(KestrelServerSpec.class, KestrelServerSpec_Serializer.class);
331                conf.registerSerialization(Rule.class, RuleSerializer.class);
332                conf.registerSerialization(Graph.class, GraphSerialiser.class);
333                conf.registerSerialization(GraphMem.class, GraphSerialiser.class);
334                conf.registerSerialization(MultiUnion.class, GraphSerialiser.class);
335                conf.registerSerialization(Template.class, TemplateSerialiser.class);
336                conf.registerSerialization(ElementFilter.class);
337                // conf.registerSerialization(Node_NULL.class);
338                // conf.registerSerialization(Node_Blank.class);
339        }
340}