Skip to content

Modules

Top-level package for MultiMapWithTTL.

multimapwithttl

An implementation of multimap with per-item expiration backed up by Redis.

MultiMapWithTTL

An implementation of multimap with per-item expiration backed up by Redis.

It is based on https://quickleft.com/blog/how-to-create-and-expire-list-items-in-redis/, but without the need for an extra job to delete old items.

Values are internally stored in Redis using sorted sets:

key1: { (score1, value1), (score2, value2), ... }
key2: { (score3, value3), (score4, value4), ... }
...

Here, the score is the timestamp when the value was added. The timestamp is used to filter out expired values, and whenever an insertion happens, expired values are opportunistically garbage-collected. The key itself is set to expire through the Redis TTL mechanism together with its newest value. Together, these operations simulate a multimap with per-item expiration.

You can use it to keep track of values associated with keys when each value has a notion of expiration.

>>> s = MultiMapWithTTL(redis_client, 'multimap')
>>> s.add('a', 1, 2, 3)
>>> sorted(s.get('a'))
[1, 2, 3]
>>> s.add_many([('b', (4, 5, 6)), ('c', (7, 8, 9)), ])
>>> sorted(sorted(values) for values in s.get_many('a', 'b', 'c'))
[[1, 2, 3], [4, 5, 6], [7, 8, 9]]

__init__(self, redis_client, key_prefix, ttl=3600, cast_fn=None) special

Initialize the instance.

Parameters:

Name Type Description Default
redis_client

A redis-py.StrictRedis client instance.

required
key_prefix str

A prefix to generate Redis keys.

required
ttl int

Set a timeout, in seconds, of when old values should be removed. After the timeout has expired without adding new items to a key, the key itself will be automatically deleted. Defaults to 60 min.

3600
cast_fn Callable[[Any], Any]

Cast the returned values from Redis to a desired type, defaults to int

None
Source code in multimapwithttl/multimapwithttl.py
def __init__(self, redis_client, key_prefix: str, ttl: int = 3600, cast_fn: Callable[[Any], Any] = None):
    """
    Set up a multimap bound to a Redis client.

    Args:
        redis_client: A redis-py.StrictRedis client instance.
        key_prefix (str): Prefix used to build the Redis keys of this multimap.
        ttl (int): Timeout, in seconds, after which old values should be removed.
            Once it elapses without new items being added to a key, the key
            itself is deleted automatically. Defaults to 60 min.
        cast_fn (Callable[[str], T]): Converts each raw value returned by Redis
            into the desired type; `int` is used when omitted.
    """
    self.redis = redis_client
    self.key_prefix = key_prefix
    self.ttl = ttl
    self.cast_fn = int if cast_fn is None else cast_fn

    # redis-py 3.x replaced the positional `zadd(*args)` form with a
    # `mapping` argument; look at the client's signature to pick the right call.
    zadd_parameters = inspect.signature(redis_client.zadd).parameters
    self.old_redis = 'mapping' not in zadd_parameters

add(self, name, *values)

Insert *values at the name key.

Parameters:

Name Type Description Default
name str

The key name where values should be stored.

required
*values Iterable[Any]

A list of values to be stored at name.

()

Returns:

Type Description
None

None

Source code in multimapwithttl/multimapwithttl.py
def add(self, name: str, *values: Iterable[Any]) -> None:
    """
    Store every item of `*values` under the `name` key.

    Convenience wrapper that forwards a single `(name, values)` entry to
    `add_many`.

    Args:
        name (str): Key under which `values` are stored.
        *values: The values to store at `name`.

    Returns:
        None
    """
    single_entry = (name, values)
    self.add_many((single_entry,))

add_many(self, data)

Bulk insert data.

Parameters:

Name Type Description Default
data

An iterator of (key, values) pairs.

required
Example

MultiMapWithTTL(redis_client, 'expiringset').add_many([ ('a', (1, 2, 3)), ('b', (4, 5, 6)), ('c', (7, 8, 9)), ])

required

Returns:

Type Description

None

Source code in multimapwithttl/multimapwithttl.py
def add_many(self, data):  # type: (Iterable[Tuple[str, Iterable[Any]]]) -> None
    """
    Bulk insert data, scoring every value with the default score source.

    Args:
        data: An iterator of (key, values) pairs.

        Example:
            MultiMapWithTTL(redis_client, 'expiringset').add_many([
                ('a', (1, 2, 3)),
                ('b', (4, 5, 6)),
                ('c', (7, 8, 9)),
            ])

    Returns:
        None
    """
    score_source = self._get_score_iter()

    def scored_entries():
        # Lazily turn each (key, values) into (key, (score, value) pairs); one
        # score iterator is shared by all keys, exactly as it is consumed here.
        for key_name, key_values in data:
            yield key_name, zip(score_source, key_values)

    self.add_many_with_ttl(scored_entries())

add_many_with_ttl(self, data)

Bulk insert data.

Parameters:

Name Type Description Default
data

An iterator of (key, (values/ttls)) pairs.

required
As this

MultiMapWithTTL(redis_client, 'expiringset').add_many_with_ttl([ ('a', ((score1, value1), (score2, value2), (score3, value3))), ('b', ((score4, value4), (score5, value5), (score6, value6))), ])

required
Example

MultiMapWithTTL(redis_client, 'expiringset').add_many_with_ttl([ ('a', ((159165312, 1), (159165312, 2), (159165312, 3))), ('b', ((159165312, 4), (159165312, 5), (159165312, 6))), ('c', ((159165312, 7), (159165312, 8), (159165312, 9))), ])

required

Returns:

Type Description

None

Source code in multimapwithttl/multimapwithttl.py
def add_many_with_ttl(self, data):  # type: (Iterable[Tuple[str, Iterable[Tuple[int, Any]]]]) -> None  # noqa
    """
    Bulk insert data with caller-provided scores.

    Args:
        data: An iterator of (key, pairs) items, where `pairs` is an iterable
            of `(score, value)` tuples.

        The score is the expected timestamp when the value should expire:
        members whose score is lower than or equal to the current score are
        removed on insertion and are not returned by `get_many`.

        As this:
            MultiMapWithTTL(redis_client, 'expiringset').add_many_with_ttl([
                ('a', ((score1, value1), (score2, value2), (score3, value3))),
                ('b', ((score4, value4), (score5, value5), (score6, value6))),
            ])

        Example:
            MultiMapWithTTL(redis_client, 'expiringset').add_many_with_ttl([
                ('a', ((159165312, 1), (159165312, 2), (159165312, 3))),
                ('b', ((159165312, 4), (159165312, 5), (159165312, 6))),
                ('c', ((159165312, 7), (159165312, 8), (159165312, 9))),
            ])

    Returns:
        None
    """
    # A non-transactional pipeline is used on purpose. If the trailing EXPIREAT
    # of a key is lost, reads remain correct because `get_many` filters members
    # by score. Only memory reclamation is delayed until the next insert on
    # that key, which garbage-collects and refreshes the expiry again.
    pipeline = self.redis.pipeline(transaction=False)
    # Computed once so that every key of this batch uses the same reference times.
    current_score = self._get_current_score()
    ttl_score = self._get_ttl_score()

    for name, values in data:
        key = self._get_key(name)
        # Materialize once: `values` may be a one-shot iterator (`add_many`
        # passes a `zip`), and it is needed for both ZADD and the expiry.
        pairs = list(values)

        if pairs:
            if self.old_redis:
                # redis-py < 3.x expects `.zadd(key, *args)` in the form:
                # score1, member1, score2, member2, ...
                pipeline.zadd(key, *[item for pair in pairs for item in pair])
            else:
                pipeline.zadd(key, mapping={member: score for score, member in pairs})

        # Opportunistic garbage collection; note zremrangebyscore is inclusive.
        pipeline.zremrangebyscore(key, 0, current_score)

        # EXPIREAT is a no-op on a key that does not exist, so it must be queued
        # after ZADD; otherwise a key created by this call would never get a TTL.
        # It still runs when there is nothing to add, refreshing an existing key.
        # The key expires together with its newest member, never earlier than
        # the default TTL. The +1 keeps the key alive for the half open interval.
        # NOTE(review): this may still shorten a TTL previously set by a call
        # with later scores; fixing that would require reading the current TTL.
        newest_score = max((score for score, _ in pairs), default=ttl_score)
        pipeline.expireat(key, int(max(ttl_score, newest_score)) + 1)
    pipeline.execute()

delete(self, *names)

Delete *names from the multimap.

Source code in multimapwithttl/multimapwithttl.py
def delete(self, *names) -> None:
    """
    Delete `*names` from the multimap.

    Calling it without any name is a no-op. Redis rejects a DEL command that
    carries no keys, so the request is skipped instead of raising.

    Args:
        *names: Names of the keys to remove.

    Returns:
        None
    """
    keys = [self._get_key(name) for name in names]
    if not keys:
        return
    self.redis.delete(*keys)

get(self, name)

Return a generator of all values stored at name that are not expired.

Source code in multimapwithttl/multimapwithttl.py
def get(self, name):  # type: (str) -> Generator[T, None, None]
    """Return a generator over the non-expired values stored at `name`."""
    per_key_results = self.get_many(name)
    return next(per_key_results)

get_many(self, *names)

Return a generator of generators of all values stored at *names that are not expired.

Parameters:

Name Type Description Default
*names

Name of the keys being queried.

()

Returns:

Type Description
Generator[Generator[~T, NoneType, NoneType], NoneType, NoneType]

Generator[T]

Source code in multimapwithttl/multimapwithttl.py
def get_many(self, *names) -> Generator[Generator[T, None, None], None, None]:
    """
    Fetch the non-expired values of several keys in one round trip.

    Args:
        *names: Names of the keys being queried.

    Returns:
        Generator[T]: One inner generator per name, in the same order as
        `names`. Each inner generator lazily applies `cast_fn` to the raw
        values returned by Redis.
    """
    pipe = self.redis.pipeline(transaction=False)
    # ZRANGEBYSCORE bounds are inclusive, so start one above the current score
    # to skip members considered expired.
    lower_bound = self._get_current_score() + 1
    for name in names:
        pipe.zrangebyscore(self._get_key(name), lower_bound, "+inf")
    replies = pipe.execute()
    return ((self.cast_fn(raw) for raw in reply) for reply in replies)