-
apache kafka Producer카테고리 없음 2022. 9. 5. 17:55
Producer
Producer는 kafka에 데이터를 보내는 역할을 하는데 만약에 엄청난 양의 클릭로그들을 대량으로 그리고 실시간으로 kafka에 적재할 때 Producer를 사용할 수 있다.
Producer는 이름에서 알 수 있듯이 데이터를 producing ->생산하는 역할을 한다. 즉 데이터를 kafka topic에 생성한다.
Producer 역할
- Topic에 해당하는 메시지를 생성
- 특정 Topic으로 데이터를 publish
- 처리 실패 -> 재시도첫번째로 가장 중요한 역할 topic에 전송할 메시지를 생성할 수 있다. 그리고 두번째 특징으로 특정 topic으로 데이터를 publish 전송할 수 있다. 이를 통해 기본적으로 카프카 데이터 전송이 완성된다.
kafka 의존성 추가
Getting Start
npm install kafkajs
# yarn add kafkajs그리고 마지막으로 kafka broker로 데이터를 전송할 때 전송 성공여부를 알 수 있고, 실패할 경우 재시도 할 수도 있다. kafka의 클라이언트인 Consumer와 Producer를 사용하기 위해서는 apache kafka 라이브러리를 추가해야한다.
client와 broker 버전
카프카 클라이언트를 dependency로 잡을 때 주의해야 할 점은 바로 버전이다. kafka는 brocker버전과 클라이언트 버전의 하위호환성을 완벽하게 모든 버전에 대해서 지원하지는 않는다.
일부 kafka broker 버전은 특정 kafka-client 버전을 지원하지 않을 수 도 있다. 그렇기 때문에 반드시 client와 broker에 하위호환성에 대해 숙지하고 이에 알맞는 카프카 클라이언트 버전을 사용하여야하다.
클라이언트 버전별 하위호환성에 대한 설명은
https://blog.voidmainvoid.net/193
Usage
const { Kafka } = require('kafkajs') const kafka = new Kafka({ clientId: 'my-app', brokers: ['kafka1:9092', 'kafka2:9092'] }) const producer = kafka.producer() const consumer = kafka.consumer({ groupId: 'test-group' }) const run = async () => { // Producing await producer.connect() await producer.send({ topic: 'test-topic', messages: [ { value: 'Hello KafkaJS user!' }, ], }) // Consuming await consumer.connect() await consumer.subscribe({ topic: 'test-topic', fromBeginning: true }) await consumer.run({ eachMessage: async ({ topic, partition, message }) => { console.log({ partition, offset: message.offset, value: message.value.toString(), }) }, }) } run().catch(console.error)
카프카 브로커의 주소목록은 되도록이면 2개이상의 ip와 port를 설정하도록 권장한다. 오직 하나의 broker만 지정할 경우 해당 broker가 비정상일 때 문제가 발생하기 때문이다. 2개의 ip와 port 주소목록을 가질 경우는 하나의 broker에 문제가 있을때 다른 broker로 대체가 바로 가능하기 때문이다.그러므로 실제로 애플리케이션을 카프카와 연동할 때는 반드시 2개 이상의 broker 정보를 넣는 것이 좋다.
직렬화(Serialization)
직렬화는 컴퓨터의 데이터 객체를 저장 매체에 저장할 수 있는 형식, 또는 네트워크를 통해 전송할 수 있는 것으로 변환하는 것을 뜻한다. 저장장치에 저장되는 데이터 또는 네트워크를 타고 다니는 데이터는 결국은 0과 1로 이루어진 디지털 데이터의 연속이다. 따라서 프로그래밍 언어에서 사용하는 변수, 구조체, 객체와 같은 데이터들을 저장장치에 저장하거나 네트워크로 전송할 때에는 여기에 적합한 형식으로 만들어 줘야 한다.
직렬화의 방식은 크게 두 가지가 있다.
1) 컴퓨터가 쉽게 이해할 수 있는 0과 1의 디지털 형식으로 된 이진(binary) 방식
2) 사람이 쉽게 읽을 수 읽는 형태로 직렬화하는 텍스트 방식
XML, JSON, YAML 같은 형식은 텍스트 형식 직렬화이고, 한글의 HWP 파일, 워드의 DOC 혹은 DOCX 파일, 이미지를 위한 JPG, PNG, GIF 같은 형식들, 오디오를 위한 WAV, MP3, OGG, AAC 같은 형식들, 영상을 위한 AVI, MOV, MP4와 같은 형식들은 이진 방식 직렬화다.Message Formats
주로 topic안에 data를 "messages"로 지시하지만, messagese가 가져야하는 정형화된 모양은 없다. kafka의 입장에서 보면 message는 단지 key-value의 한 쌍이다. 즉 key와 value모두 단지 bytes의 나열일 뿐이다. message가 어떤 형식일지는 전적으로 Consumer와 Producer의 합의점에 달려있다. 주로 스키마가 없는 plain-text가 선호된다. 예를 들어 JSON이나 binary 형식으로 강제된 AVRO를 들 수 있다.
Plaint-Text JSON
JSON은 사용하기 편리하다. 단지 JSON.stringfy({}) 이용해서 message를 문자열로 변화하면 된다.
await producer.send({ topic, messages: [{ key: 'my-key', value: JSON.stringify({ some: 'data' }) }] }) const eachMessage = async ({ /*topic, partition,*/ message }) => { // From Kafka's perspective, both key and value are just bytes // so we need to parse them. console.log({ key: message.key.toString(), value: JSON.parse(message.value.toString()) }) /** * { key: 'my-key', value: { some: 'data' } } */ }
JSON의 불편한 점은 JSON 순서를 가지를 것을 강제하지 않는다는 것이다. 그래서 문자열화된 JSON을 paser한 다음에 어떤 속성이 사용가능하고 어떤 type이 었는지 알 방법이 없게된다. 속성의 본래 상태가 유지 되거나 type이 변하지 않도록 데이터 Prodcer는 보장하지 않는다. 이는 JSON을 사용하기 어렵게 힘들게 만든다.
AVRO
AVRO는 데이터 직렬화 시스템이다. 이것은 변환시킨다. messages를 정의된 순서에 따라 콤팩트한 binary형태로 변환시킨다. 이런한 특징은 Consumer에게 각 messages가 담고있는 의미를 정확히 알 수 있게 한다. 그리고 Producer는 messages의 순서를 망가트릴 수 있는 변화를 만들때 스스로 그것을 자각하게 된다.
AVDL 형태에서 스키마는 다음과 같다.
@namespace("com.kafkajs.fixtures") protocol SimpleProto { record Simple { string foo; } }
그리고 나머지 key와 value에 대해 스트링시리얼라이저(StringSerializer)로 직렬화 설정을한 것을 볼 수 있다. Byte array,String,Integer 시리얼라이즈를 사용할 수 있다.
여기서 key라는 개념이 처음 나왔는데 key는 messages를 보내면서 topic의 파티션을 지정할 때 쓰인다.
Kafka 인스턴스 변수 생성
const { Kafka } = require('kafkajs') const kafka = new Kafka({ clientId: 'my-app', brokers: ['kafka1:9092', 'kafka2:9092'], }) const producer = kafka.producer()
kafka 인스턴스를 생성해서 해당 인스턴스 변수로 producer 변수와 consumer 변수를 생성한다.
데이터 전송
const producer = kafka.producer() await producer.connect() await producer.send({ topic: 'click_log', messages: [ { value: 'login' }, ], }) await producer.disconnect()
어떤 topic에 넣을 것인지 어떤(messages) key와 value에 넣을 것인지 선언 할 수 있다. 예제에서는 key를 지정하지 않고, value값에는 'login'라는 문자열 값을 넣어서 'click_log'이라는 topic으로 보내고 있다.
=> 데이터가 도착할 topic, 데이터, kafka broker와 host, port까지 데이터를 전송할 모든 준비가 되었다.
Producer.send( )
key null에 파티션 1개
key가 null이 데이터를 파티션이 1개인 topic에 보내면 위와 같이 차례대로 쌓이게 된다.
key null에 파티션 2개
만약 파티션이 1개 더 늘어나면 어떻게 될까? key가 null 데이터가 round-robin방식으로 2개의 파티션에 차곡차곡 쌓이게 된다.
key가 존재하는 데이터 topic에 보내기
이번 경우에는 "buy"하는 value의 key를 1 "review"라는 value의 key를 2라고 지정했다. kafka는 key를 특정한 hash값으로 변경시켜 파티션과 1대 1 매칭을 시킨다. 그러므로 위 코드를 반복해서 실행시키면 각 파티션에 동일 key의 동일 value만 쌓이게 된다.
topic에 새로운 파티션을 추가
topic에 새로운 파티션을 추가하는 순간 key와 파티션의 매칭이 깨지기 때문에 key와 파티션의 연결은 보장되지 않는다. 그러므로 key를 사용할 경우 이정을 유의해서 파티션 개수를 생성하고, 추후에 함부로 파티션 갯수를 추가하지 않는 것을 추천한다.