001/** 002 * Copyright (c) 2011, The University of Southampton and the individual contributors. 003 * All rights reserved. 004 * 005 * Redistribution and use in source and binary forms, with or without modification, 006 * are permitted provided that the following conditions are met: 007 * 008 * * Redistributions of source code must retain the above copyright notice, 009 * this list of conditions and the following disclaimer. 010 * 011 * * Redistributions in binary form must reproduce the above copyright notice, 012 * this list of conditions and the following disclaimer in the documentation 013 * and/or other materials provided with the distribution. 014 * 015 * * Neither the name of the University of Southampton nor the names of its 016 * contributors may be used to endorse or promote products derived from this 017 * software without specific prior written permission. 018 * 019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR 023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON 026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 029 */ 030package org.openimaj.demos.sandbox.kestrel; 031 032import net.lag.kestrel.thrift.Item; 033 034import org.apache.thrift7.TException; 035import org.openimaj.kestrel.SimpleKestrelClient; 036 037import backtype.storm.spout.KestrelThriftClient; 038 039import com.google.common.collect.Sets; 040 041public class KestrelPlay { 042 public static void main(String[] args) throws TException { 043 // client.set("sina",100, "Cheese!"); 044 // System.out.println(client.get("sina")); 045 // client.delete("sina"); 046 // client.close(); 047 // producerConsumer(500,100); 048 // producerConsumer(100,500); 049 producerUnreliableConsumer(500, 500); 050 } 051 052 private static void producerUnreliableConsumer(final long produceRate, final long consumeRate) throws TException { 053 final KestrelThriftClient client = new KestrelThriftClient("127.0.0.1", 2229); 054 client.delete_queue("sina"); 055 client.close(); 056 new Thread(new Runnable() { 057 @Override 058 public void run() { 059 KestrelThriftClient client = null; 060 try { 061 client = new KestrelThriftClient("127.0.0.1", 2229); 062 int i = 0; 063 while (true) { 064 client.put("sina", "Cheese " + i++, 0); 065 try { 066 Thread.sleep(produceRate); 067 } catch (final InterruptedException e) { 068 } 069 } 070 } catch (final TException e1) { 071 // TODO Auto-generated catch block 072 e1.printStackTrace(); 073 } 074 finally { 075 client.close(); 076 } 077 } 078 }).start(); 079 080 new Thread(new Runnable() { 081 082 @Override 083 public void run() { 084 KestrelThriftClient client = null; 085 try { 086 int i = 0; 087 client = new KestrelThriftClient("127.0.0.1", 2229); 088 while (true) { 089 final Item item = client.get("sina", 1, 1000, 1000).get(0); 090 final String itemStr = new String(item.get_data()); 091 if (i++ % 2 == 0) { 092 System.out.println("Read successfully: " + itemStr); 093 client.confirm("sina", Sets.newHashSet(item.get_id())); 094 } 095 else { 096 System.err.println("Failed to read: " + itemStr); 097 } 098 try { 099 Thread.sleep(consumeRate); 100 } catch (final InterruptedException e) { 101 e.printStackTrace(); 102 } 103 } 104 } catch (final Exception e1) { 105 // TODO Auto-generated catch block 106 e1.printStackTrace(); 107 } 108 finally { 109 client.close(); 110 } 111 112 } 113 }).start(); 114 } 115 116 private static void producerConsumer(final long produceRate, final long consumeRate) { 117 final SimpleKestrelClient client = new SimpleKestrelClient("127.0.0.1", 22133); 118 client.delete("sina"); 119 client.close(); 120 new Thread(new Runnable() { 121 @Override 122 public void run() { 123 SimpleKestrelClient client = null; 124 try { 125 client = new SimpleKestrelClient("127.0.0.1", 22133); 126 int i = 0; 127 while (true) { 128 client.set("sina", "Cheese " + i++); 129 try { 130 Thread.sleep(produceRate); 131 } catch (final InterruptedException e) { 132 } 133 } 134 } 135 finally { 136 client.close(); 137 } 138 } 139 }).start(); 140 141 new Thread(new Runnable() { 142 @Override 143 public void run() { 144 SimpleKestrelClient client = null; 145 try { 146 client = new SimpleKestrelClient("127.0.0.1", 22133); 147 while (true) { 148 System.out.println(client.get("sina")); 149 try { 150 Thread.sleep(consumeRate); 151 } catch (final InterruptedException e) { 152 } 153 } 154 } 155 finally { 156 client.close(); 157 } 158 159 } 160 }).start(); 161 } 162}