4

I have been working on yet another API using FastAPI and trying to write test cases for the APIs, but facing error that event loop is closed.

My setup:

So, I am using asyncpg driver/library to connect to the main db and for testcases using aiosqlite driver/library to connect with db.sqlite3 file based db

factories.py (kindly ignore this as this is just to generate fake data)

import factory
from factory.enums import CREATE_STRATEGY
from sqlalchemy.orm import Session
from sqlalchemy.ext.asyncio import AsyncSession
from factory.fuzzy import FuzzyChoice
from app.models import User


class SQLAlchemyModelFactory(factory.alchemy.SQLAlchemyModelFactory):

    class Meta:
        abstract = True
        sqlalchemy_session = None
        sqlalchemy_session_persistence = "commit"


    @classmethod
    def set_session(cls, session: Session):
        """Set the session dynamically for all factories inheriting this class"""
        cls._meta.sqlalchemy_session = session


    @classmethod
    async def _save(cls, model_class, session, args, kwargs):
        """Save the model instance using an async session."""
        if not isinstance(session, AsyncSession):
            raise ValueError("AsyncSQLAlchemyModelFactory requires an AsyncSession")
        obj = model_class(*args, **kwargs)
        session.add(obj)
        await session.commit()
        return obj


    @classmethod
    async def create(cls, **kwargs):
        """Override create to be async."""
        return await cls._generate(CREATE_STRATEGY, kwargs)


class UserFactory(SQLAlchemyModelFactory):
    class Meta:
        model = User

    
    first_name = factory.Faker("first_name")
    last_name = factory.Faker("last_name")
    email = factory.Faker("email")
    password = factory.Faker('text', max_nb_chars=8)
    gender = FuzzyChoice(("male", "female", "others"))
    mobile = factory.Faker("phone_number")
    is_superuser = factory.Faker("boolean")
    is_staff = factory.Faker("boolean")
    is_active = factory.Faker("boolean")
    created_at = factory.Faker("date_time")
    updated_at = factory.Faker("date_time")
    logged_in_at = factory.Faker("date_time")
    logged_out_at = factory.Faker("date_time")

models.py:

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import relationship, DeclarativeBase


# TEST_DATABASE_FILENAME = "db.sqlite3"
# SQLALCHEMY_TEST_DATABASE_URL = f"sqlite+aiosqlite:///{TEST_DATABASE_FILENAME}"
# SQLALCHEMY_DATABASE_URL="postgresql+asyncpg://myapp:password@localhost:5432/appdb"


engine = create_async_engine(SQLALCHEMY_DATABASE_URL, echo=False, pool_size=10, pool_pre_ping=True)

asyncsession = async_sessionmaker(autocommit=False, autoflush=False, bind=engine, class_=AsyncSession, expire_on_commit=False)

# all the models

app/tests.py (base test class)

import asyncio
import unittest
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from app.models import Base


class AsyncTestCaseHelper:
    @staticmethod
    async def init(engine):
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)

    @staticmethod
    async def cleanup(engine):
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.drop_all)
        await engine.dispose()
        if os.path.exists(TEST_DATABASE_FILENAME):
            os.remove(TEST_DATABASE_FILENAME)



class AsyncAPITestCase(unittest.IsolatedAsyncioTestCase):
    router = None

    @classmethod
    def setUpClass(cls):
        cls.engine = create_async_engine(
            SQLALCHEMY_TEST_DATABASE_URL,
            connect_args={"check_same_thread": False},
        )
        cls.asyncsession = async_sessionmaker(
            bind=cls.engine, class_=AsyncSession, expire_on_commit=False
        )
        
        asyncio.run(AsyncTestCaseHelper.init(cls.engine))
        

    @classmethod
    def tearDownClass(cls):
        asyncio.run(AsyncTestCaseHelper.cleanup(cls.engine))
        cls.engine = None
        cls.asyncsession = None


    async def asyncSetUp(self):
        self.db_session = self.asyncsession()
        self.transport = ASGITransport(app=self.router)
        self.client = AsyncClient(transport=self.transport, base_url="http://test")


    async def asyncTearDown(self):
        await self.db_session.rollback()
        await self.db_session.close()
        await self.client.aclose()
        await self.transport.aclose()
        await self.engine.dispose()

api/tests.py (the actual file where test cases are written)

import factories
import pytest
from starlette import status
from api.controller import apirouter
from app.tests import AsyncAPITestCase


class TestToken(AsyncAPITestCase):

    router=apirouter

    async def asyncSetUp(self):
        await super().asyncSetUp()
        factories.UserFactory.set_session(session=self.db_session)
        factories.PermissionFactory.set_session(session=self.db_session)
        factories.ContentTypeFactory.set_session(session=self.db_session)
        factories.GroupFactory.set_session(session=self.db_session)

        # Create test users
        self.correct_user = await factories.UserFactory.create(
            email="[email protected]", 
            password="test@12345"
        )
        self.incorrect_user = await factories.UserFactory.create(
            email="[email protected]", 
            password="wrongpass"
        )

        await self.db_session.commit()
        await self.db_session.refresh(self.correct_user)
        await self.db_session.refresh(self.incorrect_user)


    @pytest.mark.asyncio
    async def test_token_generation_with_invalid_credential(self):
        url = apirouter.url_path_for("token-signin")
        response = await self.client.post(url, json={
            "email": self.incorrect_user.email,
            "password": self.incorrect_user.password,
        })
        assert response.status_code == status.HTTP_401_UNAUTHORIZED


    @pytest.mark.asyncio
    async def test_token_generation_with_valid_credential(self):
        url = apirouter.url_path_for("token-signin")
        response = await self.client.post(url, json={
            "email": self.correct_user.email,
            "password": self.correct_user.password,
        })
        assert response.status_code == status.HTTP_200_OK

requirements.txt

fastapi
uvicorn
sqlalchemy
asyncpg
factory_boy
httpx
aiosqlite
pytest
pytest-asyncio
faker

I try to run my testcases: pytest api\tests.py -v

it gives beelow error(whole traceback):

self = <ProactorEventLoop running=False closed=True debug=True>

    def _check_closed(self):
        if self._closed:
>           raise RuntimeError('Event loop is closed')
E           RuntimeError: Event loop is closed

C:\Users\Anura\miniconda3\Lib\asyncio\base_events.py:540: RuntimeError

During handling of the above exception, another exception occurred:

self = <api.tests.TestToken testMethod=test_token_generation_with_valid_credential>

    async def test_token_generation_with_valid_credential(self):
        url = apirouter.url_path_for("token-signin")
>       response = await self.client.post(url, json={
            "email": self.correct_user.email,
            "password": self.correct_user.password,
        })

api\tests.py:111:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
venv\Lib\site-packages\httpx\_client.py:1859: in post
    return await self.request(
venv\Lib\site-packages\httpx\_client.py:1540: in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
venv\Lib\site-packages\httpx\_client.py:1629: in send
    response = await self._send_handling_auth(
venv\Lib\site-packages\httpx\_client.py:1657: in _send_handling_auth
    response = await self._send_handling_redirects(
venv\Lib\site-packages\httpx\_client.py:1694: in _send_handling_redirects
    response = await self._send_single_request(request)
venv\Lib\site-packages\httpx\_client.py:1730: in _send_single_request
    response = await transport.handle_async_request(request)
venv\Lib\site-packages\httpx\_transports\asgi.py:170: in handle_async_request
    await self.app(scope, receive, send)
venv\Lib\site-packages\starlette\routing.py:714: in __call__
    await self.middleware_stack(scope, receive, send)
venv\Lib\site-packages\starlette\routing.py:734: in app
    await route.handle(scope, receive, send)
venv\Lib\site-packages\starlette\routing.py:288: in handle
    await self.app(scope, receive, send)
venv\Lib\site-packages\starlette\routing.py:76: in app
    await wrap_app_handling_exceptions(app, request)(scope, receive, send)
venv\Lib\site-packages\starlette\_exception_handler.py:53: in wrapped_app
    raise exc
venv\Lib\site-packages\starlette\_exception_handler.py:42: in wrapped_app
    await app(scope, receive, sender)
venv\Lib\site-packages\starlette\routing.py:73: in app
    response = await f(request)
venv\Lib\site-packages\fastapi\routing.py:301: in app
    raw_response = await run_endpoint_function(
venv\Lib\site-packages\fastapi\routing.py:212: in run_endpoint_function
    return await dependant.call(**values)
fastapi_extensions\views.py:148: in dispatch_request
    return await super().dispatch_request(request=request)
fastapi_extensions\views.py:76: in dispatch_request
    return await method(request)
api\views.py:20: in post
    user = await User.fetch(email = data.email, password = data.password, first_only=True)
fastapi_extensions\models.py:77: in fetch
    result = await session.execute(select(cls).filter_by(**kwargs))
venv\Lib\site-packages\sqlalchemy\ext\asyncio\session.py:463: in execute
    result = await greenlet_spawn(
venv\Lib\site-packages\sqlalchemy\util\_concurrency_py3k.py:201: in greenlet_spawn
    result = context.throw(*sys.exc_info())
venv\Lib\site-packages\sqlalchemy\orm\session.py:2365: in execute
    return self._execute_internal(
venv\Lib\site-packages\sqlalchemy\orm\session.py:2241: in _execute_internal
    conn = self._connection_for_bind(bind)
venv\Lib\site-packages\sqlalchemy\orm\session.py:2110: in _connection_for_bind
    return trans._connection_for_bind(engine, execution_options)
<string>:2: in _connection_for_bind
    ???
venv\Lib\site-packages\sqlalchemy\orm\state_changes.py:139: in _go
    ret_value = fn(self, *arg, **kw)
venv\Lib\site-packages\sqlalchemy\orm\session.py:1189: in _connection_for_bind
    conn = bind.connect()
venv\Lib\site-packages\sqlalchemy\engine\base.py:3274: in connect
    return self._connection_cls(self)
venv\Lib\site-packages\sqlalchemy\engine\base.py:146: in __init__
    self._dbapi_connection = engine.raw_connection()
venv\Lib\site-packages\sqlalchemy\engine\base.py:3298: in raw_connection
    return self.pool.connect()
venv\Lib\site-packages\sqlalchemy\pool\base.py:449: in connect
    return _ConnectionFairy._checkout(self)
venv\Lib\site-packages\sqlalchemy\pool\base.py:1363: in _checkout
    with util.safe_reraise():
venv\Lib\site-packages\sqlalchemy\util\langhelpers.py:146: in __exit__
    raise exc_value.with_traceback(exc_tb)
venv\Lib\site-packages\sqlalchemy\pool\base.py:1301: in _checkout
    result = pool._dialect._do_ping_w_event(
venv\Lib\site-packages\sqlalchemy\engine\default.py:720: in _do_ping_w_event
    return self.do_ping(dbapi_connection)
venv\Lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py:1163: in do_ping
    dbapi_connection.ping()
venv\Lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py:813: in ping
    self._handle_exception(error)
venv\Lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py:794: in _handle_exception
    raise error
venv\Lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py:811: in ping
    _ = self.await_(self._async_ping())
venv\Lib\site-packages\sqlalchemy\util\_concurrency_py3k.py:132: in await_only
    return current.parent.switch(awaitable)  # type: ignore[no-any-return,attr-defined] # noqa: E501
venv\Lib\site-packages\sqlalchemy\util\_concurrency_py3k.py:196: in greenlet_spawn
    value = await result
venv\Lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py:820: in _async_ping
    await tr.start()
venv\Lib\site-packages\asyncpg\transaction.py:146: in start
    await self._connection.execute(query)
venv\Lib\site-packages\asyncpg\connection.py:349: in execute
    result = await self._protocol.query(query, timeout)
asyncpg\\protocol\\protocol.pyx:375: in query
    ???
asyncpg\\protocol\\protocol.pyx:368: in asyncpg.protocol.protocol.BaseProtocol.query
    ???
asyncpg\\protocol\\coreproto.pyx:1174: in asyncpg.protocol.protocol.CoreProtocol._simple_query
    ???
asyncpg\\protocol\\protocol.pyx:967: in asyncpg.protocol.protocol.BaseProtocol._write
    ???
C:\Users\Anura\miniconda3\Lib\asyncio\proactor_events.py:366: in write
    self._loop_writing(data=bytes(data))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_ProactorSocketTransport fd=1556 read=<_OverlappedFuture cancelled created at C:\Users\Anura\miniconda3\Lib\asyncio\windows_events.py:506>>, f = None, data = b'Q\x00\x00\x00\x0bBEGIN;\x00'

    def _loop_writing(self, f=None, data=None):
        try:
            if f is not None and self._write_fut is None and self._closing:
                # XXX most likely self._force_close() has been called, and
                # it has set self._write_fut to None.
                return
            assert f is self._write_fut
            self._write_fut = None
            self._pending_write = 0
            if f:
                f.result()
            if data is None:
                data = self._buffer
                self._buffer = None
            if not data:
                if self._closing:
                    self._loop.call_soon(self._call_connection_lost, None)
                if self._eof_written:
                    self._sock.shutdown(socket.SHUT_WR)
                # Now that we've reduced the buffer size, tell the
                # protocol to resume writing if it was paused.  Note that
                # we do this last since the callback is called immediately
                # and it may add more data to the buffer (even causing the
                # protocol to be paused again).
                self._maybe_resume_protocol()
            else:
>               self._write_fut = self._loop._proactor.send(self._sock, data)
E               AttributeError: 'NoneType' object has no attribute 'send'

in short error happens here:

    async def test_token_generation_with_valid_credential(self):
        url = apirouter.url_path_for("token-signin")
>       response = await self.client.post(url, json={
            "email": self.correct_user.email,
            "password": self.correct_user.password,
        })

AttributeError: 'NoneType' object has no attribute 'send'
# which is infact due to
    def _check_closed(self):
        if self._closed:
>           raise RuntimeError('Event loop is closed')
E           RuntimeError: Event loop is 

Note: if i try to run a single test case then it runs but if i try to run 2 or more test cases it fails and show the above error

My attempts:

I tried to switch my test cases based on pure pytest and pytest_asyncio(using fixtures and all) way still having the same issue with that too but i noticed if i use poolclass=NullPool in my both of the connections (asyncpg and aiosqlite) it somehow works but setting asyncpg pool to NullPool won't be the ideal for the production. since creating a new connection for each request introduces slight delays and is generally not a good practice due to its drawbacks.

Kindly let me know if you need anything from me like my attempts with pytest and pytest_asyncio and all

thanks for your patience and reading this whole thing :)

11
  • This is because you are not decorating your test cases with pytest.mark.asyncio so your event loop is closed when the first test case is completed. This is why when you run a single test case, it works fine. Commented May 11 at 14:03
  • Hi @defalt. Sir Thanks for pointing this out. I already have tried with this but still the same issue of event loop getting closed Commented May 11 at 16:03
  • Try setting the scope of the event loop to session. pytest.mark.asyncio(loop_scope='session') Commented May 11 at 19:35
  • @defalt Sir I tried this as well but same issue persists Commented May 12 at 5:40
  • I am gone through similar question but None of the solution worked for me. e.g RuntimeError: Event loop is closed" when using pytest-asyncio to test FastAPI routes, pytest-asyncio has a closed event loop, but only when running all tests and more Commented May 12 at 5:55

1 Answer 1

1

Without the full code snippets, it's not easy to fully reproduce the problem. Furthermore, based on usage of ProactorEventLoop , it looks like the code is running on a Windows machine, so anyone running on a Unix based system may get different behavior. However, based on the description, I would recommend the following:

  • Using a session-scoped pytest fixture, for managing the event loop (to @defalt 's point), which ensures the same event loop will be used.
import asyncio

@pytest.fixture(scope="session", autouse=True)
def event_loop():
    loop = asyncio.get_event_loop_policy().new_event_loop()
    asyncio.set_event_loop(loop)
    yield loop
    loop.close()
  • Migrate to consistently using pytest , rather than having a mix of unittest and pytest , as the two test frameworks maintain the event loops differently (and to the point above, the intention is that event_loop fixture controls the loop and policy). This should be a matter of converting the corresponding asyncSetup and asyncTearDown methods into a properly scoped e.g; "module", fixture.

  • Decorate the asynchronous test functions with @pytest.mark.asyncio , or have the asyncio_mode pytest configurations set explicitly in a pyproject.toml or pytest.ini , if those are utilized in the project.

After trying the changes detailed above, re-run the tests and report back on if it mitigates the issue. The alternative would be migrating fully to unittest if that was the preferred approach - the same concepts would apply with regards to event loop maintenance being handled in one test framework, likely as part of the set up and teardown methods.

Sign up to request clarification or add additional context in comments.

1 Comment

Sir thanks for the answer. I already have tried even i tried creating fixtures with pytest as well as pytest_asyncio at session level but still the same issue of event loop getting closed

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.