"""
Low-level operating system functions from :mod:`os`.

Cooperative I/O
===============

This module provides cooperative versions of :func:`os.read` and
:func:`os.write`. These functions are *not* monkey-patched; you
must explicitly call them or monkey patch them yourself.

POSIX functions
---------------

On POSIX, non-blocking IO is available.

- :func:`nb_read`
- :func:`nb_write`
- :func:`make_nonblocking`

All Platforms
-------------

On non-POSIX platforms (e.g., Windows), non-blocking IO is not
available. On those platforms (and on POSIX), cooperative IO can
be done with the threadpool.

- :func:`tp_read`
- :func:`tp_write`

Child Processes
===============

The functions :func:`fork` and (on POSIX) :func:`forkpty` and :func:`waitpid` can be used
to manage child processes.

.. warning::

   Forking a process that uses greenlets does not eliminate all non-running
   greenlets. Any that were scheduled in the hub of the forking thread in the parent
   remain scheduled in the child; compare this to how normal threads operate. (This behaviour
   may change is a subsequent major release.)
"""

from __future__ import absolute_import

import os
from stat import S_ISREG

from gevent.hub import _get_hub_noargs as get_hub
from gevent.hub import reinit
from gevent.event import Event
from gevent._config import config
from gevent._util import copy_globals
import errno

EAGAIN = getattr(errno, 'EAGAIN', 11)

try:
    import fcntl
except ImportError:
    fcntl = None

__implements__ = ['fork',]
__extensions__ = ['tp_read', 'tp_write']

_read = os.read
_write = os.write
_close = os.close
_fstat = os.fstat


ignored_errors = [EAGAIN, errno.EINTR]


# An escape hatch (set to False) if you have problems with
# close on regular files because you are trying to poll on it. If you
# need this please let the maintainers know!
_NO_DEFER_REG_FILE = True

if fcntl:
    def _check_fd_valid(fd):
        # see libev/check_valid_fd.c. This is what libev does;
        # libuv is much more sophisticated.
        return fcntl.fcntl(fd, fcntl.F_GETFD)
else:
    def _check_fd_valid(fd): # pylint: disable=unused-argument
        # Windows. Nothing we can reliably use here across
        # all event loops. We need to write some code and make it part
        # of the event loop interface.
        pass

if fcntl:
    _closing_fd_to_event = {}
    __implements__ += ['close',]

    def close(fd):
        """
        Close a file descriptor.

        This function cooperates with gevent to avoid crashing
        the process if you (accidentally) call it while you're
        still performing IO on the file descriptor; for example, if you have it
        registered with a ``Selector`` implementation, which documents
        that you *must* unregister FDs before closing them.

        If the *fd* refers to a regular file, this cooperation is *not*
        used. This is because trying to use gevent to poll on regular
        files doesn't work and shouldn't be done. We assume that everyone
        is following the rules.

        .. caution::
           This function is not intended for use on Windows.

        .. versionadded:: 25.8.1
        """
        # TODO: Should we limit this method (after the _fstat) to
        # just...sockets, fifo, pipe,...and actually I think the list goes
        # on. But at any rate, even though it usually doesn't make any
        # sense, you CAN create IO watchers for regular files, so in
        # theory you could run into the problems we're fixing (issue 2100)
        # even with a regular file. I consider that very unlikely though.
        # Temp escape hatch if needed.

        # First, check to see if it's invalid already, because
        # the stdlib throws OSError in this case; fstat does
        # the same.
        #
        # Our C code uses ``fcntl(F_GETFD)``, use that.
        # But we also want to know if the file is regular, so
        # we stat it as well.
        _check_fd_valid(fd)
        stats = _fstat(fd)
        if S_ISREG(stats.st_mode) and _NO_DEFER_REG_FILE:
            return _close(fd)

        hub = get_hub()
        loop = hub.loop

        if fd in _closing_fd_to_event:
            # If this is the second time we're closing the same
            # FD number, and the fd is still in our map, it means our
            # check watcher hasn't run yet, which means if we return
            # immediately and anyone tries to do something with that
            # fd (read, stat, whatever), it might STILL BE VALID. That's
            # bad, and breaks some tests. So wait for it to really be
            # closed.
            event = _closing_fd_to_event.pop(fd, None)
            if event is not None:
                # Wake the loop up, let it know we've got
                # stuff to do. This is necessary on libev,
                # because if the *fd* didn't actually have any active
                # watchers, then when we call ``closing_fd``,
                # the call it makes to ``ev_feed_fd_event`` silently
                # does nothing. This should actually be needed only
                # rarely (e.g., when there are no other greenlets and no
                # scheduled timers, etc, i.e. hub is idle).
                # This showed up in ``test_asyncore.py:
                # FileWrapperTest.test_close_twice``
                # Without this, we would just hang.
                with loop.idle() as watcher:
                    hub.wait(watcher)

                # Now our event SHOULD be set, because the check watcher
                # SHOULD have run. But don't do an unbounded wait, just in
                # case. Getting a weird error with a FD (most likely an
                # OSError because it will be closed) is better than
                # hanging the process forever.
                event.wait(0.001) # arbitrary amount of time

            # Closing twice should raise OSError
            raise OSError(errno.EBADF)

        # Ok, first time we're closing this FD. (If it was the
        # second, EVEN IF the fd had already come out of the
        # map so we failed that check, the _fstat call should
        # have raised a OSError.

        loop.closing_fd(fd)
        check = loop.check()
        event = Event()
        _closing_fd_to_event[fd] = event
        # Unlike closing sockets, we don't check the return value,
        # and always defer it. This is because this case doesn't
        # necessarily have access to any active watchers (yet)
        def cb(fd):
            # If they closed the FD through some other mechanism,
            # such as a socket.close, it will be invalid now.
            try:
                _close(fd)
            except OSError:
                pass
            finally:
                event.set()
                _closing_fd_to_event.pop(fd, None)
                check.stop()
                check.close()
        check.start(cb, fd)


    __extensions__ += ['make_nonblocking', 'nb_read', 'nb_write',]

    def make_nonblocking(fd):
        """Put the file descriptor *fd* into non-blocking mode if
        possible.

        :return: A boolean value that evaluates to True if successful.
        """
        flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
        if not bool(flags & os.O_NONBLOCK):
            fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
            return True

    def nb_read(fd, n):
        """
        Read up to *n* bytes from file descriptor *fd*. Return a
        byte string containing the bytes read, which may be shorter than
        *n*. If end-of-file is reached, an empty string is returned.

        The descriptor must be in non-blocking mode.
        """
        hub = None
        event = None
        try:
            while 1:
                try:
                    result = _read(fd, n)
                    return result
                except OSError as e:
                    if e.errno not in ignored_errors:
                        raise
                if hub is None:
                    hub = get_hub()
                    event = hub.loop.io(fd, 1)
                hub.wait(event)
        finally:
            if event is not None:
                event.close()
                event = None
                hub = None


    def nb_write(fd, buf):
        """
        Write some number of bytes from buffer *buf* to file
        descriptor *fd*. Return the number of bytes written, which may
        be less than the length of *buf*.

        The file descriptor must be in non-blocking mode.
        """
        hub = None
        event = None
        try:
            while 1:
                try:
                    result = _write(fd, buf)
                    return result
                except OSError as e:
                    if e.errno not in ignored_errors:
                        raise
                if hub is None:
                    hub = get_hub()
                    event = hub.loop.io(fd, 2)
                hub.wait(event)
        finally:
            if event is not None:
                event.close()
                event = None
                hub = None


def tp_read(fd, n):
    """Read up to *n* bytes from file descriptor *fd*. Return a string
    containing the bytes read. If end-of-file is reached, an empty string
    is returned.

    Reading is done using the threadpool.
    """
    return get_hub().threadpool.apply(_read, (fd, n))


def tp_write(fd, buf):
    """Write bytes from buffer *buf* to file descriptor *fd*. Return the
    number of bytes written.

    Writing is done using the threadpool.
    """
    return get_hub().threadpool.apply(_write, (fd, buf))


if hasattr(os, 'fork'):
    # pylint:disable=function-redefined,redefined-outer-name

    _raw_fork = os.fork

    def fork_gevent():
        """
        Forks the process using :func:`os.fork` and prepares the
        child process to continue using gevent before returning.

        .. note::

            The PID returned by this function may not be waitable with
            either the original :func:`os.waitpid` or this module's
            :func:`waitpid` and it may not generate SIGCHLD signals if
            libev child watchers are or ever have been in use. For
            example, the :mod:`gevent.subprocess` module uses libev
            child watchers (which parts of gevent use libev child
            watchers is subject to change at any time). Most
            applications should use :func:`fork_and_watch`, which is
            monkey-patched as the default replacement for
            :func:`os.fork` and implements the ``fork`` function of
            this module by default, unless the environment variable
            ``GEVENT_NOWAITPID`` is defined before this module is
            imported.

        .. versionadded:: 1.1b2
        """
        import warnings
        # The simple `catch_warnings(action='ignore', category=DeprecationWarning)`
        # is only available in 3.11+.
        with warnings.catch_warnings():
            warnings.simplefilter('ignore', DeprecationWarning)
            result = _raw_fork()
        if not result:
            reinit()
        return result

    def fork():
        """
        A wrapper for :func:`fork_gevent` for non-POSIX platforms.
        """
        return fork_gevent()

    if hasattr(os, 'forkpty'):
        _raw_forkpty = os.forkpty

        def forkpty_gevent():
            """
            Forks the process using :func:`os.forkpty` and prepares the
            child process to continue using gevent before returning.

            Returns a tuple (pid, master_fd). The `master_fd` is *not* put into
            non-blocking mode.

            Availability: Some Unix systems.

            .. seealso:: This function has the same limitations as :func:`fork_gevent`.

            .. versionadded:: 1.1b5
            """
            pid, master_fd = _raw_forkpty()
            if not pid:
                reinit()
            return pid, master_fd

        forkpty = forkpty_gevent

        __implements__.append('forkpty')
        __extensions__.append("forkpty_gevent")

    if hasattr(os, 'WNOWAIT') or hasattr(os, 'WNOHANG'):
        # We can only do this on POSIX
        import time

        _waitpid = os.waitpid
        _WNOHANG = os.WNOHANG

        # replaced by the signal module.
        _on_child_hook = lambda: None

        # {pid -> watcher or tuple(pid, rstatus, timestamp)}
        _watched_children = {}

        def _on_child(watcher, callback):
            # XXX: Could handle tracing here by not stopping
            # until the pid is terminated
            watcher.stop()
            try:
                _watched_children[watcher.pid] = (watcher.pid, watcher.rstatus, time.time())
                if callback:
                    callback(watcher)
                # dispatch an "event"; used by gevent.signal.signal
                _on_child_hook()
                # now is as good a time as any to reap children
                _reap_children()
            finally:
                watcher.close()

        def _reap_children(timeout=60):
            # Remove all the dead children that haven't been waited on
            # for the *timeout* seconds.
            # Some platforms queue delivery of SIGCHLD for all children that die;
            # in that case, a well-behaved application should call waitpid() for each
            # signal.
            # Some platforms (linux) only guarantee one delivery if multiple children
            # die. On that platform, the well-behave application calls waitpid() in a loop
            # until it gets back -1, indicating no more dead children need to be waited for.
            # In either case, waitpid should be called the same number of times as dead children,
            # thus removing all the watchers when a SIGCHLD arrives. The (generous) timeout
            # is to work with applications that neglect to call waitpid and prevent "unlimited"
            # growth.
            # Note that we don't watch for the case of pid wraparound. That is, we fork a new
            # child with the same pid as an existing watcher, but the child is already dead,
            # just not waited on yet.
            now = time.time()
            oldest_allowed = now - timeout
            dead = [
                pid for pid, val
                in _watched_children.items()
                if isinstance(val, tuple) and val[2] < oldest_allowed
            ]
            for pid in dead:
                del _watched_children[pid]

        def waitpid(pid, options):
            """
            Wait for a child process to finish.

            If the child process was spawned using
            :func:`fork_and_watch`, then this function behaves
            cooperatively. If not, it *may* have race conditions; see
            :func:`fork_gevent` for more information.

            The arguments are as for the underlying
            :func:`os.waitpid`. Some combinations of *options* may not
            be supported cooperatively (as of 1.1 that includes
            WUNTRACED). Using a *pid* of 0 to request waiting on only processes
            from the current process group is not cooperative. A *pid* of -1
            to wait for any child is non-blocking, but may or may not
            require a trip around the event loop, depending on whether any children
            have already terminated but not been waited on.

            Availability: POSIX.

            .. versionadded:: 1.1b1
            .. versionchanged:: 1.2a1
               More cases are handled in a cooperative manner.
            """
            # pylint: disable=too-many-return-statements
            # XXX Does not handle tracing children

            # So long as libev's loop doesn't run, it's OK to add
            # child watchers. The SIGCHLD handler only feeds events
            # for the next iteration of the loop to handle. (And the
            # signal handler itself is only called from the next loop
            # iteration.)

            if pid <= 0:
                # magic functions for multiple children.
                if pid == -1:
                    # Any child. If we have one that we're watching
                    # and that finished, we will use that one,
                    # preferring the oldest. Otherwise, let the OS
                    # take care of it.
                    finished_at = None
                    for k, v in _watched_children.items():
                        if (
                                isinstance(v, tuple)
                                and (finished_at is None or v[2] < finished_at)
                        ):
                            pid = k
                            finished_at = v[2]

                if pid <= 0:
                    # We didn't have one that was ready. If there are
                    # no funky options set, and the pid was -1
                    # (meaning any process, not 0, which means process
                    # group--- libev doesn't know about process
                    # groups) then we can use a child watcher of pid 0; otherwise,
                    # pass through to the OS.
                    if pid == -1 and options == 0:
                        hub = get_hub()
                        with hub.loop.child(0, False) as watcher:
                            hub.wait(watcher)
                            return watcher.rpid, watcher.rstatus
                    # There were funky options/pid, so we must go to the OS.
                    return _waitpid(pid, options)

            if pid in _watched_children:
                # yes, we're watching it

                # Note that the remainder of this code must be careful to NOT
                # yield to the event loop except at well known times, or
                # we have a race condition between the _on_child callback and the
                # code here that could lead to a process to hang.
                if options & _WNOHANG or isinstance(_watched_children[pid], tuple):
                    # We're either asked not to block, or it already finished, in which
                    # case blocking doesn't matter
                    result = _watched_children[pid]
                    if isinstance(result, tuple):
                        # it finished. libev child watchers
                        # are one-shot
                        del _watched_children[pid]
                        return result[:2]
                    # it's not finished
                    return (0, 0)

                # Ok, we need to "block". Do so via a watcher so that we're
                # cooperative. We know it's our child, etc, so this should work.
                watcher = _watched_children[pid]
                # We can't start a watcher that's already started,
                # so we can't reuse the existing watcher. Notice that the
                # old watcher must not have fired already, or during this time, but
                # only after we successfully `start()` the watcher. So this must
                # not yield to the event loop.
                with watcher.loop.child(pid, False) as new_watcher:
                    get_hub().wait(new_watcher)
                # Ok, so now the new watcher is done. That means
                # the old watcher's callback (_on_child) should
                # have fired, potentially taking this child out of
                # _watched_children (but that could depend on how
                # many callbacks there were to run, so use the
                # watcher object directly; libev sets all the
                # watchers at the same time).
                return watcher.rpid, watcher.rstatus

            # we're not watching it and it may not even  be our child,
            # so we must go to the OS to be sure to get the right semantics (exception)
            # XXX
            # libuv has a race condition because the signal
            # handler is a Python function, so the InterruptedError
            # is raised before the signal handler runs and calls the
            # child watcher
            # we're not watching it
            return _waitpid(pid, options)

        def _watch_child(pid, callback=None, loop=None, ref=False):
            loop = loop or get_hub().loop
            watcher = loop.child(pid, ref=ref)
            _watched_children[pid] = watcher
            watcher.start(_on_child, watcher, callback)

        def fork_and_watch(callback=None, loop=None, ref=False, fork=fork_gevent):
            """
            Fork a child process and start a child watcher for it in the parent process.

            This call cooperates with :func:`waitpid` to enable cooperatively waiting
            for children to finish. When monkey-patching, these functions are patched in as
            :func:`os.fork` and :func:`os.waitpid`, respectively.

            In the child process, this function calls :func:`gevent.hub.reinit` before returning.

            Availability: POSIX.

            :keyword callback: If given, a callable that will be called with the child watcher
                when the child finishes.
            :keyword loop: The loop to start the watcher in. Defaults to the
                loop of the current hub.
            :keyword fork: The fork function. Defaults to :func:`the one defined in this
                module <gevent.os.fork_gevent>` (which automatically calls :func:`gevent.hub.reinit`).
                Pass the builtin :func:`os.fork` function if you do not need to
                initialize gevent in the child process.

            .. versionadded:: 1.1b1
            .. seealso::
                :func:`gevent.monkey.get_original` To access the builtin :func:`os.fork`.
            """
            pid = fork()
            if pid:
                # parent
                _watch_child(pid, callback, loop, ref)
            return pid

        __extensions__.append('fork_and_watch')
        __extensions__.append('fork_gevent')

        if 'forkpty' in __implements__:
            def forkpty_and_watch(callback=None, loop=None, ref=False, forkpty=forkpty_gevent):
                """
                Like :func:`fork_and_watch`, except using :func:`forkpty_gevent`.

                Availability: Some Unix systems.

                .. versionadded:: 1.1b5
                """
                result = []

                def _fork():
                    pid_and_fd = forkpty()
                    result.append(pid_and_fd)
                    return pid_and_fd[0]
                fork_and_watch(callback, loop, ref, _fork)
                return result[0]

            __extensions__.append('forkpty_and_watch')

        # Watch children by default
        if not config.disable_watch_children:
            # Broken out into separate functions instead of simple name aliases
            # for documentation purposes.
            def fork(*args, **kwargs):
                """
                Forks a child process and starts a child watcher for it in the
                parent process so that ``waitpid`` and SIGCHLD work as expected.

                This implementation of ``fork`` is a wrapper for :func:`fork_and_watch`
                when the environment variable ``GEVENT_NOWAITPID`` is *not* defined.
                This is the default and should be used by most applications.

                .. versionchanged:: 1.1b2
                """
                # take any args to match fork_and_watch
                return fork_and_watch(*args, **kwargs)

            if 'forkpty' in __implements__:
                def forkpty(*args, **kwargs):
                    """
                    Like :func:`fork`, but using :func:`forkpty_gevent`.

                    This implementation of ``forkpty`` is a wrapper for :func:`forkpty_and_watch`
                    when the environment variable ``GEVENT_NOWAITPID`` is *not* defined.
                    This is the default and should be used by most applications.

                    .. versionadded:: 1.1b5
                    """
                    # take any args to match fork_and_watch
                    return forkpty_and_watch(*args, **kwargs)
            __implements__.append("waitpid")

            if hasattr(os, 'posix_spawn'):
                _raw_posix_spawn = os.posix_spawn
                _raw_posix_spawnp = os.posix_spawnp

                def posix_spawn(*args, **kwargs):
                    pid = _raw_posix_spawn(*args, **kwargs)
                    _watch_child(pid)
                    return pid

                def posix_spawnp(*args, **kwargs):
                    pid = _raw_posix_spawnp(*args, **kwargs)
                    _watch_child(pid)
                    return pid

                __implements__.append("posix_spawn")
                __implements__.append("posix_spawnp")
        else:
            def fork():
                """
                Forks a child process, initializes gevent in the child,
                but *does not* prepare the parent to wait for the child or receive SIGCHLD.

                This implementation of ``fork`` is a wrapper for :func:`fork_gevent`
                when the environment variable ``GEVENT_NOWAITPID`` *is* defined.
                This is not recommended for most applications.
                """
                return fork_gevent()

            if 'forkpty' in __implements__:
                def forkpty():
                    """
                    Like :func:`fork`, but using :func:`os.forkpty`

                    This implementation of ``forkpty`` is a wrapper for :func:`forkpty_gevent`
                    when the environment variable ``GEVENT_NOWAITPID`` *is* defined.
                    This is not recommended for most applications.

                    .. versionadded:: 1.1b5
                    """
                    return forkpty_gevent()
            __extensions__.append("waitpid")

else:
    __implements__.remove('fork')


__imports__ = copy_globals(os, globals(),
                           names_to_ignore=__implements__ + __extensions__,
                           dunder_names_to_keep=())

__all__ = list(set(__implements__ + __extensions__))
