Source code for xotl.crdt.counter

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------
# Copyright (c) Merchise Autrement [~º/~] and Contributors
# All rights reserved.
#
# This is free software; you can do what the LICENCE file allows you to.
#
from xotl.crdt.base import CvRDT
from xotl.crdt.clocks import VClock


[docs]class GCounter(CvRDT): """A increment-only counter. """ def init(self): self.vclock = VClock() def __repr__(self): return f"<GCounter of {self.value}; {self.process}, {self.vclock}>"
[docs] def incr(self): "Increases the counter by one." self.vclock = self.vclock.bump(self.process)
@property def value(self) -> int: "The current value of the counter" return sum(d.counter for d in self.vclock.dots) def merge(self, other: "GCounter") -> None: # type: ignore "Merge this replica with another in-place" self.vclock += other.vclock def __le__(self, other) -> bool: if isinstance(other, GCounter): return self.vclock <= other.vclock else: return NotImplemented def reset(self): """Reset the counter to 0. .. warning:: This an operation that must be coordinated between processes. """ self.vclock.reset() def __eq__(self, other) -> bool: if isinstance(other, GCounter): return self.process == other.process and self.vclock == other.vclock else: return NotImplemented
[docs]class PNCounter(CvRDT): """A counter that allows increments and decrements.""" def init(self): self.pos = GCounter(process=self.process) self.neg = GCounter(process=self.process) def __repr__(self): return f"<PNCounter of {self.value}; with {self.pos} and {self.neg}>"
[docs] def incr(self): "Increase the counter by one." self.pos.incr()
[docs] def decr(self): "Decreases the counter by one." self.neg.incr()
@property def value(self) -> int: "The current value of the counter." return self.pos.value - self.neg.value def merge(self, other: "PNCounter") -> None: # type: ignore "Merge this replica with another in-place" self.pos.merge(other.pos) self.neg.merge(other.neg) def __le__(self, other) -> bool: if isinstance(other, PNCounter): return self.pos <= other.pos and self.neg <= other.neg else: return NotImplemented def __eq__(self, other): if isinstance(other, PNCounter): return ( self.process == other.process and self.pos == other.pos and self.neg == other.neg ) else: return NotImplemented def reset(self): """Reset the counter to 0. .. warning:: This an operation that must be coordinated between processes. """ self.pos.reset() self.neg.reset()