AnnotatedBuffer.java

(download)

import java.util.Random;
import java.util.logging.*;

/**
 * Bounded FIFO buffer backed by a circular array and guarded by the
 * object's own monitor, instrumented with logging and with counters that
 * let the driver in {@link #main} recognise the state in which every
 * participating thread is waiting.
 *
 * <p>Producers and consumers share the single wait set of this object and
 * are woken with {@code notify()}.
 * NOTE(review): waking only one arbitrary waiter looks deliberate here
 * (the driver exists to report the resulting deadlock) -- confirm before
 * replacing it with {@code notifyAll()}.
 *
 * @param <E> type of the buffered elements
 */
public class AnnotatedBuffer<E> {

  private static final Logger logger;
  static {
    // Console lines are rendered as "HH:MM:SS.mmm: message".
    Handler h = new ConsoleHandler();
    h.setFormatter(new Formatter() {
        @Override
        public String format (LogRecord r) {
          return String.format("%1$tH:%1$tM:%1$tS.%1$tL: %2$s%n",
                               r.getMillis(), r.getMessage());
        }
      });
    logger = Logger.getLogger("buffer");
    logger.setUseParentHandlers(false);
    // Records are kept in a 100-entry memory buffer and only pushed to the
    // console handler once a SEVERE record (the deadlock report) arrives.
    logger.addHandler(new MemoryHandler(h, 100, Level.SEVERE));
  }

  /** Maximum number of elements the buffer holds. */
  public final int capacity;
  private final E[] store;
  private int head, tail, size;

  /** Bookkeeping count of the threads currently in this object's wait set. */
  int waitSetSize;
  /**
   * {@code msgCount} is the number of elements retrieved so far;
   * {@code lastChange} is the {@code System.currentTimeMillis()} value of
   * the most recent successful put or get (0 until the first one).
   */
  long msgCount, lastChange;

  /**
   * Creates an empty buffer.
   *
   * @param capacity number of slots in the backing array
   */
  @SuppressWarnings("unchecked")
  public AnnotatedBuffer (int capacity) {
    this.capacity = capacity;
    this.store = (E[])new Object[capacity];
  }

  /** Returns the index following {@code x}, wrapping around the array. */
  private int next (int x) {
    return (x + 1) % store.length;
  }

  /**
   * Waits on this object's monitor; the caller must hold the monitor and
   * must already have counted itself in {@code waitSetSize}.
   *
   * <p>If the wait ends with an interrupt, the thread has left the wait set
   * without any notifier accounting for it, so the counter is corrected
   * here before the exception propagates.
   *
   * @throws InterruptedException if the thread is interrupted while waiting
   */
  private void awaitNotification () throws InterruptedException {
    try {
      wait();
    } catch (InterruptedException ex) {
      if (waitSetSize > 0)
        waitSetSize--;
      throw ex;
    }
  }

  /**
   * Appends {@code e} at the tail, waiting while the buffer is full.
   *
   * @param e element to store
   * @throws InterruptedException if the thread is interrupted while waiting
   */
  public synchronized void put (E e) throws InterruptedException {
    String name = Thread.currentThread().getName();
    while (isFull()) {
      logger.info(String.format("buffer full; %s waits", name));
      waitSetSize++;
      awaitNotification();
      logger.info(String.format("%s is notified", name));
    }
    logger.info(String.format("%s successfully puts", name));
    notify();
    // notify() removes at most one thread from the wait set.
    if (waitSetSize > 0)
      waitSetSize--;
    logger.fine(String.format("wait set size is %d", waitSetSize));
    store[tail] = e;
    tail = next(tail);
    size++;
    lastChange = System.currentTimeMillis();
  }

  /**
   * Removes and returns the element at the head, waiting while the buffer
   * is empty.
   *
   * @return the oldest element in the buffer
   * @throws InterruptedException if the thread is interrupted while waiting
   */
  public synchronized E get () throws InterruptedException {
    String name = Thread.currentThread().getName();
    while (isEmpty()) {
      logger.info(String.format("buffer empty; %s waits", name));
      waitSetSize++;
      logger.fine(String.format("wait set size is %d", waitSetSize));
      awaitNotification();
      logger.info(String.format("%s is notified", name));
    }
    logger.info(String.format("%s successfully gets", name));
    msgCount++;
    logger.finer(String.format("%d messages retrieved", msgCount));
    notify();
    // notify() removes at most one thread from the wait set.
    if (waitSetSize > 0)
      waitSetSize--;
    E e = store[head];
    store[head] = null; // for GC
    head = next(head);
    size--;
    lastChange = System.currentTimeMillis();
    return e;
  }

  /** Returns {@code true} if the buffer holds {@code capacity} elements. */
  public synchronized boolean isFull () {
    return size == capacity;
  }

  /** Returns {@code true} if the buffer holds no elements. */
  public synchronized boolean isEmpty () {
    return size == 0;
  }

  /**
   * Sleeps for a random number of milliseconds in {@code [0, bound)}.
   * A bound of zero or less means "do not sleep"; passing it on to
   * {@code Random.nextInt} would throw and kill the calling thread.
   */
  private static void sleepUpTo (Random rand, int bound)
      throws InterruptedException {
    if (bound > 0)
      Thread.sleep(rand.nextInt(bound));
  }

  /**
   * Starts producer and consumer threads on one shared buffer and checks
   * once a minute whether all of them are waiting; if so, logs the
   * deadlock, interrupts every thread and returns.
   *
   * @param args buffer capacity, number of producers, number of consumers,
   *             producer sleep bound in ms, consumer sleep bound in ms
   * @throws IllegalArgumentException if arguments are missing, not
   *         integers, or a thread count is negative
   */
  public static void main (String[] args) throws Exception {
    if (args.length < 5)
      throw new IllegalArgumentException
        ("usage: java AnnotatedBuffer <capacity> <producers> <consumers>"
         + " <producer-sleep-bound-ms> <consumer-sleep-bound-ms>");
    int size = Integer.parseInt(args[0]);
    int prodCount = Integer.parseInt(args[1]);
    int consCount = Integer.parseInt(args[2]);
    final int sleepProd = Integer.parseInt(args[3]);
    final int sleepCons = Integer.parseInt(args[4]);
    if (prodCount < 0 || consCount < 0)
      throw new IllegalArgumentException
        ("producer and consumer counts must not be negative");
    final int threadCount = prodCount + consCount;
    final AnnotatedBuffer<String> buffer = new AnnotatedBuffer<String>(size);
    final Thread[] participants = new Thread[threadCount];
    final Random rand = new Random();

    // Sleeps for a random time, then puts its own name into the buffer.
    class Producer extends Thread {
      public Producer (int n) {
        super("producer-"+n);
      }
      @Override
      public void run () {
        String name = getName();
        try {
          while (true) {
            AnnotatedBuffer.sleepUpTo(rand, sleepProd);
            buffer.put(name);
          }
        } catch (InterruptedException e) {
          return; // interruption is the shutdown signal
        }
      }
    }

    // Takes an element from the buffer, then sleeps for a random time.
    class Consumer extends Thread {
      public Consumer (int n) {
        super("consumer-"+n);
      }
      @Override
      public void run () {
        try {
          while (true) {
            buffer.get();
            AnnotatedBuffer.sleepUpTo(rand, sleepCons);
          }
        } catch (InterruptedException e) {
          return; // interruption is the shutdown signal
        }
      }
    }

    int i = 0;
    for (int j=0; j<prodCount; j++)
      participants[i++] = new Producer(j);
    for (int j=0; j<consCount; j++)
      participants[i++] = new Consumer(j);
    for (Thread t : participants)
      t.start();

    long time = System.currentTimeMillis();
    while (true) {
      Thread.sleep(60000); // check for deadlock every minute
      synchronized (buffer) {
        if (buffer.waitSetSize == threadCount) {
          // lastChange is still 0 if nothing was ever exchanged; clamp so
          // the report does not show a negative duration.
          long elapsed = Math.max(0L, buffer.lastChange - time);
          logger.severe(String.format
                        ("DEADLOCK after %d messages and %.1f seconds!",
                         buffer.msgCount, elapsed / 1e3));
          for (Thread t : participants)
            t.interrupt();
          return;
        }
      }
    }
  }
}