Develop/infra

CDC - debezium 설정

kudl 2021. 7. 25. 14:44

Debezium 이란?

더보기

Debezium is an open source distributed platform for change data capture. Start it up, point it at your databases, and your apps can start responding to all of the inserts, updates, and deletes that other apps commit to your databases. Debezium is durable and fast, so your apps can respond quickly and never miss an event, even when things go wrong.

Debezium은 변경 데이터 캡처를 위한 오픈 소스 분산 플랫폼이다. 

Debezium 에서 변경된 데이터 캡쳐를 위해 mysql의 경우 binlog, postgresql의 경우 replica slot(logical)을 이용하여 데이터베이스에 커밋하는 데이터를 감시하여 Kakfa, DB, ElasticSearch 등 미들웨어에 이벤트를 전달한다.

debezium 관련 사이트

debezium 설정

RDS 설정

  • RDS 마스터 권한의 계정이 필요함.
  • RDS logical_replication 변경
    • RDS logical_replication 이란?
      • 논리 복제는 변경된 데이터를 스트림으로 다른 서버로 보낸다.
      • WAL(트랜잭션 로그)로 논리적인 데이터 변경을 스트림으로 만든다. 테이블 단위로 리플리케이션 될 수도 있다.
      • 또, 마스터나 리플리카를 구분해서 디자인된게 아니라 양방향으로 스트림될 수 있다.
    • WAL 이란?
      • 공식 문서의 내용을 확인해보면, WAL 파일은 데이터 무결성을 보장하는 표준 방법인데 어떤 데이터베이스에 쿼리를 날려 변경 이벤트를 실행할 때, 데이터를 변경하기 전에 해당 변경 내용을 로그에 미리 담아두고 이후에 변경한다는 개념이다. 이렇게 되면, 어떠한 문제(데이터 충돌, 파괴 등...)이 발생했을 때 WAL 파일에 로깅된 내용을 보고 복구가 가능할 수 있기 때문에 무결성을 보장해 줄 수 있다.
    • AWS RDS의 설정
      • RDS → 파라미터 그룹 → common-postgres12 → rds.logical_replication → 값 1로 변경
      • 해당 RDS 재부팅이 필요함
      • 재부팅 후 Database 콘솔에서 SHOW wal_level 쿼리 실행 후 logical 로 변경되었는지 확인을 해야한다.
  • 사용자 계정 추가 및 권한 추가
    • 더보기
      CREATE USER debeziumuser;
      GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA testdb TO debeziumuser;
      GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA testdb TO debeziumuser;
    • debezium에서 체크하는 권한 관련 코드
      • pg_roles 테이블 rolcanlogin컬럼 true 이어야함.
      • pg_roles 테이블 rolreplication 컬럼 true 혹은 pg_auth_members 테이블에서 다음 권한을 하나이상 갖고 있어야함(rds_superuser, rdsadmin, rdsrepladmin)
  • 테스트 테이블 생성 및 REPLICA IDENTITY 변경
    • DEFAULT: 기본키 칼럼 옛 값을 기록.
    • USING INDEX: 지정한 인덱스에 있는 해당 칼럼 옛 값을 기록. 이 인덱스는 유니크 인덱스여야 하며, 부분 인덱스가 아니며, 지연 불가능한 인덱스여야 하며, 그 칼럼은 모두 NOT NULL 속성이 있어야 한다.
    • FULL: 해당 로우의 모든 칼럼 옛 값을 기록.
    • NOTHING: 옛 로우에 대한 기록은 안 남김. (시스템 테이블은 이 값이 기본값이다.)
    • 어떤 설정을 하든, 새 버전 로우와 옛 버전 로우과 완벽하게 일치하면, 미리 쓰기 로그에는 어떠한 기록도 남기지 않는다.
      • 대상 테이블 replica 상태 체크 쿼리
        • 더보기
          SELECT CASE relreplident 
          WHEN 'd' THEN 'default' 
          WHEN 'n' THEN 'nothing' 
          WHEN 'f' THEN 'full'
          WHEN 'i' THEN 'index'
          END AS replica_identity
          FROM pg_class WHERE oid = 'debeziumuser'::regclass;
    • sample 쿼리
      • 더보기
        CREATE TABLE holding 
        ( holding_id int, 
        user_id int, 
        holding_stock varchar(8), 
        holding_quantity int, 
        datetime_created timestamp, 
        datetime_updated timestamp, 
        primary key(holding_id) ); 

        ALTER TABLE holding replica identity FULL; 
        insert into holding values (1055, 1, 'VFIAX', 10, now(), now());

Kafka(MSK 설정)

  • Debezium은 kafka topic을 json 설정부분의 database.server.name + table.whitelist 기반으로 생성이 된다. 그래서 Kafka Topic 자동 생성 설정을 true로 해주는게 좋다.
  • AWS → MSK → Clusters → Cluster 상세 클릭 → Cluster configuration 수정 버튼 클릭 → kafka-config 설정에는 토픽이 없을시 자동 생성되도록 auto.create.topics.enable=true 설정

Debezium 어플리케이션 json 설정

  • database.server.name : server, table.whitelist : public.member 라면 topic은 server.public.member 로 생성됨.
    • database.server.id : 디비지움 커넥터의 ID 이다. 같은 데이터베이스를 바라보는 커넥터(슬레이브) 사이에서 이 아이디는 고유해야 하며, 그렇지 않을 경우 예상치 않게 동작할 수 있다.
    • database.server.name : 디비지움 커넥터의 논리명이다. 이 논리명도 고유해야 한다. 이 이름은 커넥터에서 사용하는 모든 토픽의 접두사로 사용된다. 영숫자, 문자, 밑줄만 사용할 수 있다.
    • include.schema.changes : true(기본)인 경우 이 토픽에 스키마 변경 이벤트가 기록된다. 변경 이력을 저장하는 것과는 별개이다.
    • database.history.kafka.topic : 디비지움은 원본 DB 의 스키마 변경을 항상 추적하여 이 토픽에 이력을 저장해 놓는다.
  • debezium connector 설정으로 동일 데이터소스에 붙을경우 slot.name 옵션을 통해 replication slot 을 분리해야한다.(기본 : debezium)
  • 데이터 파싱
    • payload.op 필드, payload.after, payload.before 필드 사용해야 한다.
    • payload.op 필드 C/R/U/D 값으로 로그가 남게 된다.
    •  payload.after필드
      • 테이블의 pk 컬럼을 사용하여 후처리를 한다.

참고 문서

기타 정보

  • RDS heartbeat 설정
    • DB 설정등의 문제로 replication slot 소비가 재대로 이루어 지지 않을때 DB disk full 오류가 발생할 수 있다.
    • 임계치 설정은 필수로 해야하며 heartbeat 설정을 통해 임의로 file 소비를 해줄수 있다.
    • 관련 문서 : https://debezium.io/documentation/reference/1.0/connectors/postgresql.html#wal-disk-space
    • 더보기
      CREATE TABLE IF NOT EXISTS heartbeat (id SERIAL PRIMARY KEY, ts TIMESTAMP WITH TIME ZONE); 
      INSERT INTO heartbeat (id, ts) VALUES (1, NOW()) ON CONFLICT(id) DO UPDATE SET ts=EXCLUDED.ts;

'Develop > infra' 카테고리의 다른 글

M1 MAC OS Rancher desktop 사용  (0) 2023.01.04
Datadog 이란?  (0) 2021.08.23
Nginx 튜닝, 보안 설정 및 Docker-compose 실행  (0) 2020.12.16
Docker Compose 이용한 ElasticSearch Cluster, Kibana 구성  (0) 2020.12.15
Kubernetes 개념  (0) 2020.11.24