Write messaging applications with ODP.NET and Oracle Streams Advanced Queuing代码实例
程序员文章站
2022-05-14 10:40:42
Write messaging applications with ODP.NET and Oracle Streams Advanced Queuing代码实例
write messaging...
Write messaging applications with ODP.NET and Oracle Streams Advanced Queuing代码实例
Write messaging applications with ODP.NET and Oracle Streams Advanced Queuing

-- Part I: Database setup required for this demo

------------------------------------------------------------------
-- SQL to grant appropriate privilege to database user, scott
------------------------------------------------------------------
sql> alter user scott account unlock identified by pwd4sct;
user altered.
sql> grant all on dbms_aqadm to scott;

------------------------------------------------------------------
-- PL/SQL to create queue-table and queue and start queue for scott
------------------------------------------------------------------
begin
  dbms_aqadm.create_queue_table(
    queue_table        => 'scott.test_q_tab',
    queue_payload_type => 'raw',
    multiple_consumers => false);

  dbms_aqadm.create_queue(
    queue_name  => 'scott.test_q',
    queue_table => 'scott.test_q_tab');

  dbms_aqadm.start_queue(queue_name => 'scott.test_q');
end;
/

------------------------------------------------------------------
-- PL/SQL to stop queue and drop queue & queue-table from scott
------------------------------------------------------------------
begin
  dbms_aqadm.stop_queue('scott.test_q');

  dbms_aqadm.drop_queue(
    queue_name  => 'scott.test_q',
    auto_commit => true);

  dbms_aqadm.drop_queue_table(
    queue_table => 'scott.test_q_tab',
    force       => false,
    auto_commit => true);
end;
/
-- End of Part I, database setup.

//Part II: Demonstrates using the Listen method
//C#
using System;
using System.Text;
using Oracle.DataAccess.Client;
using Oracle.DataAccess.Types;
using System.Threading;

namespace ODPSample
{
  /// <summary>
  /// Demonstrates how a thread can listen and wait until a message is enqueued.
  /// Once a message is enqueued, the listening thread returns from the
  /// blocked Listen() method invocation and dequeues the message.
  /// </summary>
  class EnqueueDequeue
  {
    // Written by the listener thread and polled by the main thread, so it
    // must be volatile for the polling loop to observe the update.
    static volatile bool s_bListenReturned = false;

    /// <summary>
    /// Starts the listener thread, enqueues one RAW message on scott.test_q
    /// and waits until the listener thread reports that it has finished.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
      // Create connection
      string constr = "user id=scott;password=pwd4sct;data source=oracle";
      OracleConnection con = new OracleConnection(constr);

      // Create queue
      OracleAQQueue queue = new OracleAQQueue("scott.test_q", con);

      try
      {
        // Open connection
        con.Open();

        // Set message type for the queue
        queue.MessageType = OracleAQMessageType.Raw;

        // Spawn a thread which will listen for a message. It is a background
        // thread so that a failure on this (main) thread before the enqueue
        // does not leave the process alive, blocked in Listen().
        ThreadStart ts = new ThreadStart(TestListen);
        Thread t = new Thread(ts);
        t.IsBackground = true;
        t.Start();

        // Give the listener time to connect and start listening
        System.Threading.Thread.Sleep(2000);

        // Begin transaction for enqueue; "using" disposes the transaction
        // even when Enqueue throws before Commit is reached.
        using (OracleTransaction txn = con.BeginTransaction())
        {
          // Prepare message and RAW payload
          OracleAQMessage enqMsg = new OracleAQMessage();
          byte[] bytePayload = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
          enqMsg.Payload = bytePayload;

          // Prepare to enqueue
          queue.EnqueueOptions.Visibility = OracleAQVisibilityMode.OnCommit;

          Console.WriteLine("[main thread] enqueuing a message...");
          Console.WriteLine("[main thread] enqueued message payload : "
            + ByteArrayToString(enqMsg.Payload as byte[]));
          Console.WriteLine();

          // Enqueue message
          queue.Enqueue(enqMsg);

          // Enqueue transaction commit
          txn.Commit();
        }

        // Loop till the listener thread has finished (successfully or not)
        while (!s_bListenReturned)
          System.Threading.Thread.Sleep(1000);
      }
      catch (Exception e)
      {
        Console.WriteLine("error: {0}", e.Message);
      }
      finally
      {
        // Close/Dispose objects
        queue.Dispose();
        con.Close();
        con.Dispose();
      }
    }

    /// <summary>
    /// Listener thread body: opens its own connection, blocks in Listen()
    /// until a message is available on scott.test_q, then dequeues it and
    /// prints its payload.
    /// </summary>
    static void TestListen()
    {
      // Create connection
      string constr = "user id=scott;password=pwd4sct;data source=oracle";
      OracleConnection conListen = new OracleConnection(constr);

      // Create queue
      OracleAQQueue queueListen = new OracleAQQueue("scott.test_q", conListen);

      try
      {
        // Open the connection for the listen thread.
        // A connection blocked on the listen thread can not be used for
        // other DB operations.
        conListen.Open();

        // Set message type for the queue
        queueListen.MessageType = OracleAQMessageType.Raw;

        // Listen
        queueListen.Listen(null);

        Console.WriteLine("[listen thread] listen returned... dequeuing...");

        // Begin transaction for dequeue
        using (OracleTransaction txn = conListen.BeginTransaction())
        {
          // Prepare to dequeue
          queueListen.DequeueOptions.Visibility = OracleAQVisibilityMode.OnCommit;
          queueListen.DequeueOptions.Wait = 10;

          // Dequeue message
          OracleAQMessage deqMsg = queueListen.Dequeue();
          Console.WriteLine("[listen thread] dequeued message payload : "
            + ByteArrayToString(deqMsg.Payload as byte[]));

          // Dequeue transaction commit
          txn.Commit();
        }
      }
      catch (Exception e)
      {
        Console.WriteLine("error: {0}", e.Message);
      }
      finally
      {
        // Close/Dispose objects
        queueListen.Dispose();
        conListen.Close();
        conListen.Dispose();

        // Allow the main thread to exit. Done in "finally" so the main
        // thread is released even when listening or dequeuing failed.
        s_bListenReturned = true;
      }
    }

    /// <summary>
    /// Converts a byte array to its hexadecimal string representation,
    /// two uppercase hex digits per byte.
    /// </summary>
    /// <param name="byteArray">Bytes to format; may be null.</param>
    /// <returns>The hex string, or an empty string for a null array.</returns>
    static private string ByteArrayToString(byte[] byteArray)
    {
      if (byteArray == null)
        return string.Empty;

      StringBuilder sb = new StringBuilder(byteArray.Length * 2);
      foreach (byte b in byteArray)
      {
        // "X2" keeps leading zeros so every byte maps to exactly two digits
        sb.Append(b.ToString("X2"));
      }
      return sb.ToString();
    }
  }
}

-- Part I: Database setup required for this demo

------------------------------------------------------------------
-- SQL to grant appropriate privilege to database user, scott
------------------------------------------------------------------
sql> alter user scott account unlock identified by pwd4sct;
user altered.
sql> grant all on dbms_aqadm to scott;

------------------------------------------------------------------
-- PL/SQL to create queue-table and queue and start queue for scott
------------------------------------------------------------------
begin
  dbms_aqadm.create_queue_table(
    queue_table        => 'scott.test_q_tab',
    queue_payload_type => 'raw',
    multiple_consumers => false);

  dbms_aqadm.create_queue(
    queue_name  => 'scott.test_q',
    queue_table => 'scott.test_q_tab');

  dbms_aqadm.start_queue(queue_name => 'scott.test_q');
end;
/

------------------------------------------------------------------
-- PL/SQL to stop queue and drop queue & queue-table from scott
------------------------------------------------------------------
begin
  dbms_aqadm.stop_queue('scott.test_q');

  dbms_aqadm.drop_queue(
    queue_name  => 'scott.test_q',
    auto_commit => true);

  dbms_aqadm.drop_queue_table(
    queue_table => 'scott.test_q_tab',
    force       => false,
    auto_commit => true);
end;
/
-- End of Part I, database setup.

//Part II: Demonstrates application notification
//C#
using System;
using System.Text;
using Oracle.DataAccess.Client;
using Oracle.DataAccess.Types;

namespace ODPSample
{
  /// <summary>
  /// Demonstrates how the application can be notified when a message is
  /// available in a queue.
  /// </summary>
  class Notification
  {
    // Set from the notification handler and polled by Main, so it must be
    // volatile for the polling loop to observe the update.
    static volatile bool isNotified = false;

    /// <summary>
    /// Registers for message-available notification on scott.test_q,
    /// enqueues one RAW message and waits until the handler has run.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
      // Create connection
      string constr = "user id=scott;password=pwd4sct;data source=oracle";
      OracleConnection con = new OracleConnection(constr);

      // Create queue
      OracleAQQueue queue = new OracleAQQueue("scott.test_q", con);

      try
      {
        // Open connection
        con.Open();

        // Set message type for the queue
        queue.MessageType = OracleAQMessageType.Raw;

        // Add the event handler to handle the notification. The
        // MsgReceived method will be invoked when a message is enqueued.
        queue.MessageAvailable +=
          new OracleAQMessageAvailableEventHandler(Notification.MsgReceived);

        Console.WriteLine("notification registered...");

        // Begin transaction for enqueue; "using" disposes the transaction
        // even when Enqueue throws before Commit is reached.
        using (OracleTransaction txn = con.BeginTransaction())
        {
          Console.WriteLine("now enqueuing message...");

          // Prepare message and RAW payload
          OracleAQMessage enqMsg = new OracleAQMessage();
          byte[] bytePayload = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
          enqMsg.Payload = bytePayload;

          // Prepare to enqueue
          queue.EnqueueOptions.Visibility = OracleAQVisibilityMode.OnCommit;

          // Enqueue message
          queue.Enqueue(enqMsg);

          Console.WriteLine("enqueued message payload : "
            + ByteArrayToString(enqMsg.Payload as byte[]));
          Console.WriteLine("messageid of enqueued message : "
            + ByteArrayToString(enqMsg.MessageId));
          Console.WriteLine();

          // Enqueue transaction commit
          txn.Commit();
        }

        // Loop while waiting for notification
        while (isNotified == false)
        {
          System.Threading.Thread.Sleep(2000);
        }
      }
      catch (Exception e)
      {
        Console.WriteLine("error: {0}", e.Message);
      }
      finally
      {
        // Close/Dispose objects
        queue.Dispose();
        con.Close();
        con.Dispose();
      }
    }

    /// <summary>
    /// Notification handler: prints the queue name, notification type and
    /// the first notified message id, then releases the waiting Main loop.
    /// </summary>
    /// <param name="src">Event source.</param>
    /// <param name="arg">Details of the available message(s).</param>
    static void MsgReceived(object src, OracleAQMessageAvailableEventArgs arg)
    {
      try
      {
        Console.WriteLine("notification received...");
        Console.WriteLine("queuename : {0}", arg.QueueName);
        Console.WriteLine("notification type : {0}", arg.NotificationType);

        // Guard against a notification that carries no message ids
        if (arg.MessageId != null && arg.MessageId.Length > 0)
        {
          //following type-cast to "byte[]" is required only for .NET 1.x
          byte[] notifiedMsgId = (byte[]) arg.MessageId[0];
          Console.WriteLine("messageid of notified message : "
            + ByteArrayToString(notifiedMsgId));
        }
      }
      catch (Exception e)
      {
        Console.WriteLine("error: {0}", e.Message);
      }
      finally
      {
        // Set in "finally" so Main stops waiting even when the handler
        // failed part-way through.
        isNotified = true;
      }
    }

    /// <summary>
    /// Converts a byte array to its hexadecimal string representation,
    /// two uppercase hex digits per byte.
    /// </summary>
    /// <param name="byteArray">Bytes to format; may be null.</param>
    /// <returns>The hex string, or an empty string for a null array.</returns>
    static private string ByteArrayToString(byte[] byteArray)
    {
      if (byteArray == null)
        return string.Empty;

      StringBuilder sb = new StringBuilder(byteArray.Length * 2);
      foreach (byte b in byteArray)
      {
        // "X2" keeps leading zeros so every byte maps to exactly two digits
        sb.Append(b.ToString("X2"));
      }
      return sb.ToString();
    }
  }
}