001package org.openimaj.picslurper.output;
002
003import java.io.IOException;
004import java.io.StringWriter;
005import java.net.URL;
006
007import org.apache.log4j.Logger;
008import org.openimaj.io.IOUtils;
009import org.zeromq.ZMQ;
010import org.zeromq.ZMQ.Socket;
011
012/**
013 * Using ZeroMQ (because it is awesome and I wanted to learn it so fuck off) we set up a
014 * subscription based server which tells anyone listening about any new images.
015 *
016 * The name of the queue can be specified
017 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
018 *
019 */
020public class ZMQOutputListener implements OutputListener {
021
022        private static final Logger logger = Logger.getLogger(ZMQOutputListener.class);
023
024        private Socket publisher;
025
026        /**
027         * Construct the publishing connector
028         */
029        public ZMQOutputListener() {
030
031        }
032
033        @Override
034        public void newImageDownloaded(WriteableImageOutput written) {
035                try {
036                        StringWriter writer = new StringWriter();
037                        IOUtils.writeASCII(writer, written);
038                        publisher.send("IMAGE".getBytes("UTF-8"), ZMQ.SNDMORE);
039                        boolean sent = publisher.send(writer.toString().getBytes("UTF-8"), 0);
040                        if(!sent){
041                                throw new IOException("Send failed");
042                        }
043                } catch (IOException e) {
044                        logger.error("Unable to send written image: " + written.url);
045                        logger.error(e.getMessage());
046                }
047        }
048
049        @Override
050        public void failedURL(URL url, String reason) {
051                try {
052                        StringWriter writer = new StringWriter();
053                        if(url==null)return;
054                        IOUtils.writeASCII(writer, new WriteableFailedURL(url, reason));
055                        publisher.send("FAIL".getBytes("UTF-8"), ZMQ.SNDMORE);
056                        boolean sent = publisher.send(writer.toString().getBytes("UTF-8"), 0);
057                        if(!sent){
058                                throw new IOException("Send failed");
059                        }
060                } catch (IOException e) {
061                        logger.error("Unable to send failure!");
062                }
063        }
064
065        @Override
066        public void finished() {
067                publisher.close();
068        }
069
070        @Override
071        public void prepare() {
072                ZMQ.Context context = ZMQ.context(1);
073                publisher = context.socket(ZMQ.PUB);
074
075                publisher.bind("tcp://*:5563");
076        }
077
078}