Java实时获取oracle变更
程序员文章站
2024-03-07 12:47:39
...
http://www.iteye.com/topic/267893
在一个基于数据库的“实时系统”里面,“实时”获取数据库变化是非常重要的,本文主要描述通过Oracle中的捕获进程实时获取数据库变化。
背景:
要做一个车辆GPS监控系统,主要分两块:
1.采集。由GPS厂商提供实时数据,通过UDP包接收
2.展示。前端程序获取到最新GPS数据后,在地图上模拟车辆的运行情况
备选方案:
1.采集程序接收到UDP包并解析后,将数据放入数据库;前端程序轮询数据库以获取最新数据。
2.采集程序接收到UDP包并解析后,将数据放入数据库,同时向前端程序发送一条消息,传递最新数据。
方案评估:
方案1:最简单,最传统;增加不必要的数据库查询,并且非实时,轮询时间间隔不好确定。
方案2:可实现“实时”,但增加采集程序职责,采集程序本不知道前端系统的存在。
最终方案:
最后采取了另一种方案:通过oracle捕获进程捕获数据库变更(采集程序insert或update一条记录时,捕获进程即时获取到该条记录),将变更记录发送到AQ(oracle高级队列,JMS的oracle实现),前端程序只关注AQ,当有新消息到来时,即刻可收到并做相应处理,反映出实时状态。
关于捕获进程,请参考《Streams概述》,《Streams捕获进程》
实现:
SQL代码
创建表空间和用户:
Sql代码
1.Create tablespace streams_tbs datafile 'E:\DBSERVER\ORACLE9I\ORADATA\TESTDB\stream_tbs.dbf' size 25M Reuse autoextend on maxsize unlimited;
2.
3.--修改目标表(要捕获变更的表)追加日志
4.ALTER TABLE myoracle.TEST_GPS_STATUS ADD SUPPLEMENTAL LOG GROUP log_group_gpsstatus_pk (DEVICEID) ALWAYS;
5.
6.
7.create user strmadmin identified by strmadmin default tablespace streams_tbs quota unlimited on streams_tbs;
8.
9.
10.grant connect, resource, select_catalog_role to strmadmin;
授予相应权限
Sql代码
1.grant execute on dbms_aqadm to strmadmin;
2.
3.grant execute on dbms_capture_adm to strmadmin;
4.
5.grant execute on dbms_propagation_adm to strmadmin;
6.
7.grant execute on dbms_streams_adm to strmadmin;
8.
9.grant execute on dbms_apply_adm to strmadmin;
10.
11.grant execute on dbms_flashback to strmadmin;
12.
13.grant execute on dbms_aq to strmadmin;
14.
15.grant execute on dbms_aqjms to strmadmin;
16.
17.grant execute on dbms_aqin to strmadmin;
18.
19.grant execute on dbms_aqjms_internal to strmadmin;
20.
执行系统存储过程分配权限
Sql代码
1.BEGIN
2.DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
3.privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ,
4.grantee => 'strmadmin',
5.grant_option => FALSE);
6.END;
7./
8.
9.
10.
11.BEGIN
12.DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
13.privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ,
14.grantee => 'strmadmin',
15.grant_option => FALSE);
16.END;
17./
18.
以strmadmin帐户登录oracle
创建AQ,类型为JMS消息
Sql代码
1.BEGIN
2. DBMS_AQADM.CREATE_QUEUE_TABLE(
3. Queue_table => 'gpsstatus_queue_table',
4. Queue_payload_type => 'SYS.AQ$_JMS_MESSAGE',
5. multiple_consumers => false,
6. compatible => '8.1.5');
7. DBMS_AQADM.CREATE_QUEUE(
8. Queue_name => 'gpsstatus_queue',
9. Queue_table => 'gpsstatus_queue_table');
10. DBMS_AQADM.START_QUEUE(
11. queue_name => 'gpsstatus_queue');
12.END;
13./
14.BEGIN
15.DBMS_STREAMS_ADM.SET_UP_QUEUE(
16. queue_table => 'gps_temp_queue_table',
17. queue_name => 'gps_temp_queue');
18.END;
19./
为目标表创建捕获进程
Sql代码
1.BEGIN
2.DBMS_STREAMS_ADM.ADD_TABLE_RULES(
3.table_name => 'myoracle.TEST_GPS_STATUS',
4.streams_type => 'capture',
5.streams_name => 'capture_gps',
6.queue_name => 'gps_temp_queue',
7.include_dml => true,
8.include_ddl => false);
9.END;
10./
初始化scn
Sql代码
1.DECLARE
2.iscn NUMBER; -- Variable to hold instantiation SCN value
3.BEGIN
4.iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
5.DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
6.source_object_name => 'myoracle.TEST_GPS_STATUS',
7.source_database_name => 'TESTdb',
8.instantiation_scn => iscn);
9.END;
10./
11.
为消息队列创建代理
Sql代码
1.BEGIN
2.DBMS_AQADM.CREATE_AQ_AGENT(
3.agent_name => 'gpsstatus_agent');
4.DBMS_AQADM.ENABLE_DB_ACCESS(
5.agent_name => 'gpsstatus_agent',
6.db_username => 'strmadmin');
7.END;
8./
9.DECLARE
10.subscriber SYS.AQ$_AGENT;
11.BEGIN
12.subscriber := SYS.AQ$_AGENT('gpsstatus_agent', NULL, NULL);
13.SYS.DBMS_AQADM.ADD_SUBSCRIBER(
14.queue_name => 'strmadmin.gpsstatus_queue',
15.subscriber => subscriber,
16.rule => NULL,
17.transformation => NULL);
18.END;
19./
创建存储过程以决定将哪些信息放到消息队列里面
Sql代码
1.CREATE OR REPLACE PROCEDURE enq_gps_lcr(in_any IN SYS.ANYDATA) IS
2.--agent sys.aq$_agent := sys.aq$_agent('gpsstatus_agent', null, 0);
3.message sys.aq$_jms_message;
4.enqueue_options dbms_aq.enqueue_options_t;
5.message_properties dbms_aq.message_properties_t;
6.msgid raw(16);
7.lcr SYS.LCR$_ROW_RECORD;
8.rc PLS_INTEGER;
9.DEVICEID varchar2(11);
10.GATHERDATETIME date;
11.LONGITUDETYPE char(1);
12.LONGITUDEVALUE number ;
13.LATITUDETYPE char(1);
14.LATITUDEVALUE number ;
15.SPEED number ;
16.DIRECTION number ;
17.BEGIN
18.rc := in_any.GETOBJECT(lcr);
19.DEVICEID:=lcr.get_value('new','DEVICEID').ACCESSvarchar2();
20.GATHERDATETIME := lcr.GET_VALUE('new','GATHERDATETIME').ACCESSdate();
21.LONGITUDETYPE := lcr.GET_VALUE('new','LONGITUDETYPE').ACCESSchar();
22.LONGITUDEVALUE := lcr.GET_VALUE('new','LONGITUDEVALUE').ACCESSnumber();
23.LATITUDETYPE := lcr.GET_VALUE('new','LATITUDETYPE').ACCESSchar();
24.LATITUDEVALUE := lcr.GET_VALUE('new','LATITUDEVALUE').ACCESSnumber();
25.SPEED := lcr.GET_VALUE('new','SPEED').ACCESSnumber();
26.DIRECTION := lcr.GET_VALUE('new','DIRECTION').ACCESSnumber();
27.message := sys.aq$_jms_message.construct(1);
28.--message.set_replyto(agent);
29.message.set_type('');
30.message.set_userid('strmadmin');
31.message.set_appid('');
32.message.set_groupid('');
33.message.set_groupseq('');
34.message.set_string_property('DEVICEID', DEVICEID);
35.message.set_string_property('GATHERDATETIME', to_char(GATHERDATETIME,'yyyy-MM-dd hh24:mi:ss'));
36.message.set_string_property('LONGITUDETYPE', LONGITUDETYPE);
37.message.set_string_property('LONGITUDEVALUE', to_char(LONGITUDEVALUE) );
38.message.set_string_property('LATITUDETYPE', LATITUDETYPE);
39.message.set_string_property('LATITUDEVALUE', to_char(LATITUDEVALUE));
40.message.set_string_property('SPEED', to_char(SPEED) );
41.message.set_string_property('DIRECTION', to_char(DIRECTION) );
42.--指定消息生存时间
43.message_properties.expiration:=60;
44.dbms_aq.enqueue(queue_name => 'strmadmin.gpsstatus_queue',
45.enqueue_options => enqueue_options,
46.message_properties => message_properties,
47.payload => message,
48.msgid => msgid);
49.COMMIT;
50.END;
51./
为目标表配置处理器
Sql代码
1.BEGIN
2.DBMS_APPLY_ADM.SET_DML_HANDLER(
3.object_name => 'myoracle.TEST_GPS_STATUS',
4.object_type => 'TABLE',
5.operation_name => 'UPDATE', --可配置为insert,update,delete等
6.error_handler => false,
7.user_procedure => 'strmadmin.enq_gps_lcr',
8.apply_database_link => NULL);
9.END;
10./
设定参数及启动捕获进程
Sql代码
1.BEGIN
2.DBMS_STREAMS_ADM.ADD_TABLE_RULES(
3.table_name => 'myoracle.TEST_GPS_STATUS',
4.streams_type => 'apply',
5.streams_name => 'apply_gps',
6.queue_name => 'strmadmin.gps_temp_queue',
7.include_dml => true,
8.include_ddl => false,
9.source_database => 'TESTdb');
10.END;
11./
12.BEGIN
13.DBMS_APPLY_ADM.SET_PARAMETER(
14.apply_name => 'apply_gps',
15.parameter => 'disable_on_error',
16.value => 'n');
17.END;
18./
19.BEGIN
20.DBMS_APPLY_ADM.START_APPLY(
21.apply_name => 'apply_gps');
22.END;
23./
24.BEGIN
25.DBMS_CAPTURE_ADM.START_CAPTURE(
26.capture_name => 'capture_gps');
27.END;
28./
至此,捕获进程配置完毕
可update一条myoracle.TEST_GPS_STATUS 中的记录,再查询gpsstatus_queue_table中是否有对应的一条记录。如果有,则配置成功。
下面是java处理代码,可直接使用JMS接口
本例使用oracle提供的API
Java代码
1.QueueConnectionFactory queueConnectionFactory = null;
2.QueueConnection queueConnection = null;
3.QueueSession queueSession = null;
4.
5.Queue queue = null;
6.QueueReceiver subscriber = null;
7.Message message = null;
Java代码
1.log.info("开始连接 ");
2.queueConnectionFactory = AQjmsFactory.getQueueConnectionFactory(ip,sid, port, "thin");
3.queueConnection = queueConnectionFactory.createQueueConnection(userName, password);
4.log.info("创建Queue Connection 成功");
5.queueConnection.start();
6.log.info("connection started");
7.queueSession = queueConnection.createQueueSession(false,
8.Session.AUTO_ACKNOWLEDGE);
9..info("Queue session created");
10.queue = ((AQjmsSession) queueSession).getQueue(userName, queueName);
11.log.info("Queue getted");
12.subscriber = queueSession.createReceiver(queue);
13.log.info("初始化完成");
开始取消息,本例采取while(true)的方式获取消息。当没有消息的时候,线程会一直阻塞,直到有新的消息到来,立即取出。
Java代码
1.while (true) {
2. message = subscriber.receive();//receive方法使没有新消息时,线程挂起
3. //do something...
4.}
5.
最后:
本文只是试图探求一种比较好的获取实时数据方法,并不适用于所有场合,但在处理实时告警,订单等方面,应该是有一定的用武之地,若结合comet等技术,完全可以实现真正的实时。
在一个基于数据库的“实时系统”里面,“实时”获取数据库变化是非常重要的,本文主要描述通过Oracle中的捕获进程实时获取数据库变化。
背景:
要做一个车辆GPS监控系统,主要分两块:
1.采集。由GPS厂商提供实时数据,通过UDP包接收
2.展示。前端程序获取到最新GPS数据后,在地图上模拟车辆的运行情况
备选方案:
1.采集程序接收到UDP包并解析后,将数据放入数据库;前端程序轮询数据库以获取最新数据。
2.采集程序接收到UDP包并解析后,将数据放入数据库,同时向前端程序发送一条消息,传递最新数据。
方案评估:
方案1:最简单,最传统;增加不必要的数据库查询,并且非实时,轮询时间间隔不好确定。
方案2:可实现“实时”,但增加采集程序职责,采集程序本不知道前端系统的存在。
最终方案:
最后采取了另一种方案:通过oracle捕获进程捕获数据库变更(采集程序insert或update一条记录时,捕获进程即时获取到该条记录),将变更记录发送到AQ(oracle高级队列,JMS的oracle实现),前端程序只关注AQ,当有新消息到来时,即刻可收到并做相应处理,反映出实时状态。
关于捕获进程,请参考《Streams概述》,《Streams捕获进程》
实现:
SQL代码
创建表空间和用户:
Sql代码
1.Create tablespace streams_tbs datafile 'E:\DBSERVER\ORACLE9I\ORADATA\TESTDB\stream_tbs.dbf' size 25M Reuse autoextend on maxsize unlimited;
2.
3.--修改目标表(要捕获变更的表)追加日志
4.ALTER TABLE myoracle.TEST_GPS_STATUS ADD SUPPLEMENTAL LOG GROUP log_group_gpsstatus_pk (DEVICEID) ALWAYS;
5.
6.
7.create user strmadmin identified by strmadmin default tablespace streams_tbs quota unlimited on streams_tbs;
8.
9.
10.grant connect, resource, select_catalog_role to strmadmin;
授予相应权限
Sql代码
1.grant execute on dbms_aqadm to strmadmin;
2.
3.grant execute on dbms_capture_adm to strmadmin;
4.
5.grant execute on dbms_propagation_adm to strmadmin;
6.
7.grant execute on dbms_streams_adm to strmadmin;
8.
9.grant execute on dbms_apply_adm to strmadmin;
10.
11.grant execute on dbms_flashback to strmadmin;
12.
13.grant execute on dbms_aq to strmadmin;
14.
15.grant execute on dbms_aqjms to strmadmin;
16.
17.grant execute on dbms_aqin to strmadmin;
18.
19.grant execute on dbms_aqjms_internal to strmadmin;
20.
执行系统存储过程分配权限
Sql代码
1.BEGIN
2.DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
3.privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ,
4.grantee => 'strmadmin',
5.grant_option => FALSE);
6.END;
7./
8.
9.
10.
11.BEGIN
12.DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
13.privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ,
14.grantee => 'strmadmin',
15.grant_option => FALSE);
16.END;
17./
18.
以strmadmin帐户登录oracle
创建AQ,类型为JMS消息
Sql代码
1.BEGIN
2. DBMS_AQADM.CREATE_QUEUE_TABLE(
3. Queue_table => 'gpsstatus_queue_table',
4. Queue_payload_type => 'SYS.AQ$_JMS_MESSAGE',
5. multiple_consumers => false,
6. compatible => '8.1.5');
7. DBMS_AQADM.CREATE_QUEUE(
8. Queue_name => 'gpsstatus_queue',
9. Queue_table => 'gpsstatus_queue_table');
10. DBMS_AQADM.START_QUEUE(
11. queue_name => 'gpsstatus_queue');
12.END;
13./
14.BEGIN
15.DBMS_STREAMS_ADM.SET_UP_QUEUE(
16. queue_table => 'gps_temp_queue_table',
17. queue_name => 'gps_temp_queue');
18.END;
19./
为目标表创建捕获进程
Sql代码
1.BEGIN
2.DBMS_STREAMS_ADM.ADD_TABLE_RULES(
3.table_name => 'myoracle.TEST_GPS_STATUS',
4.streams_type => 'capture',
5.streams_name => 'capture_gps',
6.queue_name => 'gps_temp_queue',
7.include_dml => true,
8.include_ddl => false);
9.END;
10./
初始化scn
Sql代码
1.DECLARE
2.iscn NUMBER; -- Variable to hold instantiation SCN value
3.BEGIN
4.iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
5.DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
6.source_object_name => 'myoracle.TEST_GPS_STATUS',
7.source_database_name => 'TESTdb',
8.instantiation_scn => iscn);
9.END;
10./
11.
为消息队列创建代理
Sql代码
1.BEGIN
2.DBMS_AQADM.CREATE_AQ_AGENT(
3.agent_name => 'gpsstatus_agent');
4.DBMS_AQADM.ENABLE_DB_ACCESS(
5.agent_name => 'gpsstatus_agent',
6.db_username => 'strmadmin');
7.END;
8./
9.DECLARE
10.subscriber SYS.AQ$_AGENT;
11.BEGIN
12.subscriber := SYS.AQ$_AGENT('gpsstatus_agent', NULL, NULL);
13.SYS.DBMS_AQADM.ADD_SUBSCRIBER(
14.queue_name => 'strmadmin.gpsstatus_queue',
15.subscriber => subscriber,
16.rule => NULL,
17.transformation => NULL);
18.END;
19./
创建存储过程以决定将哪些信息放到消息队列里面
Sql代码
1.CREATE OR REPLACE PROCEDURE enq_gps_lcr(in_any IN SYS.ANYDATA) IS
2.--agent sys.aq$_agent := sys.aq$_agent('gpsstatus_agent', null, 0);
3.message sys.aq$_jms_message;
4.enqueue_options dbms_aq.enqueue_options_t;
5.message_properties dbms_aq.message_properties_t;
6.msgid raw(16);
7.lcr SYS.LCR$_ROW_RECORD;
8.rc PLS_INTEGER;
9.DEVICEID varchar2(11);
10.GATHERDATETIME date;
11.LONGITUDETYPE char(1);
12.LONGITUDEVALUE number ;
13.LATITUDETYPE char(1);
14.LATITUDEVALUE number ;
15.SPEED number ;
16.DIRECTION number ;
17.BEGIN
18.rc := in_any.GETOBJECT(lcr);
19.DEVICEID:=lcr.get_value('new','DEVICEID').ACCESSvarchar2();
20.GATHERDATETIME := lcr.GET_VALUE('new','GATHERDATETIME').ACCESSdate();
21.LONGITUDETYPE := lcr.GET_VALUE('new','LONGITUDETYPE').ACCESSchar();
22.LONGITUDEVALUE := lcr.GET_VALUE('new','LONGITUDEVALUE').ACCESSnumber();
23.LATITUDETYPE := lcr.GET_VALUE('new','LATITUDETYPE').ACCESSchar();
24.LATITUDEVALUE := lcr.GET_VALUE('new','LATITUDEVALUE').ACCESSnumber();
25.SPEED := lcr.GET_VALUE('new','SPEED').ACCESSnumber();
26.DIRECTION := lcr.GET_VALUE('new','DIRECTION').ACCESSnumber();
27.message := sys.aq$_jms_message.construct(1);
28.--message.set_replyto(agent);
29.message.set_type('');
30.message.set_userid('strmadmin');
31.message.set_appid('');
32.message.set_groupid('');
33.message.set_groupseq('');
34.message.set_string_property('DEVICEID', DEVICEID);
35.message.set_string_property('GATHERDATETIME', to_char(GATHERDATETIME,'yyyy-MM-dd hh24:mi:ss'));
36.message.set_string_property('LONGITUDETYPE', LONGITUDETYPE);
37.message.set_string_property('LONGITUDEVALUE', to_char(LONGITUDEVALUE) );
38.message.set_string_property('LATITUDETYPE', LATITUDETYPE);
39.message.set_string_property('LATITUDEVALUE', to_char(LATITUDEVALUE));
40.message.set_string_property('SPEED', to_char(SPEED) );
41.message.set_string_property('DIRECTION', to_char(DIRECTION) );
42.--指定消息生存时间
43.message_properties.expiration:=60;
44.dbms_aq.enqueue(queue_name => 'strmadmin.gpsstatus_queue',
45.enqueue_options => enqueue_options,
46.message_properties => message_properties,
47.payload => message,
48.msgid => msgid);
49.COMMIT;
50.END;
51./
为目标表配置处理器
Sql代码
1.BEGIN
2.DBMS_APPLY_ADM.SET_DML_HANDLER(
3.object_name => 'myoracle.TEST_GPS_STATUS',
4.object_type => 'TABLE',
5.operation_name => 'UPDATE', --可配置为insert,update,delete等
6.error_handler => false,
7.user_procedure => 'strmadmin.enq_gps_lcr',
8.apply_database_link => NULL);
9.END;
10./
设定参数及启动捕获进程
Sql代码
1.BEGIN
2.DBMS_STREAMS_ADM.ADD_TABLE_RULES(
3.table_name => 'myoracle.TEST_GPS_STATUS',
4.streams_type => 'apply',
5.streams_name => 'apply_gps',
6.queue_name => 'strmadmin.gps_temp_queue',
7.include_dml => true,
8.include_ddl => false,
9.source_database => 'TESTdb');
10.END;
11./
12.BEGIN
13.DBMS_APPLY_ADM.SET_PARAMETER(
14.apply_name => 'apply_gps',
15.parameter => 'disable_on_error',
16.value => 'n');
17.END;
18./
19.BEGIN
20.DBMS_APPLY_ADM.START_APPLY(
21.apply_name => 'apply_gps');
22.END;
23./
24.BEGIN
25.DBMS_CAPTURE_ADM.START_CAPTURE(
26.capture_name => 'capture_gps');
27.END;
28./
至此,捕获进程配置完毕
可update一条myoracle.TEST_GPS_STATUS 中的记录,再查询gpsstatus_queue_table中是否有对应的一条记录。如果有,则配置成功。
下面是java处理代码,可直接使用JMS接口
本例使用oracle提供的API
Java代码
1.QueueConnectionFactory queueConnectionFactory = null;
2.QueueConnection queueConnection = null;
3.QueueSession queueSession = null;
4.
5.Queue queue = null;
6.QueueReceiver subscriber = null;
7.Message message = null;
Java代码
1.log.info("开始连接 ");
2.queueConnectionFactory = AQjmsFactory.getQueueConnectionFactory(ip,sid, port, "thin");
3.queueConnection = queueConnectionFactory.createQueueConnection(userName, password);
4.log.info("创建Queue Connection 成功");
5.queueConnection.start();
6.log.info("connection started");
7.queueSession = queueConnection.createQueueSession(false,
8.Session.AUTO_ACKNOWLEDGE);
9..info("Queue session created");
10.queue = ((AQjmsSession) queueSession).getQueue(userName, queueName);
11.log.info("Queue getted");
12.subscriber = queueSession.createReceiver(queue);
13.log.info("初始化完成");
开始取消息,本例采取while(true)的方式获取消息。当没有消息的时候,线程会一直阻塞,直到有新的消息到来,立即取出。
Java代码
1.while (true) {
2. message = subscriber.receive();//receive方法使没有新消息时,线程挂起
3. //do something...
4.}
5.
最后:
本文只是试图探求一种比较好的获取实时数据方法,并不适用于所有场合,但在处理实时告警,订单等方面,应该是有一定的用武之地,若结合comet等技术,完全可以实现真正的实时。