package org.openimaj.picslurper.output;

import java.io.IOException;
import java.io.StringWriter;
import java.net.URL;

import org.apache.log4j.Logger;
import org.openimaj.io.IOUtils;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Socket;

/**
 * An {@link OutputListener} that uses ZeroMQ to run a publish/subscribe server
 * which tells every connected subscriber about newly downloaded images and
 * about URLs that failed.
 * <p>
 * Each notification is a two-frame message: the first frame is the topic
 * ({@code IMAGE} or {@code FAIL}, UTF-8 encoded) and the second frame is the
 * ASCII serialisation of the event as produced by {@link IOUtils}.
 * <p>
 * The publisher is bound to the fixed endpoint {@code tcp://*:5563} by
 * {@link #prepare()} and released by {@link #finished()}.
 * <p>
 * NOTE(review): the two frames of a message are sent with two separate socket
 * calls and no locking is performed here; if the listener can be invoked from
 * several threads at once the frames may interleave - confirm against callers.
 *
 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
 *
 */
public class ZMQOutputListener implements OutputListener {

    private static final Logger logger = Logger.getLogger(ZMQOutputListener.class);

    /** Endpoint the publisher socket is bound to. */
    private static final String ENDPOINT = "tcp://*:5563";

    /** Character set used to encode both frames of every message. */
    private static final String CHARSET = "UTF-8";

    /** Topic frame announcing a successfully downloaded image. */
    private static final String IMAGE_TOPIC = "IMAGE";

    /** Topic frame announcing a URL that could not be processed. */
    private static final String FAIL_TOPIC = "FAIL";

    /**
     * Upper bound (milliseconds) on how long unsent messages are kept alive
     * once the socket is closed, so that terminating the context cannot block
     * indefinitely.
     */
    private static final int LINGER_MILLIS = 1000;

    /** Context owning the publisher; kept so it can be terminated on shutdown. */
    private ZMQ.Context context;

    private Socket publisher;

    /**
     * Construct the publishing connector. No sockets are opened until
     * {@link #prepare()} is called.
     */
    public ZMQOutputListener() {

    }

    /**
     * Publish an {@code IMAGE} message describing the downloaded image.
     * Failures are logged (with their cause) and otherwise ignored.
     *
     * @param written
     *            the image output to announce
     */
    @Override
    public void newImageDownloaded(WriteableImageOutput written) {
        try {
            StringWriter writer = new StringWriter();
            IOUtils.writeASCII(writer, written);
            publish(IMAGE_TOPIC, writer.toString());
        } catch (IOException e) {
            // Pass the exception itself so the stack trace is not lost.
            logger.error("Unable to send written image: " + written.url, e);
        }
    }

    /**
     * Publish a {@code FAIL} message describing a URL that failed. Nothing is
     * sent when {@code url} is {@code null}. Failures are logged (with their
     * cause) and otherwise ignored.
     *
     * @param url
     *            the URL that failed; ignored if {@code null}
     * @param reason
     *            description of why the URL failed
     */
    @Override
    public void failedURL(URL url, String reason) {
        if (url == null) {
            return;
        }
        try {
            StringWriter writer = new StringWriter();
            IOUtils.writeASCII(writer, new WriteableFailedURL(url, reason));
            publish(FAIL_TOPIC, writer.toString());
        } catch (IOException e) {
            logger.error("Unable to send failure! (" + url + ")", e);
        }
    }

    /**
     * Close the publisher socket and terminate its ZeroMQ context. Safe to
     * call when {@link #prepare()} was never called, and safe to call more
     * than once.
     */
    @Override
    public void finished() {
        if (publisher != null) {
            // Bound the linger period first: terminating the context waits
            // for pending messages of closed sockets to be flushed.
            publisher.setLinger(LINGER_MILLIS);
            publisher.close();
            publisher = null;
        }
        if (context != null) {
            context.term();
            context = null;
        }
    }

    /**
     * Create the ZeroMQ context and bind the publisher socket to
     * {@code tcp://*:5563}. Calling this again while already prepared is a
     * no-op. If binding fails the partially created resources are released
     * and the failure is rethrown.
     */
    @Override
    public void prepare() {
        if (publisher != null) {
            return;
        }
        context = ZMQ.context(1);
        publisher = context.socket(ZMQ.PUB);
        try {
            publisher.bind(ENDPOINT);
        } catch (RuntimeException e) {
            // Do not leak the socket/context when the endpoint is unavailable.
            finished();
            throw e;
        }
    }

    /**
     * Send one two-frame message: the topic frame followed by the body frame.
     *
     * @param topic
     *            the topic (first frame)
     * @param body
     *            the serialised event (second frame)
     * @throws IOException
     *             if the publisher has not been prepared, or either frame
     *             could not be sent
     */
    private void publish(String topic, String body) throws IOException {
        if (publisher == null) {
            throw new IOException("Publisher not initialised; prepare() has not been called");
        }
        // Check the topic frame too: sending the body after a failed topic
        // frame would leave subscribers with a malformed message.
        if (!publisher.send(topic.getBytes(CHARSET), ZMQ.SNDMORE)) {
            throw new IOException("Send failed");
        }
        if (!publisher.send(body.getBytes(CHARSET), 0)) {
            throw new IOException("Send failed");
        }
    }

}