Skip to content

Instantly share code, notes, and snippets.

@scvalex
Created October 6, 2010 10:43

Revisions

  1. scvalex revised this gist Oct 6, 2010. 1 changed file with 6 additions and 0 deletions.
    6 changes: 6 additions & 0 deletions output
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,6 @@
    ...
    Consumed 9997
    Consumed 9998
    Consumed 9999
    Test took 264.141s

  2. scvalex created this gist Oct 6, 2010.
    104 changes: 104 additions & 0 deletions TxDontLoseMessages.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,104 @@
    // The contents of this file are subject to the Mozilla Public License
    // Version 1.1 (the "License"); you may not use this file except in
    // compliance with the License. You may obtain a copy of the License at
    // http://www.mozilla.org/MPL/
    //
    // Software distributed under the License is distributed on an "AS IS"
    // basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
    // License for the specific language governing rights and limitations
    // under the License.
    //
    // The Original Code is RabbitMQ.
    //
    // The Initial Developers of the Original Code are LShift Ltd,
    // Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
    //
    // Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
    // Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
    // are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
    // Technologies LLC, and Rabbit Technologies Ltd.
    //
    // Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
    // Ltd. Portions created by Cohesive Financial Technologies LLC are
    // Copyright (C) 2007-2010 Cohesive Financial Technologies
    // LLC. Portions created by Rabbit Technologies Ltd are Copyright
    // (C) 2007-2010 Rabbit Technologies Ltd.
    //
    // All Rights Reserved.
    //
    // Contributor(s): ______________________________________.
    //

    package com.rabbitmq.examples;

    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.MessageProperties;
    import com.rabbitmq.client.QueueingConsumer;

    import java.io.IOException;

    public class TxDontLoseMessages {
    final static int MSG_COUNT = 10000;
    final static String QUEUE_NAME = "tx-test";
    static ConnectionFactory connectionFactory;

    public static void main(String[] args)
    throws IOException, InterruptedException
    {
    connectionFactory = new ConnectionFactory();

    (new Thread(new Consumer())).start();
    (new Thread(new Publisher())).start();
    }

    static class Publisher implements Runnable {
    public void run() {
    try {
    long startTime = System.currentTimeMillis();

    Connection conn = connectionFactory.newConnection();
    Channel ch = conn.createChannel();
    ch.queueDeclare(QUEUE_NAME, true, false, true, null);
    ch.txSelect();
    for (int i = 0; i < MSG_COUNT; ++i) {
    ch.basicPublish("", QUEUE_NAME,
    MessageProperties.PERSISTENT_BASIC,
    "nop".getBytes());
    ch.txCommit();
    }
    ch.close();
    conn.close();

    long endTime = System.currentTimeMillis();
    System.out.printf("Test took %.3fs\n", (float)(endTime - startTime)/1000);
    } catch (Throwable e) {
    System.out.println("foobar :(");
    System.out.print(e);
    }
    }
    }

    static class Consumer implements Runnable {
    public void run() {
    try {
    Connection conn = connectionFactory.newConnection();
    Channel ch = conn.createChannel();
    ch.queueDeclare(QUEUE_NAME, true, false, true, null);
    QueueingConsumer qc = new QueueingConsumer(ch);
    ch.basicConsume(QUEUE_NAME, true, qc);
    for (int i = 0; i < MSG_COUNT; ++i) {
    qc.nextDelivery();
    System.out.printf("Consumed %d\n", i);
    }
    ch.close();
    conn.close();
    } catch (Throwable e) {
    System.out.println("Whoosh!");
    System.out.print(e);
    }
    }
    }
    }