YOGAE

TODO: FIXME:

Logstash

14 Jan 2019

Logstash는 형식이나 스키마에 관계없이 모든 데이터를 수집, 통합하기위한 Elastic Stack의 중앙 데이터 흐름 엔진입니다.

Logstash는 inputoutput 필수사항과 filter optional 사항이 있습니다. input plugin은 source data를 소비하고 filter plugin은 data를 수정하고 output plugin은 destination에 data를 씁니다.

이키텍처

  • Logstash 이벤트 처리 파이프라인은 입력, 필터, 출력 세가지 단계로 구성된다.
  • 기본적으로 파이프라인 단계(입력 -> 필터, 필터 -> 출력)마다 이벤트를 버퍼에 담기 위해 인메모리 바운드 큐를 사용합니다.
  • Logstash가 불안정한 상태로 종료되면 인메모리에 저장된 이벤트는 손실됩니다. 데이터 손실 방지를 위해 영구 큐를 사용해 실행중인 이벤트를 디스크에 유지할 수도 있습니다.(영구 큐는 LOGSTASH_HOME/config 디렉토리 아래에 있는 logstash.yml 파일에서 queue.type: persisted 속성을 설정하면 활성화할 수 있습니다.)

인제스트 노드

  • 인제스트 노드는 Elasticsearch에서 색인을 생성하기 전에 도큐먼트를 사전 처리하고, 도큐먼트를 풍부하게 해주는 경량 솔루션입니다.
  • 5.x에서 인제스트 노드라는 기능이 도입되었습니다.
  • 인제스트 노드는 도큐먼트에 실제로 색인을 생성하기 전에 사전 처리하는 데 사용할 수 있습니다.
  • 사전 처리작업은 벌크 및 인덱스 요청을 가로채 데이터를 변환하고, 벌크 또는 인덱스 API로 다시 도큐먼트를 전달합니다.

파이프라인 정의

  • 파이프라인은 일련의 프로세서를 정의합니다. 각 프로세서는 도큐먼트를 특정 방법으로 변환하고, 각 프로세서는 파이프라인에 정의한 순서대로 실행됩니다.

logstash config file 설정

아래의 설정은 s3에서 cloudfront log file을 input으로 받고 lostash에서 filter작업을 하고 amazon elasticsearch로 data를 전송하는 logstash config file입니다.

#cloudfront.conf
input {
  s3 {
    bucket => ""
    prefix => ""
    region => ""
    access_key_id => ""
    secret_access_key => ""
  }
}

filter {
  grok {
    match => { "message" => "%{DATE_EU:date}\t%{TIME:time}\t%{WORD:x_edge_location}\t(?:%{NUMBER:sc_bytes:int}|-)\t%{IPORHOST:c_ip}\t%{WORD:cs_method}\t%{HOSTNAME:cs_host}\t%{NOTSPACE:cs_uri_stem}\t%{NUMBER:sc_status:int}\t%{GREEDYDATA:referrer}\t%{GREEDYDATA:User_Agent}\t%{GREEDYDATA:cs_uri_stem}\t%{GREEDYDATA:cookies}\t%{WORD:x_edge_result_type}\t%{NOTSPACE:x_edge_request_id}\t%{HOSTNAME:x_host_header}\t%{URIPROTO:cs_protocol}\t%{INT:cs_bytes:int}\t%{GREEDYDATA:time_taken}\t%{GREEDYDATA:x_forwarded_for}\t%{GREEDYDATA:ssl_protocol}\t%{GREEDYDATA:ssl_cipher}\t%{GREEDYDATA:x_edge_response_result_type}" }
  }

  mutate {
    add_field => [ "listener_timestamp", "%{date} %{time}" ]
  }

  date {
    match => [ "listener_timestamp", "yy-MM-dd HH:mm:ss" ]
    target => "@timestamp"
  }

  geoip {
    source => "c_ip"
  }

  useragent {
    source => "User_Agent"
    target => "useragent"
  }

  mutate {
    remove_field => ["date", "time", "listener_timestamp", "cloudfront_version", "message", "cloudfront_fields", "User_Agent"]
  }
}

output {
  amazon_es {
    hosts => [""]
    region => ""
    index => "cloudfront-logs-%{+YYYY.MM.dd}"
    template => ""
    ## 아래의 key가 없는 경우 elasticsearch의 endpoint에 접근할 수 없습니다.
    ## aws elasticsearch에 acccess 정책을 "Principal": { "AWS": "*" }로 설정하여도 필요
    ## 설정하지 않으면 credential을 찾지 못하는 error 발생
    aws_access_key_id => ""
    aws_secret_access_key => ""
  }
}

filter plugin

Mutate filter

Mutate 필터를 사용하면 이벤트 필드 변환, 제거, 수정, 이름 변경 등 다양한 변형 작업을 수행합니다.

...
filter {
  csv {
    autodetect_column_names => true
  }
  mutate {
    convert => { # 데이터 타입을 변경(유효한 변환 대상은 integer, string, float, boolean)
      "Age" => "integer"
      "Salary" => "float"
    }
    rename => { # 필드 이름을 변경
      "FName" => "Firstname"
      "LName" => "Lastname"
    }
    gsub => ["EmailId", "\.", "_"] # 필드 값과 비교해 일치하는 모든 문자를 대체 문자열로 치환(3개의 필드 요소 => 필드 이름, 정규식, 대체할 문자열)
    strip => ["Firstname", "Lastname"] # 문자열에서 공백을 제거
    uppercase => ["Gender"]
  }
}

Grok filter

불규칙한 데이터를 분석해 구조화하고, 구조환된 데이터를 쉽게 쿼리하고 필더링하는 데 자주 사용하는 강력한 플러그인입니다.

......
filter {
  grok {
    match => {"message" => "%{TIMESTAMP_ISO8601:eventtime} %{USERNAME:userid} %{GREEDYDATA:data}"}
  }
}
......

Date filter

필드에서 날짜를 분석할 때 사용하는 Date 필터는 시계열 이벤트 작업 시 매우 편리하고 유용합니다. 로그스태시는 기본적으로 각 이벤트에 이벤트 처리 시간을 나타내는 @timestamp 필드를 추가합니다.

......
filter {
  date {
    match => ["timestamp", "dd/MMM/YYYY:HH:mm:ss Z"]
  }
}
......

Geoip filter

IP 주소가 있을 때 Geoip 필터를 사용해 IP 주소의 위치 정보를 추가할 수 있습니다.

Useragent filter

사용자 에이전트 문자열을 구조화된 데이터로 분석하고, 운영체제 및 번전, 장치 등 사용자 정보를 추가합니다.

s3 접근 권한 설정

s3에 접근하기 위한 권한을 IAM에 설정해야합니다.

위에서 설정한 aws_access_key_id의 user에 아래의 권한을 부여해야합니다.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:DeleteObject"
            ],
            "Resource": "arn:aws:s3:::<access-s3-bucket-name>/<prefix>/*"
        },
        {
            ## 아래의 권한이 없을 경우 logstash에서 list할 권한 error 발생
            ## ListBucket 권한은 s3 bucket에 할당해야한다.("arn:aws:s3:::<access-s3-bucket- name>/<prefix>에 할당하지 않는다.)
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket"
            ],
            "Resource": "arn:aws:s3:::<access-s3-bucket- name>"
        }
    ]
}

memory 부족 error

lostash에서 사용하는 min, max memory 용량이 있어서 이보다 적은 memory가 남아있다면 memory 부족하여 error가 발생합니다.

lostash에서 default 설정으로 지정한 memory 사이즈를 변경합니다.

# <logstash>/config/jvm.options
-Xms1g -> min size 1g
-Xmx1g -> max size 1g
# <logstash>/config/jvm.options
-Xms512m -> min size 512m
-Xmx512m -> max size 512m

t2.micro instance로 logstash를 사용하는 경우 logstash에서 사용하는 default memory가 1g로 지정되어 있고 t2.micro에서 1g의 memory가 할당되므로 문제가 발생한다.