001    package com.github.sarxos.webcam;
002    
003    import java.util.concurrent.ExecutorService;
004    import java.util.concurrent.Executors;
005    import java.util.concurrent.RejectedExecutionException;
006    import java.util.concurrent.SynchronousQueue;
007    import java.util.concurrent.ThreadFactory;
008    import java.util.concurrent.atomic.AtomicBoolean;
009    import java.util.concurrent.atomic.AtomicInteger;
010    
011    
012    public class WebcamProcessor {
013    
014            /**
015             * Thread factory for processor.
016             * 
017             * @author Bartosz Firyn (SarXos)
018             */
019            private static final class ProcessorThreadFactory implements ThreadFactory {
020    
021                    private static final AtomicInteger N = new AtomicInteger(0);
022    
023                    @Override
024                    public Thread newThread(Runnable r) {
025                            Thread t = new Thread(r, String.format("atomic-processor-%d", N.incrementAndGet()));
026                            t.setUncaughtExceptionHandler(WebcamExceptionHandler.getInstance());
027                            t.setDaemon(true);
028                            return t;
029                    }
030            }
031    
032            /**
033             * Heart of overall processing system. This class process all native calls
034             * wrapped in tasks, by doing this all tasks executions are
035             * super-synchronized.
036             * 
037             * @author Bartosz Firyn (SarXos)
038             */
039            private static final class AtomicProcessor implements Runnable {
040    
041                    private SynchronousQueue<WebcamTask> inbound = new SynchronousQueue<WebcamTask>(true);
042                    private SynchronousQueue<WebcamTask> outbound = new SynchronousQueue<WebcamTask>(true);
043    
044                    /**
045                     * Process task.
046                     * 
047                     * @param task the task to be processed
048                     * @return Processed task
049                     * @throws InterruptedException when thread has been interrupted
050                     */
051                    public void process(WebcamTask task) throws InterruptedException {
052    
053                            inbound.put(task);
054    
055                            Throwable t = outbound.take().getThrowable();
056                            if (t != null) {
057                                    throw new WebcamException("Cannot execute task", t);
058                            }
059                    }
060    
061                    @Override
062                    public void run() {
063                            while (true) {
064                                    WebcamTask t = null;
065                                    try {
066                                            (t = inbound.take()).handle();
067                                    } catch (InterruptedException e) {
068                                            break;
069                                    } catch (Throwable e) {
070                                            t.setThrowable(e);
071                                    } finally {
072                                            if (t != null) {
073                                                    try {
074                                                            outbound.put(t);
075                                                    } catch (InterruptedException e) {
076                                                            break;
077                                                    } catch (Exception e) {
078                                                            throw new RuntimeException("Cannot put task into outbound queue", e);
079                                                    }
080                                            }
081                                    }
082                            }
083                    }
084            }
085    
086            /**
087             * Is processor started?
088             */
089            private static final AtomicBoolean started = new AtomicBoolean(false);
090    
091            /**
092             * Execution service.
093             */
094            private static final ExecutorService runner = Executors.newSingleThreadExecutor(new ProcessorThreadFactory());
095    
096            /**
097             * Static processor.
098             */
099            private static final AtomicProcessor processor = new AtomicProcessor();
100    
101            /**
102             * Singleton instance.
103             */
104            private static final WebcamProcessor INSTANCE = new WebcamProcessor();;
105    
106            private WebcamProcessor() {
107            }
108    
109            /**
110             * Process single webcam task.
111             * 
112             * @param task the task to be processed
113             * @throws InterruptedException when thread has been interrupted
114             */
115            public void process(WebcamTask task) throws InterruptedException {
116                    if (started.compareAndSet(false, true)) {
117                            runner.execute(processor);
118                    }
119                    if (!runner.isShutdown()) {
120                            processor.process(task);
121                    } else {
122                            throw new RejectedExecutionException("Cannot process because processor runner has been already shut down");
123                    }
124            }
125    
126            public static synchronized WebcamProcessor getInstance() {
127                    return INSTANCE;
128            }
129    }