Kafka cho người mới bắt đầu: Hoạt động như thế nào và tại sao tồn tại
Mục lục
Hầu hết tài liệu về Kafka bắt đầu bằng câu “Kafka là một nền tảng event streaming phân tán.” Câu đó chính xác nhưng không giúp ích được gì. Bài này bắt đầu từ vấn đề Kafka được xây dựng để giải quyết, sau đó giải thích từng thành phần từ góc độ tại sao nó tồn tại, không phải từ tên gọi của nó.
Phần 1: Tại sao Kafka tồn tại
Vấn đề với việc gọi trực tiếp giữa các service
Giả sử bạn có một hệ thống e-commerce. Khi người dùng đặt hàng, nhiều việc phải xảy ra: inventory service trừ tồn kho, notification service gửi email xác nhận, analytics service ghi lại đơn hàng, và fraud detection service chạy kiểm tra.
Cách đơn giản nhất: order service gọi trực tiếp đến từng downstream service.
Order Service
|
|-- HTTP --> Inventory Service
|-- HTTP --> Notification Service
|-- HTTP --> Analytics Service
|-- HTTP --> Fraud Detection Service
Cách này hoạt động cho đến khi không còn hoạt động nữa. Những gì bị phá vỡ:
Tight coupling. Order service phải biết về mọi downstream consumer. Khi team analytics muốn thêm event mới, họ sửa order service. Khi fraud detection thay đổi API, order service cũng phải thay đổi. Mỗi consumer mới là một dependency mới mà order service phải quản lý.
Chuỗi khả dụng. Nếu notification service đang down, order service hoặc phải fail toàn bộ đơn hàng, hoặc phải tự implement retry logic cho từng downstream service. Bốn consumer nghĩa là bốn failure mode khác nhau cần xử lý.
Tốc độ không đồng đều. Fraud detection có thể mất 300ms. Khách hàng không nên đợi 300ms chỉ vì một downstream service chậm. Order service hoặc block chờ tất cả, hoặc tự quản lý parallel call với timeout riêng cho từng service.
Không thể replay. Nếu analytics bị down hai tiếng và bỏ lỡ 10,000 event, những event đó mất đi vĩnh viễn. Không có cách nào gửi lại chúng.
Kafka thay đổi điều gì
Kafka đưa một broker vào giữa producer và consumer. Order service publish một event. Mọi downstream service đọc event đó độc lập.
Order Service --> Kafka --> Inventory Service
--> Notification Service
--> Analytics Service
--> Fraud Detection Service
Lúc này:
- Order service không biết và không quan tâm ai đọc event. Thêm consumer mới không cần thay đổi producer.
- Nếu notification bị down, event của nó tích lũy trong Kafka. Khi phục hồi, nó đọc từ chỗ nó dừng lại.
- Nếu analytics bỏ lỡ hai tiếng, nó replay hai tiếng đó từ log được lưu trong Kafka.
- Fraud detection chạy độc lập theo tốc độ của nó mà không làm chậm response của đơn hàng.
Trade-off: Bạn đã đổi sự đơn giản đồng bộ lấy sự phức tạp bất đồng bộ. Bạn không còn nhận được xác nhận ngay lập tức rằng mọi downstream service đã thành công. Đơn hàng được chấp nhận, nhưng email có được gửi hay không là một câu hỏi riêng được trả lời bất đồng bộ. Đây là trade-off đúng đắn cho hệ thống throughput cao. Đây là trade-off sai khi bạn cần phản hồi đồng bộ ngay lập tức từ downstream service trước khi tiếp tục.
Phần 2: Topic và Partition
Topic: kênh được đặt tên cho các event
Topic là một log được đặt tên của các event. Producer ghi vào topic; consumer đọc từ topic. Hãy nghĩ topic như một bảng trong database, ngoại trừ bạn chỉ có thể append vào nó và việc đọc không xóa bản ghi.
// Producer ghi vào topic "orders"
ProducerRecord<String, String> record = new ProducerRecord<>("orders", orderId, orderJson);
producer.send(record);
Tại sao partition tồn tại
Một topic có thể nhận hàng triệu event mỗi giây. Một máy không thể ghi hoặc đọc nhanh như vậy. Kafka chia topic thành các partition, mỗi partition là một log độc lập có thứ tự được lưu trên một broker khác nhau.
Topic: "orders"
Partition 0: [msg0] [msg3] [msg6] [msg9] ...
Partition 1: [msg1] [msg4] [msg7] [msg10] ...
Partition 2: [msg2] [msg5] [msg8] [msg11] ...
Trong một partition, thứ tự được đảm bảo. Giữa các partition, không có đảm bảo thứ tự. Đây là trade-off cơ bản: parallelism đổi lấy thứ tự toàn cục.
Điều gì xảy ra nếu không có partition: Nếu một topic chỉ có một partition, một consumer có thể xử lý nó và throughput bị giới hạn bởi một máy. Với 10 partition, 10 consumer có thể đọc song song, tăng throughput lên khoảng 10 lần.
Kafka quyết định message nào vào partition nào như thế nào:
// Nếu cung cấp key, Kafka hash key để chọn partition
// Tất cả message có cùng key luôn vào cùng partition
// -> thứ tự được đảm bảo theo từng key
ProducerRecord<String, String> record = new ProducerRecord<>(
"orders",
customerId, // key: cùng customer luôn vào cùng partition
orderJson // value
);
// Nếu không có key, Kafka phân phối round-robin qua các partition
ProducerRecord<String, String> record = new ProducerRecord<>("orders", orderJson);
Production example: Order service gửi event với key là customerId. Mọi event của customer 42 đều vào partition 3 theo thứ tự. Fraud detection consumer đọc partition 3 thấy tất cả event của customer 42 theo đúng thứ tự, điều cần thiết để phát hiện velocity patterns (ba đơn hàng trong một phút).
Trade-off: Số partition được đặt khi tạo topic và khó thay đổi sau này (tăng partition làm thay đổi ánh xạ hash cho key, phá vỡ thứ tự per-key cho consumer hiện tại). Bắt đầu với nhiều partition hơn nhu cầu hiện tại. Điểm khởi đầu phổ biến cho topic traffic trung bình là 12 đến 24 partition.
Phần 3: Broker và Cluster
Kafka cluster là một nhóm server broker. Mỗi broker lưu một số partition. Kafka replicate mỗi partition qua nhiều broker để nếu một broker bị lỗi, dữ liệu không bị mất.
Broker 1: Partition 0 (leader), Partition 1 (replica)
Broker 2: Partition 1 (leader), Partition 2 (replica)
Broker 3: Partition 2 (leader), Partition 0 (replica)
Mỗi partition có một leader broker xử lý tất cả reads và writes cho partition đó. Các broker khác giữ bản sao gọi là follower, replicate từ leader.
Điều gì xảy ra nếu không có replication: Nếu Broker 1 giữ bản sao duy nhất của Partition 0 và Broker 1 bị crash, tất cả message trong Partition 0 mất đi và producer không thể ghi message mới cho đến khi broker phục hồi. Với replication factor 3, Broker 1 bị lỗi không phải là thảm họa: Broker 3 (đang giữ replica) được tự động promote lên làm leader.
Replication factor kiểm soát số bản sao tồn tại. Replication factor 1 nghĩa là không có dự phòng. Replication factor 3 nghĩa là dữ liệu sống sót qua việc mất bất kỳ một broker nào. Hầu hết topic production dùng replication factor 3.
Phần 4: Producer
Producer làm gì
Producer là bất kỳ code nào ghi message vào Kafka. Java producer client tự động xử lý quản lý kết nối, serialization, chọn partition, batching, và retry.
// Cấu hình producer tối giản
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// send là bất đồng bộ theo mặc định
Future<RecordMetadata> future = producer.send(
new ProducerRecord<>("orders", customerId, orderJson)
);
// Luôn close để flush các record đang buffer và giải phóng kết nối
producer.close();
Mức độ acknowledgment
Cài đặt acks kiểm soát số broker phải xác nhận write trước khi Kafka thông báo producer thành công. Đây là cấu hình producer quan trọng nhất cho độ tin cậy.
// acks=0: fire and forget
// Producer không đợi bất kỳ xác nhận nào
// Nhanh nhất, nhưng message có thể mất nếu broker crash ngay sau khi nhận
props.put(ProducerConfig.ACKS_CONFIG, "0");
// acks=1: leader xác nhận
// Broker leader xác nhận write vào log của nó
// Message mất nếu leader crash trước khi replicate sang follower
props.put(ProducerConfig.ACKS_CONFIG, "1");
// acks=all (hoặc -1): tất cả in-sync replica xác nhận
// Message bền vững miễn là có ít nhất một replica còn sống
// Chậm nhất, nhưng không mất dữ liệu khi một broker đơn bị lỗi
props.put(ProducerConfig.ACKS_CONFIG, "all");
Quy tắc production: Dùng acks=all cho bất kỳ dữ liệu quan trọng nào. Dùng acks=1 cho metrics hoặc log volume cao nơi mất mát occasional là chấp nhận được. Không bao giờ dùng acks=0 trừ khi đang benchmark throughput và có thể chấp nhận mất message.
Batching
Producer không gửi một network request cho mỗi message. Nó buffer record và gửi theo batch, giảm đáng kể network overhead.
// Producer đợi bao lâu để fill batch trước khi gửi
props.put(ProducerConfig.LINGER_MS_CONFIG, "5"); // đợi tối đa 5ms
// Kích thước batch tối đa tính bằng bytes
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 16KB mặc định
// Bật nén snappy cho batch
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
Với linger.ms=5, producer đợi 5ms trước khi gửi. Nếu 1,000 message đến trong 5ms đó, chúng đi ra trong một network call thay vì 1,000 lần. Trade-off: mỗi message đợi tối đa 5ms trước khi được gửi. Với path nhạy cảm về latency, giữ linger.ms ở 0 hoặc 1.
Phần 5: Consumer và Consumer Group
Offset: Kafka theo dõi những gì bạn đã đọc như thế nào
Mỗi message trong partition có một số tuần tự gọi là offset. Offset 0 là message đầu tiên được ghi, offset 1 là message thứ hai, cứ thế tiếp tục. Khi consumer đọc message, nó commit offset hiện tại về Kafka. Nếu consumer crash và khởi động lại, nó tiếp tục từ offset đã commit cuối cùng.
Partition 0 offsets:
0 1 2 3 4 5
[msg0] [msg1] [msg2] [msg3] [msg4] [msg5]
^
committed offset = 2
(consumer đã xử lý đến msg2)
lần đọc tiếp theo bắt đầu từ offset 3
Điều gì xảy ra nếu không commit offset: Nếu bạn đọc message nhưng không bao giờ commit offset, mỗi lần khởi động lại đều đọc từ đầu topic. Nếu topic có 6 tháng lịch sử đơn hàng, mỗi lần restart sẽ xử lý lại tất cả.
Điều gì xảy ra với auto-commit: Cài đặt mặc định của Kafka (enable.auto.commit=true) commit offset mỗi 5 giây bất kể code của bạn có xử lý thành công message hay không. Nếu code của bạn đọc một batch, auto-commit kích hoạt, rồi việc xử lý crash trước khi hoàn thành: những message đó được đánh dấu là xong dù không được xử lý. Bạn mất chúng.
// An toàn hơn: tắt auto-commit và commit thủ công sau khi xử lý
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processOrder(record.value()); // xử lý trước
}
consumer.commitSync(); // rồi commit: chỉ đến đây nếu xử lý thành công
Consumer group: đọc song song
Consumer group là tập hợp consumer cùng hợp tác đọc một topic. Kafka gán mỗi partition cho đúng một consumer trong group. Không có hai consumer trong cùng group đọc cùng partition tại cùng thời điểm.
Topic "orders" với 3 partition
Consumer Group "fraud-detection" với 3 consumer:
Partition 0 --> Consumer A
Partition 1 --> Consumer B
Partition 2 --> Consumer C
Mỗi consumer xử lý partition được gán độc lập. Để tăng gấp đôi throughput, thêm consumer vào group (tối đa bằng số partition). Để có hai hệ thống độc lập đọc cùng topic, dùng hai consumer group riêng biệt: mỗi group nhận một bản sao độc lập đầy đủ của mọi message.
Topic "orders" với 3 partition
Group "fraud-detection": Group "analytics":
Partition 0 --> Consumer A Partition 0 --> Consumer X
Partition 1 --> Consumer B Partition 1 --> Consumer Y
Partition 2 --> Consumer C Partition 2 --> Consumer Z
Điều gì xảy ra nếu thêm nhiều consumer hơn partition: Consumer thừa ngồi idle. Topic với 3 partition có thể dùng tối đa 3 consumer trong cùng group cho xử lý song song.
Rebalancing
Khi consumer gia nhập hoặc rời group, Kafka tái phân phối partition giữa các consumer còn lại. Đây gọi là rebalance. Trong quá trình rebalance, tất cả consumer trong group tạm dừng đọc cho đến khi phân công mới hoàn thành.
Điều gì xảy ra nếu không xử lý rebalance: Nếu consumer đang giữ một database transaction mở hoặc HTTP request đang chạy khi rebalance bắt đầu, Kafka thu hồi partition trong lúc đang xử lý. Consumer tiếp theo nhận partition từ offset đã commit cuối cùng và xử lý lại các message đó. Nếu logic xử lý của bạn không idempotent, bạn sẽ gặp duplicate insert, duplicate email, và duplicate charge.
consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// Được gọi trước khi partition bị thu hồi
// Hoàn thành công việc đang xử lý và commit offset hiện tại
consumer.commitSync(currentOffsets);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// Được gọi sau khi partition mới được gán
// Load state per-partition cho các partition mới được gán
}
});
Phần 6: Đảm bảo Delivery
Kafka cung cấp ba delivery semantics. Hiểu hệ thống của bạn đang dùng cái nào theo mặc định không phải là tùy chọn.
At-most-once
Message có thể mất nhưng không bao giờ được xử lý hai lần. Đạt được bằng cách commit offset trước khi xử lý.
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
consumer.commitSync(); // commit trước
for (ConsumerRecord<String, String> record : records) {
processOrder(record.value()); // nếu crash ở đây, message bị bỏ qua
}
Use case: metrics, log, và bất kỳ dữ liệu nào mà mất mát occasional là chấp nhận được và duplicate thì tốn kém.
At-least-once
Message không bao giờ mất nhưng có thể được xử lý nhiều hơn một lần. Đạt được bằng cách commit offset chỉ sau khi xử lý.
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processOrder(record.value()); // nếu crash ở đây, message được retry từ offset đã commit
}
consumer.commitSync(); // commit sau: không mất dữ liệu, nhưng có thể xử lý lại khi crash
Đây là mặc định Kafka cho hầu hết hệ thống production. Logic xử lý của bạn phải idempotent: gọi nó hai lần với cùng message phải cho kết quả giống như gọi một lần.
// Idempotent: an toàn khi gọi nhiều lần với cùng đơn hàng
public void processOrder(Order order) {
// Dùng INSERT ... ON CONFLICT DO NOTHING hoặc INSERT ... ON CONFLICT DO UPDATE
// để đảm bảo event duplicate không tạo ra bản ghi duplicate
orderRepository.upsert(order);
}
Exactly-once
Message được xử lý đúng một lần. Kafka hỗ trợ điều này qua transaction, ghi vào Kafka và commit consumer offset cùng nhau một cách atomic. Phức tạp hơn để implement và có latency cao hơn.
// Phía producer: bật idempotence và transaction
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-processor-1");
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(new ProducerRecord<>("processed-orders", key, value));
// Commit consumer offset cùng lúc với produce một cách atomic
producer.sendOffsetsToTransaction(offsets, consumerGroupMetadata);
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}
Dùng exactly-once khi bạn đang transform một Kafka topic thành topic khác và duplicate thực sự không thể chấp nhận được. Với hầu hết use case consumer (ghi vào database, gửi email), at-least-once với xử lý idempotent đơn giản hơn và đủ dùng.
Phần 7: Ví dụ Spring Boot hoàn chỉnh
// Cấu hình Producer
@Configuration
public class KafkaProducerConfig {
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
}
}
// Publish order event
@Service
public class OrderService {
private final KafkaTemplate<String, String> kafkaTemplate;
private final ObjectMapper objectMapper;
public Order placeOrder(OrderRequest request) {
Order order = createOrder(request);
orderRepository.save(order);
try {
String payload = objectMapper.writeValueAsString(order);
// key = orderId: tất cả event của cùng order vào cùng partition
kafkaTemplate.send("orders", order.getId(), payload);
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to serialize order event", e);
}
return order;
}
}
// Cấu hình Consumer
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "fraud-detection");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// Manual offset commit: Spring chỉ commit sau khi listener method return thành công
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
return factory;
}
}
// Xử lý order event
@Service
public class FraudDetectionConsumer {
@KafkaListener(topics = "orders", groupId = "fraud-detection")
public void onOrder(ConsumerRecord<String, String> record) {
Order order = deserialize(record.value());
FraudResult result = fraudEngine.evaluate(order);
if (result.isSuspicious()) {
alertRepository.save(new FraudAlert(order.getId(), result.getReason()));
}
// Spring commit offset sau khi method này return mà không throw exception
// Nếu exception được throw, offset không được commit và message được retry
}
}
application.yml cho consumer:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: fraud-detection
auto-offset-reset: earliest
enable-auto-commit: false
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
ack-mode: record
Phần 8: Retention và Replay
Kafka lưu trữ message trên disk trong một khoảng thời gian có thể cấu hình. Mặc định là 7 ngày. Trong khoảng thời gian đó, bất kỳ consumer nào cũng có thể replay từ bất kỳ offset nào.
# Tạo topic với retention 30 ngày
kafka-topics.sh --create \
--topic orders \
--partitions 12 \
--replication-factor 3 \
--config retention.ms=2592000000 # 30 ngày tính bằng milliseconds
# Xem cấu hình topic
kafka-topics.sh --describe --topic orders
Production example về replay: Analytics service crash và mất state. Nó reset committed offset về 7 ngày trước và replay tất cả. Order service (producer) không cần thay đổi gì và thậm chí không biết điều này đã xảy ra.
// Reset offset của consumer group về đầu (trong code, cho use case cụ thể)
consumer.subscribe(List.of("orders"));
consumer.poll(Duration.ofMillis(0)); // kích hoạt partition assignment
consumer.seekToBeginning(consumer.assignment());
Trade-off: Retention dài nghĩa là tốn nhiều disk hơn trên broker. Topic nhận 1 GB mỗi giờ với retention 30 ngày dùng 720 GB mỗi partition replica. Cần tính toán disk broker phù hợp hoặc dùng tiered storage (có trong Kafka 3.6+) để offload dữ liệu cũ hơn sang object storage như S3.
Phần 9: Những lỗi phổ biến của người mới
Lỗi 1: Không đặt auto.offset.reset rõ ràng
# Mặc định là "latest": consumer group mới bắt đầu đọc từ HIỆN TẠI
# Message đến trước khi consumer start bị bỏ qua
# Cho consumer mới cần đọc tất cả message lịch sử:
auto-offset-reset: earliest
# Cho consumer mới chỉ cần đọc message mới từ đây trở đi:
auto-offset-reset: latest
Nếu bạn deploy analytics consumer mới và thắc mắc tại sao không có dữ liệu từ tuần trước, auto.offset.reset=latest hầu như luôn là lý do.
Lỗi 2: Ít partition hơn consumer
Topic với 3 partition, consumer group với 5 consumer:
Partition 0 --> Consumer A (đang hoạt động)
Partition 1 --> Consumer B (đang hoạt động)
Partition 2 --> Consumer C (đang hoạt động)
(không có partition) Consumer D (idle)
(không có partition) Consumer E (idle)
Consumer D và E tốn bộ nhớ và CPU nhưng không xử lý gì. Bạn không thể tăng parallelism vượt quá số partition mà không repartition lại topic.
Lỗi 3: Dùng auto-commit với xử lý không idempotent
Auto-commit kích hoạt theo timer. Nếu việc xử lý của bạn mất lâu hơn auto.commit.interval.ms (mặc định 5 giây), Kafka commit offset cho message mà code của bạn vẫn đang xử lý. Crash sau khi commit nhưng trước khi xử lý hoàn thành gây mất message âm thầm.
Luôn dùng enable.auto.commit=false và commit rõ ràng.
Lỗi 4: Xử lý tốn thời gian bên trong poll loop
Kafka có session timeout (session.timeout.ms, mặc định 45 giây) và maximum poll interval (max.poll.interval.ms, mặc định 5 phút). Nếu consumer không gọi poll() trong khoảng thời gian đó, broker coi nó đã chết và kích hoạt rebalance.
// SAI: xử lý chậm bên trong poll loop
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
externalApiClient.process(record.value()); // 10 giây mỗi call
// Nếu có 100 record, poll không được gọi trong 1000 giây -> rebalance
}
}
// ĐÚNG: xử lý bất đồng bộ hoặc tăng max.poll.interval.ms
// và giảm max.poll.records để giữ batch size nhỏ
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60000); // 1 phút
Lỗi 5: Không xử lý lỗi deserialization
Nếu một message không hợp lệ rơi vào topic và deserializer của bạn throw exception, hầu hết consumer loop sẽ retry message đó vô hạn, chặn tất cả message tiếp theo trong partition.
// Spring Kafka: cấu hình dead-letter topic cho message fail sau nhiều lần retry
@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<String, String> template) {
DeadLetterPublishingRecoverer recoverer =
new DeadLetterPublishingRecoverer(template,
(record, ex) -> new TopicPartition(record.topic() + ".DLT", record.partition()));
// Retry tối đa 3 lần với backoff 1 giây, sau đó gửi sang DLT
return new DefaultErrorHandler(recoverer,
new FixedBackOff(1000L, 3));
}
Tổng kết
| Khái niệm | Chức năng | Cài đặt quan trọng |
|---|---|---|
| Topic | Log event được đặt tên | retention.ms |
| Partition | Đơn vị parallelism và ordering | Số lượng đặt lúc tạo, khó thay đổi |
| Producer | Ghi vào topic | acks, linger.ms |
| Consumer | Đọc từ topic | enable.auto.commit, auto.offset.reset |
| Consumer group | Các reader song song chia sẻ topic | group.id |
| Offset | Vị trí trong partition | Commit sau khi xử lý thành công |
| Replication | Độ bền khi broker bị lỗi | replication.factor |
Mental model: Kafka là một append-only log, không phải queue. Message không bị xóa khi đọc. Chúng hết hạn sau retention window. Nhiều consumer độc lập đọc cùng message theo tốc độ của riêng mình. Mỗi consumer group có offset riêng cho từng partition. Thêm consumer group không bao giờ ảnh hưởng đến group khác. Đây là hành vi cốt lõi làm cho Kafka hữu ích: producer và consumer hoàn toàn tách biệt về thời gian và deployment.