The following code is used for demonstrating thread-safety issues on eclipse.
Step 1: ProducerConsumerTest
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | package com.debug.multithread; public class ProducerConsumerTest { public static void main(String[] args) throws InterruptedException { ProducerConsumer pc = new ProducerConsumer(); //spawn a new producer thread and start Thread producer = new Thread(new ProducerThread(pc)); producer.start(); Thread.sleep(10); //main thread sleeps for 1 second //spawn a new consumer thread and start Thread consumer = new Thread(new ConsumerThread(pc)); consumer.start(); } } |
Step 2: Producer thread.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | package com.debug.multithread; public class ProducerThread implements Runnable { private ProducerConsumer pc; public ProducerThread(ProducerConsumer pc) { this.pc = pc; } @Override public void run() { pc.produce(); } } |
Step 3: Consumer thread.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | package com.debug.multithread; public class ConsumerThread implements Runnable { private ProducerConsumer pc; public ConsumerThread(ProducerConsumer pc) { this.pc = pc; } @Override public void run() { pc.consume(); } } |
Step 4: ProducerConsumer logic.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 | package com.debug.multithread; import java.util.concurrent.ArrayBlockingQueue; //only one thread can access either produce or consume methods as both are synchronized public class ProducerConsumer { private int count = 0; private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(3); public synchronized void produce() { while (true) { try { wait(100); } catch (InterruptedException ie) {} if (queue.isEmpty()) { count++; try { Thread.sleep(4000); // takes 4 secs to produce } catch (InterruptedException e) {} queue.add(count); System.out.println(Thread.currentThread().getName() + " produced: " + count); notifyAll(); } }//end while } public synchronized void consume() { while (true) { try { wait(100); } catch (InterruptedException ie) {} if (!queue.isEmpty()) { Integer consumed = queue.remove(); // consumed System.out.println(Thread.currentThread().getName() + " consumed: " + consumed); notifyAll(); } }//end while } } |
Key Points
1) The wait & notifyAll methods in the java.lang.Object class is used for inter-thread communication to wait for the object’s lock or to tell other waiting threads that the lock has been relinquished.
2) The wait & notifyAll methods must be invoked within a synchronized method or block.
3) The “ProducerThread” and “ConsumerThread” are invoked by the JVM main thread, and they run for ever until the program is killed as they are run within ” while (true)“.
4) ArrayBlockingQueue is a “FIFO” (i.e. First In First Out) data structure, which is typically used to have on thread produce objects, which another thread consumes.
5) When the blocking queue reaches its upper limit, the producing thread is blocked while trying to insert the new object. It remains blocked until a consuming thread takes an object out of the queue. The consume thread gets blocked when the queue is empty. The “ConsumerThread” waits for the “ProducerThread” to insert an object into queue and the notify all the blocked threads.