#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------
# Copyright (c) Merchise Autrement [~º/~] and Contributors
# All rights reserved.
#
# This is free software; you can do what the LICENCE file allows you to.
#
from typing import Iterable, Any
from xotl.crdt.base import CvRDT
from xotl.crdt.clocks import VClock, Dot
class GSet(CvRDT):
    '''A grow-only set: items may be added but are never removed.

    The payload is a plain `set` stored in `items`.  Merging another replica
    takes the union of both payloads.

    '''

    def init(self):
        # Initialisation hook (note: ``init``, not ``__init__``); presumably
        # invoked by the CvRDT base constructor -- confirm in xotl.crdt.base.
        self.items = set()

    @property
    def value(self) -> frozenset:
        'An immutable snapshot of the items currently held.'
        snapshot = frozenset(self.items)
        return snapshot

    def __le__(self, other) -> bool:
        'True if every item of this replica is also in `other`.'
        if isinstance(other, GSet):
            mine, theirs = self.value, other.value
            return mine <= theirs
        return NotImplemented

    def __eq__(self, other) -> bool:
        'Equal when both the process and the stored items are equal.'
        if isinstance(other, GSet):
            same_process = self.process == other.process
            return same_process and self.items == other.items
        return NotImplemented

    def merge(self, other: 'GSet') -> None:  # type: ignore
        'Absorb (in place) every item known to `other`.'
        self.items.update(other.value)

    def add(self, item):
        'Insert `item` in the set.'
        self.items.add(item)

    def reset(self, items: Iterable[Any] = None):
        'Replace the contents with `items` (empty if none are given).'
        self.items = set(items) if items else set()
class TwoPhaseSet(CvRDT):
    '''The Two-Phase set.

    Built from two grow-only sets: `living` collects every item ever added
    and `dead` collects every item ever removed.  The visible value is the
    difference ``living - dead``; since neither inner set ever shrinks, a
    removed item can never become visible again.

    '''

    def init(self):
        # Initialisation hook (``init``, not ``__init__``); presumably called
        # by the CvRDT base constructor -- confirm in xotl.crdt.base.
        self.living = GSet(process=self.process)
        self.dead = GSet(process=self.process)

    @property
    def value(self) -> frozenset:
        '''The current value: items added and not (yet) removed.'''
        return frozenset(self.living.value - self.dead.value)

    def __le__(self, other) -> bool:
        '''True if `other` includes both our added and our removed items.

        Returns NotImplemented when `other` is not a TwoPhaseSet.

        '''
        if not isinstance(other, TwoPhaseSet):
            return NotImplemented
        # `merge` is the component-wise union, so the matching order is
        # component-wise inclusion: BOTH parts must be included.  With `or`
        # two different states (e.g. living={1}/dead={} and
        # living={}/dead={2}) would compare `<=` in both directions.
        return self.living <= other.living and self.dead <= other.dead

    def __eq__(self, other) -> bool:
        '''Equal when the process and both inner sets are equal.'''
        if not isinstance(other, TwoPhaseSet):
            return NotImplemented
        return (self.process == other.process and
                self.living == other.living and
                self.dead == other.dead)

    def merge(self, other: 'TwoPhaseSet') -> None:  # type: ignore
        '''Absorb (in place) the added and removed items of `other`.'''
        self.living.items |= other.living.items
        self.dead.items |= other.dead.items

    def add(self, item) -> None:
        'Add `item` to the set.'
        self.living.add(item)

    def remove(self, item) -> bool:
        '''Remove `item` from the set.

        If `item` is not in (this replica's view of) the set, do nothing and
        return False.  If it was, remove it and return True; the item will
        never be in the set again.

        '''
        if item in self.value:
            self.dead.add(item)
            return True
        else:
            return False

    def reset(self, items: Iterable[Any] = None):
        '''Reset to an initial value of `items`, forgetting all removals.'''
        self.living.reset(items)
        self.dead.reset()
class USet(CvRDT):
    '''The USet.

    .. warning:: You must be careful using this directly.  You MUST never add
       the same item twice.  This is a way to implement the `ORSet`:class:.

    The state is the set of `items` plus a vector clock (`vclock`) that is
    bumped on every local `add` and every effective `remove`.  Comparison
    and equality are decided by the vector clock, not by the items.

    '''

    def init(self):
        # Initialisation hook (``init``, not ``__init__``); presumably called
        # by the CvRDT base constructor -- confirm in xotl.crdt.base.
        self.vclock = VClock()
        self.items = set()

    @property
    def value(self) -> frozenset:
        '''An immutable snapshot of the items currently in the set.'''
        return frozenset(self.items)

    def __le__(self, other) -> bool:
        '''Compare the vector clocks; NotImplemented for non-USet.'''
        if isinstance(other, USet):
            return self.vclock <= other.vclock
        else:
            return NotImplemented

    def __eq__(self, other) -> bool:
        '''Equal when the process and the vector clock are equal.'''
        if isinstance(other, USet):
            return self.process == other.process and self.vclock == other.vclock
        else:
            return NotImplemented

    def merge(self, other: 'USet') -> None:  # type: ignore
        '''Merge (in place) the state of `other` into this replica.'''
        if self.vclock >= other.vclock:
            # Our history contains all of others so we can stay the same.
            pass
        elif self.vclock < other.vclock:
            # other has seen events we haven't and all our events have been
            # witnessed by other; so we must simply take the state of other.
            self.items = set(other.items)
            self.vclock += other.vclock
        elif self.vclock // other.vclock:
            # We have diverging items; our assumption about unique items and
            # the precondition on 'remove' ensures that a replica cannot
            # remove an item unless its addition was in the history.
            self.items |= other.items
            self.vclock += other.vclock
        else:
            # The three cases above are expected to be exhaustive.  Raise
            # explicitly (instead of ``assert False``) so that the check is
            # not stripped when running with ``-O``.
            raise AssertionError(
                'vector clocks are neither ordered nor concurrent'
            )

    def add(self, item) -> None:
        '''Add `item` to the set (bumps this process in the vector clock).'''
        self.vclock = self.vclock.bump(self.process)
        self.items.add(item)

    def remove(self, item) -> None:
        '''Remove `item` from the set.

        If `item` is not in (this replica's view of) the set, nothing
        happens: the vector clock is only bumped for an effective removal.

        '''
        if item in self.items:
            self.vclock = self.vclock.bump(self.process)
            self.items.remove(item)

    def __repr__(self):
        return f"<USet: {self.value}; {self.process}, {self.vclock.simplified}>"

    def reset(self, items: Iterable[Any] = None):
        'Reset the value with `items` and start over with a fresh clock.'
        # This used to assign to a misspelled attribute (``vlock``), leaving
        # the real vector clock untouched.
        self.vclock = VClock()
        self.items = set(items or [])
class ORSet(CvRDT):
    '''The Observed-Remove Set.

    Implemented on top of a `USet`:class: whose elements are uniquely tagged
    triples ``(item, process, tick)``.  The visible value is the set of the
    first components of those triples.

    '''

    def init(self):
        # Initialisation hook (``init``, not ``__init__``); presumably called
        # by the CvRDT base constructor -- confirm in xotl.crdt.base.
        self.items: USet = USet(process=self.process)
        # Local counter used to build a unique tag for every stored item.
        self.ticks = 0

    def __le__(self, other) -> bool:
        '''Compare the inner USets; NotImplemented for non-ORSet.'''
        if isinstance(other, ORSet):
            return self.items <= other.items
        else:
            return NotImplemented

    def __eq__(self, other) -> bool:
        '''Equal when process, inner USet and tick counter are all equal.'''
        if isinstance(other, ORSet):
            return (self.process == other.process and
                    self.items == other.items and
                    self.ticks == other.ticks)
        else:
            return NotImplemented

    def merge(self, other: 'ORSet') -> None:  # type: ignore
        '''Merge (in place) the inner USet of `other` into ours.'''
        self.items.merge(other.items)

    @property
    def value(self):
        '''The frozenset of items, with their unique tags stripped.'''
        return frozenset(item for item, _, _ in self.items.value)

    @property
    def dot(self) -> Dot:
        '''The entry of this process in the inner USet's vector clock.'''
        return self.items.vclock.find(self.process)

    @property
    def dot_counter(self) -> int:
        '''The counter of `dot`; 0 if looking it up raises ValueError.'''
        # Presumably ``VClock.find`` raises ValueError when this process has
        # no entry in the clock yet -- confirm in xotl.crdt.clocks.
        try:
            return self.dot.counter
        except ValueError:
            return 0

    def add(self, item):
        '''Add `item` to the set.

        '''
        # USet requires unique items, we expect the processes names are unique
        # in the cluster and each have an ever increasing tick.
        self.ticks += 1
        x = (item, self.process, self.ticks)
        self.items.add(x)

    def remove(self, item):
        '''Remove `item` from the set.

        We remove the **observed instances** of `item` in this replica; if
        this replica hasn't any, do nothing.  This also means that an
        ``add(x)`` at one replica concurrent with a ``remove(x)`` at another,
        will result in the item being kept.

        '''
        xs = [x for x in self.items.value if x[0] == item]
        if xs:
            # I have to hack the internal VClock of 'self.items' to ensure
            # just a single bump.
            counter = self.dot_counter
            for x in xs:
                self.items.remove(x)
            # ``object.__setattr__`` side-steps the normal attribute
            # assignment of Dot (presumably an immutable type).
            object.__setattr__(self.dot, 'counter', counter + 1)

    def __repr__(self):
        return f"<ORSet: {self.value}; {self.process}, {self.items}>"

    def reset(self, items: Iterable[Any] = None):
        '''Reset the value of the set with `items`.

        Each item is stored with a unique ``(item, process, tick)`` tag, the
        same shape `add` produces; `value` and `remove` rely on that shape.
        The tick counter keeps increasing so tags are never reused.

        '''
        tagged = []
        for item in items or []:
            self.ticks += 1
            tagged.append((item, self.process, self.ticks))
        self.items.reset(tagged)