#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------
# Copyright (c) Merchise Autrement [~º/~] and Contributors
# All rights reserved.
#
# This is free software; you can do what the LICENCE file allows you to.
#
"""Implements the Vector Clocks.
"""
from typing import Tuple, Sequence
from collections import deque
from dataclasses import dataclass
from itertools import groupby
from operator import attrgetter
from xotl.crdt.base import Process
[docs]@dataclass(frozen=True, order=False, eq=True)
class Dot:
"""A component on the vector clock.
"""
# process names should be unique across all processes
process: Process
counter: int
[docs]@dataclass(frozen=True, init=False)
class VClock:
dots: Tuple[Dot, ...]
def __init__(self, dots: Sequence[Dot] = None) -> None:
if dots:
assert len([d.process for d in dots]) == len(
{d.process for d in dots}
), f"Repeated processes in {dots!r}"
# Avoid silly counters.
dots = [d for d in (dots or []) if d.counter >= 0]
dots.sort(key=attrgetter("process"))
object.__setattr__(self, "dots", tuple(dots))
def __ge__(self, other: "VClock") -> bool:
"""True if this vclock descends (happens after) from other."""
if isinstance(other, VClock):
# Remember, that '.dots' are ordered by 'process'; with this in
# mind the algorithm is easy to follow.
#
# This algorithm consider missing processes as if they were there
# with counter 0. But, then if any process is present with
# counter 0, we should remove it from the dots.
theirs = deque(d for d in other.dots if d.counter)
ours = deque(d for d in self.dots if d.counter)
if not theirs:
return True # Every VC decends from the empty one.
elif not ours:
return False # Empty VC doesnt descent from non-empty ones.
result = True
while theirs and ours and result:
their_dot = theirs.popleft()
our_dot = ours.popleft()
while ours and their_dot.process != our_dot.process:
our_dot = ours.popleft()
if our_dot.process == their_dot.process:
result = our_dot.counter >= their_dot.counter
else:
assert not ours
result = False
return result and not theirs
else:
return NotImplemented
def __eq__(self, other: "VClock") -> bool: # type: ignore
"""True if this vclock is the same as other."""
if isinstance(other, VClock):
# Equality requires that every process present in `self` must be
# present in `other` with the same counter. This is faster than
# ``self >= other >= self``.
theirs = [d for d in other.dots if d.counter]
ours = [d for d in self.dots if d.counter]
return ours == theirs
else:
return NotImplemented
def __hash__(self):
# NB: self.dots is ordered by process, so we get a consistent hash.
return hash(tuple(d for d in self.dots if d.counter))
def __floordiv__(self, other: "VClock") -> bool:
"""True if neither self descends from other nor other from self.
This means that self and other represent concurrent events in
different replicas. In some texts this is represented as ``a || b``,
here we have ``a // b``.
"""
return not (self <= other) and not (other <= self)
def __le__(self, other: "VClock") -> bool:
return other >= self
def __gt__(self, other):
"""True if ``self >= other`` but not viceversa."""
if isinstance(other, VClock):
# Is this the same as 'self != other and self >= other'?
return not (other >= self) and self >= other
else:
return NotImplemented
def __lt__(self, other):
"""True if ``self <= other`` but not viceversa."""
if isinstance(other, VClock):
# Is this the same as 'self != other and self <= other'?
return not (other <= self) and self <= other
else:
return NotImplemented
def __bool__(self): # pragma: no cover
return bool(self.dots)
[docs] def merge(self, *others: "VClock") -> "VClock":
"""Return the least possible common descendant."""
from heapq import merge
get_process = attrgetter("process")
groups = groupby(
merge(self.dots, *(o.dots for o in others), key=get_process),
key=get_process,
)
dots = [
Dot(process, max(d.counter for d in group))
for process, group in groups
]
# Silly little trick to avoid sorting what is sorted already
result = VClock()
object.__setattr__(result, "dots", tuple(dots))
return result
def __add__(self, other):
"Return the merge with other."
return self.merge(other)
[docs] def bump(self, process):
"""Return a new VC with the process's counter increased."""
try:
i = index(self.dots, process, key=attrgetter("process"))
dots = list(self.dots)
dots[i] = Dot(process, dots[i].counter + 1)
except ValueError:
from heapq import merge
new = Dot(process, 1)
dots = merge(self.dots, [new], key=attrgetter("process"))
result = VClock()
object.__setattr__(result, "dots", tuple(dots))
return result
def find(self, process: Process) -> Dot:
i = index(self.dots, process, key=attrgetter("process"))
return self.dots[i]
[docs] def reset(self):
"""Reset the clock.
Basically forget about all the clock state.
"""
object.__setattr__(self, "dots", ())
def index(a, x, key=None): # pragma: no cover
"Locate the leftmost value exactly equal to x."
from bisect import bisect_left
if not key:
i = bisect_left(a, x)
if i != len(a) and a[i] == x:
return i
else:
i = bisect_left([key(y) for y in a], x)
if i != len(a) and key(a[i]) == x:
return i
raise ValueError