Source code for xotl.crdt.register

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------
# Copyright (c) Merchise Autrement [~º/~] and Contributors
# All rights reserved.
#
# This is free software; you can do what the LICENCE file allows you to.
#
from time import monotonic
from xotl.crdt.base import CvRDT
from xotl.crdt.clocks import VClock, Dot


[docs]class LWWRegister(CvRDT): """The Last-Write-Wins Register. If two processes set a value concurrently (as per vector clock counter) and with the same time stamp. The process with highest `priority <xotl.crdt.base.Process>`:class: wins. """ def init(self): self.vclock = VClock([Dot(self.process, 0)]) self.timestamp = 0 self.atom = None @property def value(self): return self.atom @property def dot(self) -> Dot: # pragma: no cover return self.vclock.find(self.process)
[docs] def set(self, value, *, _timestamp=None): """Set the `value` of the register. `value` should be an immutable object. Putting a mutable object may lead to unexpected behavior (specially if it implements an unsafe hash). """ hash(value) # Check is immutable; mutable objs should raise an error if _timestamp is None: ts = max(self.timestamp, monotonic()) else: ts = _timestamp self.vclock = self.vclock.bump(self.process) self.timestamp = ts self.atom = value
def __le__(self, other) -> bool: if isinstance(other, LWWRegister): return self.vclock <= other.vclock else: return NotImplemented def __eq__(self, other) -> bool: if isinstance(other, LWWRegister): return self.process == other.process and self.vclock == other.vclock else: return NotImplemented def __lt__(self, other) -> bool: if isinstance(other, LWWRegister): return self.vclock < other.vclock else: return NotImplemented def __gt__(self, other) -> bool: if isinstance(other, LWWRegister): return self.vclock > other.vclock else: return NotImplemented def __ge__(self, other) -> bool: if isinstance(other, LWWRegister): return self.vclock >= other.vclock else: return NotImplemented def __floordiv__(self, other) -> bool: if isinstance(other, LWWRegister): return self.vclock // other.vclock else: raise TypeError( f"'//' not supported for instances " f"of type '{type(self).__name__}' and " f"type '{type(other).__name__}'" )
[docs] def __lshift__(self, other) -> bool: """True is `other` wins. `other` wins if: - its vector clock dominates ours (it descends from ours and knows even more than we do). - its vector clock is concurrent with ours but other is marked with a higher timestamp. - none of the above, but other's process has higher priority """ if not isinstance(other, LWWRegister): raise TypeError( f"'<<' not supported for instances " f"of type '{type(self).__name__}' and " f"type '{type(other).__name__}'" ) if self.vclock < other.vclock: return True elif self.vclock > other.vclock: return False elif self.vclock // other.vclock or self.vclock == other.vclock: if self.timestamp < other.timestamp: return True elif self.timestamp > other.timestamp: return False else: return self.process < other.process else: assert False # pragma: no cover
def merge(self, other: "LWWRegister") -> None: # type: ignore if self << other: assert not (other << self) self.atom = other.value self.vclock += other.vclock self.timestamp = max(self.timestamp, other.timestamp) def __repr__(self): return f"<LWWRegister: {self.value}; {self.process}, {self.vclock}>" def reset(self, value=None): """Reset the internal state of value. This method should only be used within the boundaries of a coordination controlled layer. Notice it may not be sufficient for a majority of the nodes to agree on the value, but the whole set of nodes. You probably only need to call this when removing/adding an process from the cluster. """ self.vclock = VClock() self.atom = value