Oracle® Streams Advanced Queuing User's Guide 11g Release 2 (11.2) Part Number E11013-04 |
|
|
PDF · Mobi · ePub |
This chapter describes how to use and manage Oracle Streams Advanced Queuing when enqueuing and propagating. It describes ANYDATA
queues and user messages.
Oracle Streams uses queues of type ANYDATA
to store three types of messages:
Captured logical change record (LCR)
This message type, produced by an Oracle Streams capture process, is not discussed in this guide.
See Also:
"Streams Capture Process" in Oracle Streams Concepts and AdministrationUser-enqueued LCR
This is a message containing an LCR that was enqueued by a user or application.
User message
This is a non-LCR message created and enqueued by a user or application.
All three types of messages can be used for information sharing within a single database or between databases.
This chapter contains these topics:
Propagating User Messages from ANYDATA Queues to Typed Queues
Propagating User-Enqueued LCRs from ANYDATA Queues to Typed Queues
See Also:
Oracle Database PL/SQL Packages and Types Reference for more information about theANYDATA
typeThis section contains these topics:
You can wrap almost any type of payload in an ANYDATA
payload with the Convert
data_type
static functions of the ANYDATA
type, where data_type
is the type of object to wrap. These functions take the object as input and return an ANYDATA
object.
The following datatypes cannot be wrapped in an ANYDATA
wrapper:
Nested table
ROWID and UROWID
The following datatypes can be directly wrapped in an ANYDATA
wrapper, but they cannot be present in a user-defined type payload wrapped in an ANYDATA
wrapper:
See Also:
Oracle Database PL/SQL Packages and Types Reference for more information about theANYDATA
typeYour applications can use the following programmatic interfaces to enqueue user messages into an ANYDATA
queue and dequeue user messages from an ANYDATA
queue:
PL/SQL (DBMS_AQ
package)
Java Message Service (JMS)
OCI
The following sections provide information about using these interfaces to enqueue user messages into and dequeue user messages from an ANYDATA
queue.
See Also:
Chapter 3, "Oracle Streams Advanced Queuing: Programmatic Interfaces" for more information about these programmatic interfacesTo enqueue a user message containing an LCR into an ANYDATA
queue using PL/SQL, first create the LCR to be enqueued. You use the constructor for the SYS.LCR$_ROW_RECORD
type to create a row LCR, and you use the constructor for the SYS.LCR$_DDL_RECORD
type to create a DDL LCR. Then you use the ANYDATA.ConvertObject
function to convert the LCR into an ANYDATA
payload and enqueue it using the DBMS_AQ.ENQUEUE
procedure.
To enqueue a user message containing a non-LCR object into an ANYDATA
queue using PL/SQL, you use one of the ANYDATA.Convert*
functions to convert the object into an ANYDATA
payload and enqueue it using the DBMS_AQ.ENQUEUE
procedure.
See Also:
Oracle Streams Concepts and Administration, "Managing a Streams Messaging Environment"
To enqueue a user message containing an LCR into an ANYDATA
queue using JMS or OCI, you must represent the LCR in XML format. To construct an LCR, use the oracle.xdb.XMLType
class. LCRs are defined in the SYS
schema. The LCR schema must be loaded into the SYS
schema using the catxlcr.sql
script in ORACLE_HOME
/rdbms/admin
.
To enqueue a message using OCI, perform the same actions that you would to enqueue a message into a typed queue. To enqueue a message using JMS, a user must have EXECUTE
privilege on the DBMS_AQ
, DBMS_AQIN
and DBMS_AQJMS
packages.
Note:
Enqueue of JMS types and XML types does not work withANYDATA
queues unless you call DBMS_AQADM.ENABLE_JMS_TYPES(
queue_table_name
)
after DBMS_STREAMS_ADM.SET_UP_QUEUE(
queue_name
)
. Enabling a queue for these types may affect import/export of the queue table.A non-LCR user message can be a message of any user-defined type or a JMS type. The JMS types include the following:
javax.jms.TextMessage
javax.jms.MapMessage
javax.jms.StreamMessage
javax.jms.ObjectMessage
javax.jms.BytesMessage
When using user-defined types, you must generate the Java class for the message using Jpublisher, which implements the ORAData
interface. To enqueue a message into an ANYDATA
queue, you can use methods QueueSender.send
or TopicPublisher.publish
.
See Also:
Oracle XML DB Developer's Guide for more information about representing messages in XML format
Oracle Streams Advanced Queuing Java API Reference for more information about the oracle.jms
Java package
The OCIAQenq
function in the Oracle Call Interface Programmer's Guide for more information about enqueuing messages using OCI
To dequeue a user message from an ANYDATA
queue using PL/SQL, you use the DBMS_AQ.DEQUEUE
procedure and specify ANYDATA
as the payload. The user message can contain an LCR or another type of object.
See Also:
"Dequeuing Messages"In an ANYDATA
queue, user messages containing LCRs in XML format are represented as oracle.xdb.XMLType
. Non-LCR messages can be any user-defined type or a JMS type.
To dequeue a message from an ANYDATA
queue using JMS, you can use methods QueueReceiver
, TopicSubscriber
, or TopicReceiver
. Because the queue can contain different types of objects wrapped in ANYDATA
wrappers, you must register a list of SQL types and their corresponding Java classes in the type map of the JMS session. JMS types are already preregistered in the type map.
For example, suppose a queue contains user-enqueued LCR messages represented as oracle.xdb.XMLType
and non-LCR messages of type person
and address
. The classes JPerson.java
and JAddress.java
are the ORAData
mappings for person
and address
, respectively. Before dequeuing the message, the type map must be populated as follows:
java.util.Map map = ((AQjmsSession)q_sess).getTypeMap(); map.put("SCOTT.PERSON", Class.forName("JPerson")); map.put("SCOTT.ADDRESS", Class.forName("JAddress")); map.put("SYS.XMLTYPE", Class.forName("oracle.xdb.XMLType")); // For LCRs
When using a messageSelector
with a QueueReceiver
or TopicPublisher
, the selector can contain any SQL expression that has a combination of one or more of the following:
JMS message header fields or properties
These include JMSPriority
, JMSCorrelationID
, JMSType
, JMSXUserI
, JMSXAppID
, JMSXGroupID
, and JMSXGroupSeq
. An example of a JMS message field messageSelector
is:
JMSPriority < 3 AND JMSCorrelationID = 'Fiction'
User-defined message properties
An example of a user-defined message properties messageSelector
is:
color IN ('RED', 'BLUE', 'GREEN') AND price < 30000
PL/SQL functions
An example of a PL/SQL function messageSelector
is:
hr.GET_TYPE(tab.user_data) = 'HR.EMPLOYEES'
To dequeue a message from an ANYDATA
queue using OCI, perform the same actions that you would to dequeue a message from a typed queue.
See Also:
Oracle XML DB Developer's Guide for more information about representing messages in XML format
Oracle Streams Advanced Queuing Java API Reference for more information about the oracle.jms
Java package
The OCIAQdeq
function in the Oracle Call Interface Programmer's Guide for more information about dequeuing messages using OCI
ANYDATA
queues can interoperate with typed queues. Table 22-1 shows the types of propagation possible between queues.
Table 22-1 Propagation Between Different Types of Queues
Source Queue | Destination Queue | Transformation |
---|---|---|
|
|
None |
Typed |
|
Implicit Note: Propagation is possible only if the messages in the typed queue meet the restrictions outlined in "Object Type Support". |
|
Typed |
Requires a rule to filter messages and a user-defined transformation. Only messages containing a payload of the same type as the typed queue can be propagated to the typed queue. |
Typed |
Typed |
Follows Oracle Streams Advanced Queuing rules |
Note:
Propagations cannot propagate user-enqueuedANYDATA
messages that encapsulate payloads of object types, varrays, or nested tables between databases with different character sets. Propagations can propagate such messages between databases with the same character set.Although you cannot use Simple Object Access Protocol (SOAP) to interact directly with an ANYDATA
queue, you can use SOAP by propagating messages between an ANYDATA
queue and a typed queue. If you want to enqueue a message into an ANYDATA
queue using SOAP, you must first configure propagation from a typed queue to the ANYDATA
queue. Then you can use SOAP to enqueue a message into the typed queue. The message is propagated automatically from the typed queue to the ANYDATA
queue.
If you want to use SOAP to dequeue a message that is in an ANYDATA
queue, then you can configure propagation from the ANYDATA
queue to a typed queue. The message is propagated automatically from the ANYDATA
queue to the typed queue, where it is available for access using SOAP.
See Also:
"Propagating Messages Between an ANYDATA Queue and a Typed Queue" in Oracle Streams Concepts and AdministrationThis section provides examples of enqueuing messages into an ANYDATA
queue. The examples assume you are in a SQL*Plus testing environment with access to two databases named db01
and db02
. The first few examples prepare the testing environment for the other examples in this chapter.
In Example 22-1, you connect as a user with administrative privileges at databases db01
and db02
to create administrator user strmadmin
and to grant EXECUTE
privilege on the DBMS_AQ
package to sample schema user oe
.
Example 22-1 Creating ANYDATA Users
GRANT EXECUTE ON DBMS_AQ TO oe; CREATE USER strmadmin IDENTIFIED BY strmadmin DEFAULT TABLESPACE example; GRANT DBA TO strmadmin; GRANT EXECUTE ON DBMS_STREAMS_ADM TO strmadmin; GRANT EXECUTE ON DBMS_TRANSFORM TO strmadmin;
In Example 22-2, you connect to db01
as strmadmin
to create ANYDATA
queue oe_queue_any
. The oe
user is configured automatically as a secure user of the oe_queue_any
queue and is given ENQUEUE
and DEQUEUE
privileges on the queue.
Example 22-2 Creating an ANYDATA Queue
CONNECT strmadmin;
Enter password: password
BEGIN
DBMS_STREAMS_ADM.SET_UP_QUEUE(
queue_table => 'oe_qtab_any',
queue_name => 'oe_queue_any',
queue_user => 'oe');
END;
/
In Example 22-3, you add a subscriber to the oe_queue_any
queue. This subscriber performs explicit dequeues of messages. The ADD_SUBSCRIBER
procedure will automatically create an AQ_AGENT
.
Example 22-3 Adding a Subscriber to the ANYDATA Queue
DECLARE subscriber SYS.AQ$_AGENT; BEGIN subscriber := SYS.AQ$_AGENT('LOCAL_AGENT', NULL, NULL); SYS.DBMS_AQADM.ADD_SUBSCRIBER( queue_name => 'strmadmin.oe_queue_any', subscriber => subscriber); END; /
In Example 22-4, you associate the oe
user with the local_agent
agent.
Example 22-4 Associating a User with an AQ_AGENT
BEGIN DBMS_AQADM.ENABLE_DB_ACCESS( agent_name => 'local_agent', db_username => 'oe'); END; /
In Example 22-5, you connect to database db01
as user oe
to create an enqueue procedure. It takes an object of ANYDATA
type as an input parameter and enqueues a message containing the payload into an existing ANYDATA
queue.
Example 22-5 Creating an Enqueue Procedure
set echo off set verify off ACCEPT password CHAR PROMPT 'Enter the password for OE: ' HIDE CONNECT oe/&password@db01; set echo on CREATE PROCEDURE oe.enq_proc (payload ANYDATA) IS enqopt DBMS_AQ.ENQUEUE_OPTIONS_T; mprop DBMS_AQ.MESSAGE_PROPERTIES_T; enq_msgid RAW(16); BEGIN mprop.SENDER_ID := SYS.AQ$_AGENT('LOCAL_AGENT', NULL, NULL); DBMS_AQ.ENQUEUE( queue_name => 'strmadmin.oe_queue_any', enqueue_options => enqopt, message_properties => mprop, payload => payload, msgid => enq_msgid); END; /
In Example 22-6, you use procedure oe.enq_proc
to enqueue a message of type VARCHAR2
into an ANYDATA
queue.
Example 22-6 Enqueuing a VARCHAR2 Message into an ANYDATA Queue
EXEC oe.enq_proc(ANYDATA.ConvertVarchar2('Chemicals - SW')); COMMIT;
In Example 22-7, you use procedure oe.enq_proc
to enqueue a message of type NUMBER
into an ANYDATA
queue.
Example 22-7 Enqueuing a NUMBER Message into an ANYDATA Queue
EXEC oe.enq_proc(ANYDATA.ConvertNumber('16')); COMMIT;
In Example 22-8, you use procedure oe.enq_proc
to enqueue a user-defined type message into an ANYDATA
queue.
Example 22-8 Enqueuing a User-Defined Type Message into an ANYDATA Queue
BEGIN oe.enq_proc(ANYDATA.ConvertObject(oe.cust_address_typ( '1646 Brazil Blvd','361168','Chennai','Tam', 'IN'))); END; / COMMIT;
See Also:
"Viewing the Contents of User-Enqueued Events in a Queue" in Oracle Streams Concepts and AdministrationThis section provides examples of dequeuing messages from an ANYDATA
queue. The examples assume that you have completed the examples in "Enqueuing User Messages in ANYDATA Queues".
To dequeue messages, you must know the consumer of the messages. To find the consumer for the messages in a queue, connect as the owner of the queue and query the AQ$
queue_table_name
view, where queue_table_name
is the name of the queue table containing the queue.
In Example 22-9, you connect to database db01
as strmadmin
, the owner of queue oe_queue_any
, and perform a query on the AQ$OE_QTAB_ANY
view. The query returns three rows, with LOCAL_AGENT
as the CONSUMER_NAME
in each row.
Example 22-9 Determining the Consumer of Messages in a Queue
CONNECT strmadmin;
Enter password: password
SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$OE_QTAB_ANY;
In Example 22-10, you connect to database db01
as user oe
to create a dequeue procedure that takes as an input the consumer of the messages you want to dequeue, dequeues messages of oe.cust_address_typ
, and prints the contents of the messages.
Example 22-10 Creating a Dequeue Procedure for an ANYDATA Queue
CONNECT oe; -- @db01
Enter password: password
CREATE PROCEDURE oe.get_cust_address (
consumer IN VARCHAR2) AS
address OE.CUST_ADDRESS_TYP;
deq_address ANYDATA;
msgid RAW(16);
deqopt DBMS_AQ.DEQUEUE_OPTIONS_T;
mprop DBMS_AQ.MESSAGE_PROPERTIES_T;
new_addresses BOOLEAN := TRUE;
next_trans EXCEPTION;
no_messages EXCEPTION;
pragma exception_init (next_trans, -25235);
pragma exception_init (no_messages, -25228);
num_var pls_integer;
BEGIN
deqopt.consumer_name := consumer;
deqopt.wait := 1;
WHILE (new_addresses) LOOP
BEGIN
DBMS_AQ.DEQUEUE(
queue_name => 'strmadmin.oe_queue_any',
dequeue_options => deqopt,
message_properties => mprop,
payload => deq_address,
msgid => msgid);
deqopt.navigation := DBMS_AQ.NEXT;
DBMS_OUTPUT.PUT_LINE('****');
IF (deq_address.GetTypeName() = 'OE.CUST_ADDRESS_TYP') THEN
DBMS_OUTPUT.PUT_LINE('Message TYPE is: ' || deq_address.GetTypeName());
num_var := deq_address.GetObject(address);
DBMS_OUTPUT.PUT_LINE(' **** CUSTOMER ADDRESS **** ');
DBMS_OUTPUT.PUT_LINE(address.street_address);
DBMS_OUTPUT.PUT_LINE(address.postal_code);
DBMS_OUTPUT.PUT_LINE(address.city);
DBMS_OUTPUT.PUT_LINE(address.state_province);
DBMS_OUTPUT.PUT_LINE(address.country_id);
ELSE
DBMS_OUTPUT.PUT_LINE('Message TYPE is: ' || deq_address.GetTypeName());
END IF;
COMMIT;
EXCEPTION
WHEN next_trans THEN
deqopt.navigation := DBMS_AQ.NEXT_TRANSACTION;
WHEN no_messages THEN
new_addresses := FALSE;
DBMS_OUTPUT.PUT_LINE('No more messages');
END;
END LOOP;
END;
/
In Example 22-11, you use procedure oe.get_cust_address
, created in Example 22-10, specifying LOCAL_AGENT
as the consumer.
Example 22-11 Dequeuing Messages from an ANYDATA Queue
SET SERVEROUTPUT ON SIZE 100000 EXEC oe.get_cust_address('LOCAL_AGENT');
The example returns:
**** Message TYPE is: SYS.VARCHAR2 **** Message TYPE is: SYS.NUMBER **** Message TYPE is: OE.CUST_ADDRESS_TYP **** CUSTOMER ADDRESS **** 1646 Brazil Blvd 361168 Chennai Tam IN No more messages
This section provides examples showing how to propagate non-LCR user messages between an ANYDATA
queue and a typed queue.
Note:
The examples in this section assume that you have completed the examples in "Enqueuing User Messages in ANYDATA Queues".See Also:
"Message Propagation and ANYDATA Queues" for more information about propagation betweenANYDATA
and typed queuesThe first few examples set up propagation from the ANYDATA
queue oe_queue_any
, created in Example 22-2, to a typed queue in database db02
. In Example 22-12, you connect as sample schema user oe
to grant EXECUTE
privilege on oe.cust_address_typ
at databases db01
and db02
to administrator user strmadmin
.
Example 22-12 Granting EXECUTE Privilege on a Type
CONNECT oe; -- @db01 Enter password: password GRANT EXECUTE ON oe.cust_address_typ TO strmadmin; CONNECT oe; -- @db02 Enter password: password GRANT EXECUTE ON oe.cust_address_typ TO strmadmin;
In Example 22-13, you connect to database db02
as administrator user strmadmin
and create a destination queue of type oe.cust_address_typ
.
Example 22-13 Creating a Typed Destination Queue
CONNECT strmadmin;
Enter password: password
BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE(
queue_table => 'strmadmin.oe_qtab_address',
queue_payload_type => 'oe.cust_address_typ',
multiple_consumers => true);
DBMS_AQADM.CREATE_QUEUE(
queue_name => 'strmadmin.oe_queue_address',
queue_table => 'strmadmin.oe_qtab_address');
DBMS_AQADM.START_QUEUE(
queue_name => 'strmadmin.oe_queue_address');
END;
/
In Example 22-14, you connect to database db01
as administrator user strmadmin
to create a database link from db01
to db02
.
Example 22-14 Creating a Database Link
CONNECT strmadmin; Enter password: password CREATE DATABASE LINK db02 CONNECT TO strmadmin IDENTIFIED BY password USING 'db02';
In Example 22-15, you create function any_to_cust_address_typ
in the strmadmin
schema at db01
that takes an ANYDATA
payload containing an oe.cust_address_typ
object and returns an oe.cust_address_typ
object.
Example 22-15 Creating a Function to Extract a Typed Object from an ANYDATA Object
CONNECT strmadmin;
Enter password: password
CREATE FUNCTION strmadmin.any_to_cust_address_typ(in_any IN ANYDATA)
RETURN OE.CUST_ADDRESS_TYP
AS
address OE.CUST_ADDRESS_TYP;
num_var NUMBER;
type_name VARCHAR2(100);
BEGIN
type_name := in_any.GetTypeName();
IF (type_name = 'OE.CUST_ADDRESS_TYP') THEN
num_var := in_any.GetObject(address);
RETURN address;
ELSE
raise_application_error(-20101, 'Conversion failed - ' || type_name);
END IF;
END;
/
In Example 22-16, you create a transformation at db01
using the DBMS_TRANSFORM
package.
Example 22-16 Creating an ANYDATA to Typed Object Transformation
BEGIN DBMS_TRANSFORM.CREATE_TRANSFORMATION( schema => 'strmadmin', name => 'anytoaddress', from_schema => 'SYS', from_type => 'ANYDATA', to_schema => 'oe', to_type => 'cust_address_typ', transformation => 'strmadmin.any_to_cust_address_typ(source.user_data)'); END; /
In Example 22-17, you create a subscriber for the typed queue. The subscriber must contain a rule that ensures that only messages of the appropriate type are propagated to the destination queue.
Example 22-17 Creating Subscriber ADDRESS_AGENT_REMOTE
DECLARE subscriber SYS.AQ$_AGENT; BEGIN subscriber := SYS.AQ$_AGENT ('ADDRESS_AGENT_REMOTE', 'STRMADMIN.OE_QUEUE_ADDRESS@DB02', 0); DBMS_AQADM.ADD_SUBSCRIBER( queue_name => 'strmadmin.oe_queue_any', subscriber => subscriber, rule => 'TAB.USER_DATA.GetTypeName()=''OE.CUST_ADDRESS_TYP''', transformation => 'strmadmin.anytoaddress'); END; /
In Example 22-18, you schedule propagation between the ANYDATA
queue at db01
and the typed queue at db02
.
Example 22-18 Scheduling Propagation from an ANYDATA Queue to a Typed Queue
BEGIN DBMS_AQADM.SCHEDULE_PROPAGATION( queue_name => 'strmadmin.oe_queue_any', destination => 'db02'); END; /
In Example 22-19, you connect to database db01
as sample schema user oe
to enqueue a message of oe.cust_address_typ
type wrapped in an ANYDATA
wrapper. This example uses the enqueue procedure oe.enq_proc
created in Example 22-5.
Example 22-19 Enqueuing a Typed Message in an ANYDATA Wrapper
CONNECT oe;
Enter password: password
BEGIN
oe.enq_proc(ANYDATA.ConvertObject(oe.cust_address_typ(
'1668 Chong Tao','111181','Beijing',NULL,'CN')));
END;
/
COMMIT;
After allowing some time for propagation, in Example 22-20 you query queue table AQ$OE_QTAB_ADDRESS
at db02
to view the propagated message.
Example 22-20 Viewing the Propagated Message
CONNECT strmadmin;
Enter password: password
SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$OE_QTAB_ADDRESS;
The example returns one message for ADDRESS_AGENT_REMOTE
:
MSG_ID MSG_STATE CONSUMER_NAME -------------------------------- ---------------- ------------------------------ EBEF5CACC4665A6FE030578CE70A370D READY ADDRESS_AGENT_REMOTE 1 row selected.
See Also:
Chapter 20, "Oracle Messaging Gateway Message Conversion" for more information about transformations during propagationYou can propagate user-enqueued LCRs to an appropriate typed queue, but propagation of captured LCRs to a typed queue is not supported.
See Also:
"Streams Capture Process" in Oracle Streams Concepts and Administration for more information on capture processesTo propagate user-enqueued LCRs from an ANYDATA
queue to a typed queue, you complete the same steps as you do for non-LCR messages, but Oracle supplies the transformation functions. You can use the following functions in the DBMS_STREAMS
package to transform LCRs in ANYDATA
queues to messages in typed queues:
CONVERT_ANYDATA_TO_LCR_ROW
transforms an ANYDATA
payload containing a row LCR into a SYS.LCR$_ROW_RECORD
payload.
CONVERT_ANYDATA_TO_LCR_DDL
transforms an ANYDATA
payload containing a DDL LCR into a SYS.LCR$_DDL_RECORD
payload.
The examples in this section set up propagation of row LCRs from an ANYDATA
queue named oe_queue_any
to a typed queue of type SYS.LCR$_ROW_RECORD
named oe_queue_lcr
. The source queue oe_queue_any
is at database db01
, and the destination queue oe_queue_lcr
is created at database db02
in Example 22-21.
Note:
The examples in this section assume you have already run the examples in the preceding sections of this chapter.Example 22-21 Creating a Queue of Type LCR$_ROW_RECORD
CONNECT strmadmin;
Enter password: password
BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE(
queue_table => 'strmadmin.oe_qtab_lcr',
queue_payload_type => 'SYS.LCR$_ROW_RECORD',
multiple_consumers => true);
DBMS_AQADM.CREATE_QUEUE(
queue_name => 'strmadmin.oe_queue_lcr',
queue_table => 'strmadmin.oe_qtab_lcr');
DBMS_AQADM.START_QUEUE(
queue_name => 'strmadmin.oe_queue_lcr');
END;
/
In Example 22-22, you connect to db01
as administrator user strmadmin
to create an ANYDATA
to LCR$_ROW_RECORD
transformation at db01
using the DBMS_TRANSFORM
package.
Example 22-22 Creating an ANYDATA to LCR$_ROW_RECORD Transformation
CONNECT strmadmin;
Enter password: password
BEGIN
DBMS_TRANSFORM.CREATE_TRANSFORMATION(
schema => 'strmadmin',
name => 'anytolcr',
from_schema => 'SYS',
from_type => 'ANYDATA',
to_schema => 'SYS',
to_type => 'LCR$_ROW_RECORD',
transformation =>
'SYS.DBMS_STREAMS.CONVERT_ANYDATA_TO_LCR_ROW(source.user_data)');
END;
/
In Example 22-23, you create a subscriber at the typed queue. The subscriber specifies the anytolcr
transformation created in Example 22-22 for the transformation
parameter.
Example 22-23 Creating Subscriber ROW_LCR_AGENT_REMOTE
DECLARE subscriber SYS.AQ$_AGENT; BEGIN subscriber := SYS.AQ$_AGENT( 'ROW_LCR_AGENT_REMOTE', 'STRMADMIN.OE_QUEUE_LCR@DB02', 0); DBMS_AQADM.ADD_SUBSCRIBER( queue_name => 'strmadmin.oe_queue_any', subscriber => subscriber, rule => 'TAB.USER_DATA.GetTypeName()=''SYS.LCR$_ROW_RECORD''', transformation => 'strmadmin.anytolcr'); END; /
In Example 22-24, you connect to database db01
as sample schema user oe
to create a procedure to construct and enqueue a row LCR into the strmadmin.oe_queue_any
queue.
Example 22-24 Creating a Procedure to Construct and Enqueue a Row LCR
CONNECT oe;
Enter password: password
CREATE PROCEDURE oe.enq_row_lcr_proc(
source_dbname VARCHAR2,
cmd_type VARCHAR2,
obj_owner VARCHAR2,
obj_name VARCHAR2,
old_vals SYS.LCR$_ROW_LIST,
new_vals SYS.LCR$_ROW_LIST)
AS
eopt DBMS_AQ.ENQUEUE_OPTIONS_T;
mprop DBMS_AQ.MESSAGE_PROPERTIES_T;
enq_msgid RAW(16);
row_lcr SYS.LCR$_ROW_RECORD;
BEGIN
mprop.SENDER_ID := SYS.AQ$_AGENT('LOCAL_AGENT', NULL, NULL);
row_lcr := SYS.LCR$_ROW_RECORD.CONSTRUCT(
source_database_name => source_dbname,
command_type => cmd_type,
object_owner => obj_owner,
object_name => obj_name,
old_values => old_vals,
new_values => new_vals);
DBMS_AQ.ENQUEUE(
queue_name => 'strmadmin.oe_queue_any',
enqueue_options => eopt,
message_properties => mprop,
payload => ANYDATA.ConvertObject(row_lcr),
msgid => enq_msgid);
END enq_row_lcr_proc;
/
In Example 22-25, you use the oe.enq_row_lcr_proc
procedure first to create a row LCR that inserts a row into the oe.inventories
table, and then to enqueue the row LCR into the strmadmin.oe_queue_any
queue.
Note:
This example does not insert a new row in theoe.inventories
table. The new row is inserted when an Oracle Streams apply process dequeues the row LCR and applies it.Example 22-25 Creating and Enqueuing a Row LCR
DECLARE newunit1 SYS.LCR$_ROW_UNIT; newunit2 SYS.LCR$_ROW_UNIT; newunit3 SYS.LCR$_ROW_UNIT; newvals SYS.LCR$_ROW_LIST; BEGIN newunit1 := SYS.LCR$_ROW_UNIT( 'PRODUCT_ID', ANYDATA.ConvertNumber(3503), DBMS_LCR.NOT_A_LOB, NULL, NULL); newunit2 := SYS.LCR$_ROW_UNIT( 'WAREHOUSE_ID', ANYDATA.ConvertNumber(1), DBMS_LCR.NOT_A_LOB, NULL, NULL); newunit3 := SYS.LCR$_ROW_UNIT( 'QUANTITY_ON_HAND', ANYDATA.ConvertNumber(157), DBMS_LCR.NOT_A_LOB, NULL, NULL); newvals := SYS.LCR$_ROW_LIST(newunit1,newunit2,newunit3); oe.enq_row_lcr_proc( source_dbname => 'DB01', cmd_type => 'INSERT', obj_owner => 'OE', obj_name => 'INVENTORIES', old_vals => NULL, new_vals => newvals); END; / COMMIT;
The LCR is propagated to database db02
by the schedule created in Example 22-18. After allowing some time for propagation, in Example 22-26 you query queue table AQ$OE_QTAB_LCR
at db02
to view the propagated message.
Example 22-26 Viewing the Propagated LCR
CONNECT strmadmin;
Enter password: password
SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$OE_QTAB_LCR;
The example returns one message for ROW_LCR_AGENT_REMOTE
:
MSG_ID MSG_STATE CONSUMER_NAME -------------------------------- ---------------- ------------------------------ ECE2B0F912DDFF5EE030578CE70A04BB READY ROW_LCR_AGENT_REMOTE
See Also:
"DBMS_STREAMS" in Oracle Database PL/SQL Packages and Types Reference for more information about the row LCR and DDL LCR conversion functions