# Source code for ezmsg.sigproc.util.asio

import asyncio
from concurrent.futures import ThreadPoolExecutor
import contextlib
import inspect
import threading
from typing import Any, Coroutine, TypeVar

T = TypeVar("T")


class CoroutineExecutionError(Exception):
    """
    Custom exception for coroutine execution failures.

    Raised when a coroutine that is being driven synchronously fails; the
    underlying exception is attached as ``__cause__`` by the raising code.
    """
def run_coroutine_sync(coroutine: Coroutine[Any, Any, T], timeout: float = 30) -> T:
    """
    Executes an asyncio coroutine synchronously, with enhanced error handling.

    Two situations are handled:

    * No event loop is running in the calling thread: the coroutine is run
      to completion with ``asyncio.run``.
    * An event loop is already running in the calling thread (main thread or
      not): the coroutine is run on a fresh event loop inside a temporary
      worker thread while the caller blocks for the result. The caller's own
      loop is stalled for the duration of the call, so the coroutine must not
      depend on that loop making progress.

    Args:
        coroutine: The asyncio coroutine to execute
        timeout: Maximum time in seconds to wait for coroutine completion (default: 30)

    Returns:
        The result of the coroutine execution

    Raises:
        CoroutineExecutionError: If the coroutine raises, exceeds the timeout,
            or cannot be executed. The original exception (including any
            timeout error) is available as ``__cause__``.
    """

    def run_in_new_loop() -> T:
        """
        Creates and runs a new event loop in the current thread.
        Ensures proper cleanup of the loop.
        """
        new_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(new_loop)
        try:
            return new_loop.run_until_complete(
                asyncio.wait_for(coroutine, timeout=timeout)
            )
        finally:
            # Best-effort cleanup: a failure while cancelling leftovers must
            # not mask the coroutine's own result or exception.
            with contextlib.suppress(Exception):
                pending = asyncio.all_tasks(new_loop)
                for task in pending:
                    task.cancel()
                new_loop.run_until_complete(
                    asyncio.gather(*pending, return_exceptions=True)
                )
            new_loop.close()

    # get_running_loop() only succeeds when a loop is running in *this*
    # thread, so its outcome alone decides which strategy is safe.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        loop_running_here = False
    else:
        loop_running_here = True

    if not loop_running_here:
        try:
            return asyncio.run(asyncio.wait_for(coroutine, timeout=timeout))
        except Exception as e:
            raise CoroutineExecutionError(
                f"Failed to execute coroutine: {str(e)}"
            ) from e

    # A loop is running in this thread. Scheduling the coroutine onto it and
    # then blocking here would stall the loop until the timeout expires, so
    # the coroutine is driven by a private loop in a worker thread instead.
    with ThreadPoolExecutor(max_workers=1) as pool:
        try:
            future = pool.submit(run_in_new_loop)
            return future.result(timeout=timeout)
        except Exception as e:
            raise CoroutineExecutionError(
                f"Failed to execute coroutine in thread: {str(e)}"
            ) from e
class SyncToAsyncGeneratorWrapper:
    """
    A wrapper for synchronous generators to be used in an async context.

    Every step of the wrapped generator (``send``, ``__next__``, ``close``)
    is executed in a worker thread via ``asyncio.to_thread``. Exhaustion of
    the generator is reported as ``StopAsyncIteration``; any other failure
    inside the generator is re-raised as ``RuntimeError`` with the original
    exception chained as ``__cause__``.

    Attributes that are not defined on the wrapper are looked up on the
    wrapped generator.
    """

    def __init__(self, gen):
        """
        Wrap ``gen`` and prime it if it has not been started yet.

        Priming advances a freshly created generator to its first ``yield``
        so that it can accept ``send``; the value produced by that first
        ``yield`` is discarded.

        Args:
            gen: The synchronous generator object to wrap

        Raises:
            TypeError: If ``gen`` is not a generator object
            RuntimeError: If the generator raises while being primed
        """
        self._gen = gen
        self._closed = False

        # Prime the generator to ready for first send/next call
        try:
            is_not_primed = inspect.getgeneratorstate(self._gen) is inspect.GEN_CREATED
        except AttributeError as e:
            raise TypeError(
                "The provided generator is not a valid generator object"
            ) from e

        if is_not_primed:
            try:
                next(self._gen)
            except StopIteration:
                # The generator finished without ever yielding.
                self._closed = True
            except Exception as e:
                raise RuntimeError(f"Failed to prime generator: {e}") from e

    @staticmethod
    def _step(advance, *args):
        """
        Call ``advance(*args)`` and report exhaustion as data.

        StopIteration cannot be carried across an asyncio Future, so it is
        caught here, inside the worker thread, and handed back as the second
        element of the returned ``(value, stop)`` pair.
        """
        try:
            return advance(*args), None
        except StopIteration as stop:
            return None, stop

    async def asend(self, value):
        """
        Send ``value`` into the generator and return the next yielded value.

        Raises:
            StopAsyncIteration: If the generator is closed or becomes exhausted
            RuntimeError: If the generator raises any other exception
        """
        if self._closed:
            raise StopAsyncIteration("Generator is closed")
        try:
            result, stop = await asyncio.to_thread(self._step, self._gen.send, value)
        except Exception as e:
            raise RuntimeError(f"Error while sending value to generator: {e}") from e
        if stop is not None:
            self._closed = True
            raise StopAsyncIteration("Generator is closed") from stop
        return result

    async def __anext__(self):
        """
        Advance the generator and return the next yielded value.

        Raises:
            StopAsyncIteration: If the generator is closed or becomes exhausted
            RuntimeError: If the generator raises any other exception
        """
        if self._closed:
            raise StopAsyncIteration("Generator is closed")
        try:
            result, stop = await asyncio.to_thread(self._step, self._gen.__next__)
        except Exception as e:
            raise RuntimeError(
                f"Error while getting next value from generator: {e}"
            ) from e
        if stop is not None:
            self._closed = True
            raise StopAsyncIteration("Generator is closed") from stop
        return result

    async def aclose(self):
        """
        Close the wrapped generator; a no-op if it is already closed.

        The wrapper is marked closed even when closing the generator fails.

        Raises:
            RuntimeError: If the generator raises while being closed
        """
        if self._closed:
            return
        try:
            await asyncio.to_thread(self._gen.close)
        except Exception as e:
            raise RuntimeError(f"Error while closing generator: {e}") from e
        finally:
            self._closed = True

    def __aiter__(self):
        """Return the wrapper itself as the async iterator."""
        return self

    def __getattr__(self, name):
        """Delegate lookups of unknown attributes to the wrapped generator."""
        # __getattr__ only runs when normal lookup fails. If ``_gen`` itself
        # is missing (instance created without __init__), delegating would
        # recurse without bound, so fail with a plain AttributeError instead.
        if name == "_gen":
            raise AttributeError(name)
        return getattr(self._gen, name)