최신 팀 내에서 EDA(Event Driven Architecture) 바람이 불면서 인프라 개선을 위한 마이그레이션 작업 중 이벤트 브로커로 Kafka를 도입하게 되었다.
Kafka를 중앙에 두고 양측의 이벤트 발행자와 소비자의 관계는 느슨해졌으나, 데이터 인터페이스 정의와 공유에 있어 불필요한 커뮤니케이션과 관리의 모호함을 많이 느끼게 되었다.
그러하여 EDA 진영에서 마이크로 서비스 간의 스키마 정의와 관리를 도와주는 Schema Registry에 대해 알아보고 간단한 소개를 적어보려고 한다.
# Schema Registry?
kafka는 이벤트를 발행하는 producer와 소비하는 consumer 사이의 직접적인 관계를 끊음으로써 구조적인 결합도를 낮추는 장점이 있다.
하지만 구조적 결합도가 낮아지면서 이벤트 consumer는 producer가 어떠한 형태의 이벤트를 발행했는지 알 수 없으며, producer는 이벤트의 형태를 언제나 수정 할 수 있다는 점에서 장애를 야기할 수 있다.
이로써 구조적 결합도는 낮아졌지만, 내부적인 결합도는 여전히 가지고 있다고 이야기 한다.
이를 해결하기 위해 나온 개념이 Schema Registry이다.
## Schema Registry의 정의
"kafka를 이용하여 주고받는 메시지(이벤트) 스키마를 중앙에서 관리하기 위한 웹 애플리케이"
## Schema Registry의 기본 컨셉
- Schema Registry는 kafka 외부에 별도로 구성되어 클라이언트(Producer, Consumer)와 통신
- kafka에 write 되는데 사용되는 모든 schema는 registry에 저장 (저장된 schema들은 고유한 id를 취득)
- schema에 대한 정보는 REST API 방식을 통해 조회 가능
- topic별 메시지 key, value schema 버전 관리가 가능
- Producer는 schema의 id를 record에 달아서 데이터를 직렬화 후 produce
- consumer는 메시지 내의 schema id를 이용해 Schema Registry에서 Schema를 조회 후 역직렬화 수행
- 핵심은 schema 호환성 규칙을 강제하고 직렬화/역직렬화를 통해 메시지를 주고 받는 다는 것
## Schema Registry에 Schema 조회와 애플리케이션 성능지연
- Producing, Consuming마다 Schema Registry를 직접 조회를 하지 않음
- Schema Registry 연동 라이브러리에는 CachedSchemaRegistryClient 모듈을 지원하여 모든 schema 정보를 local cache에 캐싱하고 있으며 registry를 consume하여 신규 schema에 대한 정보도 캐싱
(Confluent의 Schema Registry는 지원, 다른 것은 조사 필요)
## Schema Registry 사용 상세 플로우
메시지 Produce (직렬화 진행)메시지 Producing Flow
(https://www.lydtechconsulting.com/blog-kafka-schema-registry-intro.html) |
메시지 comsume (역직렬화 진행)메시지 Consuming Flow
(https://www.lydtechconsulting.com/blog-kafka-schema-registry-intro.html) |
## Node.js 메시지 Produce/Consume 소스 예시
const path = require("path");
const { Kafka } = require("kafkajs");
const {
SchemaRegistry,
SchemaType,
avdlToAVSCAsync,
} = require("@kafkajs/confluent-schema-registry");
const registry = new SchemaRegistry({ host: "http://localhost:8081" });
const kafka = new Kafka({
brokers: ["localhost:9092"],
clientId: "example-consumer",
});
const consumer = kafka.consumer({ groupId: "test-group" });
const producer = kafka.producer();
const incomingTopic = "incoming";
const outgoingTopic = "outgoing";
const run = async () => {
const schema = await avdlToAVSCAsync(path.join(__dirname, "schema.avdl"));
const { id } = await registry.register({
type: SchemaType.AVRO,
schema: JSON.stringify(schema),
});
await consumer.connect();
await producer.connect();
await consumer.subscribe({ topic: incomingTopic });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const decodedMessage = {
...message,
value: await registry.decode(message.value),
};
const outgoingMessage = {
key: message.key,
value: await registry.encode(id, decodedMessage.value),
};
await producer.send({
topic: outgoingTopic,
messages: [outgoingMessage],
});
},
});
};
run().catch(async (e) => {
console.error(e);
consumer && (await consumer.disconnect());
producer && (await producer.disconnect());
process.exit(1);
});
## Schema 정의포멧
Avro, JSON Schema, Protobuf, Parquet 등 다양한 포맷으로 정의가 가능합니다.
## Avro 정의 예시
{
"type": "record",
"name": "Message",
"namespace": "me.daehokimm.kafka.example.schemaregistry",
"fields": [
{
"name": "content",
"type": {
"type": "string",
"avro.java.string": "String"
},
"avro.java.string": "String"
},
{
"name": "timestamp",
"type": "long",
"default": 0
}
]
}
## Schema Registry Cloud Service
- Apache Confluent Schema Registry
- 오픈소스 - confluentinc/schema-registry
- AWS Glue Schema Registry
- MS Azure Event Hubs
- 등
## Schema Registry 장점과 도입 고려사항
장점
- 일관된 포맷으로 단일 sotrage에 Schema를 관리
- 애플리케이션 개발 언어에 국한되지 않는 type관리
- schema 변경 시 producer 측과 client 측의 애플리케이션 소스 수정 없이 동기화 가능
- 직렬화/역직렬화를 이용한 효과적인 데이터 압축과 네트워크 대역폭 낭비 최소화
- MSK 환경에서 JSON Format과 Avro Format을 비교한 사례를 보면, 238만건 데이터 기준으로 JSON은 12GB, Avro는 538MB로 약 24배의 압축률을 확인
- 협의되지 않은 데이터를 주고 받는 현상을 사전에 방지
도입 고려사항
- schema 정의 포멧에 대한 학습이 필요
- schema 네이밍, 버전 관리 등의 정책 수립 필요
- 관리 포인트 증가
- 인프라 비용 증가
Producer와 Consumer 간의 스키마 정의와 공유에 있어 이벤트 브로커 즉 Kafka 등을 사용하는 EDA 환경에서는 어려움이 있던 부분을 맹점으로 Schema Registry가 좋은 솔루션이지 않을까 싶다. 또한 메시지의 직렬화/역직렬화를 통해 실시간으로 들어오는 스트림 메시지들을 압축까지 한다면 인프라 비용 측변에서도 사용하지 않을 이유가 없는 듯하다.
그러나 Kafka의 내장 서비스가 아닌 별도의 storage로 관리 포인트가 증가하며 schema 버전 간 호환성 정책, schema 정의 정책 등 도입 후 관리를 위해서는 알맞는 정책을 수립 후 신중하게 사용하는 것이 좋을 것 같다.
'Backend > Kafka' 카테고리의 다른 글
[kafka] Schema Registry 호환성 (0) | 2024.05.27 |
---|