Author: 风移
这篇文章介绍四种实现MSSQL Server审计日志功能的方法探索,即解析数据库事务日志、SQL Profiler、SQL Audit以及Extended Event。详细介绍了这四种方法的具体实现,以及通过优缺点的对比和总结,最终得出结论,使用Extended Event实现审计日志是最好的选择,为产品化选型提供参考。
对于关系型数据库来而言,在生产环境SQL Server数据库实例中,审计日志是一个非常重要且必须的强需求功能,主要体现在以下几个方面。
安全审计
问题排查
性能调优
在一些存取敏感信息的产品环境数据库SQL Server实例中(比如:财务系统、设计到国家安全层面的数据库系统),对数据操作要求十分谨慎,安全要求等级十分严密,需要对每一条数据操作语句进行审计,以便做到每次数据变动或查看均可追溯。在这个场景中,对敏感信息操作的审计是基于数据安全性的要求。
在日常生产系统管理维护过程中,我们经常会遇到类似的场景和疑问:能否找到是谁在哪个时间点执行了什么语句把数据XXX给删除(更新)了呢?笔者在从事DBA行业的几年工作经历过程中,无数次被问及到类似的问题。要解决这个场景中的问题,审计日志功能是不二选择。
利用审计日志对数据库系统进行性能调优是审计日志非常重要的功能和用途。比如,以下是几个审计日志典型的应用场景:
找出某段时间内哪些语句导致了系统性能消耗严重(比如:CPU、IOPS等)
找出某段时间内的TOP CPU SQL语句
找出某段时间内的TOP IO SQL语句
找出某段时间内的TOP Time Cost SQL语句
找出某段时间内哪个用户使用的数据库系统资源最多
找出某段时间内哪个应用使用的数据库系统资源最多
……
基于以上对审计日志的需求分析,我们了解到审计日志的功能是关系型数据至关重要的强需求,让我们来看看SQL Server数据库系统有哪些实现审计日志功能的方法和具体实现,以及这些方法的优缺点对比。
在SQL Server数据库事务日志中,记录了每个事务的数据变更操作的详细信息,包含谁在哪个时间点做了什么操作。所以,我们可以基于SQL Server数据库事务日志的分析,来获取数据变更的详细审计日志信息。使用这个方法来实现审计日志功能的,有一家叫着ApexSQL的公司产品做的很不错,产品ApexSQL Log就是通过数据库事务日志来实现审计日志功能的产品,详情参见:ApexSQL Log。附一张来自ApexSQL官网的截图:
但是,由于SQL Server本身是微软的闭源产品,对于事务日志格式外界很难知道,所以这个方法的实现门槛很高,实现难度极大。加之,有可能不同版本的SQL Server事务日志格式存在差异,必须要对每个版本的事务日志解析做相应的适配,导致维护成本极高,产品功能延续性存在极大风险和挑战。
SQL Profiler是微软从SQL Server 2000开始引入的数据库引擎跟踪工具,具有使用界面操作的接口、使用SQL语句创建接口以及使用SMO编程创建接口。使用SQL Profiler,可以实现非常多的功能,比如:
图形化监控数据库引擎执行的SQL语句(也可以将执行语句保存到表中)
查看执行语句实时的执行计划
数据库引擎错误信息排查
数据库性能分析
阻塞,锁等待、锁升级及死锁跟踪
后台收集查询语句信息
……
所以,从功能完整性角度来说,我们完全可以使用SQL Profiler来实现就数据库实例级别的审计日志的功能。那么接下来让我们看看如何使用SQL Profiler实现审计日志的功能。
开始 => 运行 => 键入“Profiler” => 回车,打开Profiler工具后,点击“New Trace” => Server Name => Authentication => Connect,如下图所示:
然后,选择General => Save to table => 选择要保留到的实例名、数据库名、架构名和表名 => OK
接下来选择要跟踪的事件,Events Selection => SQL:StmtCompleted => Column Filters => LoginName => Not Like %sa% => OK => Run
使用图形化界面创建SQL Profiler实现审计日志功能,简单易用,很容易上手。但是,过程繁琐、效率不高,难于自动化。这个时候,就需要使用SQL语句来创建SQL Profiler功能,实现一键创建的方法了。
use master
GO
set nocount on
declare
@trace_folder nvarchar(256)
,@trace_file nvarchar(256)
,@max_files_size bigint
,@stop_time datetime
,@file_count int
,@int_filter_cpu int
,@int_filter_duration bigint
,@int_filter_spid int
,@set_trace_status int
;
select
@trace_folder=N'C:\Temp\perfmon'
,@max_files_size = 50 --max file size for each trace file
,@file_count = 10 --max file count
,@stop_time = '6/13/2017 10:50' --null: stop trace manully; specify time (stop at the specify time)
,@int_filter_cpu = NULL -- >= @int_filter_cpu ms will be traced. or else, skipped.
--NULL: ignore this filter
,@int_filter_duration = NULL --execution duration filter: millisecond
--NULL: ignore this filter
--,@int_filter_spid = 151 --integer: specify a spid to trace
--
,@set_trace_status = 1 --0: Stops the specified trace.;
--1: Starts the specified trace.;
--2: Closes the specified trace and deletes its definition from the server.;
;
/*
select * from sys.traces
*/
--private variables
declare
@trace_id int
,@do int
,@loop int
,@trace_event_id int
,@trace_column_id int
,@return_code tinyint
,@return_decription varchar(200)
,@field_separator char(1)
;
select
@field_separator = ',' --trace columns list separator
;
IF right(ltrim(rtrim(@trace_folder)), 1 ) <> '\'
BEGIN
SELECT
@trace_folder = ltrim(rtrim(@trace_folder)) + N'\'
;
exec sys.xp_create_subdir @trace_folder
END
;
select
@trace_file = @trace_folder + REPLACE(@@SERVERNAME, N'\', N'')
;
IF @int_filter_spid IS NOT NULL
BEGIN
select
@trace_file = @trace_file + cast(@int_filter_spid as varchar)
;
END
--select @trace_file
select top 1
@trace_id = id
from sys.traces
where path like @trace_file + N'%'
if @trace_id is not null
begin
-- Start Trace (status 1 = start)
EXEC sys.sp_trace_setstatus @trace_id, @set_trace_status
return
end
if OBJECT_ID('tempdb..#trace_event','u') is not null
drop table #trace_event
create table #trace_event
(
id int identity(1,1) not null primary key
,trace_event_id int not null
,trace_column_id int not null
,event_name sysname null
,trace_column_name sysname null
)
;with trace_event
as
( --select * from sys.trace_events order by trace_event_id
select
is_trace = 1 , event_name = 'SQL:StmtCompleted'
,trace_column_list = 'NestLevel,ClientProcessID,EndTime,DatabaseID,GroupID,ServerName,SPID,DatabaseName,NTUserName,IntegerData2,RequestID,EventClass,SessionLoginName,NTDomainName,TextData,XactSequence,CPU,ApplicationName,Offset,LoginSid,TransactionID,IntegerData,Duration,SourceDatabaseID,LineNumber,ObjectID,Reads,RowCounts,Writes,IsSystem,ObjectName,LoginName,ObjectType,StartTime,HostName,EventSequence,'
),
trace_column
as(
select
*
,trace_column_list_xml =
CAST(
'<V><![CDATA['
+ REPLACE(
REPLACE(
REPLACE(
trace_column_list,CHAR(10),']]></V><V><![CDATA['
),@field_separator,']]></V><V><![CDATA['
),CHAR(13),']]></V><V><![CDATA['
)
+ ']]></V>'
as xml
)
from trace_event
where is_trace = 1
)
,data
as(
select
trace_column = T.C.value('(./text())[1]','sysname')
,event_name
from trace_column AS a
CROSS APPLY trace_column_list_xml.nodes('./V') AS T(C)
)
INSERT INTO #trace_event
select
trace_event_id = ev.trace_event_id
,trace_column_id = col.trace_column_id
,a.event_name
,trace_column_name = a.trace_column
from data as a
inner join sys.trace_columns as col
on a.trace_column = col.name
inner join sys.trace_events as ev
on a.event_name = ev.name
where col.trace_column_id is not null
order by ev.trace_event_id
;
--select * from #trace_event
---private variables
select
@trace_id = 0
,@do = 1
,@loop = @@ROWCOUNT
,@trace_event_id = 0
,@trace_column_id = 0
,@return_code = 0
,@return_decription = ''
;
--create trace
exec @return_code = sys.sp_trace_create @traceid = @trace_id OUTPUT
, @options = 2
, @tracefile = @trace_file
, @maxfilesize = @max_files_size
, @stoptime = @stop_time
, @filecount = @file_count
;
select
trace_id = @trace_id
,[current_time] = getdate()
,[stop_time] = @stop_time
;
set
@return_decription = case @return_code
when 0 then 'No error.'
when 1 then 'Unknown error.'
when 10 then 'Invalid options. Returned when options specified are incompatible.'
when 12 then 'File not created.'
when 13 then 'Out of memory. Returned when there is not enough memory to perform the specified action.'
when 14 then 'Invalid stop time. Returned when the stop time specified has already happened.'
when 15 then 'Invalid parameters. Returned when the user supplied incompatible parameters.'
else ''
end
;
raiserror('Trace create with:
%s',10,1,@return_decription) with nowait
--loop set trace event & event column
while @do <= @loop
begin
select top 1
@trace_event_id = trace_event_id
,@trace_column_id = trace_column_id
from #trace_event
where id = @do
;
--set trace event
exec sys.sp_trace_setevent @trace_id, @trace_event_id, @trace_column_id, 1
raiserror('exec sys.sp_trace_setevent @trace_id, %d, %d, 1',10,1,@trace_event_id,@trace_column_id) with nowait
set @do = @do + 1;
end
--CPU >= 500/ cpu columnid = 18
IF @int_filter_cpu IS NOT NULL
EXEC sys.sp_trace_setfilter @trace_id, 18, 0, 4, @int_filter_cpu
--duration filter/ duration columnid=13
IF @int_filter_duration IS NOT NULL
EXEC sys.sp_trace_setfilter @trace_id, 13, 0, 4, @int_filter_duration
--spid filter/ spid columnid=12
IF @int_filter_spid IS NOT NULL
exec sys.sp_trace_setfilter @trace_id, 12, 0, 0, @int_filter_spid
--applicationName not like 'SQL Server Profiler%'
EXEC sys.sp_trace_setfilter @trace_id, 10, 0, 7, N'SQL Server Profiler%'
-- Start Trace (status 1 = start)
EXEC sys.sp_trace_setstatus @trace_id, @set_trace_status
GO
其中输入参数表达的含义解释如下:
@trace_folder:Trace文件存放的位置
@max_files_size:每一个Trace文件大小
@file_count:Trace滚动最多的文件数量
@stop_time:Trace停止的时间
@int_filter_cpu:CPU过滤阈值,CPU使用率超过这个值会被记录下来,单位毫秒
@int_filter_duration:执行时间过滤阈值,执行时间超过这个值会被记录,单位毫秒
@set_trace_status:Trace的状态:0停止;1启动;2删除
SQL Profiler除了使用图形化界面创建,使用系统存储过程创建两种方法以外,还可以使用SMO编程方法来创建。
使用SQL Audit实现SQL Server审计日志功能需要以下三个步骤来完成:
创建实例级别的Audit并启动
创建数据库级别的Audit Specification
读取审计日志文件
使用Create Server Audit语句创建实例级别的Audit,方法如下:
USE [master]
GO
CREATE SERVER AUDIT [Audit_Svr_User_Defined_for_Testing]
TO FILE
( FILEPATH = N'C:\Temp\Audit'
,MAXSIZE = 10 MB
,MAX_ROLLOVER_FILES = 10
,RESERVE_DISK_SPACE = OFF
)
WITH
( QUEUE_DELAY = 1000
,ON_FAILURE = CONTINUE
)
GO
启动实例级别的Audit,代码如下
USE [master]
GO
ALTER SERVER AUDIT [Audit_Svr_User_Defined_for_Testing] WITH(STATE=ON)
;
GO
实例级别Audit创建完毕后,接下来是对需要审计的数据库建立对于的Audit Specification,方法如下:
USE [testdb]
GO
CREATE DATABASE AUDIT SPECIFICATION [Audit_Spec_for_TestDB]
FOR SERVER AUDIT [Audit_Svr_User_Defined_for_Testing]
ADD (SELECT, INSERT, UPDATE, DELETE, EXECUTE ON DATABASE::[testdb] BY [public])
WITH (STATE = ON);
GO
由于SQL Audit Specification是基于数据库级别的,所以存在以下场景的维护性复杂度增加:
用户需要审计实例中某些或者所有数据库,必须在每个需要审计的数据库下创建对象
用户实例有新数据库创建,并需要审计日志功能时,必须在新的数据库下创建对象
最后,我们需要将审计日志文件中存放的内容读取出来,使用SQL Server提供的系统函数sys.fn_get_audit_file,方法如下:
DECLARE
@AuditFilePath sysname
;
SELECT
@AuditFilePath = audit_file_path
FROM sys.dm_server_audit_status
WHERE name = 'Audit_Svr_User_Defined_for_Testing'
SELECT statement,*
FROM sys.fn_get_audit_file(@AuditFilePath,default,default)
;
微软SQL Server产品长期的规划是逐渐使用Extended Event来替换SQL Profiler工具,因为Extended Event更加轻量级,性能消耗比SQL Profiler大幅降低,因此对用户系统性能影响也大幅减轻。在审计日志的应用场景中,只需要在实例级别创建一个Extended Event Session对象,然后启用即可。既满足了功能性的需求,又能够做到很好后期维护,不需要为某一个数据库创建相应对象,对实例的性能消耗大幅降低到5%左右。
使用Create Event Session On Server语句创建基于实例级别的Extended Event。语句如下:
USE master
GO
CREATE EVENT SESSION [svrXEvent_User_Define_Testing] ON SERVER
ADD EVENT sqlserver.sql_statement_completed
(
ACTION
(
sqlserver.database_id,
sqlserver.database_name,
sqlserver.session_id,
sqlserver.username,
sqlserver.client_hostname,
sqlserver.client_app_name,
sqlserver.sql_text,
sqlserver.query_hash,
sqlserver.query_plan_hash,
sqlserver.plan_handle,
sqlserver.tsql_stack,
sqlserver.is_system,
package0.collect_system_time
)
WHERE sqlserver.username <> N'NT AUTHORITY\SYSTEM'
AND sqlserver.username <> 'sa'
AND (NOT sqlserver.like_i_sql_unicode_string(sqlserver.client_app_name, '%IntelliSense'))
AND sqlserver.is_system = 0
)
ADD TARGET package0.asynchronous_file_target
(
SET
FILENAME = N'C:\Temp\svrXEvent_User_Define_Testing.xel',
MAX_FILE_SIZE = 10,
MAX_ROLLOVER_FILES = 100
)
WITH (
EVENT_RETENTION_MODE = NO_EVENT_LOSS,
MAX_DISPATCH_LATENCY = 5 SECONDS
);
GO
Extended Event Session对象创建完毕后,需要启动这个session对象,方法如下:
USE master
GO
-- We need to enable event session to capture event and event data
ALTER EVENT SESSION [svrXEvent_User_Define_Testing]
ON SERVER STATE = START;
GO
Extend Event生成审计日志文件以后,我们可以使用sys.fn_xe_file_target_read_file系统函数来读取,然后分析event_data列所记录的详细信息。
USE master
GO
SELECT *
FROM sys.fn_xe_file_target_read_file('C:\Temp\svrXEvent_User_Define_Testing*.xel', null, null, null)
根据前面章节“实现审计日志的方法”部分的介绍,我们从可靠性、对象级别、可维护性、开销和对数据库系统的影响五个方面来总结这四种技术的优缺点。
可靠性:这四种实现审计日志的方法可靠性都有保障,如果使用数字化衡量可维护性,得满分100分
对象级别:SQL Profiler和Extended Event是基于实例级别的技术方案;解析事务日志解析和SQL Audit方法是基于数据库级别的技术,一旦有数据库创建或者删除操作,需要做相应的适配,所以维护成本也相对高。基于数据库级别的方案得分为0,基于实例级别得分为100
维护性:基于实例级别的实现方法可维护性(得分100)显然优于基于数据库级别(得分为0)的实现方式
开销:SQL Profiler对数据库系统开销很大,大概20%左右(得分100 - 20 = 80),其他三种开销较小5%左右(得分100 - 5 = 95)
影响:开销大的技术方案自然影响就大,反之亦然。得分与开销部分类似。
四种技术方案优缺点汇总如下表所示:
以下是对四种实现审计日志方法五个维度打分,得分统计汇总如下表所示:
将汇总得分做成雷达图,如下图所示:
从雷达图我们可以很清楚的看到,综合考虑可靠性、可维护性、系统开销和影响来看,使用Extended Event实现审计日志的方法是最优的选择。
本期分享了SQL Server实现审计日志功能的四种技术方案和详细实现,并从可靠性、可维护性、对象级别、系统开销和影响五个维度分析了四种方案各自的优缺点,最后的结论是使用Extended Event实现审计日志方法是最优选择,以此来为我们的产品化做出正确的选择。