Source code for xotl.crdt.clocks

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------
# Copyright (c) Merchise Autrement [~º/~] and Contributors
# All rights reserved.
#
# This is free software; you can do what the LICENCE file allows you to.
#
'''Implements the Vector Clocks.

'''
from typing import Tuple, Sequence

from collections import deque
from dataclasses import dataclass
from itertools import groupby
from operator import attrgetter

from xotl.crdt.base import Process


[docs]@dataclass(frozen=True, order=False, eq=True) class Dot: '''A component on the vector clock. ''' # process names should be unique across all processes process: Process counter: int
[docs]@dataclass(frozen=True, init=False) class VClock: dots: Tuple[Dot, ...] def __init__(self, dots: Sequence[Dot] = None) -> None: if dots: assert len([d.process for d in dots]) == len({d.process for d in dots}), \ f'Repeated processes in {dots!r}' # Avoid silly counters. dots = [d for d in (dots or []) if d.counter >= 0] dots.sort(key=attrgetter('process')) object.__setattr__(self, 'dots', tuple(dots)) def __ge__(self, other: 'VClock') -> bool: '''True if this vclock descends (happens after) from other.''' if isinstance(other, VClock): # Remember, that '.dots' are ordered by 'process'; with this in # mind the algorithm is easy to follow. # # This algorithm consider missing processes as if they were there # with counter 0. But, then if any process is present with # counter 0, we should remove it from the dots. theirs = deque(d for d in other.dots if d.counter) ours = deque(d for d in self.dots if d.counter) if not theirs: return True # Every VC decends from the empty one. elif not ours: return False # Empty VC doesnt descent from non-empty ones. result = True while theirs and ours and result: their_dot = theirs.popleft() our_dot = ours.popleft() while ours and their_dot.process != our_dot.process: our_dot = ours.popleft() if our_dot.process == their_dot.process: result = our_dot.counter >= their_dot.counter else: assert not ours result = False return result and not theirs else: return NotImplemented def __eq__(self, other: 'VClock') -> bool: # type: ignore '''True if this vclock is the same as other.''' if isinstance(other, VClock): # The looping structure is similar to __ge__; however, we must # ensure every process present in `self` is also present in # `other` with the same counter, thus the stronger conditional in # the 'return'. This is faster than ``self >= other >= self``. theirs = deque(d for d in other.dots if d.counter) ours = deque(d for d in self.dots if d.counter) result = True while theirs and ours and result: their_dot = theirs.popleft() our_dot = ours.popleft() while ours and their_dot.process != our_dot.process: our_dot = ours.popleft() if our_dot.process == their_dot.process: result = our_dot.counter == their_dot.counter else: assert not ours result = False return result and not ours and not theirs else: return NotImplemented def __floordiv__(self, other: 'VClock') -> bool: '''True if neither self descends from other nor other from self. This means that self and other represent concurrent events in different replicas. In some texts this is represented as ``a || b``, here we have ``a // b``. ''' return not (self <= other) and not (other <= self) def __le__(self, other: 'VClock') -> bool: return other >= self def __gt__(self, other): '''True if ``self >= other`` but not viceversa.''' if isinstance(other, VClock): # Is this the same as 'self != other and self >= other'? return not (other >= self) and self >= other else: return NotImplemented def __lt__(self, other): '''True if ``self <= other`` but not viceversa.''' if isinstance(other, VClock): # Is this the same as 'self != other and self <= other'? return not (other <= self) and self <= other else: return NotImplemented def __bool__(self): return bool(self.dots)
[docs] def merge(self, *others: 'VClock') -> 'VClock': '''Return the least possible common descendant.''' from heapq import merge get_process = attrgetter('process') groups = groupby( merge(self.dots, *(o.dots for o in others), key=get_process), key=get_process ) dots = [ Dot(process, max(d.counter for d in lgroup)) for process, group in groups # convert group to a list so that we can do the double max above for lgroup in (list(group), ) ] # Silly little trick to avoid sorting what is sorted already result = VClock() object.__setattr__(result, 'dots', tuple(dots)) return result
def __add__(self, other): 'Return the merge with other.' return self.merge(other)
[docs] def bump(self, process): '''Return a new VC with the process's counter increased.''' try: i = index(self.dots, process, key=attrgetter('process')) dots = list(self.dots) dots[i] = Dot(process, dots[i].counter + 1) except ValueError: from heapq import merge new = Dot(process, 1) dots = merge(self.dots, [new], key=attrgetter('process')) result = VClock() object.__setattr__(result, 'dots', tuple(dots)) return result
def find(self, process: Process) -> Dot: i = index(self.dots, process, key=attrgetter('process')) return self.dots[i] @property def simplified(self): return VClock([d for d in self.dots if d.counter])
[docs] def reset(self): '''Reset the clock. Basically forget about all the clock state. ''' self.dots = ()
def index(a, x, key=None): 'Locate the leftmost value exactly equal to x' from bisect import bisect_left if not key: i = bisect_left(a, x) if i != len(a) and a[i] == x: return i else: i = bisect_left([key(y) for y in a], x) if i != len(a) and key(a[i]) == x: return i raise ValueError