17. Using Oracle Advanced Queuing (AQ)

Oracle Advanced Queuing allows applications to use producer-consumer message passing. Queuing is highly configurable and scalable, providing a great way to distribute workloads. Messages can be queued by multiple producers. Different consumers can filter messages. Messages can also be transformed or propagated to queues in other databases. Oracle AQ is available in all editions of the database, and has interfaces in many languages, allowing different applications to communicate. For more details about AQ and its options, refer to the Oracle Advanced Queuing User’s Guide.

Note

In this release, Oracle AQ is only supported in node-oracledb Thick mode. See Enabling node-oracledb Thick Mode.

Node-oracledb APIs for AQ were introduced in node-oracledb 4.0. With earlier versions, use AQ’s PL/SQL interface.

Oracle Advanced Queues are represented in node-oracledb by several classes. A single top-level AqQueue object contains the aqQueue.deqOptions and aqQueue.enqOptions object properties, which can be used to change queue behavior. A single AqQueue object can be used for enqueuing, dequeuing, or both.

Messages are enqueued by passing them to an enqueue method directly, or by wrapping them in a JavaScript object. Dequeued messages are returned as an AqMessage object.

The following examples show how to enqueue and dequeue messages in node-oracledb. Before using a queue in node-oracledb, it must be created in the database using the DBMS_AQADM PL/SQL package. For these examples, create a new Oracle user demoqueue with permission to create and use queues. Connect in SQL*Plus as SYSDBA and run:

CREATE USER demoqueue IDENTIFIED BY &password;
ALTER USER demoqueue DEFAULT TABLESPACE USERS QUOTA UNLIMITED ON USERS;
GRANT CONNECT, RESOURCE TO demoqueue;
GRANT AQ_ADMINISTRATOR_ROLE, AQ_USER_ROLE TO demoqueue;
GRANT EXECUTE ON DBMS_AQ TO demoqueue;

When you have finished testing, remove the DEMOQUEUE schema.

17.1. Sending Simple AQ Messages

You can use AQ to send RAW payloads by using a String or Buffer as the message.

Before enqueuing and dequeuing messages, you need to create and start queues in Oracle Database. For example, to create a queue for simple messaging, use SQL*Plus to connect as the new DEMOQUEUE user and run:

-- Create and start a queue
BEGIN
    DBMS_AQADM.CREATE_QUEUE_TABLE(
        QUEUE_TABLE        =>  'DEMOQUEUE.DEMO_RAW_QUEUE_TAB',
        QUEUE_PAYLOAD_TYPE =>  'RAW');

    DBMS_AQADM.CREATE_QUEUE(
        QUEUE_NAME         =>  'DEMOQUEUE.DEMO_RAW_QUEUE',
        QUEUE_TABLE        =>  'DEMOQUEUE.DEMO_RAW_QUEUE_TAB');

    DBMS_AQADM.START_QUEUE(
        QUEUE_NAME         => 'DEMOQUEUE.DEMO_RAW_QUEUE');
END;
/

The default payload type is RAW and it is not necessary to explicitly specify the payloadType attribute in connection.getQueue(). To get a queue of RAW payload type using this default setting:

connection.getQueue(queueName);

You can also explicitly set the payloadType attribute to oracledb.DB_TYPE_RAW in connection.getQueue():

connection.getQueue(queueName, { payloadType: oracledb.DB_TYPE_RAW });

To enqueue a single, simple message, run:

const queueName = "DEMO_RAW_QUEUE";
// Getting a queue of RAW payload type
const queue = await connection.getQueue(queueName);
const msg = await queue.enqOne("This is my message");
await connection.commit();

The variable msg will be an AqMessage object. It contains information about the message that was sent such as payload, correlation, delay, deliveryMode, msgId, priority, and other metadata.

Messages can be passed directly to enqOne() as shown above. Alternatively, they can be the payload property of a JavaScript object passed to enqOne(), as shown in Changing AQ options.

To dequeue a message, run:

const queueName = "DEMO_RAW_QUEUE";
const queue = await connection.getQueue(queueName);
const msg = await queue.deqOne();
await connection.commit();
console.log(msg.payload.toString());

By default, deqOne() will wait until a message is available.

The variable msg will be an AqMessage object. It contains information about the dequeued message such as payload, correlation, delay, deliveryMode, msgId, priority, and other metadata. String messages are encoded as UTF-8 Buffers. This example displays This is my message.
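The String-to-Buffer encoding can be seen without a database. The following is a minimal sketch that uses only the built-in Node.js Buffer class. The helper name payloadToString is hypothetical and stands in for the msg.payload.toString() call used with a dequeued RAW payload:

```javascript
// Hypothetical helper: decode a RAW payload (a Buffer) back to a String.
function payloadToString(payload) {
  // Buffer.prototype.toString() decodes as UTF-8 by default;
  // the encoding is spelled out here for clarity.
  return payload.toString("utf8");
}

// Simulate the bytes that a String message is stored as
const raw = Buffer.from("This is my message", "utf8");
console.log(payloadToString(raw));
```

Note that a Buffer's length counts bytes, so a payload containing non-ASCII characters is longer than the character count of the original String.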

See examples/aqraw.js for a runnable example.

Each enqueued message sent using queue.enqOne() or retrieved using queue.deqOne() is uniquely identified by an internally generated message identifier (msgId). The msgId attribute is of type Buffer. For example, to view the msgId of an enqueued message:

const queueName = "DEMO_RAW_QUEUE";
const queue = await connection.getQueue(queueName);
const msg = await queue.enqOne("This is my message");
console.log(msg.msgId.toString("hex"));
await connection.commit();

This will print an identifier like:

01ecb9cb8737a12de063ba60466437c7

Similarly, you can view the msgId of a dequeued message, for example:

const queueName = "DEMO_RAW_QUEUE";
const queue = await connection.getQueue(queueName);
const msg = await queue.deqOne();
await connection.commit();
console.log(msg.msgId.toString("hex"));

This will print an identifier like:

01ecb9cb8737a12de063ba60466437b6
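Because msgId is a Buffer, comparing two identifiers with === compares object identity and not the bytes. The following sketch shows one way to convert and compare identifiers using only the built-in Buffer class. The helper names msgIdToHex, hexToMsgId, and sameMsgId are hypothetical and are not part of node-oracledb:

```javascript
// Hypothetical helpers for working with msgId values, which are Buffers.
function msgIdToHex(msgId) {
  return msgId.toString("hex");
}

function hexToMsgId(hex) {
  return Buffer.from(hex, "hex");
}

function sameMsgId(a, b) {
  // Buffer.equals() compares the bytes of the two Buffers
  return a.equals(b);
}

const id = hexToMsgId("01ecb9cb8737a12de063ba60466437c7");
console.log(msgIdToHex(id));
console.log(sameMsgId(id, hexToMsgId("01ECB9CB8737A12DE063BA60466437C7")));
```

The hexadecimal form is convenient for logging, while the Buffer form is what the comparison operates on.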

17.2. Sending Oracle Database JSON AQ Messages

Starting from Oracle Database 21c, Advanced Queuing supports JSON payloads. To use this payload type, the Oracle Client libraries must also be version 21 or later.

You can use AQ to send JSON payloads by using a JavaScript object as the message.

Before enqueuing and dequeuing messages, you need to create and start queues in Oracle Database. For example, to create a queue suitable for sending JSON messages, use SQL*Plus to connect as the new DEMOQUEUE user and run:

-- Create and start a queue
BEGIN
    DBMS_AQADM.CREATE_QUEUE_TABLE(
        QUEUE_TABLE        =>  'DEMOQUEUE.DEMO_JSON_QUEUE_TAB',
        QUEUE_PAYLOAD_TYPE =>  'JSON');

    DBMS_AQADM.CREATE_QUEUE(
        QUEUE_NAME         =>  'DEMOQUEUE.DEMO_JSON_QUEUE',
        QUEUE_TABLE        =>  'DEMOQUEUE.DEMO_JSON_QUEUE_TAB');

    DBMS_AQADM.START_QUEUE(
        QUEUE_NAME         => 'DEMOQUEUE.DEMO_JSON_QUEUE');
END;
/

Using connection.getQueue(), you can get the queue by setting the payloadType attribute to oracledb.DB_TYPE_JSON as shown below.

To enqueue a single JSON AQ message, run:

const queueName = "DEMO_JSON_QUEUE";
// Getting a queue of JSON payload type
const queue = await connection.getQueue(queueName, { payloadType: oracledb.DB_TYPE_JSON });
const myData = {
    empName: "Scott",
    empCity: "Redwood"
};
const msg = await queue.enqOne({
    payload: myData
});
await connection.commit();

The variable msg will be an AqMessage object. It contains information about the message that was sent such as payload, correlation, delay, deliveryMode, msgId, priority, and other metadata.

To dequeue a JSON AQ message, run:

const queueName = "DEMO_JSON_QUEUE";
const queue = await connection.getQueue(queueName, { payloadType: oracledb.DB_TYPE_JSON });
const msg = await queue.deqOne();
await connection.commit();
console.log("empName", msg.payload.empName);
console.log("empCity", msg.payload.empCity);

By default, deqOne() will wait until a message is available.

This prints:

empName Scott
empCity Redwood

Each enqueued message sent using queue.enqOne() or retrieved using queue.deqOne() is uniquely identified by an internally generated message identifier (msgId). The msgId attribute is of type Buffer. For example, to view the msgId of an enqueued message:

const queue = await connection.getQueue(queueName, { payloadType: oracledb.DB_TYPE_JSON });
const myData = {
    empName: "Scott",
    empCity: "Redwood"
};
const msg = await queue.enqOne({
    payload: myData
});
console.log(msg.msgId.toString("hex"));
await connection.commit();

This will print an identifier like:

01fbb9cb8737a12de063ba60466437c7

Similarly, you can view the msgId of a dequeued message, for example:

const queue = await connection.getQueue(queueName, { payloadType: oracledb.DB_TYPE_JSON });
const msg = await queue.deqOne();
console.log(msg.msgId.toString("hex"));

This will print an identifier like:

01dfb9cb8737a12de063ba60466437b6

17.3. Sending Oracle Database Object AQ Messages

You can use AQ to send Database Object payloads by using DbObject Class objects as the message.

Before enqueuing and dequeuing messages, you need to create database object types, and create and start queues in Oracle Database. For example, connect as the new demoqueue user and run:

-- For the data we want to queue
CREATE OR REPLACE TYPE USER_ADDRESS_TYPE AS OBJECT (
    NAME        VARCHAR2(10),
    ADDRESS     VARCHAR2(50)
);
/

-- Create and start a queue
BEGIN
    DBMS_AQADM.CREATE_QUEUE_TABLE(
        QUEUE_TABLE        =>  'DEMOQUEUE.ADDR_QUEUE_TAB',
        QUEUE_PAYLOAD_TYPE =>  'DEMOQUEUE.USER_ADDRESS_TYPE');

    DBMS_AQADM.CREATE_QUEUE(
        QUEUE_NAME         =>  'DEMOQUEUE.ADDR_QUEUE',
        QUEUE_TABLE        =>  'DEMOQUEUE.ADDR_QUEUE_TAB');

    DBMS_AQADM.START_QUEUE(
        QUEUE_NAME         => 'DEMOQUEUE.ADDR_QUEUE',
        ENQUEUE            => TRUE);
END;
/

In the RAW and JSON examples, the QUEUE_PAYLOAD_TYPE was ‘RAW’ and ‘JSON’ respectively. Here, the Oracle Database object type name DEMOQUEUE.USER_ADDRESS_TYPE is used.

Using connection.getQueue(), you can get the queue of object payloads by setting the payloadType attribute to the name of an Oracle Database object type as shown below, or to a DbObject Class previously acquired from connection.getDbObjectClass().

In node-oracledb, a queue is initialized for an Oracle Database object type:

const queueName = "ADDR_QUEUE";
// Getting a queue of Oracle Database object type
const queue = await connection.getQueue(queueName, {payloadType: "DEMOQUEUE.USER_ADDRESS_TYPE"});

For efficiency, it is recommended to use a fully qualified name for the type.

A DbObject for the message is created and queued:

const message = new queue.payloadTypeClass(
    {
        NAME: "scott",
        ADDRESS: "The Kennel"
    }
);
const msg = await queue.enqOne(message);
await connection.commit();

The variable msg will be an AqMessage object. It contains information about the message that was sent such as payload, correlation, delay, deliveryMode, msgId, priority, and other metadata.

Dequeuing objects is done with:

const queue = await connection.getQueue(queueName, {payloadType: "DEMOQUEUE.USER_ADDRESS_TYPE"});
const msg = await queue.deqOne();
await connection.commit();

By default, deqOne() will wait until a message is available.

The message can be printed:

const o = msg.payload;
console.log(o);

See examples/aqobject.js for a runnable example.

Each enqueued message sent using queue.enqOne() or retrieved using queue.deqOne() is uniquely identified by an internally generated message identifier (msgId). The msgId attribute is of type Buffer. For example, to view the msgId of an enqueued message:

const msg = await queue.enqOne(message);
console.log(msg.msgId.toString("hex"));

This will print an identifier like:

01ecb9cb8737a12de063ba60466437c7

Similarly, you can view the msgId of a dequeued message, for example:

const msg = await queue.deqOne();
console.log(msg.msgId.toString("hex"));

This will print an identifier like:

01ecb9cb8737a12de063ba60466437b6

17.4. Changing AQ options

The AqQueue object created by calling connection.getQueue() contains enqOptions and deqOptions attribute objects that can be configured. These options can be changed before each enqueue or dequeue call.

Messages that are enqueued can also contain properties, such as an expiration. Instead of passing a message String, Buffer or DbObject directly to enqOne(), a payload property of a JavaScript object is set to the message. Other object properties control the message behavior. For example, to expire a message after five seconds if it has not been dequeued:

const message = {
    expiration: 5,
    payload: "This is my message"
};

const queueName = "DEMO_RAW_QUEUE";
const queue = await connection.getQueue(queueName);
const msg = await queue.enqOne(message);
await connection.commit();

For RAW queues, the payload value can be a String or Buffer. For JSON queues, the payload value should be a JavaScript object. For object queues, the payload value should be a DbObject object.
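When several payloads need the same message properties, the wrapping step can be factored out. The following is a minimal sketch, not part of node-oracledb. The helper name wrapPayloads is hypothetical, and expiration is the message property used in the example above:

```javascript
// Hypothetical helper: wrap each payload in a message object that shares
// the same message properties (for example, expiration).
function wrapPayloads(payloads, properties = {}) {
  // payload is listed last so it always comes from the payloads array
  return payloads.map((payload) => ({ ...properties, payload }));
}

const wrapped = wrapPayloads(["Message 1", "Message 2"], { expiration: 5 });
console.log(wrapped);
```

Each element of the returned array has the shape accepted by enqOne(), so the array could be passed to enqMany() or its elements enqueued one at a time.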

To change the enqueue behavior of a queue, alter the aqQueue.enqOptions attributes. For example to make a message buffered, and not persistent:

const queueName = "DEMO_RAW_QUEUE";
const queue = await connection.getQueue(queueName);
queue.enqOptions.deliveryMode = oracledb.AQ_MSG_DELIV_MODE_BUFFERED;
await queue.enqOne(message);
await connection.commit();

To send a message immediately without requiring a commit, you can change the queue’s message visibility:

const queueName = "DEMO_RAW_QUEUE";
const queue = await connection.getQueue(queueName);
queue.enqOptions.visibility = oracledb.AQ_VISIBILITY_IMMEDIATE;
await queue.enqOne(message);

To change the queue behavior when dequeuing, alter the deqOptions attributes. For example, to change the visibility of the message (so no explicit commit is required after dequeuing a message) and to continue without blocking if the queue is empty:

const queueName = "DEMO_RAW_QUEUE";
const queue = await connection.getQueue(queueName);
queue.deqOptions.visibility = oracledb.AQ_VISIBILITY_IMMEDIATE;
queue.deqOptions.wait = oracledb.AQ_DEQ_NO_WAIT;
const msg = await queue.deqOne();

To change multiple properties at once, you can also use syntax like:

Object.assign(queue.deqOptions,
            {
                mode: oracledb.AQ_DEQ_MODE_BROWSE,
                visibility: oracledb.AQ_VISIBILITY_IMMEDIATE,
                wait: 10
            });
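Options set this way stay in effect for later calls on the same AqQueue object. If a changed option is only wanted for one operation, the previous values can be saved and restored. The following sketch assumes that the deqOptions properties can be read as well as written. The helper name withDeqOptions is hypothetical:

```javascript
// Hypothetical helper: apply dequeue option overrides for one action only,
// then put the previous values back, even if the action throws.
async function withDeqOptions(queue, overrides, action) {
  const saved = {};
  for (const key of Object.keys(overrides)) {
    saved[key] = queue.deqOptions[key];   // remember the current value
  }
  Object.assign(queue.deqOptions, overrides);
  try {
    return await action(queue);
  } finally {
    Object.assign(queue.deqOptions, saved);
  }
}

// Usage sketch (assumes an existing AqQueue object named queue):
//   const msg = await withDeqOptions(
//       queue,
//       { wait: oracledb.AQ_DEQ_NO_WAIT },
//       (q) => q.deqOne());
```

Only the keys named in the overrides are saved and restored, so other options changed inside the action are left as they are.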

See examples/aqoptions.js for a runnable example.

17.5. Enqueuing and Dequeuing Multiple Messages

Enqueuing multiple messages in one operation is similar to the basic examples. However, instead of passing a single message to queue.enqOne(), the queue.enqMany() method is passed an array of messages.

Multiple messages can be dequeued in one call with queue.deqMany(). This method takes a maxMessages parameter indicating the maximum number of messages that should be dequeued in one call. Depending on the queue options, zero or more messages up to the limit will be dequeued.

Using RAW Payloads

To enqueue multiple messages, run:

const queueName = "DEMO_RAW_QUEUE";
const queue = await connection.getQueue(queueName);
const messages = [
    "Message 1",
    "Message 2",
    "Message 3",
    "Message 4"
];
const msgs = await queue.enqMany(messages);
await connection.commit();

The variable msgs will be an array of AqMessage objects. Each one contains information about the message that was sent such as payload, correlation, delay, deliveryMode, msgId, priority, and other metadata.

Warning

Calling enqMany() in parallel on different connections acquired from the same pool may cause a problem with older versions of Oracle (see Oracle bug 29928074). Ensure that enqMany() is not run in parallel. Instead, use standalone connections or make multiple calls to enqOne(). The deqMany() method is not affected.
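If an application still needs to call enqMany() from several places, one way to keep the calls from overlapping is to chain them on a single promise. The following is a sketch of that general technique and is not a node-oracledb feature. The name createSerializer is hypothetical, and the approach only serializes calls made within one Node.js process:

```javascript
// Hypothetical helper: returns a function that runs async tasks one at a
// time, in the order they were submitted.
function createSerializer() {
  let tail = Promise.resolve();
  return function runExclusive(task) {
    const result = tail.then(() => task());
    // Keep the chain usable even if a task rejects
    tail = result.catch(() => {});
    return result;
  };
}

// Usage sketch (queueA and queueB are assumed to be AqQueue objects
// obtained from different pooled connections):
//   const runExclusive = createSerializer();
//   await Promise.all([
//       runExclusive(() => queueA.enqMany(messagesA)),
//       runExclusive(() => queueB.enqMany(messagesB))
//   ]);
```

The caller still receives the result or the error of its own task, while a failure in one task does not block the tasks queued behind it.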

To dequeue multiple messages, run:

const queue = await connection.getQueue(queueName);
const messages = await queue.deqMany(5);
console.log("Dequeued " + messages.length + " messages");
for (const msg of messages) {
    console.log(msg.payload.toString());
}
await connection.commit();

By default, deqMany() will wait until a message is available.

Each element of the messages array is an AqMessage object, the same as returned by queue.deqOne().
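To process everything currently in a queue, deqMany() can be called repeatedly until it returns no messages. The following sketch assumes that the queue's deqOptions.wait has already been set to oracledb.AQ_DEQ_NO_WAIT, so that an empty queue yields an empty array instead of blocking. The helper name drainQueue and its parameters are hypothetical:

```javascript
// Hypothetical helper: dequeue in batches until no more messages are
// returned. Assumes queue.deqOptions.wait was set to
// oracledb.AQ_DEQ_NO_WAIT by the caller; with the default wait setting
// the final deqMany() call would block instead of returning.
async function drainQueue(queue, batchSize, handleMessage) {
  let total = 0;
  for (;;) {
    const batch = await queue.deqMany(batchSize);
    if (batch.length === 0) {
      break;
    }
    for (const msg of batch) {
      await handleMessage(msg);
    }
    total += batch.length;
  }
  return total;   // number of messages handled
}
```

The helper does not commit. With the default visibility, the caller would still call connection.commit() after draining.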

See examples/aqmulti.js for a runnable example.

Each message enqueued using queue.enqMany() or dequeued using queue.deqMany() is uniquely identified by an internally generated message identifier (msgId). The msgId attribute is of type Buffer. For example, to view the message identifiers of messages enqueued with enqMany():

const queueName = "DEMO_RAW_QUEUE";
const queue = await connection.getQueue(queueName);
const messages = [
    "Message 1",
    "Message 2",
    "Message 3",
    "Message 4"
];
const msgs = await queue.enqMany(messages);
for (let i = 0; i < msgs.length; i++) {
    console.log (i, "msgId: ", msgs[i].msgId.toString("hex"));
}
await connection.commit();

This will print identifiers such as:

0 msgId:  01ecb9cb8738a12de063ba60466437c7
1 msgId:  01ecb9cb8739a12de063ba60466437c7
2 msgId:  01ecb9cb873aa12de063ba60466437c7
3 msgId:  01ecb9cb873ba12de063ba60466437c7

Similarly, you can view the msgId of each message dequeued with deqMany(), for example:

const queue = await connection.getQueue(queueName);
const msgs = await queue.deqMany(5);
for (let i = 0; i < msgs.length; i++) {
    console.log (i, "msgId: ", msgs[i].msgId.toString("hex"));
}
await connection.commit();

This will print identifiers such as:

0 msgId:  01ecb9cb8738a12de063ba60466437e9
1 msgId:  01ecb9cb8739a12de063ba60466437e9
2 msgId:  01ecb9cb873aa12de063ba60466437e9
3 msgId:  01ecb9cb873ba12de063ba60466437e9

Using JSON Payloads

To enqueue multiple JSON messages, run:

const queueName = "DEMO_JSON_QUEUE";
const queue = await connection.getQueue (queueName, { payloadType: oracledb.DB_TYPE_JSON });
const empList = [
    {payload: { empName: "Employee #1", empId: 101 }},
    {payload: { empName: "Employee #2", empId: 102 }},
    {payload: { empName: "Employee #3", empId: 103 }}
];
await queue.enqMany (empList);
await connection.commit();

See the advisory note about using enqMany().

To dequeue multiple JSON messages, run:

const queue = await connection.getQueue(queueName, { payloadType: oracledb.DB_TYPE_JSON });
Object.assign(queue.deqOptions,
  {
    navigation: oracledb.AQ_DEQ_NAV_FIRST_MSG,
    wait: oracledb.AQ_DEQ_NO_WAIT
  }
);

const msgs = await queue.deqMany(5); // get at most 5 messages
console.log ( "msgs received : " + msgs.length );
for ( let i = 0; i < msgs.length; i ++ ) {
    console.log ( i + ". empName : " + msgs[i].payload.empName);
    console.log ( i + ". empId : " + msgs[i].payload.empId);
}

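By default, deqMany() will wait until a message is available. In this example, the wait option is set to oracledb.AQ_DEQ_NO_WAIT, so the call returns immediately even if the queue is empty.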

This prints:

msgs received : 3
0. empName : Employee #1
0. empId : 101
1. empName : Employee #2
1. empId : 102
2. empName : Employee #3
2. empId : 103

17.6. Advanced Queuing Notifications

The connection.subscribe() method can be used to register interest in a queue, allowing a callback to be invoked when there are messages to dequeue. To subscribe to a queue, pass its name to subscribe() and set the namespace option to oracledb.SUBSCR_NAMESPACE_AQ.

For example:

const queueName = "DEMO_RAW_QUEUE";

const subscrOptions = {
    namespace: oracledb.SUBSCR_NAMESPACE_AQ,
    callback: ProcessAqMessage
};

async function ProcessAqMessage(message) {
    const connection = await oracledb.getConnection();  // get connection from a pool
    const queue = await connection.getQueue(queueName);
    const msg = await queue.deqOne();
    console.log(msg.payload.toString());
    console.log(message.msgId.toString("hex")); // prints the msgId of the message
    console.log(msg.msgId.toString("hex")); // prints the same msgId as above
    await connection.close();
}

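const connection = await oracledb.getConnection();  // get connection from a pool
await connection.subscribe(queueName, subscrOptions);
await connection.close();

// Later, to unsubscribe from the queue, use an open connection
const connection2 = await oracledb.getConnection();
await connection2.unsubscribe(queueName);
await connection2.close();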

See Continuous Query Notification (CQN) for more information about subscriptions and notifications.

AQ notifications require the same configuration as CQN. Specifically the database must be able to connect back to node-oracledb.

17.7. Recipient Lists

A list of recipient names can be associated with a message at the time a message is enqueued. This allows a limited set of recipients to dequeue each message. The recipient list associated with the message overrides the queue subscriber list, if there is one. The recipient names need not be in the subscriber list but can be, if desired.

To dequeue a message, the consumerName attribute can be set to one of the recipient names. The original message recipient list is not available on dequeued messages. All recipients have to dequeue a message before it gets removed from the queue.

Subscribing to a queue is like subscribing to a magazine: each subscriber can dequeue all the messages placed into a specific queue, just as each magazine subscriber has access to all its articles. Being a recipient, however, is like getting a letter: each recipient is a designated target of a particular message.

For example, to enqueue a message meant for “payroll” recipients:

await queue.enqOne({
    payload: "Message 1",
    recipients: [ "payroll" ]
});

Later, when dequeuing messages, the “payroll” recipient can be set using the consumerName property to get the message:

Object.assign(
    queue.deqOptions,
    { consumerName: "payroll" }
);
const msg = await queue.deqOne();
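The rule that every recipient must dequeue a message before it is removed can be illustrated with a toy in-memory model. This sketch is not how Oracle AQ is implemented and does not use node-oracledb. The class name ToyRecipientQueue and the recipient names "audit" and "sales" are hypothetical. Unlike deqOne(), the toy dequeue returns undefined instead of waiting when nothing is available:

```javascript
// Toy in-memory model (NOT the AQ implementation): each message keeps the
// set of recipients that still have to dequeue it.
class ToyRecipientQueue {
  constructor() {
    this.messages = [];
  }

  enqOne({ payload, recipients }) {
    this.messages.push({ payload, pending: new Set(recipients) });
  }

  // Return the oldest payload still pending for this consumer, or
  // undefined if there is none.
  deqOne(consumerName) {
    const index = this.messages.findIndex((m) => m.pending.has(consumerName));
    if (index === -1) {
      return undefined;
    }
    const message = this.messages[index];
    message.pending.delete(consumerName);
    if (message.pending.size === 0) {
      this.messages.splice(index, 1);   // every recipient has dequeued it
    }
    return message.payload;
  }
}

const toy = new ToyRecipientQueue();
toy.enqOne({ payload: "Message 1", recipients: ["payroll", "audit"] });
console.log(toy.deqOne("sales"));     // "sales" is not a recipient
console.log(toy.deqOne("payroll"));   // "payroll" receives the message
console.log(toy.messages.length);     // message is still held for "audit"
console.log(toy.deqOne("audit"));     // last recipient dequeues it
console.log(toy.messages.length);     // message has now been removed
```

In the model, as in the description above, a consumer that is not on the recipient list never sees the message, and the message is kept until the last listed recipient has dequeued it.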