ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Embulk 사용하기
    공부/데이터 2021. 11. 13. 01:25

    Embulk는 플러그인을 사용해 여러 형태의 데이터 소스에서 데이터를 가져와 병렬로 로딩 할 수 있습니다. 여러개의 파일을 동시에 로딩하거나 하나의 큰 파일을 여러 파일로 쪼개서 로딩할 수 있어서 속도적인 측면에 장점이 있습니다.

    인풋 파일의 데이터 타입을 자동으로 추측하여 스키마를 예측하므로 스키마 작업을 줄일 수 있습니다.

    Embulk는 자바 위에서 실행이 되므로 자바가 설치가 되어 있어야 합니다. 현재 버전인 v0.9와 v0.10은 자바8을 지원하고 자바9는 공식적으로는 지원하지 않으므로 유의해야 합니다.

    이미지 출처:  https://www.embulk.org/

    설치

    아래 명령어를 차례대로 입력하면 됩니다.

    $ curl --create-dirs -o ~/.embulk/bin/embulk -L "https://dl.embulk.org/embulk-latest.jar"
    $ chmod +x ~/.embulk/bin/embulk
    $ echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
    $ source ~/.bashrc

    플러그인 목록은 https://plugins.embulk.org/ 에서 확인할 수 있습니다.

    이후 아래 명령어를 입력했을 때, 버전이 출력되면 정상적으로 설치가 된 경우입니다.

    $ embulk -version

    플러그인

    아래 명령어를 통해 플러그인을 설치할 수 있습니다

    embulk gem install [플러그인]
    
    # 설치된 플러그인 리스트 확인
    $ embulk gem list
    2021-11-11 01:02:10.382 +0900: Embulk v0.9.23
    
    Gem plugin path is: /home/.embulk/lib/gems
    
    *** LOCAL GEMS ***
    
    addressable (2.8.0)
    bundler (1.16.0)
    concurrent-ruby (1.1.9)
    declarative (0.0.20)
    declarative-option (0.1.0)
    did_you_mean (default: 1.0.1)
    embulk (0.9.23 java)
    embulk-output-bigquery (0.6.7)
    faraday (0.17.4)
    google-api-client (0.32.1)
    googleauth (0.9.0)
    httpclient (2.8.3)
    jar-dependencies (default: 0.3.10)
    jruby-openssl (0.9.21 java)
    jruby-readline (1.2.0 java)
    json (1.8.3 java)
    jwt (2.3.0)
    liquid (4.0.0)
    memoist (0.16.2)
    mini_mime (1.1.2)
    minitest (default: 5.4.1)
    msgpack (1.1.0 java)
    multi_json (1.15.0)
    multipart-post (2.1.1)
    net-telnet (default: 0.1.1)
    os (1.1.4)
    power_assert (default: 0.2.3)
    psych (2.2.4 java)
    public_suffix (4.0.6)
    rake (default: 10.4.2)
    rdoc (default: 4.2.0)
    representable (3.0.4)
    retriable (3.1.2)
    signet (0.11.0)
    test-unit (default: 3.1.1)
    time_with_zone (0.3.1)
    tzinfo (2.0.4)
    uber (0.1.0)

    플러그인은 https://plugins.embulk.org/ 에서 목록을 확인할 수 있습니다.

    예제

    embulk는 예제를 제공해 주고 있어서 손쉽게 확인을 할 수 있습니다.

    $ embulk example ./try1

    위 명령어를 통해 ./try1 폴더에 예제 파일을 생성해 줍니다. 예제 내용은 제공된 csv 파일을 읽어와 표준출력을 해주는 간단한 작업입니다. 만약 ./try1 를 지정하지 않으면 현재 위치에서 embulk_example 폴더를 만들어 줍니다.

    생성된 파일을 보면 seed.yml 파일이 있는데 아래와 같은 형식으로 작성되어 있습니다.

    in: # 읽어 들이는 곳의 설정
      type: file # 읽어 들이는 곳의 타입
      path_prefix: '/home/embulk-example/csv/sample_' # csv 파일 경로. prefix로 되어 있어서 sample_01.csv.gz파일에 접근이 가능함
    out: # 로딩할 곳의 설정
      type: stdout # 로딩할 곳의 타입

    이제 위의 파일을 guess 명령어를 통해 데이터에 대한 추측을 하고 파일을 생성할 수도 있습니다.

    $ embulk guess seed.yml -o config.yml # seed.yml 파일을 읽어와 config.yml 파일로 생성

    만약 -o config.yml 부분이 없다면 표준출력으로 보여주게 됩니다.

    위 명령어를 실행하면 아래와 같이 생성이 됩니다.

    in:
      type: file
      path_prefix: /home/embulk-example/csv/sample_
      decoders:
      - {type: gzip}
      parser:
        charset: UTF-8
        newline: LF
        type: csv
        delimiter: ','
        quote: '"'
        escape: '"'
        null_string: 'NULL'
        trim_if_not_quoted: false
        skip_header_lines: 1
        allow_extra_columns: false
        allow_optional_columns: false
        columns:
        - {name: id, type: long}
        - {name: account, type: long}
        - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
        - {name: purchase, type: timestamp, format: '%Y%m%d'}
        - {name: comment, type: string}
    out: {type: stdout}

    다음 데이터를 로딩하기 전, 제대로 동작하는지 일부 데이터를 읽어 파싱하고 결과를 보여줍니다.

    $ embulk preview config.yml
    
    021-11-11 01:24:33.714 +0900: Embulk v0.9.23
    2021-11-11 01:24:34.628 +0900 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
    2021-11-11 01:24:36.721 +0900 [INFO] (main): Gem's home and path are set by default: "/home/.embulk/lib/gems"
    2021-11-11 01:24:37.376 +0900 [INFO] (main): Started Embulk v0.9.23
    2021-11-11 01:24:37.466 +0900 [INFO] (0001:preview): Listing local files at directory '/home/embulk-example/csv' filtering filename by prefix 'sample_'
    2021-11-11 01:24:37.467 +0900 [INFO] (0001:preview): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
    2021-11-11 01:24:37.468 +0900 [INFO] (0001:preview): Loading files [/home/embulk-example/csv/sample_01.csv.gz]
    2021-11-11 01:24:37.476 +0900 [INFO] (0001:preview): Try to read 32,768 bytes from input source
    +---------+--------------+-------------------------+-------------------------+----------------------------+
    | id:long | account:long |          time:timestamp |      purchase:timestamp |             comment:string |
    +---------+--------------+-------------------------+-------------------------+----------------------------+
    |       1 |       32,864 | 2015-01-27 19:23:49 UTC | 2015-01-27 00:00:00 UTC |                     embulk |
    |       2 |       14,824 | 2015-01-27 19:01:23 UTC | 2015-01-27 00:00:00 UTC |               embulk jruby |
    |       3 |       27,559 | 2015-01-28 02:20:02 UTC | 2015-01-28 00:00:00 UTC | Embulk "csv" parser plugin |
    |       4 |       11,270 | 2015-01-29 11:54:36 UTC | 2015-01-29 00:00:00 UTC |                            |
    +---------+--------------+-------------------------+-------------------------+----------------------------+

    다음은 실제로 out에 입력된 곳으로 로딩하는 명령어입니다. 현재 예제의 out은 표준출력으로 되어 있기 때문에 출력이 되는 것을 볼 수 있습니다.

    $ embulk run config.yml
    
    2021-11-11 01:27:06.212 +0900: Embulk v0.9.23
    2021-11-11 01:27:07.379 +0900 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
    2021-11-11 01:27:09.874 +0900 [INFO] (main): Gem's home and path are set by default: "/home/.embulk/lib/gems"
    2021-11-11 01:27:10.709 +0900 [INFO] (main): Started Embulk v0.9.23
    2021-11-11 01:27:10.873 +0900 [INFO] (0001:transaction): Listing local files at directory '/home/embulk-example/csv' filtering filename by prefix 'sample_'
    2021-11-11 01:27:10.875 +0900 [INFO] (0001:transaction): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
    2021-11-11 01:27:10.876 +0900 [INFO] (0001:transaction): Loading files [/home/embulk-example/csv/sample_01.csv.gz]
    2021-11-11 01:27:10.929 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=16 / output tasks 8 = input tasks 1 * 8
    2021-11-11 01:27:10.937 +0900 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
    1,32864,2015-01-27 19:23:49,20150127,embulk
    2,14824,2015-01-27 19:01:23,20150127,embulk jruby
    3,27559,2015-01-28 02:20:02,20150128,Embulk "csv" parser plugin
    4,11270,2015-01-29 11:54:36,20150129,
    2021-11-11 01:27:11.047 +0900 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
    2021-11-11 01:27:11.056 +0900 [INFO] (main): Committed.
    2021-11-11 01:27:11.057 +0900 [INFO] (main): Next config diff: {"in":{"last_path":"/home/embulk-example/csv/sample_01.csv.gz"},"out":{}}

    CSV → PostgreSQL

    이번에는 표준출력이 아닌 DB에 저장하는 방식을 설명하면서 속도를 올리는 방법도 소개합니다.

    admin_no,deleted,admin_name
    3,T,
    2,F,관리자2
    1,F,관리자1

    와 같은 csv 형식을 admin.csv 로 저장한 뒤, 아래와 같이 seed.yml을 생성합니다.

    seed.yml 파일을 생성하는 이유는 실제 in, out을 할 상세속성을 만들기 위함인데 이를 수동으로 작성을 했다면 seed.yml 파일을 생성할 필요는 없습니다.

    in:
      type: file
      path_prefix: '/home/embulk-example/csv/admin'
    out:
      type: postgresql

    다음 명령어를 통해서 config.yml 파일을 생성합니다.

    $ embulk guess seed.yml -o config.yml
    
    2021-11-12 00:55:59.152 +0900: Embulk v0.9.23
    2021-11-12 00:56:00.019 +0900 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
    2021-11-12 00:56:02.140 +0900 [INFO] (main): Gem's home and path are set by default: "/home/.embulk/lib/gems"
    2021-11-12 00:56:02.801 +0900 [INFO] (main): Started Embulk v0.9.23
    2021-11-12 00:56:02.902 +0900 [INFO] (0001:guess): Listing local files at directory '/home/embulk-example/csv' filtering filename by prefix 'admin'
    2021-11-12 00:56:02.903 +0900 [INFO] (0001:guess): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
    2021-11-12 00:56:02.904 +0900 [INFO] (0001:guess): Loading files [/home/embulk-example/csv/admin.csv]
    2021-11-12 00:56:02.916 +0900 [INFO] (0001:guess): Try to read 32,768 bytes from input source
    2021-11-12 00:56:02.973 +0900 [INFO] (0001:guess): Loaded plugin embulk (0.9.23)
    2021-11-12 00:56:02.992 +0900 [INFO] (0001:guess): Loaded plugin embulk (0.9.23)
    2021-11-12 00:56:03.012 +0900 [INFO] (0001:guess): Loaded plugin embulk (0.9.23)
    2021-11-12 00:56:03.023 +0900 [INFO] (0001:guess): Loaded plugin embulk (0.9.23)
    in:
      type: file
      path_prefix: /home/embulk-example/csv/admin
      parser:
        charset: UTF-8
        newline: LF
        type: csv
        delimiter: ','
        quote: '"'
        escape: '"'
        trim_if_not_quoted: false
        skip_header_lines: 1
        allow_extra_columns: false
        allow_optional_columns: false
        columns:
        - {name: admin_no, type: long}
        - {name: deleted, type: boolean} # 의도와 다르게 타입이 선언됨. 확인이 필요
        - {name: admin_name, type: string}
    out: {type: postgresql}
    
    Created 'config.yml' file.

    데이터를 보고 판단하여 자동으로 스키마를 생성해 주기 때문에 의도하는 것과 다를 수 있습니다. 여기서 is_deleted 는 string 타입으로 사용할 예정인데 embulk는 boolean으로 선언하였으니 한 번 더 확인 하는 것이 필요합니다.

    다음은 out을 구성할 차례입니다. PostgreSQL에 적재를 해야 하므로 아래 명령어로 플러그인을 설치해 줍니다.

    $ embulk gem install embulk-output-postgresql

    마지막으로 config.yml 폴더의 out 부분에 로딩할 PostgreSQL의 접속정보와 테이블, 모드를 작성해 줍니다.

    out:
      type: postgresql
      host: localhost
        port: 5432
      user: pgadmintest
      password: ""
      database: my_database
      table: my_table
      mode: insert

    my_table이란 테이블에 insert를 하는 설정입니다.

    아래와 같이 더 자세하게 설정할 수도 있습니다.

    out:
      type: postgresql
      host: localhost
      user: myuser
      password: ""
      database: my_database
      table: my_table
      options: {loglevel: 2}
      mode: insert_direct
      column_options:
        my_col_1: {type: 'BIGSERIAL'}
        my_col_3: {type: 'INT NOT NULL'}
        my_col_4: {value_type: string, timestamp_format: `%Y-%m-%d %H:%M:%S %z`, timezone: '-0700'}
        my_col_5: {type: 'DECIMAL(18,9)', value_type: pass}

    추가적인 설정과 설명은 https://github.com/embulk/embulk-output-jdbc/tree/master/embulk-output-postgresql 에 작성되어 있습니다.

    parser-none 플러그인으로 속도 향상시키기

    csv나 json 파일이 이미 out 형식에 맞게 정의되어 있다면 parser 플러그인을 사용해 속도를 향상시킬 수 있습니다. 데이터를 일일이 파싱할 필요가 없기 때문에 속도 향상이 크게 발생합니다.

    $ embulk gem install embulk-parser-none

    이후 config.yml 파일의 in - parser 부분을 아래와 같이 변경합니다.

    in:
      type: file
      path_prefix: /home/embulk-example/csv/admin
      parser:
        type: none
        column_name: payload

    parser-none은 https://github.com/sonots/embulk-parser-none 자세하게 설명되어 있습니다.

    PostgreSQL → GCP Bigquery

    이번에는 PostgreSQL에서 GCP Bigquery로 옮기는 예제입니다.

    PostgreSQL에 접속하여 쿼리를 호출해 데이터를 가져와야 하므로 아래 플러그인을 설치해 줍니다.

    $ embulk gem install embulk-input-postgresql

    그 다음 config.yml을 수정하여 읽어올 DB의 서버와 쿼리를 작성해 줍니다.

    in:
      type: postgresql
      host: localhost
      user: myuser
      password: ""
      database: my_database
      table: my_table
      select: "col1, col2, col3"
      where: "col4 != 'a'"
      order_by: "col1 DESC"

    쿼리가 간단하다면 위의 문법으로도 충분하지만 복잡한 쿼리라면 아래와 같이 작성할 수 있습니다.

    in:
      type: postgresql
      host: localhost
      user: myuser
      password: ""
      database: my_database
      query: |
        SELECT t1.id, t1.name, t2.id AS t2_id, t2.name AS t2_name
        FROM table1 AS t1
        LEFT JOIN table2 AS t2
          ON t1.id = t2.t1_id

    또한 읽어들일 때, 타입에 따라 형변환을 진행하거나 컬럼 일부만 형변환을 진행할 수 있습니다.

    in:
      type: postgresql
      host: localhost
      user: myuser
      password: ""
      database: my_database
      table: "my_table"
      select: "col1, col2, col3"
      where: "col4 != 'a'"
      default_column_options:
        TIMESTAMP: { type: string, timestamp_format: "%Y/%m/%d %H:%M:%S", timezone: "+0900"}
        BIGINT: { type: string }
      column_options:
        col1: {type: long}
        col3: {type: string, timestamp_format: "%Y/%m/%d", timezone: "+0900"}
      after_select: "update my_table set col5 = '1' where col4 != 'a'"

    보통 embulk는 데이터를 다른 곳으로 증분할 때 많이 사용하기 때문에 아래의 옵션을 사용하면 유용하게 쓸 수 있습니다.

    • last_record: 증분을 위한 마지막 레코드의 값을 입력. (기본값으로는 모든 레코드를 로딩함)
    • fetch_rows: 한 번에 가져오는 데이터 사이즈 (기본값 10,000)

    기타 더 자세한 옵션과 설명은 https://github.com/embulk/embulk-input-jdbc/tree/master/embulk-input-postgresql 에서 확인할 수 있습니다.

    이제 GCP Bigquery 플러그인을 설치해 줍니다.

    $ embulk gem install embulk-output-bigquery

    마지막으로 out을 작성해 줍니다.

    out:
      type: bigquery
      mode: append
      auth_method: service_account
      json_keyfile: /path/to/json_keyfile.json
      project: your-project-000
      dataset: your_dataset_name
      table: your_table_name
        schema_file: /data/schema_file.json

    auth_method

    여기서는 service_account (or json_key) 와 compute_engine 두 가지만 설명하겠습니다.

    service_account

    GCP service account를 json 파일 형식으로 내려 받은 다음, embulk에서 해당 파일 위치를 호출하거나 json을 직접 입력하여 사용할 수 있습니다.

    out:
      type: bigquery
      auth_method: service_account
      json_keyfile: /path/to/json_keyfile.json
    
    또는
    
    out:
      type: bigquery
      auth_method: service_account
      json_keyfile:
        content: |
          {
              "private_key_id": "123456789",
              "private_key": "-----BEGIN PRIVATE KEY-----\nABCDEF",
              "client_email": "..."
          }

    compute_engine

    만약 embulk를 GCP compute engine에서 실행하는데 같은 프로젝트 내에 Bigquery가 존재한다면 인증을 생략할 수 있습니다.

    out:
      type: bigquery
      auth_method: compute_engine

    schema

    여기서는 미리 설정한 스키마 파일과 존재하는 테이블의 스키마를 사용하는 방식 2가지를 설명합니다.

    schema_file

    미리 스키마 파일을 정의하여 해당 파일경로를 입력하는 형식입니다.

    out:
      type: bigquery
      table: table_%Y%m%d
      schema_file: /path/to/schema.json

    기존 테이블 스키마 사용하기

    bigquery에 존재하는 기존 테이블의 스키마를 읽어서 그대로 사용하는 형식입니다.

    out:
      type: bigquery
      table: table_%Y%m%d
      template_table: existing_table_name

    GCS 경유하기

    바로 Bigquery로 적재할 수 있지만 embulk는 병렬 처리로 여러 태스크에 나눠 실행이 되는데 bigquery에서는 프로젝트 당 하루 100,000개의 job만 로딩이 가능합니다. 여기서 GCS를 사용하면 GCS에서 Bigquery로 여러 파일을 옮겨도 1개의 job으로 인식하게 됩니다. 또한 병렬 처리로 진행되는 작업의 안정성을 위해선 GCS를 사용하는 것이 좋습니다.

    • 병렬로 job이 생성되는 개수는 min_output_tasksmax_threads 같은 파라미터에 의존합니다.
    out:
      type: bigquery
      gcs_bucket: bucket_name
      auto_create_gcs_bucket: true

    다음은 사용하면서 유용한 옵션들입니다.

    • auto_create_table: 테이블이 없을 경우, 자동으로 생성
    • ignore_unknown_values: 알 수 없는 값을 무시
    • allow_quoted_newlines: 데이터에 개행문자가 있으면 true로 설정해야함. true로 할 시, 속도가 느려짐
    • auto_create_dataset: dataset이 없을 시, 자동 생성
    • path_prefix: /tmp/prefix_ 와 같은 파일 경로 접두사. 기본값은 랜덤하게 생성

    더 많은 정보는 https://github.com/embulk/embulk-output-bigquery에서 확인할 수 있습니다.

    멀티 쓰레드를 이용하여 로딩 속도 올리기

    max_threadsmin_output_tasks 를 설정하여 병렬로 작업을 처리할 수 있습니다.

    min_output_tasks는 최소 실행할 작업의 수를 의미하므로 4라고 정하면 최소 4개의 작업이 병렬로 작업을 처리하게 됩니다. 1개의 task에서는 여러 thread를 사용할 수 있습니다.

    max_threads는 최대 스레드를 정하여 동시성을 제어하게 됩니다. 너무 많이 설정하면 부하가 걸릴 수 있고 CPU 사용율이 낮다면 이 숫자를 높여서 속도를 향상할 수 있습니다.

    exec:
      max_threads: 기본적으로 사용 가능한 cpu 코어 수 * 2
      min_output_tasks: 기본적으로 사용 가능한 cpu 코어 수
    in:
      type: file
      path_prefix: /home/embulk-example/csv/admin
      parser:
        type: none
        column_name: payload

    댓글