Ir al contenido principal
Versión: 5.3.x

Extensión de Kafka Embebido

[Traducción Beta No Oficial]

Esta página fue traducida por PageTurner AI (beta). No está respaldada oficialmente por el proyecto. ¿Encontraste un error? Reportar problema →

Kotest ofrece una extensión que inicia una instancia de Kafka embebido. Esto resulta útil en situaciones donde el uso de imágenes de Docker de Kafka presenta problemas.

Para usar esta extensión, añade el módulo io.kotest.extensions:kotest-extensions-embedded-kafka a la ruta de compilación de tus tests.

Primeros pasos:

Registra el listener embeddedKafkaListener en tu clase de test:

class EmbeddedKafkaListenerTest : FunSpec({
listener(embeddedKafkaListener)
})

o

class EmbeddedKafkaListenerTest : FunSpec() {
init {
listener(embeddedKafkaListener)
}
}

El broker se iniciará al crear la especificación y se detendrá cuando esta finalice.

Nota: La biblioteca embedded kafka subyacente utiliza un objeto global para el estado. No inicies múltiples instancias de Kafka simultáneamente.

Consumidor / Productor

Para crear un consumidor y productor, podemos usar métodos de conveniencia en el listener:

class EmbeddedKafkaListenerTest : FunSpec({

listener(embeddedKafkaListener)

test("send / receive") {

val producer = embeddedKafkaListener.stringStringProducer()
producer.send(ProducerRecord("foo", "a"))
producer.close()

val consumer = embeddedKafkaListener.stringStringConsumer("foo")
eventually(10.seconds) {
consumer.poll(1000).first().value() shouldBe "a"
}
consumer.close()
}

})

Los métodos stringStringProducer y stringStringConsumer devuelven un productor/consumidor que aceptan cadenas para claves y valores. Existen métodos similares para pares de bytes.

Alternativamente, puedes acceder al host/puerto donde se desplegó la instancia de Kafka y crear los clientes manualmente:

class EmbeddedKafkaListenerTest : FunSpec({

listener(embeddedKafkaListener)

val props = Properties().apply {
put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "${embeddedKafkaListener.host}:${embeddedKafkaListener.port}")
}

val producer = KafkaProducer<String, String>(props)

}

Puertos personalizados

Puedes crear una nueva instancia del listener especificando un puerto y usar esa instancia en lugar de la predeterminada.

class EmbeddedKafkaCustomPortTest : FunSpec({

val listener = EmbeddedKafkaListener(5678)
listener(listener)

test("send / receive") {

val producer = listener.stringStringProducer()
producer.send(ProducerRecord("foo", "a"))
producer.close()

val consumer = listener.stringStringConsumer("foo")
eventually(10.seconds) {
consumer.poll(1000).first().value() shouldBe "a"
}
consumer.close()
}
})

También puedes especificar el puerto de Zookeeper mediante una sobrecarga alternativa.

val listener = EmbeddedKafkaListener(kafkaPort = 6005, zookeeperPort = 9005)