001 package com.github.sarxos.webcam; 002 003 import java.util.concurrent.ExecutorService; 004 import java.util.concurrent.Executors; 005 import java.util.concurrent.RejectedExecutionException; 006 import java.util.concurrent.SynchronousQueue; 007 import java.util.concurrent.ThreadFactory; 008 import java.util.concurrent.atomic.AtomicBoolean; 009 import java.util.concurrent.atomic.AtomicInteger; 010 011 012 public class WebcamProcessor { 013 014 /** 015 * Thread factory for processor. 016 * 017 * @author Bartosz Firyn (SarXos) 018 */ 019 private static final class ProcessorThreadFactory implements ThreadFactory { 020 021 private static final AtomicInteger N = new AtomicInteger(0); 022 023 @Override 024 public Thread newThread(Runnable r) { 025 Thread t = new Thread(r, String.format("atomic-processor-%d", N.incrementAndGet())); 026 t.setUncaughtExceptionHandler(WebcamExceptionHandler.getInstance()); 027 t.setDaemon(true); 028 return t; 029 } 030 } 031 032 /** 033 * Heart of overall processing system. This class process all native calls 034 * wrapped in tasks, by doing this all tasks executions are 035 * super-synchronized. 036 * 037 * @author Bartosz Firyn (SarXos) 038 */ 039 private static final class AtomicProcessor implements Runnable { 040 041 private SynchronousQueue<WebcamTask> inbound = new SynchronousQueue<WebcamTask>(true); 042 private SynchronousQueue<WebcamTask> outbound = new SynchronousQueue<WebcamTask>(true); 043 044 /** 045 * Process task. 046 * 047 * @param task the task to be processed 048 * @return Processed task 049 * @throws InterruptedException when thread has been interrupted 050 */ 051 public void process(WebcamTask task) throws InterruptedException { 052 053 inbound.put(task); 054 055 Throwable t = outbound.take().getThrowable(); 056 if (t != null) { 057 throw new WebcamException("Cannot execute task", t); 058 } 059 } 060 061 @Override 062 public void run() { 063 while (true) { 064 WebcamTask t = null; 065 try { 066 (t = inbound.take()).handle(); 067 } catch (InterruptedException e) { 068 break; 069 } catch (Throwable e) { 070 t.setThrowable(e); 071 } finally { 072 if (t != null) { 073 try { 074 outbound.put(t); 075 } catch (InterruptedException e) { 076 break; 077 } catch (Exception e) { 078 throw new RuntimeException("Cannot put task into outbound queue", e); 079 } 080 } 081 } 082 } 083 } 084 } 085 086 /** 087 * Is processor started? 088 */ 089 private static final AtomicBoolean started = new AtomicBoolean(false); 090 091 /** 092 * Execution service. 093 */ 094 private static final ExecutorService runner = Executors.newSingleThreadExecutor(new ProcessorThreadFactory()); 095 096 /** 097 * Static processor. 098 */ 099 private static final AtomicProcessor processor = new AtomicProcessor(); 100 101 /** 102 * Singleton instance. 103 */ 104 private static final WebcamProcessor INSTANCE = new WebcamProcessor();; 105 106 private WebcamProcessor() { 107 } 108 109 /** 110 * Process single webcam task. 111 * 112 * @param task the task to be processed 113 * @throws InterruptedException when thread has been interrupted 114 */ 115 public void process(WebcamTask task) throws InterruptedException { 116 if (started.compareAndSet(false, true)) { 117 runner.execute(processor); 118 } 119 if (!runner.isShutdown()) { 120 processor.process(task); 121 } else { 122 throw new RejectedExecutionException("Cannot process because processor runner has been already shut down"); 123 } 124 } 125 126 public static synchronized WebcamProcessor getInstance() { 127 return INSTANCE; 128 } 129 }