Write messaging applications with ODP.NET and Oracle Streams Advanced Queuing代码实例

Write messaging applications with ODP.NET and Oracle Streams Advanced Queuing代码实例

Write messaging applications with ODP.NET and Oracle Streams Advanced Queuing
-- part i: database setup required for this demo
------------------------------------------------------------------
-- sql to grant appropriate privilege to database user, scott
------------------------------------------------------------------
sql> alter user scott account unlock identified by pwd4sct;
user altered.
sql> grant all on dbms_aqadm to scott;
------------------------------------------------------------------
-- plsql to create queue-table and queue and start queue for scott
------------------------------------------------------------------
-- Anonymous block: builds the queue table scott.test_q_tab with a 'raw'
-- payload type and multiple_consumers disabled, creates queue scott.test_q
-- on that table, and then starts the queue.
BEGIN
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table        => 'scott.test_q_tab',
    queue_payload_type => 'raw',
    multiple_consumers => FALSE
  );

  DBMS_AQADM.CREATE_QUEUE(
    queue_name  => 'scott.test_q',
    queue_table => 'scott.test_q_tab'
  );

  DBMS_AQADM.START_QUEUE(
    queue_name => 'scott.test_q'
  );
END;
/
------------------------------------------------------------------
-- plsql to stop queue and drop queue & queue-table from scott
------------------------------------------------------------------
-- Anonymous block: tears the demo objects down in dependency order --
-- stop queue scott.test_q, drop it, then drop its queue table
-- scott.test_q_tab (force is FALSE, so the queue must already be gone).
BEGIN
  DBMS_AQADM.STOP_QUEUE(
    queue_name => 'scott.test_q'
  );

  DBMS_AQADM.DROP_QUEUE(
    queue_name  => 'scott.test_q',
    auto_commit => TRUE
  );

  DBMS_AQADM.DROP_QUEUE_TABLE(
    queue_table => 'scott.test_q_tab',
    force       => FALSE,
    auto_commit => TRUE
  );
END;
/
-- end of part i, database setup.
//part ii: demonstrates using the listen method
//c#
using System;
using System.Text;
using Oracle.DataAccess.Client;
using Oracle.DataAccess.Types;
using System.Threading;

namespace ODPSample
{
    /// <summary>
    /// Demonstrates how a thread can listen and wait until a message is enqueued.
    /// Once a message is enqueued, the listening thread returns from the
    /// blocked Listen() method invocation and dequeues the message.
    /// </summary>
    class EnqueueDequeue
    {
        // Connection string and queue name shared by the main and listener threads.
        const string ConnectionString = "user id=scott;password=pwd4sct;data source=oracle";
        const string QueueName = "scott.test_q";

        // Set by the listener thread once it is done, whether it succeeded or
        // failed. Declared volatile because it is written on the listener thread
        // and polled on the main thread.
        static volatile bool s_bListenReturned = false;

        /// <summary>
        /// Starts the listener thread, enqueues one RAW message inside a
        /// transaction and then waits until the listener thread has finished.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            // Create connection
            OracleConnection con = new OracleConnection(ConnectionString);

            // Create queue
            OracleAQQueue queue = new OracleAQQueue(QueueName, con);

            try
            {
                // Open connection
                con.Open();

                // Set message type for the queue
                queue.MessageType = OracleAQMessageType.Raw;

                // Spawn a thread which will listen for a message. It is a
                // background thread so that a listener still blocked in Listen()
                // cannot keep the process alive if the enqueue below fails.
                Thread t = new Thread(new ThreadStart(TestListen));
                t.IsBackground = true;
                t.Start();

                // Give the listener a head start before enqueuing.
                Thread.Sleep(2000);

                // Begin transaction for enqueue
                OracleTransaction txn = con.BeginTransaction();

                // Prepare message and RAW payload
                OracleAQMessage enqMsg = new OracleAQMessage();
                byte[] bytePayload = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
                enqMsg.Payload = bytePayload;

                // Prepare to enqueue: the message becomes visible on commit
                queue.EnqueueOptions.Visibility = OracleAQVisibilityMode.OnCommit;

                Console.WriteLine("[main thread]   enqueuing a message...");
                Console.WriteLine("[main thread]   enqueued message payload      : "
                    + ByteArrayToString(enqMsg.Payload as byte[]));
                Console.WriteLine();

                // Enqueue message
                queue.Enqueue(enqMsg);

                // Enqueue transaction commit
                txn.Commit();

                // Loop till the listener thread reports that it has finished
                while (!s_bListenReturned)
                {
                    Thread.Sleep(1000);
                }
            }
            catch (Exception e)
            {
                Console.WriteLine("error: {0}", e.Message);
            }
            finally
            {
                // Close/Dispose objects
                queue.Dispose();
                con.Close();
                con.Dispose();
            }
        }

        /// <summary>
        /// Listener thread body: opens its own connection, blocks in Listen()
        /// and then dequeues the message inside a transaction.
        /// </summary>
        static void TestListen()
        {
            // Create connection
            OracleConnection conListen = new OracleConnection(ConnectionString);

            // Create queue
            OracleAQQueue queueListen = new OracleAQQueue(QueueName, conListen);

            try
            {
                // Open the connection for the listen thread.
                // A connection blocked on the listen thread can not be used for
                // other DB operations.
                conListen.Open();

                // Set message type for the queue
                queueListen.MessageType = OracleAQMessageType.Raw;

                // Listen
                queueListen.Listen(null);

                Console.WriteLine("[listen thread] listen returned... dequeuing...");

                // Begin txn for dequeue
                OracleTransaction txn = conListen.BeginTransaction();

                // Prepare to dequeue
                queueListen.DequeueOptions.Visibility = OracleAQVisibilityMode.OnCommit;
                queueListen.DequeueOptions.Wait = 10;

                // Dequeue message
                OracleAQMessage deqMsg = queueListen.Dequeue();

                Console.WriteLine("[listen thread] dequeued message payload      : "
                    + ByteArrayToString(deqMsg.Payload as byte[]));

                // Dequeue txn commit
                txn.Commit();
            }
            catch (Exception e)
            {
                Console.WriteLine("error: {0}", e.Message);
            }
            finally
            {
                // Close/Dispose objects
                queueListen.Dispose();
                conListen.Close();
                conListen.Dispose();

                // Always release the main thread, even when listening or
                // dequeuing failed; otherwise Main would poll forever.
                s_bListenReturned = true;
            }
        }

        /// <summary>
        /// Converts a byte array to a lowercase hexadecimal string.
        /// </summary>
        /// <param name="byteArray">Bytes to format; may be null.</param>
        /// <returns>
        /// Two hex digits per byte, or an empty string when
        /// <paramref name="byteArray"/> is null.
        /// </returns>
        static private string ByteArrayToString(byte[] byteArray)
        {
            if (byteArray == null)
            {
                return string.Empty;
            }

            StringBuilder sb = new StringBuilder(byteArray.Length * 2);
            for (int n = 0; n < byteArray.Length; n++)
            {
                // "x2" pads every byte to two digits so the output is
                // unambiguous (e.g. {0x01, 0x10} and {0x11, 0x00} differ).
                sb.Append(byteArray[n].ToString("x2"));
            }
            return sb.ToString();
        }
    }
}
-- part i: database setup required for this demo
------------------------------------------------------------------
-- sql to grant appropriate privilege to database user, scott
------------------------------------------------------------------
sql> alter user scott account unlock identified by pwd4sct;
user altered.
sql> grant all on dbms_aqadm to scott;
------------------------------------------------------------------
-- plsql to create queue-table and queue and start queue for scott
------------------------------------------------------------------
-- Anonymous block: builds the queue table scott.test_q_tab with a 'raw'
-- payload type and multiple_consumers disabled, creates queue scott.test_q
-- on that table, and then starts the queue.
BEGIN
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table        => 'scott.test_q_tab',
    queue_payload_type => 'raw',
    multiple_consumers => FALSE
  );

  DBMS_AQADM.CREATE_QUEUE(
    queue_name  => 'scott.test_q',
    queue_table => 'scott.test_q_tab'
  );

  DBMS_AQADM.START_QUEUE(
    queue_name => 'scott.test_q'
  );
END;
/
------------------------------------------------------------------
-- plsql to stop queue and drop queue & queue-table from scott
------------------------------------------------------------------
-- Anonymous block: tears the demo objects down in dependency order --
-- stop queue scott.test_q, drop it, then drop its queue table
-- scott.test_q_tab (force is FALSE, so the queue must already be gone).
BEGIN
  DBMS_AQADM.STOP_QUEUE(
    queue_name => 'scott.test_q'
  );

  DBMS_AQADM.DROP_QUEUE(
    queue_name  => 'scott.test_q',
    auto_commit => TRUE
  );

  DBMS_AQADM.DROP_QUEUE_TABLE(
    queue_table => 'scott.test_q_tab',
    force       => FALSE,
    auto_commit => TRUE
  );
END;
/
-- end of part i, database setup.
//part ii: demonstrates application notification
//c#
using System;
using System.Text;
using Oracle.DataAccess.Client;
using Oracle.DataAccess.Types;

namespace ODPSample
{
    /// <summary>
    /// Demonstrates how the application can be notified when a message is
    /// available in a queue.
    /// </summary>
    class Notification
    {
        // Set by the notification callback once it has run, whether it
        // succeeded or failed. Declared volatile because Main polls it while
        // the callback writes it.
        static volatile bool isNotified = false;

        /// <summary>
        /// Registers for message-available notification, enqueues one RAW
        /// message inside a transaction and waits until the callback has run.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            // Create connection
            string constr = "user id=scott;password=pwd4sct;data source=oracle";
            OracleConnection con = new OracleConnection(constr);

            // Create queue
            OracleAQQueue queue = new OracleAQQueue("scott.test_q", con);

            try
            {
                // Open connection
                con.Open();

                // Set message type for the queue
                queue.MessageType = OracleAQMessageType.Raw;

                // Add the event handler to handle the notification. The
                // MsgReceived method will be invoked when a message is enqueued.
                queue.MessageAvailable +=
                    new OracleAQMessageAvailableEventHandler(Notification.MsgReceived);

                Console.WriteLine("notification registered...");

                // Begin txn for enqueue
                OracleTransaction txn = con.BeginTransaction();

                Console.WriteLine("now enqueuing message...");

                // Prepare message and RAW payload
                OracleAQMessage enqMsg = new OracleAQMessage();
                byte[] bytePayload = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
                enqMsg.Payload = bytePayload;

                // Prepare to enqueue: the message becomes visible on commit
                queue.EnqueueOptions.Visibility = OracleAQVisibilityMode.OnCommit;

                // Enqueue message
                queue.Enqueue(enqMsg);

                Console.WriteLine("enqueued message payload      : "
                    + ByteArrayToString(enqMsg.Payload as byte[]));
                Console.WriteLine("messageid of enqueued message : "
                    + ByteArrayToString(enqMsg.MessageId));
                Console.WriteLine();

                // Enqueue txn commit
                txn.Commit();

                // Loop while waiting for notification
                while (!isNotified)
                {
                    System.Threading.Thread.Sleep(2000);
                }
            }
            catch (Exception e)
            {
                Console.WriteLine("error: {0}", e.Message);
            }
            finally
            {
                // Close/Dispose objects
                queue.Dispose();
                con.Close();
                con.Dispose();
            }
        }

        /// <summary>
        /// Notification callback: prints the queue name, the notification type
        /// and the id of the first notified message.
        /// </summary>
        /// <param name="src">Event source (unused).</param>
        /// <param name="arg">Details of the notification.</param>
        static void MsgReceived(object src, OracleAQMessageAvailableEventArgs arg)
        {
            try
            {
                Console.WriteLine("notification received...");
                Console.WriteLine("queuename : {0}", arg.QueueName);
                Console.WriteLine("notification type : {0}", arg.NotificationType);

                // The following type-cast to "byte[]" is required only for .NET 1.x
                byte[] notifiedMsgId = (byte[]) arg.MessageId[0];
                Console.WriteLine("messageid of notified message : "
                    + ByteArrayToString(notifiedMsgId));
            }
            catch (Exception e)
            {
                Console.WriteLine("error: {0}", e.Message);
            }
            finally
            {
                // Always release Main, even when reading the event arguments
                // failed; otherwise its wait loop would never end.
                isNotified = true;
            }
        }

        /// <summary>
        /// Converts a byte array to a lowercase hexadecimal string.
        /// </summary>
        /// <param name="byteArray">Bytes to format; may be null.</param>
        /// <returns>
        /// Two hex digits per byte, or an empty string when
        /// <paramref name="byteArray"/> is null.
        /// </returns>
        static private string ByteArrayToString(byte[] byteArray)
        {
            if (byteArray == null)
            {
                return string.Empty;
            }

            StringBuilder sb = new StringBuilder(byteArray.Length * 2);
            for (int n = 0; n < byteArray.Length; n++)
            {
                // "x2" pads every byte to two digits so that message ids can be
                // compared reliably (e.g. {0x01, 0x10} and {0x11, 0x00} differ).
                sb.Append(byteArray[n].ToString("x2"));
            }
            return sb.ToString();
        }
    }
}
(0)
上一篇 2022年3月22日
下一篇 2022年3月22日

相关推荐