Source code for kafka.util

import binascii
import collections
import struct
import sys
from threading import Thread, Event

import six

from kafka.common import BufferUnderflowError


def crc32(data):
    return binascii.crc32(data) & 0xffffffff
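
# Note (editorial, not part of the original module): binascii.crc32 can return
# a signed value on Python 2, so the mask above normalizes the result to an
# unsigned 32-bit integer, i.e. 0 <= crc32(data) <= 0xffffffff on both 2 and 3.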


def write_int_string(s):
    if s is not None and not isinstance(s, six.binary_type):
        raise TypeError('Expected "%s" to be bytes\n'
                        'data=%s' % (type(s), repr(s)))
    if s is None:
        return struct.pack('>i', -1)
    else:
        return struct.pack('>i%ds' % len(s), len(s), s)
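
# Illustrative only (not from the original module): the value is framed with a
# 4-byte big-endian length prefix, or -1 for a null string, e.g.
#   write_int_string(b'foo')  ->  b'\x00\x00\x00\x03foo'
#   write_int_string(None)    ->  b'\xff\xff\xff\xff'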


def write_short_string(s):
    if s is not None and not isinstance(s, six.binary_type):
        raise TypeError('Expected "%s" to be bytes\n'
                        'data=%s' % (type(s), repr(s)))
    if s is None:
        return struct.pack('>h', -1)
    elif len(s) > 32767 and sys.version_info < (2, 7):
        # Python 2.6 issues a deprecation warning instead of a struct error
        raise struct.error(len(s))
    else:
        return struct.pack('>h%ds' % len(s), len(s), s)
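
# Illustrative only: same framing with a 2-byte big-endian length prefix, e.g.
#   write_short_string(b'foo')  ->  b'\x00\x03foo'
#   write_short_string(None)    ->  b'\xff\xff'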


def read_short_string(data, cur):
    if len(data) < cur + 2:
        raise BufferUnderflowError("Not enough data left")

    (strlen,) = struct.unpack('>h', data[cur:cur + 2])
    if strlen == -1:
        return None, cur + 2

    cur += 2
    if len(data) < cur + strlen:
        raise BufferUnderflowError("Not enough data left")

    out = data[cur:cur + strlen]
    return out, cur + strlen
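
# Illustrative only: decoding is the inverse of write_short_string and also
# returns the advanced offset, e.g.
#   read_short_string(b'\x00\x03foo', 0)  ->  (b'foo', 5)
#   read_short_string(b'\xff\xff', 0)     ->  (None, 2)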


def read_int_string(data, cur):
    if len(data) < cur + 4:
        raise BufferUnderflowError(
            "Not enough data left to read string len (%d < %d)" %
            (len(data), cur + 4))

    (strlen,) = struct.unpack('>i', data[cur:cur + 4])
    if strlen == -1:
        return None, cur + 4

    cur += 4
    if len(data) < cur + strlen:
        raise BufferUnderflowError("Not enough data left")

    out = data[cur:cur + strlen]
    return out, cur + strlen
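
# Illustrative only: the 4-byte-length variant, e.g.
#   read_int_string(b'\x00\x00\x00\x03foo', 0)  ->  (b'foo', 7)
#   read_int_string(b'\xff\xff\xff\xff', 0)     ->  (None, 4)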


def relative_unpack(fmt, data, cur):
    size = struct.calcsize(fmt)
    if len(data) < cur + size:
        raise BufferUnderflowError("Not enough data left")

    out = struct.unpack(fmt, data[cur:cur + size])
    return out, cur + size
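
# Illustrative only: unpacks a struct format string at an offset and returns
# the unpacked tuple plus the new offset, e.g.
#   relative_unpack('>ih', b'\x00\x00\x00\x01\x00\x02', 0)  ->  ((1, 2), 6)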


def group_by_topic_and_partition(tuples):
    out = collections.defaultdict(dict)
    for t in tuples:
        out[t.topic][t.partition] = t
    return out
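
# Illustrative only: expects objects exposing `topic` and `partition`
# attributes (e.g. request/response namedtuples) and nests them by topic,
# then partition. The `Req` namedtuple below is made up for the example:
#   Req = collections.namedtuple('Req', 'topic partition payload')
#   group_by_topic_and_partition([Req('t1', 0, b'a'), Req('t1', 1, b'b')])
#   ->  {'t1': {0: Req('t1', 0, b'a'), 1: Req('t1', 1, b'b')}}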


def kafka_bytestring(s):
    """
    Takes a string or bytes instance
    Returns bytes, encoding strings in utf-8 as necessary
    """
    if isinstance(s, six.binary_type):
        return s
    if isinstance(s, six.string_types):
        return s.encode('utf-8')
    raise TypeError(s)
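
# Illustrative only: both text and bytes normalize to bytes, e.g.
#   kafka_bytestring(u'my-topic')  ->  b'my-topic'
#   kafka_bytestring(b'my-topic')  ->  b'my-topic'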


class ReentrantTimer(object):
    """
    A timer that can be restarted, unlike threading.Timer
    (although this uses threading.Timer)

    Arguments:
        t: timer interval in milliseconds
        fn: a callable to invoke
        args: tuple of args to be passed to function
        kwargs: keyword arguments to be passed to function
    """
    def __init__(self, t, fn, *args, **kwargs):

        if t <= 0:
            raise ValueError('Invalid timeout value')

        if not callable(fn):
            raise ValueError('fn must be callable')

        self.thread = None
        self.t = t / 1000.0
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self.active = None

    def _timer(self, active):
        # python2.6 Event.wait() always returns None
        # python2.7 and greater returns the flag value (true/false)
        # we want the flag value, so add an 'or' here for python2.6
        # this is redundant for later python versions (FLAG OR FLAG == FLAG)
        while not (active.wait(self.t) or active.is_set()):
            self.fn(*self.args, **self.kwargs)

    def start(self):
        if self.thread is not None:
            self.stop()

        self.active = Event()
        self.thread = Thread(target=self._timer, args=(self.active,))
        self.thread.daemon = True  # So the app exits when main thread exits
        self.thread.start()

    def stop(self):
        if self.thread is None:
            return

        self.active.set()
        self.thread.join(self.t + 1)
        # noinspection PyAttributeOutsideInit
        self.timer = None
        self.fn = None
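
# Illustrative usage (a sketch, not from the original module; `heartbeat` is a
# made-up callback name):
#   timer = ReentrantTimer(5000, heartbeat)  # invoke heartbeat() every 5 seconds
#   timer.start()
#   ...                                      # heartbeat() fires until stopped
#   timer.stop()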