欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Write messaging applications with ODP.NET and Oracle Streams Advanced Queuing代码实例

程序员文章站 2022-05-14 10:40:42
write messaging applications with odp.net and oracle streams advanced queuing代码实例 write messaging...

write messaging applications with odp.net and oracle streams advanced queuing代码实例

write messaging applications with odp.net and oracle streams advanced queuing

-- part i: database setup required for this demo
 
------------------------------------------------------------------
-- sql to grant appropriate privilege to database user, scott
------------------------------------------------------------------
sql> alter user scott account unlock identified by pwd4sct;
user altered.
grant all on dbms_aqadm to scott;
 
------------------------------------------------------------------
-- plsql to create queue-table and queue and start queue for scott
------------------------------------------------------------------
begin
  dbms_aqadm.create_queue_table(
    queue_table=>'scott.test_q_tab', 
    queue_payload_type=>'raw', 
    multiple_consumers=>false);
 
  dbms_aqadm.create_queue(
    queue_name=>'scott.test_q', 
    queue_table=>'scott.test_q_tab');
 
  dbms_aqadm.start_queue(queue_name=>'scott.test_q');
end;
/
 
------------------------------------------------------------------
-- plsql to stop queue and drop queue & queue-table from scott
------------------------------------------------------------------
begin
  dbms_aqadm.stop_queue('scott.test_q');
 
  dbms_aqadm.drop_queue(
    queue_name => 'scott.test_q', 
    auto_commit => true);
 
  dbms_aqadm.drop_queue_table(
    queue_table => 'scott.test_q_tab',
    force => false, 
    auto_commit => true);
end;
/
-- end of part i, database setup.

//part ii: demonstrates using the listen method
//c#
using system;
using system.text;
using oracle.dataaccess.client;
using oracle.dataaccess.types;
using system.threading;
 
namespace odpsample
{
  /// <summary>
  /// demonstrates how a thread can listen and wait until a message is enqueued.
  /// once a message is enqueued, the listening thread returns from the 
  /// blocked listen() method invocation and dequeues the message.
  /// </summary>
  class enqueuedequeue
  {
    static bool s_blistenreturned = false;
 
    static void main(string[] args)
    {
      // create connection
      string constr = "user id=scott;password=pwd4sct;data source=oracle";
      oracleconnection con = new oracleconnection(constr);
 
      // create queue
      oracleaqqueue queue = new oracleaqqueue("scott.test_q", con);
 
      try
      {
        // open connection
        con.open();
 
        // set message type for the queue
        queue.messagetype = oracleaqmessagetype.raw;
 
        // spawning a thread which will listen for a message
        threadstart ts = new threadstart(testlisten);
        thread t = new thread(ts);
        t.start();
 
        system.threading.thread.sleep(2000);
 
        // begin transaction for enqueue
        oracletransaction txn = con.begintransaction();
 
        // prepare message and raw payload
        oracleaqmessage enqmsg = new oracleaqmessage();
        byte[] bytepayload = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
        enqmsg.payload = bytepayload;
 
        // prepare to enqueue
        queue.enqueueoptions.visibility = oracleaqvisibilitymode.oncommit;
 
        console.writeline("[main thread]   enqueuing a message...");
        console.writeline("[main thread]   enqueued message payload      : "
          + bytearraytostring(enqmsg.payload as byte[]));
        console.writeline();
 
        // enqueue message
        queue.enqueue(enqmsg);
 
        // enqueue transaction commit
        txn.commit();
 
        // loop till listen returns
        while (!s_blistenreturned)
          system.threading.thread.sleep(1000);
      }
      catch (exception e)
      {
        console.writeline("error: {0}", e.message);
      }
      finally
      {
        // close/dispose objects
        queue.dispose();
        con.close();
        con.dispose();
      }
    }
 
    static void testlisten()
    {
      // create connection
      string constr = "user id=scott;password=pwd4sct;data source=oracle";
      oracleconnection conlisten = new oracleconnection(constr);
 
      // create queue
      oracleaqqueue queuelisten = new oracleaqqueue("scott.test_q", conlisten);
 
      try
      {
          // open the connection for listen thread.
          // connection blocked on listen thread can not be used for other db 
          // operations
          conlisten.open();
 
          // set message type for the queue
          queuelisten.messagetype = oracleaqmessagetype.raw;
 
        // listen
        queuelisten.listen(null);
 
        console.writeline("[listen thread] listen returned... dequeuing...");
 
        // begin txn for dequeue
        oracletransaction txn = conlisten.begintransaction();
 
        // prepare to dequeue
        queuelisten.dequeueoptions.visibility = oracleaqvisibilitymode.oncommit;
        queuelisten.dequeueoptions.wait = 10;
 
        // dequeue message
        oracleaqmessage deqmsg = queuelisten.dequeue();
        console.writeline("[listen thread] dequeued message payload      : "
          + bytearraytostring(deqmsg.payload as byte[]));
 
        // dequeue txn commit
        txn.commit();
 
        // allow the main thread to exit
        s_blistenreturned = true;
      }
      catch (exception e)
      {
        console.writeline("error: {0}", e.message);
      }
      finally
      {
        // close/dispose objects
        queuelisten.dispose();
        conlisten.close();
        conlisten.dispose();
      }
    }
 
    // function to convert byte[] to string
    static private string bytearraytostring(byte[] bytearray)
    {
      stringbuilder sb = new stringbuilder();
      for (int n = 0; n < bytearray.length; n++)
      {
        sb.append((int.parse(bytearray[n].tostring())).tostring("x"));
      }
      return sb.tostring();
    }
  }
}




-- part i: database setup required for this demo
 
------------------------------------------------------------------
-- sql to grant appropriate privilege to database user, scott
------------------------------------------------------------------
sql> alter user scott account unlock identified by pwd4sct;
user altered.
sql> grant all on dbms_aqadm to scott;
 
------------------------------------------------------------------
-- plsql to create queue-table and queue and start queue for scott
------------------------------------------------------------------
begin
  dbms_aqadm.create_queue_table(
    queue_table=>'scott.test_q_tab', 
    queue_payload_type=>'raw', 
    multiple_consumers=>false);
 
  dbms_aqadm.create_queue(
    queue_name=>'scott.test_q', 
    queue_table=>'scott.test_q_tab');
 
  dbms_aqadm.start_queue(queue_name=>'scott.test_q');
end;
/
 
------------------------------------------------------------------
-- plsql to stop queue and drop queue & queue-table from scott
------------------------------------------------------------------
begin
  dbms_aqadm.stop_queue('scott.test_q');
 
  dbms_aqadm.drop_queue(
    queue_name => 'scott.test_q', 
    auto_commit => true);
 
  dbms_aqadm.drop_queue_table(
    queue_table => 'scott.test_q_tab',
    force => false, 
    auto_commit => true);
end;
/
-- end of part i, database setup.

//part ii: demonstrates application notification
//c#
using system;
using system.text;
using oracle.dataaccess.client;
using oracle.dataaccess.types;
 
namespace odpsample
{
  /// <summary>
  /// demonstrates how the application can be notified when a message is 
  /// available in a queue.
  /// </summary>
  class notification
  {
    static bool isnotified = false;
 
    static void main(string[] args)
    {
      // create connection
      string constr = "user id=scott;password=pwd4sct;data source=oracle";
      oracleconnection con = new oracleconnection(constr);
 
      // create queue
      oracleaqqueue queue = new oracleaqqueue("scott.test_q", con);
 
      try
      {
        // open connection
        con.open();
 
        // set message type for the queue
        queue.messagetype = oracleaqmessagetype.raw;
 
        // add the event handler to handle the notification. the 
        // msgreceived method will be invoked when a message is enqueued
        queue.messageavailable +=
          new oracleaqmessageavailableeventhandler(notification.msgreceived);
 
        console.writeline("notification registered...");
 
        // begin txn for enqueue
        oracletransaction txn = con.begintransaction();
 
        console.writeline("now enqueuing message...");
 
        // prepare message and raw payload
        oracleaqmessage enqmsg = new oracleaqmessage();
        byte[] bytepayload = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
        enqmsg.payload = bytepayload;
 
        // prepare to enqueue
        queue.enqueueoptions.visibility = oracleaqvisibilitymode.oncommit;
 
        // enqueue message
        queue.enqueue(enqmsg);
 
        console.writeline("enqueued message payload      : "
          + bytearraytostring(enqmsg.payload as byte[]));
        console.writeline("messageid of enqueued message : "
          + bytearraytostring(enqmsg.messageid));
        console.writeline();
 
        // enqueue txn commit
        txn.commit();
 
        // loop while waiting for notification
        while (isnotified == false)
        {
          system.threading.thread.sleep(2000);
        }
      }
      catch (exception e)
      {
        console.writeline("error: {0}", e.message);
      }
      finally
      {
        // close/dispose objects
        queue.dispose();
        con.close();
        con.dispose();
      }
    }
 
    static void msgreceived(object src, oracleaqmessageavailableeventargs arg)
    {
      try
      {
        console.writeline("notification received...");
        console.writeline("queuename : {0}", arg.queuename);
        console.writeline("notification type : {0}", arg.notificationtype);
                
                //following type-cast to "byte[]" is required only for .net 1.x
        byte[] notifiedmsgid = (byte[]) arg.messageid[0];
        console.writeline("messageid of notified message : "
          + bytearraytostring(notifiedmsgid));
        isnotified = true;
      }
      catch (exception e)
      {
        console.writeline("error: {0}", e.message);
      }
    }
        
        // function to convert byte[] to string
    static private string bytearraytostring(byte[] bytearray)
    {
      stringbuilder sb = new stringbuilder();
      for (int n = 0; n < bytearray.length; n++)
      {
        sb.append((int.parse(bytearray[n].tostring())).tostring("x"));
      }
      return sb.tostring();
    }
  }
}