Skip to content

Optimize _SyncSocketReaderFuture with a Cythonized version of the Future object #750

@Vizonex

Description

@Vizonex

Something that has always bothered me about both this library and the winloop port is the lack of a C extension type for the classes that need to subclass the Future object. I am aware that we could `extern` the C-level Future, but wrapping that much code from CPython's `_asyncio` C module would break whenever its ABI changes, and asking CPython to expose `_asynciomodule.c` as a C-API capsule would be a waste of time. So I came up with a second plan: write a new Future object as a Cython `cdef` extension class. Fortunately I was able to get this to work, although it took a long time. My idea is to put it in a separate submodule and possibly name it `UVFuture`, so it is not confused with `aio_Future` (which will still be used elsewhere). The remaining classes that currently subclass `asyncio.Future` can then be switched over to this new object.

# Hypothetically, assume the Loop cdef extension class is defined in this module.
from cpython.contextvars cimport PyContext_CopyCurrent
from cpython.list cimport PyList_GET_SIZE
import sys
from asyncio import base_futures, format_helpers
from asyncio import (
    CancelledError,
    InvalidStateError,
    get_event_loop
)
from collections.abc import Awaitable
from types import GenericAlias, coroutine

# Internal state of a UVFuture, stored as a C enum in UVFuture.state.
# The UVFuture._state property maps it back to the "PENDING" /
# "CANCELLED" / "FINISHED" strings used by Python-level futures.
cdef enum fut_state:
    _PENDING = 0    # not yet done
    _CANCELLED = 1  # cancel() succeeded
    _FINISHED = 2   # set_result() or set_exception() was called

# This is mostly an attempt to show that uvloop and winloop are in fact
# still optimizable if they had a cdef Future extension type to build on
# instead of a normal Python class...
 
cdef class UVFuture:
    """Cython ``cdef`` re-implementation of ``asyncio.Future``.

    Mirrors the pure-Python future API (result/exception storage, done
    callbacks, cancellation and ``await`` support) as an extension type, so
    that subclasses can themselves be ``cdef`` classes.  The bound loop must
    be a ``Loop`` instance because ``_loop`` is typed as ``Loop``.
    """
    cdef:
        fut_state state
        # Exposed read-only because asyncio.base_futures._future_repr_info()
        # (used by __repr__) reads _result, _exception, _callbacks and
        # _source_traceback from Python code.  Private cdef attributes would
        # make repr() -- and therefore the InvalidStateError messages built
        # in set_result()/set_exception() -- fail with AttributeError.
        readonly object _result
        readonly object _exception
        readonly Loop _loop
        readonly object _source_traceback
        readonly object _cancel_message
        readonly list _callbacks
        # Traceback captured by set_exception() and re-attached by result().
        # Must be declared: cdef classes have no __dict__ for ad-hoc attributes.
        object _exception_tb
        # Saved CancelledError (or None) consumed by _make_cancelled_error().
        object _cancelled_exc
        # Checked by asyncio (isfuture(), Task) to recognise a future.
        public bint _asyncio_future_blocking
        # True while an exception has been set but not yet retrieved.
        bint __log_traceback


    def __init__(self, *, loop=None):
        """Initialise a pending future bound to *loop*.

        If *loop* is None, ``asyncio.get_event_loop()`` is used.  Assigning
        an object that is not a ``Loop`` raises TypeError (typed attribute).
        """
        self.state = _PENDING
        self._result = None
        self._exception = None
        self._exception_tb = None
        self._source_traceback = None
        self._cancel_message = None
        self._cancelled_exc = None
        self.__log_traceback = False

        if loop is None:
            self._loop = get_event_loop()
        else:
            self._loop = loop

        self._callbacks = []

        if self._loop.get_debug():
            # Remember the creation site for debug-mode reports/repr.
            self._source_traceback = format_helpers.extract_stack(
                sys._getframe(1))

    @property
    def _state(self):
        """String state, for compatibility with Python-level futures."""
        if self.state == _PENDING:
            return "PENDING"
        elif self.state == _CANCELLED:
            return "CANCELLED"
        return "FINISHED"

    def __repr__(self):
        return base_futures._future_repr(self)

    def __del__(self):
        """Report an exception that was set but never retrieved."""
        cdef dict context
        cdef object exc
        if not self.__log_traceback:
            # set_exception() was not called, or result() or exception()
            # has consumed the exception
            return
        exc = self._exception
        context = {
            'message':
                f'{self.__class__.__name__} exception was never retrieved',
            'exception': exc,
            'future': self,
        }
        if self._source_traceback:
            context['source_traceback'] = self._source_traceback
        self._loop.call_exception_handler(context)

    __class_getitem__ = classmethod(GenericAlias)

    @property
    def _log_traceback(self):
        return self.__log_traceback

    @_log_traceback.setter
    def _log_traceback(self, val):
        # Only clearing the flag is allowed from outside.
        if val:
            raise ValueError('_log_traceback can only be set to False')
        self.__log_traceback = False

    def get_loop(self):
        """Return the event loop the Future is bound to."""
        loop = self._loop
        if loop is None:
            raise RuntimeError("Future object is not initialized.")
        return loop

    def _make_cancelled_error(self):
        """Create the CancelledError to raise if the Future is cancelled.

        This should only be called once when handling a cancellation since
        it erases the saved context exception value.
        """
        if self._cancelled_exc is not None:
            exc = self._cancelled_exc
            self._cancelled_exc = None
            return exc

        if self._cancel_message is None:
            exc = CancelledError()
        else:
            exc = CancelledError(self._cancel_message)
        # _cancelled_exc is always None at this point; kept to mirror the
        # pure-Python implementation.
        exc.__context__ = self._cancelled_exc
        # Remove the reference since we don't need this anymore.
        self._cancelled_exc = None
        return exc

    cpdef bint cancel(self, object msg=None):
        """Cancel the future and schedule callbacks.

        If the future is already done or cancelled, return False.  Otherwise,
        change the future's state to cancelled, schedule the callbacks and
        return True.  *msg* is stored and used for the CancelledError.
        """
        self.__log_traceback = False
        if self.state != _PENDING:
            return False
        self.state = _CANCELLED
        self._cancel_message = msg
        self.__schedule_callbacks()
        return True

    cpdef object __schedule_callbacks(self):
        """Internal: Ask the event loop to call all callbacks.

        Snapshots the callback list, clears it in place, then schedules each
        ``(callback, context)`` pair with ``loop.call_soon``.
        """
        callbacks = self._callbacks[:]
        if not callbacks:
            return

        self._callbacks[:] = []
        for callback, ctx in callbacks:
            self._loop.call_soon(callback, self, context=ctx)

    cpdef bint cancelled(self):
        """Return True if the future was cancelled."""
        return self.state == _CANCELLED

    # Don't implement running(); see http://bugs.python.org/issue18699

    cpdef bint done(self):
        """Return True if the future is done.

        Done means either that a result / exception are available, or that the
        future was cancelled.
        """
        return self.state != _PENDING

    cpdef object result(self):
        """Return the result this future represents.

        If the future has been cancelled, raises CancelledError.  If the
        future's result isn't yet available, raises InvalidStateError.  If
        the future is done and has an exception set, this exception is raised.
        """
        if self.state == _CANCELLED:
            exc = self._make_cancelled_error()
            raise exc
        if self.state != _FINISHED:
            raise InvalidStateError('Result is not ready.')
        self.__log_traceback = False
        if self._exception is not None:
            # Re-raise with the traceback captured in set_exception().
            raise self._exception.with_traceback(self._exception_tb)
        return self._result

    cpdef object exception(self):
        """Return the exception that was set on this future.

        The exception (or None if no exception was set) is returned only if
        the future is done.  If the future has been cancelled, raises
        CancelledError.  If the future isn't done yet, raises
        InvalidStateError.
        """
        if self.state == _CANCELLED:
            exc = self._make_cancelled_error()
            raise exc
        if self.state != _FINISHED:
            raise InvalidStateError('Exception is not set.')
        self.__log_traceback = False
        return self._exception

    def add_done_callback(self, fn, *, context=None):
        """Add a callback to be run when the future becomes done.

        The callback is called with a single argument - the future object. If
        the future is already done when this is called, the callback is
        scheduled with call_soon.  Without an explicit *context*, a copy of
        the current contextvars context is stored with the callback.
        """
        if self.state != _PENDING:
            self._loop.call_soon(fn, self, context=context)
        else:
            self._callbacks.append(
                (fn, PyContext_CopyCurrent() if context is None else context))

    # New method not in PEP 3148.

    cpdef Py_ssize_t remove_done_callback(self, object fn):
        """Remove all instances of a callback from the "call when done" list.

        Returns the number of callbacks removed.
        """
        cdef Py_ssize_t removed_count
        cdef list filtered_callbacks = [(f, ctx)
                                        for (f, ctx) in self._callbacks
                                        if f != fn]
        removed_count = (PyList_GET_SIZE(self._callbacks)
                         - PyList_GET_SIZE(filtered_callbacks))
        if removed_count:
            self._callbacks[:] = filtered_callbacks
        return removed_count

    # So-called internal methods (note: no set_running_or_notify_cancel()).

    cpdef set_result(self, object result):
        """Mark the future done and set its result.

        If the future is already done when this method is called, raises
        InvalidStateError.
        """
        if self.state != _PENDING:
            raise InvalidStateError(f'{self._state}: {self!r}')
        self._result = result
        self.state = _FINISHED
        self.__schedule_callbacks()

    cpdef set_exception(self, object exception):
        """Mark the future done and set an exception.

        *exception* may be an exception instance or an exception class (which
        is instantiated with no arguments).  Raises InvalidStateError if the
        future is already done, and TypeError if *exception* is not an
        exception or is a StopIteration.
        """
        if self.state != _PENDING:
            raise InvalidStateError(f'{self._state}: {self!r}')
        if isinstance(exception, type):
            exception = exception()
        if not isinstance(exception, BaseException):
            raise TypeError("invalid exception object")
        if type(exception) is StopIteration:
            raise TypeError("StopIteration interacts badly with generators "
                            "and cannot be raised into a Future")
        self._exception = exception
        self._exception_tb = exception.__traceback__
        self.state = _FINISHED
        self.__schedule_callbacks()
        self.__log_traceback = True

    def __await__(self):
        """Suspend until the future is done, then return its result."""
        if not self.done():
            self._asyncio_future_blocking = True
            # Yield the future itself: asyncio's Task inspects the yielded
            # object and parks until it completes.  A bare ``yield`` would
            # make the Task reschedule itself every loop iteration instead.
            yield self
        if not self.done():
            raise RuntimeError("await wasn't used with future")
        return self.result()  # May raise too.

    def __iter__(self):
        # Make compatible with 'yield from'.
        return self.__await__()

# Register UVFuture as a virtual subclass of collections.abc.Awaitable,
# in case anything checks isinstance(obj, Awaitable).
Awaitable.register(UVFuture)

With that in place, `_SyncSocketReaderFuture` can be changed as shown below (the same can later be done for `_SyncSocketWriterFuture`). `__remove_reader` then no longer pays for costly Python-level function lookups.

cdef class _SyncSocketReaderFuture(UVFuture):
    """Future that unregisters its socket's reader from the loop on cancel.

    Holds the socket and loop passed to ``__init__``.  Cancelling the future
    calls ``loop.remove_reader(sock)`` first, at most once.
    """
    cdef:
        object __sock
        Loop __loop

    def __init__(self, sock, loop):
        UVFuture.__init__(self, loop=loop)
        self.__sock = sock
        self.__loop = loop

    cdef object __remove_reader(self):
        # cdef: called directly at C level, with no Python attribute lookup.
        # Skip sockets that are already closed (fileno() == -1), and drop
        # the reference so the reader is removed at most once.
        if self.__sock is not None and self.__sock.fileno() != -1:
            self.__loop.remove_reader(self.__sock)
            self.__sock = None

    cpdef bint cancel(self, object msg=None):
        """Remove the socket reader, then cancel the future.

        Overrides the base cpdef method with a matching cpdef signature
        (Cython rejects a ``def`` override here, and a runtime ``if`` in a
        cdef class body).  ``UVFuture.cancel`` always accepts *msg*, so no
        Python-version branch is needed.  Returns the base result: True if
        the future was cancelled, False if it was already done.
        """
        self.__remove_reader()
        return UVFuture.cancel(self, msg)

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions