
Oracle Database Advanced Queuing

程序员文章站 2022-05-05 12:09:58

AQ example

Grant privileges

GRANT EXECUTE ON DBMS_AQ TO APPS;
GRANT EXECUTE ON DBMS_AQADM TO APPS;

BEGIN
  DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE('ENQUEUE_ANY', 'APPS', FALSE);
  DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE('DEQUEUE_ANY', 'APPS', FALSE);
END;

Create payload type

-- ============= system role =======================
-- create payload type
CREATE OR REPLACE TYPE CUXWMS_CMD_TYPE AS OBJECT
(
  cuxwms_cmd_id  NUMBER,
  cuxwms_cmd     VARCHAR2(1000),
  transaction_id VARCHAR2(20),
  datalength     VARCHAR2(20)
);

-- let APPS use the payload type
GRANT ALL ON SYSTEM.CUXWMS_CMD_TYPE TO APPS;

Create queue table

-- ==============apps role =======================
-- Create queue table
BEGIN
  sys.dbms_aqadm.create_queue_table(queue_table        => 'APPS.CUXWMS_CMD_QT',
                                    queue_payload_type => 'SYSTEM.CUXWMS_CMD_TYPE',
                                    multiple_consumers => TRUE);
END;

Create queue

-- ==============apps role =======================
--create queue
BEGIN
  sys.dbms_aqadm.create_queue(queue_name     => 'APPS.CUXWMS_CMD_Q',
                              queue_table    => 'APPS.CUXWMS_CMD_QT',
                              queue_type     => sys.dbms_aqadm.normal_queue,
                              max_retries    => 5,
                              retry_delay    => 0,
                              retention_time => 0);
END;

-- Verify that the objects and the queue were created
SELECT * FROM dba_objects t WHERE t.object_name LIKE '%CUXWMS_CMD%';

SELECT * FROM dba_queues t WHERE t.name LIKE '%CUXWMS_CMD%';

Start queue

-- ==============apps role =======================
-- start queue
BEGIN
  sys.dbms_aqadm.start_queue(queue_name => 'APPS.CUXWMS_CMD_Q');
END;

Enqueue

-- Enqueue
DECLARE
  recipients dbms_aq.aq$_recipient_list_t;
  enqopt     dbms_aq.enqueue_options_t;
  mprop      dbms_aq.message_properties_t;
  enq_msgid  RAW(16);
BEGIN
  -- address the message explicitly to two consumers
  recipients(1) := sys.aq$_agent('AGENT1', 'APPS.CUXWMS_CMD_Q', NULL);
  recipients(2) := sys.aq$_agent('AGENT2', 'APPS.CUXWMS_CMD_Q', NULL);

  mprop.recipient_list := recipients;

  -- the payload must be of the queue table's payload type (SYSTEM.CUXWMS_CMD_TYPE);
  -- transaction_id and datalength are VARCHAR2 attributes, so pass strings
  dbms_aq.enqueue(queue_name         => 'APPS.CUXWMS_CMD_Q',
                  enqueue_options    => enqopt,
                  message_properties => mprop,
                  payload            => system.cuxwms_cmd_type(1,
                                                               'test message',
                                                               '10001',
                                                               '12'),
                  msgid              => enq_msgid);
  COMMIT;
END;

SELECT * from AQ$CUXWMS_CMD_QT;

                          Single consumer                   Multi-consumer                    Multi-consumer
QUEUE                     CUXWMS_CMD_Q                      CUXWMS_CMD_Q_2                    CUXWMS_CMD_Q_2
MSG_ID                    4E0A2F92F8B82488E05318F0640AE853  4E09B39276D12486E05318F0640AE80F  4E09B39276D12486E05318F0640AE80F
CORR_ID
MSG_PRIORITY              1                                 1                                 1
MSG_STATE                 READY                             READY                             READY
DELAY
DELAY_TIMESTAMP
EXPIRATION
ENQ_TIME                  2017/4/26 11:10:43                2017/4/26 11:15:03                2017/4/26 11:15:03
ENQ_TIMESTAMP             26-APR-17 11.10.43.387722 AM      26-APR-17 11.15.03.656672 AM      26-APR-17 11.15.03.656672 AM
ENQ_USER_ID               APPS                              APPS                              APPS
ENQ_TXN_ID                16.6.36452                        22.13.31284                       22.13.31284
DEQ_TIME
DEQ_TIMESTAMP
DEQ_USER_ID
DEQ_TXN_ID
RETRY_COUNT               0
EXCEPTION_QUEUE_OWNER
EXCEPTION_QUEUE
USER_DATA.CUXWMS_CMD_ID   1                                 1                                 1
USER_DATA.CUXWMS_CMD      test message                      test message                      test message
USER_DATA.TRANSACTION_ID  10001                             10001                             10001
USER_DATA.DATALENGTH      12                                12                                12
PROPAGATED_MSGID
SENDER_NAME
SENDER_ADDRESS
SENDER_PROTOCOL
ORIGINAL_MSGID
ORIGINAL_QUEUE_NAME
ORIGINAL_QUEUE_OWNER
EXPIRATION_REASON
CONSUMER_NAME                                               AGENT1                            AGENT2
ADDRESS
PROTOCOL

For a multi-consumer queue, the records for a message are removed only after every consumer has dequeued it.
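One way to observe this behaviour is to list the rows per consumer between dequeues. The query below is a minimal sketch, assuming the queue table created in this article and a session that can read the APPS.AQ$CUXWMS_CMD_QT view; it only uses columns that appear in the output above.

```sql
-- One row per (message, consumer) in the multi-consumer queue table view.
-- Run it after each consumer dequeues to see which rows are still present.
SELECT t.msg_id,
       t.consumer_name,
       t.msg_state
  FROM apps.aq$cuxwms_cmd_qt t
 WHERE t.queue = 'CUXWMS_CMD_Q'
 ORDER BY t.msg_id, t.consumer_name;
```

Going by the statement above, rows for the message should still be returned after only AGENT1 has dequeued, and none once AGENT2 has dequeued as well.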

Dequeue

-- Dequeue
DECLARE
  deqopt  dbms_aq.dequeue_options_t;
  mprop   dbms_aq.message_properties_t;
  msgid   RAW(16);
  payload system.cuxwms_cmd_type;
BEGIN
  deqopt.consumer_name := 'AGENT1';
  deqopt.navigation    := dbms_aq.first_message;
  deqopt.wait          := 0;
  dbms_aq.dequeue(queue_name         => 'APPS.CUXWMS_CMD_Q',
                  dequeue_options    => deqopt,
                  message_properties => mprop,
                  payload            => payload,
                  msgid              => msgid);
  dbms_output.put_line('payload.cuxwms_cmd:' || payload.cuxwms_cmd);
  COMMIT;
END;
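With wait set to 0 the call does not block, so a dequeue against an empty queue raises an error instead of returning. The block below is a sketch of how the second consumer might dequeue while handling that case; the exception name no_messages is made up for illustration, and the block assumes the objects created earlier in this article.

```sql
SET SERVEROUTPUT ON

DECLARE
  deqopt  dbms_aq.dequeue_options_t;
  mprop   dbms_aq.message_properties_t;
  msgid   RAW(16);
  payload system.cuxwms_cmd_type;
  -- ORA-25228: timeout or end-of-fetch during message dequeue
  no_messages EXCEPTION;                        -- hypothetical name
  PRAGMA EXCEPTION_INIT(no_messages, -25228);
BEGIN
  deqopt.consumer_name := 'AGENT2';
  deqopt.navigation    := dbms_aq.first_message;
  deqopt.wait          := dbms_aq.no_wait;      -- same as wait := 0
  dbms_aq.dequeue(queue_name         => 'APPS.CUXWMS_CMD_Q',
                  dequeue_options    => deqopt,
                  message_properties => mprop,
                  payload            => payload,
                  msgid              => msgid);
  dbms_output.put_line('payload.cuxwms_cmd:' || payload.cuxwms_cmd);
  COMMIT;
EXCEPTION
  WHEN no_messages THEN
    -- nothing is waiting for this consumer; not treated as a failure here
    dbms_output.put_line('no message available for AGENT2');
END;
/
```

Whether an empty queue should be treated as an error or as a normal condition depends on the caller; this sketch treats it as normal.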

Edition feature

Starting with 11g Release 2, Oracle Database supports editions (edition-based redefinition).

Editioning is controlled mainly at the user level:

USERNAME                     SYSTEM                  PO                      APPS                    AR
USER_ID                      5                       71                      173                     64
PASSWORD
ACCOUNT_STATUS               OPEN                    OPEN                    OPEN                    OPEN
LOCK_DATE
EXPIRY_DATE
DEFAULT_TABLESPACE           SYSTEM                  APPS_TS_TX_DATA         APPS_TS_TX_DATA         APPS_TS_TX_DATA
TEMPORARY_TABLESPACE         TEMP                    TEMP                    TEMP                    TEMP
CREATED                      2000/5/12 14:29:07      2000/5/14 18:18:22      2000/5/14 18:23:06      2000/5/14 18:18:21
PROFILE                      DEFAULT                 DEFAULT                 DEFAULT                 DEFAULT
INITIAL_RSRC_CONSUMER_GROUP  SYS_GROUP               DEFAULT_CONSUMER_GROUP  DEFAULT_CONSUMER_GROUP  DEFAULT_CONSUMER_GROUP
EXTERNAL_NAME
PASSWORD_VERSIONS            10G 11G                 10G 11G                 10G 11G                 10G 11G
EDITIONS_ENABLED             N                       Y                       Y                       Y

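Editionable objects:
[Figure: list of editionable object types]
So a TYPE is an editionable object, while a queue table is not. Because a non-editioned object cannot depend on an editioned one, creating both the type and the queue table in an edition-enabled user raises an error:
[Figure: error message]
The recommended deployment is therefore to create the type in a non-editioned user (for example SYSTEM), grant it to the business user, and create the queue table and queue in the business user's schema.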
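Before choosing where the payload type goes, it can help to confirm which users are edition-enabled. A minimal sketch, assuming a session with access to DBA_USERS and the two users named in this article:

```sql
-- EDITIONS_ENABLED = 'N' marks a user whose types can safely back a queue table.
SELECT u.username,
       u.editions_enabled
  FROM dba_users u
 WHERE u.username IN ('SYSTEM', 'APPS')
 ORDER BY u.username;
```

In the environment shown in the listing above, SYSTEM reports N and APPS reports Y, which is why the type is created in SYSTEM and granted to APPS.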
