import asyncio
import datetime
import functools
import logging
import socket
import time
import re
import os
import sys
# Lengths of common time spans, expressed in seconds as floats.
MINUTE = 60.0
HOUR = 60 * MINUTE
DAY = 24 * HOUR
WEEK = 7 * DAY

# Module-level logger shared by the helpers in this file.
logger = logging.getLogger(__name__)
class ServiceBase:
    """Base class for a service whose work runs as one asyncio task.

    Subclasses implement the :meth:`_run` coroutine.  The public methods
    delegate to a state object (``StoppedState`` -> ``RunningState`` ->
    ``StoppingState`` -> ``StoppedState``) stored in ``self._state``.
    """

    def __init__(self, loop):
        self._loop = loop
        self._main_task = None
        self._should_stop = False
        # Looked up through ``self`` so a subclass may override the state.
        self._state = self.StoppedState(self)

    def start(self):
        """Ask the current state to start the service."""
        return self._state.start()

    def should_stop(self):
        """Ask the current state to stop the service."""
        return self._state.should_stop()

    async def wait(self):
        """Wait for the main task (if one was ever created) to finish."""
        return await self._state.wait()

    def is_running(self):
        """Return True only while the service is in ``RunningState``."""
        return self._state.is_running()

    async def _run(self):
        """Service body; must be provided by subclasses."""
        raise NotImplementedError

    class State:
        """Default behaviour shared by all states: every request is a no-op."""

        def __init__(self, obj):
            """:type obj: ServiceBase"""
            self._obj = obj

        def start(self):
            return None

        def should_stop(self):
            return None

        async def wait(self):
            main_task = self._obj._main_task
            if not main_task:
                return
            # Propagates whatever the task raised, including cancellation.
            await main_task

        def is_running(self):
            return False

    class StoppedState(State):
        """The service is idle; ``start()`` launches the main task."""

        def _on_stop(self, _finished_task):
            # Done-callback of the main task: return to the idle state and
            # clear the stop request.
            service = self._obj
            service._state = ServiceBase.StoppedState(service)
            service._should_stop = False

        def start(self):
            service = self._obj
            service._main_task = task = service._loop.create_task(
                service._run()
            )
            task.add_done_callback(self._on_stop)
            service._state = ServiceBase.RunningState(service)

    class RunningState(State):
        """The main task is active; ``should_stop()`` cancels it."""

        def should_stop(self):
            service = self._obj
            service._should_stop = True
            service._main_task.cancel()
            service._state = ServiceBase.StoppingState(service)

        def is_running(self):
            return True

    class StoppingState(State):
        """Cancellation was requested and the task has not finished yet."""

        def start(self):
            raise ProgrammingError(
                "Cannot start stopping service. Please wait while it stop."
            )
class ProgrammingError(Exception):
    """Raised on API misuse, e.g. starting a service that is still stopping."""
class RateLimit:
    """Decorator to limit function calls to one per *period* seconds.

    If less than *period* seconds have passed since the last call,
    then the request to call the function is replaced with an *on_drop*
    call with the same arguments.
    If *on_drop* is None [default] then the call is just dropped and the
    wrapper returns None.

    Works for plain functions and for coroutine functions.  When the
    decorated function is a coroutine function and *on_drop* returns a
    coroutine, that coroutine is awaited and its result is returned.

    The limit is tracked per ``RateLimit`` instance.

    :param period: minimal number of seconds between two real calls.
    :param timer: zero-argument callable returning the current time in
        seconds (``time.monotonic`` by default).
    :param on_drop: optional callable invoked instead of the function
        when a call is dropped.
    """

    def __init__(self, period, timer=time.monotonic, *, on_drop=None):
        self._next_call_time = None  # None until the first real call
        self._period = period
        self._timer = timer
        self._on_drop = on_drop

    @property
    def should_be_called(self):
        """True if no call was made yet or the period has elapsed."""
        return (
            self._next_call_time is None
            or self._next_call_time <= self._timer()
        )

    def __call__(self, func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            if self.should_be_called:
                self._next_call_time = self._timer() + self._period
                return func(*args, **kwargs)
            if self._on_drop is not None:
                return self._on_drop(*args, **kwargs)
            return None

        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            if self.should_be_called:
                self._next_call_time = self._timer() + self._period
                return await func(*args, **kwargs)
            if self._on_drop is not None:
                dropped = self._on_drop(*args, **kwargs)
                # An ``async def`` on_drop only produces a coroutine object;
                # without awaiting it the handler would never actually run.
                if asyncio.iscoroutine(dropped):
                    dropped = await dropped
                return dropped
            return None

        return async_wrapper if asyncio.iscoroutinefunction(func) else wrapper


rate_limit = RateLimit
class CoalesceCalls:
    """State holder for the :meth:`coalesce_calls` decorator.

    ``call_time`` is the event loop time of the last actual call
    (``-inf`` before the first one); ``delayed_call`` is the loop handle
    of a call that is scheduled but has not run yet, or None.  Both are
    stored on the instance, so all coroutines decorated through the same
    instance share them.
    """

    def __init__(self):
        self.delayed_call = None
        self.call_time = float("-inf")

    def coalesce_calls(self, period, *, done_callback=None):
        """
        Decorator to coalesce coroutine calls to one per *period* seconds.

        Call requests that arrive within one time period are merged: if
        `c` is the time of the last actual call, then N > 0 requests in
        the [c, c+period] interval result in a single call at c+period.
        The first of those requests is scheduled, the remaining N-1 are
        dropped.

        It is unspecified which exact call is made if arguments differ.

        The decorated wrapper only schedules the work and returns None;
        the coroutine itself runs later as a task on the event loop.  The
        loop is taken from a ``loop`` keyword argument of the call when
        one is given, otherwise from ``asyncio.get_event_loop()``.

        When the task is created, *done_callback* is attached to it.  If
        *done_callback* is None, a default callback reports the task's
        exception (if any, cancellation excluded) to the loop's exception
        handler.

        Given `c` is the time of the last [actual] call
        (`loop.create_task()`), `T` is the coalesce period and a call
        request arrives at time `t`:

        | call pending?  | t > c+T                  | c <= t <= c+T | t < c |
        |----------------+--------------------------+---------------+-------|
        | no p. call     | call soon                | call at c+T   | warn  |
        | p. call at c+T | cancel it/warn,call soon | drop call     | warn  |
        """

        def decorator(coro):
            @functools.wraps(coro)
            async def wrapper(*args, **kwargs):
                event_loop = kwargs.get("loop")
                if event_loop is None:
                    event_loop = asyncio.get_event_loop()

                # Textual form of the request, used in log messages only.
                args_repr = (
                    "*%r, **%r" % (args, kwargs) if (args or kwargs) else ""
                )
                call_repr = "%s(%s)" % (coro.__name__, args_repr)

                def log_exception(task):
                    """Report the task's error, if any, to the loop's
                    exception handler.  Cancellation is not reported.
                    """
                    if task.cancelled():
                        return
                    error = task.exception()
                    if error is None:
                        return
                    event_loop.call_exception_handler(
                        {
                            "message": "Unhandled exception during "
                            + call_repr,
                            "exception": error,
                            "task": task,
                        }
                    )

                def call_delayed(target, call_args, call_kwargs):
                    """Perform the actual call: record its time, clear the
                    pending handle and start the coroutine as a task.
                    """
                    logger.info("Schedule call %s", call_repr)
                    self.call_time = event_loop.time()
                    self.delayed_call = None
                    task = event_loop.create_task(
                        target(*call_args, **call_kwargs)
                    )
                    if done_callback is None:
                        task.add_done_callback(log_exception)
                    else:
                        task.add_done_callback(done_callback)

                now = event_loop.time()
                deadline = self.call_time + period

                if now > deadline:
                    # The period is over: perform the call as soon as
                    # possible, replacing a stale pending call if any.
                    pending = self.delayed_call
                    if pending is not None:
                        # take the textual form before cancelling
                        stale_repr = str(pending)
                        pending.cancel()
                        self.delayed_call = None
                        logger.warning(
                            "There was a scheduled call (%s)"
                            " but more than period (%r) seconds passed"
                            " since the last call (%r, now=%r)",
                            stale_repr,
                            period,
                            self.call_time,
                            now,
                        )
                    logger.info(
                        "Satisfy the call request soon: %s. No calls in"
                        " more than %r seconds since the start",
                        call_repr,
                        period,
                    )
                    self.delayed_call = event_loop.call_soon(
                        call_delayed, coro, args, kwargs
                    )
                    return

                if not (self.call_time <= now <= deadline):
                    # now < call_time: the recorded last call is ahead of
                    # the loop clock.
                    logger.warning(
                        "Drop call request for %s, reason: last call time"
                        " (%r, now=%r) is in the future",
                        call_repr,
                        self.call_time,
                        now,
                    )
                    return

                # Still inside the current period.
                delay = deadline - now
                if self.delayed_call is not None:
                    # A call is already pending for this period.
                    logger.info(
                        "Drop call request for %s"
                        ", enforcing one call per %r seconds limit"
                        ". Next call is in ~%.2f seconds",
                        call_repr,
                        period,
                        delay,
                    )
                    return

                logger.info(
                    "Delay call request: %s for ~%.2f seconds"
                    ". Enforcing one call per %r seconds limit",
                    call_repr,
                    delay,
                    period,
                )
                self.delayed_call = event_loop.call_at(
                    deadline,
                    call_delayed,
                    coro,
                    args,
                    kwargs,
                )

            return wrapper

        return decorator


webserver_gracefull_restart = CoalesceCalls()
def get_hostname():
    """Return a human-readable name for this server.

    The fully qualified domain name is preferred; when it looks like a
    ``localhost`` name, the plain host name is returned instead.
    The name is sent to CLN and lets users tell their servers apart.
    """
    fqdn = socket.getfqdn()
    if fqdn is not None and not fqdn.lower().startswith("localhost"):
        return fqdn
    return socket.gethostname()
# Everything from here on is copied from the setuptools package
# Copied from setuptools/_distutils/version.py
class Version:
    """Abstract base for version-number classes.

    Supplies the constructor, ``__repr__`` and the rich comparison
    operators.  Subclasses are expected to provide ``parse(vstring)``,
    ``__str__`` and ``_cmp(other)``; ``_cmp`` returns a negative, zero or
    positive number, or ``NotImplemented`` for unsupported operands.
    """

    def __init__(self, vstring=None):
        # A falsy vstring (None, "") leaves the instance unparsed.
        if vstring:
            self.parse(vstring)

    def __repr__(self):
        cls_name = self.__class__.__name__
        return "{} ('{}')".format(cls_name, str(self))

    # Each operator forwards to _cmp() and passes NotImplemented through
    # untouched so Python can try the reflected operation.

    def __eq__(self, other):
        outcome = self._cmp(other)
        return outcome if outcome is NotImplemented else outcome == 0

    def __lt__(self, other):
        outcome = self._cmp(other)
        return outcome if outcome is NotImplemented else outcome < 0

    def __le__(self, other):
        outcome = self._cmp(other)
        return outcome if outcome is NotImplemented else outcome <= 0

    def __gt__(self, other):
        outcome = self._cmp(other)
        return outcome if outcome is NotImplemented else outcome > 0

    def __ge__(self, other):
        outcome = self._cmp(other)
        return outcome if outcome is NotImplemented else outcome >= 0
# Copied from setuptools/_distutils/version.py
class LooseVersion(Version):
    """Version numbering for anarchists and software realists.

    A version number is a series of numbers separated by either periods
    or strings of letters.  On comparison the numeric components are
    compared numerically and the alphabetic components lexically.  All of
    the following are valid version numbers (in no particular order):

        1.5.1
        1.5.2b2
        161
        3.10a
        8.02
        3.4j
        1996.07.12
        3.2.pl0
        3.1.1.6
        2g6
        11g
        0.960923
        2.2beta29
        1.13++
        5.5.kw
        2.0b1pl0

    There is no such thing as an invalid version number under this
    scheme; the comparison rules are simple and predictable, but may not
    always give the results you want (for some definition of "want").
    """

    component_re = re.compile(r"(\d+ | [a-z]+ | \.)", re.VERBOSE)

    def parse(self, vstring):
        """Split *vstring* into components and store them in ``version``."""
        # The original string is kept verbatim for __str__; no attempt is
        # made to rebuild it from the parsed components.
        self.vstring = vstring
        parsed = []
        for piece in self.component_re.split(vstring):
            # Skip empty split fragments and the "." separators.
            if not piece or piece == ".":
                continue
            try:
                parsed.append(int(piece))
            except ValueError:
                # Non-numeric components stay as strings.
                parsed.append(piece)
        self.version = parsed

    def __str__(self):
        return self.vstring

    def __repr__(self):
        return "LooseVersion ('%s')" % str(self)

    def _cmp(self, other):
        """Compare with a LooseVersion or a version string.

        Returns -1, 0 or 1, or NotImplemented for any other operand type.
        """
        if isinstance(other, str):
            other = LooseVersion(other)
        if not isinstance(other, LooseVersion):
            return NotImplemented

        # Plain list comparison.  NOTE: when an int component meets a str
        # component at the same position, the ordering comparison raises
        # TypeError.
        mine, theirs = self.version, other.version
        if mine == theirs:
            return 0
        if mine < theirs:
            return -1
        if mine > theirs:
            return 1
# Copied from setuptools/_distutils/spawn.py
def find_executable(executable, path=None):
    """Look for *executable* in the directories listed in *path*.

    *path* is a string of directories separated by ``os.pathsep``; when
    omitted, ``os.environ['PATH']`` is used, falling back to
    ``os.confstr("CS_PATH")`` and then ``os.defpath`` if PATH is unset.
    On win32 ".exe" is appended to names that do not already end with it.

    Returns the complete filename, or None if nothing was found.
    """
    extension = os.path.splitext(executable)[1]
    if sys.platform == "win32" and extension != ".exe":
        executable += ".exe"

    # The name as given may already point at an existing file.
    if os.path.isfile(executable):
        return executable

    if path is None:
        path = os.environ.get("PATH")
        if path is None:
            try:
                path = os.confstr("CS_PATH")
            except (AttributeError, ValueError):
                # os.confstr() or CS_PATH is not available
                path = os.defpath

    # bpo-35755: os.defpath is not used when PATH is set to an empty
    # string.  PATH='' matches nothing, whereas PATH=':' looks in the
    # current directory.
    if not path:
        return None

    candidates = (
        os.path.join(directory, executable)
        for directory in path.split(os.pathsep)
    )
    # The first existing file wins: we have a shot at spawn working.
    return next((c for c in candidates if os.path.isfile(c)), None)