Introducing The Erlang AMQP Client
This article introduces the AMQP client library for Erlang, which is currently available as a contribution to the RabbitMQ server.
This library will be of interest to those wanting to:
- Use Erlang to connect to an AMQP server;
- Deploy Erlang modules directly into RabbitMQ and expose them via AMQP.
This is the first of a series of articles describing the Erlang AMQP client. This article assumes that you have a basic understanding of the AMQP model.
Originally, the intention of the library was to provide an embeddable client for Rabbit, which itself is written in Erlang. However, this would have meant the client would be tied to a particular version of the internal Rabbit API that would most probably change over time. The implication of this approach, therefore, is that each refactoring in the server would run the risk of breaking this client. Hence, the introduction of a client API was considered to decrease the maintenance effort of the server.
The goal was then reformulated to define a client abstraction that would remain unaffected in the face of changes made to the implementation of the server. The irony was that there was a perfectly good client abstraction that the server already implemented - namely AMQP.
The design decision then boiled down to piggybacking off two architectural insights that were already provided:
- The AMQP specification is divided into layers that abstract the execution model from the underlying wire format, framing and transport concerns;
- The Erlang language is based on explicit message passing.
As the server modules handling the unmarshalling of AMQP wireframes were separated from the modules that processed the execution model via Erlang message passing, it was quite easy to send AMQP methods directly to the modules that process the AMQP model in the server. Effectively, this meant providing an alternative transport mechanism to the protocol stack, for which Erlang provides all of the tools out of the box.
To see how this works from a client perspective, it is assumed that you already know the basics of Erlang and that you have it installed. Next, you need to obtain the server and client code. This example is based on:
- The RabbitMQ development snapshot from Dec 23 2007;
- A snapshot of the client from Jan 11 2008.
Once the server code has been downloaded, you need to build it and ensure it is linked into the OTP library you are running. This can be achieved by building from the makefile and adding a symbolic link from the lib directory in your OTP installation to the root directory of the RabbitMQ server.
Now you can build the client from the makefile in the client tarball and start an Erlang shell in the current working dir of the client distribution:
$ erl -pa ebin -mnesia dir $MNESIA_DIR -boot start_sasl
-s rabbit
This will boot the RabbitMQ server, pass in a directory where you would like mnesia to store its data files, and put the client binaries onto the load path of the Erlang runtime.
If you would like to run the unit tests for the client you will need to get a copy of Eunit from their svn repo, link it into your OTP installation and run the test module:
1> direct_client_test:test().
=INFO REPORT==== 13-Jan-2008::14:15:51 ===
Requeueing 1 messages, 0 already on queue
All 7 tests successful.
ok
However, if you are more interested in a practical example of how to program AMQP in Erlang, you might want to write an Erlang module similar to the following that demonstrates the lifecycle of AMQP from a client perspective:
User = Password = "guest",
Realm = <<"/data">>,
%% Start a connection to the server
Connection = amqp_connection:start(User, Password),
%% Once you have a connection to the server, you can
%% start an AMQP channel to gain access to a realm
Channel = amqp_connection:open_channel(Connection),
Access = #'access.request'{realm = Realm,
exclusive = false,
passive = true,
active = true,
write = true,
read = true},
#'access.request_ok'{ticket = Ticket} =
amqp_channel:call(Channel, Access),
%% Now that you have access to a realm within the
%% server, you can declare a queue and bind it to an
%% exchange
Q = <<"a.b.c">>,
X = <<"x">>,
BindKey = <<"a.b.c.*">>,
QueueDeclare = #'queue.declare'{ticket = Ticket,
queue = Q,
passive = false,
durable = false,
exclusive = false,
auto_delete = false,
nowait = false,
arguments = []},
#'queue.declare_ok'{queue = Q,
message_count = MessageCount,
consumer_count = ConsumerCount}
= amqp_channel:call(Channel, QueueDeclare),
ExchangeDeclare = #'exchange.declare'{ticket = Ticket,
exchange = X,
type= <<"topic">>,
passive = false,
durable = false,
auto_delete=false,
internal = false,
nowait = false,
arguments = []},
#'exchange.declare_ok'{}
= amqp_channel:call(Channel, ExchangeDeclare),
QueueBind = #'queue.bind'{ticket = Ticket,
queue = Q,
exchange = X,
routing_key = BindKey,
nowait = false,
arguments = []},
#'queue.bind_ok'{}
= amqp_channel:call(Channel, QueueBind),
%% The queue has now been set up and you have an open
%% channel so you can do something useful ...
%% After you have finished with the channel and
%% connection you should close both down
ChannelClose
= #'channel.close'{reply_code = 200,
reply_text = <<"Goodbye">>,
class_id = 0,
method_id = 0},
#'channel.close_ok'{}
= amqp_channel:call(Channel, ChannelClose),
ConnectionClose
= #'connection.close'{reply_code = 200,
reply_text = <<"Goodbye">>,
class_id = 0,
method_id = 0},
#'connection.close_ok'{}
= amqp_connection:close(Connection, ConnectionClose).
The interesting bit in the middle has been deliberately excluded so that you can focus on the set up and tear down of an AMQP channel using the Erlang client library. The lifecycle process follows these steps:
- Connect to the server;
- Open a channel;
- Acquire a ticket to access objects within a realm;
- Declare the usage of a queue (which creates a new queue if it does not exist);
- Declare an exchange, which works in the same way as a queue;
- Bind the queue to the exchange;
- Do something useful with the channel and queues, like sending and receiving data;
- Close the channel;
- Close the connection to the server.
The modus operandi is fairly generic and follows the AMQP specification. The record structures used are the Erlang representation of the methods defined in the protocol. This example demonstrates the usage of the two main modules in the Erlang client package:
- amqp_connection, which starts and stops a connection to the server as well as creating new channels;
- amqp_channel, which deals with sending and receiving AMQP methods to and from server channels.
So far we have not done anything useful with the channel and the exchange. For example, if we wanted to send a message to the exchange we could use the following code:
BasicPublish = #'basic.publish'{ticket = Ticket,
exchange = X,
routing_key = Key,
mandatory = false,
immediate = false},
Content = #content{class_id = 60,
properties = amqp_util:basic_properties(),
properties_bin = none,
payload_fragments_rev = [Payload]
},
amqp_channel:cast(Channel, BasicPublish, Content).
where the payload can be any binary structure. Notice that sending a message uses the cast/3 function in the amqp_channel module, as opposed to the call/2 functions used in the previous example. cast/3 sends a message asynchronously whereas call/2 is a synchronous invocation. call/2 is effectively an RPC, although it is not formally defined as such.
To be able to receive a message, an Erlang process needs to be registered as a consumer of a message queue. The following example shows how a process could register itself, but we could potentially register any process capable of receiving AMQP Basic Deliver methods:
%% Register a consumer to listen to a queue
BasicConsume = #'basic.consume'{ticket = Ticket,
queue = Q,
consumer_tag = <<"">>,
no_local = false,
no_ack = true,
exclusive = false,
nowait = false},
#'basic.consume_ok'{consumer_tag = ConsumerTag}
= amqp_channel:call(Channel, BasicConsume, self()),
%% If the registration was sucessful, the consumer will
%% be notified
receive
#'basic.consume_ok'{consumer_tag = ConsumerTag} ->
ok
end,
%% When a message is routed to the queue, it will be
%% delivered to this consumer
receive
{#'basic.deliver'{delivery_tag = DeliveryTag},
Content} ->
#content{payload_fragments_rev = [Payload]}
= Content,
io:format("Message received: ~p~n", [Payload])
after 2000 ->
exit(did_not_receive_message)
end,
%% After the consumer is finished interacting with the
%% queue, it can deregister itself
BasicCancel = #'basic.cancel'{consumer_tag = Tag,
nowait = false},
#'basic.cancel_ok'{consumer_tag = Tag}
= amqp_channel:call(Channel,BasicCancel).
The main part of this function is the second receive block where the process receives the AMQP Basic Deliver method combined with its content. In this example the receiver blocks for two seconds to obtain the message from its mailbox and then cancels its subscription to the queue. In a more realistic scenario, a consumer process would loop around the reception block and would not timeout, but this example aims to demonstrate the mechanics of the client API.
Lastly, you would then call the message sending and receiving functions after you had set up the exchange properly (the part of the lifecycle that we previously left blank to do something interesting in - see point [7] above):
spawn(fun() -> setup_consumer(Channel, Ticket, Q) end),
send_message(Channel, Ticket, X, <<"a.b.c.d">>,
<<"Hello World">>),
receive
after 2000 -> ok
end,
This little piece of code starts a consumer in a separate process so that its blocking receive does not stop the execution of the lifecycle process. Also, the lifecycle process is suspended briefly to enable the message process to consume the message before the lifecycle process proceeds to tear down the channel and the connection.
The source code for this example can be downloaded from here.
This article demonstrates the basic lifecycle and usage of the AMQP client using native Erlang message passing. In follow up articles I will discuss further aspects of this library, including its ability to send AMQP frames over a TCP connection.
This client has been sucessfully tested with version 1.3.0 of the RabbitMQ server.

Reader Comments (4)
Hi Ben,
Great article. Have you thought about what kinds of higher-level API might make sense for Erlang, and how they might usefully abstract away from the very low-level aspects of the AMQP protocol?
(Aside: the shell command-line "erl -pa ..." is being truncated by your stylesheet, so quite a few of the important command-line arguments can only be gotten at by page source or selection+cut-and-paste...)
Tony
Thanks for your comments Tony, I have fixed the truncation.
As for the high level Erlang API, I haven't had any thoughts about it yet apart from implementing sensible defaults for each AMQP method type. However, as these are generated from the spec, this may require changing the spec to add the sensible defaults to the XML definition of the spec.
Did you have any ideas about what may be useful in a high level API?
Ben
favorited this one, bro
Nice intro to the Erlang client. Wish I'd seen this a few months ago :)