001/**
002 * Copyright (c) 2011, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.demos.sandbox.kestrel;
031
032import net.lag.kestrel.thrift.Item;
033
034import org.apache.thrift7.TException;
035import org.openimaj.kestrel.SimpleKestrelClient;
036
037import backtype.storm.spout.KestrelThriftClient;
038
039import com.google.common.collect.Sets;
040
041public class KestrelPlay {
042        public static void main(String[] args) throws TException {
043                // client.set("sina",100, "Cheese!");
044                // System.out.println(client.get("sina"));
045                // client.delete("sina");
046                // client.close();
047                // producerConsumer(500,100);
048                // producerConsumer(100,500);
049                producerUnreliableConsumer(500, 500);
050        }
051
052        private static void producerUnreliableConsumer(final long produceRate, final long consumeRate) throws TException {
053                final KestrelThriftClient client = new KestrelThriftClient("127.0.0.1", 2229);
054                client.delete_queue("sina");
055                client.close();
056                new Thread(new Runnable() {
057                        @Override
058                        public void run() {
059                                KestrelThriftClient client = null;
060                                try {
061                                        client = new KestrelThriftClient("127.0.0.1", 2229);
062                                        int i = 0;
063                                        while (true) {
064                                                client.put("sina", "Cheese " + i++, 0);
065                                                try {
066                                                        Thread.sleep(produceRate);
067                                                } catch (final InterruptedException e) {
068                                                }
069                                        }
070                                } catch (final TException e1) {
071                                        // TODO Auto-generated catch block
072                                        e1.printStackTrace();
073                                }
074                                finally {
075                                        client.close();
076                                }
077                        }
078                }).start();
079
080                new Thread(new Runnable() {
081
082                        @Override
083                        public void run() {
084                                KestrelThriftClient client = null;
085                                try {
086                                        int i = 0;
087                                        client = new KestrelThriftClient("127.0.0.1", 2229);
088                                        while (true) {
089                                                final Item item = client.get("sina", 1, 1000, 1000).get(0);
090                                                final String itemStr = new String(item.get_data());
091                                                if (i++ % 2 == 0) {
092                                                        System.out.println("Read successfully: " + itemStr);
093                                                        client.confirm("sina", Sets.newHashSet(item.get_id()));
094                                                }
095                                                else {
096                                                        System.err.println("Failed to read: " + itemStr);
097                                                }
098                                                try {
099                                                        Thread.sleep(consumeRate);
100                                                } catch (final InterruptedException e) {
101                                                        e.printStackTrace();
102                                                }
103                                        }
104                                } catch (final Exception e1) {
105                                        // TODO Auto-generated catch block
106                                        e1.printStackTrace();
107                                }
108                                finally {
109                                        client.close();
110                                }
111
112                        }
113                }).start();
114        }
115
116        private static void producerConsumer(final long produceRate, final long consumeRate) {
117                final SimpleKestrelClient client = new SimpleKestrelClient("127.0.0.1", 22133);
118                client.delete("sina");
119                client.close();
120                new Thread(new Runnable() {
121                        @Override
122                        public void run() {
123                                SimpleKestrelClient client = null;
124                                try {
125                                        client = new SimpleKestrelClient("127.0.0.1", 22133);
126                                        int i = 0;
127                                        while (true) {
128                                                client.set("sina", "Cheese " + i++);
129                                                try {
130                                                        Thread.sleep(produceRate);
131                                                } catch (final InterruptedException e) {
132                                                }
133                                        }
134                                }
135                                finally {
136                                        client.close();
137                                }
138                        }
139                }).start();
140
141                new Thread(new Runnable() {
142                        @Override
143                        public void run() {
144                                SimpleKestrelClient client = null;
145                                try {
146                                        client = new SimpleKestrelClient("127.0.0.1", 22133);
147                                        while (true) {
148                                                System.out.println(client.get("sina"));
149                                                try {
150                                                        Thread.sleep(consumeRate);
151                                                } catch (final InterruptedException e) {
152                                                }
153                                        }
154                                }
155                                finally {
156                                        client.close();
157                                }
158
159                        }
160                }).start();
161        }
162}