BUG: KafkaProducer leaves garbage #2258

@Gw1p

Description

Hello,

I found that it is not safe to turn off garbage collection while using KafkaProducer, because the producer leaves behind objects that only the cycle collector can reclaim.

Steps to reproduce:

import gc
from kafka import KafkaProducer
import time

gc.disable()

producer = KafkaProducer(
    bootstrap_servers=['some-server'],
    acks=0,
    api_version=(1, 0, 0)
)
message = "test_message"
encoded_message = message.encode("utf-8")
producer.send("some-topic", encoded_message)
# Give the producer a chance to do some work
time.sleep(1)

gc.set_debug(gc.DEBUG_SAVEALL)
gc.collect()
gc.set_debug(0)

print("Collected garbage")

if gc.garbage:
    print("Found some garbage %d = %r" % (len(gc.garbage), gc.garbage))

Note: you cannot reproduce this from an interactive Python shell; running the same code there does not leave any garbage behind.

Output:

Collected garbage
Found some garbage 34 = [_pytest.store.StoreKey[typing.Union[_pytest.config.Config, NoneType]], {'__module__': '_pytest.store', '__doc__': 'StoreKey is an object used as a key to a Store.\n\n    A StoreKey is associated with the type T of the value of the key.\n\n    A StoreKey is unique and cannot conflict with another key.\n    ', '__slots__': (), '__origin__': _pytest.store.StoreKey, '__extra__': None, '_gorg': _pytest.store.StoreKey, '__abstractmethods__': frozenset(), '_abc_registry': <_weakrefset.WeakSet object at 0x7f2633a2c898>, '_abc_cache': <_weakrefset.WeakSet object at 0x7f2633a2c860>, '_abc_generic_negative_cache': <_weakrefset.WeakSet object at 0x7f2633a2c940>, '_abc_generic_negative_cache_version': 40, '__parameters__': (), '__args__': (typing.Union[_pytest.config.Config, NoneType],), '__next_in_mro__': <class 'object'>, '__orig_bases__': (typing.Generic[~T],), '__tree_hash__': -4243921850862713175}, (_pytest.store.StoreKey[typing.Union[_pytest.config.Config, NoneType]], _pytest.store.StoreKey, typing.Generic, <class 'object'>), (_pytest.store.StoreKey,), (typing.Union[_pytest.config.Config, NoneType],), typing.Dict[str, _ForwardRef('FixtureDef[Any]')], (typing.Dict[str, _ForwardRef('FixtureDef[Any]')], typing.Dict, <class 'dict'>, typing.MutableMapping, <class 'collections.abc.MutableMapping'>, typing.Mapping, <class 'collections.abc.Mapping'>, typing.Collection, <class 'collections.abc.Collection'>, <class 'collections.abc.Sized'>, typing.Iterable, <class 'collections.abc.Iterable'>, typing.Container, <class 'collections.abc.Container'>, typing.Generic, <class 'object'>), _pytest.store.StoreKey[typing.Dict[str, _ForwardRef('FixtureDef[Any]')]], (_pytest.store.StoreKey[typing.Dict[str, _ForwardRef('FixtureDef[Any]')]], _pytest.store.StoreKey, typing.Generic, <class 'object'>), {'__module__': 'typing', '__slots__': (), '__new__': <staticmethod object at 0x7f2638488278>, '__origin__': typing.Dict, '__extra__': <class 'dict'>, '_gorg': typing.Dict, 
'__doc__': None, '__abstractmethods__': frozenset(), '_abc_registry': <_weakrefset.WeakSet object at 0x7f263782f320>, '_abc_cache': <_weakrefset.WeakSet object at 0x7f263782f2b0>, '_abc_generic_negative_cache': <_weakrefset.WeakSet object at 0x7f263782f438>, '_abc_generic_negative_cache_version': 40, '__parameters__': (), '__args__': (<class 'str'>, _ForwardRef('FixtureDef[Any]')), '__next_in_mro__': <class 'object'>, '__orig_bases__': (<class 'dict'>, typing.MutableMapping[~KT, ~VT]), '__subclasshook__': <function _make_subclasshook.<locals>.__extrahook__ at 0x7f26339b9b70>, '__tree_hash__': -2573890543485547523}, (typing.Dict, <class 'dict'>, typing.MutableMapping), {'__module__': '_pytest.store', '__doc__': 'StoreKey is an object used as a key to a Store.\n\n    A StoreKey is associated with the type T of the value of the key.\n\n    A StoreKey is unique and cannot conflict with another key.\n    ', '__slots__': (), '__origin__': _pytest.store.StoreKey, '__extra__': None, '_gorg': _pytest.store.StoreKey, '__abstractmethods__': frozenset(), '_abc_registry': <_weakrefset.WeakSet object at 0x7f2633a2c898>, '_abc_cache': <_weakrefset.WeakSet object at 0x7f2633a2c860>, '_abc_generic_negative_cache': <_weakrefset.WeakSet object at 0x7f2633a2c940>, '_abc_generic_negative_cache_version': 40, '__parameters__': (), '__args__': (typing.Dict[str, _ForwardRef('FixtureDef[Any]')],), '__next_in_mro__': <class 'object'>, '__orig_bases__': (typing.Generic[~T],), '__tree_hash__': -2080973868901861766}, (_pytest.store.StoreKey,), (<class 'str'>, _ForwardRef('FixtureDef[Any]')), <function _make_subclasshook.<locals>.__extrahook__ at 0x7f26339b9b70>, (typing.Dict[str, _ForwardRef('FixtureDef[Any]')],), _ForwardRef('FixtureDef[Any]'), (<cell at 0x7f26339b7168: GenericMeta object at 0x28289f8>,), <cell at 0x7f26339b7168: GenericMeta object at 0x28289f8>, <kafka.producer.future.FutureProduceResult object at 0x7f263315b0b8>, [<bound method FutureRecordMetadata._produce_success of 
<kafka.producer.future.FutureRecordMetadata object at 0x7f26334076d8>>], {'is_done': True, 'value': (-1, None, None), 'exception': None, '_callbacks': [<bound method FutureRecordMetadata._produce_success of <kafka.producer.future.FutureRecordMetadata object at 0x7f26334076d8>>], '_errbacks': [<bound method Future.failure of <kafka.producer.future.FutureRecordMetadata object at 0x7f26334076d8>>], 'topic_partition': TopicPartition(topic='some-topic', partition=0), '_latch': <gevent._gevent_cevent.Event object at 0x7f263340f348>}, [<bound method Future.failure of <kafka.producer.future.FutureRecordMetadata object at 0x7f26334076d8>>], <gevent._gevent_cevent.Event object at 0x7f263340f348>, [], <kafka.producer.future.FutureRecordMetadata object at 0x7f26334076d8>, [], {'is_done': True, 'value': RecordMetadata(topic='some-topic', partition=0, topic_partition=TopicPartition(topic='some-topic', partition=0), offset=-1, timestamp=1629906912582, log_start_offset=None, checksum=None, serialized_key_size=-1, serialized_value_size=12, serialized_header_size=-1), 'exception': None, '_callbacks': [], '_errbacks': [], '_produce_future': <kafka.producer.future.FutureProduceResult object at 0x7f263315b0b8>, 'args': (0, 1629906912582, None, -1, 12, -1)}, [], (0, 1629906912582, None, -1, 12, -1), <bound method FutureRecordMetadata._produce_success of <kafka.producer.future.FutureRecordMetadata object at 0x7f26334076d8>>, <bound method Future.failure of <kafka.producer.future.FutureRecordMetadata object at 0x7f26334076d8>>, (-1, None, None), RecordMetadata(topic='some-topic', partition=0, topic_partition=TopicPartition(topic='some-topic', partition=0), offset=-1, timestamp=1629906912582, log_start_offset=None, checksum=None, serialized_key_size=-1, serialized_value_size=12, serialized_header_size=-1)]

A lot of the garbage seems to come from kafka.producer.future.FutureRecordMetadata and kafka.producer.future.FutureProduceResult.
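To see which objects are keeping the kafka futures alive, a small helper can walk `gc.garbage` and print the referrers of each saved object. This is a hypothetical debugging aid (not part of kafka-python), written against the standard `gc` module:

```python
import gc

def show_referrers(garbage, module_prefix=""):
    """For each object in ``garbage`` whose type lives under
    ``module_prefix``, list the types of the objects that still refer
    to it. Helps trace where a reference loop is anchored."""
    lines = []
    for obj in garbage:
        tp = type(obj)
        if not tp.__module__.startswith(module_prefix):
            continue
        for ref in gc.get_referrers(obj):
            if ref is garbage:  # the gc.garbage list itself is not interesting
                continue
            lines.append("%s <- %s" % (tp.__name__, type(ref).__name__))
    return lines
```

After running the reproduction script above, `show_referrers(gc.garbage, "kafka")` should reveal which objects (callback lists, bound methods, instance dicts) still point at the `FutureRecordMetadata` and `FutureProduceResult` instances.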

Are there reference loops somewhere?
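For illustration, here is a minimal, broker-free sketch of the kind of loop that would produce this behaviour. The classes below are hypothetical stand-ins for the `kafka.producer.future` classes, modelled on the shapes visible in the garbage dump above (a produce future holding a bound-method callback of a record future, which in turn holds the produce future):

```python
import gc

gc.disable()  # mirror the report: no automatic cycle collection

class ProduceFuture:
    """Stand-in for FutureProduceResult (hypothetical, for illustration)."""
    def __init__(self):
        self._callbacks = []

class RecordFuture:
    """Stand-in for FutureRecordMetadata (hypothetical, for illustration)."""
    def __init__(self, produce_future):
        self._produce_future = produce_future
        # A bound method keeps a reference to its instance, so this closes
        # the loop: produce_future -> bound method -> self -> produce_future.
        produce_future._callbacks.append(self._on_success)

    def _on_success(self, value):
        pass

pf = ProduceFuture()
rf = RecordFuture(pf)
del pf, rf  # refcounts never reach zero: the cycle keeps both alive

gc.set_debug(gc.DEBUG_SAVEALL)
unreachable = gc.collect()
gc.set_debug(0)
gc.enable()

print("unreachable objects:", unreachable)
print("futures in gc.garbage:",
      sum(isinstance(o, (ProduceFuture, RecordFuture)) for o in gc.garbage))
```

With `gc` disabled, both future objects stay alive until an explicit `gc.collect()`, exactly as observed in the report. If kafka-python broke the link after the callbacks fire (e.g. by clearing the callback lists), plain reference counting could reclaim these objects without the cycle collector.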
