kafka_connector

class kafka_connector.avro_loop_consumer.AvroLoopConsumer(bootstrap_servers, schema_registry_url, consumer_group, topics, config={'log_level': 0, 'api.version.request': True}, error_callback=<function AvroLoopConsumer.<lambda>>)

Bases: object

AvroConsumer with the possibility to register an on_delivery function, which is called whenever new messages arrive.

The default config is

>>> default_config = {
...     'log_level': 0,
...     'api.version.request': True,
... }

__init__(bootstrap_servers, schema_registry_url, consumer_group, topics, config={'log_level': 0, 'api.version.request': True}, error_callback=<function AvroLoopConsumer.<lambda>>)

Parameters:
- bootstrap_servers (str) – Initial list of brokers as a CSV list of broker host or host:port.
- schema_registry_url (str) – URL of the schema registry
- topics (list(str)) – List of topics (strings) to subscribe to. Regexp pattern subscriptions are supported by prefixing the topic string with "^", e.g. ["^my_topic.*", "^another[0-9]-?[a-z]+$", "not_a_regex"]
- config (dict) – A config dictionary with properties listed at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
- error_callback (lambda err: function(err)) – function that handles occurring error events

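A minimal construction sketch, assuming a local broker and schema registry; the addresses, group name and topic pattern below are placeholders:

>>> from kafka_connector.avro_loop_consumer import AvroLoopConsumer
>>> consumer = AvroLoopConsumer(
...     'localhost:9092',           # bootstrap_servers (placeholder)
...     'http://localhost:8081',    # schema_registry_url (placeholder)
...     'my_consumer_group',        # consumer_group (placeholder)
...     ['^sensor_.*'])             # regex subscription, note the leading "^"
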
__weakref__

list of weak references to the object (if defined)

static error_callback(err)

Handles error messages.

loop(on_delivery, timeout=None)

Consumes and decodes Avro messages from Kafka.

Parameters:
- on_delivery (lambda msg: function(msg)) – function that handles successfully received and decoded messages
- timeout (float) – Maximum time to block waiting for a message, event or callback

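A sketch of the consume loop, assuming a consumer constructed as in the example above; handle_message is a hypothetical callback and assumes confluent_kafka-style Message objects whose key() and value() return the decoded Avro data:

>>> def handle_message(msg):
...     print(msg.key(), msg.value())   # already decoded Avro key and value
>>> try:
...     consumer.loop(handle_message, timeout=1.0)   # blocks until stop() is called
... except KeyboardInterrupt:
...     consumer.stop()
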
stop()

Stops the timer if it is running.

class kafka_connector.avro_loop_producer.AvroLoopProducer(bootstrap_servers, schema_registry_url, topic, key_schema, value_schema, poll_timeout=0.01, config={'queue.buffering.max.messages': 100000, 'queue.buffering.max.ms': 10, 'log_level': 0, 'default.topic.config': {'produce.offset.report': True}, 'message.send.max.retries': 200, 'api.version.request': True}, error_callback=<function AvroLoopProducer.<lambda>>)

Bases: object

AvroProducer with an integrated timer that calls a data-producing function at every defined interval.

The default config is

>>> default_config = {
...     'log_level': 0,
...     'api.version.request': True,
...     'queue.buffering.max.messages': 100000,
...     'queue.buffering.max.ms': 10,
...     'message.send.max.retries': 200,
...     'default.topic.config':
...         {
...             'produce.offset.report': True
...         }
... }

__init__(bootstrap_servers, schema_registry_url, topic, key_schema, value_schema, poll_timeout=0.01, config={'queue.buffering.max.messages': 100000, 'queue.buffering.max.ms': 10, 'log_level': 0, 'default.topic.config': {'produce.offset.report': True}, 'message.send.max.retries': 200, 'api.version.request': True}, error_callback=<function AvroLoopProducer.<lambda>>)

Parameters:
- bootstrap_servers (str) – Initial list of brokers as a CSV list of broker host or host:port.
- schema_registry_url (str) – URL of the schema registry
- topic (str) – topic name
- key_schema (str) – Avro schema for the key
- value_schema (str) – Avro schema for the value
- poll_timeout (None, float) – If the timeout is a number or None, the producer is polled for events and the corresponding callbacks are called (if registered). If False, confluent_kafka.Producer.poll(timeout) is not called.
- config (dict) – A config dictionary with properties listed at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
- error_callback (lambda err: function(err)) – function that handles occurring error events

Raises: avro.schema.SchemaParseException – if either the key or the value schema is invalid

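A construction sketch with made-up schemas; whether key_schema and value_schema expect an inline Avro schema string or a path to an .avsc file should be checked against the library's own examples, and the broker and registry addresses are placeholders:

>>> from kafka_connector.avro_loop_producer import AvroLoopProducer
>>> key_schema = '{"type": "string"}'
>>> value_schema = '''
... {"type": "record", "name": "Measurement", "fields": [
...     {"name": "sensor", "type": "string"},
...     {"name": "value", "type": "double"}]}
... '''
>>> producer = AvroLoopProducer('localhost:9092', 'http://localhost:8081',
...                             'measurements', key_schema, value_schema)
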
__weakref__

list of weak references to the object (if defined)

_loop_produce(data_function)

Preprocesses the result of data_function and only pushes valid results to Kafka.

Parameters: data_function (function that returns a list of dicts or a single dict with possible keys key, value, timestamp, partition and on_delivery) – the result of this function is used as **kwargs for produce()

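To illustrate the expected return shape of data_function, a hypothetical example whose record fields follow the made-up Measurement schema above:

>>> def read_sensors():
...     # a single dict is also accepted; only the keys key, value, timestamp,
...     # partition and on_delivery are meaningful
...     return [
...         {'key': 'sensor-1', 'value': {'sensor': 'sensor-1', 'value': 23.4}},
...         {'key': 'sensor-2', 'value': {'sensor': 'sensor-2', 'value': 19.8}},
...     ]
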
static error_callback(err)

Handles error messages.

loop(data_function, interval=1, unit=<Unit.SECOND: 2>, begin=<Begin.FULL_SECOND: 4>)

Starts a timer that calls data_function every defined interval.

Parameters:
- data_function (function that returns a list of dicts or a single dict with possible keys key, value, timestamp, partition and on_delivery) – the result of this function is used as **kwargs for produce()
- interval (int) – interval step
- unit (Unit) – unit for interval
- begin (kafka_connector.timer.Begin or list of datetime.time) – Sets the start point. Either choose one of the kafka_connector.timer.Begin elements or pass a list of datetime.time start times. In the second case, the start time is set to the time closest to the current timestamp.

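A sketch of driving the producer from the timer, assuming the hypothetical read_sensors function above and that the Unit and Begin enums can be imported from kafka_connector.timer:

>>> from kafka_connector.timer import Unit, Begin
>>> # call read_sensors every 10 seconds, starting at the next full second
>>> producer.loop(read_sensors, interval=10, unit=Unit.SECOND, begin=Begin.FULL_SECOND)
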
produce(key=None, value=None, timestamp=None, partition=None, on_delivery=<function AvroLoopProducer.<lambda>>)

Sends a message to Kafka, encoding it with the specified Avro schema.

Parameters:
- key (any) – An object to serialize
- value – An object to serialize
- timestamp – Message timestamp (CreateTime) in microseconds since epoch UTC (requires librdkafka >= v0.9.4, api.version.request=true, and broker >= 0.10.0.0). Default value is current time.
- partition (int) – Partition to produce to, else uses the configured partitioner.
- on_delivery (lambda err, msg) – delivery callback from produce()

Raises:
- BufferError – if the internal producer message queue is full (queue.buffering.max.messages exceeded)
- KafkaException – see exception code
- NotImplementedError – if timestamp is specified without underlying library support
- avro.schema.SchemaParseException – if the schema is not a valid Avro schema

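A single-message sketch; delivery_report is a hypothetical callback and the record fields are illustrative:

>>> def delivery_report(err, msg):
...     if err is not None:           # err is None on successful delivery
...         print('delivery failed:', err)
>>> producer.produce(key='sensor-1',
...                  value={'sensor': 'sensor-1', 'value': 42.0},
...                  on_delivery=delivery_report)
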
stop()

Stops the timer if it is running.