OK, understood: the async_handler function must be defined early in the test...
This now works:
"""Event emitter playground"""
import logging

import pytest
from pyee import AsyncIOEventEmitter

LOG = logging.getLogger(__name__)


@pytest.mark.asyncio
async def test_setup(event_loop):
    """Receive event from emitter and complete future!"""
    LOG.info("1 - start")
    # Bind the emitter to the test's own loop rather than a fresh, never-run loop.
    event_emitter = AsyncIOEventEmitter(loop=event_loop)

    # Register the handler before anything is emitted.
    @event_emitter.on("event")
    def async_handler(message):
        LOG.info(">>> %s", message)
        # future_result is looked up when the handler runs, not when it is defined.
        future_result.set_result(message)

    future_result = event_loop.create_future()
    LOG.info("2 - emit event")
    event_emitter.emit("event", "Hi")
    LOG.info(await future_result)
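For anyone who wants to try the same pattern without pytest or pyee installed, here is a minimal sketch using only the standard library. TinyEmitter is a hypothetical stand-in written for this example and is not pyee's implementation; wait_for_event and run_demo are likewise made-up names. It only illustrates the ordering: register the handler, make sure the future exists, then emit and await.

```python
import asyncio


class TinyEmitter:
    """Hypothetical stand-in for an event emitter (not pyee's code)."""

    def __init__(self):
        self._handlers = {}

    def on(self, event):
        """Return a decorator that registers a handler for `event`."""
        def register(func):
            self._handlers.setdefault(event, []).append(func)
            return func
        return register

    def emit(self, event, *args):
        """Call every handler registered for `event` synchronously."""
        for handler in self._handlers.get(event, []):
            handler(*args)


async def wait_for_event():
    emitter = TinyEmitter()

    # The handler is registered first; it only needs future_result
    # to exist by the time the event is actually emitted.
    @emitter.on("event")
    def handler(message):
        future_result.set_result(message)

    # Create the future on the loop that will await it.
    future_result = asyncio.get_running_loop().create_future()

    emitter.emit("event", "Hi")
    # The timeout keeps the demo from hanging if no handler fires.
    return await asyncio.wait_for(future_result, timeout=1)


def run_demo():
    return asyncio.run(wait_for_event())


if __name__ == "__main__":
    print(run_demo())
```

If the emit call came before the handler was registered, nothing would set the future and the wait would time out, which is presumably the situation the original question ran into.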