AS3 AMQP Client: First Cut
In a previous article I introduced a proof-of-concept client for AMQP written in AS3, which at the time had not been released as a formal artifact. The proof-of-concept implementation has now been refactored and cleaned up in order to release it formally. The article announces the first release of this library and describes its usage.
Project Website
The AMQP client library is now a hosted project at Sharesource. You can download a release binary from the file download area or you can check out the source tree directly from the Mercurial source code repository. In addition to the main repository, there is a second repository that contains FlexUnit tests for the main client library.
The Initial Release
The released version of the library is 0.1 to indicate that this is a reasonably immature project. Having said that, the development has gone through a number of iterations to reach its current stage. In the spirit of releasing often and early, this release is the first tested and usable cut of the code base in order to receive feedback from other users.
Changes From The Proof-Of-Concept
The library has been been completely re-written in order to achieve a more layered design and to make it a better fit for an AS3/Flex development paradigm. The major points of the re-write include:
- Using code generation to create the AMQP specific methods in AS3;
- The client-side interaction is completely event-driven, as would be expected in AS3;
- The AMQP model execution from the client's perspective is based on the AMQP command set as opposed to exposing specific API calls for each command;
- The session handling has been layered into a model execution layer and a framing layer;
- Implementation of separate handlers for the connection state to the server and each server channel;
- A generic RPC interface to enable arbitrary client RPC invocations;
- The ability for the client to respond to any AMQP model event;
- A generic AMQP event handler that can be used out of the box to send and receive data;
- The flexibility to either subclass the generic AMQP event handler for custom behavior or to provide a complete custom implementation to interface with the framing layer.
Basic Usage
The easiest way to see some example code demonstrating how to use the library is check out the test project and look at the test cases.
If you do check out the test project, please be aware that certain portions of the client code base have been automatically generated from the AMQP protocol specification. This means that you will have to run the code generation before you can compile the client. To do this, go to the top level of the codegen project and run the main method of the CodeGenerator class. This will generate all of the necessary AS3 source files in the correct location for you.
The following code demonstrates how to initiate a connection to the AMQP server:
var state:ConnectionState = new ConnectionState();
state.username = "guest";
state.password = "guest";
state.vhostpath = "/";
state.serverhost = "localhost";
var connection:Connection = new Connection(state);
var baseSession:Session = connection.baseSession;
var sessionManager:SessionManager = connection.sessionManager;
connection.start();
baseSession.addEventListener(new OpenOk(), onOpenOk);
Because the Flash runtime does not support blocking IO, you need to structure your client program to be able to respond to events posted by the AMQP server. Fortunately, the client provides convenient state handlers for both the state of the connection to the AMQP server and the state of each AMQP channel. The handler for the connection state is incorporated into the baseSession. After the connection has been started (by calling connection.start()), you register a callback handler that will be notified after the connection to the AMQP server has been successfully established. In this example, the event will be dispatched to the onOpenOk event handler.
In the onOpenOk handler routine, you can then use the session manager to create a new AMQP channel:
public function onOpenOk(event:ProtocolEvent):void {
sessionHandler = sessionManager.create();
var open:Open = new Open();
var accessRequest:Request = new Request();
accessRequest._realm = realm;
accessRequest._passive = true;
accessRequest._active = true;
accessRequest._read = true;
accessRequest._write = true;
var exchange:org.amqp.methods.exchange.Declare
= new org.amqp.methods.exchange.Declare();
exchange._exchange = x;
exchange._type = x_type;
var queue:org.amqp.methods.queue.Declare
= new org.amqp.methods.queue.Declare();
queue._queue = q;
var bind:Bind = new Bind();
bind._exchange = x;
bind._queue = q;
bind._routingkey = bind_key;
var onBindOk:Function = function(event:ProtocolEvent):void{
trace("onBindOk called");
};
sessionHandler.dispatch(new Command(open));
sessionHandler.dispatch(new Command(accessRequest));
sessionHandler.dispatch(new Command(exchange));
sessionHandler.dispatch(new Command(queue));
sessionHandler.dispatch(new Command(bind));
sessionHandler.addEventListener(new BindOk(), onBindOk);
}
This method implements the following steps in the AMQP model flow:
- Open a channel;
- Request access to the channel;
- Declare an exchange;
- Declare a queue;
- Bind the queue to the exchange.
The example shows the asynchronous nature of the client library. The user can dispatch any number of AMQP commands via the session handler. The handler ensures that these commands get executed in the right order and in accordance with any pre-requisite events a given command may be required to wait on. To achieve this, commands that are not yet eligible for dispatch will be queued up by the session handler and then dispatched when appropriate. For example, you cannot send the Exchange Declare command without the appropriate ticket. The ticket is received as part of the response to the Access Request command, so the state handler buffers the pending Exchange Declare until it has received the ticket. This is kept transparent to the user, hence simplifying the knowledge the client requires of the protocol flow.
At the end, the user registers the onBindOk callback function, which will be invoked when the client receives the BindOk acknowledgement from the server. This also demonstrates how to program in an event-driven fashion. After the method returns, the pending commands have a chance to get dispatched asynchronously to the server.
For the purpose of demonstration, this also enables us to respond to the successful binding of the queue to the exchange. Once this event has been received, we can register a consumer to process messages from the queue:
public function onBindOk(event:ProtocolEvent):void {
var consumer:BasicConsumer = ....... // your own implementation
var consume:Consume = new Consume();
consume._queue = q;
consume._noack = true;
sessionHandler.register(consume, consumer);
}
To process messages, the user has to supply an implementation of the BasicConsumer interface:
public interface BasicConsumer
{
function onConsumeOk(tag:String):void;
function onCancelOk(tag:String):void;
function onDeliver(method:Deliver,
properties:BasicProperties,
content:ByteArray):void;
}
The onConsumeOk/onCancelOk callbacks will be invoked when the queue subscription is started or cancelled. The onDeliver callback will be invoked when the queue wants to deliver a message to the client.
Once the consumer has been set up, we can start to publish messages to the exchange that will get routed to the queue the client is listening to:
public function publish():void {
var data:ByteArray = new ByteArray();
data.writeUTF("hello, world");
var publish:Publish = new Publish();
publish._exchange = x;
publish._routingkey = routing_key;
var props:BasicProperties = Properties.getBasicProperties();
var cmd:Command = new Command(publish, props, data);
sessionHandler.dispatch(cmd);
}
That completes this simple example. As mentioned earlier, the test project contains further examples that exercise the complete AMQP command set.
Compatibility and Testing
So far, the client has only been tested with the latest snapshot version of RabbitMQ server. This implements version 0.8 of the AMQP specification. Because the AS3 bindings for the AMQP command set have been generated from the XML specification document, this makes an upgrade to a newer protocol version a simple matter. The main client interfaces (CommandReceiver and CommandDispatcher) have been designed to be independent of the protocol version, so that no client code needs to be changed when upgrading to a newer version.
Outlook
The intention of the release is to provide a relatively stable API to program against. If the library gains adoption, some changes to the API may be necessary, but it is hoped that these changes will be minor. Some potential improvements include, amongst others:
- Improving the property getters on the generated AMQP methods;
- Adding TLS to the socket layer;
- Generating proper class documentation.
The source code repository for the client has now moved to Github. The old repository is still available, but maintenance will take place using git.
This client has now been sucessfully tested with version 1.3.0 of the RabbitMQ server.
Now there is also a discussion list for this client at Google Groups.
References (2)
-

-
Related: Integrating Flex and RabbitMQ

Reader Comments (11)
When I try to compile as3amqp flex builder complains about missing BasicConsumer, ConsumerRegistry, Properties, and SessionStateHandler. Am I missing something?
How have you set up the project in FlexBuilder? What is the main source path? Have you included all of the classes in the build by selecting them all in the properties window?
Hi Ben,
Very intersting article!
However, I having problems trying to work out what you mean by this!
>This means that you will have to run the code generation before >you can compile the client. To do this, go to the top level of >the codegen project and run the main method of the CodeGenerator >class. This will generate all of the necessary AS3 source files >in the correct location for you.
Do I do this from within FlexBuilder or do I need to comple the java class first? I checked out the test project but there was now reference to the CodeGenerator class.
Could you provide a more detailed step by step guide that takes me through getting the test project up and running with FlexBuilder 3?
I have RabbitMQ running on a dedicated server.
Best Regards,
Carl
Hi Carl,
There is an ant task that generates and compiles all of the AS3 classes for you.
I've posted this answer to the list.
HTH,
Ben
I like to know if it's possible to run as3-amqp on FMS (flash media server) using XMLSocket? (no Socket class on FMS..)
Some solution can be used like as3-stomp using XMLSocket on server side..
Thanks!
Patrick
Hi Ben,
For some reason I'm getting the "Security sandbox violation: http://localhost/AMQP/AMQPTest.swf cannot load data from localhost:5672." error when trying to create a test client that uses this library, I've tried creating a crossdomain.xml at the root web server as well as running a flash policy server to serve the security policy file but to no avail.
Have you seen this error before, do you know how to get around it?
The application will execute beyond the connection.Start method but OpenOK never get called, and I'll get the error after about 30 seconds.
Thanks for your help!
Feichi
Feichi,
If you are using Windows, this could be a socket related issue that was discovered and fixed a while ago.
Are you you using the latest version from github?
Ben
Ben,
Yes, I'm using Windows and the latest version (amqp_client v0.1.2). I just tested it on a different machine, on the second box, I'm not getting the Security sandbox violation error but the connection.OpenOk() event doesn't fire..
could you tell what I might have missed?
Thanks a lot!
Feichi
Hi Feichi,
I was having the same problem as you were running on windows. I managed to solve the problem by serving a crossdomain.xml file on port 843. I got the idea while looking at a JS AMQP lib. Code here:http://github.com/dansimpson/amqp-js/tree/master/policy-server/
Hey Ben
Cookl project! But to try it out, i need to change the port. So i changd it in the sources and tryed. But the current sources are different from the ones in this client example. I noticed, that ConnectionState is now called ConnectionParameters, so I canged the name and move to the next error.
And this I don't get. The Session-Interface has now no addEventListener-definition, so I gave up. Is there an updated version of this client above, which work with the current sources?
Thx in advance, H2OBrain
Hey H20Brain,
The current version of the client is on this Github project.
Have you checked out the examples there?
Cheers,
Ben