"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
from collections import namedtuple
import asyncio
import ctypes
import errno
import logging
import os
import struct
import platform
from defence360agent.subsys import sysctl
# Parsed inotify event, as built by Watcher._read and handed to the callback:
#   path   - watched path that the event's watch descriptor maps to
#   flags  - event mask taken from the event prefix
#   cookie - cookie field taken from the event prefix
#   name   - name field of the event with trailing NUL bytes stripped
#   wd     - watch descriptor the event was reported for
Event = namedtuple("Event", ("path", "flags", "cookie", "name", "wd"))
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class Inotify:
    """
    Minimal ctypes binding for the inotify C API (see `man inotify`).

    All functionality is exposed through static methods; the class is only
    a namespace for the flag constants, the loaded libc handle and the
    helpers that decode raw event bytes.
    """

    # --- event bits ------------------------------------------------------
    ACCESS = 0x1  #: File was accessed
    MODIFY = 0x2  #: File was modified
    ATTRIB = 0x4  #: Metadata changed
    CLOSE_WRITE = 0x8  #: Writable file was closed
    CLOSE_NOWRITE = 0x10  #: Unwritable file closed
    OPEN = 0x20  #: File was opened
    MOVED_FROM = 0x40  #: File was moved from X
    MOVED_TO = 0x80  #: File was moved to Y
    CREATE = 0x100  #: Subfile was created
    DELETE = 0x200  #: Subfile was deleted
    DELETE_SELF = 0x400  #: Self was deleted
    MOVE_SELF = 0x800  #: Self was moved

    UNMOUNT = 0x2000  #: Backing fs was unmounted
    Q_OVERFLOW = 0x4000  #: Event queue overflowed
    IGNORED = 0x8000  #: File was ignored

    # --- watch modifiers -------------------------------------------------
    ONLYDIR = 0x1000000  #: only watch the path if it is a directory
    DONT_FOLLOW = 0x2000000  #: don't follow a sym link
    EXCL_UNLINK = 0x4000000  #: exclude events on unlinked objects
    MASK_ADD = 0x20000000  #: add to the mask of an already existing watch
    ISDIR = 0x40000000  #: event occurred against dir
    ONESHOT = 0x80000000  #: only send event once

    # Name of the C library to load: "libc.dylib" on Darwin, "libc.so.6"
    # everywhere else.
    _n = "libc.dylib" if platform.system() == "Darwin" else "libc.so.6"
    # use_errno=True lets _call() fetch the C errno via ctypes.get_errno().
    _libc = ctypes.CDLL(_n, use_errno=True)
    # Fixed-size head of every event: one signed int followed by three
    # unsigned ints, decoded as (wd, flag, cookie, length).
    event_prefix = struct.Struct("iIII")

    @staticmethod
    def _call(method, *args):
        """
        Invoke a libc function by name and translate failures.

        :param method: name of the libc function to call
        :param args: positional arguments forwarded to that function
        :return: the function's return value when it is not -1
        :raises OSError: with the C errno and its message when the
            function returns -1
        """
        func = getattr(Inotify._libc, method)
        result = func(*args)
        if result != -1:
            return result
        code = ctypes.get_errno()
        raise OSError(code, os.strerror(code))

    @staticmethod
    def init():
        """
        Create a new inotify instance (see `man inotify_init`).

        :return: file descriptor of the new inotify instance
        :raises OSError: if the underlying call fails
        """
        return Inotify._call("inotify_init")

    @staticmethod
    def add_watch(fd, path, mask):
        """
        Register a watch on an inotify instance (see
        `man inotify_add_watch`).

        Calling it again with the same :fd: and :path: but another mask
        changes the flags of the existing watch instead of adding one.

        :param fd: file descriptor returned by `init()`
        :param path: path to the file or directory to watch
        :param mask: bitmask of events to monitor
        :return: watch descriptor of the watch
        :raises OSError: if the underlying call fails
        """
        return Inotify._call("inotify_add_watch", fd, path, mask)

    @staticmethod
    def rm_watch(fd, wd):
        """
        Remove a watch from an inotify instance.

        :param fd: file descriptor of the inotify instance
        :param wd: watch descriptor returned by `add_watch()`
        :return: zero
        :raises OSError: if the underlying call fails
        """
        return Inotify._call("inotify_rm_watch", fd, wd)

    @staticmethod
    def unpack_prefix(data):
        """
        Decode the fixed-size head of an event struct (see `man inotify`).

        :param data: bytestring of exactly `event_prefix.size` bytes
        :return: tuple of (wd, flag, cookie, length)
        """
        prefix = Inotify.event_prefix
        return prefix.unpack(data)

    @staticmethod
    def unpack_name(data):
        """
        Decode the name field of an event struct (see `man inotify`).

        :param data: bytestring holding the name field
        :return: the name as bytes with trailing NUL bytes removed
        """
        layout = "{}s".format(len(data))
        (raw_name,) = struct.unpack(layout, data)
        return raw_name.rstrip(b"\x00")
class Watcher:
    """
    Asynchronous watcher for inotify events.

    The inotify file descriptor is registered as a reader on the given
    event loop. Every decoded event is wrapped into an `Event` and passed
    to the coroutine callback; without a callback, events are put on an
    internal queue and can be fetched with `get_event()`.

    Instance state:
      paths        dict mapping watch descriptor -> watched path (bytes)
      descriptors  dict mapping watched path (bytes) -> watch descriptor
      buf          bytes read from the inotify fd that are not decoded yet
    """

    # Maximum number of bytes requested from the inotify fd per read.
    _CHUNK_SIZE = 1024
    # How many times watch() raises the watch limit before giving up.
    _MAX_WATCH_RETRIES = 3
    # The limit is raised by int(current * coeff) on top of the current
    # value (see _raise_user_watches).
    _WATCHERS_RAISE_COEFF = 1.5
    _MAX_USER_WATCHES = "fs.inotify.max_user_watches"

    def __init__(self, loop, coro_callback=None):
        """
        :param loop: event loop used for `add_reader` and `create_task`
        :param coro_callback: coroutine function called with each `Event`;
            defaults to putting the event on the internal queue
        """
        self._loop = loop
        self._fd = Inotify.init()
        self._queue = asyncio.Queue()
        self._callback = coro_callback or self._queue.put
        # Set up the bookkeeping before the reader is registered, so that
        # _read() can never observe a half-initialised watcher.
        self._reset_state()
        self._loop.add_reader(self._fd, self._read)

    def _reset_state(self):
        """Forget all watches and drop any undecoded bytes."""
        self.paths = {}
        self.descriptors = {}
        self.buf = b""

    def _read(self):
        """
        Reader callback: pull bytes from the inotify fd and dispatch every
        complete event found in the buffer.
        """
        self.buf += os.read(self._fd, self._CHUNK_SIZE)
        prefix_size = Inotify.event_prefix.size
        while len(self.buf) >= prefix_size:
            wd, flags, cookie, length = Inotify.unpack_prefix(
                self.buf[:prefix_size]
            )
            struct_end = prefix_size + length
            if len(self.buf) < struct_end:
                # The name is not fully buffered. inotify normally hands
                # out whole events, so this is purely defensive: keep the
                # bytes and continue after the next read instead of
                # decoding a truncated name.
                break
            name = Inotify.unpack_name(self.buf[prefix_size:struct_end])
            self.buf = self.buf[struct_end:]
            if flags & Inotify.Q_OVERFLOW:
                # Overflow events carry wd == -1, which is never a key of
                # self.paths, so this must be checked before the lookup.
                logger.error("Inotify queue overflow")
                continue
            if wd not in self.paths:
                # Event for a watch we no longer track (e.g. the IGNORED
                # that follows unwatch()).
                continue
            path = self.paths[wd]
            if flags & Inotify.IGNORED:
                logger.warning(
                    "Got IGNORED event for %s, cleaning watch", path
                )
                # Drop exactly the reported descriptor: the same path may
                # already be watched again under a newer descriptor.
                self._forget_descriptor(wd)
                continue
            ev = Event(path, flags, cookie, name, wd)
            self._loop.create_task(self._callback(ev))

    def _raise_user_watches(self):
        """
        Raise the `fs.inotify.max_user_watches` sysctl.

        The new value is the current one plus
        int(current * _WATCHERS_RAISE_COEFF).
        NOTE(review): assumes `sysctl.read` returns a number - confirm.
        """
        current_max_watches = sysctl.read(self._MAX_USER_WATCHES)
        new_max_watchers = current_max_watches + int(
            current_max_watches * self._WATCHERS_RAISE_COEFF
        )
        logger.info(
            "Raising %s to %s", self._MAX_USER_WATCHES, new_max_watchers
        )
        sysctl.write(self._MAX_USER_WATCHES, new_max_watchers)

    def close(self):
        """
        Close watcher. Remove the reader, close the inotify fd and reset
        state. Calling it again on a closed watcher does nothing.
        :return:
        """
        if self._fd is None:
            # Already closed: there is no reader or descriptor left.
            return
        self._loop.remove_reader(self._fd)
        try:
            os.close(self._fd)
        finally:
            self._reset_state()
            self._fd = None

    def watch(self, path, mask):
        """
        Add file to watch.

        When the kernel reports ENOSPC (watch limit reached) the limit is
        raised and the call retried, up to `_MAX_WATCH_RETRIES` times.

        :param path: file or directory to watch, as bytes
        :param mask: events mask for this watch
        :raises OSError: if the watch cannot be added
        """
        assert isinstance(path, bytes), "Path must be bytes"
        logger.info("Watching %r", path)
        retries = 0
        while True:
            try:
                wd = Inotify.add_watch(self._fd, path, mask)
            except OSError as e:
                if (
                    retries < self._MAX_WATCH_RETRIES
                    and e.errno == errno.ENOSPC
                ):
                    self._raise_user_watches()
                    retries += 1
                    logger.warning(
                        "Inotify: not enough watches (%r), retrying...", path
                    )
                    continue
                logger.error("Inotify failed while watching %r", path)
                raise
            self.paths[wd] = path
            self.descriptors[path] = wd
            break

    def _cleanup_watch(self, path):
        """Drop the bookkeeping of the watch currently held for *path*."""
        descriptor = self.descriptors.pop(path, None)
        if descriptor is not None:
            self.paths.pop(descriptor, None)

    def _forget_descriptor(self, wd):
        """
        Drop the bookkeeping of watch descriptor *wd* only.

        The path -> descriptor entry is removed only while it still points
        at *wd*, so a newer watch on the same path is left intact.
        """
        path = self.paths.pop(wd, None)
        if path is not None and self.descriptors.get(path) == wd:
            del self.descriptors[path]

    def unwatch(self, path):
        """
        Remove file or directory from watch. Unknown paths are ignored.

        :param path: file or directory to remove watch from
        :raises OSError: if removing the watch fails; the bookkeeping for
            *path* is dropped even then
        """
        if path not in self.descriptors:
            return
        logger.info("Stop watching %r", path)
        try:
            Inotify.rm_watch(self._fd, self.descriptors[path])
        finally:
            self._cleanup_watch(path)

    async def get_event(self):
        """
        Get watch event from the internal queue, waiting until one is
        available.

        :return: `Event` named tuple
        """
        event = await self._queue.get()
        logger.debug("Inotify event: %s", event)
        return event