본 포스팅은 2022년 01월 01일을 기준으로 작성되었습니다.
Ingest pipeline을 사용하면 인덱싱하기 전에 데이터를 가공할 수 있습니다. 예를 들어 필드를 제거하고, 텍스트에서 값을 추출하고, 데이터를 보강할 수 있습니다.
Ingest pipeline processor가 실행된 후 Elasticsearch는 변환된 문서를 data stream 또는 index에 추가합니다.
1 - index template | 2 - redindex | 3 - default pipeline
{
"pipeline": {
"name": "inner-pipeline"
}
}
Ingest 노드가 pipeline을 처리합니다. Ingest pipeline을 사용하려면 클러스터에 수집(ingest) 역할을 하는 노드가 하나 이상 있어야 합니다. 수집 로드(loads)가 많은 경우 전용 ingest node를 만드는 것이 좋습다.
Elasticsearch 보안 기능이 활성화된 경우, 클러스터 권한(cluster privillege) 중 manage_pipeline, manage_ingest_pipelines 또는 manage 권한이 있어야 합니다.
Kibana의 Ingest Node Pipelines 기능을 사용하려면 cluster:monitor/nodes/info 권한도 필요합니다.
enrich 프로세서를 적용한 pipeline은 추가 설정이 필요합니다. Enrich processor
Kibana > Stack Management > Ingest Pipelines에서는 아래 항목들을 수행할 수 있습니다.
Pipeline 목록 및 세부정보 확인
기존 pipeline 편집 및 복제
파이프라인 삭제
새로운 pipeline은 Create pipeline을 클릭하여 만들 수 있습니다.
Ingest API를 사용하여 pipeline을 생성하고 관리할 수 있습니다.
Request
PUT /_ingest/pipeline/<pipeline>
↓아래는 2개의 set 프로세서와 1개의 lowercase 프로세서가 있는 pipeline을 생성하는 예제입니다. (Processor는 순차대로 실행됩니다.)
PUT _ingest/pipeline/my-pipeline
{
"description": "My optional pipeline description",
"processors": [
{
"set": {
"description": "My optional processor description",
"field": "my-long-field",
"value": 10
}
},
{
"set": {
"description": "Set 'my-boolean-field' to true",
"field": "my-boolean-field",
"value": true
}
},
{
"lowercase": {
"field": "my-keyword-field"
}
}
]
}
Pipeline을 생성하거나 업데이트할 때 version을 설정 할 수 있습니다. Elasticsearch는 이 version을 내부적으로 사용하지는 않지만, version을 통해 변경 사항을 추적할 수는 있습니다.
PUT _ingest/pipeline/my-pipeline-id
{
"version": 1,
"processors": [ ... ]
}
① Kibana
Kibana에서 pipeline 생성 후, 다시 편집화면에서 Add documents를 클릭합니다. Documents 탭에서 샘플 문서를 작성하고 Run the pipeline을 클릭합니다.
② Ingest API
simulate pipeline API를 사용하여 pipeline을 테스트할 수도 있습니다. 아래는 my-pipeline이라는 pipeline을 테스트하는 예제입니다.
POST _ingest/pipeline/my-pipeline/_simulate
{
"docs": [
{
"_source": {
"my-keyword-field": "FOO"
}
},
{
"_source": {
"my-keyword-field": "BAR"
}
}
]
}
또는
POST _ingest/pipeline/_simulate
{
"pipeline": {
"processors": [
{
"lowercase": {
"field": "my-keyword-field"
}
}
]
},
"docs": [
{
"_source": {
"my-keyword-field": "FOO"
}
},
{
"_source": {
"my-keyword-field": "BAR"
}
}
]
}
pipeline 쿼리 매개변수를 사용하여 문서를 인덱싱 할 때, pipeline을 적용 할 수 있습니다.
POST my-data-stream/_doc?pipeline=my-pipeline
{
"@timestamp": "2099-03-07T11:04:05.000Z",
"my-keyword-field": "foo"
}
PUT my-data-stream/_bulk?pipeline=my-pipeline
{ "create":{ } }
{ "@timestamp": "2099-03-07T11:04:06.000Z", "my-keyword-field": "foo" }
{ "create":{ } }
{ "@timestamp": "2099-03-07T11:04:07.000Z", "my-keyword-field": "bar" }
POST my-data-stream/_update_by_query?pipeline=my-pipeline
POST _reindex
{
"source": {
"index": "my-data-stream"
},
"dest": {
"index": "my-new-data-stream",
"op_type": "create",
"pipeline": "my-pipeline"
}
}
인덱스 설정(index/_settings)에서 매개변수 index.default_pipeline 에 pipeline을 지정하여 기본 pipeline을 설정할 수 있습니다.
default_pipeline 로 설정되어 있는 pipeline이 존재하지 않으면 인덱스 요청은 실패 됩니다. _none으로 설정할 경우, 어떤 pipeline도 실행하지 않게 됩니다.
인덱스 설정(index/_settings)에서 매개변수 index.final_pipeline 에 pipeline을 지정하여 최종 pipeline을 설정할 수 있습니다.
final_pipeline 로 설정되어 있는 pipeline이 존재하지 않으면 인덱스 요청은 실패 됩니다. _none으로 설정할 경우, 어떤 pipeline도 실행하지 않게 됩니다.
Fleet은 통합을 위해 Ingest pipeline을 자동으로 추가합니다. Fleet은 pipeline 인덱스 설정이 포함된 인덱스 템플릿을 사용하여 이러한 파이프라인을 적용합니다. Elasticsearch는 스트림의 명명 체계를 기반으로 이러한 템플릿을 Fleet 데이터 스트림과 일치시킵니다.
Fleet의 ingest pipeline을 변경하거나 Fleet 통합을 위해 사용자 지정 파이프라인을 사용하지 마십시오. 그렇게 하면 Fleet 데이터 스트림이 손상될 수 있습니다.
Fleet은 직접 ingest pipeline을 제공하지 않습니다. 인덱스 템플릿 또는 사용자 지정 구성의 두 가지 방법 중 하나로 이 통합을 위한 파이프라인을 안전하게 지정할 수 있습니다.
Processors는 문서 _source 필드에 대한 읽기 쓰기 권한이 있습니다. processors에서 해당 필드 이름을 바로 입력하거나(방법1), _source.<field_name> 으로 접근 할 수 있습니다. 아래는 my-long-field필드에 접근하는 예제입니다.
방법1
PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"set": {
"field": "my-long-field",
"value": 10
}
}
]
}
방법2
PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"set": {
"field": "_source.my-long-field",
"value": 10
}
}
]
}
접근하는 필드가 객체 형식(flattened objects)인 경우, dot_expander 프로세서를 사용하여 먼저 확장해야 합니다.
PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"dot_expander": {
"description": "Expand 'my-object-field.my-property'",
"field": "my-object-field.my-property"
}
},
{
"set": {
"description": "Set 'my-object-field.my-property' to 10",
"field": "my-object-field.my-property",
"value": 10
}
}
]
}
Template snippet
Template snippet을 사용하면 필드 이름이나 값을 동적으로 설정 할 수 있습니다.{{{field-name}}}
PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"set": {
"description": "Set dynamic '<service>' field to 'code' value",
"field": "{{{service}}}",
"value": "{{{code}}}"
}
}
]
}
Metadata 필드에 접근 (Access metadata fields in a processor)
프로세서에서 아래의 메타데이터 필드에 접근 할 수 있습니다.
_index
_id
_routing
_dynamic_templates
Mustache 템플릿 스니펫({{{xxx}}})을 사용하여 메타데이터 필드 값에 액세스합니다. 예를 들어 {{{_routing}}}은 문서의 라우팅 값을 검색합니다.
PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"set": {
"description": "Set '_routing' to 'geoip.country_iso_code' value",
"field": "_routing",
"value": "{{{geoip.country_iso_code}}}"
}
}
]
}
아래 예제는 필드가 아직 index mapping에서 정의되지 않은 경우, address: geo_point라는 동적 템플릿을 사용하도록 ES에 지시합니다.
PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"set": {
"description": "Use geo_point dynamic template for address field",
"field": "_dynamic_templates",
"value": {
"address": "geo_point"
}
}
}
]
}
Ingest processor는 _ingest 를 사용하여 ingest metadata를 추가하거나 액세스 할 수 있습니다.
Source 나 metadata 필드와 달리 Elasticsearch는 기본적으로 ingest metadata 필드를 인덱싱하지 않습니다. 하지만 _ingest 키로 시작하는 소스 필드는 허용하기에 _source._ingest를 사용하여 액세스 할 수 있습니다.
파이프라인은 기본적으로 _ingest.timestamp ingest metadata 필드만 생성합니다. (이 필드는 문서가 인덱싱 요청을 수신한 시간의 타임스탬프가 입력됩니다.) _ingest.timestamp 또는 다른 ingest metadata 필드를 인덱싱하려면 set 프로세서를 사용할 수 있습니다.
PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"set": {
"description": "Index the ingest timestamp as 'event.ingested'",
"field": "event.ingested",
"value": "{{{_ingest.timestamp}}}"
}
}
]
}
중첩 pipeline설정
↓내부 파이프라인(pipelineA) 정의:
PUT _ingest/pipeline/pipelineA
{
"description" : "inner pipeline",
"processors" : [
{
"set" : {
"field": "inner_pipeline_set",
"value": "inner"
}
}
]
}
↓이전에 정의한 내부 파이프라인(pipelineA)을 사용하여 다른 파이프라인(pipelineB)을 정의:
PUT _ingest/pipeline/pipelineB
{
"description" : "outer pipeline",
"processors" : [
{
"pipeline" : {
"name": "pipelineA"
}
},
{
"set" : {
"field": "outer_pipeline_set",
"value": "outer"
}
}
]
}
이제 인덱스에 외부 파이프라인(pipelineB)을 적용하게 되면 내부 파이프라인(pipelineA)와 외부 파이프라인(pipelineB)이 함께 적용됩니다.
↓(외부 파이프라인(pipelineB)을 적용한) index에 새로운 문서 삽입:
PUT /my-index/_doc/1?pipeline=pipelineB
{
"field": "value"
}
↓해당 index를 조회하면 아래와 같이 중첩된 파이프라인이 모두 적용된 결과를 볼 수 있습니다.
{
"field": "value",
"inner_pipeline_set": "inner",
"outer_pipeline_set": "outer"
}
Pipeline processor는 순차적으로 실행됩니다. 그러므로 processor 중 하나만 실패하면 오류가 발생하여 모든 pipeline 처리가 중지됩니다.
# 오류 무시 (ignore_failure)
Processor 오류를 무시하고 pipeline의 나머지 processor를 실행하려면 ignore_failure=true로 설정합니다.
PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"rename": {
"description": "Rename 'provider' to 'cloud.provider'",
"field": "provider",
"target_field": "cloud.provider",
"ignore_failure": true
}
}
]
}
# 오류 처리 (on_failure)
① Pipeline Processor의 on_failure
오류가 발생되면 실행할 프로세서를 정의 할 수 있습니다. on_failure을 호출만 하고 내부에 아무 프로세서도 정의하지 않으면, 오류 발생 시 pipeline의 나머지 프로세서도 실행하게 됩니다.
PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"rename": {
"description": "Rename 'provider' to 'cloud.provider'",
"field": "provider",
"target_field": "cloud.provider",
"on_failure": [
{
"set": {
"description": "Set 'error.message'",
"field": "error.message",
"value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
"override": false
}
}
]
}
}
]
}
↓중첩 오류 처리도 가능합니다. (오류 처리 중 다시 오류가 발생 할 경우):
PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"rename": {
"description": "Rename 'provider' to 'cloud.provider'",
"field": "provider",
"target_field": "cloud.provider",
"on_failure": [
{
"set": {
"description": "Set 'error.message'",
"field": "error.message",
"value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
"override": false,
"on_failure": [
{
"set": {
"description": "Set 'error.message.multi'",
"field": "error.message.multi",
"value": "Document encountered multiple ingest errors",
"override": true
}
}
]
}
}
]
}
}
]
}
② Pipeline의 on_failure
전체 pipeline에 대해 on_failure를 지정할 수도 있습니다. on_failure 가 없는 프로세서가 실패하면 Elasticsearch는 이 pipeline의 on_failure로 오류 처리를 진행합니다. Elasticsearch는 pipeline의 나머지 프로세서를 실행하지 않습니다.
PUT _ingest/pipeline/my-pipeline
{
"processors": [ ... ],
"on_failure": [
{
"set": {
"description": "Index document to 'failed-<index>'",
"field": "_index",
"value": "failed-{{{ _index }}}"
}
}
]
}
Pipeline 실패에 대한 자세한 내용은 문서의 metadata 필드에서 확인 가능합니다. (on_failure_message, on_failure_processor_type, on_failure_processor_tag, on_failure_pipeline) 이 필드들 on_failure 블록 내에서만 액세스할 수 있습니다.
↓아래 예제는 metadata 필드를 사용하여 pipeline 오류 정보를 나타내는 방법을 보여줍니다.
PUT _ingest/pipeline/my-pipeline
{
"processors": [ ... ],
"on_failure": [
{
"set": {
"description": "Record error information",
"field": "error_information",
"value": "Processor {{ _ingest.on_failure_processor_type }} with tag {{ _ingest.on_failure_processor_tag }} in pipeline {{ _ingest.on_failure_pipeline }} failed with message {{ _ingest.on_failure_message }}"
}
}
]
}
각 프로세서는 Painless 스크립트로 작성된 if문만 지원됩니다.
Painless의 ingest processor context에서 if문이 실행되는 경우, ctx 값은 읽기 전용이 됩니다.
PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"drop": {
"description": "Drop documents with 'network.name' of 'Guest'",
"if": "ctx?.network?.name == 'Guest'"
}
}
]
}
PUT _ingest/pipeline/sample-pipeline
{
"processors": [
{
"set": {
"description": "Set documents with 'buttonName.key' of '굿샷'",
"field": "tagName",
"value": "{{buttonName}}",
"if": "ctx.buttonName == '굿샷' || ctx.buttonName == '좋아요' || ctx.buttonName == '오잘공' || ctx.buttonName == '드라이버' || ctx.buttonName == '우드' || ctx.buttonName == '아이언' || ctx.buttonName == '유틸' || ctx.buttonName == '웻지' || ctx.buttonName == '개공' || ctx.buttonName == '인도어' || ctx.buttonName == '실내' || ctx.buttonName == '필드'"
}
}
]
}
PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"drop": {
"description": "Drop documents with 'network.name' of 'Guest'",
"if": "ctx?.network?.name == 'Guest'"
}
}
]
}
script.painless.regex.enabled 클러스터 설정이 활성화 되어야만 if 절에서 정규식을 사용할 수 있습니다. (Painless regular expressions)
주의!
가능한 정규식을 사용하지 말것. 비싼 정규식은 인덱싱 속도를 늦출 뿐!PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"set": {
"description": "If 'url.scheme' is 'http', set 'url.insecure' to true",
"if": "ctx.url?.scheme =~ /^http[^s]/",
"field": "url.insecure",
"value": true
}
}
]
}
일반적으로 조건문은 한 줄로 작성해야 하지만, Kibana 콘솔에서 삼중 따옴표(""")를 사용하면 더욱 자유롭게 작성하고 디버그 할 수 있습니다.
PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"drop": {
"description": "Drop documents that don't contain 'prod' tag",
"if": """
Collection tags = ctx.tags;
if(tags != null){
for (String tag : tags) {
if (tag.toLowerCase().contains('prod')) {
return false;
}
}
}
return true;
"""
}
}
]
}
스크립트를 따로 저장하여 사용 할 수도 있습니다. (stored script)
PUT _scripts/my-stored-script
{
"script": {
"lang": "painless",
"source": """
Collection tags = ctx.tags;
if(tags != null){
for (String tag : tags) {
if (tag.toLowerCase().contains('prod')) {
return false;
}
}
}
return true;
"""
}
}
PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"drop": {
"description": "If 'url.scheme' is 'http', set 'url.insecure' to true",
"if": { "id": "my-stored-script" }
}
}
]
}
수신되는 문서에 종종 객체(object) 필드가 포함되는데, 상위 객체가 없는 필드에 액세스하려고 하면 Elasticsearch는 NullPointerException을 반환하게 됩니다. 이러한 예외를 피하려면 ?.와 같은 null safe 연산자를 사용하면 됩니다.
예를 들어, ctx.network?.name.equalsIgnoreCase('Guest')는ctx.network?.name 이 null을 반환 할 수 있어 안전하지 않습니다.
'Guest'.equalsIgnoreCase(ctx.network?.name)이렇게 작성하면 Guest는 항상 null이 아니므로 null safe 하게 됩니다.
스크립트를 다시 작성할 수 없는 경우, 명시적 null 검사(explicit null check)를 적어 줄 수 있습니다.
PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"drop": {
"description": "Drop documents that contain 'network.name' of 'Guest'",
"if": "ctx.network?.name != null && ctx.network.name.contains('Guest')"
}
}
]
}
if문을 사용하여 문서 별로 적용할 파이프라인을 지정할 수 있습니다. 이런 파이프라인은 index template의 default pipeline으로 사용할 수 있습니다.
PUT _ingest/pipeline/one-pipeline-to-rule-them-all
{
"processors": [
{
"pipeline": {
"description": "If 'service.name' is 'apache_httpd', use 'httpd_pipeline'",
"if": "ctx.service?.name == 'apache_httpd'",
"name": "httpd_pipeline"
}
},
{
"pipeline": {
"description": "If 'service.name' is 'syslog', use 'syslog_pipeline'",
"if": "ctx.service?.name == 'syslog'",
"name": "syslog_pipeline"
}
},
{
"fail": {
"description": "If 'service.name' is not 'apache_httpd' or 'syslog', return a failure message",
"if": "ctx.service?.name != 'apache_httpd' && ctx.service?.name != 'syslog'",
"message": "This pipeline requires service.name to be either `syslog` or `apache_httpd`"
}
}
]
}
node stats API를 사용하여 global 또는 파이프라인 별 수집 통계를 가져올 수 있습니다. 이 통계치를 확인하여 가장 자주 실행되거나 처리시간이 가장 많이 소비되는 파이프라인을 알 수 있습니다.
GET _nodes/stats/ingest?filter_path=nodes.*.ingest