관리 메뉴

밤 늦게까지 여는 카페

[OpenSearch] Ingest pipeline - 데이터 프로세싱도 할 수 있습니다! 본문

DevOps

[OpenSearch] Ingest pipeline - 데이터 프로세싱도 할 수 있습니다!

Jㅐ둥이 2025. 4. 2. 01:16

안녕하세요. 오늘은 OpenSearch의 기능 중 하나인 Ingest pipeline에 대해서 공부한 것을 정리해보겠습니다.
 

1. Ingest pipeline 기능이 뭔가요?

Ingest pipeline은 말 그대로 OpenSearch에 데이터가 수집(ingest)되는 중에 실행되는 pipeline입니다.
pipeline은 processor라고 하는 데이터 처리 유닛의 array로 구성되어 있습니다.

 
processor에는 단순하게 필드에 값을 설정해주는 Set 프로세서, 필드의 값을 대문자로 바꿔주는 Uppercase 프로세서 같이 간단한 것들부터
 
key=value 형식을 추출해주는 KV 프로세서 IPv4, IPv6 주소값의 주소명을 알려주는 IP2Geo 프로세스

Script를 작성할 수 있는 Script 프로세서 등 정말 다양한 프로세서들이 있습니다.

OpenSearch에서 지원하는 다양한 프로세서들

 
이런 프로세서들을 이용해서 document의 프로세싱이 가능한 것이죠!
 

2. Ingest pipeline은 어떻게 사용하는 거에요?

OpenSearch 대시보드를 많이 보신 분들이라면 아실 겁니다. Ingest pipeline을 생성하고 관리하는 메뉴가 없다는 것을요;;;

https://opensearch.org/docs/latest/ingest-pipelines/

 
Ingest API를 통해서만 Ingest pipeline을 관리할 수 있습니다. 
 

2.1. Ingest pipline 생성하기

만약 oil_cost, repair_cost, insurance_cost 필드를 가지고 있는 document에서 이 셋을 더하여 total_cost라는 필드에 값을 저장하고 싶다면
 
다음과 같이 ingest pipeline을 생성할 수 있습니다.

PUT _ingest/pipeline/car-cost
{
  "processors": [
    {
      "script": {
        "source": """
          ctx.total_cost = (
            ctx.oil_cost + ctx.repair_cost + ctx.insurance_cost
          );
        """
      }
    }
  ]
}

 

  • pipeline을 수정하고 싶을 때에는 json을 수정해서 다시 요청을 보내면 됩니다!
  • 만약 pipeline을 삭제하고 싶다면 DELETE 메소드를 이용하면 됩니다.
    • DELETE _ingest/pipeline/[PIPELINE NAME]
  • 생성된 pipeline을 조회하고 싶다면 GET 메소드를 이용하면 됩니다.
    • GET _ingest/pipeline

P.S.
저는 Dev Tool을 이용해서 만들고 관리하는 것이 편리하더라고요!

Dev Tool에서 ingest pipeline 관리하기

 

2.2. Index template에 Ingest pipeline 붙이기

이제 생성된 Ingest pipeline을 index template에 붙여서 index가 저장될 때마다 pipeline이 실행되도록 설정해야 합니다.
 
방법은 단순합니다. index template의 설정 탭에 들어가서 "index.default_pipeline": "파이프라인 이름"을 추가해주면 됩니다!

index template에 ingest pipeline 붙이기

3. Ingest pipeline 사용 시 주의사항

OpenSearch 2.19 기준 파이프라인이 실패했을 때 document가 저장되지 않을 수 있습니다.

  • OpenSearch 공식 문서를 보면 프로세서가 실패했을 때 옵션이 다음 2개밖에 없다고 합니다.
    1.  파이프라인 전부 실패하고 document가 저장되지 않습니다.
    2. 실패한 프로세서만 넘어가고 다음 프로세서가 실행됩니다.

 
위에서 예시로 들었던 car-cost pipeline을 등록했는데 document로 등록할 데이터에 repair_cost라는 필드가 없으면 어떻게 될까요?
 
에러가 발생하면서 document 자체가 저장되지 않게 됩니다 ㅜㅠ

  • 실제로 팀원분이 일부러 로그 안 찍으신 것이냐고 질문 주셨습니다. 놓친 거였지만요... ㅜ

 
pipeline 중간에 실패했을 때 프로세서를 실행을 멈추고 로그 원본을 저장하고 싶으면 다음과 "on_failure" 부분을 추가해줘야 합니다.

  • on_failure에 def dummy=1;인 스크립트를 추가해둔 이유는 on_failure가 비어있을 시 그대로 파이프라인이 멈춰버리기 때문입니다.
PUT _ingest/pipeline/car-cost
{
  "processors": [
    {
      "script": {
        "source": """
          ctx.total_cost = (
            ctx.oil_cost + ctx.repair_cost + ctx.insurance_cost
          );
        """
      }
    }
  ],
  "on_failure": [
    {
      "script": {
        "source": "def dummy=1;"
       }
    }
  ]
}

 
P.S.
다음과 같이 pipeline 동작을 테스트 해볼 수 있는 API가 있으니 충분히 테스트 해보고 적용하세요!

POST _ingest/pipeline/car_cost/_simulate
{
  "docs": [
    {"_source": {"oil_cost": 20, "repair_cost":5, "insurance_cost": 900}},
    {"_source": {"oil_cost": 20, "repair_cost":5}}
  ]
}

 

🎁다른 OpenSearch 관련 포스팅들🎁

반응형