-
Embulk 사용하기공부/데이터 2021. 11. 13. 01:25
Embulk는 플러그인을 사용해 여러 형태의 데이터 소스에서 데이터를 가져와 병렬로 로딩 할 수 있습니다. 여러개의 파일을 동시에 로딩하거나 하나의 큰 파일을 여러 파일로 쪼개서 로딩할 수 있어서 속도적인 측면에 장점이 있습니다.
인풋 파일의 데이터 타입을 자동으로 추측하여 스키마를 예측하므로 스키마 작업을 줄일 수 있습니다.
Embulk는 자바 위에서 실행이 되므로 자바가 설치가 되어 있어야 합니다. 현재 버전인 v0.9와 v0.10은 자바8을 지원하고 자바9는 공식적으로는 지원하지 않으므로 유의해야 합니다.
설치
아래 명령어를 차례대로 입력하면 됩니다.
$ curl --create-dirs -o ~/.embulk/bin/embulk -L "https://dl.embulk.org/embulk-latest.jar" $ chmod +x ~/.embulk/bin/embulk $ echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc $ source ~/.bashrc
플러그인 목록은 https://plugins.embulk.org/ 에서 확인할 수 있습니다.
이후 아래 명령어를 입력했을 때, 버전이 출력되면 정상적으로 설치가 된 경우입니다.
$ embulk -version
플러그인
아래 명령어를 통해 플러그인을 설치할 수 있습니다
embulk gem install [플러그인] # 설치된 플러그인 리스트 확인 $ embulk gem list 2021-11-11 01:02:10.382 +0900: Embulk v0.9.23 Gem plugin path is: /home/.embulk/lib/gems *** LOCAL GEMS *** addressable (2.8.0) bundler (1.16.0) concurrent-ruby (1.1.9) declarative (0.0.20) declarative-option (0.1.0) did_you_mean (default: 1.0.1) embulk (0.9.23 java) embulk-output-bigquery (0.6.7) faraday (0.17.4) google-api-client (0.32.1) googleauth (0.9.0) httpclient (2.8.3) jar-dependencies (default: 0.3.10) jruby-openssl (0.9.21 java) jruby-readline (1.2.0 java) json (1.8.3 java) jwt (2.3.0) liquid (4.0.0) memoist (0.16.2) mini_mime (1.1.2) minitest (default: 5.4.1) msgpack (1.1.0 java) multi_json (1.15.0) multipart-post (2.1.1) net-telnet (default: 0.1.1) os (1.1.4) power_assert (default: 0.2.3) psych (2.2.4 java) public_suffix (4.0.6) rake (default: 10.4.2) rdoc (default: 4.2.0) representable (3.0.4) retriable (3.1.2) signet (0.11.0) test-unit (default: 3.1.1) time_with_zone (0.3.1) tzinfo (2.0.4) uber (0.1.0)
플러그인은 https://plugins.embulk.org/ 에서 목록을 확인할 수 있습니다.
예제
embulk는 예제를 제공해 주고 있어서 손쉽게 확인을 할 수 있습니다.
$ embulk example ./try1
위 명령어를 통해
./try1
폴더에 예제 파일을 생성해 줍니다. 예제 내용은 제공된 csv 파일을 읽어와 표준출력을 해주는 간단한 작업입니다. 만약./try1
를 지정하지 않으면 현재 위치에서embulk_example
폴더를 만들어 줍니다.생성된 파일을 보면
seed.yml
파일이 있는데 아래와 같은 형식으로 작성되어 있습니다.in: # 읽어 들이는 곳의 설정 type: file # 읽어 들이는 곳의 타입 path_prefix: '/home/embulk-example/csv/sample_' # csv 파일 경로. prefix로 되어 있어서 sample_01.csv.gz파일에 접근이 가능함 out: # 로딩할 곳의 설정 type: stdout # 로딩할 곳의 타입
이제 위의 파일을 guess 명령어를 통해 데이터에 대한 추측을 하고 파일을 생성할 수도 있습니다.
$ embulk guess seed.yml -o config.yml # seed.yml 파일을 읽어와 config.yml 파일로 생성
만약
-o config.yml
부분이 없다면 표준출력으로 보여주게 됩니다.위 명령어를 실행하면 아래와 같이 생성이 됩니다.
in: type: file path_prefix: /home/embulk-example/csv/sample_ decoders: - {type: gzip} parser: charset: UTF-8 newline: LF type: csv delimiter: ',' quote: '"' escape: '"' null_string: 'NULL' trim_if_not_quoted: false skip_header_lines: 1 allow_extra_columns: false allow_optional_columns: false columns: - {name: id, type: long} - {name: account, type: long} - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'} - {name: purchase, type: timestamp, format: '%Y%m%d'} - {name: comment, type: string} out: {type: stdout}
다음 데이터를 로딩하기 전, 제대로 동작하는지 일부 데이터를 읽어 파싱하고 결과를 보여줍니다.
$ embulk preview config.yml 021-11-11 01:24:33.714 +0900: Embulk v0.9.23 2021-11-11 01:24:34.628 +0900 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected. 2021-11-11 01:24:36.721 +0900 [INFO] (main): Gem's home and path are set by default: "/home/.embulk/lib/gems" 2021-11-11 01:24:37.376 +0900 [INFO] (main): Started Embulk v0.9.23 2021-11-11 01:24:37.466 +0900 [INFO] (0001:preview): Listing local files at directory '/home/embulk-example/csv' filtering filename by prefix 'sample_' 2021-11-11 01:24:37.467 +0900 [INFO] (0001:preview): "follow_symlinks" is set false. Note that symbolic links to directories are skipped. 2021-11-11 01:24:37.468 +0900 [INFO] (0001:preview): Loading files [/home/embulk-example/csv/sample_01.csv.gz] 2021-11-11 01:24:37.476 +0900 [INFO] (0001:preview): Try to read 32,768 bytes from input source +---------+--------------+-------------------------+-------------------------+----------------------------+ | id:long | account:long | time:timestamp | purchase:timestamp | comment:string | +---------+--------------+-------------------------+-------------------------+----------------------------+ | 1 | 32,864 | 2015-01-27 19:23:49 UTC | 2015-01-27 00:00:00 UTC | embulk | | 2 | 14,824 | 2015-01-27 19:01:23 UTC | 2015-01-27 00:00:00 UTC | embulk jruby | | 3 | 27,559 | 2015-01-28 02:20:02 UTC | 2015-01-28 00:00:00 UTC | Embulk "csv" parser plugin | | 4 | 11,270 | 2015-01-29 11:54:36 UTC | 2015-01-29 00:00:00 UTC | | +---------+--------------+-------------------------+-------------------------+----------------------------+
다음은 실제로 out에 입력된 곳으로 로딩하는 명령어입니다. 현재 예제의 out은 표준출력으로 되어 있기 때문에 출력이 되는 것을 볼 수 있습니다.
$ embulk run config.yml 2021-11-11 01:27:06.212 +0900: Embulk v0.9.23 2021-11-11 01:27:07.379 +0900 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected. 2021-11-11 01:27:09.874 +0900 [INFO] (main): Gem's home and path are set by default: "/home/.embulk/lib/gems" 2021-11-11 01:27:10.709 +0900 [INFO] (main): Started Embulk v0.9.23 2021-11-11 01:27:10.873 +0900 [INFO] (0001:transaction): Listing local files at directory '/home/embulk-example/csv' filtering filename by prefix 'sample_' 2021-11-11 01:27:10.875 +0900 [INFO] (0001:transaction): "follow_symlinks" is set false. Note that symbolic links to directories are skipped. 2021-11-11 01:27:10.876 +0900 [INFO] (0001:transaction): Loading files [/home/embulk-example/csv/sample_01.csv.gz] 2021-11-11 01:27:10.929 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=16 / output tasks 8 = input tasks 1 * 8 2021-11-11 01:27:10.937 +0900 [INFO] (0001:transaction): {done: 0 / 1, running: 0} 1,32864,2015-01-27 19:23:49,20150127,embulk 2,14824,2015-01-27 19:01:23,20150127,embulk jruby 3,27559,2015-01-28 02:20:02,20150128,Embulk "csv" parser plugin 4,11270,2015-01-29 11:54:36,20150129, 2021-11-11 01:27:11.047 +0900 [INFO] (0001:transaction): {done: 1 / 1, running: 0} 2021-11-11 01:27:11.056 +0900 [INFO] (main): Committed. 2021-11-11 01:27:11.057 +0900 [INFO] (main): Next config diff: {"in":{"last_path":"/home/embulk-example/csv/sample_01.csv.gz"},"out":{}}
CSV → PostgreSQL
이번에는 표준출력이 아닌 DB에 저장하는 방식을 설명하면서 속도를 올리는 방법도 소개합니다.
admin_no,deleted,admin_name 3,T, 2,F,관리자2 1,F,관리자1
와 같은 csv 형식을
admin.csv
로 저장한 뒤, 아래와 같이seed.yml
을 생성합니다.seed.yml
파일을 생성하는 이유는 실제 in, out을 할 상세속성을 만들기 위함인데 이를 수동으로 작성을 했다면seed.yml
파일을 생성할 필요는 없습니다.in: type: file path_prefix: '/home/embulk-example/csv/admin' out: type: postgresql
다음 명령어를 통해서
config.yml
파일을 생성합니다.$ embulk guess seed.yml -o config.yml 2021-11-12 00:55:59.152 +0900: Embulk v0.9.23 2021-11-12 00:56:00.019 +0900 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected. 2021-11-12 00:56:02.140 +0900 [INFO] (main): Gem's home and path are set by default: "/home/.embulk/lib/gems" 2021-11-12 00:56:02.801 +0900 [INFO] (main): Started Embulk v0.9.23 2021-11-12 00:56:02.902 +0900 [INFO] (0001:guess): Listing local files at directory '/home/embulk-example/csv' filtering filename by prefix 'admin' 2021-11-12 00:56:02.903 +0900 [INFO] (0001:guess): "follow_symlinks" is set false. Note that symbolic links to directories are skipped. 2021-11-12 00:56:02.904 +0900 [INFO] (0001:guess): Loading files [/home/embulk-example/csv/admin.csv] 2021-11-12 00:56:02.916 +0900 [INFO] (0001:guess): Try to read 32,768 bytes from input source 2021-11-12 00:56:02.973 +0900 [INFO] (0001:guess): Loaded plugin embulk (0.9.23) 2021-11-12 00:56:02.992 +0900 [INFO] (0001:guess): Loaded plugin embulk (0.9.23) 2021-11-12 00:56:03.012 +0900 [INFO] (0001:guess): Loaded plugin embulk (0.9.23) 2021-11-12 00:56:03.023 +0900 [INFO] (0001:guess): Loaded plugin embulk (0.9.23) in: type: file path_prefix: /home/embulk-example/csv/admin parser: charset: UTF-8 newline: LF type: csv delimiter: ',' quote: '"' escape: '"' trim_if_not_quoted: false skip_header_lines: 1 allow_extra_columns: false allow_optional_columns: false columns: - {name: admin_no, type: long} - {name: deleted, type: boolean} # 의도와 다르게 타입이 선언됨. 확인이 필요 - {name: admin_name, type: string} out: {type: postgresql} Created 'config.yml' file.
데이터를 보고 판단하여 자동으로 스키마를 생성해 주기 때문에 의도하는 것과 다를 수 있습니다. 여기서
is_deleted
는 string 타입으로 사용할 예정인데 embulk는 boolean으로 선언하였으니 한 번 더 확인 하는 것이 필요합니다.다음은 out을 구성할 차례입니다. PostgreSQL에 적재를 해야 하므로 아래 명령어로 플러그인을 설치해 줍니다.
$ embulk gem install embulk-output-postgresql
마지막으로
config.yml
폴더의 out 부분에 로딩할 PostgreSQL의 접속정보와 테이블, 모드를 작성해 줍니다.out: type: postgresql host: localhost port: 5432 user: pgadmintest password: "" database: my_database table: my_table mode: insert
my_table이란 테이블에 insert를 하는 설정입니다.
아래와 같이 더 자세하게 설정할 수도 있습니다.
out: type: postgresql host: localhost user: myuser password: "" database: my_database table: my_table options: {loglevel: 2} mode: insert_direct column_options: my_col_1: {type: 'BIGSERIAL'} my_col_3: {type: 'INT NOT NULL'} my_col_4: {value_type: string, timestamp_format: `%Y-%m-%d %H:%M:%S %z`, timezone: '-0700'} my_col_5: {type: 'DECIMAL(18,9)', value_type: pass}
추가적인 설정과 설명은 https://github.com/embulk/embulk-output-jdbc/tree/master/embulk-output-postgresql 에 작성되어 있습니다.
parser-none 플러그인으로 속도 향상시키기
csv나 json 파일이 이미 out 형식에 맞게 정의되어 있다면 parser 플러그인을 사용해 속도를 향상시킬 수 있습니다. 데이터를 일일이 파싱할 필요가 없기 때문에 속도 향상이 크게 발생합니다.
$ embulk gem install embulk-parser-none
이후
config.yml
파일의 in - parser 부분을 아래와 같이 변경합니다.in: type: file path_prefix: /home/embulk-example/csv/admin parser: type: none column_name: payload
parser-none은 https://github.com/sonots/embulk-parser-none 자세하게 설명되어 있습니다.
PostgreSQL → GCP Bigquery
이번에는 PostgreSQL에서 GCP Bigquery로 옮기는 예제입니다.
PostgreSQL에 접속하여 쿼리를 호출해 데이터를 가져와야 하므로 아래 플러그인을 설치해 줍니다.
$ embulk gem install embulk-input-postgresql
그 다음 config.yml을 수정하여 읽어올 DB의 서버와 쿼리를 작성해 줍니다.
in: type: postgresql host: localhost user: myuser password: "" database: my_database table: my_table select: "col1, col2, col3" where: "col4 != 'a'" order_by: "col1 DESC"
쿼리가 간단하다면 위의 문법으로도 충분하지만 복잡한 쿼리라면 아래와 같이 작성할 수 있습니다.
in: type: postgresql host: localhost user: myuser password: "" database: my_database query: | SELECT t1.id, t1.name, t2.id AS t2_id, t2.name AS t2_name FROM table1 AS t1 LEFT JOIN table2 AS t2 ON t1.id = t2.t1_id
또한 읽어들일 때, 타입에 따라 형변환을 진행하거나 컬럼 일부만 형변환을 진행할 수 있습니다.
in: type: postgresql host: localhost user: myuser password: "" database: my_database table: "my_table" select: "col1, col2, col3" where: "col4 != 'a'" default_column_options: TIMESTAMP: { type: string, timestamp_format: "%Y/%m/%d %H:%M:%S", timezone: "+0900"} BIGINT: { type: string } column_options: col1: {type: long} col3: {type: string, timestamp_format: "%Y/%m/%d", timezone: "+0900"} after_select: "update my_table set col5 = '1' where col4 != 'a'"
보통 embulk는 데이터를 다른 곳으로 증분할 때 많이 사용하기 때문에 아래의 옵션을 사용하면 유용하게 쓸 수 있습니다.
- last_record: 증분을 위한 마지막 레코드의 값을 입력. (기본값으로는 모든 레코드를 로딩함)
- fetch_rows: 한 번에 가져오는 데이터 사이즈 (기본값 10,000)
기타 더 자세한 옵션과 설명은 https://github.com/embulk/embulk-input-jdbc/tree/master/embulk-input-postgresql 에서 확인할 수 있습니다.
이제 GCP Bigquery 플러그인을 설치해 줍니다.
$ embulk gem install embulk-output-bigquery
마지막으로 out을 작성해 줍니다.
out: type: bigquery mode: append auth_method: service_account json_keyfile: /path/to/json_keyfile.json project: your-project-000 dataset: your_dataset_name table: your_table_name schema_file: /data/schema_file.json
auth_method
여기서는 service_account (or json_key) 와 compute_engine 두 가지만 설명하겠습니다.
service_account
GCP service account를 json 파일 형식으로 내려 받은 다음, embulk에서 해당 파일 위치를 호출하거나 json을 직접 입력하여 사용할 수 있습니다.
out: type: bigquery auth_method: service_account json_keyfile: /path/to/json_keyfile.json 또는 out: type: bigquery auth_method: service_account json_keyfile: content: | { "private_key_id": "123456789", "private_key": "-----BEGIN PRIVATE KEY-----\nABCDEF", "client_email": "..." }
compute_engine
만약 embulk를 GCP compute engine에서 실행하는데 같은 프로젝트 내에 Bigquery가 존재한다면 인증을 생략할 수 있습니다.
out: type: bigquery auth_method: compute_engine
schema
여기서는 미리 설정한 스키마 파일과 존재하는 테이블의 스키마를 사용하는 방식 2가지를 설명합니다.
schema_file
미리 스키마 파일을 정의하여 해당 파일경로를 입력하는 형식입니다.
out: type: bigquery table: table_%Y%m%d schema_file: /path/to/schema.json
기존 테이블 스키마 사용하기
bigquery에 존재하는 기존 테이블의 스키마를 읽어서 그대로 사용하는 형식입니다.
out: type: bigquery table: table_%Y%m%d template_table: existing_table_name
GCS 경유하기
바로 Bigquery로 적재할 수 있지만 embulk는 병렬 처리로 여러 태스크에 나눠 실행이 되는데 bigquery에서는 프로젝트 당 하루 100,000개의 job만 로딩이 가능합니다. 여기서 GCS를 사용하면 GCS에서 Bigquery로 여러 파일을 옮겨도 1개의 job으로 인식하게 됩니다. 또한 병렬 처리로 진행되는 작업의 안정성을 위해선 GCS를 사용하는 것이 좋습니다.
- 병렬로 job이 생성되는 개수는
min_output_tasks
와max_threads
같은 파라미터에 의존합니다.
out: type: bigquery gcs_bucket: bucket_name auto_create_gcs_bucket: true
다음은 사용하면서 유용한 옵션들입니다.
- auto_create_table: 테이블이 없을 경우, 자동으로 생성
- ignore_unknown_values: 알 수 없는 값을 무시
- allow_quoted_newlines: 데이터에 개행문자가 있으면 true로 설정해야함. true로 할 시, 속도가 느려짐
- auto_create_dataset: dataset이 없을 시, 자동 생성
- path_prefix:
/tmp/prefix_
와 같은 파일 경로 접두사. 기본값은 랜덤하게 생성
더 많은 정보는 https://github.com/embulk/embulk-output-bigquery에서 확인할 수 있습니다.
멀티 쓰레드를 이용하여 로딩 속도 올리기
max_threads
와min_output_tasks
를 설정하여 병렬로 작업을 처리할 수 있습니다.min_output_tasks
는 최소 실행할 작업의 수를 의미하므로 4라고 정하면 최소 4개의 작업이 병렬로 작업을 처리하게 됩니다. 1개의 task에서는 여러 thread를 사용할 수 있습니다.max_threads
는 최대 스레드를 정하여 동시성을 제어하게 됩니다. 너무 많이 설정하면 부하가 걸릴 수 있고 CPU 사용율이 낮다면 이 숫자를 높여서 속도를 향상할 수 있습니다.exec: max_threads: 기본적으로 사용 가능한 cpu 코어 수 * 2 min_output_tasks: 기본적으로 사용 가능한 cpu 코어 수 in: type: file path_prefix: /home/embulk-example/csv/admin parser: type: none column_name: payload