-
Notifications
You must be signed in to change notification settings - Fork 696
Description
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
-
Advanced Java: Producer-Consumer using BlockingQueue and ExecutorService
-
This handles synchronization automatically, preventing race conditions.
*/
public class AdvancedConcurrency {public static void main(String[] args) {
// Creates a thread-safe queue with capacity 10
BlockingQueue sharedQueue = new ArrayBlockingQueue<>(10);// Executor Service to manage threads ExecutorService producerExecutor = Executors.newFixedThreadPool(2); ExecutorService consumerExecutor = Executors.newFixedThreadPool(2); // Submitting Producers for (int i = 0; i < 2; i++) { producerExecutor.submit(new Producer(sharedQueue, i)); } // Submitting Consumers for (int i = 0; i < 2; i++) { consumerExecutor.submit(new Consumer(sharedQueue, i)); } // Shutdown executors properly producerExecutor.shutdown(); consumerExecutor.shutdown();}
// Producer class
static class Producer implements Runnable {
private final BlockingQueue queue;
private final int producerId;public Producer(BlockingQueue<Integer> queue, int id) { this.queue = queue; this.producerId = id; } @Override public void run() { try { for (int i = 0; i < 5; i++) { System.out.println("Producer " + producerId + " produced: " + i); queue.put(i); // Blocks if queue is full Thread.sleep(100); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }}
// Consumer class
static class Consumer implements Runnable {
private final BlockingQueue queue;
private final int consumerId;public Consumer(BlockingQueue<Integer> queue, int id) { this.queue = queue; this.consumerId = id; } @Override public void run() { try { while (true) { Integer item = queue.take(); // Blocks if queue is empty System.out.println("Consumer " + consumerId + " consumed: " + item); Thread.sleep(200); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }}
}