Логирования всех действий пользователя

9 ноября 2010 г.

Однажды передо мной поставили задачу логирования всех действий пользователя. Так как приложение имеет двухзвенную архитектуру, было решено реализовывать логирование на уровне базы данных.

Языком реализации был выбран perl, потому что у него более либеральный подход к записям. Можно адресовать запись по строке. И он у нас уже был установлен как внешний язык.

Логирование поддерживает естественные ключи.

Собственно код:

drop table Log cascade;
create table Log (
id serial,
table_name varchar not null,
column_name varchar not null,
old_value varchar null,
new_value varchar null,
pg_user varchar not null default session_user,
operation varchar not null,
log_date timestamp not null default now(),
primary_key VARCHAR not null,
constraint PK_LOG primary key (id)
);

CREATE OR REPLACE FUNCTION getTableColumnNames(VARCHAR) RETURNS SETOF VARCHAR AS
$$

SELECT a.attname::VARCHAR FROM pg_catalog.pg_attribute a
WHERE a.attnum > 0 AND NOT a.attisdropped
AND a.attrelid = (
SELECT c.oid FROM pg_catalog.pg_class c
LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE c.relname ~ ('^(' || $1 || ')$') AND pg_catalog.pg_table_is_visible(c.oid)
)
;

$$ LANGUAGE 'sql';

CREATE OR REPLACE FUNCTION getTablePrimaryKeyNames(VARCHAR) RETURNS SETOF VARCHAR AS
$$
SELECT a.attname::VARCHAR FROM pg_catalog.pg_attribute a, pg_catalog.pg_index i
WHERE a.attrelid = (SELECT c.oid FROM pg_catalog.pg_class c
LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE c.relname ~ ('^(' ||
(SELECT c2.relname::VARCHAR
FROM pg_catalog.pg_class c, pg_catalog.pg_class c2, pg_catalog.pg_index i
WHERE c.oid = (
SELECT c.oid
FROM pg_catalog.pg_class c
LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE c.relname ~ ('^(' || $1 || ')$') AND pg_catalog.pg_table_is_visible(c.oid)
)
AND c.oid = i.indrelid AND i.indexrelid = c2.oid AND i.indisprimary
ORDER BY i.indisprimary DESC, i.indisunique DESC, c2.relname)
|| ')$')
AND pg_catalog.pg_table_is_visible(c.oid)
)
AND a.attnum > 0 AND NOT a.attisdropped AND a.attrelid = i.indexrelid
ORDER BY a.attnum;
$$ LANGUAGE 'sql';

CREATE OR REPLACE FUNCTION audit() RETURNS TRIGGER AS
$$

my $primaryKeyNames = spi_exec_query("select getTablePrimaryKeyNames('$_TD->{table_name}') as name");
my $numberOfPrimaryKeys = $primaryKeyNames->{processed};

my $primaryKeyString = "";
foreach my $rowPkNum ( 0 .. $numberOfPrimaryKeys -1){
my $row = $primaryKeyNames->{rows}[$rowPkNum];
my $newValue = $_TD->{new}->{$row->{name}};
my $oldValue = $_TD->{old}->{$row->{name}};
$newValue =~ s/\\/\\\\/g;
$newValue =~ s/'/\\'/g;
$oldValue =~ s/\\/\\\\/g;
$oldValue =~ s/'/\\'/g;
if($oldValue == $newValue) {
$primaryKeyString .= "$row->{name}\=$oldValue;";
} else {
if ($_TD->{event} eq UPDATE) {
$primaryKeyString .= "$row->{name}\=$oldValue;";
}
if ($_TD->{event} eq INSERT) {
$primaryKeyString .= "$row->{name}\=$newValue;";
}
if ($_TD->{event} eq DELETE) {
$primaryKeyString .= "$row->{name}\=$oldValue";
}
}
}

if ($_TD->{event} eq INSERT) {
my $columnNames = spi_exec_query("select getTableColumnNames('$_TD->{table_name}') as name;");
my $numberOfRows = $columnNames->{processed};

foreach my $rowNumber (0 .. $numberOfRows - 1) {
my $row = $columnNames->{rows}[$rowNumber];
my $newValue = $_TD->{new}->{$row->{name}};
$newValue =~ s/\\/\\\\/g;
$newValue =~ s/'/\\'/g;
my $sql = "INSERT INTO Log(table_name, column_name, new_value, primary_key, operation)VALUES
('$_TD->{table_name}','$row->{name}', '$newValue', '$primaryKeyString', 'INSERT');";
spi_exec_query($sql);
}
}
if ($_TD->{event} eq UPDATE) {
my $columnNames = spi_exec_query("select getTableColumnNames('$_TD->{table_name}') as name;");
my $numberOfRows = $columnNames->{processed};

my $primaryKeyNames = spi_exec_query("select getTablePrimaryKeyNames('$_TD->{table_name}') as name");
my $numberOfPrimaryKeys = $primaryKeyNames->{processed};

foreach my $rowNumber (0 .. $numberOfRows - 1) {
my $row = $columnNames->{rows}[$rowNumber];
my $newValue = $_TD->{new}->{$row->{name}};
my $oldValue = $_TD->{old}->{$row->{name}};
if($oldValue ne $newValue) {
$newValue =~ s/\\/\\\\/g;
$newValue =~ s/'/\\'/g;
$oldValue =~ s/\\/\\\\/g;
$oldValue =~ s/'/\''/g;
my $sql = "INSERT INTO Log(table_name, column_name, new_value, old_value, primary_key, operation)VALUES
('$_TD->{table_name}','$row->{name}', '$newValue', '$oldValue', '$primaryKeyString', 'UPDATE');";
spi_exec_query($sql);
}
}
}

if ($_TD->{event} eq DELETE) {
my $columnNames = spi_exec_query("select getTableColumnNames('$_TD->{table_name}') as name;");
my $numberOfRows = $columnNames->{processed};

foreach my $rowNumber (0 .. $numberOfRows - 1) {
my $oldValue = $_TD->{old}->{$row->{name}};
$oldValue =~ s/\\/\\\\/g;
$oldValue =~ s/'/\\'/g;
my $row = $columnNames->{rows}[$rowNumber];
my $sql = "INSERT INTO Log(table_name, column_name, old_value, primary_key, operation)VALUES
('$_TD->{table_name}','$row->{name}', '$oldValue', '$primaryKeyString', 'DELETE');";
spi_exec_query($sql);
}
}

return undef;
$$ LANGUAGE 'plperl' SECURITY DEFINER;

Процедуры которые устанавливают/снимают логирование со всех таблиц(писал коллега).

CREATE OR REPLACE FUNCTION setAuditAll() returns VOID AS
$$
DECLARE
tb_info RECORD;
tb_name VARCHAR;
sql VARCHAR;
BEGIN
FOR tb_info IN SELECT * from information_schema.tables where table_schema = 'public' and table_type = 'BASE TABLE' ORDER BY table_name
LOOP
tb_name = lower(tb_info.table_name::VARCHAR);
IF tb_name IN ('log') OR tb_name like '%_values'
THEN
RAISE NOTICE 'NOOO: %', tb_name;
CONTINUE;
END IF;

RAISE NOTICE '%', tb_name;
sql := 'CREATE TRIGGER ' || 'audit' || replace(tb_name, '_', '');
sql := sql || ' AFTER INSERT OR UPDATE OR DELETE ON ' || tb_name;
sql := sql || ' FOR EACH ROW EXECUTE PROCEDURE audit(); ';
--RAISE NOTICE 'Trig: %', sql;
BEGIN
EXECUTE sql;
EXCEPTION WHEN duplicate_object
THEN
RAISE NOTICE 'Trgger Exception %', tb_name;
END;
--RETURN ;
END LOOP;

END;
$$ LANGUAGE 'plpgsql';

CREATE OR REPLACE FUNCTION removeAuditAll() returns VOID AS
$$
DECLARE
tb_info RECORD;
tb_name VARCHAR;
sql VARCHAR;
BEGIN
FOR tb_info IN SELECT * from information_schema.tables where table_schema = 'public' and table_type = 'BASE TABLE' ORDER BY table_name
LOOP
tb_name = lower(tb_info.table_name::VARCHAR);
IF tb_name IN ('log') OR tb_name like '%_values'
THEN
RAISE NOTICE 'NOOO: %', tb_name;
CONTINUE;
END IF;

RAISE NOTICE '%', tb_name;
sql := 'DROP TRIGGER ' || 'audit' || replace(tb_name, '_', '');
sql := sql || ' ON ' || tb_name;
sql := sql || ' ; ';
--RAISE NOTICE 'Trig: %', sql;
BEGIN
EXECUTE sql;
EXCEPTION WHEN undefined_object
THEN
RAISE NOTICE 'Trgger Exception %', tb_name;
END;
--RETURN ;
END LOOP;

END;
$$ LANGUAGE 'plpgsql';

Плюсы:

  • Поддержка естественных ключей.
  • Удобство использования лога(поиск, сортировки).
  • Легко установить на новые таблицы.

Минусы:

  • Будут проблемы(скорее всего решаемые) при использовании на трёхзвенной системе. Нужно будет как — то получать активного пользователя.
  • Может изменится схема pg_catalog. Из за чего всё везде придётся снимать и в срочном порядке чинить.
  • Похожие статьи
  • Предыдущие из рубрики