package com.rabbitmq.examples;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * Demonstrates the effect of {@code basicQos} (prefetch) on two consumers of
 * the same queue that take different amounts of time per message.
 *
 * <p>A producer publishes an empty message roughly every 10 ms. A "fast"
 * worker takes about 10 ms per message and a "slow" worker about 1000 ms.
 * After ten seconds the demo stops and prints how many messages each worker
 * acknowledged.
 */
public class QosDemo {

    /**
     * Runs the demo for ten seconds and prints the per-worker counts to stderr.
     *
     * @param args unused
     * @throws Exception if the connection, channels or queue cannot be set up,
     *                   or if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws Exception {
        // NOTE(review): depending on the client library version, the String
        // argument is either a host name or a connection name - confirm
        // against the version on the classpath.
        Connection con = new ConnectionFactory().newConnection("localhost");
        ExecutorService threadExecutor = Executors.newFixedThreadPool(5);

        Worker fast;
        Worker slow;
        try {
            // This channel is only used to declare the server-named queue.
            Channel throwAwayChannel = con.createChannel();
            String queue = throwAwayChannel.queueDeclare().getQueue();

            int prefetchCount = 1;
            fast = new Worker(prefetchCount, threadExecutor, 1, con.createChannel(), queue);
            slow = new Worker(prefetchCount, threadExecutor, 100, con.createChannel(), queue);

            Producer producer = new Producer(con.createChannel(), queue);
            threadExecutor.execute(producer);

            Thread.sleep(10000);

            // Interrupt the producer and any sleeping tasks, then give them a
            // moment to finish so the counters are stable before being read.
            threadExecutor.shutdownNow();
            threadExecutor.awaitTermination(1, TimeUnit.SECONDS);
        } finally {
            // Always release the pool and the connection, even if setup failed.
            threadExecutor.shutdownNow();
            con.close();
        }

        System.err.println("Fast worker processed : " + fast.processed);
        System.err.println("Slow worker processed : " + slow.processed);
    }

    /**
     * Consumer that hands every delivery to a shared executor as a
     * {@link VariableLengthTask} and counts the acknowledged messages.
     */
    static class Worker extends DefaultConsumer {
        // Never assigned anywhere in this file; kept so the class shape is unchanged.
        String name;
        final long sleep;
        final Channel channel;
        final String queue;
        // Written by pool threads, read by main: volatile gives visibility,
        // recordProcessed() makes the increment atomic.
        volatile int processed;
        final ExecutorService executorService;

        /**
         * Configures the channel's prefetch limit and starts consuming with
         * manual acknowledgements.
         *
         * @param prefetch       value passed to {@code basicQos}
         * @param threadExecutor executor on which deliveries are processed
         * @param s              sleep factor; each task sleeps {@code s * 10} ms
         * @param c              channel to consume on
         * @param q              queue to consume from
         * @throws Exception if the channel rejects the qos or consume request
         */
        public Worker(int prefetch, ExecutorService threadExecutor, long s, Channel c, String q)
                throws Exception {
            super(c);
            sleep = s;
            channel = c;
            queue = q;
            // Must be set before basicConsume: once consuming starts,
            // handleDelivery may run on another thread and needs the executor.
            executorService = threadExecutor;
            channel.basicQos(prefetch);
            channel.basicConsume(queue, false, this);
        }

        /** Atomically increments the count of acknowledged messages. */
        synchronized void recordProcessed() {
            processed++;
        }

        /**
         * Schedules processing of one delivery on the executor.
         *
         * <p>If the executor has already been shut down the delivery is
         * dropped without an acknowledgement.
         */
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body)
                throws IOException {
            Runnable task = new VariableLengthTask(this, envelope.getDeliveryTag(), channel, sleep);
            try {
                executorService.submit(task);
            } catch (RejectedExecutionException ignored) {
                // The demo is shutting down; leave the message unacknowledged
                // instead of failing the consumer thread.
            }
        }
    }

    /**
     * Simulates work by sleeping, then acknowledges the delivery and bumps
     * the owning worker's counter.
     */
    static class VariableLengthTask implements Runnable {
        final long tag;
        final long sleep;
        final Channel chan;
        final Worker worker;

        /**
         * @param w worker whose counter is incremented after a successful ack
         * @param t delivery tag to acknowledge
         * @param c channel on which to acknowledge
         * @param s sleep factor; the task sleeps {@code s * 10} ms
         */
        VariableLengthTask(Worker w, long t, Channel c, long s) {
            worker = w;
            tag = t;
            chan = c;
            sleep = s;
        }

        public void run() {
            try {
                Thread.sleep(sleep * 10);
            } catch (InterruptedException e) {
                // Shutdown requested: keep the interrupt status and skip the ack.
                Thread.currentThread().interrupt();
                return;
            }

            if (!chan.isOpen()) {
                return;
            }
            try {
                chan.basicAck(tag, false);
                worker.recordProcessed();
            } catch (IOException e) {
                // Best effort: the channel may have closed after the isOpen() check.
                System.err.println("Failed to ack delivery " + tag + ": " + e);
            }
        }
    }

    /**
     * Publishes an empty message to the default exchange about every 10 ms
     * until it is interrupted or publishing fails.
     */
    static class Producer implements Runnable {
        final Channel channel;
        final String routingKey;

        /**
         * @param c channel to publish on
         * @param r routing key for every published message
         */
        Producer(Channel c, String r) {
            channel = c;
            routingKey = r;
        }

        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    channel.basicPublish("", routingKey, MessageProperties.BASIC, null);
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    // Restore the flag so the executor sees the interruption.
                    Thread.currentThread().interrupt();
                    break;
                } catch (Exception e) {
                    // Any publish failure (e.g. channel closed) ends the producer.
                    break;
                }
            }
        }
    }
}