数据库内核月报

数据库内核月报 - 2017 / 06

MSSQL · 实现分析 · SQL Server实现审计日志的方案探索

Author: 风移

摘要

这篇文章介绍四种实现MSSQL Server审计日志功能的方法探索,即解析数据库事务日志、SQL Profiler、SQL Audit以及Extended Event。详细介绍了这四种方法的具体实现,以及通过优缺点的对比和总结,最终得出结论,使用Extended Event实现审计日志是最好的选择,为产品化选型提供参考。

审计日志需求分析

对于关系型数据库来而言,在生产环境SQL Server数据库实例中,审计日志是一个非常重要且必须的强需求功能,主要体现在以下几个方面。

安全审计

在一些存取敏感信息的产品环境数据库SQL Server实例中(比如:财务系统、设计到国家安全层面的数据库系统),对数据操作要求十分谨慎,安全要求等级十分严密,需要对每一条数据操作语句进行审计,以便做到每次数据变动或查看均可追溯。在这个场景中,对敏感信息操作的审计是基于数据安全性的要求。

问题排查

在日常生产系统管理维护过程中,我们经常会遇到类似的场景和疑问:能否找到是谁在哪个时间点执行了什么语句把数据XXX给删除(更新)了呢?笔者在从事DBA行业的几年工作经历过程中,无数次被问及到类似的问题。要解决这个场景中的问题,审计日志功能是不二选择。

性能调优

利用审计日志对数据库系统进行性能调优是审计日志非常重要的功能和用途。比如,以下是几个审计日志典型的应用场景:

……

实现审计日志的方法

基于以上对审计日志的需求分析,我们了解到审计日志的功能是关系型数据至关重要的强需求,让我们来看看SQL Server数据库系统有哪些实现审计日志功能的方法和具体实现,以及这些方法的优缺点对比。

数据库日志分析

在SQL Server数据库事务日志中,记录了每个事务的数据变更操作的详细信息,包含谁在哪个时间点做了什么操作。所以,我们可以基于SQL Server数据库事务日志的分析,来获取数据变更的详细审计日志信息。使用这个方法来实现审计日志功能的,有一家叫着ApexSQL的公司产品做的很不错,产品ApexSQL Log就是通过数据库事务日志来实现审计日志功能的产品,详情参见:ApexSQL Log。附一张来自ApexSQL官网的截图: 01.png

但是,由于SQL Server本身是微软的闭源产品,对于事务日志格式外界很难知道,所以这个方法的实现门槛很高,实现难度极大。加之,有可能不同版本的SQL Server事务日志格式存在差异,必须要对每个版本的事务日志解析做相应的适配,导致维护成本极高,产品功能延续性存在极大风险和挑战。

SQL Profiler

SQL Profiler是微软从SQL Server 2000开始引入的数据库引擎跟踪工具,具有使用界面操作的接口、使用SQL语句创建接口以及使用SMO编程创建接口。使用SQL Profiler,可以实现非常多的功能,比如:

……

所以,从功能完整性角度来说,我们完全可以使用SQL Profiler来实现就数据库实例级别的审计日志的功能。那么接下来让我们看看如何使用SQL Profiler实现审计日志的功能。

图形化创建

开始 => 运行 => 键入“Profiler” => 回车,打开Profiler工具后,点击“New Trace” => Server Name => Authentication => Connect,如下图所示: 02.png

然后,选择General => Save to table => 选择要保留到的实例名、数据库名、架构名和表名 => OK 03.png

接下来选择要跟踪的事件,Events Selection => SQL:StmtCompleted => Column Filters => LoginName => Not Like %sa% => OK => Run 04.png

使用SQL语句创建

使用图形化界面创建SQL Profiler实现审计日志功能,简单易用,很容易上手。但是,过程繁琐、效率不高,难于自动化。这个时候,就需要使用SQL语句来创建SQL Profiler功能,实现一键创建的方法了。

use master
GO

set nocount on

declare 
	@trace_folder nvarchar(256)
	,@trace_file nvarchar(256) 
	,@max_files_size bigint
	
	,@stop_time datetime
	,@file_count int

	,@int_filter_cpu int
	,@int_filter_duration bigint
	,@int_filter_spid int
	,@set_trace_status int
;

select 
	@trace_folder=N'C:\Temp\perfmon'
	
	,@max_files_size = 50			--max file size for each trace file
	,@file_count = 10				--max file count
	
	,@stop_time = '6/13/2017 10:50'	--null: stop trace manully; specify time (stop at the specify time)
	,@int_filter_cpu = NULL				-- >= @int_filter_cpu ms will be traced. or else, skipped.
										--NULL: ignore this filter
	,@int_filter_duration = NULL		--execution duration filter: millisecond
										--NULL: ignore this filter
	--,@int_filter_spid = 151			--integer: specify a spid to trace
										--				
										
	,@set_trace_status = 1	--0: Stops the specified trace.; 
							--1: Starts the specified trace.;
							--2: Closes the specified trace and deletes its definition from the server.;
;

/*

select * from sys.traces

*/
--private variables
declare
	@trace_id int
	,@do int
	,@loop int
	,@trace_event_id int
	,@trace_column_id int
	,@return_code tinyint
	,@return_decription varchar(200)
	,@field_separator char(1)

;	
select
	@field_separator = ','			--trace columns list separator
;

IF right(ltrim(rtrim(@trace_folder)), 1 ) <> '\'
BEGIN
	SELECT 
		@trace_folder = ltrim(rtrim(@trace_folder)) + N'\' 
	;
	exec sys.xp_create_subdir @trace_folder
END
;

select
	@trace_file = @trace_folder + REPLACE(@@SERVERNAME, N'\', N'')
;

IF @int_filter_spid IS NOT NULL
BEGIN
	select
		@trace_file = @trace_file + cast(@int_filter_spid as varchar)
	;
END

--select @trace_file

select top 1
	@trace_id = id
from sys.traces
where path like @trace_file + N'%'

if @trace_id is not null
begin
	
	-- Start Trace (status 1 = start)
	EXEC sys.sp_trace_setstatus @trace_id, @set_trace_status

	return
end

if OBJECT_ID('tempdb..#trace_event','u') is not null
	drop table #trace_event
create table #trace_event
(
	id int identity(1,1) not null primary key
	,trace_event_id int not null
	,trace_column_id int not null
	,event_name sysname null
	,trace_column_name sysname null
)

;with trace_event
as
(		--select * from sys.trace_events order by trace_event_id
	select 
		is_trace = 1 , event_name = 'SQL:StmtCompleted'
		,trace_column_list = 'NestLevel,ClientProcessID,EndTime,DatabaseID,GroupID,ServerName,SPID,DatabaseName,NTUserName,IntegerData2,RequestID,EventClass,SessionLoginName,NTDomainName,TextData,XactSequence,CPU,ApplicationName,Offset,LoginSid,TransactionID,IntegerData,Duration,SourceDatabaseID,LineNumber,ObjectID,Reads,RowCounts,Writes,IsSystem,ObjectName,LoginName,ObjectType,StartTime,HostName,EventSequence,'
),
trace_column
as(
	select 
		*
		,trace_column_list_xml = 
								CAST(
										'<V><![CDATA[' 
													+ REPLACE(
														REPLACE(
																REPLACE(
																			trace_column_list,CHAR(10),']]></V><V><![CDATA['
																		),@field_separator,']]></V><V><![CDATA['
																),CHAR(13),']]></V><V><![CDATA['
															) 
										+ ']]></V>'
									as xml
								)
	from trace_event
	where is_trace = 1
)
,data
as(
	select 
		trace_column = T.C.value('(./text())[1]','sysname')
		,event_name
	from trace_column AS a
		CROSS APPLY trace_column_list_xml.nodes('./V') AS T(C)
)
INSERT INTO #trace_event
select 
	trace_event_id = ev.trace_event_id
	,trace_column_id = col.trace_column_id
	,a.event_name
	,trace_column_name = a.trace_column
from data as a
	inner join sys.trace_columns as col
	on a.trace_column = col.name
	inner join sys.trace_events as ev
	on a.event_name = ev.name
where col.trace_column_id is not null
order by ev.trace_event_id
;

--select * from #trace_event

---private variables
select 
	@trace_id = 0
	,@do = 1
	,@loop = @@ROWCOUNT
	,@trace_event_id = 0
	,@trace_column_id = 0
	,@return_code = 0
	,@return_decription = ''
;

--create trace
exec @return_code = sys.sp_trace_create @traceid = @trace_id OUTPUT 
										, @options = 2  
										, @tracefile =  @trace_file
										, @maxfilesize = @max_files_size
										, @stoptime = @stop_time
										, @filecount =  @file_count
;

select 
	trace_id = @trace_id
	,[current_time] = getdate()
	,[stop_time] = @stop_time
;

set
	@return_decription = case @return_code
								when 0 then 'No error.'
								when 1 then 'Unknown error.'
								when 10 then 'Invalid options. Returned when options specified are incompatible.'
								when 12 then 'File not created.'
								when 13 then 'Out of memory. Returned when there is not enough memory to perform the specified action.'
								when 14 then 'Invalid stop time. Returned when the stop time specified has already happened.'
								when 15 then 'Invalid parameters. Returned when the user supplied incompatible parameters.'
							else ''
							end
;

raiserror('Trace create with:
%s',10,1,@return_decription) with nowait

--loop set trace event & event column
while @do <= @loop
begin
	select top 1
		@trace_event_id = trace_event_id
		,@trace_column_id = trace_column_id
	from #trace_event
	where id = @do
	;
	
	--set trace event
	exec sys.sp_trace_setevent @trace_id, @trace_event_id, @trace_column_id, 1
	raiserror('exec sys.sp_trace_setevent @trace_id, %d, %d, 1',10,1,@trace_event_id,@trace_column_id) with nowait
	
	set @do = @do + 1;
end

--CPU >= 500/ cpu columnid = 18
IF @int_filter_cpu IS NOT NULL
	EXEC sys.sp_trace_setfilter @trace_id, 18, 0, 4, @int_filter_cpu

--duration filter/ duration columnid=13
IF @int_filter_duration IS NOT NULL
	EXEC sys.sp_trace_setfilter @trace_id, 13, 0, 4, @int_filter_duration

--spid filter/ spid columnid=12
IF @int_filter_spid IS NOT NULL
	exec sys.sp_trace_setfilter @trace_id, 12, 0, 0, @int_filter_spid


--applicationName not like 'SQL Server Profiler%'
EXEC sys.sp_trace_setfilter @trace_id, 10, 0, 7, N'SQL Server Profiler%'

-- Start Trace (status 1 = start)
EXEC sys.sp_trace_setstatus @trace_id, @set_trace_status
GO

其中输入参数表达的含义解释如下:

@trace_folder:Trace文件存放的位置

@max_files_size:每一个Trace文件大小

@file_count:Trace滚动最多的文件数量

@stop_time:Trace停止的时间

@int_filter_cpu:CPU过滤阈值,CPU使用率超过这个值会被记录下来,单位毫秒

@int_filter_duration:执行时间过滤阈值,执行时间超过这个值会被记录,单位毫秒

@set_trace_status:Trace的状态:0停止;1启动;2删除

SMO编程创建

SQL Profiler除了使用图形化界面创建,使用系统存储过程创建两种方法以外,还可以使用SMO编程方法来创建。

SQL Audit

使用SQL Audit实现SQL Server审计日志功能需要以下三个步骤来完成:

创建实例级别Audit

使用Create Server Audit语句创建实例级别的Audit,方法如下:

USE [master]
GO

CREATE SERVER AUDIT [Audit_Svr_User_Defined_for_Testing]
TO FILE 
( FILEPATH = N'C:\Temp\Audit'
 ,MAXSIZE = 10 MB
 ,MAX_ROLLOVER_FILES = 10
 ,RESERVE_DISK_SPACE = OFF
)
WITH
( QUEUE_DELAY = 1000
 ,ON_FAILURE = CONTINUE
)
GO

启动实例级别的Audit,代码如下

USE [master]
GO
ALTER SERVER AUDIT [Audit_Svr_User_Defined_for_Testing] WITH(STATE=ON)
;
GO

创建数据库级别Audit Specification

实例级别Audit创建完毕后,接下来是对需要审计的数据库建立对于的Audit Specification,方法如下:

USE [testdb]
GO
CREATE DATABASE AUDIT SPECIFICATION [Audit_Spec_for_TestDB]
FOR SERVER AUDIT [Audit_Svr_User_Defined_for_Testing] 
ADD (SELECT, INSERT, UPDATE, DELETE, EXECUTE ON DATABASE::[testdb] BY [public])
WITH (STATE = ON);
GO

由于SQL Audit Specification是基于数据库级别的,所以存在以下场景的维护性复杂度增加:

读取审计日志文件

最后,我们需要将审计日志文件中存放的内容读取出来,使用SQL Server提供的系统函数sys.fn_get_audit_file,方法如下:

DECLARE 
	@AuditFilePath sysname
;
 
SELECT 
   @AuditFilePath = audit_file_path
FROM sys.dm_server_audit_status
WHERE name = 'Audit_Svr_User_Defined_for_Testing'
 
SELECT statement,* 
FROM sys.fn_get_audit_file(@AuditFilePath,default,default)
;

Extended Event

微软SQL Server产品长期的规划是逐渐使用Extended Event来替换SQL Profiler工具,因为Extended Event更加轻量级,性能消耗比SQL Profiler大幅降低,因此对用户系统性能影响也大幅减轻。在审计日志的应用场景中,只需要在实例级别创建一个Extended Event Session对象,然后启用即可。既满足了功能性的需求,又能够做到很好后期维护,不需要为某一个数据库创建相应对象,对实例的性能消耗大幅降低到5%左右。

创建Extended Event Session

使用Create Event Session On Server语句创建基于实例级别的Extended Event。语句如下:

USE master
GO

CREATE EVENT SESSION [svrXEvent_User_Define_Testing] ON SERVER 
ADD EVENT sqlserver.sql_statement_completed
( 
	ACTION 
	( 
		sqlserver.database_id,
		sqlserver.database_name,
		sqlserver.session_id, 
		sqlserver.username, 
		sqlserver.client_hostname,
		sqlserver.client_app_name,
		sqlserver.sql_text, 
		sqlserver.query_hash,
		sqlserver.query_plan_hash,
		sqlserver.plan_handle,
		sqlserver.tsql_stack,
		sqlserver.is_system,
		package0.collect_system_time
	) 
	WHERE sqlserver.username <> N'NT AUTHORITY\SYSTEM'
		AND sqlserver.username <> 'sa'
		AND (NOT sqlserver.like_i_sql_unicode_string(sqlserver.client_app_name, '%IntelliSense'))
		AND sqlserver.is_system = 0
		
)
ADD TARGET package0.asynchronous_file_target
( 
	SET 
		FILENAME = N'C:\Temp\svrXEvent_User_Define_Testing.xel', 
		MAX_FILE_SIZE = 10,
		MAX_ROLLOVER_FILES = 100
)
WITH (
	EVENT_RETENTION_MODE = NO_EVENT_LOSS,
	MAX_DISPATCH_LATENCY = 5 SECONDS
);
GO

启用Extended Event Session

Extended Event Session对象创建完毕后,需要启动这个session对象,方法如下:

USE master
GO

-- We need to enable event session to capture event and event data 
ALTER EVENT SESSION [svrXEvent_User_Define_Testing]
ON SERVER STATE = START;
GO

读取审计日志文件

Extend Event生成审计日志文件以后,我们可以使用sys.fn_xe_file_target_read_file系统函数来读取,然后分析event_data列所记录的详细信息。

USE master
GO
SELECT *
FROM sys.fn_xe_file_target_read_file('C:\Temp\svrXEvent_User_Define_Testing*.xel', null, null, null)

方案对比

根据前面章节“实现审计日志的方法”部分的介绍,我们从可靠性、对象级别、可维护性、开销和对数据库系统的影响五个方面来总结这四种技术的优缺点。

四种技术方案优缺点汇总如下表所示: 05.png

以下是对四种实现审计日志方法五个维度打分,得分统计汇总如下表所示: 06.png

将汇总得分做成雷达图,如下图所示: 07.png

从雷达图我们可以很清楚的看到,综合考虑可靠性、可维护性、系统开销和影响来看,使用Extended Event实现审计日志的方法是最优的选择。

最后总结

本期分享了SQL Server实现审计日志功能的四种技术方案和详细实现,并从可靠性、可维护性、对象级别、系统开销和影响五个维度分析了四种方案各自的优缺点,最后的结论是使用Extended Event实现审计日志方法是最优选择,以此来为我们的产品化做出正确的选择。