iT邦幫忙

0

如何使用 Grafana Loki 分析 SQL Server Audit 日誌

  • 分享至 

  • xImage
  •  

上一篇文章已經教大家如何針對特定事件分析 Windows 登入日誌,該技巧幾乎可以運用在任何類型的日誌。今天就是要來教大家高階一點的應用,如何針對 SQL Server Audit 日誌進行分析與儀表板的製作。

首先我們要使用 SQL Server Audit 功能,沒用過的朋友請參考此篇文章

將使用資料庫或者資料表執行 CREATE、ALTER 或 DROP 作為演示範例

建立稽核
從物件總管中點選執行個體 > 安全性 > 稽核,新增稽核。

這個稽核是用於定義蒐集到的稽核紀錄如何儲存,選擇 Application Log。

剛被建立的稽核會處於停用狀態,必須要啟用紀錄才能夠被保存。

新增伺服器稽核規格
從物件總管中點選執行個體 > 安全性 > 伺服器稽核規格,新增伺服器稽核規格。

輸入伺服器稽核規格的名稱並選擇上面所建立的稽核

我們想要稽核資料庫或者資料表執行 CREATE、ALTER 或 DROP 陳述式時,選擇以下稽核動作群組名稱。

  • DATABASE_OBJECT_CHANGE_GROUP
  • SCHEMA_OBJECT_CHANGE_GROUP

關於稽核動作群組名稱的詳細描述,請參考此篇文章

完成伺服器稽核規格設定後,一樣需要啟用才會開始進行稽核。
需要注意的是伺服器稽核規格與稽核都必須要同時啟動,才能夠正確紀錄。

檢視稽核紀錄
確認建立資料庫與資料表的動作是否會被稽核記錄

利用下列的 T-SQL 指令碼建立資料庫

USE [master]
GO

CREATE DATABASE [Database_1]
 CONTAINMENT = NONE
 ON  PRIMARY 
( NAME = N'Database_1', FILENAME = N'C:\Program Files\Microsoft SQL Server\MSSQL13.MSSQLSERVER\MSSQL\DATA\Database_1.mdf' , SIZE = 8192KB , MAXSIZE = UNLIMITED, FILEGROWTH = 65536KB )
 LOG ON 
( NAME = N'Database_1_log', FILENAME = N'C:\Program Files\Microsoft SQL Server\MSSQL13.MSSQLSERVER\MSSQL\DATA\Database_1_log.ldf' , SIZE = 8192KB , MAXSIZE = 2048GB , FILEGROWTH = 65536KB )
GO

利用下列的 T-SQL 指令碼建立資料表

USE [Database_1]
GO

CREATE TABLE [dbo].[Table_1](
 [Column_1] [nchar](10) NULL
) ON [PRIMARY]
GO

確認建立資料庫與資料表的動作都被稽核記錄下來了
資料庫的稽核紀錄

資料表的稽核紀錄

利用下列的 T-SQL 指令碼新增一些測試資料,等一下建立儀表板會使用到。

USE [Database_1]
GO

CREATE TABLE [dbo].[Table_2](
 [Column_1] [nchar](10) NULL
) ON [PRIMARY]
GO

CREATE VIEW [dbo].[View_1] 
AS  
SELECT *  
FROM [dbo].[Table_1]
GO

CREATE VIEW [dbo].[View_2] 
AS  
SELECT *  
FROM [dbo].[Table_2]
GO

ALTER VIEW [dbo].[View_2] 
AS  
SELECT *  
FROM [dbo].[Table_1]
UNION ALL 
SELECT *  
FROM [dbo].[Table_2]
GO

DROP VIEW [dbo].[View_2] 
GO

Promtail
使用到的 promtail-local-config.yaml 範例如下,主要是將 Application 日誌推送到 Loki 並將 source、event_id 與 leveltext 欄位貼上標籤。

server:
  http_listen_port: 9080
  grpc_listen_port: 0

positions:
  filename: "./positions.yaml"
clients:
  - url: http://your_loki_ip:3100/loki/api/v1/push
    
scrape_configs:
- job_name: windows
  windows_events:
    eventlog_name: "Application"
    use_incoming_timestamp: true
    xpath_query: '*'
    bookmark_path: "./bookmark-application.xml"
    exclude_event_data: true
    exclude_user_data: true
    labels:
      logsource: windows-eventlog
  pipeline_stages:
  - json:
      expressions:
        source: source
        eventID: event_id
        level: levelText
  - labels:
      source:
      eventID:
      level:

一定有朋友好奇,那我能不能把稽核紀錄如何儲存到 File。

再透過 static_configs 讀取 AuditLog 底下的日誌呢?

server:
  http_listen_port: 9080
  grpc_listen_port: 0

positions:
  filename: "./positions.yaml"
clients:
  - url: http://your_loki_ip:3100/loki/api/v1/push
    
scrape_configs:
- job_name: mssql
  static_configs:
  - targets:
      - localhost
    labels:
      host: your_hostname
      job: auditlog
      __path__: D:\AuditLog\*.sqlaudit

答案是不行的,該日誌檔已經被 SQL Server 加密過了。

建立一個 SQL Server Audit 事件日誌儀表板,點選 Setting 中的 Variables。

我們先建立兩個變數 computer 與 search 之後會過濾會使用到。

變數 computer
General

  • Name 輸入 computer 當作變數名稱使用
  • Label 輸入電腦做為顯示名稱

Query options

  • Data Source 選擇 Loki
  • Query Type 選擇 Label vaules,Label 選擇 computer

Regex

  • 就依自己的命名規則,看如何篩選出 SQL Server 囉

Selection options

  • 勾選 Multi-value
  • 勾選 Include All option

若有配置正確,Preview of vaules 會出現您想要的變數數值,按下 Apply。

變數 search
回到 Setting 中的 Variables,這次我們建立一個 Text box 類型的變數。

General

  • Name 輸入 search 當作變數名稱使用
  • Label 輸入搜尋做為顯示名稱

事件紀錄 Panel
新增一個 Panel,選擇 Table 可視化。資料來源選擇 Loki,順便填入 Title 為事件紀錄。

Label browser 輸入 LogQL 語法如下

  • 日誌流選擇器帶入變數 computer 過濾日誌源
  • 日誌管道帶入變數 search 過濾日誌內容
  • 透過 pattern 將解析到的欄位存成標籤
  • 透過 label_format 將 action_id 與 class_type 轉成名稱
  • 過濾 action_id 為 CREATE、ALTER、DROP

可以用 (?i) 做為正規表達式的前缀,切換為不區分大小寫。

{computer=~"$computer", source="MSSQLSERVER", eventID="33205"}
|~ "(?i)$search" 
| pattern `<_>action_id:<action_id>\n<_>` 
| label_format action_id=`{{.action_id | trim | replace "CR" "CREATE" | replace "AL" "ALTER" | replace "DR" "DROP"}}`
| action_id =~ "(CREATE|ALTER|DROP)" 
| pattern `<_>class_type:<class_type>\n<_>` 
| pattern `<_>database_name:<database_name>\n<_>` 
| pattern `<_>object_name:<object_name>\n<_>` 
| pattern `<_>schema_name:<schema_name>\n<_>` 
| pattern `<_>server_instance_name:<server_instance_name>\n<_>` 
| pattern `<_>server_principal_name:<server_principal_name>\n<_>` 
| pattern `<_>object_name:<object_name>\n<_>` 
| pattern `<_>statement:<statement>\nadditional_information<_>` 
| pattern `<_>succeeded:<succeeded>\n<_>` 
| label_format class_type=`{{.class_type | trim | replace "DB" "DATABASE" | replace "U" "TABLE" | replace "V" "VIEW" | replace "P" "STORED PROCEDURE"}}`
| label_format statement=`{{.statement | replace "\\r\\n" " " | replace "\\r" " " | replace "\\n" " " | replace "\u005c\u005c" "\u005c"}}`

可以透過下列的 T-SQL 指令來查找 action_id 與 class_type 對應的名稱

select * from sys.dm_audit_actions
select * from sys.dm_audit_class_type_map

Table 可視化允許 Pagination 與 Column filter

先點選 labels 欄位看一下,確認想要分析的欄位都有正確解析到。

編輯 Panel 切到 Transform,加入 Extract fields,Source 選擇 labels。

如此一來,就可以把解析到的標籤們轉換成多個欄位進行使用。

但是這樣欄位太多了,可以透過 Transform 加入 Organize fields 隱藏或排序欄位,顯示有意義的欄位即可。

畫面就乾淨許多了,也可以針對欄位進行過濾。

事件紀錄 Panel 基本上就搞定了

時間軸 Panel
新增一個 Panel,選擇 Time Series 可視化。資料來源選擇 Loki,順便填入 Title 為時間軸。

  • Graph Style 選擇 Bar。
  • Query Legend 填入 {{action_id}}

Label browser 輸入 LogQL 語法如下,日誌流選擇器帶入我們設定的變數。可根據下拉選項的過濾,依照 action_id 的數量進行統計。

sum by (action_id) (
    count_over_time({computer=~"$computer", source="MSSQLSERVER", eventID="33205"}
    |~ "(?i)$search" 
    | pattern `<_>action_id:<action_id>\n<_>` 
    | label_format action_id=`{{.action_id | trim | replace "CR" "CREATE" | replace "AL" "ALTER" | replace "DR" "DROP"}}`
    | action_id =~ "(CREATE|ALTER|DROP)"
    [$__interval])
)

動作統計 Panel
新增一個 Panel,選擇 Pie Chart 可視化。資料來源選擇 Loki,順便填入 Title 為動作統計。

  • Query Legend 填入 {{action_id}}
  • Value Options Calculation 選擇 Total
  • Pie Chart Type 選擇 Donut
  • Pie Chart Labels 選擇 Percent
  • Legend Placement 選擇 Right

Label browser 輸入 LogQL 語法如下,日誌流選擇器帶入我們設定的變數。可根據下拉選項的過濾,依照 action_id 的數量進行統計。

sum by (action_id) (
    count_over_time({computer=~"$computer", source="MSSQLSERVER", eventID="33205"}
    |~ "(?i)$search" 
    | pattern `<_>action_id:<action_id>\n<_>` 
    | label_format action_id=`{{.action_id | trim | replace "CR" "CREATE" | replace "AL" "ALTER" | replace "DR" "DROP"}}`
    | action_id =~ "(CREATE|ALTER|DROP)"
    [$__interval])
)

類別類型統計 Panel
新增一個 Panel,選擇 Pie Chart 可視化。資料來源選擇 Loki,順便填入 Title 為類別類型統計。

  • Query Legend 填入 {{class_type}}
  • 其他選項同上

Label browser 輸入 LogQL 語法如下,日誌流選擇器帶入我們設定的變數。可根據下拉選項的過濾,依照 class_type 的數量進行統計。

sum by (class_type) (
    count_over_time({computer=~"$computer", source="MSSQLSERVER", eventID="33205"}
    |~ "(?i)$search" 
    | pattern `<_>action_id:<action_id>\n<_>` 
    | pattern `<_>class_type:<class_type>\n<_>`
    | label_format action_id=`{{.action_id | trim | replace "CR" "CREATE" | replace "AL" "ALTER" | replace "DR" "DROP"}}`
    | label_format class_type=`{{ .class_type | trim | replace "DB" "DATABASE" | replace "U" "TABLE" | replace "V" "VIEW" | replace "P" "STORED PROCEDURE"}}`
    | action_id =~ "(CREATE|ALTER|DROP)"
    [$__interval])
)

資料庫名稱統計 Panel
新增一個 Panel,選擇 Pie Chart 可視化。資料來源選擇 Loki,順便填入 Title 為資料庫名稱統計。

  • Query Legend 填入 {{database_name}}
  • 其他選項同上

Label browser 輸入 LogQL 語法如下,日誌流選擇器帶入我們設定的變數。可根據下拉選項的過濾,依照 database_name 的數量進行統計。

sum by (database_name) (
    count_over_time({computer=~"$computer", source="MSSQLSERVER", eventID="33205"}
    |~ "(?i)$search" 
    | pattern `<_>action_id:<action_id>\n<_>` 
    | pattern `<_>database_name:<database_name>\n<_>` 
    | label_format action_id=`{{.action_id | trim | replace "CR" "CREATE" | replace "AL" "ALTER" | replace "DR" "DROP"}}`
    | action_id =~ "(CREATE|ALTER|DROP)"
    [$__interval])
)

物件名稱統計 Panel
新增一個 Panel,選擇 Pie Chart 可視化。資料來源選擇 Loki,順便填入 Title 為物件名稱統計。

  • Query Legend 填入 {{object_name}}
  • 其他選項同上

Label browser 輸入 LogQL 語法如下,日誌流選擇器帶入我們設定的變數。可根據下拉選項的過濾,依照 object_name 的數量進行統計。

sum by (object_name) (
    count_over_time({computer=~"$computer", source="MSSQLSERVER", eventID="33205"}
    |~ "(?i)$search" 
    | pattern `<_>action_id:<action_id>\n<_>` 
    | pattern `<_>object_name:<object_name>\n<_>` 
    | label_format action_id=`{{.action_id | trim | replace "CR" "CREATE" | replace "AL" "ALTER" | replace "DR" "DROP"}}`
    | action_id =~ "(CREATE|ALTER|DROP)"
    [$__interval])
)

最後完成的儀表板如下,我們也可以使用關鍵字來進行過濾。

若您是想要分析資料操作語言(DML),例如 SELECT、INSERT、UPDAT 與 DELETE,只要仿照上面的步驟執行應該不難實作出來的。

例如我們想要稽核資料表執行 SELECT、INSERT、UPDATE 或 DELETE 陳述式時,選擇以下稽核動作群組名稱。

  • SCHEMA_OBJECT_ACCESS_GROUP

提供的參考的 LogQL 語法如下

{computer=~"$computer", source="MSSQLSERVER", eventID="33205"}
|~ "(?i)$search" 
| pattern `<_>action_id:<action_id>\n<_>` 
| label_format action_id=`{{.action_id | trim | replace "SL" "SELECT" | replace "IN" "INSERT" | replace "UP" "UPDATE" | replace "DL" "DELETE"}}`
| action_id =~ "(SELECT|INSERT|UPDATE|DELETE)" 
| pattern `<_>class_type:<class_type>\n<_>` 
| pattern `<_>database_name:<database_name>\n<_>` 
| pattern `<_>object_name:<object_name>\n<_>` 
| pattern `<_>schema_name:<schema_name>\n<_>` 
| pattern `<_>server_instance_name:<server_instance_name>\n<_>` 
| pattern `<_>server_principal_name:<server_principal_name>\n<_>` 
| pattern `<_>object_name:<object_name>\n<_>` 
| pattern `<_>statement:<statement>\nadditional_information<_>` 
| pattern `<_>succeeded:<succeeded>\n<_>` 
| label_format class_type=`{{.class_type | trim | replace "DB" "DATABASE" | replace "U" "TABLE" | replace "V" "VIEW" | replace "P" "STORED PROCEDURE"}}`
| label_format statement=`{{.statement | replace "\\r\\n" " " | replace "\\r" " " | replace "\\n" " "}}`

成功登入與失敗的稽核紀錄當然也沒有問題

例如我們想要稽核已成功登入 SQL Server 或者失敗的主體,選擇以下稽核動作群組名稱。

  • SUCCESSFUL_LOGIN_GROUP
  • FAILED_LOGIN_GROUP

成功登入的 LogQL 語法如下

{computer=~"$computer", source="MSSQLSERVER", eventID="33205"}
|~ "(?i)$search" 
| pattern `<_>action_id:<action_id>\n<_>` 
| label_format action_id=`{{.action_id | trim | replace "LGIS" "LOGIN SUCCEEDED"}}`
| action_id ="LOGIN SUCCEEDED" 
| pattern `<_>class_type:<class_type>\n<_>` 
| label_format class_type=`{{.class_type | trim | replace "LX" "LOGIN"}}`
| pattern `<_>server_instance_name:<server_instance_name>\n<_>` 
| pattern `<_>server_principal_name:<server_principal_name>\n<_>` 
| pattern `<_>network protocol:<network_protocol>\r\n<_>` 
| label_format network_protocol=`{{.network_protocol | trim}}`
| label_format statement =""
| pattern `<_>\u003caddress\u003e<ip_address>\u003c/address\u003e<_>` 
| server_principal_name !=""
| network_protocol =~"(TCP/IP|LPC)"
| ip_address !="local machine"

登入失敗的 LogQL 語法如下

{computer=~"$computer", source="MSSQLSERVER", eventID="33205"}
|~ "(?i)$search" 
| pattern `<_>action_id:<action_id>\n<_>` 
| label_format action_id=`{{.action_id | trim | replace "LGIF" "LOGIN FAILED"}}`
| action_id ="LOGIN FAILED" 
| pattern `<_>class_type:<class_type>\n<_>` 
| label_format class_type =`{{.class_type | trim | replace "LX" "LOGIN"}}`
| pattern `<_>\nserver_principal_name:<server_principal_name>\n<_>` 
| pattern `<_>statement:<statement>\n<_>` 
| pattern `<_>\u003caddress\u003e<ip_address>\u003c/address\u003e<_>`

再透過 Transform 加入 Merge,將這兩段的結果整併在同個 Panel 即可。

今天的分享就到這邊,希望有幫助到大家。

下一篇再來分享如何使用 Grafana Loki 警報規則並透過 Alertmanager 發送警告

參考文件

  1. https://learn.microsoft.com/en-us/sql/relational-databases/system-dynamic-management-views/sys-dm-audit-actions-transact-sql?view=sql-server-ver16
  2. https://learn.microsoft.com/en-us/sql/relational-databases/system-dynamic-management-views/sys-dm-audit-class-type-map-transact-sql?view=sql-server-ver16

圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言