• 파이프라인, 스트리밍 분석, 데이터 통합 및 미션 크리티컬 애플리케이션을 위해 설계된 고성능 분산 이벤트 스트리밍 플랫폼
  • Pub-Sub 모델의 메시지 큐 형태로 동작하며 분산환경에 특화

동작방식

1. 메시지 브로커

  • 메시지를 처리하면 빠른시간내에 삭제
  • ex) 레디스 큐, RabbitMQ

2. 이벤트 브로커

  • 이벤트를 처리하면 일정시간 보관가능
  • 이벤트를 저장해서 장애시 시작위치 복구가능
  • ex) Kafka, AWS 키네시스

구성요소

1. Producer

  • 메시지 전송시 Topic 설정
  • ack설정 (0:전달여부X, 1:전달여부O, all:복제여부까지)

2. Consumer

  • 1개의 partition과 Consumer Group은 1:1 맵핑
  • Consumer Group간의 영향을 끼치지 않음
  • Consumer 갯수 <= Partition 갯수 (컨슈머 초과분은 동작하지 않음)

3. Broker

  • Kafka server

4. Topic

  • 메시지(데이터)의 형식

5. Partition

  • 메시지를 저장하는 물리적인 파일 (Queue)
  • Topic은 여러 partition으로 나눈다
  • 1개의 Topic은 1개 이상의 Partition 으로 구성
  • 같은 메시지 key를 갖으면 같은 partition에 들어감
  • 파티션 갯수가 추후에 추가되면 일관성이 깨지게되니 추의

consumer lag : producer offset - consumer offset

Avro

  • 언어에 독립적인 Serializer Library
  • IDL(Interface Description Language)로써 언어에 독립적으로 인터페이스를 표현해 통신한다
  • Serialize된 Schema를 파일로 생성해서 메타데이터에 저장 및 사용
  • Avro를 사용하기 위해서는 Topic Schema(.avsc)를 생성한 후 Java class를 생성시켜줘야 한다
  • Avro Schema(.avsc)는 JSON 형식

  • SimpleRecord.avsc
{
   "type": "record",
   "name": "User",
   "namespace": "io.confluent.developer",
   "doc": "User Schema",
   "fields": [
      {
        "name": "name",
        "type": "string",
        "doc": "last name"
      },
      {
        "name": "age",
        "type": ["int", "null"],       // int or null
        "doc": "international age"
      },
      {
        "name": "gender",
        "type": ["string", "null"],       // string or null
        "doc": "international age"
      }
   ]
}
  • 만들어놓은 .avsc로 Java class 생성
    • 경로 : build/avro/io/confluent/developer
$ gradle build
or
$ gradle generateAvroJava
  • Java class Schema 사용
    • newBuilder() : 값이 셋팅될 때 유효성 검사
User user = User.newBuilder()
            .setName("europa")
            .setAge(17)
            // gender is null
            .build();     

gradle

plugins {
    id("com.github.davidmc24.gradle.plugin.avro") version "1.8.0"
}

repositories {
    maven("https://packages.confluent.io/maven")
}

dependencies {
    implementation("org.springframework.kafka:spring-kafka")
    implementation("org.apache.avro:avro")                // avro
    implementation("io.confluent:kafka-avro-serializer")  // KafkaAvroSerializer

    testImplementation("org.springframework.kafka:spring-kafka-test")
}

avro {
    isCreateSetters.set(false)
    isCreateOptionalGetters.set(true)
    isGettersReturnOptional.set(true)
    isOptionalGettersForNullableFieldsOnly.set(true)
}

tasks.generateAvroJava {
    source("src/main/resources/avro")       // .avsc 경로
}

KafkaConfig

@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.properties.bootstrap.servers}")
    private String bootstrapServer;

    @Value("${spring.kafka.properties.schema.registry.url}")
    private String schemaRegistryUrl;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        final Map<String, Object> configs =
                Map.of(
                        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer,
                        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class,
                        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        return new DefaultKafkaProducerFactory<>(configs);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Producer

  • kafkaTemplate.send() 사용

Consumer

  • @KafkaListener 사용