Reactor Queues

Our Reactor Queues are traditional message queues that provide a reliable and straightforward mechanism for your servers to consume, process, store, augment or reroute data from our realtime platform efficiently. Reactor Queues are offered as part of our Ably Reactor service, which is available to all customers of the Ably platform.

What are Reactor Queues?

Ably’s Reactor Queues provide an asynchronous machine-to-machine communication protocol that follows a traditional message queueing pattern. At a high level, each machine participates in one or both roles: producers (Ably channels) publish messages (data) to a queue, and consumers retrieve messages from the queue. The queue service is responsible for storing published messages by placing them at the back of the queue, picking the oldest messages off the front of the queue and handing them to a consumer, ensuring a FIFO (‘first in, first out’) policy is followed, and ensuring that each successfully consumed message is handed to only one consumer. This messaging pattern provides decoupling (publishers can publish without waiting for consumers), scalability (adding more consumers increases throughput capacity) and resilience (messages are stored until a consumer has acknowledged that they have been processed successfully).
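
To make the pattern concrete, below is a minimal in-memory sketch in JavaScript of a FIFO queue with one producer and one consumer. It is purely illustrative; the Reactor Queue service adds persistence, acknowledgements and high availability on top of this idea.

/* A minimal, illustrative in-memory FIFO queue */
const queue = []

/* Producer: places a message at the back of the queue without waiting for consumers */
function publish (message) {
  queue.push(message)
}

/* Consumer: takes the oldest message off the front of the queue (FIFO).
   Each message is handed to only one consumer */
function consume () {
  return queue.shift()
}

publish({ name: 'location', data: { lat: 51.5, lon: -0.1 } })
console.log(consume()) /* => { name: 'location', data: { lat: 51.5, lon: -0.1 } } */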

Ably’s Realtime Core provides channels for realtime data distribution using the pub/sub messaging pattern. Unlike queues, pub/sub channels provide fan-out so that every message published on a channel is received by all devices subscribed to that data. When delivered with our connection state recovery, this pattern provides a decoupled, resilient and scalable means of publishing realtime data to any number of devices.
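
For comparison, a minimal pub/sub example using the Ably JavaScript client library is shown below; the API key placeholder and channel name are illustrative.

const Ably = require('ably')

/* Connect to Ably and attach to a pub/sub channel */
const realtime = new Ably.Realtime('REPLACE_WITH_YOUR_API_KEY')
const channel = realtime.channels.get('delivery-van:van123') /* illustrative channel name */

/* Every subscriber receives every message published on the channel (fan-out) */
channel.subscribe((message) => {
  console.log('Received', message.name, message.data)
})

/* Publish a message; all subscribed devices receive it */
channel.publish('location', { lat: 51.5074, lon: -0.1278 })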

No single pattern is better than the other; both have their merits and valid use cases. Take, for example, a delivery van driving through a city, publishing its location periodically. Any number of customers waiting for their parcel can subscribe for updates, so a pub/sub channel is well suited due to its inherent fan-out capability. However, emails may also need to be triggered when the van is nearing its destination. A message queueing pattern is a better fit here, as multiple worker servers can share the workload by consuming the location messages from the queue and performing work for each message without having to share any state. The message queue ensures that work is distributed evenly across the pool of servers, that work is not duplicated (resulting, for example, in more than one email notification being sent) and that the system is resilient to crashes or spikes in load (messages are stored until a consumer is ready to retrieve them).

Ably combines both pub/sub and queueing functionality in its platform as seen in the diagram below:


Ably Reactor Message Queues diagram

When should I use queues instead of pub/sub channels?

Queues are more appropriate where:

Please bear in mind that with the Ably platform all realtime data originates from pub/sub channels, i.e. you never publish directly to a queue; you publish to a channel. If a queue rule exists that matches the channel name, the published message will automatically be republished into the designated queue. Therefore, if you need to publish and consume data, you will have to publish data to channels over the REST or Realtime protocols, and consume your data using an AMQP or STOMP client library.
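
As a minimal sketch of the publishing side, the example below uses the Ably REST client library for Node.js; the channel name 'foo' is illustrative and would need to match one of your queue rules for the message to be routed into a queue.

const Ably = require('ably')

/* Publish over REST; if a queue rule matches the channel name 'foo',
   the message is also republished into the associated Reactor Queue */
const rest = new Ably.Rest('REPLACE_WITH_YOUR_API_KEY')
const channel = rest.channels.get('foo')

channel.publish('event', 'payload', (err) => {
  if (err) { return console.error('Publish failed', err) }
  console.log('Published; any matching queue rule will republish this message to your queue')
})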

Using the Reactor Queues

All Ably accounts have access to Reactor Queue functionality; however, to get started you need to provision a physical queue and set up a queue rule to move data from channels into that queue.

Provisioning Reactor Queues

Unlike pub/sub channels that can exist in any datacenter and are provisioned on-demand by clients, queues need to be provisioned in advance and exist in one region.

Queues are set up within your app dashboard, and you will need to configure:

Please note that the total number of queues, and the TTL and max length of each queue, are limited based on your account type. Find out more about account and package limits.

Follow step-by-step instructions to provision a queue now »

Setting up queue rules

Once you have provisioned a physical queue, you need to set up one or more queue rules to republish messages, presence events or channel events from pub/sub channels into a queue. Queue rules can be used to publish either to internal queues (hosted by Ably) or to external streams or queues (such as Kinesis, Kafka or RabbitMQ). Publishing to external streams or queues is part of our Ably Reactor Firehose service, which is only available to Enterprise customers.

Queue rules are set up at the bottom of the queues tab within your app dashboard. For each internal queue rule you set up, you will need to configure:

Follow step-by-step instructions to set up a queue rule now »

Queue dashboards and stats

Provisioned queues are visible in your app dashboard and provide near-realtime stats for the current state of each queue. See an example screenshot below:


Queue dashboard example

Whilst the queue dashboard stats show the current state of your queue, your app and account dashboards provide up-to-date live and historical stats for all messages published to your queues. See an example screenshot from an app dashboard below:


App stats example

Testing your queue rules

Once your Reactor Queue is provisioned and your queue rules are configured, there are a number of ways we recommend you debug the configured rules and queues:

Checking queue dashboard stats

Use the dev console to generate messages or events that match your queue rule. You can confirm messages are being delivered if the “Messages ready” count in your queue dashboard increases (see above). Note that the “Messages ready” count won’t increase if you have a client consuming messages from this queue.

Using a CLI to consume messages

Install a command line tool for consuming messages using the AMQP protocol to check that messages published on channels (using the dev console or from any other source) are being pushed into the queues based on the queue rules.

You can install Node AMQP Consume CLI with:

$ npm install amqp-consume-cli -g

Then go to your app dashboard and retrieve an API key that has access to the queues (your root key will typically have access to subscribe to all queues). Check the server endpoint, vhost and queue name (which is always prefixed with a scope, your appId) from the queue dashboard (see above) and issue a command such as:

amqp-consume --queue-name [Name] \
  --host [Server endpoint host] --port [Server endpoint port] \
  --ssl --vhost shared --creds [your API key]

Whenever a message is published to the queue you are subscribing to, the amqp-consume tool will output the message details such as:

Message received
Attributes: { contentType: 'application/json',
  headers: {},
  deliveryMode: 1,
  timestamp: 1485914937984 }
Data: {
  "id":"cOOo9g|108YY6nPAA3SrE56067277:10",
  "source":"channel.message",
  "channel":"foo",
  "site":"eu-west-1-A",
  "ruleId":"cOOo9g",
  "timestamp":1485914937984,
  "messageId":"vjzxPR-XK3:3",
  "messages":[
    {
      "id":"vjzxPR-XK3:3:0",
      "name":"event",
      "connectionId":"vjzxPR-XK3",
      "timestamp":1485914937909,
      "data":"payload"
    }
  ]
}

Please note that the messages attribute is an Array so that future envelope options may allow messages to be bundled into a single envelope (WebHooks currently bundle messages). However, with the current queue rule design, an envelope will only ever contain one message.

Consuming messages from queues

Consuming messages from Ably Reactor Message Queues is mostly the same as consuming from any other queue supporting the AMQP or STOMP protocols. However, there are a few tips below to help you avoid common pitfalls.

Using AMQP

The AMQP protocol provides a rich set of functionality to, amongst other things, bind to exchanges, provision queues and configure routing. This functionality exists so that queues can be dynamically provisioned by clients and messages can be routed to these queues as required.

However, unlike our pub/sub channels, queues are pre-provisioned via our queue dashboards and all routing is handled by the queue rules. As such, when subscribing to messages from the provisioned queues, you must not attempt to bind to an exchange or declare a queue as these requests will be rejected. Instead, you should subscribe directly to the queue you wish to consume messages from.

Take the following queue as an example:

Queue dashboard example

In order to subscribe to messages from this queue you will need:

The queue name
UATwBQ:example-queue which is made up of your app ID and the name you assigned to your queue
The host
us-east-1-a-queue.ably.io
The port
5671 which is the TLS port you consume from. We only support TLS connections for security reasons
The vhost
shared
The username
the part before the : of an API key that has access to queues. For example, the username for an API key such as APPID.KEYID:SECRET would be APPID.KEYID.
The password
the part after the : of the API key. For example, the password for an API key such as APPID.KEYID:SECRET would be SECRET.

A simple example of subscribing to this queue in Node.js can be seen below:

/* Requires the amqplib callback API and the Ably library */
const amqp = require('amqplib/callback_api')
const Ably = require('ably')

const url = 'amqps://APPID.KEYID:SECRET@us-east-1-a-queue.ably.io/shared'
amqp.connect(url, (err, conn) => {
  if (err) { return handleError(err) }

  /* Opens a channel for communication. The word channel is overloaded
     and this has nothing to do with pub/sub channels */
  conn.createChannel((err, ch) => {
    if (err) { return handleError(err) }

    /* Wait for messages published to the Ably Reactor queue */
    ch.consume('UATwBQ:example-queue', (item) => {
      let decodedEnvelope = JSON.parse(item.content)

      /* The envelope messages attribute will only contain one message. However,
         in future versions, we may allow optional bundling of messages into a
         single queue message and as such this attribute is an Array to support
         that in future */
      let messages = Ably.Realtime.Message.fromEncodedArray(decodedEnvelope.messages)
      messages.forEach((message) => {
        actionMessage(message)
      })

      /* ACK (success) so that message is removed from queue */
      ch.ack(item)
    })
  })
})

Please note:

See our tutorials section for a few step-by-step examples using a Reactor Queue with AMQP »

Using STOMP

The STOMP protocol is a simple text-based protocol designed for working with message-oriented middleware. It provides an interoperable wire format that allows STOMP clients to talk to any message broker supporting the STOMP protocol, and as such it is a good fit for use with Ably Reactor Queues.

Assuming the following queue has been set up, we’ll show you a simple example of subscribing to a STOMP queue:

Queue dashboard example

In order to subscribe to messages from this queue you will need:

The queue name
UATwBQ:example-queue which is made up of your app ID and the name you assigned to your queue
The host
us-east-1-a-queue.ably.io
The port
61614 which is the STOMP TLS port you consume from (the port in the screenshot above is for AMQP). We only support TLS connections for security reasons
The vhost
shared
The username
the part before the : of an API key that has access to queues. For example, the username for an API key such as APPID.KEYID:SECRET would be APPID.KEYID.
The password
the part after the : of the API key. For example, the password for an API key such as APPID.KEYID:SECRET would be SECRET.

A simple example of subscribing to this queue in Node.js can be seen below:

/* Requires the stompit and Ably libraries */
const Stompit = require('stompit')
const Ably = require('ably')

const connectOptions = {
  'host': 'us-east-1-a-queue.ably.io',
  'port': 61614, /* STOMP TLS port */
  'ssl': true,
  'connectHeaders':{
    'host': 'shared',
    'login': 'APPID.KEYID',
    'passcode': 'SECRET'
  }
}

Stompit.connect(connectOptions, (error, client) => {
  if (error) { return handleError(error) }

  const subscribeHeaders = {
    /* To subscribe to an existing queue, /amq/queue prefix is required */
    'destination': '/amq/queue/UATwBQ:example-queue',
    'ack': 'client-individual' /* each message requires an ACK to confirm it has been processed */
  }
  /* Wait for messages published to the Ably Reactor queue */
  client.subscribe(subscribeHeaders, (error, message) => {
    if (error) { return handleError(error) }

    /* STOMP is a text-based protocol so decode UTF-8 string */
    message.readString('utf-8', (error, body) => {
      if (error) { return handleError(error) }

      let decodedEnvelope = JSON.parse(body)

      /* The envelope messages attribute will only contain one message. However,
         in future versions, we may allow optional bundling of messages into a
         single queue message and as such this attribute is an Array to support
         that in future */
      let messages = Ably.Realtime.Message.fromEncodedArray(decodedEnvelope.messages)
      messages.forEach((message) => {
        actionMessage(message)
      })

      client.ack(message)
    })
  })
})

Please note:

See our tutorials section for step-by-step examples using a Reactor Queue with STOMP »

Enveloped and non-enveloped message examples

When you configure a queue rule, you are given the option to envelope messages, which is enabled by default. In most cases, we believe an enveloped message provides more flexibility as it contains additional metadata in a portable format that can be useful such as the clientId of the publisher, or the channel name the message originated from.

However, where performance is a primary concern, you may choose not to envelope messages and instead have only the message payload (data element) published. This has the advantage of requiring one less parsing step; however, decoding the raw payload of the published message will be your responsibility.

Note that messages published to queues are by default encoded as JSON (a text format), however you can choose to have messages encoded with MsgPack (a binary format) in your queue rules.
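
The difference on the consumer side can be sketched as follows, reusing the illustrative AMQP consumer and actionMessage helper from the example above and assuming JSON-encoded payloads.

/* Sketch: handling an item delivered by the AMQP consumer shown earlier */
function handleItem (item, enveloped) {
  if (enveloped) {
    /* Enveloped: parse the envelope, then decode the bundled messages */
    const envelope = JSON.parse(item.content)
    const messages = Ably.Realtime.Message.fromEncodedArray(envelope.messages)
    messages.forEach((message) => actionMessage(message))
  } else {
    /* Non-enveloped: item.content is the raw message payload (the data element);
       decoding it is your responsibility */
    actionMessage(item.content.toString())
  }
}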

Enveloped message example

Headers: none

Data:

{
  "id": "cOOo9g|108YY6nPAA3SrE56067277:10",
  "source": "channel.message",
  "channel": "foo",
  "site": "eu-west-1-A",
  "ruleId": "cOOo9g",
  "timestamp": 1485914937984,
  "messageId": "vjzxPR-XK3:3",
  "messages": [
    {
      "id": "vjzxPR-XK3:3:0",
      "name": "",
      "connectionId": "vjzxPR-XK3",
      "timestamp": 1485914937909,
      "data": "textPayload"
    }
  ]
}

Please note that the messages attribute is an Array so that future envelope options may allow messages to be bundled into a single envelope (WebHooks currently bundle messages). However, with the current queue rule design, an envelope will only ever contain one message.

Non-enveloped message example

Headers:

Data:

textPayload
Enveloped presence message example

Headers: none

Data:

{
  "id": "z8R85g|108YY6nPAA3SrE56067277:12",
  "source": "channel.presence",
  "channel": "foo",
  "site": "eu-west-1-A",
  "ruleId": "z8R85g",
  "timestamp": 1485916832965,
  "messageId": "vjzxPR-XK3:5",
  "presence": [
    {
      "id": "vjzxPR-XK3:5:0",
      "clientId": "bob",
      "connectionId": "vjzxPR-XK3",
      "timestamp": 1485916832961,
      "action": "enter",
      "data": "clientData"
    }
  ]
}

Please note that the presence attribute is an Array so that future envelope options may allow presence messages to be bundled into a single envelope (WebHooks currently bundle messages). However, with the current queue rule design, an envelope will only ever contain one presence message.
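
Enveloped presence events can be decoded in much the same way as enveloped messages; the sketch below assumes the ably-js library, whose PresenceMessage decoder mirrors the Message decoder used in the earlier examples.

const Ably = require('ably')

/* Sketch: decoding an enveloped presence event consumed from a queue */
function handlePresenceItem (item) {
  const envelope = JSON.parse(item.content)
  if (envelope.source === 'channel.presence') {
    const presenceMessages = Ably.Realtime.PresenceMessage.fromEncodedArray(envelope.presence)
    presenceMessages.forEach((presence) => {
      console.log('Presence event from', presence.clientId)
    })
  }
}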

Non-enveloped presence message example

Headers:

Data:

clientData

Dead letter queues

When you provision a queue, Ably automatically provisions a “special” dead letter queue at the same time. This dead letter queue holds messages that have failed to be processed correctly or have expired. It is advisable to consume messages from the dead letter queue so that you can keep track of failed, expired or unprocessable messages. Messages are moved into your dead letter queue when:

Please note that messages already in the dead letter queue that subsequently meet any of the above criteria are deleted i.e. if the TTL for a message in the dead letter queue passes, the message is deleted forever.

A dead letter queue uses the reserved queue name APPID:deadletter where APPID is the app ID in which your queues are provisioned. You will have exactly one deadletter queue per app if you have one or more Reactor Queues, and this queue will appear in your queues dashboard. You can subscribe to a dead letter queue just like any other queue.
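
Since a dead letter queue behaves like any other queue, it can be consumed with the same AMQP approach shown earlier. The sketch below uses the example endpoint from above; replace APPID, KEYID and SECRET with your own values.

const amqp = require('amqplib/callback_api')

const url = 'amqps://APPID.KEYID:SECRET@us-east-1-a-queue.ably.io/shared'

amqp.connect(url, (err, conn) => {
  if (err) { return console.error(err) }
  conn.createChannel((err, ch) => {
    if (err) { return console.error(err) }
    /* The dead letter queue uses the reserved name APPID:deadletter */
    ch.consume('APPID:deadletter', (item) => {
      /* Inspect or log failed, expired or unprocessable messages */
      console.log('Dead lettered message:', item.content.toString())
      ch.ack(item)
    })
  })
})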

Download a client library

For a list of popular AMQP and STOMP client libraries you can use across a wide range of platforms, please see our client library download page.

Queue considerations

When using Reactor Queues, please bear in mind that:

Queue Scalability and High Availability

Ably’s Reactor Message Queue service is offered in two flavours: multi-tenanted and dedicated.

Our multi-tenanted queue service is provided as part of the core platform to all customers. The queues are provided in a high availability configuration (your data is stored in at least two datacenters with automatic fail-over capabilities). Our multi-tenanted queue service is designed for low to medium volumes of messages and has a guideline limit of no more than 200 messages per second per account.

For customers with more demanding requirements (up to millions of messages per second), Ably has two solutions for our Enterprise customers:

Get in touch if you’d like to find out more about our Enterprise offering.

Billing info

Each message published by a rule to a queue counts as one message towards your message quota. For example, if you publish a message on a channel that is in turn republished to a Reactor Queue, that will count as two messages. Find out more about how messages are counted.

Next steps


Back to top