Solución:
Estoy usando: max.poll.interval.ms = 100 y max.poll.records = 20 para obtener como 200 mensajes por segundo.
Las propiedades max.poll.interval.ms y max.poll.records no funcionan de esta manera.
max.poll.interval.ms indica el intervalo de tiempo máximo en milisegundos que el consumidor tiene que esperar entre cada encuesta de consumidor del tema.
registros.max.poll. indica el número máximo de registros que el consumidor puede consumir durante cada encuesta de consumidor sobre el tema.
El intervalo entre cada encuesta no está controlado por las dos propiedades anteriores, sino por el tiempo que tarda el consumidor en reconocer los registros obtenidos.
Por ejemplo, digamos que existe un tema X con 1000 registros y el tiempo que tarda el consumidor en reconocer los registros recuperados es de 20 ms. Con max.poll.interval.ms = 100 y max.poll.records = 20, el consumidor sondeará el tema de Kafka cada 20 ms y, en cada encuesta, se recuperarán un máximo de 20 registros. En caso de que el tiempo necesario para reconocer los registros obtenidos sea mayor que max.poll.interval.ms, el sondeo se considerará fallido y ese lote en particular volverá a sondear desde el tema de Kafka.
Un KafkaConsumer (también el que es utilizado internamente por KafkaStreams
lee el registro lo más rápido posible.
El parámetro que menciona puede tener un impacto en el rendimiento, pero no puede controlar la velocidad de datos real. También tenga en cuenta que max.poll.records
solo configura cuantos registros poll()
retorno, pero no tiene ningún impacto en la comunicación entre el cliente y el intermediario. A KafkaConsumer
puede obtener más registros al hablar con el corredor y luego devolver mensajes almacenados en búfer en poll()
siempre que los registros estén en el búfer (es decir, en este caso, poll()
es un operador del lado del cliente que solo garantiza que no se agote el tiempo de espera a través de max.poll.interval.ms
). Por lo tanto, es posible que esté más interesado en fetch.max.bytes
, que determina el tamaño de los bytes que se obtienen del intermediario. Si reduce este parámetro, el consumidor es menos eficiente y, por lo tanto, el rendimiento debería disminuir. (aunque no se recomienda).
Otra forma de configurar el rendimiento son las cuotas (https://kafka.apache.org/documentation/#design_quotas) Es una configuración del lado del corredor que le permite limitar la cantidad de datos que un cliente puede leer y / o escribir.
Lo mejor que se puede hacer en Kafka Streams (y también cuando se usa un KafkaConsumer simple) es limitar las llamadas a poll()
a mano. Para Kafka Streams, puede agregar un Thread.sleep()
en cualquier UDF. Si no desea incorporar esto a un operador existente, simplemente puede agregar un foreach()
con estado efímero (es decir, una variable miembro de clase) para rastrear el rendimiento y calcular cuánto necesita dormir para acelerar el rendimiento en consecuencia.