-
Logstash에서 DB(MySQL, Maria) 연동하기소프트웨어개발 이야기 2020. 4. 24. 10:08
Logstash에서 DB(MySQL, Maria)를 연동하여 데이터를 수집하는 방법을 정리한다.
Logstash에서 DB를 연동하기 위해서는 jdbc input 플러그인을 사용한다. 플러그인 속성은 다음과 같다.
input { jdbc { jdbc_driver_library => "/elasticstack/logstash-7.6.1/lib/mysql-connector-java-8.0.19.jar" jdbc_driver_class => "com.mysql.cj.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://192.0.0.1:3306/db" jdbc_user => "userid" jdbc_password => "userpass" tracking_column => "unix_ts_in_secs" use_column_value => true tracking_column_type => "numeric" statement => "SELECT id, date, title, UNIX_TIMESTAMP(update_datetime) AS unix_ts_in_secs FROM table WHERE UNIX_TIMESTAMP(update_datetime) > :sql_last_value AND update_datetime < NOW() ORDER BY update_datetime ASC" schedule => "0 * * * *" last_run_metadata_path => "/elasticstack/logstash-7.6.1\data\plugins\inputs\jdbc\.mcdw.onedb.vod" } }
각 속성은 필드명만으로 설명이 가능할 정도로 명확하기 때문에 몇가지 중요한 부분만 언급하고자 한다.
- jdbc_driver_class
검색엔진을 통해 나오는 내용을 보면 "com.mysql.jdbc.Driver"가 주로 언급되지만, 최신 library를 사용하면 deprecated 라고 표시된다. "com.mysql.cj.jdbc.Driver"를 사용하자. (JDBC 6.X 버전에서 변경됨)- tracking_column, tracking_column_type
파일에서 데이터를 수집할때는 새롭게 파일이 추가되거나 파일 내용이 추가된 부분부터 데이터를 재수집 할 수 있었다. DB를 사용한다면 어떻게 해야 할까? 이때 사용하는 속성이 tracking_column이다. 뒤에 더 자세히 설명하겠지만, 가장 최근 수정된 데이터를 담고 있는 필드를 지정해야 한다. 데이터 수정일시 필드가 대표적이다.- statement
수집하고자 하는 데이터를 만들어주는 쿼리다. 쿼리에 반드시 필요한 부분이 위에서 설명한 tracking_column과 ORDER BY 절이다. 검색항목에 tracking_column을 지정해주고 tracking_column 필드의 오름차순(ASC)으로 정렬한다. 한가지 더. 기존에 수집한 데이터 이후의 데이터만 수집하기 위해 WHERE 절에 UNIX_TIMESTAMP(update_datetime) > :sql_last_value AND update_datetime < NOW() 도 있지 말자. 이에 대한 자세한 설명은 아래의 블로그 글을 참고한다.주기적으로 쿼리 수행을 통해 수집된 데이터를 ElasticSearch에 인덱싱하기 위해서는 반드시 document_id를 지정해 줘야 한다. 이 정보가 설정되지 않는다면 쿼리가 수행될 때마다 매번 데이터가 추가로 인덱싱 된다.
때문에, 아래와 같이 document_id를 지정해주며 이때 document_id의 값은 조회하는 테이블의 PK 값으로 설정하는게 좋다. 복합키일 경우, mutate 필터의 add_field를 사용해서 아이디를 생성해 주면 된다.
filter { mutate { add_field => { "pkid" => "%{id}_%{date}" } } } output { elasticsearch { hosts => ["localhost:9200"] index => "logstash-vod" document_id => "%{pkid}" user => "elastic_id" password => "elastic_pass" } }
'소프트웨어개발 이야기' 카테고리의 다른 글
Logstash Date Filter와 Timezone (0) 2020.05.07 Logstash에서 REST API 연동하기 (1) 2020.04.28 Maria DB (MySQL 포함) 이모지 입력 버그 해결 (0) 2020.04.21 유튜브(Youtube) API - 12. 할당량 최적화 (0) 2020.04.17 Kibana 실행 에러 해결방법 (elasticsearch all shards failed) (0) 2020.04.16