정적 팩터리 메서드와 생성자는 선택적 매개변수가 많을때 적절히 대응하기 어렵다는 제약이 있습니다.

이럴때 프로그래머들은 점층적 생성자 패턴을 즐겨 사용하였습니다.

1. Telescoping constructor pattern

점층적 생성자 패턴은 n개의 매개변수를 가지는 클래스에 대해 선택 매개변수를 1개 받는 생성자, 2개받는 생성자, ... n개 받는 생성자 의 형태로 생성자를 늘려가는 방식입니다.

class NutritionFacts {
    private final int servingSize;     // 필수
    private final int servings;     // 필수
    private final int calories;     // 선택
    private final int fat;            // 선택

    public NutritionFacts(int servingSize, int servings) {
        this(servingSize, servings, 0);
    }

    public NutritionFacts(int servingSize, int servings, int calories) {
        this(servingSize, servings, calories, 0);
    }

    public NutritionFacts(int servingSize, int servings, int calories, int fat) {
        this.servingSize = servingSize;
        this.servings = servings;
        this.calories = calories;
        this.fat = fat;
    }
}

하지만 이러한 점층적 생성자 패턴은 다음과 같은 단점을 가집니다.

  • 매개변수의 개수가 많아질수록 코드를 작성하고, 읽기 어려워집니다. (확장성 및 가독성이 떨어집니다)
  • 코드를 읽을때 각 값의 의미가 무엇인지 파악하기 어렵습니다.
  • 매개변수의 개수가 몇개인지 주의하여 작성하여야 합니다.
  • 매개변수의 순서를 헷갈리게 되면, 런타임에 엉뚱한 동작을 하게 됩니다.

2. JavaBeans Pattern

매개변수가 없는 생성자로 객체를 만든 후, 세터 메서드들을 호출하여 원하는 매개변수의 값을 설정하는 방식입니다.

class NutritionFacts {
    private int servingSize = -1;     // 필수
    private int servings    = -1;     // 필수
    private int calories    = 0;     // 선택
    private int fat             = 0;    // 선택

    public NutritionFacts() { }

    public void setServingSize(int val);
    public void setServings(int val);
    public void setCalories(int val);
    public void setFat(int val);
}

점층적 생성자 패턴에 비해 인스턴스를 더 만들기 쉽고 그결과 더 읽기 쉬운 코드가 되었습니다.

하지만 클라이언트는 객체 하나를 만들기 위해 여러 메서드를 호출해야 하고, 객체가 완전히 생성되기 전까지는 일관성이 무너진 상태에 놓이게 됩니다. 때문에 클래스를 불변(final)로 만들수 없게 되고, 스레드 안정성을 얻기위한 추가적업이 필요하게 됩니다.

클래스의 모든 필드를 불변으로 만들면 생성 이후에는 읽기 작업만 가능해 지기 때문에 스레드 안정성을 보장합니다.

[Effective Java] Item 17에서는 클래스가 가변적이여야 하는 합당한 이유가 없다면 모든 필드는 private final이어야 한다고 설명합니다.

3. Builder Pattern

빌더패턴은 점층적 생성자 패턴의 안정성(Immutable class)과 자바 빈즈 패턴의 가독성(setter method)을 겸비합니다.

클라이언트는 필수 매개변수만으로 생성자를 호출하고 빌더 객체가 제공하는 세터 메서드로 선택 매개변수를 설정합니다. 이후 build 메서드를 호출하여 필요한 객체를 얻습니다.

// 코드 2-3 빌더 패턴 - 점층적 생성자 패턴과 자바빈즈 패턴의 장점만 취했다. (17~18쪽)
public class NutritionFacts {
    private final int servingSize;
    private final int servings;
    private final int calories;
    private final int fat;
    private final int sodium;
    private final int carbohydrate;

    public static class Builder {
        // 필수 매개변수
        private final int servingSize;
        private final int servings;

        // 선택 매개변수 - 기본값으로 초기화한다.
        private int calories      = 0;
        private int fat           = 0;
        private int sodium        = 0;
        private int carbohydrate  = 0;

        public Builder(int servingSize, int servings) {
            this.servingSize = servingSize;
            this.servings    = servings;
        }

        public Builder calories(int val)
        { calories = val;      return this; }
        public Builder fat(int val)
        { fat = val;           return this; }
        public Builder sodium(int val)
        { sodium = val;        return this; }
        public Builder carbohydrate(int val)
        { carbohydrate = val;  return this; }

        public NutritionFacts build() {
            return new NutritionFacts(this);
        }
    }

    private NutritionFacts(Builder builder) {
        servingSize  = builder.servingSize;
        servings     = builder.servings;
        calories     = builder.calories;
        fat          = builder.fat;
        sodium       = builder.sodium;
        carbohydrate = builder.carbohydrate;
    }

    public static void main(String[] args) {
        NutritionFacts cocaCola = new NutritionFacts.Builder(240, 8)
                .calories(100).sodium(35).carbohydrate(27).build();
    }
}

NutritionFacts cocaCola = new NutritionFacts.Builder(240, 8).calories(100).sodium(35).carbohydrate(27).build();에서 볼 수 있듯, 빌더의세터 메서드들은 빌더 자신(this)를 반환하기 때문에 연쇄적으로 호출이 가능합니다.

이러한 방식을 메서드 호출이 흐르듯 연결된다는 뜻으로 Fluent API 또는 메서드 연쇄(method chaining)라 합니다.

 

빌더패턴은 점층적 생성자 패턴에 비해 확장에 유리하고 가독성이 높은 동시에 불변성을 확보하여 자바 빈즈 패턴보다 안정적입니다. 따라서 생성자나 정적 팩터리가 처리해야 할 매개변수가 많다면 빌더 패턴을 선택하는것이 좋습니다.

Reference

  • Effective Java 3th Edition

블룸필터란

수많은 양의 정보가 저장된 데이터베이스에서 해당 데이터가 존재하는지 확인하는 작업에는 부하가 뒤따릅니다. 이를 위해 선형탐색을 사용한다면 너무나 오랜 시간이 소요될 것입니다. 이진 탐색도 좋은 방법이지만 더 나은 방법이 있을 것 같습니다.

 

이렇게 어떤 집합에 특정원소가 있는지 확인하는 작업을 Membership Testing이라고 합니다. 이러한 Membership Test를 위한 자료구조에는 Bloom filter, AVL, red-black tree등이 있습니다. 이중 Bloom filter는 확률적 자료구조로 에러를 허용하는 대신 공간복잡도를 낮추었습니다.

 

Bloom Filter는 Burtoon H. Bloom이 1970년의 논문 Space/Time Trade-offs in Hash Coding with Allowable Errors에서 제안한 확률적 자료구조입니다.

Working

Bloom Filter는 다음과 같이 m개의 비트로 구성된 bit 배열 자료구조입니다.

image

Insert

주어진 input값(test, )에 대해 k개의 hash function(h1,h2,h3,...,hk)을 통해 hash를 계산합니다.(3개의 hash function을 사용한다 가정하겠습니다.)

// input : test
h1(test) % 10 = 0
h2(test) % 10 = 3
h3(test) % 10 = 7
// input  : monkey 
h1(monkey) % 10 = 3
h2(monkey) % 10 = 4
h3(monkey) % 10 = 5

image

)

image

Lookup(Membership Test)

Membership Test는 동일한 input을 주어 필터에 해당하는 해시값이 모두 1로 설정되어있다면 해당 input값이 존재할 것이라고 판단할 수 있습니다.

False Positive

거짓양성(False Positive)는 값이 존재하지 않음에도 불구하고 해당값이 존재할것이라 판단하는것을 말합니다.

// input  : bloom
h1(bloom) % 10 = 3
h2(bloom) % 10 = 5
h3(bloom) % 10 = 7

image

예를들어 위와 같이 'bloom'이라는 input을 조회했을때 해시값이 모두 1이므로 해당 값이 존재할 것이라 판단하게 됩니다. 하지만 bloom은 존재하지 않기때문에 이를 거짓양성(False positive)라고 합니다.

이러한 거짓양성의 발생률은 Bloom Filter의 크기를 조절하여 조정할 수 있습니다. 당연히 Bloom Filter를 위해 더많은 공간(hash function, bit array)을 할당할수록 거짓양성률이 낮아집니다.

Space Efficiency

값을 저장하는 배열, 연결리스트, 해시맵, 트리와 같은 자료구조는 실제 값을 저장해야 하지만, Bloom Filter는 실제값을 저장하지 않고 bit array를 통해 표현하기 때문에 공간 복잡도가 낮습니다. 다만 이는 hash collision을 유발합니다.

Hash Collision(해시 충돌)
Hash table의 크기가 충분히 크지 못해 서로다른 입력값이 동일한 hash값을 가지는 상황을 의미합니다.

Bloom Filter의 종류(Classification)

  • Standard Bloom Filter(SBF) : 전통적인 블룸필터로 거짓 양성과 거짓음성의 문제를 가지고 있습니다. 거짓양성은 비트 배열의 크기가 충분히 크기 못할때 발생하며, 거짓음성은 원소를 삭제할때 일어납니다. 이러한 문제때문에 SBF는 원소를 삭제할 수 없습니다.
  • Counting Bloom Filter(CBF) : SBF의 확장성 문제를 해결하기 위해 등장하였으며, 비트배열에 카운터를 포함합니다. 삽입연산이 발생하면 카운터를 증가시키고 삭제연산이 발생하면 카운터를 감소시켜 삽입/삭제가 가능하도록 구현하였습니다. SBF에 비해 거짓 양성의 비율이 매우 높고, 삭제연산으로 인해 거짓 음성이 발생할 수 있습니다.
  • Fingerprint Bloom Filter : CBF기반으로 만들어 졌으며 binary bit를 사용하는 대신 작은 크기의 hash값을 사용하여 CBF의 높은 거짓 양성률을 해결하였습니다. 하지만 이로인해 CBF에 비해 높은 공간복잡도를 보입니다.
  • Hierachical Bloom Filter : 트리구조의 Bloom filter로 높은 정확도, 확정성, 그리고 낮은 거짓 양성률을 보입니다. 대표적으로 Forest Structured Bloom filter, Bloom Store가 있습니다.
  • Multidimensional Bloom Filter(MDBF) : 다차원의 Bloom filter 배열을 사용합니다.
  • Compressed Bloom Filter : 공간 효율적인 Bloom filter이지만 압축률이 높아질수록 거짓양성률이 매우 높아집니다.

활용

그림 출처: 위키피디아 블룸필터 문서

Bloom Filter는 위의 그림과 같이 Filter는 메모리에 두고 실제 데이터는 Storage에 저장하여 디스크 접근시간으로 인한 성능저하를 개선할 수 있습니다. 하지만 이경우에도 거짓 양성으로인한 불필요한 디스크 접근이 발생할 수 있습니다.

LSM-Tree와 B-Tree는 현대 Key-value 저장소의 스토리지 엔진으로 주로 사용되는 자료구조입니다. LSM-Tree의 경우 updata 쿼리에서, B-Tree는 좁은 범위의 lookups에서 좋은 성능을 보입니다.

B-Trees

B-tree는 디스크에서 사용하기위해 설계된 인덱스 구조로 R. Bayer 와 E. McCreight가 1970년 7월 발표한 논문 "Organization and Maintenance of Large Ordered Indexes"에서 처음 발표되었습니다.

인덱스구조
데이터베이스에서 탐색속도를 높이는데 사용되는 자료구조입니다.
인덱스 구조를 생성하고 유지하기 위해서는 추가적인 스토리지 공간과 쓰기작업과 관련된 비용이 발생합니다. 하지만 인덱스를 사용하여 쿼리하는것이 데이터베이스의 모든 행을 탐색하는것보다 훨씬 빠르기 때문에 이러한 비용은 큰문제가 되지 않습니다.

B-tree를 사용하는 가장 큰 이유는 디스크 접근시간을 줄일 수 있기 때문입니다. B-tree는 2개 이상의 차수를 가진 트리구조로 비교적 적은 읽기 횟수로 데이터의 저장 위치를 알아낼 수 있습니다.

LSM-Trees

Log-Structured Merge Tree는 Patrick O’Neil이 1996년 발표한 논문 The Log-Structured Merge-Tree (LSM-Tree)에서 처음 발표되었습니다. LSM-tree는 우수한 업데이트 성능과 space amplification특성때문에 많은 key-value store 데이터베이스에서 차용되었습니다.

Space amplification

LSM-tree의 update는 디스크 I/O를 최소화하기위해 메모리의 버퍼에 log를 남깁니다. 그리고 버퍼는 주기적으로 정렬되어 디스크에 쓰여지는데 이를 flush라고 하며, 정렬된 일련의 배열을 run이라고 합니다.

LRU

LRU는 Least Recently Used의 약자로 페이지 폴트가 발생하게 되면 가장 오래전에 접근했던 페이지를 퇴출시켜 공간을 확보합니다.

버퍼에 존재하는 페이지에 접근하는 경우 리스트의 head로 페이지를 이동시킵니다. 이러한 방식으로 최근에 접근한 페이지 일수록 head에 가깝게, 오래전에 접근한 페이지일수록 tail에 가까이 위치하게 됩니다. 새로운 페이지를 위한 공간이 필요할 경우 tail에 존재하는 페이지를 퇴출시킵니다.

LRU는 주로 이중 연결리스트와 해시테이블을 통해 구현합니다. 구현이 간단하고, 알고리즘의 효율성 때문에 80년대 초반까지 대부분의 시스템에서 차용되었던 알고리즘입니다. 하지만, 이러한 장점들에도 불구하고 특정 상황에서 최악의 성능을 보입니다.

img

Sub-Optimality during DB scans

데이터베이스 테이블이 LRU 캐시보다 큰 경우, 테이블을 스캔할 때 DB 프로세스는 전체 LRU 캐시를 삭제하고 스캔한 테이블의 페이지들로 채우게 됩니다. 이러한 페이지들이 다시 참조되지 않는다면 데이터 베이스의 성능을 크게 저하시킬 수 있습니다.

2Q

LRU의 이러한 단점을 보완하기 위해 2Q는 추가적인 대기열을 통해 실제로 Hot한 페이지가 LRU캐시에 위치할 수 있도록 합니다. 즉, 2Q는 접근 빈도 뿐만 아니라, 페이지의 Hot여부도 판단합니다.

Simplified 2Q

img

2Q 알고리즘은 기본 LRU버퍼 Am과 보조 FIFO버퍼 A1으로 구성됩니다. Page Fault가 발생하면 먼저 A1버퍼에 페이지를 push합니다. 이후 페이지가 다시 참조되면 LRU버퍼 Am으로 페이지를 push합니다. 이를통해 Am버퍼에 있는 페이지의 hot함을 보장할 수 있습니다.

만약 A1버퍼의 page가 다시 참조되지 않으면 A1버퍼의 fifo정책에 따라 퇴출됩니다.
이는 해당 페이지가 cold하며, 캐시될 필요가 없음을 암시합니다.

def access_page(X: page):
    # if the page already exists in the LRU cache
    # in buffer Am
    if X in Am:
         Am.move_front(X)

    # if the page exists in secondary storage
    # and not it gets access.
    # since the page is accessed again, indicating interest
    # and long-term need, move it to Am.
    elif X in A1:
         A1.remove(X)
         Am.add_front(X)

    # page X is accessed for the first time
    else:
         # if A1 is full then free a slot.
         if A1.is_full():
             A1.pop()

         # add X to the front of the FIFO A1 queue
         A1.add_front(X)

A1버퍼의 크기가 너무 작다면, hot의 기준이 너무 높아지게 되며, A1버퍼의 크기가 너무 커지게 되면 메모리의 한계로 인해 Am버퍼의 크기를 줄이게 되고 이는 데이터 베이스의 성능을 저하시킬 수 있습니다.

2Q Full Version

img

일반적인 데이터베이스 접근 패턴에서 페이지는 짧은 시간동안 많은 참조를 받은 다음, 오랜시간 동안 참조되지 않습니다. 만약 캐시를 해야한다면, 짧은 시간의 많은 참조 이후에도 정기적으로 참조되는 페이지가 캐시되야 할 것입니다.

이러한 데이터 베이스 접근패턴에서 2Q를 활용하기 위해, 2Q Full Version은 보조 버퍼 A1을 A1-in, A1-out으로 나눕니다.

새로운 페이지는 항상 A1-in에 저장이 되고 페이지가 다시 참조될때까지 A1에 머무릅니다.
만약 페이지가 오래되어 퇴출되는 경우 메모리에서는 페이지가 삭제되지만, A1-out에 디스크 참조를 저장합니다. A1-out의 페이지가 다시 액세스 되는 경우 Am버퍼로 이동합니다.

References

토픽 생성

kafka-topics.sh를 통해 토픽 관련 명령을 실행할 수 있다.

$ bin/kafka-topics.sh \
--create \
--bootstrap-server <카프카 서버 주소>:9092 \
--partitions 3 \
--replication-factor 1 \
--config retention.ms=172800000 \
--topic hello.kafka
  1. create
    • 토픽을 생성하는 명령어
  2. bootstrap-server
    • 토픽을 생성할 카프카 클러스터를 구성하는 브로커 IP와 Port를 적는다.
  3. partitions
    • 파티션의 개수를 지정한다. 옵션을 명시하지 않으면, 카프카 브로커의 설정파일의 num.partitions옵션을 따른다.
  4. replication-factor
    • 토픽의 파티션을 복제할 복제 개수를 적는다. 사용하는 브로커의 개수를 따르면 되고, 실제 업무환경에서는 3개 이상의 카프카 브로커로 운영하는 것이 일반적
  5. config retentions.ms
    • 토픽의 데이터를 유지하는 기간을 나타낸다. ms단위로, 172800000은 2일을 나타낸다. 따라서 2일이 지난 토픽 데이터는 삭제된다.

토픽 리스트 조회

토픽 리스트 조회**

$ bin/kafka-topics.sh --bootstrap-server <카프카 서버 주소>:9092 --list

상세 조회

$ bin/kafka-topics.sh --bootstrap-server <카프카 서버 주소>:9092 --describe --topic <조회할 topic명>
Topic: hello.kafka      TopicId: XGxv2fB5QTidTzfb5rsShA PartitionCount: 3       ReplicationFactor: 1    Configs: segment.bytes=1073741824,retention.ms=172800000
        Topic: hello.kafka      Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: hello.kafka      Partition: 1    Leader: 0       Replicas: 0     Isr: 0
        Topic: hello.kafka      Partition: 2    Leader: 0       Replicas: 0     Isr: 0

토픽 옵션 수정

토픽 옵션을 변경하기 위해서 kafka-topics.sh 또는 kafka-configs.sh를 사용한다.

파티션 개수 변경

$ bin/kafka-topics.sh \
--topic <속성을 변경할 topic의 이름> \
--bootstrap-server <카프카 서버 주소>:9092 \
--alter \
--partitions 4

파티션의 개수는 늘릴수는 있지만 줄일 수는 없다. 따라서 파티션 개수를 늘릴때는 반드시 늘려야 하는 상황인지 판단하는것이 중요하다.

Topic: hello.kafka      TopicId: XGxv2fB5QTidTzfb5rsShA PartitionCount: 4       ReplicationFactor: 1    Configs: segment.bytes=1073741824,retention.ms=172800000
        Topic: hello.kafka      Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: hello.kafka      Partition: 1    Leader: 0       Replicas: 0     Isr: 0
        Topic: hello.kafka      Partition: 2    Leader: 0       Replicas: 0     Isr: 0
        Topic: hello.kafka      Partition: 3    Leader: 0       Replicas: 0     Isr: 0

파티션의 번호는 0부터 1씩 늘어나며 순차적으로 부여된다.

데이터 유지시간(retention.ms) 변경

$ bin/kafka-configs.sh --bootstrap-server <카프카 서버 주소>:9092 \
--entity-type topics \
--entity-name hello.kafka \
--alter --add-config retention.ms=86400000

변경 확인

$ bin/kafka-topics.sh --bootstrap-server 52.79.85.44:9092 --describe --topic hello.kafka

Topic: hello.kafka      TopicId: XGxv2fB5QTidTzfb5rsShA PartitionCount: 4       ReplicationFactor: 1    Configs: segment.bytes=1073741824,retention.ms=86400000
        Topic: hello.kafka      Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: hello.kafka      Partition: 1    Leader: 0       Replicas: 0     Isr: 0
        Topic: hello.kafka      Partition: 2    Leader: 0       Replicas: 0     Isr: 0
        Topic: hello.kafka      Partition: 3    Leader: 0       Replicas: 0     Isr: 0

kafka-console-producer.sh

kafka-console-producer.sh명령어를 통해 토픽에 데이터를 넣을 수 있다.

토픽에 넣는 데이터는 레코드(record)라고 부르며 메시지 키(key)와 값(value)로 이루어져 있다.

메시지 키 없이 메시지 값만 보내기

메시지 키없이 메시지 값만 보낼경우 메시지 키가 자바의 null로 설정되어 브로커로 전송된다.

$ bin/kafka-console-producer.sh --bootstrap-server <카프카 서버 주소>:9092 \
--topic hello.kafka

>NullTest1
>NullTest2
>NullTest3

bin/kafka-console-producer.sh를 통해 전송되는 레코드의 값은 UTF-8을 기반으로 Byte로 변환된 후 ByteArraySerializer로만 직렬화 된다. 즉, String이 아닌 타입으로는 직렬화 하여 전송할 수 없다.

만약 다른 타입으로 직렬화하여 데이터를 브로커로 전송하고 싶으면 직접 프로듀서 애플리케이션을 개발하여야 한다.

메시지 키가 null인 경우 프로듀서가 파티션으로 전송할때 레코드 배치 단위로 라운드로빈으로 전송된다.

메시지 키를 가지는 레코드 전송

$ bin/kafka-console-producer.sh --bootstrap-server <카프카 서버 주소>:9092 \
--topic hello.kafka \
--property "parse.key=true" \
--property "key.separator=:"

>key1:val1
>key2:val2
>key3:val3
>key3:val4
  • key.separator=:
    • 키와 값을 나누는 구분자를 선언한다. 만약 구분자를 넣지 않고 엔터를 누르면 KafkaException과 함께 종료된다.

메시지 키와 값을 함께 전송한 레코드는 메시지 키의 해시값을 작성하여 존재하는 토픽의 파티션중 한개에 할당된다. 따라서 메시지 키가 동일할 경우 동일 파티션에 전송된다.

kafka-console-consumer.sh

kafka-console-consumer.sh명령어를 통해 토픽에 데이터를 확인할 수 있다.

$ bin/kafka-console-consumer.sh --bootstrap-server <카프카 서버 주소>:9092 \
--topic hello.kafka \
--from-beginning

NullTest2
val1
val2
NullTest3
val3
val4
NullTest1
^CProcessed a total of 7 messages
  • from-beginning
    • 토픽에 저장된 가장 처음의 데이터부터 출력한다.

키와 함께 확인

$ bin/kafka-console-consumer.sh --bootstrap-server <카프카 서버 주소>:9092 \
--topic hello.kafka \
--property print.key=true \
--property key.separator="-" \
--group hello-group \
--from-beginning

null-NullTest2
key1-val1
key2-val2
null-NullTest3
key3-val3
key3-val4
null-NullTest1
^CProcessed a total of 7 messages
  • group hello-group
    • group옵션을 통해 신규 컨슈머 그룹 hello-group을 생성하였다.
    • 컨슈머 그룹은 1개 이상의 컨슈머로 이루어져 있으며 해당 컨슈머 그룹을 통해 가져간 토픽 메시지는 가져간 메시지에 대해 커밋을 한다
    • 커밋이란 컨슈머가 특정 레코드까지 처리를 완료했다고 레코드의 오프셋 번호를 카프카 브로커에 저장하는 것이다.
    • 커밋 정보는 __consumer_offsets 이름의 내부 토픽에 저장된다.
  • from-beginning
    • 전송한 데이터의 순서가 현재 출력되는 순서와 다른것을 볼 수 있다. 이는 카프카의 파티션때문에 생기는 현상이다.
    • 만약 토픽에 넣은 데이터의 순서를 보장하고 싶다면 파티션 1개로 구성된 토픽을 만드는 것이다.
    • 한개의 파티션에서는 데이터의 순서를 보장하기 때문이다.

kafka-consumer-groups.sh

생성한 컨슈머 그룹 리스트

$ bin/kafka-consumer-groups.sh --bootstrap-server <카프카 서버 주소>:9092 --list
hello-group

컨슈머 그룹이 데이터를 가져가는 토픽 확인

$ bin/kafka-consumer-groups.sh --bootstrap-server <카프카 서버 주소>:9092 \
--group hello-group \
--describe

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
hello-group     hello.kafka     2          3               3               0               -               -               -
hello-group     hello.kafka     1          3               3               0               -               -               -
hello-group     hello.kafka     0          1               1               0               -               -               -

kafka-verifiable-producer, consumer.sh

kafka-verfiable-producer, consumer.sh을 사용하면 String타입 메시지 값을 코드없이 주고받을수 있어 네트워크 통신 테스트를 할때 유용하다.

데이터 전송

$ bin/kafka-verifiable-producer.sh --bootstrap-server 3.35.13.39:9092 \
--max-messages 10 \
--topic verify-test

{"timestamp":1629028682306,"name":"startup_complete"}
[2021-08-15 20:58:02,510] WARN [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {verify-test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
{"timestamp":1629028682746,"name":"producer_send_success","key":null,"value":"0","offset":0,"topic":"verify-test","partition":0}
{"timestamp":1629028682748,"name":"producer_send_success","key":null,"value":"1","offset":1,"topic":"verify-test","partition":0}
{"timestamp":1629028682748,"name":"producer_send_success","key":null,"value":"2","offset":2,"topic":"verify-test","partition":0}
{"timestamp":1629028682749,"name":"producer_send_success","key":null,"value":"3","offset":3,"topic":"verify-test","partition":0}
{"timestamp":1629028682749,"name":"producer_send_success","key":null,"value":"4","offset":4,"topic":"verify-test","partition":0}
{"timestamp":1629028682750,"name":"producer_send_success","key":null,"value":"5","offset":5,"topic":"verify-test","partition":0}
{"timestamp":1629028682750,"name":"producer_send_success","key":null,"value":"6","offset":6,"topic":"verify-test","partition":0}
{"timestamp":1629028682750,"name":"producer_send_success","key":null,"value":"7","offset":7,"topic":"verify-test","partition":0}
{"timestamp":1629028682750,"name":"producer_send_success","key":null,"value":"8","offset":8,"topic":"verify-test","partition":0}
{"timestamp":1629028682750,"name":"producer_send_success","key":null,"value":"9","offset":9,"topic":"verify-test","partition":0}
{"timestamp":1629028682757,"name":"shutdown_complete"}
{"timestamp":1629028682759,"name":"tool_data","sent":10,"acked":10,"target_throughput":-1,"avg_throughput":22.07505518763797}

전송한 데이터 확인

$ bin/kafka-verifiable-consumer.sh --bootstrap-server 3.35.13.39:9092 \
--topic verify-test \
--group-id test-group

{"timestamp":1629028804932,"name":"startup_complete"}
{"timestamp":1629028805183,"name":"partitions_assigned","partitions":[{"topic":"verify-test","partition":0}]}
{"timestamp":1629028805250,"name":"records_consumed","count":10,"partitions":[{"topic":"verify-test","partition":0,"count":10,"minOffset":0,"maxOffset":9}]}
{"timestamp":1629028805260,"name":"offsets_committed","offsets":[{"topic":"verify-test","partition":0,"offset":10}],"success":true}
...

kafka-delete-records.sh

kafka-delete-records.sh명령어를 통해 이미 적재된 토픽의 데이터를 지울 수 있다. 토픽의 데이터중 가장오래된 데이터(가장 낮은 오프셋)부터 특정 시점의 오프셋가지 삭제할 수 있다.

이때 삭제하고자 하는 데이터에 대한 정보를 파일(.json)로 저장해서 사용해야 한다.

$ vi delete-topic.json
{"partitions": [{"topic": "verify-test", "partition": 0, "offset": 5}], "version":1}

$ bin/kafka-delete-records.sh --bootstrap-server 3.35.13.39:9092 \
--offset-json-file delete-topic.json

Executing records delete operation
Records delete operation completed:
partition: verify-test-0        low_watermark: 5

여기서 주의해야 할 점은 토픽의 특정 레코드 하나만 삭제되는것이 아니라 파티션에 존재하는 가장 오래된 오프셋부터 지정한 오프셋까지 삭제된다는 점이다.
즉, 카프카는 토픽의 특정 데이터만 삭제할 수 없다.

'Infra' 카테고리의 다른 글

[FPGA]Introduction of FPGA acceleration  (0) 2022.08.22
[CUDA]Programming Model  (0) 2022.08.22
[Kafka] EC2에 Kafka서버 만들기  (0) 2022.08.22
WSL에서 mysql사용환경 만들기  (0) 2022.08.22
[MongoDB] GridFS  (0) 2022.08.22

AWS인스턴스 Kafka설치

AWS 인스턴스에서

1. AWS EC2인스턴스 생성

  1. 보안그룹 -> 인바운드 규칙
    • 2181 포트 개방 : 주키퍼 기본 포트
    • 9092 포트 개방 : 카프카 브로커 기본포트
  2. 인스턴스 접속

2. 인스턴스 자바 설치(jdk-11)

sudo apt-get install openjdk-11-jdk
java --version

3. 카프카 브로커 다운로드

wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.13-2.8.0.tgz
tar xvf kafka_2.13-2.8.0.tgz

4. 카프카 브로커 설정

힙메모리 설정

카프카 브로커의 경우 힙메모리가 1Gb, 주키퍼의 경우 512Mb로 기본 설정되어있다.
만약 메모리가부족하면 Cannot allocate memory에러가 출력되면서 실행되지 않는다.

이때 환경변수를 통해 힙메모리를 지정해 줄 수 있다.

export KAFKA_HEAP_OPTS="-Xmx400m -Xms400m"
echo $KAFKA_HEAP_OPTS

실행옵션 설정
config폴더의 server.properties파일에는 카프카 브로커가 클러스터운영에 필요한 옵션들을 지정할 수 있다.

advertised.listeners의 주석을 해제하고 aws인스턴스의 퍼블릭 IPv4 주소를 적는다.

...
advertised.listeners=PLAINTEXT://**<AWS인스턴스주소>**:9092
...

5. 주키퍼 실행

카프카 바이너리 폴더에 주키퍼가 준비되어있다.

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

-daemon옵션을 해제하면 포그라운드에서 실행하여 주키퍼 로그를 확인할 수 있다.

6. 카프카 브로커 실행

bin/kafka-server-start.sh -daemon config/server.properties

로컬 컴퓨터에서

카프카 서버와 통신 확인

카프카 바이너리 패키지는 카프카 브로커에 대한 정보를 가져올 수 있는 kafka-broker-api-versions.sh명령어를 제공한다. 이명령어를 통해 카프카 브로커와 정상적으로 연동되는지 확인할 수 있다.

로컬에도 동일하게 kafka를 다운로드한다.

wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.13-2.8.0.tgz
tar xvf kafka_2.13-2.8.0.tgz

통신 확인

bin/kafka-broker-api-versions.sh --bootstrap-server <AWS 인스턴스 주소>:9092

'Infra' 카테고리의 다른 글

[CUDA]Programming Model  (0) 2022.08.22
[Kafka] Kafka 커맨드라인  (0) 2022.08.22
WSL에서 mysql사용환경 만들기  (0) 2022.08.22
[MongoDB] GridFS  (0) 2022.08.22
[MongoDB] Introduction  (0) 2022.08.22

+ Recent posts