Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event loop is closed #991

Open
Saurus119 opened this issue Nov 18, 2024 · 2 comments
Open

Event loop is closed #991

Saurus119 opened this issue Nov 18, 2024 · 2 comments
Labels
bug needsinfo Requires additional information from the issue author

Comments

@Saurus119
Copy link

Saurus119 commented Nov 18, 2024

Hello,
Please can anyone try to help with finding error with this setup?

Error:

self = <ProactorEventLoop running=False closed=True debug=False>

    def _check_closed(self):
        if self._closed:
>           raise RuntimeError('Event loop is closed')
E           RuntimeError: Event loop is closed

I am using Windows :

pytest~=8.2.2
pytest-asyncio~=0.21.1
asyncpg~=0.30.0

pytest.ini:

[pytest]
asyncio_mode = auto
asyncio_default_fixture_loop_scope = session

conftest.py

@pytest_asyncio.fixture(scope='session')
def event_loop():
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()
    
@pytest.fixture
async def async_db_session(db_session):
    async with sessionmanager.session() as session:
        yield session

My tests:

class OrganizationRepository:

    async def test_get_existing_organization_by_id(self, organization_factory, async_db_session):
        organization = organization_factory()
        _ = organization_factory()

        detected_organization = await OrganizationRepository(async_db_session).get_by_id(
            organization.id
        )
        assert detected_organization.id == organization.id

    async def test_get_not_existing_organization_by_id(self, async_db_session):
        assert await OrganizationRepository(async_db_session).get_by_id(1) is None

The problem is, that every time when I run "all tests" my first test of the class fails because of Event loop is closed. Second run just fine and doesn´t fail on event loop.

ERROR:sqlalchemy.pool.impl.AsyncAdaptedQueuePool:Exception terminating connection <AdaptedConnection <asyncpg.connection.Connection object at 0x0000017288345300>>
Traceback (most recent call last):
  File "venv\Lib\site-packages\sqlalchemy\pool\base.py", line 374, in _close_connection
    self._dialect.do_terminate(connection)
  File "venv\Lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 1117, in do_terminate
    dbapi_connection.terminate()
  File "venv\Lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 895, in terminate
    self.await_(self._connection.close(timeout=2))
  File "venv\Lib\site-packages\sqlalchemy\util\_concurrency_py3k.py", line 132, in await_only
    return current.parent.switch(awaitable)  # type: ignore[no-any-return,attr-defined] # noqa: E501
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "venv\Lib\site-packages\sqlalchemy\util\_concurrency_py3k.py", line 196, in greenlet_spawn
    value = await result
            ^^^^^^^^^^^^
  File "venv\Lib\site-packages\asyncpg\connection.py", line 1504, in close
    await self._protocol.close(timeout)
  File "asyncpg\\protocol\\protocol.pyx", line 627, in close
  File "asyncpg\\protocol\\protocol.pyx", line 660, in asyncpg.protocol.protocol.BaseProtocol._request_cancel
  File "venv\Lib\site-packages\asyncpg\connection.py", line 1673, in _cancel_current_command
    self._cancellations.add(self._loop.create_task(self._cancel(waiter)))
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Python311\Lib\asyncio\base_events.py", line 434, in create_task
    self._check_closed()
  File "C:\Python311\Lib\asyncio\base_events.py", line 519, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

tests\test_domain\test_organization\test_organization_repository.py:6 (TestGetOrganization.test_get_existing_organization_by_id)
self = <ProactorEventLoop running=False closed=True debug=False>
callback = <bound method BaseProtocol._on_waiter_completed of <asyncpg.protocol.protocol.Protocol object at 0x0000017287BD14F0>>
context = <_contextvars.Context object at 0x0000017288B90E00>
args = (<Future finished exception=AttributeError("'NoneType' object has no attribute 'send'")>,)

    def call_soon(self, callback, *args, context=None):
        """Arrange for a callback to be called as soon as possible.
    
        This operates as a FIFO queue: callbacks are called in the
        order in which they are registered.  Each callback will be
        called exactly once.
    
        Any positional arguments after the callback will be passed to
        the callback when it is called.
        """
>       self._check_closed()

C:\Python311\Lib\asyncio\base_events.py:761: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <ProactorEventLoop running=False closed=True debug=False>

    def _check_closed(self):
        if self._closed:
>           raise RuntimeError('Event loop is closed')
E           RuntimeError: Event loop is closed

C:\Python311\Lib\asyncio\base_events.py:519: RuntimeError

During handling of the above exception, another exception occurred:

self = <tests.test_domain.test_organization.test_organization_repository.TestGetOrganization object at 0x00000172844F5AD0>
organization_factory = <class 'tests.protocol_drivers.db_factories.OrganizationFactory'>
async_db_session = <sqlalchemy.ext.asyncio.session.AsyncSession object at 0x0000017288B85D10>

    async def test_get_existing_organization_by_id(self, organization_factory, async_db_session):
        organization = organization_factory()
        _ = organization_factory()
    
>       detected_organization = await OrganizationRepository(async_db_session).get_by_id(
            organization.id
        )

tests\test_domain\test_organization\test_organization_repository.py:11: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
app\domain\organization\organization_repository.py:21: in get_by_id
    result = await self._db.execute(select_stmt)
venv\Lib\site-packages\sqlalchemy\ext\asyncio\session.py:461: in execute
    result = await greenlet_spawn(
venv\Lib\site-packages\sqlalchemy\util\_concurrency_py3k.py:201: in greenlet_spawn
    result = context.throw(*sys.exc_info())
venv\Lib\site-packages\sqlalchemy\orm\session.py:2362: in execute
    return self._execute_internal(
venv\Lib\site-packages\sqlalchemy\orm\session.py:2237: in _execute_internal
    conn = self._connection_for_bind(bind)
venv\Lib\site-packages\sqlalchemy\orm\session.py:2106: in _connection_for_bind
    return trans._connection_for_bind(engine, execution_options)
<string>:2: in _connection_for_bind
    ???
venv\Lib\site-packages\sqlalchemy\orm\state_changes.py:139: in _go
    ret_value = fn(self, *arg, **kw)
venv\Lib\site-packages\sqlalchemy\orm\session.py:1189: in _connection_for_bind
    conn = bind.connect()
venv\Lib\site-packages\sqlalchemy\engine\base.py:3278: in connect
    return self._connection_cls(self)
venv\Lib\site-packages\sqlalchemy\engine\base.py:146: in __init__
    self._dbapi_connection = engine.raw_connection()
venv\Lib\site-packages\sqlalchemy\engine\base.py:3302: in raw_connection
    return self.pool.connect()
venv\Lib\site-packages\sqlalchemy\pool\base.py:449: in connect
    return _ConnectionFairy._checkout(self)
venv\Lib\site-packages\sqlalchemy\pool\base.py:1362: in _checkout
    with util.safe_reraise():
venv\Lib\site-packages\sqlalchemy\util\langhelpers.py:146: in __exit__
    raise exc_value.with_traceback(exc_tb)
venv\Lib\site-packages\sqlalchemy\pool\base.py:1300: in _checkout
    result = pool._dialect._do_ping_w_event(
venv\Lib\site-packages\sqlalchemy\engine\default.py:716: in _do_ping_w_event
    return self.do_ping(dbapi_connection)
venv\Lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py:1150: in do_ping
    dbapi_connection.ping()
venv\Lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py:801: in ping
    self._handle_exception(error)
venv\Lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py:782: in _handle_exception
    raise error
venv\Lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py:799: in ping
    _ = self.await_(self._async_ping())
venv\Lib\site-packages\sqlalchemy\util\_concurrency_py3k.py:132: in await_only
    return current.parent.switch(awaitable)  # type: ignore[no-any-return,attr-defined] # noqa: E501
venv\Lib\site-packages\sqlalchemy\util\_concurrency_py3k.py:196: in greenlet_spawn
    value = await result
venv\Lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py:808: in _async_ping
    await tr.start()
venv\Lib\site-packages\asyncpg\transaction.py:146: in start
    await self._connection.execute(query)
venv\Lib\site-packages\sentry_sdk\integrations\asyncpg.py:73: in _inner
    res = await f(*args, **kwargs)
venv\Lib\site-packages\asyncpg\connection.py:349: in execute
    result = await self._protocol.query(query, timeout)
asyncpg\\protocol\\protocol.pyx:375: in query
    ???
asyncpg\\protocol\\protocol.pyx:368: in asyncpg.protocol.protocol.BaseProtocol.query
    ???
asyncpg\\protocol\\coreproto.pyx:1174: in asyncpg.protocol.protocol.CoreProtocol._simple_query
    ???
asyncpg\\protocol\\protocol.pyx:967: in asyncpg.protocol.protocol.BaseProtocol._write
    ???
C:\Python311\Lib\asyncio\proactor_events.py:365: in write
    self._loop_writing(data=bytes(data))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_ProactorSocketTransport fd=2564 read=<_OverlappedFuture cancelled>>
f = None, data = b'Q\x00\x00\x00\x0bBEGIN;\x00'

    def _loop_writing(self, f=None, data=None):
        try:
            if f is not None and self._write_fut is None and self._closing:
                # XXX most likely self._force_close() has been called, and
                # it has set self._write_fut to None.
                return
            assert f is self._write_fut
            self._write_fut = None
            self._pending_write = 0
            if f:
                f.result()
            if data is None:
                data = self._buffer
                self._buffer = None
            if not data:
                if self._closing:
                    self._loop.call_soon(self._call_connection_lost, None)
                if self._eof_written:
                    self._sock.shutdown(socket.SHUT_WR)
                # Now that we've reduced the buffer size, tell the
                # protocol to resume writing if it was paused.  Note that
                # we do this last since the callback is called immediately
                # and it may add more data to the buffer (even causing the
                # protocol to be paused again).
                self._maybe_resume_protocol()
            else:
>               self._write_fut = self._loop._proactor.send(self._sock, data)
E               AttributeError: 'NoneType' object has no attribute 'send'

C:\Python311\Lib\asyncio\proactor_events.py:401: AttributeError

I also tried pytest-asyncio 0.23x but still had same problem. Does anyone have an idea what can be wrong? Why tests pass, but every time only first one in the class fails? I tried to convert it also to function tests but same problem, first test fails on event loop, rest will go through just fine. When I rerun failed test as "second" run through pycharm "Rerun failed tests" it is success.

For my session manager I same code as is in this blog:
https://medium.com/@tclaitken/setting-up-a-fastapi-app-with-async-sqlalchemy-2-0-pydantic-v2-e6c540be4308

@seifertm
Copy link
Contributor

seifertm commented Dec 6, 2024

@Saurus119 Could you share the output of pytest --setup-show, preferably with the latest version of pytest-asyncio?

@seifertm seifertm added bug needsinfo Requires additional information from the issue author labels Dec 6, 2024
@ferdimsu
Copy link

ferdimsu commented Dec 24, 2024

I faced a similar problem when using SQLAlchemy and asyncpg in a remarkably similar setup.

Setting: asyncio_default_fixture_loop_scope = function and relying on a custom implementation of event_loop worked for me

@pytest.fixture(scope="session")
def event_loop(event_loop_policy):
    # Needed to work with asyncpg
    loop = event_loop_policy.new_event_loop()
    yield loop
    loop.close()

Nevertheless, a warning is always emitted when executing the tests.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug needsinfo Requires additional information from the issue author
Projects
None yet
Development

No branches or pull requests

3 participants