Apache Kafka는 대규모 데이터를 효율적으로 스트리밍하고 처리할 수 있도록 설계된 분산 메시징 플랫폼입니다.
Node.js와 Kafka를 통합하면 실시간 데이터 처리, 로그 처리, 이벤트 기반 시스템 등 다양한 활용이 가능합니다.
1. Apache Kafka란?
- LinkedIn에서 개발된 오픈 소스 분산 메시징 시스템
- 실시간 데이터 처리와 스트리밍을 위한 강력한 도구
- Producer(데이터를 보내는 역할)와 Consumer(데이터를 읽는 역할)를 기반으로 동작
[Kafka의 주요 특징]
- 높은 처리량: 초당 수백만 건의 메시지 처리 가능
- 확장성: 클러스터를 확장하여 더 많은 데이터를 처리 가능
- 내구성: 데이터를 디스크에 저장하여 손실 방지
- 실시간 처리: 데이터를 실시간으로 스트리밍 및 처리
2. Kafka 설치
[Kafka 설치]
- Apache Kafka 다운로드 페이지에서 최신 Kafka 버전 다운로드
- 다운로드한 파일의 압축 해제 (예: kafka_2.13-3.9.0.tgz)
- 파일 경로의 이름이 길면 명령어가 안될 수 있으니 이름 변경 (예: kafka)
[디렉토리 구조 확인]
- LICENSE, NOTICE: Kafka 라이선스 정보
- bin: Kafka 실행 파일이 포함된 폴더
- config: Kafka와 Zookeeper의 설정 파일
- libs: Kafka 실행에 필요한 라이브러리
3. Kafka 실행
(1) Zookeeper 실행
Kafka는 Zookeeper를 통해 메타데이터를 관리하고 브로커와 연결을 조정하기에, 먼저 Zookeeper를 실행해야 Kafka 브로커가 정상 작동.
./bin/windows/zookeeper-server-start.bat ./config/zookeeper.properties
- ./bin/windows/zookeeper-server-start.bat: Zookeeper 서버를 시작하는 스크립트
- ./config/zookeeper.properties: Zookeeper의 설정 파일, 포트 번호(기본값: 2181)와 데이터 디렉토리 등이 정의됨.
실행 결과: Zookeeper가 실행되며 포트 2181에 바인딩 완료
[INFO] binding to port 0.0.0.0/0.0.0.0:2181
(2) Kafka 브로커 실행
Zookeeper 실행 후, Kafka 브로커를 실행.
Kafka 브로커는 데이터를 저장하고 Producer와 Consumer 간의 데이터를 중계하는 역할.
./bin/windows/kafka-server-start.bat ./config/server.properties
- ./bin/windows/kafka-server-start.bat: Kafka 브로커를 시작하는 스크립트.
- ./config/server.properties: Kafka 브로커의 설정 파일로, 브로커 ID, 포트 번호(기본값: 9092), 로그 디렉토리 등이 정의됨.
실행 결과: Kafka 브로커가 실행되며 포트 9092에 바인딩 완료
[INFO] [KafkaServer id=0] started
(3) Kafka 토픽 생성
Kafka의 토픽(topic)은 데이터를 주고받는 채널. 토픽을 생성하지 않으면 Kafka가 데이터를 저장하거나 전송 불가
./bin/windows/kafka-topics.bat --create --topic example-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
- ./bin/windows/kafka-topics.bat: Kafka에서 토픽을 생성하거나 삭제, 조회 등의 작업을 수행하는 스크립트
- -create : 새로운 토픽을 생성하겠다는 명령어
- -topic example-topic : 생성할 토픽의 이름 지정. Producer와 Consumer에서 이 이름을 사용해 데이터를 주고 받음
- -bootstrap-server [localhost:9092](<http://localhost:9092>) : Kafka 브로커 주소를 지정. Kafka 브로커는 데이터를 관리하고, Producer와 Consumer를 연결하는 역할을
- -partitions 1 : 토픽의 파티션 수 설정. 파티션은 토픽을 분리하여 데이터를 분산 처리 가능
- -replication-factor 1 : 데이터의 복제본 수 설정. 복제본은 데이터 손실을 방지하기 위해 사용되며, 단일 브로커 환경에서는 기본값인 1을 설정.
결과 확인: 아래와 같은 메시지가 출력되면 토픽 생성이 완료
Created topic example-topic.
(4) Kafka Producer 실행 (메시지 보내기)
Kafka의 Producer는 특정 토픽에 데이터를 게시하는 역할.
Producer는 터미널을 통해 사용자가 직접 메시지를 입력하고 Kafka로 전송.
./bin/windows/kafka-console-producer.bat --topic example-topic --bootstrap-server localhost:9092
- ./bin/windows/kafka-console-producer.bat: Kafka에서 Producer 역할을 하는 콘솔 스크립트
- -topic example-topic : 데이터를 전송할 대상 토픽을 지정.
- -bootstrap-server localhost:9092 : Kafka 브로커 주소를 지정. 브로커가 데이터를 받아 토픽에 저장.
메시지 입력: 실행 후 터미널에 메시지를 입력하고 Enter를 누르면 입력한 메시지가 Kafka 토픽에 게시
안녕하세요, Kafka!
(5) Kafka Consumer 실행 (메시지 읽기)
Kafka의 Consumer는 토픽에 게시된 데이터를 읽어오는 역할
./bin/windows/kafka-console-consumer.bat --topic example-topic --from-beginning --bootstrap-server localhost:9092
- ./bin/windows/kafka-console-consumer.bat: Kafka에서 Consumer 역할을 하는 콘솔 스크립트
- -topic example-topic : 데이터를 읽을 Kafka 토픽을 지정합니다.
- -from-beginning : 토픽에 저장된 모든 메시지를 처음부터 읽어오는 명령어. 이 옵션을 생략하면 Consumer는 실행 시점 이후에 추가된 메시지만 읽음.
- -bootstrap-server localhost:9092 : Kafka 브로커 주소 지정
결과 확인: Producer에서 보낸 메시지가 Consumer 터미널에 출력.
안녕하세요, Kafka!
4. Node.js에서 Kafka 구현하기
Node.js는 Kafka와 통신할 수 있는 kafkajs 라이브러리 사용
(1) Node.js 프로젝트 설정
새 프로젝트 생성
mkdir kafka-node-project
cd kafka-node-project
npm init -y
KafkaJS 라이브러리 설치
npm install kafkajs
프로젝트 디렉토리 구조
workspace/
├── kafka/ # Kafka 디렉토리 (Kafka 관련 파일)
│ ├── bin/
│ ├── config/
│ └── ...
├── kafka-node-project/ # Node.js 프로젝트 디렉토리
├── producer.js # Producer 구현 파일
├── consumer.js # Consumer 구현 파일
├── package.json # Node.js 설정 파일
(2) KafkaJS를 사용한 Node.js Producer 및 Consumer 구현
Producer
const { Kafka } = require('kafkajs');
const kafka = new Kafka({
clientId: 'nodejs-producer',
brokers: ['localhost:9092'], // Kafka 브로커 주소
});
const producer = kafka.producer();
const run = async () => {
await producer.connect();
console.log('Producer 연결 성공!');
const topic = 'example-topic'; // 메시지를 보낼 토픽 이름
const messages = [{ value: 'Hello Kafka from Node.js!' }]; // 메시지 내용
await producer.send({ topic, messages });
console.log('메시지 전송 완료');
await producer.disconnect();
};
run().catch(console.error);
producer.js로 저장 후 명령어 실행
node producer.js
Consumer
const { Kafka } = require('kafkajs');
const kafka = new Kafka({
clientId: 'nodejs-consumer',
brokers: ['localhost:9092'], // Kafka 브로커 주소
});
const consumer = kafka.consumer({ groupId: 'example-group' });
const run = async () => {
await consumer.connect();
console.log('Consumer 연결 성공!');
await consumer.subscribe({ topic: 'example-topic', fromBeginning: true });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log(`받은 메시지: ${message.value.toString()}`);
},
});
};
run().catch(console.error);
consumer.js로 저장 후 실행
node consumer.js
5. Kafka와 Node.js 실행 및 테스트
1. Zookeeper 실행
./bin/windows/zookeeper-server-start.bat ./config/zookeeper.properties
2. Kafka 브로커 실행
./bin/windows/kafka-server-start.bat ./config/server.properties
3. Node.js Producer 실행
node producer.js
Producer가 연결되었으며 메시지가 Kafka 토픽에 전송된 경우 다음과 같은 메시지 출력
Producer 연결 성공!
메시지 전송 완료
4. Node.js Consumer 실행
node consumer.js
Consumer가 Producer에서 보낸 메시지를 읽어온 경우 터미널에 다음과 같은 메시지 출력
Consumer 연결 성공!
받은 메시지: Hello Kafka from Node.js!
Node.js 관점에서 Kafka 구성 다이어그램
+------------+ +-----------------+ +------------+
| Node.js | ---> | Kafka Broker | ---> | Node.js |
| Producer | | | | Consumer |
+------------+ +-----------------+ +------------+
- Node.js Producer
- Node.js로 구현된 Producer는 Kafka Broker로 데이터 전송
- 예: 웹 서버에서 수집한 사용자 활동 데이터를 Kafka로 전송.
- Kafka Broker
- Kafka 클러스터의 핵심 역할을 수행하며, 데이터를 토픽에 저장하고 Consumer로 전달
- Node.js Consumer
- Node.js로 구현된 Consumer는 Kafka에서 데이터를 읽음
- 예: 데이터를 읽어 웹 애플리케이션에 표시하거나 데이터베이스에 저장.
Kafka와 Node.js를 통합하면 실시간 데이터 스트리밍 및 처리가 간단해집니다.
이 포스팅에서는 Kafka 설치 및 실행, Node.js를 활용한 Producer와 Consumer 구현 방법을 다루었습니다.
Kafka의 강력한 기능을 활용하여 더 나은 스트리밍 데이터 시스템을 설계해 보세요.
'개발정리 (nodeJS)' 카테고리의 다른 글
[nodeJS] Node.js에서 DB 프로시저 활용하기 (0) | 2024.12.31 |
---|---|
[nodeJS] Node.js에서 SQLite를 사용한 로컬 데이터베이스 구축하기 (1) | 2024.12.18 |
[nodeJS] Node.js에서 Bcrypt를 사용한 비밀번호 해싱 및 검증하기 (0) | 2024.11.21 |
[nodeJS] Node.js에서 Worker Threads 사용하기 (0) | 2024.11.13 |
[nodeJS] Node.js에서 재귀 함수 이해하고 활용하기 (0) | 2024.11.05 |