-
-
Notifications
You must be signed in to change notification settings - Fork 694
Open
Labels
bug: Something isn't working
Description
- Problem: uvloop (e.g., 0.21.0) fails to install, compile, or run on Python 3.14t (the free-threaded build of Python 3.14).
- Impact: Any code path that tries to set the uvloop event loop policy raises an error.
- Workaround until uvloop supports 3.14t: use the stdlib asyncio event loops:
- Unix/macOS: SelectorEventLoop (DefaultEventLoopPolicy)
- Windows: ProactorEventLoop (WindowsProactorEventLoopPolicy)
Code example (swarms/structs/multi_agent_exec.py)
Old code (full, uvloop/winloop):
def run_agents_concurrently_uvloop(
    agents: List[AgentType],
    task: str,
    max_workers: Optional[int] = None,
) -> List[Any]:
    """
    Run multiple agents concurrently using optimized async performance with uvloop/winloop.

    Every agent's blocking ``run(task=task)`` call is submitted to a thread
    pool and awaited from an asyncio event loop. Before running, the function
    tries to install winloop (on ``win32``/``cygwin``) or uvloop (elsewhere)
    as the process-wide event loop policy; if that is not possible it logs a
    warning and keeps whatever policy is already in effect.

    Args:
        agents: Agents to execute. Each one is called as ``agent.run(task=task)``.
        task: Task passed unchanged to every agent.
        max_workers: Size of the thread pool. When ``None``, 95% of the CPU
            count reported by ``os.cpu_count()`` is used, with a floor of one
            worker.

    Returns:
        One entry per agent, in the same order as ``agents``. If an agent
        raised, its entry is the exception object rather than a result.
    """
    # Platform-specific event loop policy setup
    if sys.platform in ("win32", "cygwin"):
        # Windows: Try to use winloop
        try:
            import winloop

            asyncio.set_event_loop_policy(winloop.EventLoopPolicy())
            logger.info("Using winloop for enhanced Windows performance")
        except ImportError:
            logger.warning(
                "winloop not available, falling back to standard asyncio. "
                "Install winloop with: pip install winloop"
            )
        except RuntimeError as e:
            logger.warning(
                f"Could not set winloop policy: {e}. Using default asyncio."
            )
    else:
        # Linux/macOS: Try to use uvloop
        try:
            import uvloop

            asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
            logger.info("Using uvloop for enhanced Unix performance")
        except ImportError:
            logger.warning(
                "uvloop not available, falling back to standard asyncio. "
                "Install uvloop with: pip install uvloop"
            )
        except RuntimeError as e:
            logger.warning(
                f"Could not set uvloop policy: {e}. Using default asyncio."
            )

    if max_workers is None:
        # Use 95% of available CPU cores, but never fewer than one worker:
        # int(1 * 0.95) == 0, and ThreadPoolExecutor rejects max_workers <= 0.
        num_cores = os.cpu_count()
        max_workers = max(1, int(num_cores * 0.95)) if num_cores else 1

    logger.info(
        f"Running {len(agents)} agents concurrently with uvloop (max_workers: {max_workers})"
    )

    def run_agent_sync(agent: AgentType) -> Any:
        return agent.run(task=task)

    async def run_agents_async() -> List[Any]:
        # Inside a coroutine the running loop is the one that must schedule
        # the executor jobs.
        loop = asyncio.get_running_loop()
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            pending = [
                loop.run_in_executor(executor, run_agent_sync, agent)
                for agent in agents
            ]
            # return_exceptions=True keeps one failing agent from cancelling
            # the rest; its exception is returned in its result slot.
            return list(
                await asyncio.gather(*pending, return_exceptions=True)
            )

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        called_from_running_loop = False
    else:
        called_from_running_loop = True

    if not called_from_running_loop:
        return asyncio.run(run_agents_async())

    # asyncio.run() refuses to start inside a running loop, and
    # run_until_complete() cannot be nested on it either. Drive a private loop
    # on a helper thread instead and block this (synchronous) caller until the
    # agents are done.
    with ThreadPoolExecutor(max_workers=1) as loop_runner:
        return loop_runner.submit(
            lambda: asyncio.run(run_agents_async())
        ).result()
New code (full, stdlib asyncio policies):
def run_agents_concurrently_uvloop(
    agents: List[AgentType],
    task: str,
    max_workers: Optional[int] = None,
) -> List[Any]:
    """
    Run multiple agents concurrently using stdlib asyncio policies.

    Every agent's blocking ``run(task=task)`` call is submitted to a thread
    pool and awaited from an asyncio event loop. The function name is kept
    for backward compatibility; uvloop/winloop are intentionally not used
    because they are not used on Python 3.14t.

    Args:
        agents: Agents to execute. Each one is called as ``agent.run(task=task)``.
        task: Task passed unchanged to every agent.
        max_workers: Size of the thread pool. When ``None``, 95% of the CPU
            count reported by ``os.cpu_count()`` is used, with a floor of one
            worker.

    Returns:
        One entry per agent, in the same order as ``agents``. If an agent
        raised, its entry is the exception object rather than a result.
    """
    # Platform-specific event loop policy setup (stdlib asyncio only).
    # AttributeError is handled as well as RuntimeError:
    # WindowsProactorEventLoopPolicy is absent on non-Windows builds such as
    # Cygwin, and the asyncio policy API is deprecated as of Python 3.14, so
    # these attributes may be missing on newer interpreters. In either case
    # the policy already in effect is the right one to keep.
    try:
        if sys.platform in ("win32", "cygwin"):
            asyncio.set_event_loop_policy(
                asyncio.WindowsProactorEventLoopPolicy()
            )
            logger.info(
                "Using stdlib WindowsProactorEventLoopPolicy for Windows"
            )
        else:
            asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
            logger.info(
                "Using stdlib DefaultEventLoopPolicy for Unix-like systems"
            )
    except (RuntimeError, AttributeError) as e:
        logger.warning(
            f"Could not set asyncio policy: {e}. Continuing with existing policy."
        )

    if max_workers is None:
        # Use 95% of available CPU cores, but never fewer than one worker:
        # int(1 * 0.95) == 0, and ThreadPoolExecutor rejects max_workers <= 0.
        num_cores = os.cpu_count()
        max_workers = max(1, int(num_cores * 0.95)) if num_cores else 1

    logger.info(
        f"Running {len(agents)} agents concurrently with stdlib asyncio (max_workers: {max_workers})"
    )

    def run_agent_sync(agent: AgentType) -> Any:
        return agent.run(task=task)

    async def run_agents_async() -> List[Any]:
        # Inside a coroutine the running loop is the one that must schedule
        # the executor jobs.
        loop = asyncio.get_running_loop()
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            pending = [
                loop.run_in_executor(executor, run_agent_sync, agent)
                for agent in agents
            ]
            # return_exceptions=True keeps one failing agent from cancelling
            # the rest; its exception is returned in its result slot.
            return list(
                await asyncio.gather(*pending, return_exceptions=True)
            )

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        called_from_running_loop = False
    else:
        called_from_running_loop = True

    if not called_from_running_loop:
        return asyncio.run(run_agents_async())

    # asyncio.run() refuses to start inside a running loop, and
    # run_until_complete() cannot be nested on it either. Drive a private loop
    # on a helper thread instead and block this (synchronous) caller until the
    # agents are done.
    with ThreadPoolExecutor(max_workers=1) as loop_runner:
        return loop_runner.submit(
            lambda: asyncio.run(run_agents_async())
        ).result()
Metadata
Metadata
Assignees
Labels
bug: Something isn't working