001/**
002 * Copyright (c) 2012, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.kestrel;
031
032import java.io.Closeable;
033import java.net.InetSocketAddress;
034import java.util.ArrayList;
035import java.util.List;
036import java.util.concurrent.TimeUnit;
037
038import org.apache.log4j.Logger;
039import org.jboss.netty.buffer.ChannelBuffer;
040import org.jboss.netty.buffer.ChannelBuffers;
041import org.jboss.netty.util.CharsetUtil;
042
043import backtype.storm.spout.KestrelThriftClient;
044
045import com.twitter.finagle.ChannelClosedException;
046import com.twitter.finagle.ServiceFactory;
047import com.twitter.finagle.builder.ClientBuilder;
048import com.twitter.finagle.builder.ClientConfig.Yes;
049import com.twitter.finagle.kestrel.java.Client;
050import com.twitter.finagle.kestrel.protocol.Command;
051import com.twitter.finagle.kestrel.protocol.Kestrel;
052import com.twitter.finagle.kestrel.protocol.Response;
053import com.twitter.finagle.memcached.util.ChannelBufferUtils;
054import com.twitter.util.Duration;
055import com.twitter.util.Time;
056
057/**
058 * A simple Kestrel client taken from
059 * https://github.com/hogelog/simple-kestrel-client by Hogelog. Using this one
060 * over {@link KestrelThriftClient} which seemed to have some major issues
061 *
062 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
063 *
064 */
065public class SimpleKestrelClient implements Closeable {
066        private static final Logger LOG = Logger.getLogger(SimpleKestrelClient.class);
067
068        private final Client client;
069
070        /**
071         * Initialise an {@link InetSocketAddress#InetSocketAddress(String, int)}
072         * instance
073         *
074         * @param host
075         * @param port
076         */
077        public SimpleKestrelClient(String host, int port) {
078                this(new InetSocketAddress(host, port));
079        }
080
081        /**
082         * initialise a {@link Client} instance using {@link ServiceFactory} from a
083         * {@link ClientBuilder}
084         *
085         * @param addr
086         */
087        public SimpleKestrelClient(InetSocketAddress addr) {
088                final ClientBuilder<Command, Response, Yes, Yes, Yes> builder = ClientBuilder.get()
089                                .codec(Kestrel.get())
090                                .hosts(addr)
091                                .hostConnectionLimit(1);
092                final ServiceFactory<Command, Response> kestrelClientBuilder = ClientBuilder.safeBuildFactory(builder);
093                client = Client.newInstance(kestrelClientBuilder);
094        }
095
096        /**
097         * Calls {@link Client#delete(String)} on the underlying {@link Client}
098         * instance. This deletes the underlying journal instance in the kestrel
099         * queue
100         *
101         * @param queueName
102         */
103        public void delete(String queueName) {
104                client.delete(queueName).apply();
105        }
106
107        @Override
108        public void close() {
109                client.close();
110        }
111
112        /**
113         * Performs a put kestrel call on the string value with a expiary of 0 (i.e.
114         * does not expire)
115         *
116         * @param queueName
117         * @param value
118         */
119        public void set(String queueName, String value) {
120                set(queueName, 0, value);
121        }
122
123        /**
124         * Performs a put kestrel call with the provided expiration.
125         *
126         * @param queueName
127         * @param exp
128         * @param value
129         */
130        public void set(String queueName, int exp, String value) {
131                final Time expTime = Time.fromMilliseconds(exp);
132                final ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(value.getBytes(CharsetUtil.UTF_8));
133                client.set(queueName, buffer, expTime);
134        }
135
136        /**
137         * Performs a put with the byte array value with no expiration
138         *
139         * @param queueName
140         * @param value
141         */
142        public void set(String queueName, byte[] value) {
143                set(queueName, 0, value);
144        }
145
146        /**
147         * Performs a put with the byte array valye with an expiration
148         *
149         * @param queueName
150         * @param exp
151         * @param value
152         */
153        public void set(String queueName, int exp, byte[] value) {
154                final Time expTime = Time.fromMilliseconds(exp);
155                final ChannelBuffer buffer = ChannelBufferUtils.bytesToChannelBuffer(value);
156                client.set(queueName, buffer, expTime);
157        }
158
159        /**
160         * Get the next value in the queue
161         *
162         * @param queueName
163         * @return the next value
164         */
165        public String get(String queueName) {
166                return get(queueName, 0);
167        }
168
169        /**
170         * Get the next value in the queue
171         *
172         * @param queueName
173         * @return the next value
174         */
175        public byte[] getByte(String queueName) {
176                return getByte(queueName, 0);
177        }
178
179        /**
180         * Get the next value in the queue
181         *
182         * @param queueName
183         * @param waitFor
184         * @return the next value
185         */
186        public byte[] getByte(String queueName, int waitFor) {
187                final Duration waitDuration = Duration.apply(waitFor, TimeUnit.MILLISECONDS);
188                return getByte(queueName, waitDuration);
189        }
190
191        /**
192         * Get the next value in the queue
193         *
194         * @param queueName
195         * @param waitFor
196         * @return the next value
197         */
198        public String get(String queueName, int waitFor) {
199                final Duration waitDuration = Duration.apply(waitFor, TimeUnit.MILLISECONDS);
200                return get(queueName, waitDuration);
201        }
202
203        private static final List<Class<? extends Exception>> THROUGH_EXCEPTIONS = new ArrayList<Class<? extends Exception>>();
204        static {
205                THROUGH_EXCEPTIONS.add(ChannelClosedException.class);
206        };
207
208        /**
209         * Get the next value from the queue
210         *
211         * @param queueName
212         * @param waitDuration
213         * @return the next value
214         */
215        public String get(String queueName, Duration waitDuration) {
216                try {
217                        final ChannelBuffer value = client.get(queueName, waitDuration).apply();
218                        return value == null ? null : value.toString(CharsetUtil.UTF_8);
219                } catch (final Exception e) {
220                        if (THROUGH_EXCEPTIONS.contains(e.getClass())) {
221                                return null;
222                        }
223                        LOG.error(e.getMessage(), e);
224                        throw new IllegalStateException(e);
225                }
226        }
227
228        /**
229         * Get the next value from the queue
230         *
231         * @param queueName
232         * @param waitDuration
233         *            an amount of time to wait before returning null
234         * @return the next value
235         */
236        public byte[] getByte(String queueName, Duration waitDuration) {
237                try {
238                        final ChannelBuffer value = client.get(queueName, waitDuration).apply();
239                        return value == null ? null : value.array();
240                } catch (final Exception e) {
241                        if (THROUGH_EXCEPTIONS.contains(e.getClass())) {
242                                return null;
243                        }
244                        LOG.error(e.getMessage(), e);
245                        throw new IllegalStateException(e);
246                }
247        }
248
249        /**
250         * Get the next value without popping it
251         *
252         * @param queueName
253         * @return the next value
254         */
255        public String peek(String queueName) {
256                return peek(queueName, 0);
257        }
258
259        /**
260         * The next value without popping it
261         *
262         * @param queueName
263         * @param waitFor
264         *            an amount of time to wait before returning null
265         * @return the next value
266         */
267        public String peek(String queueName, int waitFor) {
268                final Duration waitDuration = Duration.apply(waitFor, TimeUnit.MILLISECONDS);
269                return peek(queueName, waitDuration);
270        }
271
272        /**
273         * The next value without popping it
274         *
275         * @param queueName
276         * @param waitDuration
277         *            an amount of time to wait before returning null
278         * @return the next value
279         */
280        public String peek(String queueName, Duration waitDuration) {
281                return get(queueName + "/peek", waitDuration);
282        }
283
284        /**
285         * Get the next value without popping it
286         *
287         * @param queueName
288         * @return the next value
289         */
290        public byte[] peekByte(String queueName) {
291                return peekByte(queueName, 0);
292        }
293
294        /**
295         * The next value without popping it
296         *
297         * @param queueName
298         * @param waitFor
299         *            an amount of time to wait before returning null
300         * @return the next value
301         */
302        public byte[] peekByte(String queueName, int waitFor) {
303                final Duration waitDuration = Duration.apply(waitFor, TimeUnit.MILLISECONDS);
304                return peekByte(queueName, waitDuration);
305        }
306
307        /**
308         * The next value without popping it
309         *
310         * @param queueName
311         * @param waitDuration
312         *            an amount of time to wait before returning null
313         * @return the next value
314         */
315        public byte[] peekByte(String queueName, Duration waitDuration) {
316                return getByte(queueName + "/peek", waitDuration);
317        }
318}