Write Messaging Applications with ODP.NET and Oracle Streams Advanced Queuing - 代码实例 (code examples)
-- Write messaging applications with ODP.NET and Oracle Streams Advanced Queuing

-- Part I: database setup required for this demo
------------------------------------------------------------------
-- SQL to grant appropriate privilege to database user, scott
------------------------------------------------------------------
sql> alter user scott account unlock identified by pwd4sct;
user altered.
sql> grant all on dbms_aqadm to scott;

------------------------------------------------------------------
-- PL/SQL to create queue-table and queue and start queue for scott
------------------------------------------------------------------
begin
  -- Single-consumer queue table whose messages carry a RAW payload.
  dbms_aqadm.create_queue_table(
    queue_table        => 'scott.test_q_tab',
    queue_payload_type => 'RAW',
    multiple_consumers => false);

  dbms_aqadm.create_queue(
    queue_name  => 'scott.test_q',
    queue_table => 'scott.test_q_tab');

  dbms_aqadm.start_queue(queue_name => 'scott.test_q');
end;
/

------------------------------------------------------------------
-- PL/SQL to stop queue and drop queue & queue-table from scott
-- (clean-up: run this only after the demo has finished)
------------------------------------------------------------------
begin
  dbms_aqadm.stop_queue('scott.test_q');

  dbms_aqadm.drop_queue(
    queue_name  => 'scott.test_q',
    auto_commit => true);

  dbms_aqadm.drop_queue_table(
    queue_table => 'scott.test_q_tab',
    force       => false,
    auto_commit => true);
end;
/
-- End of Part I, database setup.

// Part II: demonstrates using the Listen method
// C#
using System;
using System.Text;
using Oracle.DataAccess.Client;
using Oracle.DataAccess.Types;
using System.Threading;

namespace ODPSample
{
  /// <summary>
  /// Demonstrates how a thread can listen and wait until a message is enqueued.
  /// Once a message is enqueued, the listening thread returns from the
  /// blocked Listen() method invocation and dequeues the message.
  /// </summary>
  class EnqueueDequeue
  {
    // Written by the listener thread and polled by the main thread, hence
    // volatile so the main thread always observes the update.
    static volatile bool s_bListenReturned = false;

    /// <summary>
    /// Starts the listener thread, enqueues one RAW message on scott.test_q
    /// and then waits until the listener thread reports that it is done.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
      // Create connection
      string constr = "user id=scott;password=pwd4sct;data source=oracle";
      OracleConnection con = new OracleConnection(constr);

      // Create queue
      OracleAQQueue queue = new OracleAQQueue("scott.test_q", con);

      try
      {
        // Open connection
        con.Open();

        // Set message type for the queue
        queue.MessageType = OracleAQMessageType.Raw;

        // Spawn a thread which will listen for a message. It is a background
        // thread so that a failure on this thread (before anything has been
        // enqueued) cannot leave the process stuck behind a listener that is
        // blocked forever inside Listen().
        Thread t = new Thread(new ThreadStart(TestListen));
        t.IsBackground = true;
        t.Start();

        // Give the listener time to connect and start listening.
        Thread.Sleep(2000);

        // Begin transaction for enqueue; disposing the transaction rolls it
        // back if Commit() was not reached.
        using (OracleTransaction txn = con.BeginTransaction())
        {
          // Prepare message and RAW payload
          OracleAQMessage enqMsg = new OracleAQMessage();
          byte[] bytePayload = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
          enqMsg.Payload = bytePayload;

          // Prepare to enqueue
          queue.EnqueueOptions.Visibility = OracleAQVisibilityMode.OnCommit;

          Console.WriteLine("[main thread] enqueuing a message...");
          Console.WriteLine("[main thread] enqueued message payload : "
            + ByteArrayToString(enqMsg.Payload as byte[]));
          Console.WriteLine();

          // Enqueue message
          queue.Enqueue(enqMsg);

          // Enqueue transaction commit
          txn.Commit();
        }

        // Loop till the listener thread has finished (successfully or not)
        while (!s_bListenReturned)
          Thread.Sleep(1000);
      }
      catch (Exception e)
      {
        Console.WriteLine("error: {0}", e.Message);
      }
      finally
      {
        // Close/dispose objects
        queue.Dispose();
        con.Close();
        con.Dispose();
      }
    }

    /// <summary>
    /// Listener thread body: opens its own connection, blocks in Listen()
    /// until a message is available on scott.test_q, then dequeues and
    /// prints it. Always sets s_bListenReturned before returning so that
    /// the main thread is released even when listening or dequeuing fails.
    /// </summary>
    static void TestListen()
    {
      // Create connection
      string constr = "user id=scott;password=pwd4sct;data source=oracle";
      OracleConnection conListen = new OracleConnection(constr);

      // Create queue
      OracleAQQueue queueListen = new OracleAQQueue("scott.test_q", conListen);

      try
      {
        // Open the connection for the listen thread.
        // A connection blocked on the listen thread cannot be used for other
        // DB operations.
        conListen.Open();

        // Set message type for the queue
        queueListen.MessageType = OracleAQMessageType.Raw;

        // Listen
        queueListen.Listen(null);

        Console.WriteLine("[listen thread] listen returned... dequeuing...");

        // Begin txn for dequeue; rolled back on dispose unless committed.
        using (OracleTransaction txn = conListen.BeginTransaction())
        {
          // Prepare to dequeue
          queueListen.DequeueOptions.Visibility = OracleAQVisibilityMode.OnCommit;
          queueListen.DequeueOptions.Wait = 10;

          // Dequeue message
          OracleAQMessage deqMsg = queueListen.Dequeue();
          Console.WriteLine("[listen thread] dequeued message payload : "
            + ByteArrayToString(deqMsg.Payload as byte[]));

          // Dequeue txn commit
          txn.Commit();
        }
      }
      catch (Exception e)
      {
        Console.WriteLine("error: {0}", e.Message);
      }
      finally
      {
        // Close/dispose objects
        queueListen.Dispose();
        conListen.Close();
        conListen.Dispose();

        // Allow the main thread to exit, whether or not the dequeue succeeded.
        s_bListenReturned = true;
      }
    }

    /// <summary>
    /// Converts a byte array to a hexadecimal string, two digits per byte so
    /// that the output is unambiguous.
    /// </summary>
    /// <param name="byteArray">Bytes to format; may be null.</param>
    /// <returns>The hex string, or an empty string for a null array.</returns>
    static private string ByteArrayToString(byte[] byteArray)
    {
      if (byteArray == null)
        return string.Empty;

      StringBuilder sb = new StringBuilder(byteArray.Length * 2);
      foreach (byte b in byteArray)
      {
        sb.Append(b.ToString("x2"));
      }
      return sb.ToString();
    }
  }
}

-- Part I: database setup required for this demo
------------------------------------------------------------------
-- SQL to grant appropriate privilege to database user, scott
------------------------------------------------------------------
sql> alter user scott account unlock identified by pwd4sct;
user altered.
sql> grant all on dbms_aqadm to scott;

------------------------------------------------------------------
-- PL/SQL to create queue-table and queue and start queue for scott
------------------------------------------------------------------
begin
  -- Single-consumer queue table whose messages carry a RAW payload.
  dbms_aqadm.create_queue_table(
    queue_table        => 'scott.test_q_tab',
    queue_payload_type => 'RAW',
    multiple_consumers => false);

  dbms_aqadm.create_queue(
    queue_name  => 'scott.test_q',
    queue_table => 'scott.test_q_tab');

  dbms_aqadm.start_queue(queue_name => 'scott.test_q');
end;
/

------------------------------------------------------------------
-- PL/SQL to stop queue and drop queue & queue-table from scott
-- (clean-up: run this only after the demo has finished)
------------------------------------------------------------------
begin
  dbms_aqadm.stop_queue('scott.test_q');

  dbms_aqadm.drop_queue(
    queue_name  => 'scott.test_q',
    auto_commit => true);

  dbms_aqadm.drop_queue_table(
    queue_table => 'scott.test_q_tab',
    force       => false,
    auto_commit => true);
end;
/
-- End of Part I, database setup.

// Part II: demonstrates application notification
// C#
using System;
using System.Text;
using Oracle.DataAccess.Client;
using Oracle.DataAccess.Types;

namespace ODPSample
{
  /// <summary>
  /// Demonstrates how the application can be notified when a message is
  /// available in a queue.
  /// </summary>
  class Notification
  {
    // Written by the notification callback and polled by Main, hence
    // volatile so Main always observes the update.
    static volatile bool isNotified = false;

    /// <summary>
    /// Registers for message-available notification on scott.test_q,
    /// enqueues one RAW message and waits until the callback has run.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
      // Create connection
      string constr = "user id=scott;password=pwd4sct;data source=oracle";
      OracleConnection con = new OracleConnection(constr);

      // Create queue
      OracleAQQueue queue = new OracleAQQueue("scott.test_q", con);

      try
      {
        // Open connection
        con.Open();

        // Set message type for the queue
        queue.MessageType = OracleAQMessageType.Raw;

        // Add the event handler to handle the notification. The
        // MsgReceived method will be invoked when a message is enqueued.
        queue.MessageAvailable +=
          new OracleAQMessageAvailableEventHandler(Notification.MsgReceived);

        Console.WriteLine("notification registered...");

        // Begin txn for enqueue; disposing the transaction rolls it back if
        // Commit() was not reached.
        using (OracleTransaction txn = con.BeginTransaction())
        {
          Console.WriteLine("now enqueuing message...");

          // Prepare message and RAW payload
          OracleAQMessage enqMsg = new OracleAQMessage();
          byte[] bytePayload = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
          enqMsg.Payload = bytePayload;

          // Prepare to enqueue
          queue.EnqueueOptions.Visibility = OracleAQVisibilityMode.OnCommit;

          // Enqueue message
          queue.Enqueue(enqMsg);

          Console.WriteLine("enqueued message payload : "
            + ByteArrayToString(enqMsg.Payload as byte[]));
          Console.WriteLine("messageid of enqueued message : "
            + ByteArrayToString(enqMsg.MessageId));
          Console.WriteLine();

          // Enqueue txn commit
          txn.Commit();
        }

        // Loop while waiting for notification
        while (isNotified == false)
        {
          System.Threading.Thread.Sleep(2000);
        }
      }
      catch (Exception e)
      {
        Console.WriteLine("error: {0}", e.Message);
      }
      finally
      {
        // Close/dispose objects
        queue.Dispose();
        con.Close();
        con.Dispose();
      }
    }

    /// <summary>
    /// Notification callback: prints the queue name, notification type and
    /// the id of the first notified message, then releases Main.
    /// </summary>
    /// <param name="src">Event source.</param>
    /// <param name="arg">Details of the notification.</param>
    static void MsgReceived(object src, OracleAQMessageAvailableEventArgs arg)
    {
      try
      {
        Console.WriteLine("notification received...");
        Console.WriteLine("queuename : {0}", arg.QueueName);
        Console.WriteLine("notification type : {0}", arg.NotificationType);

        // Following type-cast to "byte[]" is required only for .NET 1.x
        byte[] notifiedMsgId = (byte[]) arg.MessageId[0];
        Console.WriteLine("messageid of notified message : "
          + ByteArrayToString(notifiedMsgId));
      }
      catch (Exception e)
      {
        Console.WriteLine("error: {0}", e.Message);
      }
      finally
      {
        // Release Main even if reporting the notification failed; otherwise
        // its wait loop would never terminate.
        isNotified = true;
      }
    }

    /// <summary>
    /// Converts a byte array to a hexadecimal string, two digits per byte so
    /// that the output is unambiguous.
    /// </summary>
    /// <param name="byteArray">Bytes to format; may be null.</param>
    /// <returns>The hex string, or an empty string for a null array.</returns>
    static private string ByteArrayToString(byte[] byteArray)
    {
      if (byteArray == null)
        return string.Empty;

      StringBuilder sb = new StringBuilder(byteArray.Length * 2);
      foreach (byte b in byteArray)
      {
        sb.Append(b.ToString("x2"));
      }
      return sb.ToString();
    }
  }
}