kafka_connector

class kafka_connector.avro_loop_consumer.AvroLoopConsumer(bootstrap_servers, schema_registry_url, consumer_group, topics, config={'log_level': 0, 'api.version.request': True}, error_callback=<function AvroLoopConsumer.<lambda>>)

Bases: object

AvroConsumer that allows registering an on_delivery function, which is called whenever new messages arrive.

The default config is

>>> default_config = {
...    'log_level': 0,
...    'api.version.request': True,
...  }
__init__(bootstrap_servers, schema_registry_url, consumer_group, topics, config={'log_level': 0, 'api.version.request': True}, error_callback=<function AvroLoopConsumer.<lambda>>)
Parameters:
  • bootstrap_servers (str) – Initial list of brokers as a CSV list of broker host or host:port.
  • schema_registry_url (str) – URL of the schema registry
  • consumer_group (str) – name of the consumer group to join
  • topics (list(str)) – List of topics (strings) to subscribe to. Regexp pattern subscriptions are supported by prefixing the topic string with "^", e.g. ["^my_topic.*", "^another[0-9]-?[a-z]+$", "not_a_regex"]
  • config (dict) – A config dictionary with properties listed at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
  • error_callback (lambda err: function(err)) – function that handles error events as they occur
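
A minimal construction sketch (the broker and schema registry addresses are hypothetical placeholders):

>>> from kafka_connector.avro_loop_consumer import AvroLoopConsumer
>>> consumer = AvroLoopConsumer(
...     'localhost:9092',           # bootstrap_servers (placeholder)
...     'http://localhost:8081',    # schema_registry_url (placeholder)
...     'my_consumer_group',
...     ['my_topic'])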

static error_callback(err)

Handles error messages

is_stopped
Returns: True if the consumer loop has finished
Return type: bool
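
For example (a sketch; the flag only reads True once the consume loop has actually finished):

>>> consumer.stop()
>>> consumer.is_stopped
True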
loop(on_delivery, timeout=None)

Consumes and decodes Avro messages from Kafka

Parameters:
  • on_delivery (lambda msg: function(msg)) – function that handles successfully received and decoded messages
  • timeout (float) – maximum time to block waiting for a message, event, or callback
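
A handler sketch, assuming each delivered msg behaves like a confluent_kafka Message with key() and value() accessors; the call blocks until stop() is invoked:

>>> consumer.loop(lambda msg: print(msg.key(), msg.value()), timeout=1.0)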
stop()

Stops the consumer loop if it is running

class kafka_connector.avro_loop_producer.AvroLoopProducer(bootstrap_servers, schema_registry_url, topic, key_schema, value_schema, poll_timeout=0.01, config={'queue.buffering.max.messages': 100000, 'queue.buffering.max.ms': 10, 'log_level': 0, 'default.topic.config': {'produce.offset.report': True}, 'message.send.max.retries': 200, 'api.version.request': True}, error_callback=<function AvroLoopProducer.<lambda>>)

Bases: object

AvroProducer with an integrated timer that calls a data-producing function at a defined interval.

The default config is

>>> default_config = {
...    'log_level': 0,
...    'api.version.request': True,
...    'queue.buffering.max.messages': 100000,
...    'queue.buffering.max.ms': 10,
...    'message.send.max.retries': 200,
...    'default.topic.config':
...      {
...        'produce.offset.report': True
...      }
...  }
__init__(bootstrap_servers, schema_registry_url, topic, key_schema, value_schema, poll_timeout=0.01, config={'queue.buffering.max.messages': 100000, 'queue.buffering.max.ms': 10, 'log_level': 0, 'default.topic.config': {'produce.offset.report': True}, 'message.send.max.retries': 200, 'api.version.request': True}, error_callback=<function AvroLoopProducer.<lambda>>)
Parameters:
  • bootstrap_servers (str) – Initial list of brokers as a CSV list of broker host or host:port.
  • schema_registry_url (str) – URL of the schema registry
  • topic (str) – topic name
  • key_schema (str) – Avro schema for key
  • value_schema (str) – Avro schema for value
  • poll_timeout (None, float) – If a number or None, polls the producer for events and calls the corresponding callbacks (if registered) via confluent_kafka.Producer.poll(timeout). If False, poll() is not called.
  • config (dict) – A config dictionary with properties listed at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
  • error_callback (lambda err: function(err)) – function that handles error events as they occur
Raises:
  • avro.schema.SchemaParseException – if either the key or the value schema is invalid
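
A minimal construction sketch; the addresses are placeholders and the Avro schema strings are merely illustrative:

>>> from kafka_connector.avro_loop_producer import AvroLoopProducer
>>> key_schema = '{"type": "string"}'
>>> value_schema = '''
... {"type": "record", "name": "Measurement",
...  "fields": [{"name": "temperature", "type": "float"}]}
... '''
>>> producer = AvroLoopProducer(
...     'localhost:9092', 'http://localhost:8081', 'my_topic',
...     key_schema, value_schema)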


_loop_produce(data_function)

Preprocesses the result of data_function; only valid results are pushed to Kafka.

Parameters: data_function (function that returns a list of dicts or a single dict with possible keys key, value, timestamp, partition and on_delivery) – the result of this function is used as **kwargs for produce()
static error_callback(err)

Handles error messages

loop(data_function, interval=1, unit=<Unit.SECOND: 2>, begin=<Begin.FULL_SECOND: 4>)

Starts a timer that calls data_function at the defined interval.

Parameters:
  • data_function (function that returns a list of dicts or a single dict with possible keys key, value, timestamp, partition and on_delivery) – the result of this function is used as **kwargs for produce()
  • interval (int) – interval length, expressed in units of unit
  • unit (Unit) – unit for interval
  • begin (kafka_connector.timer.Begin or list of datetime.time) – Sets the start point. Either choose one of the kafka_connector.timer.Begin members or pass a list of datetime.time start times; in the latter case, the start time closest to the current timestamp is used.
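
A sketch of a timed producer; the import location of Unit and Begin is assumed from the documented type of the begin parameter (kafka_connector.timer.Begin):

>>> from kafka_connector.timer import Unit, Begin  # assumed module path
>>> def data_function():
...     # the returned dict is passed as **kwargs to produce()
...     return {'key': 'sensor-1', 'value': {'temperature': 21.5}}
>>> producer.loop(data_function, interval=5, unit=Unit.SECOND,
...               begin=Begin.FULL_SECOND)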
static on_delivery(err, msg)

Handles delivery callbacks from produce()

produce(key=None, value=None, timestamp=None, partition=None, on_delivery=<function AvroLoopProducer.<lambda>>)

Sends a message to Kafka, encoding it with the specified Avro schemas

Parameters:
  • key (any) – An object to serialize
  • value – An object to serialize
  • timestamp – Message timestamp (CreateTime) in microseconds since epoch UTC (requires librdkafka >= v0.9.4, api.version.request=true, and broker >= 0.10.0.0). Default value is current time.
  • partition (int) – Partition to produce to; otherwise uses the configured partitioner.
  • on_delivery (lambda err, msg) – delivery callback invoked with (err, msg) by produce()
Raises:
  • BufferError – if the internal producer message queue is full (queue.buffering.max.messages exceeded)
  • KafkaException – see exception code
  • NotImplementedError – if timestamp is specified without underlying library support.
  • avro.schema.SchemaParseException – if the schema is not a valid Avro schema
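
A single-message sketch; the key and value must match the schemas given at construction, and the on_delivery lambda here just reports the outcome:

>>> producer.produce(key='sensor-1', value={'temperature': 21.5},
...                  on_delivery=lambda err, msg: print(err or 'delivered'))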
stop()

Stops the timer if it is running