Introducció
En aquest mòdul, explorarem conceptes avançats de Kafka Streams, una potent llibreria per construir aplicacions i microserveis que processin dades en temps real. Ens centrarem en tècniques avançades per optimitzar el rendiment, gestionar estats complexos i integrar Kafka Streams amb altres components de l'ecosistema de Kafka.
Continguts
Optimització del Rendiment
Estratègies d'Optimització
-
Configuració de Buffering i Caching:
- Ajusta els paràmetres de buffering (
cache.max.bytes.buffering) per millorar el rendiment. - Utilitza el caching per reduir la latència de les operacions d'estat.
- Ajusta els paràmetres de buffering (
-
Paral·lelisme i Threads:
- Configura el nombre de threads (
num.stream.threads) per aprofitar millor els recursos de la màquina. - Assegura't que el nombre de threads no superi el nombre de particions per evitar la sobrecàrrega.
- Configura el nombre de threads (
-
Optimització de Serdes:
- Utilitza Serdes personalitzats per optimitzar la serialització i deserialització de dades.
- Evita la serialització/deserialització innecessària utilitzant
MaterializedambinMemoryKeyValueStore.
Exemple de Configuració
Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "advanced-streams-app"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L); // 10 MB props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4); StreamsBuilder builder = new StreamsBuilder(); // Configuració de topologia... KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start();
Gestió d'Estats Complexos
Utilització de State Stores
-
State Stores Persistents:
- Utilitza
RocksDBper emmagatzemar estats persistents. - Configura
StateStoreper a operacions de lectura/escriptura eficients.
- Utilitza
-
State Stores en Memòria:
- Utilitza
inMemoryKeyValueStoreper a estats temporals o de curta durada. - Beneficia't de la baixa latència de les operacions en memòria.
- Utilitza
Exemple de State Store
StoreBuilder<KeyValueStore<String, Long>> countStore = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("Counts"),
Serdes.String(),
Serdes.Long()
);
StreamsBuilder builder = new StreamsBuilder();
builder.addStateStore(countStore);
KStream<String, String> stream = builder.stream("input-topic");
stream.transform(() -> new Transformer<String, String, KeyValue<String, Long>>() {
private KeyValueStore<String, Long> stateStore;
@Override
public void init(ProcessorContext context) {
this.stateStore = (KeyValueStore<String, Long>) context.getStateStore("Counts");
}
@Override
public KeyValue<String, Long> transform(String key, String value) {
Long count = stateStore.get(key);
if (count == null) {
count = 0L;
}
count++;
stateStore.put(key, count);
return new KeyValue<>(key, count);
}
@Override
public void close() {}
}, "Counts");Processament de Fluxos amb Joins
Tipus de Joins
-
KStream-KStream Join:
- Uneix dos fluxos basats en una clau comuna.
- Configura la finestra de temps per a l'operació de join.
-
KStream-KTable Join:
- Uneix un flux amb una taula basada en una clau comuna.
- Utilitza
KTableper a dades que canvien menys freqüentment.
-
KTable-KTable Join:
- Uneix dues taules basades en una clau comuna.
- Ideal per a operacions de join en dades estàtiques o semi-estàtiques.
Exemple de KStream-KStream Join
KStream<String, String> leftStream = builder.stream("left-topic");
KStream<String, String> rightStream = builder.stream("right-topic");
KStream<String, String> joinedStream = leftStream.join(
rightStream,
(leftValue, rightValue) -> leftValue + ", " + rightValue,
JoinWindows.of(Duration.ofMinutes(5)),
Joined.with(Serdes.String(), Serdes.String(), Serdes.String())
);
joinedStream.to("output-topic");Integració amb Kafka Connect
Utilització de Connectors
-
Sink Connectors:
- Envia dades processades a sistemes externs com bases de dades, sistemes de fitxers, etc.
- Configura connectors com
JdbcSinkConnector,ElasticsearchSinkConnector, etc.
-
Source Connectors:
- Llegeix dades de sistemes externs i les publica en temes de Kafka.
- Configura connectors com
JdbcSourceConnector,FileStreamSourceConnector, etc.
Exemple de Configuració de Sink Connector
{
"name": "jdbc-sink-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "output-topic",
"connection.url": "jdbc:mysql://localhost:3306/mydb",
"connection.user": "user",
"connection.password": "password",
"auto.create": "true",
"insert.mode": "insert"
}
}Exercicis Pràctics
Exercici 1: Optimització de Rendiment
Descripció: Configura una aplicació de Kafka Streams per optimitzar el rendiment utilitzant caching i ajustant el nombre de threads.
Solució:
Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "performance-optimized-app"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 20 * 1024 * 1024L); // 20 MB props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2); StreamsBuilder builder = new StreamsBuilder(); // Configuració de topologia... KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start();
Exercici 2: Gestió d'Estats Complexos
Descripció: Implementa una aplicació de Kafka Streams que utilitzi un StateStore per mantenir un comptador de paraules.
Solució:
StoreBuilder<KeyValueStore<String, Long>> wordCountStore = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("WordCounts"),
Serdes.String(),
Serdes.Long()
);
StreamsBuilder builder = new StreamsBuilder();
builder.addStateStore(wordCountStore);
KStream<String, String> textLines = builder.stream("text-input");
KStream<String, Long> wordCounts = textLines
.flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
.transform(() -> new Transformer<String, String, KeyValue<String, Long>>() {
private KeyValueStore<String, Long> stateStore;
@Override
public void init(ProcessorContext context) {
this.stateStore = (KeyValueStore<String, Long>) context.getStateStore("WordCounts");
}
@Override
public KeyValue<String, Long> transform(String key, String word) {
Long count = stateStore.get(word);
if (count == null) {
count = 0L;
}
count++;
stateStore.put(word, count);
return new KeyValue<>(word, count);
}
@Override
public void close() {}
}, "WordCounts");
wordCounts.to("word-count-output", Produced.with(Serdes.String(), Serdes.Long()));Conclusió
En aquest mòdul, hem explorat conceptes avançats de Kafka Streams, incloent tècniques d'optimització del rendiment, gestió d'estats complexos, processament de fluxos amb joins i integració amb Kafka Connect. Aquests coneixements us permetran construir aplicacions de processament de dades en temps real més eficients i robustes. En el proper mòdul, ens centrarem en l'optimització del rendiment de Kafka en general.
Curs de Kafka
Mòdul 1: Introducció a Kafka
Mòdul 2: Conceptes bàsics de Kafka
Mòdul 3: Operacions de Kafka
Mòdul 4: Configuració i Gestió de Kafka
Mòdul 5: Temes Avançats de Kafka
- Optimització del Rendiment de Kafka
- Kafka en una Configuració Multi-Centre de Dades
- Kafka amb Registre d'Esquemes
- Kafka Streams Avançat
