Загрузка данных SNMP-мониторинга в Oracle
Сегодня я расскажу о реальной задаче обработки данных SNMP-мониторинга, уделяя максимальное внимание техническим подробностям. Я постараюсь обосновать выбор подходов для ее решения и сравнить их производительность. Также, я уделю внимание тем техническим моментам, которые могут вызвать сложности у новичков.
Для чего все это?
Хочу сразу сказать, что меня совершенно не интересуют такие вопросы как:
- С какой максимальной скоростью можно загрузить данные в Oracle?
- Что быстрее Oracle или PostgreSQL?
- Насколько быстро можно осуществлять вставку в таблицу БД?
По большей части, эти вопросы, и подобные им, не имеют смысла (во всяком случае в отрыве от подробностей аппаратной конфигурации). Я совершенно сознательно не говорю ни слова о том, на каком «железе» работает мой сервер Oracle. На мой взгляд, это не важно. Но что же тогда важно?
Важно то, что есть реальная задача, например сбора данных SNMP-мониторинга, в процессе выполнения которой, постоянно генерируется большой объем данных, которые нужно обработать. При этом, существенны следующие моменты:
- Недостаточно просто вставить данные в таблицу (как именно должны быть обработаны данные и почему, я расскажу в следующем разделе)
- Данные генерируются не на сервере БД (скорее всего, будет несколько серверов сбора данных, передающих данные в единую БД)
- Данные поступают постоянно и также постоянно должны обрабатываться, желательно минимизировать время обработки данных (чтобы обеспечить минимальное время реакции на возникновение какой либо аварийной ситуации)
- Допускается потеря части данных (если произошла авария, мы обнаружим это на следующем цикле опроса, даже при потере части данных текущего)
- История изменения основных параметров должна сохраняться долговременно
Я рассматриваю различные варианты решения этой задачи и сравниваю их производительность. Разумеется, целью является поиск наиболее производительного решения.
Постановка задачи
Для начала, вспомним, какие именно данные мы получаем, используя SNMP? Фактически, мы можем получить значения некоторых предопределенных переменных, запросив интересующий нас OID, используя GET-запрос. Например, запросив OID = 1.3.6.1.2.1.1.3.0, мы можем получить значение такой важной для мониторинга величины как sysUpTime. Значения переменных, доступ к которым предоставляется по SNMP, не обязательно числовые. Это могут быть и строки.
Но SNMP не ограничивается доступом к набору скалярных переменных. Значения переменных могут быть сгруппированных в таблицы. В каждой строке таблицы группируются значения переменных, связанных с каким-то отдельным ресурсом. Для доступа к каждому значению, необходимо дополнить OID присвоенный столбцу таблицы некоторым идентификатором, определяющим строку описания ресурса. Этот идентификатор строки мы будем называть индексом ресурса.
В случае опроса списка интерфейсов (1.3.6.1.2.1.2), в роли идентификатора ресурса выступает целое число, но, для других таблиц, это может быть IP-адреc или что-то другое, определяемое спецификацией. Сложность заключается в том, что нам не известен индекс опрашиваемого ресурса заранее и мы не можем получить значение интересующей нас переменной используя GET-запрос.
Для чтения значений в таблицах, необходимо использовать GETNEXT-запросы, возвращающие OID и значение переменной, следующей в лексикографическом порядке за OID-ом, указанном в запросе. Так передавая OID-столбца, представляющий собой префикс OID-а интересующей нас переменной, мы получаем соответствующее значение из первой строки таблицы. Чтобы получить значение следующей строки, мы передаем OID, полученный в рамках ответа на первый запрос и так далее, пока таблица не будет просмотрена полностью.
В целях оптимизации производительности (путем уменьшения количества отсылаемых запросов), мы можем передавать OID-ы нескольких столбцов одним запросом. Кроме того, во 2 версии SNMP была добавлена возможность формирования BULK-запросов. Один BULK-запрос заменяет несколько выполненных друг за другом GETNEXT-запросов, что позволяет прочитать всю таблицу целиком за один запрос (при достаточной величине BULK). Обо всем этом я уже рассказывал ранее.
Повторяю я все это для того, чтобы было понятно — таблицы не постоянны! Описание (например где-то в БД) таблицы интерфейсов вручную, с назначением им OID-ов совершенно бессмысленно. Строки таблиц могут добавляться и удаляться при переконфигурации оборудования. Более того, индекс какого-то существующего интерфейса может измениться! Фактически, одной из задач системы SNMP-мониторинга является автоматизация отслеживания всех изменений просматриваемых таблиц.
Как это будет выглядеть в БД? Довольно просто:
Данные, полученные в процессе мониторинга, мы будем привязывать к ресурсам (ae_resource). Ресурсы, в свою очередь, будут связаны в двухуровневую иерархию. На верхнем уровне будет представлен ресурс устройства. С ним, по owner_id, будут связаны дочерние ресурсы, например интерфейсы (тот факт, что это именно интерфейсы, а не что-то другое, будет определяться значением type_id из справочника ae_resource_type). Значения device_id, у родительского и всех дочерних ресурсов, будут совпадать и указывать на описание оборудования.
Можно заметить, что у таблицы ae_resource имеются поля start_date и end_date. Поддерживать их в актуальном состоянии — наша задача. Мы должны создавать новые ресурсы, по мере необходимости, проставляя им дату начала действия в start_date и завершать действие устаревших ресурсов, устанавливая end_date. Для идентификации ресурсов, будет использоваться поле name (в случае интерфейсов, это значение атрибута ifDescr — 1.3.6.1.2.1.2.2.1.2). Индекс ресурса будем сохранять в поле res_num (в случае его изменения, ресурс со старым значением индекса должен быть закрыт, после чего, должен быть создан новый ресурс).
Необходимость поддержки списка интерфейсов в актуальном состоянии — главная причина, по которой данные придется обрабатывать (хотя обычная вставка полученных данных в таблицу заняла-бы гораздо меньше времени). Но уж если мы все равно обрабатываем данные, почему-бы не получить от этого максимальную пользу? В процессе мониторинга, мы получаем очень много данных, часть из которых не изменяется или изменяется незначительно. Мы можем уменьшить объем данных, сохраняемых в БД (что благотворно отразится как на ее объеме, так и на производительности), если будет сохранять только значимые изменения. Но как определить, какие изменения являются значимыми? В этом нам помогут политики:
Значение каждого полученного нами параметра будет связано с некоторым доменом (ae_domain). Регулярное выражение (regexp) поможет валидировать корректность значения. Перед сохранением в БД, значение может быть преобразовано к какому-то другому домену (например строки мы получаем в шестнадцатеричном представлении, которое было бы неплохо преобразовывать в более привычную форму). Правила преобразования будут определяться таблицей ae_domain_convert.
Какие изменения будут считаться значимыми? Это зависит от домена. По умолчанию, значимым будет считаться любое изменение значения (то есть, если значение не изменилось, запись в БД выполняться не будет). Для некоторых параметров имеет смысл задать особые правила. Например sysUpTime (после соответствующего преобразования) — монотонно возрастающая числовая величина. Уменьшение этого значения означает, что хост перезагрузился. Задание особой политики для этого домена позволит нам записывать в БД только события уменьшения значения (означающее перезагрузки), при этом, в БД будет записываться не полученное, а предыдущее значение (то есть максимальный достигнутый uptime).
В ae_threshold будем задавать пороги, пересечение которых (в заданном направлении) будет рассматриваться как значимое изменение. Дополнительно введем особый тип порога (delta), определяющий абсолютное значение разности между предыдущим и полученным значением. Задание такого порога может быть удобно, например, для счетчиков трафика, таких как ifInOctets (1.3.6.1.2.1.2.2.1.10).
Целиком, схема данных будет выглядеть следующим образом:
create sequence ae_platform_model_seq start with 100; create table ae_platform_model ( id number not null, name varchar2(30) not null, description varchar2(300) ); comment on table ae_platform_model is 'Модель оборудования'; create unique index ae_platform_model_pk on ae_platform_model(id); alter table ae_platform_model add constraint pk_ae_platform_model primary key(id); create sequence ae_device_seq cache 100; create table ae_device ( id number not null, model_id number not null, start_date date default sysdate not null, end_date date default null ); comment on table ae_device is 'Оборудование'; create unique index ae_device_pk on ae_device(id); create index ae_device_fk on ae_device(device_id); create index ae_device_model_fk on ae_device(model_id); create index ae_device_zone_fk on ae_device(zone_id); alter table ae_device add constraint pk_ae_device primary key(id); alter table ae_device add constraint fk_ae_device_model foreign key (model_id) references ae_platform_model(id); create sequence ae_resource_class_seq start with 100; create table ae_resource_class ( id number not null, owner_id number, is_logical number(1) not null, name varchar2(30) not null, description varchar2(300) ); comment on table ae_resource_class is 'Класс ресурса'; comment on column ae_resource_class.is_logical is 'Признак логического ресурса'; create unique index ae_resource_class_pk on ae_resource_class(id); create index ae_resource_class_fk on ae_resource_class(owner_id); alter table ae_resource_class add constraint ae_resource_class_ck check (is_logical in (0, 1)); alter table ae_resource_class add constraint pk_ae_resource_class primary key(id); create sequence ae_resource_type_seq start with 100; create table ae_resource_type ( id number not null, owner_id number, parent_id number, class_id number not null, name varchar2(30) not null, description varchar2(300) ); comment on table ae_resource_type is 'Тип ресурса'; create unique index ae_resource_type_pk on ae_resource_type(id); create index ae_resource_type_owner_fk on ae_resource_type(owner_id); create index ae_resource_type_parent_fk on ae_resource_type(parent_id); alter table ae_resource_type add constraint pk_ae_resource_type primary key(id); alter table ae_resource_type add constraint fk_ae_resource_type foreign key (class_id) references ae_resource_class(id); alter table ae_resource_type add constraint fk_ae_resource_type_owner foreign key (owner_id) references ae_resource_type(id); alter table ae_resource_type add constraint fk_ae_resource_type_parent foreign key (parent_id) references ae_resource_type(id); create sequence ae_resource_seq cache 100; create table ae_resource ( id number not null, device_id number not null, owner_id number default null, type_id number not null, name varchar2(1000) not null, res_num varchar2(300) not null, res_id number, tmp_id number, start_date date default sysdate not null, end_date date default null ); create unique index ae_resource_pk on ae_resource(id); create index ae_res_dev_fk on ae_resource(device_id); create index ae_res_dev_type_fk on ae_resource(type_id); create index ae_res_dev_res_fk on ae_resource(res_id); create index ae_res_dev_res_tmp_fk on ae_resource(tmp_id); alter table ae_resource add constraint pk_ae_resource primary key(id); alter table ae_resource add constraint fk_ae_res_device foreign key (device_id) references ae_device(id); alter table ae_resource add constraint fk_ae_res_dev_parent foreign key (owner_id) references ae_resource(id); alter table ae_resource add constraint fk_ae_res_dev_type foreign key (type_id) references ae_resource_type(id); create table ae_policy_type ( id number not null, name varchar2(30) not null, description varchar2(100) ); comment on table ae_policy_type is 'Список поддерживаемых платформ'; create unique index ae_policy_type_pk on ae_policy_type(id); create unique index ae_policy_type_uk on ae_policy_type(name); alter table ae_policy_type add constraint pk_ae_policy_type primary key(id); create table ae_state_policy ( id number not null, type_id number not null, name varchar2(30) not null, description varchar2(100) ); comment on table ae_state_policy is 'Список поддерживаемых платформ'; create unique index ae_state_policy_pk on ae_state_policy(id); create index ae_state_policy_fk on ae_state_policy(type_id); alter table ae_state_policy add constraint pk_ae_state_policy primary key(id); alter table ae_state_policy add constraint fk_ae_state_policy foreign key (type_id) references ae_policy_type(id); create table ae_threshold_type ( id number not null, name varchar2(30) not null, description varchar2(300) ); create unique index ae_threshold_type_pk on ae_threshold_type(id); alter table ae_threshold_type add constraint pk_ae_threshold_type primary key(id); create sequence ae_threshold_seq start with 100; create table ae_threshold ( id number not null, type_id number not null, policy_id number not null, value varchar2(100) not null ); create unique index ae_threshold_pk on ae_threshold(id); create index ae_threshold_direction_fk on ae_threshold(type_id); create index ae_threshold_profile_fk on ae_threshold(policy_id); alter table ae_threshold add constraint pk_ae_threshold primary key(id); alter table ae_threshold add constraint fk_ae_threshold_type foreign key (type_id) references ae_threshold_type(id); alter table ae_threshold add constraint fk_ae_threshold_policy foreign key (policy_id) references ae_state_policy(id); create sequence ae_domain_convert_seq start with 100; create table ae_domain ( id number not null, policy_id number default null, regexp varchar2(100), is_case_sens number(1) default 0 not null, description varchar2(100) ); create unique index ae_domain_pk on ae_domain(id); create index ae_domain_fk on ae_domain(policy_id); alter table ae_domain add constraint ae_domain_ck check (is_case_sens in (0, 1)); alter table ae_domain add constraint pk_ae_domain primary key(id); alter table ae_domain add constraint fk_ae_domain foreign key (policy_id) references ae_state_policy(id); create sequence ae_parameter_seq start with 1000; create table ae_parameter ( id number not null, domain_id number not null, parent_id number, name varchar2(30) not null, description varchar2(100) ); create unique index ae_parameter_pk on ae_parameter(id); create unique index ae_parameter_uk on ae_parameter(name); create index ae_parameter_domain_fk on ae_parameter(domain_id); create index ae_parameter_parent_fk on ae_parameter(parent_id); alter table ae_parameter add constraint pk_ae_parameter primary key(id); alter table ae_parameter add constraint fk_ae_parameter_domain foreign key (domain_id) references ae_domain(id); alter table ae_parameter add constraint fk_ae_parameter foreign key (parent_id) references ae_parameter(id); create sequence ae_state_seq cache 100; create table ae_state ( id number not null, res_id number not null, param_id number not null, value varchar2(300), datetime timestamp default current_timestamp not null ); comment on table ae_state is 'Состояние параметра'; comment on column ae_state.datetime is 'Дата и время последнего изменения'; create unique index ae_state_pk on ae_state(id); create index ae_state_res_fk on ae_state(res_id); create index ae_state_param_fk on ae_state(param_id); alter table ae_state add constraint pk_ae_state primary key(id); alter table ae_state add constraint fk_ae_state_res foreign key (res_id) references ae_resource(id); alter table ae_state add constraint fk_ae_state_param foreign key (param_id) references ae_parameter(id); create sequence ae_state_log_seq cache 100; create table ae_state_log ( id number not null, res_id number not null, param_id number not null, value varchar2(300), datetime timestamp default current_timestamp not null ) pctfree 0 partition by range (datetime) ( partition ae_state_log_p1 values less than (maxvalue) ); comment on table ae_state_log is 'Хронология изменения состояния параметра'; create unique index ae_state_log_pk on ae_state_log(datetime, id) local; alter table ae_state_log add constraint pk_ae_state_log primary key(datetime, id); create sequence ae_profile_type_seq; create table ae_profile_type ( id number not null, name varchar2(30) not null, description varchar2(100) ); create unique index ae_profile_type_pk on ae_profile_type(id); create unique index ae_profile_type_uk on ae_profile_type(name); alter table ae_profile_type add constraint pk_ae_profile_type primary key(id); create sequence ae_profile_seq; create table ae_profile ( id number not null, type_id number not null, is_default number(1) default 0 not null, model_id number not null, script_id number default null, name varchar2(30) not null, description varchar2(100) ); create unique index ae_profile_pk on ae_profile(id); create index ae_profile_type_fk on ae_profile(type_id); create index ae_profile_model_fk on ae_profile(model_id); create index ae_profile_script_fk on ae_profile(script_id); alter table ae_profile add constraint ae_profile_ck check (is_default in (0, 1)); alter table ae_profile add constraint pk_ae_profile primary key(id); alter table ae_profile add constraint fk_ae_profile_type foreign key (type_id) references ae_profile_type(id); create sequence ae_profile_detail_seq; create table ae_profile_detail ( id number not null, type_id number not null, profile_id number not null, model_id number not null, param_id number not null ); create unique index ae_profile_detail_pk on ae_profile_detail(id); create index ae_profile_detail_fk on ae_profile_detail(profile_id); create index ae_profile_detail_type_fk on ae_profile_detail(type_id); create index ae_profile_detail_model_fk on ae_profile_detail(model_id); create index ae_profile_detail_param_fk on ae_profile_detail(param_id); alter table ae_profile_detail add constraint pk_ae_profile_detail primary key(id); alter table ae_profile_detail add constraint fk_ae_profile_detail foreign key (profile_id) references ae_profile(id); alter table ae_profile_detail add constraint fk_ae_profile_detail_type foreign key (type_id) references ae_resource_type(id); alter table ae_profile_detail add constraint fk_ae_profile_detail_model foreign key (model_id) references ae_platform_model(id); create global temporary table ae_state_tmp ( id number not null, device_id number not null, profile_id number not null, param_id number not null, num varchar2(300), value varchar2(300), datetime timestamp default current_timestamp not null ) on commit delete rows; create index ae_state_tmp_ix on ae_state_tmp(device_id, profile_id, param_id, num);
Теперь осталось заполнить справочники данными:
Insert into AE_POLICY_TYPE (ID, NAME, DESCRIPTION) Values (1, 'default', NULL); Insert into AE_POLICY_TYPE (ID, NAME, DESCRIPTION) Values (2, 'uptime', NULL); Insert into AE_POLICY_TYPE (ID, NAME, DESCRIPTION) Values (3, 'threshold', NULL); COMMIT; Insert into AE_STATE_POLICY (ID, NAME, DESCRIPTION, TYPE_ID) Values (1, 'default', NULL, 1); Insert into AE_STATE_POLICY (ID, NAME, DESCRIPTION, TYPE_ID) Values (2, 'uptime', NULL, 2); COMMIT; Insert into AE_DOMAIN (ID, REGEXP, IS_CASE_SENS, DESCRIPTION, POLICY_ID) Values (10, '((\d+)\D*,\s*)?(\d+):(\d+):(\d+)(\.\d+)?', 0, 'SNMP uptime', 1); Insert into AE_DOMAIN (ID, REGEXP, IS_CASE_SENS, DESCRIPTION, POLICY_ID) Values (11, '\d+', 0, 'SNMP число', 1); Insert into AE_DOMAIN (ID, REGEXP, IS_CASE_SENS, DESCRIPTION, POLICY_ID) Values (12, '([a-fA-F\d])+', 0, 'SNMP строка', 1); Insert into AE_DOMAIN (ID, REGEXP, IS_CASE_SENS, DESCRIPTION, POLICY_ID) Values (13, '.*', 0, 'SNMP Произвольная строка', 1); Insert into AE_DOMAIN (ID, REGEXP, IS_CASE_SENS, DESCRIPTION, POLICY_ID) Values (14, '\d+', 0, 'SNMP uptime (числовая форма)', 2); COMMIT; Insert into AE_PARAMETER (ID, DOMAIN_ID, PARENT_ID, NAME, DESCRIPTION) Values (101, 14, NULL, 'uptime', 'SNMP Uptime'); Insert into AE_PARAMETER (ID, DOMAIN_ID, PARENT_ID, NAME, DESCRIPTION) Values (102, 11, NULL, 'ifIndex', 'Индекс интерфейса'); Insert into AE_PARAMETER (ID, DOMAIN_ID, PARENT_ID, NAME, DESCRIPTION) Values (103, 13, NULL, 'ifName', 'Имя интерфейса'); Insert into AE_PARAMETER (ID, DOMAIN_ID, PARENT_ID, NAME, DESCRIPTION) Values (104, 11, NULL, 'ifInOctets', 'Входящий трафик'); Insert into AE_PARAMETER (ID, DOMAIN_ID, PARENT_ID, NAME, DESCRIPTION) Values (105, 11, NULL, 'ifOutOctets', 'Исходящий трафик'); COMMIT; Insert into AE_PLATFORM_MODEL (ID, NAME, DESCRIPTION) Values (1, 'test', NULL); COMMIT; Insert into AE_PROFILE_TYPE (ID, NAME, DESCRIPTION) Values (1, 'mon', 'Мониторинг'); COMMIT; Insert into AE_PROFILE (ID, TYPE_ID, IS_DEFAULT, MODEL_ID, SCRIPT_ID, NAME, DESCRIPTION) Values (1, 1, 1, 1, NULL, 'test', NULL); COMMIT; Insert into AE_RESOURCE_CLASS (ID, IS_LOGICAL, NAME, DESCRIPTION, OWNER_ID) Values (1, 0, 'Устройство', NULL, NULL); Insert into AE_RESOURCE_CLASS (ID, IS_LOGICAL, NAME, DESCRIPTION, OWNER_ID) Values (2, 0, 'Интерфейс', NULL, 1); COMMIT; Insert into AE_RESOURCE_TYPE (ID, CLASS_ID, NAME, DESCRIPTION, OWNER_ID, PARENT_ID) Values (1, 1, 'Host', NULL, NULL, NULL); Insert into AE_RESOURCE_TYPE (ID, CLASS_ID, NAME, DESCRIPTION, OWNER_ID, PARENT_ID) Values (2, 2, 'Interface', NULL, 1, NULL); COMMIT; Insert into AE_PROFILE_DETAIL (ID, TYPE_ID, PROFILE_ID, MODEL_ID, PARAM_ID) Values (4, 2, 1, 1, 104); Insert into AE_PROFILE_DETAIL (ID, TYPE_ID, PROFILE_ID, MODEL_ID, PARAM_ID) Values (5, 2, 1, 1, 105); Insert into AE_PROFILE_DETAIL (ID, TYPE_ID, PROFILE_ID, MODEL_ID, PARAM_ID) Values (6, 1, 1, 1, 1); Insert into AE_PROFILE_DETAIL (ID, TYPE_ID, PROFILE_ID, MODEL_ID, PARAM_ID) Values (1, 1, 1, 1, 101); Insert into AE_PROFILE_DETAIL (ID, TYPE_ID, PROFILE_ID, MODEL_ID, PARAM_ID) Values (2, 2, 1, 1, 102); Insert into AE_PROFILE_DETAIL (ID, TYPE_ID, PROFILE_ID, MODEL_ID, PARAM_ID) Values (3, 2, 1, 1, 103); COMMIT; Insert into AE_DEVICE (ID, MODEL_ID, START_DATE, END_DATE) Values (0, 1, TO_DATE('10/30/2013 15:37:16', 'MM/DD/YYYY HH24:MI:SS'), NULL); COMMIT; Insert into AE_RESOURCE (ID, DEVICE_ID, OWNER_ID, TYPE_ID, NAME, RES_NUM, RES_ID, START_DATE, END_DATE, TMP_ID) Values (1, 0, NULL, 1, '127.0.0.1', '0', NULL, TO_DATE('10/30/2013 15:24:44', 'MM/DD/YYYY HH24:MI:SS'), NULL, NULL); COMMIT; Insert into AE_THRESHOLD_TYPE (ID, NAME, DESCRIPTION) Values (1, 'increase', 'Увеличение'); Insert into AE_THRESHOLD_TYPE (ID, NAME, DESCRIPTION) Values (2, 'decrease', 'Уменьшение'); Insert into AE_THRESHOLD_TYPE (ID, NAME, DESCRIPTION) Values (3, 'delta', 'Приращение'); COMMIT; Insert into AE_THRESHOLD (ID, TYPE_ID, POLICY_ID, VALUE) Values (1, 3, 1, '100'); COMMIT;
И подготовить заготовку кода тестирования:
package com.acme.ae.tests.jdbc; import oracle.jdbc.driver.OracleCallableStatement; import oracle.sql.*; import java.sql.CallableStatement; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; public class Test { private final static String CLASS_NAME = "oracle.jdbc.driver.OracleDriver"; private final static String USER_CONN = "jdbc:oracle:thin:@192.168.124.5:1523:new11"; private final static String USER_NAME = "ais"; private final static String USER_PASS = "ais"; private final static boolean AUTO_COMMIT_MODE = false; private final static int BULK_SIZE = 100; private final static int ALL_SIZE = 1000; private final static String TRACE_ON_SQL = "ALTER SESSION SET EVENTS '10046 trace name context forever, level 12'"; private final static Long DEVICE_ID = 0L; private final static Long PROFILE_ID = 1L; private final static Long UPTIME_PARAM_ID = 101L; private final static Long IFNAME_PARAM_ID = 103L; private final static Long INOCT_PARAM_ID = 104L; private final static String FAKE_NUM_VALUE = "0"; private Connection c = null; private void start() throws ClassNotFoundException, SQLException { Class.forName(CLASS_NAME); c = DriverManager.getConnection(USER_CONN, USER_NAME, USER_PASS); c.setAutoCommit(AUTO_COMMIT_MODE); CallableStatement st = c.prepareCall(TRACE_ON_SQL); try { st.execute(); } finally { st.close(); } } private void stop() throws SQLException { if (c != null) { c.close(); } } public static void main(String[] args) { Test t = new Test(); try { try { t.start(); t.test_plsql(); // Здесь будем вызывать тестовый код } finally { t.stop(); } } catch (Exception e) { System.out.println(e.toString()); } } }
Для детального анализа производительности, будем использовать трассировку event 10046 на сервере, с последующей обработкой трейсов утилитой tkprof.
Самый медленный способ (plsql)
Начнем тестирование с наиболее очевидной обработки по одной записи. Помимо собственно оценки производительности, написание этого кода поможет нам лучше разобраться с тем как именно будут обрабатываться данные.
CREATE OR REPLACE package AIS.ae_monitoring as procedure addValue( p_device in number , p_profile in number , p_param in number , p_num in varchar2 , p_val in varchar2 ); end ae_monitoring; / CREATE OR REPLACE package body AIS.ae_monitoring as g_ifName_parameter constant number default 103; g_default_policy constant number default 1; g_uptime_policy constant number default 2; g_threshold_policy constant number default 3; g_increase_type constant number default 1; g_decrease_type constant number default 2; g_delta_type constant number default 3; procedure addValue( p_device in number , p_profile in number , p_param in number , p_num in varchar2 , p_val in varchar2 ) as cursor c_res(p_type number) is select r.id, r.name from ae_resource r where r.device_id = p_device and r.res_num = p_num and r.type_id = p_type and r.start_date <= sysdate and sysdate <= nvl(r.end_date, sysdate + 1); cursor c_state(p_resid number) is select s.value from ae_state s where s.res_id = p_resid and s.param_id = p_param; l_resid ae_resource.id%type default null; l_resname ae_resource.name%type default null; l_oldval ae_state.value%type default null; l_restype ae_profile_detail.type_id%type default null; l_owntype ae_resource_type.owner_id%type default null; l_owner ae_resource.id%type default null; l_policy ae_state_policy.type_id%type default null; l_polid ae_state_policy.id%type default null; l_count number default 0; begin -- Получить тип ресурса select d.type_id, r.owner_id into l_restype, l_owntype from ae_profile_detail d inner join ae_resource_type r on (r.id = d.type_id) where d.profile_id = p_profile and d.param_id = p_param; -- Получить ID владельца if not l_owntype is null then select r.id into l_owner from ae_resource r where r.device_id = p_device and r.type_id = l_owntype; end if; -- Обработать имя интерфейса if p_param = g_ifName_parameter then open c_res(l_restype); fetch c_res into l_resid, l_resname; if c_res%notfound or l_resname <> p_val then -- Закрыть старый ресурс интерфейса update ae_resource set end_date = sysdate where id = l_resid; -- Создать новый ресурс интерфейса insert into ae_resource(id, device_id, owner_id, type_id, res_num, name) values (ae_resource_seq.nextval, p_device, l_owner, l_restype, p_num, p_val); end if; close c_res; return; end if; -- Получить ID ресурса open c_res(l_restype); fetch c_res into l_resid, l_resname; if c_res%notfound then -- Если ресурс не найден, создать новый ресурс интерфейса insert into ae_resource(id, device_id, owner_id, type_id, res_num, name) values (ae_resource_seq.nextval, p_device, l_owner, l_restype, p_num, p_val) returning id into l_resid; end if; -- Получить старое значение параметра open c_state(l_resid); fetch c_state into l_oldval; if c_state%notfound then l_oldval := null; end if; close c_state; -- Получить политику сохранения значений select l.type_id, l.id into l_policy, l_polid from ae_parameter p inner join ae_domain d on (d.id = p.domain_id) inner join ae_state_policy l on (l.id = d.policy_id) where p.id = p_param; -- Получить количество пересеченных порогов select count(*) into l_count from ae_threshold t where t.policy_id = l_polid and (( t.type_id = g_increase_type and l_oldval <= t.value and p_val >= t.value ) or ( t.type_id = g_decrease_type and l_oldval >= t.value and p_val <= t.value ) or ( t.type_id = g_delta_type and abs(p_val - l_oldval) >= t.value )); -- Сохранить запись в ae_state_log в соответствии с политикой if l_oldval is null or l_count > 0 or ( l_policy = g_uptime_policy and p_val < l_oldval) or ( l_policy = g_default_policy and p_val <> l_oldval) then insert into ae_state_log(id, res_id, param_id, value) values (ae_state_log_seq.nextval, l_resid, p_param, decode(l_policy, g_uptime_policy, nvl(l_oldval, p_val), p_val)); end if; -- Обновить ae_state update ae_state set value = p_val , datetime = current_timestamp where res_id = l_resid and param_id = p_param; if sql%rowcount = 0 then insert into ae_state(id, param_id, res_id, value) values (ae_state_seq.nextval, p_param, l_resid, p_val); end if; close c_res; exception when others then if c_res%isopen then close c_res; end if; if c_state%isopen then close c_state; end if; raise; end; end ae_monitoring; /
private final static String ADD_VAL_SQL = "begin ae_monitoring.addValue(?,?,?,?,?); end;"; private void test_plsql() throws SQLException { System.out.println("test_plsql:"); CallableStatement st = c.prepareCall(ADD_VAL_SQL); Long timestamp = System.currentTimeMillis(); Long uptime = 0L; Long inoct = 0L; try { for (int i = 1; i <= ALL_SIZE; i++) { // Передать uptime st.setLong(1, DEVICE_ID); st.setLong(2, PROFILE_ID); st.setLong(3, UPTIME_PARAM_ID); st.setString(4, FAKE_NUM_VALUE); st.setString(5, uptime.toString()); st.execute(); // Передать имя интерфейса st.setLong(1, DEVICE_ID); st.setLong(2, PROFILE_ID); st.setLong(3, IFNAME_PARAM_ID); st.setString(4, Integer.toString((i % 100) + 1)); st.setString(5, Integer.toString((i % 100) + 1)); st.execute(); // Передать счетчик трафика st.setLong(1, DEVICE_ID); st.setLong(2, PROFILE_ID); st.setLong(3, INOCT_PARAM_ID); st.setString(4, Integer.toString((i % 100) + 1)); st.setString(5, inoct.toString()); st.execute(); // Увеличить счетчики uptime += 100L; if (uptime >= 1000) { uptime = 0L; } inoct += 10L; } } finally { st.close(); } Long delta_1 = System.currentTimeMillis() - timestamp; System.out.println((ALL_SIZE * 1000L) / delta_1); timestamp = System.currentTimeMillis(); c.commit(); Long delta_2 = System.currentTimeMillis() - timestamp; System.out.println(delta_2); System.out.println((ALL_SIZE * 1000L) / (delta_1 - delta_2)); }
Очевидно, что для корректной поддержки этим кодом списка интерфейсов, необходимо, чтобы имя интерфейса передавалось в БД до остальных атрибутов ресурса.
Результаты тестирования вполне предсказуемы:
OVERALL TOTALS FOR ALL NON-RECURSIVE STATEMENTS call count cpu elapsed disk query current rows ------- ------ -------- ---------- ---------- ---------- ---------- ---------- Parse 1 0.00 0.00 0 0 0 0 Execute 3000 4.23 4.13 7 102942 6615 3000 Fetch 0 0.00 0.00 0 0 0 0 ------- ------ -------- ---------- ---------- ---------- ---------- ---------- total 3001 4.23 4.13 7 102942 6615 3000 Misses in library cache during parse: 1 Misses in library cache during execute: 1 Elapsed times include waiting on following events: Event waited on Times Max. Wait Total Waited ---------------------------------------- Waited ---------- ------------ SQL*Net message to client 3002 0.00 0.00 SQL*Net message from client 3002 5.92 7.12 latch: library cache 4 0.00 0.00 log file sync 1 0.00 0.00 OVERALL TOTALS FOR ALL RECURSIVE STATEMENTS call count cpu elapsed disk query current rows ------- ------ -------- ---------- ---------- ---------- ---------- ---------- Parse 69 0.00 0.00 0 0 0 0 Execute 17261 2.42 2.36 7 9042 6615 3160 Fetch 14000 0.38 0.37 0 93900 0 13899 ------- ------ -------- ---------- ---------- ---------- ---------- ---------- total 31330 2.81 2.74 7 102942 6615 17059 Misses in library cache during parse: 10 Misses in library cache during execute: 10 Elapsed times include waiting on following events: Event waited on Times Max. Wait Total Waited ---------------------------------------- Waited ---------- ------------ db file sequential read 7 0.00 0.00
Мы выполняем очень большое количество запросов и тратим много времени на сетевое взаимодействие.
Используем массовую обработку (temporary)
Наиболее радикальный способ борьбы с накладными расходами — переход к массовой обработке. Мы можем предварительно сохранить набор данных для обработки в какой-то таблице, а затем обработать данные не по одной записи, а все сразу. Конечно, для промежуточного хранения данных, можно использовать обычные таблицы, но использование для этих целей GTT более выгодно, вследствие снижения накладных расходов на журналирование.
Для вставки данных во временную таблицу мы будем использовать DML, а не вызовы хранимых процедур, что позволит нам использовать JDBC batch, для снижения накладных расходов на сетевое взаимодействие.
При использовании этого подхода, не требуется, чтобы имя интерфейса обрабатывалось до остальных его параметров. Достаточно того, чтобы имена всех обрабатываемых интерфейсов присутствовали в обрабатываемом наборе данных.
CREATE OR REPLACE package AIS.ae_monitoring as procedure saveValues; end ae_monitoring; / CREATE OR REPLACE package body AIS.ae_monitoring as g_ifName_parameter constant number default 103; g_default_policy constant number default 1; g_uptime_policy constant number default 2; g_threshold_policy constant number default 3; g_increase_type constant number default 1; g_decrease_type constant number default 2; g_delta_type constant number default 3; procedure saveValues as begin -- Создать ресурс, если он отсутствует merge into ae_resource d using ( select t.id, t.device_id, t.num, t.value name, p.type_id, o.id owner_id from ae_state_tmp t inner join ae_profile_detail p on (p.profile_id = t.profile_id and p.param_id = t.param_id) inner join ae_resource_type r on (r.id = p.type_id) left join ae_resource o on (o.device_id = t.device_id and o.type_id = r.owner_id) where t.param_id = g_ifName_parameter ) s on ( d.device_id = s.device_id and d.res_num = s.num and d.type_id = s.type_id and d.start_date <= sysdate and sysdate <= nvl(d.end_date, sysdate + 1) ) when matched then update set d.tmp_id = s.id where d.name <> s.name when not matched then insert (id, device_id, owner_id, type_id, res_num, name) values (ae_resource_seq.nextval, s.device_id, s.owner_id, s.type_id, s.num, s.name); -- Добавить недостающие ae_resource insert into ae_resource(id, device_id, owner_id, type_id, res_num, name) select ae_resource_seq.nextval, t.device_id, o.id, p.type_id, t.num, t.value from ae_state_tmp t inner join ae_resource c on (c.tmp_id = t.id) inner join ae_profile_detail p on (p.profile_id = t.profile_id and p.param_id = t.param_id) inner join ae_resource_type r on (r.id = p.type_id) left join ae_resource o on (o.device_id = t.device_id and o.type_id = r.owner_id); -- Закрыть устаревшие интерфейсы update ae_resource set end_date = sysdate , tmp_id = null where tmp_id > 0; -- Сохранить записи в ae_state_log insert into ae_state_log(id, res_id, param_id, value) select ae_state_log_seq.nextval, id, param_id, value from ( select distinct r.id, t.param_id, decode(l.type_id, g_uptime_policy, nvl(s.value, t.value), t.value) value from ae_state_tmp t inner join ae_profile_detail p on (p.profile_id = t.profile_id and p.param_id = t.param_id) inner join ae_resource r on ( r.device_id = t.device_id and r.res_num = t.num and r.type_id = p.type_id and r.start_date <= sysdate and sysdate <= nvl(r.end_date, sysdate + 1)) left join ae_state s on (s.res_id = r.id and s.param_id = t.param_id) inner join ae_parameter a on (a.id = p.param_id) inner join ae_domain d on (d.id = a.domain_id) inner join ae_state_policy l on (l.id = d.policy_id) left join ae_threshold h on ( h.policy_id = l.id and (( h.type_id = g_increase_type and s.value <= h.value and t.value >= h.value ) or ( h.type_id = g_decrease_type and s.value >= h.value and t.value <= h.value ) or ( h.type_id = g_delta_type and abs(t.value - s.value) >= h.value ))) where ( s.id is null or not h.id is null or ( l.type_id = g_uptime_policy and t.value < s.value ) or ( l.type_id = g_default_policy and t.value <> s.value ) ) and t.param_id <> g_ifName_parameter ); -- Обновить ae_state merge into ae_state d using ( select t.param_id, t.value, r.id res_id from ae_state_tmp t inner join ae_profile_detail p on (p.profile_id = t.profile_id and p.param_id = t.param_id) inner join ae_resource r on ( r.device_id = t.device_id and r.res_num = t.num and r.type_id = p.type_id and r.start_date <= sysdate and sysdate <= nvl(r.end_date, sysdate + 1)) where t.param_id <> g_ifName_parameter ) s on (d.res_id = s.res_id and d.param_id = s.param_id) when matched then update set d.value = s.value , d.datetime = current_timestamp when not matched then insert (id, param_id, res_id, value) values (ae_state_seq.nextval, s.param_id, s.res_id, s.value); -- Сохранить изменения commit write nowait; end; end ae_monitoring; /
private final static int BULK_SIZE = 200; private final static String INS_VAL_SQL = "insert into ae_state_tmp(id, device_id, profile_id, param_id, num, value) values (?,?,?,?,?,?)"; private final static String SAVE_VALUES_SQL = "begin ae_monitoring.saveValues; end;"; private void test_temporary() throws SQLException { System.out.println("test_temporary:"); CallableStatement st = c.prepareCall(INS_VAL_SQL); Long timestamp = System.currentTimeMillis(); Long uptime = 0L; Long inoct = 0L; Long ix = 1L; int bulk = BULK_SIZE; try { for (int i = 1; i <= ALL_SIZE; i++) { // Передать uptime st.setLong(1, ix++); st.setLong(2, DEVICE_ID); st.setLong(3, PROFILE_ID); st.setLong(4, UPTIME_PARAM_ID); st.setString(5, FAKE_NUM_VALUE); st.setString(6, uptime.toString()); st.addBatch(); // Передать имя интерфейса st.setLong(1, ix++); st.setLong(2, DEVICE_ID); st.setLong(3, PROFILE_ID); st.setLong(4, IFNAME_PARAM_ID); st.setString(5, Integer.toString((i % 100) + 1)); st.setString(6, Integer.toString((i % 100) + 1)); st.addBatch(); // Передать счетчик трафика st.setLong(1, ix++); st.setLong(2, DEVICE_ID); st.setLong(3, PROFILE_ID); st.setLong(4, INOCT_PARAM_ID); st.setString(5, Integer.toString((i % 100) + 1)); st.setString(6, inoct.toString()); st.addBatch(); if (--bulk <= 0) { st.executeBatch(); bulk = BULK_SIZE; } // Увеличить счетчики uptime += 100L; if (uptime >= 1000) { uptime = 0L; } inoct += 10L; } if (bulk < BULK_SIZE) { st.executeBatch(); } } finally { st.close(); } Long delta_1 = System.currentTimeMillis() - timestamp; System.out.println((ALL_SIZE * 1000L) / delta_1); timestamp = System.currentTimeMillis(); st = c.prepareCall(SAVE_VALUES_SQL); timestamp = System.currentTimeMillis(); try { st.execute(); } finally { st.close(); } Long delta_2 = System.currentTimeMillis() - timestamp; System.out.println(delta_2); System.out.println((ALL_SIZE * 1000L) / (delta_1 - delta_2)); }
Запускаем этот код на выполнение и получаем:
java.sql.SQLException: ORA-30926: невозможно получить устойчивый набор строк в исходных таблицах ORA-06512: на "AIS.AE_MONITORING", line 205 ORA-06512: на line 1
Если подумать, причина этой ошибки становится понятна. Если набор обрабатываемых данных содержит противоречивые данные для какой-то переменной (мы успели прочитать ее несколько раз), возникает проблема. При нормальном функционировании SNMP-мониторинга, такой ситуации возникать не должно, но мы должны предусмотреть что-то, чтобы не допустить падения приложения, если она все-таки возникнет.
Первое, что приходит в голову — хранение агрегированных данных по каждой переменной. Мы будем добавлять новую запись если ее еще нет или обновлять существующую, записывай в нее новое значение:
private final static int BULK_SIZE = 200; private final static String MERGE_VAL_SQL = "merge into ae_state_tmp d " + "using ( select ? id,? device_id,? profile_id,? param_id,? num,? value " + " from dual" + " ) s " + "on ( d.device_id = s.device_id and d.profile_id = s.profile_id and " + " d.param_id = s.param_id and d.num = s.num ) " + "when matched then " + " update set d.value = s.value " + "when not matched then " + " insert (id, device_id, profile_id, param_id, num, value) " + " values (s.id, s.device_id, s.profile_id, s.param_id, s.num, s.value)"; private final static String SAVE_VALUES_SQL = "begin ae_monitoring.saveValues; end;"; private void test_temporary() throws SQLException { System.out.println("test_temporary:"); CallableStatement st = c.prepareCall(MERGE_VAL_SQL); ...
Теперь мы имеем дело с той-же проблемой, но на стадии выполнения batch-запроса:
java.sql.BatchUpdateException: ORA-00600: код внутренней ошибки, аргументы: [6704], [2], [0], [6301696], [], [], [], [], [], [], [], []
Приходится отказаться от batch:
private final static int BULK_SIZE = 1;
Результаты заметно улучшились:
OVERALL TOTALS FOR ALL NON-RECURSIVE STATEMENTS call count cpu elapsed disk query current rows ------- ------ -------- ---------- ---------- ---------- ---------- ---------- Parse 2 0.00 0.00 0 0 0 0 Execute 1001 1.02 1.01 0 9002 3503 3001 Fetch 0 0.00 0.00 0 0 0 0 ------- ------ -------- ---------- ---------- ---------- ---------- ---------- total 1003 1.02 1.01 0 9002 3503 3001 Misses in library cache during parse: 1 Misses in library cache during execute: 1 Elapsed times include waiting on following events: Event waited on Times Max. Wait Total Waited ---------------------------------------- Waited ---------- ------------ SQL*Net message to client 1002 0.00 0.00 SQL*Net message from client 1002 0.00 0.41 log file sync 1 0.00 0.00 OVERALL TOTALS FOR ALL RECURSIVE STATEMENTS call count cpu elapsed disk query current rows ------- ------ -------- ---------- ---------- ---------- ---------- ---------- Parse 23 0.01 0.01 0 1 0 0 Execute 23 0.21 0.21 43 29392 348 111 Fetch 11 0.00 0.00 0 27 0 10 ------- ------ -------- ---------- ---------- ---------- ---------- ---------- total 57 0.22 0.23 43 29420 348 121 Misses in library cache during parse: 8 Misses in library cache during execute: 6 Elapsed times include waiting on following events: Event waited on Times Max. Wait Total Waited ---------------------------------------- Waited ---------- ------------ db file sequential read 41 0.01 0.01 db file scattered read 1 0.00 0.00
Альтернативный подход (distinct)
Мы можем упростить себе жизнь на этапе вставки данных (заодно избавившись от возможности ORA-600), добавив группировку на этапе массовой обработки. В этом случае, мы можем не скромничать с BULK_SIZE, выставив его по максимуму.
CREATE OR REPLACE package AIS.ae_monitoring as procedure saveValuesDistinct; end ae_monitoring; / CREATE OR REPLACE package body AIS.ae_monitoring as g_ifName_parameter constant number default 103; g_default_policy constant number default 1; g_uptime_policy constant number default 2; g_threshold_policy constant number default 3; g_increase_type constant number default 1; g_decrease_type constant number default 2; g_delta_type constant number default 3; procedure saveValuesDistinct as begin -- Создать ресурс, если он отсутствует merge into ae_resource d using ( select t.id, t.device_id, t.num, t.value name, p.type_id, o.id owner_id from ( select device_id, profile_id, param_id, num , max(id) keep (dense_rank last order by datetime) id , max(value) keep (dense_rank last order by datetime) value , max(datetime) datetime from ae_state_tmp group by device_id, profile_id, param_id, num ) t inner join ae_profile_detail p on (p.profile_id = t.profile_id and p.param_id = t.param_id) inner join ae_resource_type r on (r.id = p.type_id) left join ae_resource o on (o.device_id = t.device_id and o.type_id = r.owner_id) where t.param_id = g_ifName_parameter ) s on ( d.device_id = s.device_id and d.res_num = s.num and d.type_id = s.type_id and d.start_date <= sysdate and sysdate <= nvl(d.end_date, sysdate + 1) ) when matched then update set d.tmp_id = s.id where d.name <> s.name when not matched then insert (id, device_id, owner_id, type_id, res_num, name) values (ae_resource_seq.nextval, s.device_id, s.owner_id, s.type_id, s.num, s.name); -- Добавить недостающие ae_resource insert into ae_resource(id, device_id, owner_id, type_id, res_num, name) select ae_resource_seq.nextval, t.device_id, o.id, p.type_id, t.num, t.value from ( select device_id, profile_id, param_id, num , max(id) keep (dense_rank last order by datetime) id , max(value) keep (dense_rank last order by datetime) value , max(datetime) datetime from ae_state_tmp group by device_id, profile_id, param_id, num ) t inner join ae_resource c on (c.tmp_id = t.id) inner join ae_profile_detail p on (p.profile_id = t.profile_id and p.param_id = t.param_id) inner join ae_resource_type r on (r.id = p.type_id) left join ae_resource o on (o.device_id = t.device_id and o.type_id = r.owner_id); -- Закрыть устаревшие интерфейсы update ae_resource set end_date = sysdate , tmp_id = null where tmp_id > 0; -- Сохранить записи в ae_state_log insert into ae_state_log(id, res_id, param_id, value) select ae_state_log_seq.nextval, id, param_id, value from ( select distinct r.id, t.param_id, decode(l.type_id, g_uptime_policy, nvl(s.value, t.value), t.value) value from ( select device_id, profile_id, param_id, num , max(id) keep (dense_rank last order by datetime) id , max(value) keep (dense_rank last order by datetime) value , max(datetime) datetime from ae_state_tmp group by device_id, profile_id, param_id, num ) t inner join ae_profile_detail p on (p.profile_id = t.profile_id and p.param_id = t.param_id) inner join ae_resource r on ( r.device_id = t.device_id and r.res_num = t.num and r.type_id = p.type_id and r.start_date <= sysdate and sysdate <= nvl(r.end_date, sysdate + 1)) left join ae_state s on (s.res_id = r.id and s.param_id = t.param_id) inner join ae_parameter a on (a.id = p.param_id) inner join ae_domain d on (d.id = a.domain_id) inner join ae_state_policy l on (l.id = d.policy_id) left join ae_threshold h on ( h.policy_id = l.id and (( h.type_id = g_increase_type and s.value <= h.value and t.value >= h.value ) or ( h.type_id = g_decrease_type and s.value >= h.value and t.value <= h.value ) or ( h.type_id = g_delta_type and abs(t.value - s.value) >= h.value ))) where ( s.id is null or not h.id is null or ( l.type_id = g_uptime_policy and t.value < s.value ) or ( l.type_id = g_default_policy and t.value <> s.value ) ) and t.param_id <> g_ifName_parameter ); -- Обновить ae_state merge into ae_state d using ( select t.param_id, t.value, r.id res_id from ( select device_id, profile_id, param_id, num , max(id) keep (dense_rank last order by datetime) id , max(value) keep (dense_rank last order by datetime) value , max(datetime) datetime from ae_state_tmp group by device_id, profile_id, param_id, num ) t inner join ae_profile_detail p on (p.profile_id = t.profile_id and p.param_id = t.param_id) inner join ae_resource r on ( r.device_id = t.device_id and r.res_num = t.num and r.type_id = p.type_id and r.start_date <= sysdate and sysdate <= nvl(r.end_date, sysdate + 1)) where t.param_id <> g_ifName_parameter ) s on (d.res_id = s.res_id and d.param_id = s.param_id) when matched then update set d.value = s.value , d.datetime = current_timestamp when not matched then insert (id, param_id, res_id, value) values (ae_state_seq.nextval, s.param_id, s.res_id, s.value); -- Сохранить изменения commit write nowait; end; end ae_monitoring; /
private final static int BULK_SIZE = 200; private final static String INS_VAL_SQL = "insert into ae_state_tmp(id, device_id, profile_id, param_id, num, value) values (?,?,?,?,?,?)"; private final static String SAVE_VALUES_DISTINCT_SQL = "begin ae_monitoring.saveValuesDistinct; end;"; private void test_temporary_distinct() throws SQLException { System.out.println("test_temporary:"); CallableStatement st = c.prepareCall(INS_VAL_SQL); Long timestamp = System.currentTimeMillis(); Long uptime = 0L; Long inoct = 0L; Long ix = 1L; int bulk = BULK_SIZE; try { for (int i = 1; i <= ALL_SIZE; i++) { // Передать uptime st.setLong(1, ix++); st.setLong(2, DEVICE_ID); st.setLong(3, PROFILE_ID); st.setLong(4, UPTIME_PARAM_ID); st.setString(5, FAKE_NUM_VALUE); st.setString(6, uptime.toString()); st.addBatch(); // Передать имя интерфейса st.setLong(1, ix++); st.setLong(2, DEVICE_ID); st.setLong(3, PROFILE_ID); st.setLong(4, IFNAME_PARAM_ID); st.setString(5, Integer.toString((i % 100) + 1)); st.setString(6, Integer.toString((i % 100) + 1)); st.addBatch(); // Передать счетчик трафика st.setLong(1, ix++); st.setLong(2, DEVICE_ID); st.setLong(3, PROFILE_ID); st.setLong(4, INOCT_PARAM_ID); st.setString(5, Integer.toString((i % 100) + 1)); st.setString(6, inoct.toString()); st.addBatch(); if (--bulk <= 0) { st.executeBatch(); bulk = BULK_SIZE; } // Увеличить счетчики uptime += 100L; if (uptime >= 1000) { uptime = 0L; } inoct += 10L; } if (bulk < BULK_SIZE) { st.executeBatch(); } } finally { st.close(); } Long delta_1 = System.currentTimeMillis() - timestamp; System.out.println((ALL_SIZE * 1000L) / delta_1); timestamp = System.currentTimeMillis(); st = c.prepareCall(SAVE_VALUES_DISTINCT_SQL); timestamp = System.currentTimeMillis(); try { st.execute(); } finally { st.close(); } Long delta_2 = System.currentTimeMillis() - timestamp; System.out.println(delta_2); System.out.println((ALL_SIZE * 1000L) / (delta_1 - delta_2)); }
Запускаем и смотрим результаты:
OVERALL TOTALS FOR ALL NON-RECURSIVE STATEMENTS call count cpu elapsed disk query current rows ------- ------ -------- ---------- ---------- ---------- ---------- ---------- Parse 2 0.00 0.00 0 0 0 0 Execute 1001 0.36 0.33 0 96 6616 3001 Fetch 0 0.00 0.00 0 0 0 0 ------- ------ -------- ---------- ---------- ---------- ---------- ---------- total 1003 0.36 0.33 0 96 6616 3001 Misses in library cache during parse: 2 Misses in library cache during execute: 1 Elapsed times include waiting on following events: Event waited on Times Max. Wait Total Waited ---------------------------------------- Waited ---------- ------------ SQL*Net message to client 1002 0.00 0.00 SQL*Net message from client 1002 0.00 0.41 log file sync 1 0.00 0.00 OVERALL TOTALS FOR ALL RECURSIVE STATEMENTS call count cpu elapsed disk query current rows ------- ------ -------- ---------- ---------- ---------- ---------- ---------- Parse 30 0.01 0.01 0 3 0 0 Execute 30 0.41 0.40 3 48932 1104 218 Fetch 8 0.00 0.00 0 176 0 8 ------- ------ -------- ---------- ---------- ---------- ---------- ---------- total 68 0.44 0.43 3 49111 1104 226 Misses in library cache during parse: 8 Misses in library cache during execute: 7 Elapsed times include waiting on following events: Event waited on Times Max. Wait Total Waited ---------------------------------------- Waited ---------- ------------ db file sequential read 3 0.00 0.00
Как и ожидалось, издержки на вставку данных уменьшились, но усложнилась последующая их обработка.
Используем коллекции (collection)
Как справедливо заметил DenKrep, даже при самых минимальных издержках на вставку данных, подход с использованием GTT сильно напоминает процесс переливания из пустого в порожнее. Каким еще образом мы можем снизить издержки на сетевое взаимодействие? Мы не можем использовать JDBC batch для вызова PL/SQL-кода (вернее можем, но это не принесет никакой выгоды), но мы можем передавать массивы!
Сам процесс передачи массивов из Java-кода хорошо описан в этом руководстве. Для передачи данных, нам потребуется определить следующие типы:
create or replace type ae_state_rec as object ( device_id number, profile_id number, param_id number, num varchar2(300), value varchar2(300) ) / create or replace type ae_state_tab as table of ae_state_rec; /
Поскольку у меня установлен Oracle Client 11g, мне пришлось заменить [ORACLE_HOME]/jdbc/lib/classes12.zip на [ORACLE_HOME]/jdbc/lib/ojdbc5.jar. Также, хочу подчеркнуть, что [ORACLE_HOME]/jdbc/lib/nls_charset12.zip должна быть доступна при запуске приложения, в противном случае, будет невозможно передавать строковые значения (ошибок возникать не будет, но передаваться будет NULL).
Как мы будем обрабатывать массив на сервере? Для начала, просто попробуем вызывать написанную ранее addValue в цикле:
... procedure addValues( p_tab in ae_state_tab ) as begin for i in 1 .. p_tab.count loop addValue( p_device => p_tab(i).device_id , p_profile => p_tab(i).profile_id , p_param => p_tab(i).param_id , p_num => p_tab(i).num , p_val => p_tab(i).value ); end loop; commit write nowait; end; ...
Java-код заметно усложнился:
private final static String ADD_VALUES_SQL = "begin ae_monitoring.addValues(?); end;"; private void test_collection() throws SQLException { System.out.println("test_collection:"); OracleCallableStatement st = (OracleCallableStatement)c.prepareCall(ADD_VALUES_SQL); int oracleId = CharacterSet.CL8MSWIN1251_CHARSET; CharacterSet charSet = CharacterSet.make(oracleId); Long timestamp = System.currentTimeMillis(); Long uptime = 0L; Long inoct = 0L; RecType r[] = new RecType[ALL_SIZE * 3]; int ix = 0; for (int i = 1; i <= ALL_SIZE; i++) { // Передать uptime r[ix++] = new RecType( new NUMBER(DEVICE_ID), new NUMBER(PROFILE_ID), new NUMBER(UPTIME_PARAM_ID), new CHAR(FAKE_NUM_VALUE, charSet), new CHAR(uptime.toString(), charSet)); // Передать имя интерфейса r[ix++] = new RecType( new NUMBER(DEVICE_ID), new NUMBER(PROFILE_ID), new NUMBER(IFNAME_PARAM_ID), new CHAR(Integer.toString((i % 100) + 1), charSet), new CHAR(Integer.toString((i % 100) + 1), charSet)); // Передать счетчик трафика r[ix++] = new RecType( new NUMBER(DEVICE_ID), new NUMBER(PROFILE_ID), new NUMBER(INOCT_PARAM_ID), new CHAR(Integer.toString((i % 100) + 1), charSet), new CHAR(inoct.toString(), charSet)); // Увеличить счетчики uptime += 100L; if (uptime >= 1000) { uptime = 0L; } inoct += 10L; } RecTab t = new RecTab(r); try { st.setORAData(1, t); st.execute(); } finally { st.close(); } System.out.println((ALL_SIZE * 1000L) / (System.currentTimeMillis() - timestamp)); }
В трейсе появляются разные забавные операторы, связанные с передачей с клиента массивов:
SELECT INSTANTIABLE, supertype_owner, supertype_name, LOCAL_ATTRIBUTES FROM all_types WHERE type_name = :1 AND owner = :2
Результат закономерен:
OVERALL TOTALS FOR ALL NON-RECURSIVE STATEMENTS call count cpu elapsed disk query current rows ------- ------ -------- ---------- ---------- ---------- ---------- ---------- Parse 4 0.00 0.00 0 0 0 0 Execute 4 4.35 4.31 5 136053 6610 3 Fetch 1 0.00 0.00 0 9 0 1 ------- ------ -------- ---------- ---------- ---------- ---------- ---------- total 9 4.35 4.31 5 136062 6610 4 Misses in library cache during parse: 2 Misses in library cache during execute: 2 Elapsed times include waiting on following events: Event waited on Times Max. Wait Total Waited ---------------------------------------- Waited ---------- ------------ SQL*Net message to client 6 0.00 0.00 SQL*Net message from client 6 0.23 0.34 SQL*Net more data from client 41 0.00 0.00 OVERALL TOTALS FOR ALL RECURSIVE STATEMENTS call count cpu elapsed disk query current rows ------- ------ -------- ---------- ---------- ---------- ---------- ---------- Parse 77 0.00 0.00 0 0 0 0 Execute 17270 2.97 2.92 5 6046 6610 3160 Fetch 14013 0.49 0.49 1 129930 0 13909 ------- ------ -------- ---------- ---------- ---------- ---------- ---------- total 31360 3.48 3.43 6 135976 6610 17069 Misses in library cache during parse: 8 Misses in library cache during execute: 11 Elapsed times include waiting on following events: Event waited on Times Max. Wait Total Waited ---------------------------------------- Waited ---------- ------------ db file sequential read 6 0.00 0.00
Улучшились показатели, связанные с передачей данных с клиента, но несколько усложнилась обработка данных, по сравнению с вариантом plsql.
Используем коллекции по умному (bulk)
Очевидно, что мы используем потенциал массивов, передаваемых с клиента не в полной мере. Было бы неплохо передавать массив непосредственно в SQL-запросы, чтобы задействовать массовую обработку, но как это сделать? Мы не можем использовать BULK COLLECT, поскольку наши запросы слишком сложные для этого. К счастью, мы можем обернуть коллекцию в TABLE:
CREATE OR REPLACE package AIS.ae_monitoring as procedure saveValues( p_tab in ae_state_tab ); end ae_monitoring; / CREATE OR REPLACE package body AIS.ae_monitoring as g_ifName_parameter constant number default 103; g_default_policy constant number default 1; g_uptime_policy constant number default 2; g_threshold_policy constant number default 3; g_increase_type constant number default 1; g_decrease_type constant number default 2; g_delta_type constant number default 3; procedure saveValues( p_tab in ae_state_tab ) as begin -- Создать ресурс, если он отсутствует merge into ae_resource d using ( select t.device_id, t.num, t.value name, p.type_id, o.id owner_id from ( select device_id, profile_id, param_id, num , max(value) value from table( p_tab ) group by device_id, profile_id, param_id, num ) t inner join ae_profile_detail p on (p.profile_id = t.profile_id and p.param_id = t.param_id) inner join ae_resource_type r on (r.id = p.type_id) left join ae_resource o on (o.device_id = t.device_id and o.type_id = r.owner_id) where t.param_id = g_ifName_parameter ) s on ( d.device_id = s.device_id and d.res_num = s.num and d.type_id = s.type_id and d.start_date <= sysdate and sysdate <= nvl(d.end_date, sysdate + 1) ) when not matched then insert (id, device_id, owner_id, type_id, res_num, name) values (ae_resource_seq.nextval, s.device_id, s.owner_id, s.type_id, s.num, s.name); -- Сохранить записи в ae_state_log insert into ae_state_log(id, res_id, param_id, value) select ae_state_log_seq.nextval, id, param_id, value from ( select distinct r.id, t.param_id, decode(l.type_id, g_uptime_policy, nvl(s.value, t.value), t.value) value from ( select device_id, profile_id, param_id, num , max(value) value from table( p_tab ) group by device_id, profile_id, param_id, num ) t inner join ae_profile_detail p on (p.profile_id = t.profile_id and p.param_id = t.param_id) inner join ae_resource r on ( r.device_id = t.device_id and r.res_num = t.num and r.type_id = p.type_id and r.start_date <= sysdate and sysdate <= nvl(r.end_date, sysdate + 1)) left join ae_state s on (s.res_id = r.id and s.param_id = t.param_id) inner join ae_parameter a on (a.id = p.param_id) inner join ae_domain d on (d.id = a.domain_id) inner join ae_state_policy l on (l.id = d.policy_id) left join ae_threshold h on ( h.policy_id = l.id and (( h.type_id = g_increase_type and s.value <= h.value and t.value >= h.value ) or ( h.type_id = g_decrease_type and s.value >= h.value and t.value <= h.value ) or ( h.type_id = g_delta_type and abs(t.value - s.value) >= h.value ))) where ( s.id is null or not h.id is null or ( l.type_id = g_uptime_policy and t.value < s.value ) or ( l.type_id = g_default_policy and t.value <> s.value ) ) and t.param_id <> g_ifName_parameter ); -- Обновить ae_state merge into ae_state d using ( select t.param_id, t.value, r.id res_id from ( select device_id, profile_id, param_id, num , max(value) value from table( p_tab ) group by device_id, profile_id, param_id, num ) t inner join ae_profile_detail p on (p.profile_id = t.profile_id and p.param_id = t.param_id) inner join ae_resource r on ( r.device_id = t.device_id and r.res_num = t.num and r.type_id = p.type_id and r.start_date <= sysdate and sysdate <= nvl(r.end_date, sysdate + 1)) where t.param_id <> g_ifName_parameter ) s on (d.res_id = s.res_id and d.param_id = s.param_id) when matched then update set d.value = s.value , d.datetime = current_timestamp when not matched then insert (id, param_id, res_id, value) values (ae_state_seq.nextval, s.param_id, s.res_id, s.value); -- Сохранить изменения commit write nowait; end; end ae_monitoring; /
private final static String BULK_VALUES_SQL = "begin ae_monitoring.saveValues(?); end;"; private void test_bulk() throws SQLException { System.out.println("test_bulk:"); OracleCallableStatement st = (OracleCallableStatement)c.prepareCall(BULK_VALUES_SQL); int oracleId = CharacterSet.CL8MSWIN1251_CHARSET; CharacterSet charSet = CharacterSet.make(oracleId); Long timestamp = System.currentTimeMillis(); Long uptime = 0L; Long inoct = 0L; RecType r[] = new RecType[ALL_SIZE * 3]; int ix = 0; for (int i = 1; i <= ALL_SIZE; i++) { // Передать uptime r[ix++] = new RecType( new NUMBER(DEVICE_ID), new NUMBER(PROFILE_ID), new NUMBER(UPTIME_PARAM_ID), new CHAR(FAKE_NUM_VALUE, charSet), new CHAR(uptime.toString(), charSet)); // Передать имя интерфейса r[ix++] = new RecType( new NUMBER(DEVICE_ID), new NUMBER(PROFILE_ID), new NUMBER(IFNAME_PARAM_ID), new CHAR(Integer.toString((i % 100) + 1), charSet), new CHAR(Integer.toString((i % 100) + 1), charSet)); // Передать счетчик трафика r[ix++] = new RecType( new NUMBER(DEVICE_ID), new NUMBER(PROFILE_ID), new NUMBER(INOCT_PARAM_ID), new CHAR(Integer.toString((i % 100) + 1), charSet), new CHAR(inoct.toString(), charSet)); // Увеличить счетчики uptime += 100L; if (uptime >= 1000) { uptime = 0L; } inoct += 10L; } RecTab t = new RecTab(r); try { st.setORAData(1, t); st.execute(); } finally { st.close(); } System.out.println((ALL_SIZE * 1000L) / (System.currentTimeMillis() - timestamp)); }
Результат говорит сам за себя:
OVERALL TOTALS FOR ALL NON-RECURSIVE STATEMENTS call count cpu elapsed disk query current rows ------- ------ -------- ---------- ---------- ---------- ---------- ---------- Parse 4 0.00 0.00 0 0 0 0 Execute 4 0.20 0.20 4 696 1095 3 Fetch 1 0.00 0.00 0 9 0 1 ------- ------ -------- ---------- ---------- ---------- ---------- ---------- total 9 0.20 0.20 4 705 1095 4 Misses in library cache during parse: 0 Elapsed times include waiting on following events: Event waited on Times Max. Wait Total Waited ---------------------------------------- Waited ---------- ------------ SQL*Net message to client 6 0.00 0.00 SQL*Net message from client 6 0.10 0.19 SQL*Net more data from client 41 0.00 0.00 OVERALL TOTALS FOR ALL RECURSIVE STATEMENTS call count cpu elapsed disk query current rows ------- ------ -------- ---------- ---------- ---------- ---------- ---------- Parse 30 0.00 0.00 0 0 0 0 Execute 38 0.18 0.17 4 591 1095 217 Fetch 46 0.00 0.00 0 96 0 30 ------- ------ -------- ---------- ---------- ---------- ---------- ---------- total 114 0.18 0.18 4 687 1095 247 Misses in library cache during parse: 7 Misses in library cache during execute: 7 Elapsed times include waiting on following events: Event waited on Times Max. Wait Total Waited ---------------------------------------- Waited ---------- ------------ db file sequential read 4 0.00 0.00
Выводы
Как показывают результаты тестирования, непосредственная передача массивов в Oracle из клиентского кода (bulk) позволяет добиться наилучшей производительности. Варианты с использованием GTT (temporary, distinct) не сильно ему уступают в плане производительности, но значительно проще, с точки зрения Java-кода. Вариант temporary, помимо того, дает возможность пронаблюдать ORA-600, при использовании batch и неудачном расположении звезд.
Какой именно подход использовать для обработки данных — решать вам. Результаты тестирования выложены на GitHub.