from collections.abc import Sequence, Hashable
from itertools import islice, chain
from numbers import Integral
from pyrsistent._plist import plist
class PDeque(object):
    """
    Persistent double ended queue (deque). Allows quick appends and pops in both ends. Implemented
    using two persistent lists.

    A maximum length can be specified to create a bounded queue. The bound is carried over to
    every deque derived from a bounded one (pop, append, extend, remove, reverse, rotate and
    contiguous slices).

    Fully supports the Sequence and Hashable protocols including indexing and slicing but
    if you need fast random access go for the PVector instead.

    Do not instantiate directly, instead use the factory functions :py:func:`dq` or :py:func:`pdeque` to
    create an instance.

    Some examples:

    >>> x = pdeque([1, 2, 3])
    >>> x.left
    1
    >>> x.right
    3
    >>> x[0] == x.left
    True
    >>> x[-1] == x.right
    True
    >>> x.pop()
    pdeque([1, 2])
    >>> x.pop() == x[:-1]
    True
    >>> x.popleft()
    pdeque([2, 3])
    >>> x.append(4)
    pdeque([1, 2, 3, 4])
    >>> x.appendleft(4)
    pdeque([4, 1, 2, 3])

    >>> y = pdeque([1, 2, 3], maxlen=3)
    >>> y.append(4)
    pdeque([2, 3, 4], maxlen=3)
    >>> y.appendleft(4)
    pdeque([4, 1, 2], maxlen=3)
    """
    # _left_list holds the left part of the deque in left-to-right order, _right_list holds
    # the right part in right-to-left order (see __iter__), so both ends sit at a list head.
    # _length caches the total element count and _maxlen is the optional bound (None = unbounded).
    __slots__ = ('_left_list', '_right_list', '_length', '_maxlen', '__weakref__')

    def __new__(cls, left_list, right_list, length, maxlen=None):
        """
        Build a deque from its two backing lists.

        :param left_list: persistent list with the left part, leftmost element first
        :param right_list: persistent list with the right part, rightmost element first
        :param length: total number of elements in both lists
        :param maxlen: optional non-negative integer bound, None for an unbounded deque
        :raises TypeError: if maxlen is given but is not an integer
        :raises ValueError: if maxlen is negative
        """
        instance = super(PDeque, cls).__new__(cls)
        instance._left_list = left_list
        instance._right_list = right_list
        instance._length = length

        if maxlen is not None:
            if not isinstance(maxlen, Integral):
                raise TypeError('An integer is required as maxlen')

            if maxlen < 0:
                raise ValueError("maxlen must be non-negative")

        instance._maxlen = maxlen
        return instance

    @property
    def right(self):
        """
        Rightmost element in deque.

        Raises IndexError if the deque is empty.
        """
        return PDeque._tip_from_lists(self._right_list, self._left_list)

    @property
    def left(self):
        """
        Leftmost element in deque.

        Raises IndexError if the deque is empty.
        """
        return PDeque._tip_from_lists(self._left_list, self._right_list)

    @staticmethod
    def _tip_from_lists(primary_list, secondary_list):
        """
        Return the end element seen from the primary side: the head of the primary list, or,
        when that list is empty, the last element of the secondary list.
        """
        if primary_list:
            return primary_list.first

        if secondary_list:
            return secondary_list[-1]

        raise IndexError('No elements in empty deque')

    def __iter__(self):
        """
        Iterate from left to right.
        """
        # The right list is stored rightmost-first, so it has to be reversed to continue
        # the left-to-right order started by the left list.
        return chain(self._left_list, self._right_list.reverse())

    def __repr__(self):
        return "pdeque({0}{1})".format(list(self),
                                       ', maxlen={0}'.format(self._maxlen) if self._maxlen is not None else '')
    __str__ = __repr__

    @property
    def maxlen(self):
        """
        Maximum length of the queue.
        """
        return self._maxlen

    def pop(self, count=1):
        """
        Return new deque with rightmost element removed. Popping the empty queue
        will return the empty queue. An optional count can be given to indicate the
        number of elements to pop. Popping with a negative index is the same as
        popleft. Executes in amortized O(k) where k is the number of elements to pop.

        >>> pdeque([1, 2]).pop()
        pdeque([1])
        >>> pdeque([1, 2]).pop(2)
        pdeque([])
        >>> pdeque([1, 2]).pop(-1)
        pdeque([2])
        """
        if count < 0:
            return self.popleft(-count)

        new_right_list, new_left_list = PDeque._pop_lists(self._right_list, self._left_list, count)
        # Popping more elements than available leaves an empty deque, never a negative length.
        return PDeque(new_left_list, new_right_list, max(self._length - count, 0), self._maxlen)

    def popleft(self, count=1):
        """
        Return new deque with leftmost element removed. Otherwise functionally
        equivalent to pop().

        >>> pdeque([1, 2]).popleft()
        pdeque([2])
        """
        if count < 0:
            return self.pop(-count)

        new_left_list, new_right_list = PDeque._pop_lists(self._left_list, self._right_list, count)
        return PDeque(new_left_list, new_right_list, max(self._length - count, 0), self._maxlen)

    @staticmethod
    def _pop_lists(primary_list, secondary_list, count):
        """
        Drop up to count elements from the primary end and return the resulting
        (primary_list, secondary_list) pair. Stops early when both lists are empty.
        """
        new_primary_list = primary_list
        new_secondary_list = secondary_list

        while count > 0 and (new_primary_list or new_secondary_list):
            count -= 1
            if new_primary_list.rest:
                # More than one element left on the primary side, just drop the head.
                new_primary_list = new_primary_list.rest
            elif new_primary_list:
                # The last primary element is dropped, move the secondary elements over
                # (reversed so that they are ordered as seen from the primary end).
                new_primary_list = new_secondary_list.reverse()
                new_secondary_list = plist()
            else:
                # Primary side already empty, move the secondary elements over and drop one.
                new_primary_list = new_secondary_list.reverse().rest
                new_secondary_list = plist()

        return new_primary_list, new_secondary_list

    def _is_empty(self):
        """
        Return True if neither backing list holds any element.
        """
        return not self._left_list and not self._right_list

    def __lt__(self, other):
        """
        Compare element-wise (as tuples) with another PDeque.
        """
        if not isinstance(other, PDeque):
            return NotImplemented

        return tuple(self) < tuple(other)

    def __eq__(self, other):
        """
        Two deques are equal if they hold equal elements in the same order, maxlen is not
        taken into account.
        """
        if not isinstance(other, PDeque):
            return NotImplemented

        if tuple(self) == tuple(other):
            # Sanity check of the length value since it is redundant (there for performance)
            assert len(self) == len(other)
            return True

        return False

    def __hash__(self):
        return hash(tuple(self))

    def __len__(self):
        return self._length

    def append(self, elem):
        """
        Return new deque with elem as the rightmost element.

        >>> pdeque([1, 2]).append(3)
        pdeque([1, 2, 3])
        """
        new_left_list, new_right_list, new_length = self._append(self._left_list, self._right_list, elem)
        return PDeque(new_left_list, new_right_list, new_length, self._maxlen)

    def appendleft(self, elem):
        """
        Return new deque with elem as the leftmost element.

        >>> pdeque([1, 2]).appendleft(3)
        pdeque([3, 1, 2])
        """
        new_right_list, new_left_list, new_length = self._append(self._right_list, self._left_list, elem)
        return PDeque(new_left_list, new_right_list, new_length, self._maxlen)

    def _append(self, primary_list, secondary_list, elem):
        """
        Add elem at the secondary end and return (primary_list, secondary_list, length).
        When the deque is already at maxlen one element is dropped from the primary end to
        make room; a deque with maxlen 0 is returned unchanged.
        """
        if self._maxlen is not None and self._length == self._maxlen:
            if self._maxlen == 0:
                return primary_list, secondary_list, 0
            new_primary_list, new_secondary_list = PDeque._pop_lists(primary_list, secondary_list, 1)
            return new_primary_list, new_secondary_list.cons(elem), self._length

        return primary_list, secondary_list.cons(elem), self._length + 1

    @staticmethod
    def _extend_list(the_list, iterable):
        """
        Prepend every element of iterable to the_list, one at a time, and return the new
        list together with the number of elements added.
        """
        count = 0
        for elem in iterable:
            the_list = the_list.cons(elem)
            count += 1

        return the_list, count

    def _extend(self, primary_list, secondary_list, iterable):
        """
        Add all elements of iterable at the primary end and return
        (primary_list, secondary_list, net change in length). If the result would exceed
        maxlen the surplus is dropped from the secondary (opposite) end.
        """
        new_primary_list, extend_count = PDeque._extend_list(primary_list, iterable)
        new_secondary_list = secondary_list
        current_len = self._length + extend_count
        if self._maxlen is not None and current_len > self._maxlen:
            pop_len = current_len - self._maxlen
            # Roles are swapped here since the surplus is removed from the opposite end.
            new_secondary_list, new_primary_list = PDeque._pop_lists(new_secondary_list, new_primary_list, pop_len)
            extend_count -= pop_len

        return new_primary_list, new_secondary_list, extend_count

    def extend(self, iterable):
        """
        Return new deque with all elements of iterable appended to the right.

        >>> pdeque([1, 2]).extend([3, 4])
        pdeque([1, 2, 3, 4])
        """
        new_right_list, new_left_list, extend_count = self._extend(self._right_list, self._left_list, iterable)
        return PDeque(new_left_list, new_right_list, self._length + extend_count, self._maxlen)

    def extendleft(self, iterable):
        """
        Return new deque with all elements of iterable appended to the left.

        NB! The elements will be inserted in reverse order compared to the order in the iterable.

        >>> pdeque([1, 2]).extendleft([3, 4])
        pdeque([4, 3, 1, 2])
        """
        new_left_list, new_right_list, extend_count = self._extend(self._left_list, self._right_list, iterable)
        return PDeque(new_left_list, new_right_list, self._length + extend_count, self._maxlen)

    def count(self, elem):
        """
        Return the number of elements equal to elem present in the queue

        >>> pdeque([1, 2, 1]).count(1)
        2
        """
        return self._left_list.count(elem) + self._right_list.count(elem)

    def remove(self, elem):
        """
        Return new deque with first element from left equal to elem removed. If no such element is found
        a ValueError is raised. The maxlen of the deque is retained.

        >>> pdeque([2, 1, 2]).remove(2)
        pdeque([1, 2])
        """
        try:
            return PDeque(self._left_list.remove(elem), self._right_list, self._length - 1, self._maxlen)
        except ValueError:
            # Value not found in left list, try the right list
            try:
                # This is severely inefficient with a double reverse, should perhaps implement a remove_last()?
                return PDeque(self._left_list,
                              self._right_list.reverse().remove(elem).reverse(), self._length - 1, self._maxlen)
            except ValueError as e:
                raise ValueError('{0} not found in PDeque'.format(elem)) from e

    def reverse(self):
        """
        Return reversed deque. The maxlen of the deque is retained.

        >>> pdeque([1, 2, 3]).reverse()
        pdeque([3, 2, 1])

        Also supports the standard python reverse function.

        >>> reversed(pdeque([1, 2, 3]))
        pdeque([3, 2, 1])
        """
        # Swapping the two backing lists is enough since they are stored in opposite order.
        return PDeque(self._right_list, self._left_list, self._length, self._maxlen)
    __reversed__ = reverse

    def rotate(self, steps):
        """
        Return deque with elements rotated steps steps. Positive steps rotate to the right,
        negative steps to the left. Rotating more steps than there are elements wraps around.

        >>> x = pdeque([1, 2, 3])
        >>> x.rotate(1)
        pdeque([3, 1, 2])
        >>> x.rotate(-2)
        pdeque([3, 1, 2])
        >>> x.rotate(4)
        pdeque([3, 1, 2])
        """
        length = self._length
        if not length:
            # Nothing to rotate (also avoids a modulo by zero below).
            return self

        if steps >= 0:
            # Only the remainder matters, a full turn gives back the same order.
            steps %= length
            return self.pop(steps).extendleft(islice(self.reverse(), steps))

        # The direction is kept (rather than folded into a right rotation) so that
        # a small left rotation only touches a small number of elements.
        steps = (-steps) % length
        return self.popleft(steps).extend(islice(self, steps))

    def __reduce__(self):
        # Pickling support
        return pdeque, (list(self), self._maxlen)

    def __getitem__(self, index):
        """
        Return the element at an integer index or a new deque for a slice.

        Contiguous slices (step 1) are produced by popping from both ends of this deque.
        Slices with any other step are rebuilt from a tuple copy of the elements.
        Integer indexing walks from the left end, popping index elements on the way.

        Raises TypeError for an index that is neither a slice nor an integer and
        IndexError for an integer index that is out of range.
        """
        if isinstance(index, slice):
            if index.step is not None and index.step != 1:
                # Too difficult, no structural sharing possible
                return pdeque(tuple(self)[index], maxlen=self._maxlen)

            # slice.indices() clamps the bounds to [0, length] and resolves negative
            # values the same way the built in sequences do, also for an empty deque.
            start, stop, _ = index.indices(self._length)
            result = self
            if start > 0:
                result = result.popleft(start)
            if stop < self._length:
                # pop() stops at the empty deque so stop < start gives an empty result.
                result = result.pop(self._length - stop)

            return result

        if not isinstance(index, Integral):
            raise TypeError("'%s' object cannot be interpreted as an index" % type(index).__name__)

        if index >= 0:
            # An index past the end leaves an empty deque whose left raises IndexError.
            return self.popleft(index).left

        shifted = len(self) + index
        if shifted < 0:
            raise IndexError(
                "pdeque index {0} out of range {1}".format(index, len(self)),
            )
        return self.popleft(shifted).left

    # Reuse the generic implementation from Sequence, it only relies on len() and indexing.
    index = Sequence.index
# Register PDeque as a virtual subclass of the Sequence and Hashable ABCs so that
# isinstance()/issubclass() checks against them succeed without inheriting from them.
Sequence.register(PDeque)
Hashable.register(PDeque)
def pdeque(iterable=(), maxlen=None):
    """
    Return deque containing the elements of iterable. If maxlen is specified and
    len(iterable) > maxlen then the len(iterable) - maxlen leftmost elements are discarded.

    >>> pdeque([1, 2, 3])
    pdeque([1, 2, 3])
    >>> pdeque([1, 2, 3, 4], maxlen=2)
    pdeque([3, 4], maxlen=2)
    >>> pdeque([1, 2, 3], maxlen=0)
    pdeque([], maxlen=0)
    """
    t = tuple(iterable)
    if maxlen is not None:
        # t[-0:] is the whole tuple, not the empty one, so a bound of zero
        # has to be handled separately from the general "keep the tail" case.
        t = () if maxlen == 0 else t[-maxlen:]
    length = len(t)
    # Split the elements evenly over the two backing lists.
    pivot = length // 2
    left = plist(t[:pivot])
    # The right list is stored with the rightmost element first.
    right = plist(t[pivot:], reverse=True)
    # PDeque validates maxlen (type and sign).
    return PDeque(left, right, length, maxlen)
def dq(*elements):
    """
    Build an unbounded deque out of the given positional arguments, leftmost first.

    >>> dq(1, 2, 3)
    pdeque([1, 2, 3])
    """
    # The arguments arrive as a tuple which is handed over as the iterable to pdeque.
    return pdeque(iterable=elements)