Загрузка данных SNMP-мониторинга в Oracle

14 ноября 2013 г.

Сегодня я расскажу о реальной задаче обработки данных SNMP-мониторинга, уделяя максимальное внимание техническим подробностям. Я постараюсь обосновать выбор подходов для ее решения и сравнить их производительность. Также, я уделю внимание тем техническим моментам, которые могут вызвать сложности у новичков.

Для чего все это?

Хочу сразу сказать, что меня совершенно не интересуют такие вопросы как:

  1. С какой максимальной скоростью можно загрузить данные в Oracle?
  2. Что быстрее Oracle или PostgreSQL?
  3. Насколько быстро можно осуществлять вставку в таблицу БД?

Патентов, однако в Google

По большей части, эти вопросы, и подобные им, не имеют смысла (во всяком случае в отрыве от подробностей аппаратной конфигурации). Я совершенно сознательно не говорю ни слова о том, на каком «железе» работает мой сервер Oracle. На мой взгляд, это не важно. Но что же тогда важно?

Важно то, что есть реальная задача, например сбора данных SNMP-мониторинга, в процессе выполнения которой, постоянно генерируется большой объем данных, которые нужно обработать. При этом, существенны следующие моменты:

  1. Недостаточно просто вставить данные в таблицу (как именно должны быть обработаны данные и почему, я расскажу в следующем разделе)
  2. Данные генерируются не на сервере БД (скорее всего, будет несколько серверов сбора данных, передающих данные в единую БД)
  3. Данные поступают постоянно и также постоянно должны обрабатываться, желательно минимизировать время обработки данных (чтобы обеспечить минимальное время реакции на возникновение какой либо аварийной ситуации)
  4. Допускается потеря части данных (если произошла авария, мы обнаружим это на следующем цикле опроса, даже при потере части данных текущего)
  5. История изменения основных параметров должна сохраняться долговременно

Я рассматриваю различные варианты решения этой задачи и сравниваю их производительность. Разумеется, целью является поиск наиболее производительного решения.

Постановка задачи

Для начала, вспомним, какие именно данные мы получаем, используя SNMP? Фактически, мы можем получить значения некоторых предопределенных переменных, запросив интересующий нас OID, используя GET-запрос. Например, запросив OID = 1.3.6.1.2.1.1.3.0, мы можем получить значение такой важной для мониторинга величины как sysUpTime. Значения переменных, доступ к которым предоставляется по SNMP, не обязательно числовые. Это могут быть и строки.

Но SNMP не ограничивается доступом к набору скалярных переменных. Значения переменных могут быть сгруппированных в таблицы. В каждой строке таблицы группируются значения переменных, связанных с каким-то отдельным ресурсом. Для доступа к каждому значению, необходимо дополнить OID присвоенный столбцу таблицы некоторым идентификатором, определяющим строку описания ресурса. Этот идентификатор строки мы будем называть индексом ресурса.

В случае опроса списка интерфейсов (1.3.6.1.2.1.2), в роли идентификатора ресурса выступает целое число, но, для других таблиц, это может быть IP-адреc или что-то другое, определяемое спецификацией. Сложность заключается в том, что нам не известен индекс опрашиваемого ресурса заранее и мы не можем получить значение интересующей нас переменной используя GET-запрос.

Для чтения значений в таблицах, необходимо использовать GETNEXT-запросы, возвращающие OID и значение переменной, следующей в лексикографическом порядке за OID-ом, указанном в запросе. Так передавая OID-столбца, представляющий собой префикс OID-а интересующей нас переменной, мы получаем соответствующее значение из первой строки таблицы. Чтобы получить значение следующей строки, мы передаем OID, полученный в рамках ответа на первый запрос и так далее, пока таблица не будет просмотрена полностью.

В целях оптимизации производительности (путем уменьшения количества отсылаемых запросов), мы можем передавать OID-ы нескольких столбцов одним запросом. Кроме того, во 2 версии SNMP была добавлена возможность формирования BULK-запросов. Один BULK-запрос заменяет несколько выполненных друг за другом GETNEXT-запросов, что позволяет прочитать всю таблицу целиком за один запрос (при достаточной величине BULK). Обо всем этом я уже рассказывал ранее.

Повторяю я все это для того, чтобы было понятно — таблицы не постоянны! Описание (например где-то в БД) таблицы интерфейсов вручную, с назначением им OID-ов совершенно бессмысленно. Строки таблиц могут добавляться и удаляться при переконфигурации оборудования. Более того, индекс какого-то существующего интерфейса может измениться! Фактически, одной из задач системы SNMP-мониторинга является автоматизация отслеживания всех изменений просматриваемых таблиц.

Как это будет выглядеть в БД? Довольно просто:

Таблица 1

Данные, полученные в процессе мониторинга, мы будем привязывать к ресурсам (ae_resource). Ресурсы, в свою очередь, будут связаны в двухуровневую иерархию. На верхнем уровне будет представлен ресурс устройства. С ним, по owner_id, будут связаны дочерние ресурсы, например интерфейсы (тот факт, что это именно интерфейсы, а не что-то другое, будет определяться значением type_id из справочника ae_resource_type). Значения device_id, у родительского и всех дочерних ресурсов, будут совпадать и указывать на описание оборудования.

Можно заметить, что у таблицы ae_resource имеются поля start_date и end_date. Поддерживать их в актуальном состоянии — наша задача. Мы должны создавать новые ресурсы, по мере необходимости, проставляя им дату начала действия в start_date и завершать действие устаревших ресурсов, устанавливая end_date. Для идентификации ресурсов, будет использоваться поле name (в случае интерфейсов, это значение атрибута ifDescr — 1.3.6.1.2.1.2.2.1.2). Индекс ресурса будем сохранять в поле res_num (в случае его изменения, ресурс со старым значением индекса должен быть закрыт, после чего, должен быть создан новый ресурс).

Необходимость поддержки списка интерфейсов в актуальном состоянии — главная причина, по которой данные придется обрабатывать (хотя обычная вставка полученных данных в таблицу заняла-бы гораздо меньше времени). Но уж если мы все равно обрабатываем данные, почему-бы не получить от этого максимальную пользу? В процессе мониторинга, мы получаем очень много данных, часть из которых не изменяется или изменяется незначительно. Мы можем уменьшить объем данных, сохраняемых в БД (что благотворно отразится как на ее объеме, так и на производительности), если будет сохранять только значимые изменения. Но как определить, какие изменения являются значимыми? В этом нам помогут политики:

Таблица 2

Значение каждого полученного нами параметра будет связано с некоторым доменом (ae_domain). Регулярное выражение (regexp) поможет валидировать корректность значения. Перед сохранением в БД, значение может быть преобразовано к какому-то другому домену (например строки мы получаем в шестнадцатеричном представлении, которое было бы неплохо преобразовывать в более привычную форму). Правила преобразования будут определяться таблицей ae_domain_convert.

Какие изменения будут считаться значимыми? Это зависит от домена. По умолчанию, значимым будет считаться любое изменение значения (то есть, если значение не изменилось, запись в БД выполняться не будет). Для некоторых параметров имеет смысл задать особые правила. Например sysUpTime (после соответствующего преобразования) — монотонно возрастающая числовая величина. Уменьшение этого значения означает, что хост перезагрузился. Задание особой политики для этого домена позволит нам записывать в БД только события уменьшения значения (означающее перезагрузки), при этом, в БД будет записываться не полученное, а предыдущее значение (то есть максимальный достигнутый uptime).

В ae_threshold будем задавать пороги, пересечение которых (в заданном направлении) будет рассматриваться как значимое изменение. Дополнительно введем особый тип порога (delta), определяющий абсолютное значение разности между предыдущим и полученным значением. Задание такого порога может быть удобно, например, для счетчиков трафика, таких как ifInOctets (1.3.6.1.2.1.2.2.1.10).

Целиком, схема данных будет выглядеть следующим образом:

Таблица 3

create sequence ae_platform_model_seq start with 100;

create table ae_platform_model (
  id             number                              not null,
  name           varchar2(30)                        not null,
  description    varchar2(300)
);

comment on table ae_platform_model is 'Модель оборудования';

create unique index ae_platform_model_pk on ae_platform_model(id);

alter table ae_platform_model add
  constraint pk_ae_platform_model primary key(id);

create sequence ae_device_seq cache 100;

create table ae_device (
  id             number                              not null,
  model_id       number                              not null,
  start_date     date        default sysdate         not null,
  end_date       date        default null
);

comment on table ae_device is 'Оборудование';

create unique index ae_device_pk on ae_device(id);

create index ae_device_fk on ae_device(device_id);

create index ae_device_model_fk on ae_device(model_id);

create index ae_device_zone_fk on ae_device(zone_id);

alter table ae_device add
  constraint pk_ae_device primary key(id);

alter table ae_device add
  constraint fk_ae_device_model foreign key (model_id)
    references ae_platform_model(id);

create sequence ae_resource_class_seq start with 100;

create table ae_resource_class (
  id             number                              not null,
  owner_id       number,
  is_logical     number(1)                           not null,
  name           varchar2(30)                        not null,
  description    varchar2(300)
);

comment on table ae_resource_class is 'Класс ресурса';

comment on column ae_resource_class.is_logical is 'Признак логического ресурса';

create unique index ae_resource_class_pk on ae_resource_class(id);

create index ae_resource_class_fk on ae_resource_class(owner_id);

alter table ae_resource_class
  add constraint ae_resource_class_ck check (is_logical in (0, 1));

alter table ae_resource_class add
  constraint pk_ae_resource_class primary key(id);

create sequence ae_resource_type_seq start with 100;

create table ae_resource_type (
  id             number                              not null,
  owner_id       number,
  parent_id      number,
  class_id       number                              not null,
  name           varchar2(30)                        not null,
  description    varchar2(300)
);

comment on table ae_resource_type is 'Тип ресурса';

create unique index ae_resource_type_pk on ae_resource_type(id);

create index ae_resource_type_owner_fk on ae_resource_type(owner_id);

create index ae_resource_type_parent_fk on ae_resource_type(parent_id);

alter table ae_resource_type add
  constraint pk_ae_resource_type primary key(id);

alter table ae_resource_type add
  constraint fk_ae_resource_type foreign key (class_id)
    references ae_resource_class(id);

alter table ae_resource_type add
  constraint fk_ae_resource_type_owner foreign key (owner_id)
    references ae_resource_type(id);

alter table ae_resource_type add
  constraint fk_ae_resource_type_parent foreign key (parent_id)
    references ae_resource_type(id);

create sequence ae_resource_seq cache 100;

create table ae_resource (
  id             number                              not null,
  device_id      number                              not null,
  owner_id       number           default null,
  type_id        number                              not null,
  name           varchar2(1000)                      not null,
  res_num        varchar2(300)                       not null,
  res_id         number,
  tmp_id         number,
  start_date     date        default sysdate         not null,
  end_date       date        default null
);

create unique index ae_resource_pk on ae_resource(id);

create index ae_res_dev_fk on ae_resource(device_id);

create index ae_res_dev_type_fk on ae_resource(type_id);

create index ae_res_dev_res_fk on ae_resource(res_id);

create index ae_res_dev_res_tmp_fk on ae_resource(tmp_id);

alter table ae_resource add
  constraint pk_ae_resource primary key(id);

alter table ae_resource add
  constraint fk_ae_res_device foreign key (device_id)
    references ae_device(id);

alter table ae_resource add
  constraint fk_ae_res_dev_parent foreign key (owner_id)
    references ae_resource(id);

alter table ae_resource add
  constraint fk_ae_res_dev_type foreign key (type_id)
    references ae_resource_type(id);

create table ae_policy_type (
  id                 number                           not null,
  name               varchar2(30)                     not null,
  description        varchar2(100)
);

comment on table ae_policy_type is 'Список поддерживаемых платформ';

create unique index ae_policy_type_pk on ae_policy_type(id);

create unique index ae_policy_type_uk on ae_policy_type(name);

alter table ae_policy_type add
  constraint pk_ae_policy_type primary key(id);

create table ae_state_policy (
  id                 number                           not null,
  type_id            number                           not null,
  name               varchar2(30)                     not null,
  description        varchar2(100)
);

comment on table ae_state_policy is 'Список поддерживаемых платформ';

create unique index ae_state_policy_pk on ae_state_policy(id);

create index ae_state_policy_fk on ae_state_policy(type_id);

alter table ae_state_policy add
  constraint pk_ae_state_policy primary key(id);

alter table ae_state_policy add
  constraint fk_ae_state_policy foreign key (type_id)
    references ae_policy_type(id);

create table ae_threshold_type (
  id             number                              not null,
  name           varchar2(30)                        not null,
  description    varchar2(300)
);

create unique index ae_threshold_type_pk on ae_threshold_type(id);

alter table ae_threshold_type add
  constraint pk_ae_threshold_type primary key(id);

create sequence ae_threshold_seq start with 100;

create table ae_threshold (
  id             number                              not null,
  type_id        number                              not null,
  policy_id      number                              not null,
  value          varchar2(100)                       not null
);

create unique index ae_threshold_pk on ae_threshold(id);

create index ae_threshold_direction_fk on ae_threshold(type_id);

create index ae_threshold_profile_fk on ae_threshold(policy_id);

alter table ae_threshold add
  constraint pk_ae_threshold primary key(id);

alter table ae_threshold add
  constraint fk_ae_threshold_type foreign key (type_id)
    references ae_threshold_type(id);

alter table ae_threshold add
  constraint fk_ae_threshold_policy foreign key (policy_id)
    references ae_state_policy(id);

create sequence ae_domain_convert_seq start with 100;

create table ae_domain (
  id             number                              not null,
  policy_id      number            default null,
  regexp         varchar2(100),
  is_case_sens   number(1)         default 0         not null,
  description    varchar2(100)
);

create unique index ae_domain_pk on ae_domain(id);

create index ae_domain_fk on ae_domain(policy_id);

alter table ae_domain
  add constraint ae_domain_ck check (is_case_sens in (0, 1));

alter table ae_domain add
  constraint pk_ae_domain primary key(id);

alter table ae_domain add
  constraint fk_ae_domain foreign key (policy_id)
    references ae_state_policy(id);

create sequence ae_parameter_seq start with 1000;

create table ae_parameter (
  id             number                              not null,
  domain_id      number                              not null,
  parent_id      number,
  name           varchar2(30)                        not null,
  description    varchar2(100)
);

create unique index ae_parameter_pk on ae_parameter(id);

create unique index ae_parameter_uk on ae_parameter(name);

create index ae_parameter_domain_fk on ae_parameter(domain_id);

create index ae_parameter_parent_fk on ae_parameter(parent_id);

alter table ae_parameter add
  constraint pk_ae_parameter primary key(id);

alter table ae_parameter add
  constraint fk_ae_parameter_domain foreign key (domain_id)
    references ae_domain(id);

alter table ae_parameter add
  constraint fk_ae_parameter foreign key (parent_id)
    references ae_parameter(id);

create sequence ae_state_seq cache 100;

create table ae_state (
  id             number                              not null,
  res_id         number                              not null,
  param_id       number                              not null,
  value          varchar2(300),
  datetime       timestamp default current_timestamp not null
);

comment on table ae_state is 'Состояние параметра';

comment on column ae_state.datetime is 'Дата и время последнего изменения';

create unique index ae_state_pk on ae_state(id);

create index ae_state_res_fk on ae_state(res_id);

create index ae_state_param_fk on ae_state(param_id);

alter table ae_state add
  constraint pk_ae_state primary key(id);

alter table ae_state add
  constraint fk_ae_state_res foreign key (res_id)
    references ae_resource(id);

alter table ae_state add
  constraint fk_ae_state_param foreign key (param_id)
    references ae_parameter(id);

create sequence ae_state_log_seq cache 100;

create table ae_state_log (
  id             number                              not null,
  res_id         number                              not null,
  param_id       number                              not null,
  value          varchar2(300),
  datetime       timestamp default current_timestamp not null
) pctfree 0
  partition by range (datetime)
( partition ae_state_log_p1 values less than (maxvalue)
);

comment on table ae_state_log is 'Хронология изменения состояния параметра';

create unique index ae_state_log_pk on ae_state_log(datetime, id) local;

alter table ae_state_log add
  constraint pk_ae_state_log primary key(datetime, id);

create sequence ae_profile_type_seq;

create table ae_profile_type (
  id             number                              not null,
  name           varchar2(30)                        not null,
  description    varchar2(100)
);

create unique index ae_profile_type_pk on ae_profile_type(id);

create unique index ae_profile_type_uk on ae_profile_type(name);

alter table ae_profile_type add
  constraint pk_ae_profile_type primary key(id);

create sequence ae_profile_seq;

create table ae_profile (
  id             number                              not null,
  type_id        number                              not null,
  is_default     number(1)         default 0         not null,
  model_id       number                              not null,
  script_id      number            default null,
  name           varchar2(30)                        not null,
  description    varchar2(100)
);

create unique index ae_profile_pk on ae_profile(id);

create index ae_profile_type_fk on ae_profile(type_id);

create index ae_profile_model_fk on ae_profile(model_id);

create index ae_profile_script_fk on ae_profile(script_id);

alter table ae_profile
  add constraint ae_profile_ck check (is_default in (0, 1));

alter table ae_profile add
  constraint pk_ae_profile primary key(id);

alter table ae_profile add
  constraint fk_ae_profile_type foreign key (type_id)
    references ae_profile_type(id);

create sequence ae_profile_detail_seq;

create table ae_profile_detail (
  id             number                              not null,
  type_id        number                              not null,
  profile_id     number                              not null,
  model_id       number                              not null,
  param_id       number                              not null
);

create unique index ae_profile_detail_pk on ae_profile_detail(id);

create index ae_profile_detail_fk on ae_profile_detail(profile_id);

create index ae_profile_detail_type_fk on ae_profile_detail(type_id);

create index ae_profile_detail_model_fk on ae_profile_detail(model_id);

create index ae_profile_detail_param_fk on ae_profile_detail(param_id);

alter table ae_profile_detail add
  constraint pk_ae_profile_detail primary key(id);

alter table ae_profile_detail add
  constraint fk_ae_profile_detail foreign key (profile_id)
    references ae_profile(id);

alter table ae_profile_detail add
  constraint fk_ae_profile_detail_type foreign key (type_id)
    references ae_resource_type(id);

alter table ae_profile_detail add
  constraint fk_ae_profile_detail_model foreign key (model_id)
    references ae_platform_model(id);

create global temporary table ae_state_tmp (
  id             number                              not null,
  device_id      number                              not null,
  profile_id     number                              not null,
  param_id       number                              not null,
  num            varchar2(300),
  value          varchar2(300),
  datetime       timestamp default current_timestamp not null
) on commit delete rows;

create index ae_state_tmp_ix on ae_state_tmp(device_id, profile_id, param_id, num);

Теперь осталось заполнить справочники данными:

Insert into AE_POLICY_TYPE
   (ID, NAME, DESCRIPTION)
 Values
   (1, 'default', NULL);
Insert into AE_POLICY_TYPE
   (ID, NAME, DESCRIPTION)
 Values
   (2, 'uptime', NULL);
Insert into AE_POLICY_TYPE
   (ID, NAME, DESCRIPTION)
 Values
   (3, 'threshold', NULL);
COMMIT;

Insert into AE_STATE_POLICY
   (ID, NAME, DESCRIPTION, TYPE_ID)
 Values
   (1, 'default', NULL, 1);
Insert into AE_STATE_POLICY
   (ID, NAME, DESCRIPTION, TYPE_ID)
 Values
   (2, 'uptime', NULL, 2);
COMMIT;

Insert into AE_DOMAIN
   (ID, REGEXP, IS_CASE_SENS, DESCRIPTION, POLICY_ID)
 Values
   (10, '((\d+)\D*,\s*)?(\d+):(\d+):(\d+)(\.\d+)?', 0, 'SNMP uptime', 1);
Insert into AE_DOMAIN
   (ID, REGEXP, IS_CASE_SENS, DESCRIPTION, POLICY_ID)
 Values
   (11, '\d+', 0, 'SNMP число', 1);
Insert into AE_DOMAIN
   (ID, REGEXP, IS_CASE_SENS, DESCRIPTION, POLICY_ID)
 Values
   (12, '([a-fA-F\d])+', 0, 'SNMP строка', 1);
Insert into AE_DOMAIN
   (ID, REGEXP, IS_CASE_SENS, DESCRIPTION, POLICY_ID)
 Values
   (13, '.*', 0, 'SNMP Произвольная строка', 1);
Insert into AE_DOMAIN
   (ID, REGEXP, IS_CASE_SENS, DESCRIPTION, POLICY_ID)
 Values
   (14, '\d+', 0, 'SNMP uptime (числовая форма)', 2);
COMMIT;

Insert into AE_PARAMETER
   (ID, DOMAIN_ID, PARENT_ID, NAME, DESCRIPTION)
 Values
   (101, 14, NULL, 'uptime', 'SNMP Uptime');
Insert into AE_PARAMETER
   (ID, DOMAIN_ID, PARENT_ID, NAME, DESCRIPTION)
 Values
   (102, 11, NULL, 'ifIndex', 'Индекс интерфейса');
Insert into AE_PARAMETER
   (ID, DOMAIN_ID, PARENT_ID, NAME, DESCRIPTION)
 Values
   (103, 13, NULL, 'ifName', 'Имя интерфейса');
Insert into AE_PARAMETER
   (ID, DOMAIN_ID, PARENT_ID, NAME, DESCRIPTION)
 Values
   (104, 11, NULL, 'ifInOctets', 'Входящий трафик');
Insert into AE_PARAMETER
   (ID, DOMAIN_ID, PARENT_ID, NAME, DESCRIPTION)
 Values
   (105, 11, NULL, 'ifOutOctets', 'Исходящий трафик');
COMMIT;

Insert into AE_PLATFORM_MODEL
   (ID, NAME, DESCRIPTION)
 Values
   (1, 'test', NULL);
COMMIT;

Insert into AE_PROFILE_TYPE
   (ID, NAME, DESCRIPTION)
 Values
   (1, 'mon', 'Мониторинг');
COMMIT;

Insert into AE_PROFILE
   (ID, TYPE_ID, IS_DEFAULT, MODEL_ID, SCRIPT_ID,
    NAME, DESCRIPTION)
 Values
   (1, 1, 1, 1, NULL,
    'test', NULL);
COMMIT;

Insert into AE_RESOURCE_CLASS
   (ID, IS_LOGICAL, NAME, DESCRIPTION, OWNER_ID)
 Values
   (1, 0, 'Устройство', NULL, NULL);
Insert into AE_RESOURCE_CLASS
   (ID, IS_LOGICAL, NAME, DESCRIPTION, OWNER_ID)
 Values
   (2, 0, 'Интерфейс', NULL, 1);
COMMIT;

Insert into AE_RESOURCE_TYPE
   (ID, CLASS_ID, NAME, DESCRIPTION, OWNER_ID,
    PARENT_ID)
 Values
   (1, 1, 'Host', NULL, NULL,
    NULL);
Insert into AE_RESOURCE_TYPE
   (ID, CLASS_ID, NAME, DESCRIPTION, OWNER_ID,
    PARENT_ID)
 Values
   (2, 2, 'Interface', NULL, 1,
    NULL);
COMMIT;

Insert into AE_PROFILE_DETAIL
   (ID, TYPE_ID, PROFILE_ID, MODEL_ID, PARAM_ID)
 Values
   (4, 2, 1, 1, 104);
Insert into AE_PROFILE_DETAIL
   (ID, TYPE_ID, PROFILE_ID, MODEL_ID, PARAM_ID)
 Values
   (5, 2, 1, 1, 105);
Insert into AE_PROFILE_DETAIL
   (ID, TYPE_ID, PROFILE_ID, MODEL_ID, PARAM_ID)
 Values
   (6, 1, 1, 1, 1);
Insert into AE_PROFILE_DETAIL
   (ID, TYPE_ID, PROFILE_ID, MODEL_ID, PARAM_ID)
 Values
   (1, 1, 1, 1, 101);
Insert into AE_PROFILE_DETAIL
   (ID, TYPE_ID, PROFILE_ID, MODEL_ID, PARAM_ID)
 Values
   (2, 2, 1, 1, 102);
Insert into AE_PROFILE_DETAIL
   (ID, TYPE_ID, PROFILE_ID, MODEL_ID, PARAM_ID)
 Values
   (3, 2, 1, 1, 103);
COMMIT;

Insert into AE_DEVICE
   (ID, MODEL_ID, START_DATE, END_DATE)
 Values
   (0, 1, TO_DATE('10/30/2013 15:37:16', 'MM/DD/YYYY HH24:MI:SS'), NULL);
COMMIT;

Insert into AE_RESOURCE
   (ID, DEVICE_ID, OWNER_ID, TYPE_ID, NAME,
    RES_NUM, RES_ID, START_DATE, END_DATE, TMP_ID)
 Values
   (1, 0, NULL, 1, '127.0.0.1',
    '0', NULL, TO_DATE('10/30/2013 15:24:44', 'MM/DD/YYYY HH24:MI:SS'), NULL, NULL);
COMMIT;

Insert into AE_THRESHOLD_TYPE
   (ID, NAME, DESCRIPTION)
 Values
   (1, 'increase', 'Увеличение');
Insert into AE_THRESHOLD_TYPE
   (ID, NAME, DESCRIPTION)
 Values
   (2, 'decrease', 'Уменьшение');
Insert into AE_THRESHOLD_TYPE
   (ID, NAME, DESCRIPTION)
 Values
   (3, 'delta', 'Приращение');
COMMIT;

Insert into AE_THRESHOLD
   (ID, TYPE_ID, POLICY_ID, VALUE)
 Values
   (1, 3, 1, '100');
COMMIT;

И подготовить заготовку кода тестирования:

package com.acme.ae.tests.jdbc;

import oracle.jdbc.driver.OracleCallableStatement;
import oracle.sql.*;

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class Test {

	private final static String CLASS_NAME = "oracle.jdbc.driver.OracleDriver";
	private final static String USER_CONN  = "jdbc:oracle:thin:@192.168.124.5:1523:new11";
	private final static String USER_NAME  = "ais";
	private final static String USER_PASS  = "ais";

	private final static boolean AUTO_COMMIT_MODE = false;
	private final static int     BULK_SIZE        = 100;
	private final static int     ALL_SIZE         = 1000;

	private final static String  TRACE_ON_SQL     =
			"ALTER SESSION SET EVENTS '10046 trace name context forever, level 12'";

	private final static Long   DEVICE_ID         = 0L;
	private final static Long   PROFILE_ID        = 1L;
	private final static Long   UPTIME_PARAM_ID   = 101L;
	private final static Long   IFNAME_PARAM_ID   = 103L;
	private final static Long   INOCT_PARAM_ID    = 104L;
	private final static String FAKE_NUM_VALUE    = "0";

	private Connection c = null;

	private void start() throws ClassNotFoundException, SQLException {
		Class.forName(CLASS_NAME);
		c = DriverManager.getConnection(USER_CONN, USER_NAME, USER_PASS);
		c.setAutoCommit(AUTO_COMMIT_MODE);
		CallableStatement st = c.prepareCall(TRACE_ON_SQL);
		try {
			st.execute();
		} finally {
			st.close();
		}
	}

	private void stop() throws SQLException  {
		if (c != null) {
			c.close();
		}
	}

	public static void main(String[] args) {
		Test t = new Test();
		try {
			try {
				t.start();
				t.test_plsql();

				// Здесь будем вызывать тестовый код

			} finally {
				t.stop();
			}
		} catch (Exception e) {
			System.out.println(e.toString());
		}
	}
}

Для детального анализа производительности, будем использовать трассировку event 10046 на сервере, с последующей обработкой трейсов утилитой tkprof.

Самый медленный способ (plsql)

Начнем тестирование с наиболее очевидной обработки по одной записи. Помимо собственно оценки производительности, написание этого кода поможет нам лучше разобраться с тем как именно будут обрабатываться данные.

CREATE OR REPLACE package AIS.ae_monitoring as
    procedure    addValue( p_device    in  number
                         , p_profile   in  number
                         , p_param     in  number
                         , p_num       in  varchar2
                         , p_val       in  varchar2 );
end ae_monitoring;
/

CREATE OR REPLACE package body AIS.ae_monitoring as

    g_ifName_parameter    constant number default 103;
    g_default_policy      constant number default 1;
    g_uptime_policy       constant number default 2;
    g_threshold_policy    constant number default 3;
    g_increase_type       constant number default 1;
    g_decrease_type       constant number default 2;
    g_delta_type          constant number default 3;

    procedure addValue( p_device    in  number
                      , p_profile   in  number
                      , p_param     in  number
                      , p_num       in  varchar2
                      , p_val       in  varchar2 ) as
    cursor    c_res(p_type number) is
    select    r.id, r.name
    from      ae_resource r
    where     r.device_id = p_device
    and       r.res_num = p_num
    and       r.type_id = p_type
    and       r.start_date <= sysdate and sysdate <= nvl(r.end_date, sysdate + 1);
    cursor    c_state(p_resid number) is
    select    s.value
    from      ae_state s
    where     s.res_id = p_resid
    and       s.param_id = p_param;
    l_resid   ae_resource.id%type            default null;
    l_resname ae_resource.name%type          default null;
    l_oldval  ae_state.value%type            default null;
    l_restype ae_profile_detail.type_id%type default null;
    l_owntype ae_resource_type.owner_id%type default null;
    l_owner   ae_resource.id%type            default null;
    l_policy  ae_state_policy.type_id%type   default null;
    l_polid   ae_state_policy.id%type        default null;
    l_count   number                         default 0;
    begin

      -- Получить тип ресурса
      select d.type_id, r.owner_id
      into   l_restype, l_owntype
      from   ae_profile_detail d
      inner  join ae_resource_type r on (r.id = d.type_id)
      where  d.profile_id = p_profile
      and    d.param_id = p_param;

      -- Получить ID владельца
      if not l_owntype is null then
         select r.id into l_owner
         from   ae_resource r
         where  r.device_id = p_device
         and    r.type_id = l_owntype;
      end if;

      -- Обработать имя интерфейса
      if p_param = g_ifName_parameter then
         open c_res(l_restype);
         fetch c_res into l_resid, l_resname;
         if c_res%notfound or l_resname <> p_val then

            -- Закрыть старый ресурс интерфейса
            update ae_resource set end_date = sysdate
            where id = l_resid;

            -- Создать новый ресурс интерфейса
            insert into ae_resource(id, device_id, owner_id, type_id, res_num, name)
            values (ae_resource_seq.nextval, p_device, l_owner, l_restype, p_num, p_val);

         end if;
         close c_res;
         return;
      end if;

      -- Получить ID ресурса
      open c_res(l_restype);
      fetch c_res into l_resid, l_resname;
      if c_res%notfound then
         -- Если ресурс не найден, создать новый ресурс интерфейса
         insert into ae_resource(id, device_id, owner_id, type_id, res_num, name)
         values (ae_resource_seq.nextval, p_device, l_owner, l_restype, p_num, p_val)
         returning id into l_resid;
      end if;

      -- Получить старое значение параметра
      open c_state(l_resid);
      fetch c_state into l_oldval;
      if c_state%notfound then
         l_oldval := null;
      end if;
      close c_state;

      -- Получить политику сохранения значений
      select l.type_id, l.id
      into   l_policy, l_polid
      from   ae_parameter p
      inner  join ae_domain d on (d.id = p.domain_id)
      inner  join ae_state_policy l on (l.id = d.policy_id)
      where  p.id = p_param;

      -- Получить количество пересеченных порогов
      select count(*)
      into   l_count
      from   ae_threshold t
      where  t.policy_id = l_polid
      and (( t.type_id = g_increase_type and l_oldval <= t.value and p_val >= t.value ) or
           ( t.type_id = g_decrease_type and l_oldval >= t.value and p_val <= t.value ) or
           ( t.type_id = g_delta_type and abs(p_val - l_oldval) >= t.value ));

      -- Сохранить запись в ae_state_log в соответствии с политикой
      if l_oldval is null or l_count > 0 or
         ( l_policy = g_uptime_policy and p_val < l_oldval) or
         ( l_policy = g_default_policy and p_val <> l_oldval) then
         insert into ae_state_log(id, res_id, param_id, value)
         values (ae_state_log_seq.nextval, l_resid, p_param, decode(l_policy, g_uptime_policy, nvl(l_oldval, p_val), p_val));
      end if;

      -- Обновить ae_state
      update ae_state set value = p_val
                      ,   datetime = current_timestamp
      where  res_id = l_resid and param_id = p_param;
      if sql%rowcount = 0 then
         insert into ae_state(id, param_id, res_id, value)
         values (ae_state_seq.nextval, p_param, l_resid, p_val);
      end if;

      close c_res;
    exception
      when others then
        if c_res%isopen then close c_res; end if;
        if c_state%isopen then close c_state; end if;
        raise;
    end;

end ae_monitoring;
/
	private final static String  ADD_VAL_SQL      =
			"begin ae_monitoring.addValue(?,?,?,?,?); end;";

	private void test_plsql() throws SQLException {
		System.out.println("test_plsql:");
		CallableStatement st = c.prepareCall(ADD_VAL_SQL);
		Long timestamp = System.currentTimeMillis();
		Long uptime = 0L;
		Long inoct  = 0L;
		try {
			for (int i = 1; i <= ALL_SIZE; i++) {

				// Передать uptime
				st.setLong(1,   DEVICE_ID);
				st.setLong(2,   PROFILE_ID);
				st.setLong(3,   UPTIME_PARAM_ID);
				st.setString(4, FAKE_NUM_VALUE);
				st.setString(5, uptime.toString());
				st.execute();

				// Передать имя интерфейса
				st.setLong(1,   DEVICE_ID);
				st.setLong(2,   PROFILE_ID);
				st.setLong(3,   IFNAME_PARAM_ID);
				st.setString(4, Integer.toString((i % 100) + 1));
				st.setString(5, Integer.toString((i % 100) + 1));
				st.execute();

				// Передать счетчик трафика
				st.setLong(1,   DEVICE_ID);
				st.setLong(2,   PROFILE_ID);
				st.setLong(3,   INOCT_PARAM_ID);
				st.setString(4, Integer.toString((i % 100) + 1));
				st.setString(5, inoct.toString());
				st.execute();

				// Увеличить счетчики
				uptime += 100L;
				if (uptime >= 1000) {
					uptime = 0L;
				}
				inoct += 10L;
			}
		} finally {
			st.close();
		}
		Long delta_1 = System.currentTimeMillis() - timestamp;
		System.out.println((ALL_SIZE * 1000L) / delta_1);
		timestamp = System.currentTimeMillis();
		c.commit();
		Long delta_2 = System.currentTimeMillis() - timestamp;
		System.out.println(delta_2);
		System.out.println((ALL_SIZE * 1000L) / (delta_1 - delta_2));
	}

Очевидно, что для корректной поддержки этим кодом списка интерфейсов, необходимо, чтобы имя интерфейса передавалось в БД до остальных атрибутов ресурса.

Результаты тестирования вполне предсказуемы:

OVERALL TOTALS FOR ALL NON-RECURSIVE STATEMENTS

call count cpu elapsed disk query current rows
------- ------ -------- ---------- ---------- ---------- ---------- ----------
Parse 1 0.00 0.00 0 0 0 0
Execute 3000 4.23 4.13 7 102942 6615 3000
Fetch 0 0.00 0.00 0 0 0 0
------- ------ -------- ---------- ---------- ---------- ---------- ----------
total 3001 4.23 4.13 7 102942 6615 3000

Misses in library cache during parse: 1
Misses in library cache during execute: 1

Elapsed times include waiting on following events:
  Event waited on Times Max. Wait Total Waited
  ---------------------------------------- Waited ---------- ------------
  SQL*Net message to client 3002 0.00 0.00
  SQL*Net message from client 3002 5.92 7.12
  latch: library cache 4 0.00 0.00
  log file sync 1 0.00 0.00

OVERALL TOTALS FOR ALL RECURSIVE STATEMENTS

call count cpu elapsed disk query current rows
------- ------ -------- ---------- ---------- ---------- ---------- ----------
Parse 69 0.00 0.00 0 0 0 0
Execute 17261 2.42 2.36 7 9042 6615 3160
Fetch 14000 0.38 0.37 0 93900 0 13899
------- ------ -------- ---------- ---------- ---------- ---------- ----------
total 31330 2.81 2.74 7 102942 6615 17059

Misses in library cache during parse: 10
Misses in library cache during execute: 10

Elapsed times include waiting on following events:
  Event waited on Times Max. Wait Total Waited
  ---------------------------------------- Waited ---------- ------------
  db file sequential read 7 0.00 0.00

Мы выполняем очень большое количество запросов и тратим много времени на сетевое взаимодействие.

Используем массовую обработку (temporary)

Наиболее радикальный способ борьбы с накладными расходами — переход к массовой обработке. Мы можем предварительно сохранить набор данных для обработки в какой-то таблице, а затем обработать данные не по одной записи, а все сразу. Конечно, для промежуточного хранения данных, можно использовать обычные таблицы, но использование для этих целей GTT более выгодно, вследствие снижения накладных расходов на журналирование.

Для вставки данных во временную таблицу мы будем использовать DML, а не вызовы хранимых процедур, что позволит нам использовать JDBC batch, для снижения накладных расходов на сетевое взаимодействие.

При использовании этого подхода, не требуется, чтобы имя интерфейса обрабатывалось до остальных его параметров. Достаточно того, чтобы имена всех обрабатываемых интерфейсов присутствовали в обрабатываемом наборе данных.

CREATE OR REPLACE package AIS.ae_monitoring as
    procedure  saveValues;
end ae_monitoring;
/

CREATE OR REPLACE package body AIS.ae_monitoring as

    g_ifName_parameter    constant number default 103;
    g_default_policy      constant number default 1;
    g_uptime_policy       constant number default 2;
    g_threshold_policy    constant number default 3;
    g_increase_type       constant number default 1;
    g_decrease_type       constant number default 2;
    g_delta_type          constant number default 3;

    procedure saveValues as
    begin

      -- Создать ресурс, если он отсутствует
      merge into ae_resource d
      using ( select t.id, t.device_id, t.num, t.value name, p.type_id, o.id owner_id
              from   ae_state_tmp t
              inner  join ae_profile_detail p on (p.profile_id = t.profile_id and p.param_id = t.param_id)
              inner  join ae_resource_type r on (r.id = p.type_id)
              left   join ae_resource o on (o.device_id = t.device_id and o.type_id = r.owner_id)
              where  t.param_id = g_ifName_parameter
            ) s
      on ( d.device_id = s.device_id and d.res_num = s.num and d.type_id = s.type_id and
           d.start_date <= sysdate and sysdate <= nvl(d.end_date, sysdate + 1) )
      when matched then
        update set d.tmp_id = s.id
        where  d.name <> s.name
      when not matched then
        insert (id, device_id, owner_id, type_id, res_num, name)
        values (ae_resource_seq.nextval, s.device_id, s.owner_id, s.type_id, s.num, s.name);

      -- Добавить недостающие ae_resource
      insert into ae_resource(id, device_id, owner_id, type_id, res_num, name)
      select ae_resource_seq.nextval, t.device_id, o.id, p.type_id, t.num, t.value
      from   ae_state_tmp t
      inner  join ae_resource c on (c.tmp_id = t.id)
      inner  join ae_profile_detail p on (p.profile_id = t.profile_id and p.param_id = t.param_id)
      inner  join ae_resource_type r on (r.id = p.type_id)
      left   join ae_resource o on (o.device_id = t.device_id and o.type_id = r.owner_id);

      -- Закрыть устаревшие интерфейсы
      update ae_resource set end_date = sysdate
                         ,   tmp_id   = null
      where  tmp_id > 0;

      -- Сохранить записи в ae_state_log
      insert into ae_state_log(id, res_id, param_id, value)
      select ae_state_log_seq.nextval, id, param_id, value
      from ( select distinct r.id, t.param_id,
                    decode(l.type_id, g_uptime_policy, nvl(s.value, t.value), t.value) value
             from   ae_state_tmp t
             inner  join ae_profile_detail p on (p.profile_id = t.profile_id and p.param_id = t.param_id)
             inner  join ae_resource r on ( r.device_id = t.device_id and r.res_num = t.num and r.type_id = p.type_id and
                                            r.start_date <= sysdate and sysdate <= nvl(r.end_date, sysdate + 1))
             left   join ae_state s on (s.res_id = r.id and s.param_id = t.param_id)
             inner  join ae_parameter a on (a.id = p.param_id)
             inner  join ae_domain d on (d.id = a.domain_id)
             inner  join ae_state_policy l on (l.id = d.policy_id)
             left   join ae_threshold h on (
                    h.policy_id = l.id and
                 (( h.type_id = g_increase_type and s.value <= h.value and t.value >= h.value ) or
                  ( h.type_id = g_decrease_type and s.value >= h.value and t.value <= h.value ) or
                  ( h.type_id = g_delta_type and abs(t.value - s.value) >= h.value )))
             where  ( s.id is null or not h.id is null
             or   ( l.type_id = g_uptime_policy and t.value < s.value )
             or   ( l.type_id = g_default_policy and t.value <> s.value ) )
             and    t.param_id <> g_ifName_parameter );

      -- Обновить ae_state
      merge into ae_state d
      using ( select t.param_id, t.value, r.id res_id
              from   ae_state_tmp t
              inner  join ae_profile_detail p on (p.profile_id = t.profile_id and p.param_id = t.param_id)
              inner  join ae_resource r on ( r.device_id = t.device_id and r.res_num = t.num and r.type_id = p.type_id and
                                             r.start_date <= sysdate and sysdate <= nvl(r.end_date, sysdate + 1))
              where  t.param_id <> g_ifName_parameter
            ) s
      on (d.res_id = s.res_id and d.param_id = s.param_id)
      when matched then
        update set d.value = s.value
               ,   d.datetime = current_timestamp
      when not matched then
        insert (id, param_id, res_id, value)
        values (ae_state_seq.nextval, s.param_id, s.res_id, s.value);

      -- Сохранить изменения
      commit write nowait;
    end;

end ae_monitoring;
/
	private final static int     BULK_SIZE        = 200;

	private final static String  INS_VAL_SQL      =
			"insert into ae_state_tmp(id, device_id, profile_id, param_id, num, value) values (?,?,?,?,?,?)";

	private final static String  SAVE_VALUES_SQL  =
			"begin ae_monitoring.saveValues; end;";

	private void test_temporary() throws SQLException {
		System.out.println("test_temporary:");
		CallableStatement st = c.prepareCall(INS_VAL_SQL);
		Long timestamp = System.currentTimeMillis();
		Long uptime = 0L;
		Long inoct  = 0L;
		Long ix     = 1L;
		int  bulk   = BULK_SIZE;
		try {
			for (int i = 1; i <= ALL_SIZE; i++) {

				// Передать uptime
				st.setLong(1,   ix++);
				st.setLong(2,   DEVICE_ID);
				st.setLong(3,   PROFILE_ID);
				st.setLong(4,   UPTIME_PARAM_ID);
				st.setString(5, FAKE_NUM_VALUE);
				st.setString(6, uptime.toString());
				st.addBatch();

				// Передать имя интерфейса
				st.setLong(1,   ix++);
				st.setLong(2,   DEVICE_ID);
				st.setLong(3,   PROFILE_ID);
				st.setLong(4,   IFNAME_PARAM_ID);
				st.setString(5, Integer.toString((i % 100) + 1));
				st.setString(6, Integer.toString((i % 100) + 1));
				st.addBatch();

				// Передать счетчик трафика
				st.setLong(1,   ix++);
				st.setLong(2,   DEVICE_ID);
				st.setLong(3,   PROFILE_ID);
				st.setLong(4,   INOCT_PARAM_ID);
				st.setString(5, Integer.toString((i % 100) + 1));
				st.setString(6, inoct.toString());
				st.addBatch();

				if (--bulk <= 0) {
					st.executeBatch();
					bulk = BULK_SIZE;
				}

				// Увеличить счетчики
				uptime += 100L;
				if (uptime >= 1000) {
					uptime = 0L;
				}
				inoct += 10L;
			}
			if (bulk < BULK_SIZE) {
				st.executeBatch();
			}
		} finally {
			st.close();
		}
		Long delta_1 = System.currentTimeMillis() - timestamp;
		System.out.println((ALL_SIZE * 1000L) / delta_1);
		timestamp = System.currentTimeMillis();
		st = c.prepareCall(SAVE_VALUES_SQL);
		timestamp = System.currentTimeMillis();
		try {
			st.execute();
		} finally {
			st.close();
		}
		Long delta_2 = System.currentTimeMillis() - timestamp;
		System.out.println(delta_2);
		System.out.println((ALL_SIZE * 1000L) / (delta_1 - delta_2));
	}

Запускаем этот код на выполнение и получаем:

java.sql.SQLException: ORA-30926: невозможно получить устойчивый набор строк в исходных таблицах
ORA-06512: на  "AIS.AE_MONITORING", line 205
ORA-06512: на  line 1

Если подумать, причина этой ошибки становится понятна. Если набор обрабатываемых данных содержит противоречивые данные для какой-то переменной (мы успели прочитать ее несколько раз), возникает проблема. При нормальном функционировании SNMP-мониторинга, такой ситуации возникать не должно, но мы должны предусмотреть что-то, чтобы не допустить падения приложения, если она все-таки возникнет.

Первое, что приходит в голову — хранение агрегированных данных по каждой переменной. Мы будем добавлять новую запись если ее еще нет или обновлять существующую, записывай в нее новое значение:

	private final static int     BULK_SIZE        = 200;

	private final static String  MERGE_VAL_SQL    =
			"merge into ae_state_tmp d " +
			"using ( select ? id,? device_id,? profile_id,? param_id,? num,? value " +
			"        from dual" +
			"      ) s " +
			"on ( d.device_id = s.device_id and d.profile_id = s.profile_id and " +
			"     d.param_id = s.param_id and d.num = s.num ) " +
			"when matched then " +
			"  update set d.value = s.value " +
			"when not matched then " +
			"  insert (id, device_id, profile_id, param_id, num, value) " +
			"  values (s.id, s.device_id, s.profile_id, s.param_id, s.num, s.value)";

	private final static String  SAVE_VALUES_SQL  =
			"begin ae_monitoring.saveValues; end;";

	private void test_temporary() throws SQLException {
		System.out.println("test_temporary:");
		CallableStatement st = c.prepareCall(MERGE_VAL_SQL);
		...

Теперь мы имеем дело с той-же проблемой, но на стадии выполнения batch-запроса:

java.sql.BatchUpdateException: ORA-00600: код внутренней ошибки, аргументы: [6704], [2], [0], [6301696], [], [], [], [], [], [], [], []

Приходится отказаться от batch:

private final static int     BULK_SIZE        = 1;

Результаты заметно улучшились:

OVERALL TOTALS FOR ALL NON-RECURSIVE STATEMENTS

call count cpu elapsed disk query current rows
------- ------ -------- ---------- ---------- ---------- ---------- ----------
Parse 2 0.00 0.00 0 0 0 0
Execute 1001 1.02 1.01 0 9002 3503 3001
Fetch 0 0.00 0.00 0 0 0 0
------- ------ -------- ---------- ---------- ---------- ---------- ----------
total 1003 1.02 1.01 0 9002 3503 3001

Misses in library cache during parse: 1
Misses in library cache during execute: 1

Elapsed times include waiting on following events:
  Event waited on Times Max. Wait Total Waited
  ---------------------------------------- Waited ---------- ------------
  SQL*Net message to client 1002 0.00 0.00
  SQL*Net message from client 1002 0.00 0.41
  log file sync 1 0.00 0.00

OVERALL TOTALS FOR ALL RECURSIVE STATEMENTS

call count cpu elapsed disk query current rows
------- ------ -------- ---------- ---------- ---------- ---------- ----------
Parse 23 0.01 0.01 0 1 0 0
Execute 23 0.21 0.21 43 29392 348 111
Fetch 11 0.00 0.00 0 27 0 10
------- ------ -------- ---------- ---------- ---------- ---------- ----------
total 57 0.22 0.23 43 29420 348 121

Misses in library cache during parse: 8
Misses in library cache during execute: 6

Elapsed times include waiting on following events:
  Event waited on Times Max. Wait Total Waited
  ---------------------------------------- Waited ---------- ------------
  db file sequential read 41 0.01 0.01
  db file scattered read 1 0.00 0.00

Альтернативный подход (distinct)

Мы можем упростить себе жизнь на этапе вставки данных (заодно избавившись от возможности ORA-600), добавив группировку на этапе массовой обработки. В этом случае, мы можем не скромничать с BULK_SIZE, выставив его по максимуму.

CREATE OR REPLACE package AIS.ae_monitoring as
    procedure  saveValuesDistinct;
end ae_monitoring;
/

CREATE OR REPLACE package body AIS.ae_monitoring as

    g_ifName_parameter    constant number default 103;
    g_default_policy      constant number default 1;
    g_uptime_policy       constant number default 2;
    g_threshold_policy    constant number default 3;
    g_increase_type       constant number default 1;
    g_decrease_type       constant number default 2;
    g_delta_type          constant number default 3;

    procedure saveValuesDistinct as
    begin

      -- Создать ресурс, если он отсутствует
      merge into ae_resource d
      using ( select t.id, t.device_id, t.num, t.value name, p.type_id, o.id owner_id
              from   ( select device_id, profile_id, param_id, num
                       ,      max(id) keep (dense_rank last order by datetime) id
                       ,      max(value) keep (dense_rank last order by datetime) value
                       ,      max(datetime) datetime
                       from   ae_state_tmp
                       group  by device_id, profile_id, param_id, num
                     ) t
              inner  join ae_profile_detail p on (p.profile_id = t.profile_id and p.param_id = t.param_id)
              inner  join ae_resource_type r on (r.id = p.type_id)
              left   join ae_resource o on (o.device_id = t.device_id and o.type_id = r.owner_id)
              where  t.param_id = g_ifName_parameter
            ) s
      on ( d.device_id = s.device_id and d.res_num = s.num and d.type_id = s.type_id and
           d.start_date <= sysdate and sysdate <= nvl(d.end_date, sysdate + 1) )
      when matched then
        update set d.tmp_id = s.id
        where  d.name <> s.name
      when not matched then
        insert (id, device_id, owner_id, type_id, res_num, name)
        values (ae_resource_seq.nextval, s.device_id, s.owner_id, s.type_id, s.num, s.name);

      -- Добавить недостающие ae_resource
      insert into ae_resource(id, device_id, owner_id, type_id, res_num, name)
      select ae_resource_seq.nextval, t.device_id, o.id, p.type_id, t.num, t.value
      from   ( select device_id, profile_id, param_id, num
               ,      max(id) keep (dense_rank last order by datetime) id
               ,      max(value) keep (dense_rank last order by datetime) value
               ,      max(datetime) datetime
               from   ae_state_tmp
               group  by device_id, profile_id, param_id, num
             ) t
      inner  join ae_resource c on (c.tmp_id = t.id)
      inner  join ae_profile_detail p on (p.profile_id = t.profile_id and p.param_id = t.param_id)
      inner  join ae_resource_type r on (r.id = p.type_id)
      left   join ae_resource o on (o.device_id = t.device_id and o.type_id = r.owner_id);

      -- Закрыть устаревшие интерфейсы
      update ae_resource set end_date = sysdate
                         ,   tmp_id   = null
      where  tmp_id > 0;

      -- Сохранить записи в ae_state_log
      insert into ae_state_log(id, res_id, param_id, value)
      select ae_state_log_seq.nextval, id, param_id, value
      from ( select distinct r.id, t.param_id,
                    decode(l.type_id, g_uptime_policy, nvl(s.value, t.value), t.value) value
             from   ( select device_id, profile_id, param_id, num
                      ,      max(id) keep (dense_rank last order by datetime) id
                      ,      max(value) keep (dense_rank last order by datetime) value
                      ,      max(datetime) datetime
                      from   ae_state_tmp
                      group  by device_id, profile_id, param_id, num
                    ) t
             inner  join ae_profile_detail p on (p.profile_id = t.profile_id and p.param_id = t.param_id)
             inner  join ae_resource r on ( r.device_id = t.device_id and r.res_num = t.num and r.type_id = p.type_id and
                                            r.start_date <= sysdate and sysdate <= nvl(r.end_date, sysdate + 1))
             left   join ae_state s on (s.res_id = r.id and s.param_id = t.param_id)
             inner  join ae_parameter a on (a.id = p.param_id)
             inner  join ae_domain d on (d.id = a.domain_id)
             inner  join ae_state_policy l on (l.id = d.policy_id)
             left   join ae_threshold h on (
                    h.policy_id = l.id and
                 (( h.type_id = g_increase_type and s.value <= h.value and t.value >= h.value ) or
                  ( h.type_id = g_decrease_type and s.value >= h.value and t.value <= h.value ) or
                  ( h.type_id = g_delta_type and abs(t.value - s.value) >= h.value )))
             where  ( s.id is null or not h.id is null
             or   ( l.type_id = g_uptime_policy and t.value < s.value )
             or   ( l.type_id = g_default_policy and t.value <> s.value ) )
             and    t.param_id <> g_ifName_parameter );

      -- Обновить ae_state
      merge into ae_state d
      using ( select t.param_id, t.value, r.id res_id
              from   ( select device_id, profile_id, param_id, num
                      ,      max(id) keep (dense_rank last order by datetime) id
                      ,      max(value) keep (dense_rank last order by datetime) value
                      ,      max(datetime) datetime
                      from   ae_state_tmp
                      group  by device_id, profile_id, param_id, num
                    ) t
              inner  join ae_profile_detail p on (p.profile_id = t.profile_id and p.param_id = t.param_id)
              inner  join ae_resource r on ( r.device_id = t.device_id and r.res_num = t.num and r.type_id = p.type_id and
                                             r.start_date <= sysdate and sysdate <= nvl(r.end_date, sysdate + 1))
              where  t.param_id <> g_ifName_parameter
            ) s
      on (d.res_id = s.res_id and d.param_id = s.param_id)
      when matched then
        update set d.value = s.value
               ,   d.datetime = current_timestamp
      when not matched then
        insert (id, param_id, res_id, value)
        values (ae_state_seq.nextval, s.param_id, s.res_id, s.value);

      -- Сохранить изменения
      commit write nowait;
    end;

end ae_monitoring;
/
	private final static int     BULK_SIZE        = 200;

	private final static String  INS_VAL_SQL      =
			"insert into ae_state_tmp(id, device_id, profile_id, param_id, num, value) values (?,?,?,?,?,?)";

	private final static String  SAVE_VALUES_DISTINCT_SQL  =
			"begin ae_monitoring.saveValuesDistinct; end;";

	private void test_temporary_distinct() throws SQLException {
		System.out.println("test_temporary:");
		CallableStatement st = c.prepareCall(INS_VAL_SQL);
		Long timestamp = System.currentTimeMillis();
		Long uptime = 0L;
		Long inoct  = 0L;
		Long ix     = 1L;
		int  bulk   = BULK_SIZE;
		try {
			for (int i = 1; i <= ALL_SIZE; i++) {

				// Передать uptime
				st.setLong(1,   ix++);
				st.setLong(2,   DEVICE_ID);
				st.setLong(3,   PROFILE_ID);
				st.setLong(4,   UPTIME_PARAM_ID);
				st.setString(5, FAKE_NUM_VALUE);
				st.setString(6, uptime.toString());
				st.addBatch();

				// Передать имя интерфейса
				st.setLong(1,   ix++);
				st.setLong(2,   DEVICE_ID);
				st.setLong(3,   PROFILE_ID);
				st.setLong(4,   IFNAME_PARAM_ID);
				st.setString(5, Integer.toString((i % 100) + 1));
				st.setString(6, Integer.toString((i % 100) + 1));
				st.addBatch();

				// Передать счетчик трафика
				st.setLong(1,   ix++);
				st.setLong(2,   DEVICE_ID);
				st.setLong(3,   PROFILE_ID);
				st.setLong(4,   INOCT_PARAM_ID);
				st.setString(5, Integer.toString((i % 100) + 1));
				st.setString(6, inoct.toString());
				st.addBatch();

				if (--bulk <= 0) {
					st.executeBatch();
					bulk = BULK_SIZE;
				}

				// Увеличить счетчики
				uptime += 100L;
				if (uptime >= 1000) {
					uptime = 0L;
				}
				inoct += 10L;
			}
			if (bulk < BULK_SIZE) {
				st.executeBatch();
			}
		} finally {
			st.close();
		}
		Long delta_1 = System.currentTimeMillis() - timestamp;
		System.out.println((ALL_SIZE * 1000L) / delta_1);
		timestamp = System.currentTimeMillis();
		st = c.prepareCall(SAVE_VALUES_DISTINCT_SQL);
		timestamp = System.currentTimeMillis();
		try {
			st.execute();
		} finally {
			st.close();
		}
		Long delta_2 = System.currentTimeMillis() - timestamp;
		System.out.println(delta_2);
		System.out.println((ALL_SIZE * 1000L) / (delta_1 - delta_2));
	}

Запускаем и смотрим результаты:

OVERALL TOTALS FOR ALL NON-RECURSIVE STATEMENTS

call count cpu elapsed disk query current rows
------- ------ -------- ---------- ---------- ---------- ---------- ----------
Parse 2 0.00 0.00 0 0 0 0
Execute 1001 0.36 0.33 0 96 6616 3001
Fetch 0 0.00 0.00 0 0 0 0
------- ------ -------- ---------- ---------- ---------- ---------- ----------
total 1003 0.36 0.33 0 96 6616 3001

Misses in library cache during parse: 2
Misses in library cache during execute: 1

Elapsed times include waiting on following events:
  Event waited on Times Max. Wait Total Waited
  ---------------------------------------- Waited ---------- ------------
  SQL*Net message to client 1002 0.00 0.00
  SQL*Net message from client 1002 0.00 0.41
  log file sync 1 0.00 0.00

OVERALL TOTALS FOR ALL RECURSIVE STATEMENTS

call count cpu elapsed disk query current rows
------- ------ -------- ---------- ---------- ---------- ---------- ----------
Parse 30 0.01 0.01 0 3 0 0
Execute 30 0.41 0.40 3 48932 1104 218
Fetch 8 0.00 0.00 0 176 0 8
------- ------ -------- ---------- ---------- ---------- ---------- ----------
total 68 0.44 0.43 3 49111 1104 226

Misses in library cache during parse: 8
Misses in library cache during execute: 7

Elapsed times include waiting on following events:
  Event waited on Times Max. Wait Total Waited
  ---------------------------------------- Waited ---------- ------------
  db file sequential read 3 0.00 0.00

Как и ожидалось, издержки на вставку данных уменьшились, но усложнилась последующая их обработка.

Используем коллекции (collection)

Как справедливо заметил DenKrep, даже при самых минимальных издержках на вставку данных, подход с использованием GTT сильно напоминает процесс переливания из пустого в порожнее. Каким еще образом мы можем снизить издержки на сетевое взаимодействие? Мы не можем использовать JDBC batch для вызова PL/SQL-кода (вернее можем, но это не принесет никакой выгоды), но мы можем передавать массивы!

Сам процесс передачи массивов из Java-кода хорошо описан в этом руководстве. Для передачи данных, нам потребуется определить следующие типы:

create or replace type ae_state_rec as object (
  device_id      number,
  profile_id     number,
  param_id       number,
  num            varchar2(300),
  value          varchar2(300)
)
/

create or replace type ae_state_tab as table of ae_state_rec;
/

Поскольку у меня установлен Oracle Client 11g, мне пришлось заменить [ORACLE_HOME]/jdbc/lib/classes12.zip на [ORACLE_HOME]/jdbc/lib/ojdbc5.jar. Также, хочу подчеркнуть, что [ORACLE_HOME]/jdbc/lib/nls_charset12.zip должна быть доступна при запуске приложения, в противном случае, будет невозможно передавать строковые значения (ошибок возникать не будет, но передаваться будет NULL).

Как мы будем обрабатывать массив на сервере? Для начала, просто попробуем вызывать написанную ранее addValue в цикле:

    ...
    procedure addValues( p_tab in ae_state_tab ) as
    begin
      for i in 1 .. p_tab.count loop
          addValue( p_device  => p_tab(i).device_id
                  , p_profile => p_tab(i).profile_id
                  , p_param   => p_tab(i).param_id
                  , p_num     => p_tab(i).num
                  , p_val     => p_tab(i).value );
      end loop;
      commit write nowait;
    end;
    ...

Java-код заметно усложнился:

	private final static String  ADD_VALUES_SQL   =
			"begin ae_monitoring.addValues(?); end;";

	private void test_collection() throws SQLException {
		System.out.println("test_collection:");
		OracleCallableStatement st = (OracleCallableStatement)c.prepareCall(ADD_VALUES_SQL);
		int oracleId = CharacterSet.CL8MSWIN1251_CHARSET;
		CharacterSet charSet = CharacterSet.make(oracleId);
		Long timestamp = System.currentTimeMillis();
		Long uptime = 0L;
		Long inoct  = 0L;
		RecType r[] = new RecType[ALL_SIZE * 3];
		int ix      = 0;
		for (int i = 1; i <= ALL_SIZE; i++) {

			// Передать uptime
			r[ix++] = new RecType(
					new NUMBER(DEVICE_ID),
					new NUMBER(PROFILE_ID),
					new NUMBER(UPTIME_PARAM_ID),
					new CHAR(FAKE_NUM_VALUE, charSet),
					new CHAR(uptime.toString(), charSet));

			// Передать имя интерфейса
			r[ix++] = new RecType(
					new NUMBER(DEVICE_ID),
					new NUMBER(PROFILE_ID),
					new NUMBER(IFNAME_PARAM_ID),
					new CHAR(Integer.toString((i % 100) + 1), charSet),
					new CHAR(Integer.toString((i % 100) + 1), charSet));

			// Передать счетчик трафика
			r[ix++] = new RecType(
					new NUMBER(DEVICE_ID),
					new NUMBER(PROFILE_ID),
					new NUMBER(INOCT_PARAM_ID),
					new CHAR(Integer.toString((i % 100) + 1), charSet),
					new CHAR(inoct.toString(), charSet));

			// Увеличить счетчики
			uptime += 100L;
			if (uptime >= 1000) {
				uptime = 0L;
			}
			inoct += 10L;
		}
		RecTab t = new RecTab(r);
		try {
			st.setORAData(1, t);
			st.execute();
		} finally {
			st.close();
		}
		System.out.println((ALL_SIZE * 1000L) / (System.currentTimeMillis() - timestamp));
	}

В трейсе появляются разные забавные операторы, связанные с передачей с клиента массивов:

SELECT INSTANTIABLE, supertype_owner, supertype_name, LOCAL_ATTRIBUTES
FROM
 all_types WHERE type_name = :1 AND owner = :2

Результат закономерен:

OVERALL TOTALS FOR ALL NON-RECURSIVE STATEMENTS

call count cpu elapsed disk query current rows
------- ------ -------- ---------- ---------- ---------- ---------- ----------
Parse 4 0.00 0.00 0 0 0 0
Execute 4 4.35 4.31 5 136053 6610 3
Fetch 1 0.00 0.00 0 9 0 1
------- ------ -------- ---------- ---------- ---------- ---------- ----------
total 9 4.35 4.31 5 136062 6610 4

Misses in library cache during parse: 2
Misses in library cache during execute: 2

Elapsed times include waiting on following events:
  Event waited on Times Max. Wait Total Waited
  ---------------------------------------- Waited ---------- ------------
  SQL*Net message to client 6 0.00 0.00
  SQL*Net message from client 6 0.23 0.34
  SQL*Net more data from client 41 0.00 0.00

OVERALL TOTALS FOR ALL RECURSIVE STATEMENTS

call count cpu elapsed disk query current rows
------- ------ -------- ---------- ---------- ---------- ---------- ----------
Parse 77 0.00 0.00 0 0 0 0
Execute 17270 2.97 2.92 5 6046 6610 3160
Fetch 14013 0.49 0.49 1 129930 0 13909
------- ------ -------- ---------- ---------- ---------- ---------- ----------
total 31360 3.48 3.43 6 135976 6610 17069

Misses in library cache during parse: 8
Misses in library cache during execute: 11

Elapsed times include waiting on following events:
  Event waited on Times Max. Wait Total Waited
  ---------------------------------------- Waited ---------- ------------
  db file sequential read 6 0.00 0.00

Улучшились показатели, связанные с передачей данных с клиента, но несколько усложнилась обработка данных, по сравнению с вариантом plsql.

Используем коллекции по умному (bulk)

Очевидно, что мы используем потенциал массивов, передаваемых с клиента не в полной мере. Было бы неплохо передавать массив непосредственно в SQL-запросы, чтобы задействовать массовую обработку, но как это сделать? Мы не можем использовать BULK COLLECT, поскольку наши запросы слишком сложные для этого. К счастью, мы можем обернуть коллекцию в TABLE:

CREATE OR REPLACE package AIS.ae_monitoring as
    procedure  saveValues( p_tab       in  ae_state_tab );
end ae_monitoring;
/

CREATE OR REPLACE package body AIS.ae_monitoring as

    g_ifName_parameter    constant number default 103;
    g_default_policy      constant number default 1;
    g_uptime_policy       constant number default 2;
    g_threshold_policy    constant number default 3;
    g_increase_type       constant number default 1;
    g_decrease_type       constant number default 2;
    g_delta_type          constant number default 3;

    procedure  saveValues( p_tab in  ae_state_tab ) as
    begin

      -- Создать ресурс, если он отсутствует
      merge into ae_resource d
      using ( select t.device_id, t.num, t.value name, p.type_id, o.id owner_id
              from   ( select device_id, profile_id, param_id, num
                       ,      max(value) value
                       from   table( p_tab )
                       group  by device_id, profile_id, param_id, num
                     ) t
              inner  join ae_profile_detail p on (p.profile_id = t.profile_id and p.param_id = t.param_id)
              inner  join ae_resource_type r on (r.id = p.type_id)
              left   join ae_resource o on (o.device_id = t.device_id and o.type_id = r.owner_id)
              where  t.param_id = g_ifName_parameter
            ) s
      on ( d.device_id = s.device_id and d.res_num = s.num and d.type_id = s.type_id and
           d.start_date <= sysdate and sysdate <= nvl(d.end_date, sysdate + 1) )
      when not matched then
        insert (id, device_id, owner_id, type_id, res_num, name)
        values (ae_resource_seq.nextval, s.device_id, s.owner_id, s.type_id, s.num, s.name);

      -- Сохранить записи в ae_state_log
      insert into ae_state_log(id, res_id, param_id, value)
      select ae_state_log_seq.nextval, id, param_id, value
      from ( select distinct r.id, t.param_id,
                    decode(l.type_id, g_uptime_policy, nvl(s.value, t.value), t.value) value
             from   ( select device_id, profile_id, param_id, num
                       ,      max(value) value
                       from   table( p_tab )
                       group  by device_id, profile_id, param_id, num
                     ) t
             inner  join ae_profile_detail p on (p.profile_id = t.profile_id and p.param_id = t.param_id)
             inner  join ae_resource r on ( r.device_id = t.device_id and r.res_num = t.num and r.type_id = p.type_id and
                                            r.start_date <= sysdate and sysdate <= nvl(r.end_date, sysdate + 1))
             left   join ae_state s on (s.res_id = r.id and s.param_id = t.param_id)
             inner  join ae_parameter a on (a.id = p.param_id)
             inner  join ae_domain d on (d.id = a.domain_id)
             inner  join ae_state_policy l on (l.id = d.policy_id)
             left   join ae_threshold h on (
                    h.policy_id = l.id and
                 (( h.type_id = g_increase_type and s.value <= h.value and t.value >= h.value ) or
                  ( h.type_id = g_decrease_type and s.value >= h.value and t.value <= h.value ) or
                  ( h.type_id = g_delta_type and abs(t.value - s.value) >= h.value )))
             where  ( s.id is null or not h.id is null
             or   ( l.type_id = g_uptime_policy and t.value < s.value )
             or   ( l.type_id = g_default_policy and t.value <> s.value ) )
             and    t.param_id <> g_ifName_parameter );

      -- Обновить ae_state
      merge into ae_state d
      using ( select t.param_id, t.value, r.id res_id
              from   ( select device_id, profile_id, param_id, num
                       ,      max(value) value
                       from   table( p_tab )
                       group  by device_id, profile_id, param_id, num
                     ) t
              inner  join ae_profile_detail p on (p.profile_id = t.profile_id and p.param_id = t.param_id)
              inner  join ae_resource r on ( r.device_id = t.device_id and r.res_num = t.num and r.type_id = p.type_id and
                                             r.start_date <= sysdate and sysdate <= nvl(r.end_date, sysdate + 1))
              where  t.param_id <> g_ifName_parameter
            ) s
      on (d.res_id = s.res_id and d.param_id = s.param_id)
      when matched then
        update set d.value = s.value
               ,   d.datetime = current_timestamp
      when not matched then
        insert (id, param_id, res_id, value)
        values (ae_state_seq.nextval, s.param_id, s.res_id, s.value);

      -- Сохранить изменения
      commit write nowait;
    end;

end ae_monitoring;
/
	private final static String  BULK_VALUES_SQL  =
			"begin ae_monitoring.saveValues(?); end;";

	private void test_bulk() throws SQLException {
		System.out.println("test_bulk:");
		OracleCallableStatement st = (OracleCallableStatement)c.prepareCall(BULK_VALUES_SQL);
		int oracleId = CharacterSet.CL8MSWIN1251_CHARSET;
		CharacterSet charSet = CharacterSet.make(oracleId);
		Long timestamp = System.currentTimeMillis();
		Long uptime = 0L;
		Long inoct  = 0L;
		RecType r[] = new RecType[ALL_SIZE * 3];
		int ix      = 0;
		for (int i = 1; i <= ALL_SIZE; i++) {

			// Передать uptime
			r[ix++] = new RecType(
					new NUMBER(DEVICE_ID),
					new NUMBER(PROFILE_ID),
					new NUMBER(UPTIME_PARAM_ID),
					new CHAR(FAKE_NUM_VALUE, charSet),
					new CHAR(uptime.toString(), charSet));

			// Передать имя интерфейса
			r[ix++] = new RecType(
					new NUMBER(DEVICE_ID),
					new NUMBER(PROFILE_ID),
					new NUMBER(IFNAME_PARAM_ID),
					new CHAR(Integer.toString((i % 100) + 1), charSet),
					new CHAR(Integer.toString((i % 100) + 1), charSet));

			// Передать счетчик трафика
			r[ix++] = new RecType(
					new NUMBER(DEVICE_ID),
					new NUMBER(PROFILE_ID),
					new NUMBER(INOCT_PARAM_ID),
					new CHAR(Integer.toString((i % 100) + 1), charSet),
					new CHAR(inoct.toString(), charSet));

			// Увеличить счетчики
			uptime += 100L;
			if (uptime >= 1000) {
				uptime = 0L;
			}
			inoct += 10L;
		}
		RecTab t = new RecTab(r);
		try {
			st.setORAData(1, t);
			st.execute();
		} finally {
			st.close();
		}
		System.out.println((ALL_SIZE * 1000L) / (System.currentTimeMillis() - timestamp));
	}

Результат говорит сам за себя:


OVERALL TOTALS FOR ALL NON-RECURSIVE STATEMENTS

call count cpu elapsed disk query current rows
------- ------ -------- ---------- ---------- ---------- ---------- ----------
Parse 4 0.00 0.00 0 0 0 0
Execute 4 0.20 0.20 4 696 1095 3
Fetch 1 0.00 0.00 0 9 0 1
------- ------ -------- ---------- ---------- ---------- ---------- ----------
total 9 0.20 0.20 4 705 1095 4

Misses in library cache during parse: 0

Elapsed times include waiting on following events:
  Event waited on Times Max. Wait Total Waited
  ---------------------------------------- Waited ---------- ------------
  SQL*Net message to client 6 0.00 0.00
  SQL*Net message from client 6 0.10 0.19
  SQL*Net more data from client 41 0.00 0.00

OVERALL TOTALS FOR ALL RECURSIVE STATEMENTS

call count cpu elapsed disk query current rows
------- ------ -------- ---------- ---------- ---------- ---------- ----------
Parse 30 0.00 0.00 0 0 0 0
Execute 38 0.18 0.17 4 591 1095 217
Fetch 46 0.00 0.00 0 96 0 30
------- ------ -------- ---------- ---------- ---------- ---------- ----------
total 114 0.18 0.18 4 687 1095 247

Misses in library cache during parse: 7
Misses in library cache during execute: 7

Elapsed times include waiting on following events:
  Event waited on Times Max. Wait Total Waited
  ---------------------------------------- Waited ---------- ------------
  db file sequential read 4 0.00 0.00

Выводы

Как показывают результаты тестирования, непосредственная передача массивов в Oracle из клиентского кода (bulk) позволяет добиться наилучшей производительности. Варианты с использованием GTT (temporary, distinct) не сильно ему уступают в плане производительности, но значительно проще, с точки зрения Java-кода. Вариант temporary, помимо того, дает возможность пронаблюдать ORA-600, при использовании batch и неудачном расположении звезд.

Какой именно подход использовать для обработки данных — решать вам. Результаты тестирования выложены на GitHub.

Валентин aka GlukKazan

Теги:
рубрика Java