본 문서는 2025년 12월 19일 기준으로 작성되었습니다.
Elasticsearch 9.2.0 버전을 사용하였습니다.
보안 탐지 룰을 운영하다 보면 이런 순간이 옵니다.
정상 트래픽인데 알림이 계속 발생한다
룰의 탐지 범위를 느슨하게 만들면 실제로 위협인 경우를 놓칠까봐 불안하다
이를 처리하기 위한 예외사항을 룰마다 따로 적용, 관리하기 어렵다.
심지어 실시간으로 예외사항이 변화한다.
이럴 때 Elastic Security의 Shared Exception List 기능을 잘 쓰면, 룰은 유지하면서 오탐(False Positive, 정상인데 오류로 탐지)만 깔끔하게 제외할 수 있습니다. 그중에서도 오늘은 운영 난이도를 확 낮춰주는 Shared Exception List를 소개해볼까 합니다. 더불어, Shared Exception List와 Watcher를 활용하여 실시간으로 변화하는 예외 사항을 반영하는 방법도 알아보겠습니다.
Shared Exception List란, 보안 룰에 대한 예외 조건을 하나의 리스트로 묶어두고, 여러 탐지 룰에 공통으로 적용할 수 있게 해주는 기능입니다. Shared Exception List를 사용하면 여러 룰에 같은 예외사항을 반복해서 추가할 필요가 없고, 한번 수정하면 모든 연결된 룰에 실시간으로 반영이 되어서 운영 관리가 편리하다는 장점이 있습니다.
여러개의 보안 룰에 적용할 예외사항을 리스트와 연결 가능
리스트 안에 예외사항을 아이템이라는 단위로 여러 개 넣을 수 있음
아이템 중 하나라도 매치 되면 해당 이벤트는 예외 처리되어 알림에서 빠짐
복잡한 여러 개의 조건이 필요하면 아이템 안에 엔트리를 여러 개 넣어서 구성
직접 Shared Exception List를 설정해보겠습니다. 설정에 앞서, 테스트를 위해 임시로 보안 룰을 만들어 Alert을 발생시켰습니다. 테스트 룰은 저의 데스크탑에서 ChatGPT.exe가 실행되는 경우를 탐지하게 하였습니다.
Alert이 발생하게 해 둔 상태에서, Shared Exception List를 만들어 예외처리를 해보겠습니다.
Kibana -> Security -> Rules -> Shared Exception Lists로 이동합니다.
우측 상단의 'Create shared exception list'를 클릭 -> 'Create shared list'를 선택합니다.
3. 목적에 따라 Exception List의 이름과 설명을 간단히 작성한 뒤 Create shared exception list를 클릭하여 생성합니다.
4. 자동으로 Exception List 페이지로 이동됩니다. Create rule exception을 눌러, Exception List 내의 아이템을 생성합니다.
5. 아이템 이름과 조건을 지정합니다. 여기에선 프로세스가 ChatGPT.exe 인 것과, 호스트명이 저의 데스크탑인 것으로 조건을 걸었습니다. 이를 통해 기존에 발생하던 알람이 예외처리되어 발생하지 않게 하려 합니다. 우측 하단의 Add rule exception을 눌러 적용합니다.
6. 적용시, 우측 하단에 예외 사항이 성공적으로 리스트에 등록되었다고 팝업 알람이 나옵니다. 확인 후, 우측 상단의 Link rules를 클릭해 Shared Exception List를 룰에 적용해줍니다.
7. 룰 검색 시, 태그를 사용하여 검색하면 편합니다. 룰을 만들 때에도 태그를 적용해두면 추후 관리가 편해집니다. 현재 알람이 발생중이던 룰을 선택하고 Save를 눌러 적용합니다.
8. Security -> Alerts로 돌아가, 예외사항 적용이 되었는지 확인합니다. 룰에 Shared Exception List를 적용한 이후, 18시 11분 부터는 알람이 발생하지 않는 모습입니다.
실시간으로 화이트리스트 IP들이 인덱스에 저장되고 있고, 이 화이트리스트 IP가 Shared Exception List에 자동으로 등록되어서 룰 예외처리를 원하는 요구사항이 있다고 가정해보겠습니다. 엘라스틱서치 내에서 '자동화'를 구현하려면 watcher를 사용해야합니다. Watcher 코드는 크게 5가지 블록으로 구분됩니다. Trigger, Input, Condition, Actions, Transform 다섯가지로, 직접 코드를 보며 어떻게 이 기능을 구현하였는지를 보겠습니다.
{
"trigger": {
"schedule": {
"interval": "10s"
}
},
Trigger에서는 Watcher 코드를 실행할 주기를 스케줄링 할 수 있습니다. 너무 짧게 할 경우, 리소스 사용에 무리가 갈 수 있기에 10초로 지정하였습니다. 화이트리스트의 경우 0~1초 사이의 실시간성을 필요로 하진 않지만, 만약 더 빠른 적용을 원할 경우에는 리소스를 더 사용하더라도 interval을 줄이면 되겠습니다.
"input": {
"search": {
"request": {
"search_type": "query_then_fetch",
"indices": [
"logs-whitelist-ip"
],
"rest_total_hits_as_int": true,
"body": {
"size": 10000,
"sort": [
{
"@timestamp": "desc"
}
],
"_source": [
"source.ip"
]
}
}
}
},
Input에서는 데이터 소스를 선택합니다. 이 경우에는 계속 적재되는 화이트리스트 IP 인덱스를 지정하였으며, 리소스를 줄이기 위해 필드는 source.ip 필드만 가져오게 하였습니다. size 지정이 없으면 기본적으로 10개만 가져옵니다.
"condition": {
"always": {}
},
Condition은 Input에서 받은 데이터 소스를 가지고 어떤 조건으로 Transform, Action을 실행할지를 지정하는 부분입니다. 코드로 치면 If문 같은 부분이며, 저희는 조건 없이 항상 실행할 것이므로 always를 줍니다.
"transform": {
"script": {
"source": """
def ipSet = new HashSet();
for (def h : ctx.payload.hits.hits) {
def ip = h._source['source.ip'];
if (ip != null) { ipSet.add(ip); }
}
def entry = new HashMap();
entry.put("type", "match_any");
entry.put("field", "source.ip");
entry.put("value", ipSet.toArray(new String[0]));
entry.put("operator", "included");
def entries = new ArrayList();
entries.add(entry);
return ["entries": entries];
""",
"lang": "painless"
}
},
Transform에서는 Input에서 받은 데이터를 변환할 수 있습니다. Shared Exception List의 Item은 다음과 같은 Hashmap 구조로 되어있습니다.
"entries":[{
"type": "match",
"field": "process.name",
"value": "ChatGPT.exe",
"operator": "included"}]
Input의 쿼리 결과는 ctx.payload.hits.hits에 담깁니다. 저희가 원하는 화이트리스트 IP는 ctx.payload.hits.hits._source.source.ip에 있습니다. 화이트리스트 IP를 반복문을 통해 전부 ipSet이라는 리스트 변수에 담습니다.
Item 구조에서 field를 source.ip로, value를 ipSet으로 하여 Hashmap 형태로 저장합니다. Exception Item 구조가 그대로 담긴 entries 변수는 추후 Actions에서 활용합니다.
"actions": {
"send_to_kibana": {
"webhook": {
"scheme": "https",
"host": "192.168.0.1",
"port": 5601,
"method": "put",
"path": "/api/exception_lists/items",
"params": {},
"headers": {
"kbn-xsrf": "true",
"Content-Type": "application/json"
},
"auth": {
"basic": {
"username": "elastic",
"password": "::es_redacted::"
}
},
"body":
"""{
"list_id": "ab-c-d-e-f",
"item_id": "ab-c-d-e-f",
"type": "simple",
"name": "whitelist-ip",
"description": "Exception list item",
"entries": {{#toJson}}ctx.payload.entries{{/toJson}}
}"""
}
}
}
}
Actions에서는 Condition에 따라 어떤 작업을 수행할지를 결정합니다. 사전에 Shared Exception List와 Item을 UI에서 만들어두고, 여기에 Item을 overwrite 하는 방법으로 진행합니다. Webhook Action으로 Kibana에 item 생성 API를 전송합니다.
인증정보인 유저와 비밀번호를 입력한 뒤, body에 list_id와 item_id, 그리고 entries를 넣습니다. list_id와 item_id는 UI에서 생성하고 export를 하면 json 파일 내에서 확인 가능합니다.
아래는 Watcher 코드 전문이며, 복사하여 사용하면 되겠습니다.
{
"trigger": {
"schedule": {
"interval": "10s"
}
},
"input": {
"search": {
"request": {
"search_type": "query_then_fetch",
"indices": [
"logs-whitelist-ip"
],
"rest_total_hits_as_int": true,
"body": {
"size": 10000,
"sort": [
{
"@timestamp": "desc"
}
],
"_source": [
"source.ip"
]
}
}
}
},
"condition": {
"always": {}
},
"actions": {
"send_to_kibana": {
"webhook": {
"scheme": "https",
"host": "192.168.0.1",
"port": 5601,
"method": "put",
"path": "/api/exception_lists/items",
"params": {},
"headers": {
"kbn-xsrf": "true",
"Content-Type": "application/json"
},
"auth": {
"basic": {
"username": "elastic",
"password": "::es_redacted::"
}
},
"body":
"""{
"list_id": "ab-c-d-e-f",
"item_id": "ab-c-d-e-f",
"type": "simple",
"name": "whitelist-ip",
"description": "Exception list item",
"entries": {{#toJson}}ctx.payload.entries{{/toJson}}
}"""
}
}
},
"transform": {
"script": {
"source": """
def ipSet = new HashSet();
for (def h : ctx.payload.hits.hits) {
def ip = h._source['source.ip'];
if (ip != null) { ipSet.add(ip); }
}
def entry = new HashMap();
entry.put("type", "match_any");
entry.put("field", "source.ip");
entry.put("value", ipSet.toArray(new String[0]));
entry.put("operator", "included");
def entries = new ArrayList();
entries.add(entry);
return ["entries": entries];
""",
"lang": "painless"
}
}
}
이번 포스팅에서는 Shared Exception List의 개념, 사용방법 핸즈온, 자동화 Watcher 코드까지 알아보았습니다. Flase Positive 알람을 줄여나가야하는 SIEM 특성상 예외 사항을 한번에 관리하고 실시간으로 룰에 적용할 수 있는 Shared Exception List는 선택이 아닌 필수라고 느껴집니다. 또한, Watcher를 통한 자동화는 Shared Exception List 외에도 엘라스틱 내에서 다양한 목적을 달성하기 위한 수단으로서 유용하므로, 코드를 참고하여 다른데에 사용해보면 좋겠다는 생각이 듭니다.