Source code for xotl.crdt.sets

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------
# Copyright (c) Merchise Autrement [~º/~] and Contributors
# All rights reserved.
#
# This is free software; you can do what the LICENCE file allows you to.
#
from typing import Iterable, Any
from xotl.crdt.base import CvRDT
from xotl.crdt.clocks import VClock, Dot


[docs]class GSet(CvRDT): """The Grow-only set. """ def init(self): self.items = set() @property def value(self) -> frozenset: return frozenset(self.items) def __le__(self, other) -> bool: if not isinstance(other, GSet): return NotImplemented return self.value <= other.value def __eq__(self, other) -> bool: if not isinstance(other, GSet): return NotImplemented return self.process == other.process and self.items == other.items def merge(self, other: "GSet") -> None: # type: ignore self.items |= other.value
[docs] def add(self, item): "Add `item` to the set." self.items.add(item)
def reset(self, items: Iterable[Any] = None): "Reset the set with `items`." self.items = set(items or [])
[docs]class TwoPhaseSet(CvRDT): def init(self): self.living = GSet(process=self.process) self.dead = GSet(process=self.process) @property def value(self) -> frozenset: """The current value.""" return frozenset(self.living.value - self.dead.value) def __le__(self, other) -> bool: if not isinstance(other, TwoPhaseSet): return NotImplemented return self.living <= other.living or self.dead <= other.dead def __eq__(self, other) -> bool: if not isinstance(other, TwoPhaseSet): return NotImplemented return ( self.process == other.process and self.living == other.living and self.dead == other.dead ) def merge(self, other: "TwoPhaseSet") -> None: # type: ignore self.living.items |= other.living.items self.dead.items |= other.dead.items
[docs] def add(self, item) -> None: "Add `item` to the set." self.living.add(item)
[docs] def remove(self, item) -> bool: """Remove `item` to the set. If `item` is not in (this replica's view of) the set, do nothing. If it was, remove it and the item will never in the set again. """ if item in self.value: self.dead.add(item) return True else: return False
def reset(self, items: Iterable[Any] = None): """Reset to an initial value of `items`.""" self.living.reset(items) self.dead.reset()
[docs]class USet(CvRDT): """The USet. .. warning:: You must be careful using this directly. You MUST never add the same item twice. This as way to implement the `ORSet`:class:. """ def init(self): self.vclock = VClock() self.items = set() @property def value(self): return frozenset(self.items) def __le__(self, other) -> bool: if isinstance(other, USet): return self.vclock <= other.vclock else: return NotImplemented def __eq__(self, other) -> bool: if isinstance(other, USet): return self.process == other.process and self.vclock == other.vclock else: return NotImplemented def merge(self, other: "USet") -> None: # type: ignore if self.vclock >= other.vclock: # Our history contains all of others so we can stay the same. pass elif self.vclock < other.vclock: # other has seen events we haven't and all our events have been # witnessed by other; so we must simply take the state of other. self.items = set(other.items) self.vclock += other.vclock elif self.vclock // other.vclock: # We have diverging items; our assumption about unique items and # the precondition on 'remove' ensures that a replica cannot # remove an item unless its addition was in the history. self.items |= other.items self.vclock += other.vclock else: assert False # pragma: no cover
[docs] def add(self, item) -> None: """Add `item` to the set.""" self.vclock = self.vclock.bump(self.process) self.items.add(item)
[docs] def remove(self, item) -> None: """Remove `item` from the set. If `item` is not in (this replica's view of) the set, nothing happens. """ if item in self.items: self.vclock = self.vclock.bump(self.process) self.items.remove(item)
def __repr__(self): return f"<USet: {self.value}; {self.process}, {self.vclock}>" def reset(self, items: Iterable[Any] = None): "Reset the value with `items`." self.vlock = VClock() self.items = set(items or [])
[docs]class ORSet(CvRDT): """The Observed-Remove Set. """ def init(self): self.items: USet = USet(process=self.process) self.ticks = 0 def __le__(self, other) -> bool: if isinstance(other, ORSet): return self.items <= other.items else: return NotImplemented def __eq__(self, other) -> bool: if isinstance(other, ORSet): return ( self.process == other.process and self.items == other.items and self.ticks == other.ticks ) else: return NotImplemented def merge(self, other: "ORSet") -> None: # type: ignore self.items.merge(other.items) @property def value(self): return frozenset(item for item, _, _ in self.items.value) @property def dot(self) -> Dot: return self.items.vclock.find(self.process) @property def dot_counter(self) -> int: try: return self.dot.counter except ValueError: # pragma: no cover return 0
[docs] def add(self, item): """Add `item` to the set. """ # USet requires unique items, we expect the processes names are unique # in the cluster and each have an ever increasing tick. self.ticks += 1 x = (item, self.process, self.ticks) self.items.add(x)
[docs] def remove(self, item): """Remove `item` from the set. We remove the **observed instances** of `item` in this replica; if this replica hasn't any, do nothing. This also means that an ``add(x)`` at one replica concurrent with a ``remove(x)`` at another, will result in the item being kept. """ xs = [x for x in self.items.value if x[0] == item] if xs: # I have to hack the internal VClock of 'self.items' to ensure # just a single bump. counter = self.dot_counter for x in xs: self.items.remove(x) object.__setattr__(self.dot, "counter", counter + 1)
def __repr__(self): return f"<ORSet: {self.value}; {self.process}, {self.items}>" def reset(self, items: Iterable[Any] = None): """Reset the value of the set with `items`. """ self.init() for item in items or []: self.add(item)