[Kafka] Spring Kafka
- 파이프라인, 스트리밍 분석, 데이터 통합 및 미션 크리티컬 애플리케이션을 위해 설계된 고성능 분산 이벤트 스트리밍 플랫폼
- Pub-Sub 모델의 메시지 큐 형태로 동작하며 분산환경에 특화
동작방식
1. 메시지 브로커
- 메시지를 처리하면 빠른시간내에 삭제
- ex) 레디스 큐, RabbitMQ
2. 이벤트 브로커
- 이벤트를 처리하면 일정시간 보관가능
- 이벤트를 저장해서 장애시 시작위치 복구가능
- ex) Kafka, AWS 키네시스
구성요소
1. Producer
- 메시지 전송시 Topic 설정
- ack설정 (0:전달여부X, 1:전달여부O, all:복제여부까지)
2. Consumer
- 1개의 partition과 Consumer Group은 1:1 맵핑
- Consumer Group간의 영향을 끼치지 않음
- Consumer 갯수 <= Partition 갯수 (컨슈머 초과분은 동작하지 않음)
3. Broker
- Kafka server
4. Topic
- 메시지(데이터)의 형식
5. Partition
- 메시지를 저장하는 물리적인 파일 (Queue)
- Topic은 여러 partition으로 나눈다
- 1개의 Topic은 1개 이상의 Partition 으로 구성
- 같은 메시지 key를 갖으면 같은 partition에 들어감
- 파티션 갯수가 추후에 추가되면 일관성이 깨지게되니 추의
consumer lag : producer offset - consumer offset
Avro
- 언어에 독립적인 Serializer Library
- IDL(Interface Description Language)로써 언어에 독립적으로 인터페이스를 표현해 통신한다
- Serialize된 Schema를 파일로 생성해서 메타데이터에 저장 및 사용
- Avro를 사용하기 위해서는 Topic Schema(
.avsc
)를 생성한 후 Java class를 생성시켜줘야 한다 -
Avro Schema(
.avsc
)는 JSON 형식 - SimpleRecord.avsc
{
"type": "record",
"name": "User",
"namespace": "io.confluent.developer",
"doc": "User Schema",
"fields": [
{
"name": "name",
"type": "string",
"doc": "last name"
},
{
"name": "age",
"type": ["int", "null"], // int or null
"doc": "international age"
},
{
"name": "gender",
"type": ["string", "null"], // string or null
"doc": "international age"
}
]
}
- 만들어놓은
.avsc
로 Java class 생성- 경로 :
build/avro/io/confluent/developer
- 경로 :
$ gradle build
or
$ gradle generateAvroJava
- Java class Schema 사용
-
newBuilder()
: 값이 셋팅될 때 유효성 검사
-
User user = User.newBuilder()
.setName("europa")
.setAge(17)
// gender is null
.build();
gradle
plugins {
id("com.github.davidmc24.gradle.plugin.avro") version "1.8.0"
}
repositories {
maven("https://packages.confluent.io/maven")
}
dependencies {
implementation("org.springframework.kafka:spring-kafka")
implementation("org.apache.avro:avro") // avro
implementation("io.confluent:kafka-avro-serializer") // KafkaAvroSerializer
testImplementation("org.springframework.kafka:spring-kafka-test")
}
avro {
isCreateSetters.set(false)
isCreateOptionalGetters.set(true)
isGettersReturnOptional.set(true)
isOptionalGettersForNullableFieldsOnly.set(true)
}
tasks.generateAvroJava {
source("src/main/resources/avro") // .avsc 경로
}
KafkaConfig
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.properties.bootstrap.servers}")
private String bootstrapServer;
@Value("${spring.kafka.properties.schema.registry.url}")
private String schemaRegistryUrl;
@Bean
public ProducerFactory<String, String> producerFactory() {
final Map<String, Object> configs =
Map.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
return new DefaultKafkaProducerFactory<>(configs);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Producer
-
kafkaTemplate.send()
사용
Consumer
-
@KafkaListener
사용