본문 바로가기

STUDY/실전 카프카 개발부터 운영까지

3장 - 카프카 기본 개념과 구조

카프카의 주요 구성 요소

 

Kafka 구성 요소

  • 주키퍼(Zookeeper): 카프카의 메타데이터를 관리 하고 브로커의 상태를 점검하는 어플리케이션
  • 카프카(Kafka): 프로듀서와 컨슈머 사이에서 메시지를 중개하는 메시지 브로커 어플리케이션
  • 브로커(Broker): 카프카가 설치되고 동작하는 서버 또는 노드
  • 프로듀서(Producer): 카프카로 메시지를 보내는 클라이언트
  • 컨슈머(Consumer): 카프카에서 메시지를 가져가는 클라이언트
  • 토픽(Topic): 카프카 메시지를 구분하는 카테고리. 카프카 내에서 고유하다.
  • 파티션(Partition): 병렬 처리를 위해 토픽을 나눈 것
  • 세그먼트(Segment): 메시지가 저장된 브로커의 디스크 파일
  • 메시지(Message) 또는 레코드(Record): 프로듀서가 전송하고 컨슈머가 가져가는 데이터 조각.

 

 


리플리케이션

 

리플리케이션이란 메시지를 복제해서 여러 브로커에 분산 저장하는 것을 의미한다. 이 덕분에 하나의 브로커에서 장애가 발생 하더라고, 안정성을 유지 할 수 있다. 리플리케이션의 개수를 설정할 수 있는데, 수가 많아질수록 안정성은 올라가지만 리소스를 많이 사용하므로 적절한 개수를 설정해주어야 한다. 일반적으로 테스트/개발환경에서는 그 1로 설정한다. 운영에서는 로그와 같이 유실이 허용되는 경우 2로 설정하고, 유실을 최소화 해야하는 경우에는 3으로 설정을 한다.

 

 


파티션

 

하나의 토픽을 병렬 처리하기위해 나누어 둔 것을 파티션이라고 한다. 이렇게 나누어진 파티션의 수 만큼 컨슈머를 연결하여 병렬로 처리할 수 있다. 파티션의 수는 언제든지 늘릴 수 있지만, 한번 늘린 파티션의 수를 줄일 수는 없기때문에 작게 시작해서 천천히 늘려가는 것이 좋다.

 

 


세그먼트

 

이전 실습에서 환경을 구성하고 메시지를 하나 보냈었다. 이 메시지는 kisu-overview 토픽의 파티션 0에 저장되어 있을 것이다. 그리고 이 저장되어있는 파일을 세그먼트라고 한다.

 

> cd /data/kafka-logs/
> ls
...
kisu-overview-0
...

 

kafka의 인스턴스로 가서 /data/kafka 디렉토리로 이동하면, 이전 실습때 보낸 메시지가 저장된 세그먼트를 찾을 수 있다. 'kisu-overview-0'이란 파일 명은 kisu-overview 토픽의 0번째 파티션이라는 뜻이다.

 

> cd kisu-overview-0
> ls
...
0000000000000000.log
...
> xxd 0000000000000000.log

 

kisu-overview-0 세그먼트 폴더로 이동해서 log 파일을 출력해보면 이전에 보냈던 메시지를 확인할 수 있다.

 

 


카프카 특징

 

  • 분산 시스템: 카프카는 분산시스템으로 구성할 수 있도록 되어있다. 이 덕분에 카프카는 높은 처리 성능을 가지고있고 장애 대응도 탁월하다.
  • 페이지 캐시: 매번 디스크에 I/O 접근을 하는 대신, 잔여 메모리를 활용하여 페이지캐시를 통해 데이터를 가져오고 뛰어난 처리량을 달성하였다.
  • 배치 전송 처리: 프로듀서-브로커, 브로커-컨슈머 간에 메시지를 주고받을 때 배치처리를 활용하여 효율적인 리소스 사용은 지원 한다.
  • 압축 전송: 카프카는 다양한 타입의 압축 전송을 지원하여, 효율적인 네트워크 사용을 도와준다.
  • 토픽, 파티션, 오프셋: 카프카는 메시지를 토픽으로 구분하고, 병렬처리를 위해 파티션을 사용한다. 그리고 각 파티션은 오프셋을 가지고 있어서 메시지의 순서를 보장한다.
  • 고가용성 보장: 리플리케이션으로 파티션들은 복제하여 저장함으로서, 하나의 브로커에서 장애가 발생해도 나머지 브로커에서 메시지를 처리할 수 있도록하여 고가용성을 보장한다.
  • 주키퍼 의존성: 주키퍼를 이용해 카프카의 메타 정보를 관리하고 브로커의 상태를 관리한다. 주키퍼는 과반수 이상 유지되어야 해서 홀수 구성이 좋다.

 

 


카프카 프로듀서 실습

 

//// 메시지 보내고 확인하지 않기
Properties props = new Properties();
props.put("bootstrap.servers", "kisu-kafka01.foo.bar:9092,kisu-kafka02.foo.bar:9092,kisu-kafka03.foo.bar:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<String, String>(props);

try {
	ProducerRecord<String, String> record = new ProducerRecord<String, String>("kisu-basic01", "kisu-basic01 record");
	producer.send(record);
} catch (Exception e) {
	e.printStackTrace();
} finally {
	producer.close();
}

 

//// 동기 전송
Properties props = new Properties();
props.put("bootstrap.servers", "kisu-kafka01.foo.bar:9092,kisu-kafka02.foo.bar:9092,kisu-kafka03.foo.bar:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<String, String>(props);

try {
	ProducerRecord<String, String> record = new ProducerRecord<String, String>("kisu-basic01", "kisu-basic01 record");
	RecordMetadata metadata = producer.send(record).get();
	System.out.printf("Topic: %s, Partition: %d, Offset: %d, Key: %s, Received Message: %s\n", metadata.topic(), metadata.partition(), metadata.offset(), record.key(), record.value());
} catch (Exception e) {
	e.printStackTrace();
} finally {
	producer.close();
}

 

//// 비동기 전송
public class KisuProducerCallback implements Callback{
	private ProducerRecord<String, String> record;
	
	public KisuProducerCallback(ProducerRecord<String, String> record) {
		this.record = record;
	}
 
	@Override
	public void onCompletion(RecordMetadata metadata, Exception exception) {
		// TODO Auto-generated method stub
		if(exception != null) {
			exception.printStackTrace();
		}
		else {
			System.out.printf("Topic: %s, Partition: %d, Offset: %d, Key: %s, Received Message: %s\n", metadata.topic(), metadata.partition(), metadata.offset(), record.key(), record.value());
		}
	}

}
...
Properties props = new Properties();
props.put("bootstrap.servers", "kisu-kafka01.foo.bar:9092,kisu-kafka02.foo.bar:9092,kisu-kafka03.foo.bar:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<String, String>(props);

try {
	ProducerRecord<String, String> record = new ProducerRecord<String, String>("kisu-basic01", "kisu-basic01 record");
	producer.send(record, new KisuProducerCallback(record));
} catch (Exception e) {
	e.printStackTrace();
} finally {
	producer.close();
}
...

 

 


카프카 컨슈머 실습

 

//// 자동 커밋
Properties props = new Properties();
props.put("bootstrap.servers", "kisu-kafka01.foo.bar:9092,kisu-kafka02.foo.bar:9092,kisu-kafka03.foo.bar:9092");
props.put("group.id", "kisu-consumer01");
props.put("enable.auto.commit", "true");
props.put("auto.offset.reset", "latest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

Consumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Set.of("kisu-basic01"));

try {
	while(true) {
		ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
		for(ConsumerRecord<String, String> record : records) {
			System.out.printf("Topic: %s, Partition: %d, Offset: %d, Key: %s, Received Message: %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
		}
	}
} catch (Exception e) {
	e.printStackTrace();
} finally {
	consumer.close();
}

 

//// 동기 커밋
Properties props = new Properties();
props.put("bootstrap.servers", "kisu-kafka01.foo.bar:9092,kisu-kafka02.foo.bar:9092,kisu-kafka03.foo.bar:9092");
props.put("group.id", "kisu-consumer01");
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "latest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

Consumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Set.of("kisu-basic01"));

try {
	while(true) {
		ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
		for(ConsumerRecord<String, String> record : records) {
			System.out.printf("Topic: %s, Partition: %d, Offset: %d, Key: %s, Received Message: %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
		}
		consumer.commitSync();
	}
} catch (Exception e) {
	e.printStackTrace();
} finally {
	consumer.close();
}

 

//// 비동기 커밋
Properties props = new Properties();
props.put("bootstrap.servers", "kisu-kafka01.foo.bar:9092,kisu-kafka02.foo.bar:9092,kisu-kafka03.foo.bar:9092");
props.put("group.id", "kisu-consumer01");
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "latest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

Consumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Set.of("kisu-basic01"));

try {
	while(true) {
		ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
		for(ConsumerRecord<String, String> record : records) {
			System.out.printf("Topic: %s, Partition: %d, Offset: %d, Key: %s, Received Message: %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
		}
		consumer.commitAsync();
	}
} catch (Exception e) {
	e.printStackTrace();
} finally {
	consumer.close();
}
728x90