
Brian Whitman @ variogr.am


Replacing Amazon SQS with something faster and cheaper

Did you know that SQS costs a dollar per million messages? That's per message sent or received, so really it's 2 bucks a million. Another thing about SQS: it's slow. Even with 200 connections, adding 10K messages takes minutes. [Actual number: sending 10,000 100-byte strings from an EC2 instance takes 200 seconds.]

So I played with RabbitMQ. Phil J told me about it. Normally when he says something I just do it, but this time I wanted at least to prove to myself that he was right. 

If you want to replace SQS with an AMQP broker like RabbitMQ, here's what you do. You make a 'direct exchange' with the routing key being the queue name. You bind the queue to the exchange with that routing key, and messages sent to the exchange are forwarded to the relevant queues. Consumers then get messages directly from the queue and send an "ack" when they are done, which removes the message from the queue.

There are two ways to consume. One is basic_consume, which appears to block all other consumers from reading the queue (I don't know why); the other is basic_get, which acts like SQS in that multiple consumers can check the queue and send acks independently of the get. Other exchange types are "fanout", which sends a message to every bound queue, and "topic", which lets you set up a hierarchy of routing keys to group clusters of workers.

I used the py-amqplib library in Python to get this done, and it just worked. Other good links: performance test code, queue w/ thrift, amqp description.
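To make the direct-exchange flow concrete, here is a toy in-memory model of it in Python 3: bind a queue under a routing key, publish, fetch with a basic_get-style call, then ack. DirectExchange and its methods are hypothetical names for illustration only, not RabbitMQ's or amqplib's API, and requeue_unacked() is only a rough stand-in for what the broker does with unacked messages when a consumer's channel closes.

```python
from collections import deque


class DirectExchange:
    """Toy in-memory model of an AMQP direct exchange plus its queues.

    Illustration only: names and methods are hypothetical, not a real API.
    """

    def __init__(self):
        self.bindings = {}  # routing key -> set of bound queue names
        self.queues = {}    # queue name -> deque of ready messages
        self.unacked = {}   # delivery tag -> (queue name, message)
        self.next_tag = 1

    def declare_queue(self, name):
        self.queues.setdefault(name, deque())

    def bind(self, queue, routing_key):
        self.bindings.setdefault(routing_key, set()).add(queue)

    def publish(self, routing_key, message):
        # Direct routing: deliver to every queue bound with exactly this key.
        # A message whose key matches no binding is silently dropped.
        for queue in self.bindings.get(routing_key, ()):
            self.queues[queue].append(message)

    def get(self, queue):
        """basic_get-style poll: (message, tag), or (None, None) if empty."""
        if not self.queues[queue]:
            return None, None
        message = self.queues[queue].popleft()
        tag = self.next_tag
        self.next_tag += 1
        self.unacked[tag] = (queue, message)  # hidden from other gets until acked
        return message, tag

    def ack(self, tag):
        # The ack is what finally removes the message.
        del self.unacked[tag]

    def requeue_unacked(self):
        # Rough stand-in for a consumer's channel closing without acking:
        # every unacked message becomes available again.
        for queue, message in self.unacked.values():
            self.queues[queue].appendleft(message)
        self.unacked.clear()


# SQS-style setup: the routing key is the queue name.
ex = DirectExchange()
ex.declare_queue("jobs")
ex.bind("jobs", "jobs")
ex.publish("jobs", {"id": 1})
msg, tag = ex.get("jobs")
ex.ack(tag)
```

Using the queue name as the routing key means each queue receives exactly the messages addressed to it, which is what makes the direct exchange behave like a set of independent SQS queues.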

Adding 10,000 ~100-byte JSON strings to Rabbit in durable mode from a low-end Amazon instance (not the Rabbit instance) takes ~5 seconds. That's 40x faster for sending than SQS. Messages can be retrieved serially from a single thread at about 25 msgs/second (again, more threads == same retrieval rate). Adding 100K messages to Rabbit took 55 seconds.
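The rates behind those timings, worked out as plain arithmetic in Python 3 on the numbers in this post (the 200-second SQS figure is the one quoted at the top):

```python
# Rates implied by the timings quoted in this post (plain arithmetic).
def rate(n_messages, seconds):
    """Messages per second for a timed batch."""
    return n_messages / seconds

rabbit_10k = rate(10_000, 5)       # publishing 10K messages to Rabbit
rabbit_100k = rate(100_000, 55)    # publishing 100K messages to Rabbit
sqs_10k = rate(10_000, 200)        # publishing 10K messages to SQS
speedup = rabbit_10k / sqs_10k     # the "40x faster for sending"

retrieve_rate = 25                 # msgs/s, single-threaded retrieval
drain_10k_seconds = 10_000 / retrieve_rate
publish_vs_retrieve = rabbit_10k / retrieve_rate
```

On these numbers, publishing (~2,000 msgs/s) outruns single-threaded retrieval (~25 msgs/s) by about 80x, and draining a 10K backlog takes around 400 seconds, so the consumer side is where a backlog builds up.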

During message production at 10K msgs, the beam process (the Erlang VM running RabbitMQ) uses about 15% CPU and 5% RAM on the Amazon instance it's hosted on. During retrieval, CPU drops to 0.5%. At idle, RAM stays at 5% with no CPU. I added 2M messages to test RAM usage, and RAM went up to 50% (of 1.7GB total on these boxes). There's lots of talk about how to make this easier on yourself here, but the overall word is not to have that many unread messages sitting around.
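A back-of-the-envelope estimate of what each queued message costs in RAM, from the 2M-message test. It assumes "1.7GB" means 1.7e9 bytes and that all growth above the 5% idle baseline is due to the queued messages; it is arithmetic on the reported percentages, not a measurement.

```python
# Rough per-message RAM cost from the 2M-message test.
TOTAL_RAM_BYTES = 1.7e9   # assumes "1.7GB" means 1.7 * 10**9 bytes
IDLE_FRACTION = 0.05      # RAM share at idle (5%)
LOADED_FRACTION = 0.50    # RAM share with 2M messages queued (50%)
N_MESSAGES = 2_000_000

# Attribute all growth above the idle baseline to the queued messages.
growth_bytes = (LOADED_FRACTION - IDLE_FRACTION) * TOTAL_RAM_BYTES
bytes_per_message = growth_bytes / N_MESSAGES
```

That comes out to roughly 380 bytes per queued message, several times the ~100-byte payload, which fits the advice not to leave large backlogs sitting in the broker.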

Costs for Rabbit: $72/mo for the Rabbit instance if I want to keep it segregated. Requests are free if I stay in EC2.
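Where the price break-even sits between SQS and a dedicated instance, as plain arithmetic. This sketch assumes SQS at $1 per million requests with two requests per message (send and receive), a flat $72/month instance with free in-EC2 requests, and a 30-day month; the constant and function names are just for illustration.

```python
# Break-even volume for a flat-rate RabbitMQ box versus per-request SQS pricing.
SQS_PRICE_PER_REQUEST = 1.00 / 1_000_000   # dollars ($1 per million requests)
REQUESTS_PER_MESSAGE = 2                   # one send + one receive
INSTANCE_COST_PER_MONTH = 72.0             # dollars; in-EC2 requests assumed free
SECONDS_PER_MONTH = 30 * 24 * 3600         # assumes a 30-day month

def sqs_monthly_cost(messages_per_month):
    """Dollar cost of pushing this many messages through SQS in a month."""
    return messages_per_month * REQUESTS_PER_MESSAGE * SQS_PRICE_PER_REQUEST

# Messages per month at which SQS costs as much as the instance.
break_even_messages = INSTANCE_COST_PER_MONTH / (REQUESTS_PER_MESSAGE * SQS_PRICE_PER_REQUEST)
# The same volume expressed as a sustained average rate (msgs/s).
break_even_rate = break_even_messages / SECONDS_PER_MONTH
```

Under these assumptions the break-even is about 36 million messages a month, or roughly 14 msgs/s sustained; below that SQS is cheaper on price alone, above it the flat-rate box wins.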

Upsides: much faster adding, more flexible, slightly cheaper.

Downsides: something I have to think about (backup, image, etc.), although so far it seems pretty hands-off, and I can put the persistent data on an EBS volume with an hourly snapshot just in case.

Edit: Some Python code using py-amqplib follows. All I had to do on the server was apt-get install erlang, then install the RabbitMQ .deb file and poke a hole in the Amazon security group for the AMQP port, 5672. I didn't do any other config on the RabbitMQ server to be able to run this code. (Obviously, for real-world use you'll want auth & stuff.)

 
import simplejson
import amqplib.client_0_8 as amqp
        
class Queue(object):
    """Base class for AMQP queue service."""
    def __init__(self, queue_name):
        # Default guest/guest account and no SSL: fine for a test box only.
        self.conn = amqp.Connection('HOST_HERE', userid='guest', password='guest', ssl=False)
        self.queue_name = queue_name
        self.ch = self.conn.channel()

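    def declare(self):
        """Declare the durable, shared queue (a no-op if it already exists).
        Returns a (queue_name, message_count, consumer_count) tuple."""
        return self.ch.queue_declare(self.queue_name, passive=False, durable=True, exclusive=False, auto_delete=False)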
        
    def __len__(self):
        """Return number of messages waiting in this queue"""
        _,n_msgs,_ = self.declare()
        return n_msgs
    
    def consumers(self):
        """Return how many clients are currently listening to this queue."""
        _,_,n_consumers = self.declare()
        return n_consumers
    

class QueueProducer(Queue):
    def __init__(self, queue_name):
        """Create new queue producer (guy that creates messages.) 
        Will create a queue if the given name does not already exist."""
        Queue.__init__(self, queue_name)
        self.ch.access_request('/data',active=True,read=False,write=True)
        self.ch.exchange_declare('sqs_exchange', 'direct', durable=True, auto_delete=False)
        qname, n_msgs, n_consumers  = self.declare()
        print "Connected to %s (%d msgs, %d consumers)" % (qname, n_msgs, n_consumers)
        self.ch.queue_bind(self.queue_name, 'sqs_exchange', self.queue_name)
    
    def delete(self):
        """Delete the queue and close the channel and connection."""
        self.ch.queue_delete(self.queue_name)
        self.ch.close()
        self.conn.close()
        
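    def write(self, message):
        """Write a single message to the queue. Message can be a dict or a list or whatever."""
        # delivery_mode=2 marks the message persistent so it survives a broker
        # restart along with the durable queue (this can make publishing slower).
        m = amqp.Message(simplejson.dumps(message), content_type='text/x-json', delivery_mode=2)
        self.ch.basic_publish(m, 'sqs_exchange', self.queue_name)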

class QueueConsumer(Queue):
    def __init__(self, queue_name):
        """Create new queue consumer (guy that listens to messages.)"""
        Queue.__init__(self, queue_name)
        self.ch.access_request('/data',active=True, read=True, write=False)
        self.ch.queue_bind(self.queue_name, 'sqs_exchange', self.queue_name)
    
    def ack(self, delivery_tag):
        """Acknowledge receipt of the message (which will remove it from the queue.)
         Do this after you've completed your processing of the message.
         Otherwise it stays unacknowledged and goes back on the queue once this
         consumer's channel or connection closes.
         e.g.

         (obj, tag) = consumer.get()
         if obj is not None:
             error = doSomethingWithMessage(obj)
             if error is None:
                 consumer.ack(tag)

        """
        self.ch.basic_ack(delivery_tag)
        
    def get(self):
        """Get a message without blocking. Returns (object, delivery_tag),
        or (None, None) if the queue is empty."""
        # basic_get defaults to no_ack=False, so the message must be acked later.
        m = self.ch.basic_get(self.queue_name)
        if m is None:
            return (None, None)
        try:
            ret = simplejson.loads(m.body)
        except ValueError:
            print "Problem decoding json for body " + str(m.body) + ". deleting."
            self.ack(m.delivery_tag)  # ack undecodable messages so they don't come back
            return (None, None)
        return (ret, m.delivery_tag)
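One way to drive QueueConsumer, following the get/process/ack pattern from its ack() docstring: a polling loop that acks only when processing succeeds. This is a sketch that should run under Python 2 or 3; run_worker, process_message and FakeConsumer are hypothetical names, and the fake consumer only mimics the get()/ack() shape so the loop can be tried without a broker. Sleeping on an empty queue is an assumption, since basic_get returns immediately rather than waiting for a message.

```python
import time


def run_worker(consumer, process_message, idle_sleep=1.0, max_idle_polls=None):
    """Poll `consumer` (anything with get() and ack(), e.g. QueueConsumer).

    process_message(obj) should return None on success; any other return value
    is treated as an error and the message is left unacked (with a real broker
    it goes back on the queue when the channel closes). Stops after
    max_idle_polls consecutive empty polls (never, if None).
    Returns the number of messages acked.
    """
    acked = 0
    idle_polls = 0
    while max_idle_polls is None or idle_polls < max_idle_polls:
        obj, tag = consumer.get()
        if tag is None:
            # Empty queue: basic_get returns immediately, so back off a bit.
            idle_polls += 1
            time.sleep(idle_sleep)
            continue
        idle_polls = 0
        if process_message(obj) is None:
            consumer.ack(tag)
            acked += 1
    return acked


class FakeConsumer(object):
    """Hypothetical in-memory stand-in with QueueConsumer's get()/ack() shape."""

    def __init__(self, items):
        self.items = list(items)
        self.acked = []
        self.next_tag = 0

    def get(self):
        if not self.items:
            return (None, None)
        self.next_tag += 1
        return (self.items.pop(0), self.next_tag)

    def ack(self, tag):
        self.acked.append(tag)


# The third message "fails", so it is never acked.
fake = FakeConsumer([{"n": 1}, {"n": 2}, {"n": -1}])
done = run_worker(fake, lambda m: None if m["n"] > 0 else "bad",
                  idle_sleep=0, max_idle_polls=1)
# done == 2 and fake.acked == [1, 2]
```

Checking the tag rather than the object keeps a message whose JSON body is literally null from being mistaken for an empty queue.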