di: pythonic dependency injection

In this article, I would like to introduce di, a dependency injection framework that seeks to be:
  • Intuitive: simple things are easy, complex things are possible
  • Succinct: low boilerplate, can run your code directly via autowiring
  • Correct: tested with MyPy; exports valid type annotations
  • Powerful: with lifecycles, shared dependencies and a flexible API
  • Performant: supporting async dependencies and concurrent execution
    Introduction to dependency injection
    Dependency injection is a technique for organizing larger applications by handing over control of your dependencies to "something" (often a framework) that assembles them for you.
    It's a complicated topic, so maybe it's better to just look at an example.
    We'll build our example around a pretty common scenario: you have a class that needs to make web requests, so it needs an HTTP client (we'll use httpx).
    from httpx import Client
    
    
    class WeatherClient:
        def __init__(self) -> None:
            self.client = Client()

        def get_temp(self, lat: float, long: float) -> float:
            self.client.get(...)
            ...
    This class does not use dependency injection.
    The problem arises when we want to test this class: we probably don't want to make real web requests in our unit test.
    Luckily, httpx offers the ability to use mock transports:
    import httpx
    
    def handler(request: httpx.Request) -> httpx.Response:
        return httpx.Response(200, json={"temp": 43.21})
    
    client = httpx.Client(transport=httpx.MockTransport(handler))
    But we have a problem: how do we get our client using a mock transport into our WeatherClient?
    One approach often used is monkey patching.
    While monkey patching has its use cases, it can also lead to a lot of very confusing problems and messy code, especially in a larger project.
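As a sketch of what monkey patching looks like (using a toy stand-in for httpx so the snippet is self-contained, not the real library):

```python
from unittest import mock


class HttpClient:
    """Toy stand-in for httpx.Client (an assumption, not the real API)."""

    def get(self, url: str) -> dict:
        raise RuntimeError("would make a real network call")


class WeatherClient:
    def __init__(self) -> None:
        self.client = HttpClient()

    def get_temp(self) -> float:
        return self.client.get("https://weather.example")["temp"]


# monkey patching: globally replace HttpClient.get for the duration of the test
with mock.patch.object(HttpClient, "get", return_value={"temp": 43.21}):
    assert WeatherClient().get_temp() == 43.21
```

Note that the patch leaks into every user of HttpClient while it is active, which is exactly the kind of global, implicit state that gets confusing in a larger project.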
    Dependency injection gives us an alternative: have WeatherClient accept an instance of httpx.Client:
    from typing import Optional
    
    from httpx import Client
    
    
    class WeatherClient:
        def __init__(self, client: Optional[Client] = None) -> None:
            self.client = client or Client()
    This form of dependency injection is called constructor injection because we inject the dependency in the constructor / __init__ method.
    In this case, we chose to add a None default so that we can detect when the caller did not supply a Client and construct a default one ourselves.
    This is largely a matter of preference, and depends on the project you are working on.
    For a library, this is probably a good idea.
    For an application, maybe not so much.
    Note: did you notice that httpx.Client itself uses dependency injection for the transport parameter? Neat huh!
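Putting it together (again with a hypothetical test double standing in for an httpx.Client, so the snippet runs on its own), testing the injected version needs no patching at all:

```python
from typing import Optional


class FakeClient:
    """Hypothetical test double standing in for a client with a mock transport."""

    def get(self, url: str) -> dict:
        return {"temp": 43.21}


class WeatherClient:
    def __init__(self, client: Optional[FakeClient] = None) -> None:
        # constructor injection: the caller may hand us a client;
        # otherwise we fall back to building a default one
        self.client = client or FakeClient()

    def get_temp(self) -> float:
        return self.client.get("https://weather.example")["temp"]


# the test simply passes its double through the constructor
assert WeatherClient(client=FakeClient()).get_temp() == 43.21
```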
    The problems
    As shown above, dependency injection is not as exotic as it sounds: it can be quite simple.
    And you're probably already using it in libraries like httpx.
    But what are the downsides?
    Well, the main issue is that once you have many or deeply nested dependencies, as tends to happen in a larger project, you end up with some really ugly constructor calls; and when you need the same dependency in multiple places, you start juggling temporary variables.
    Basically, the code to assemble and keep track of all of these dependencies can become a headache in and of itself.
    Enter: dependency injection frameworks.
    Dependency injection frameworks seek to solve this problem by abstracting away the boilerplate of creating and connecting dependencies together.
    Dependency injection frameworks
    Although dependency injection frameworks come in many shapes and sizes, in principle they all provide similar functionality:
  • Register dependencies and providers for those dependencies (i.e. tell the framework how to build your dependencies)
  • Inject your dependencies for you
    Some frameworks also have more advanced features such as:
  • Autowiring: transitive dependencies are discovered from parameter names or type annotations
  • Lifecycles: the ability to hook into creation, deletion and error events during the lifecycle of your dependency.
  • Scopes: singleton dependencies or dependencies that are re-used within each call if they are needed in multiple places.
    di is one of the latter frameworks, offering all of these features and more.
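To make "autowiring" concrete, here is a toy resolver (illustrative only, not di's implementation) that recursively constructs a class from its constructor's type annotations:

```python
from typing import get_type_hints


def autowire(cls: type):
    """Toy autowiring: build `cls` by recursively constructing every
    annotated __init__ parameter. Illustrative only, not di's code."""
    hints = get_type_hints(cls.__init__)
    hints.pop("return", None)
    kwargs = {name: autowire(dep) for name, dep in hints.items()}
    return cls(**kwargs)


class Transport:
    pass


class Client:
    def __init__(self, transport: Transport) -> None:
        self.transport = transport


class WeatherClient:
    def __init__(self, client: Client) -> None:
        self.client = client


# no manual assembly: the resolver discovers Transport -> Client -> WeatherClient
wc = autowire(WeatherClient)
assert isinstance(wc.client.transport, Transport)
```

A real framework layers binds, scopes and caching on top of this idea, but the core of autowiring is just this recursive walk over annotations.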
    Using di
    Let's start with a simple example.
    We want to build a web app that uses our WeatherClient.
    import os
    from dataclasses import dataclass, field


    def url_from_env() -> str:
        return os.environ["WEATHER_URL"]


    @dataclass
    class AppConfig:
        weather_url: str = field(default_factory=url_from_env)


    class App:
        def __init__(self, weather_client: WeatherClient, config: AppConfig) -> None:
            self.weather_client = weather_client
            self.config = config
    Building this by hand might look something like:
    app = App(
        weather_client=WeatherClient(
            client=httpx.Client(
                ...
            )
        ),
        config=AppConfig(
            ...
        )
    )
    Using di:
    from di import Container, Dependant
    
    
    container = Container()
    app = container.execute_sync(
        container.solve(Dependant(App))
    )
    Notice that it doesn't matter how deep the dependencies go or how many there are; we only ever said "we want an App instance".
    What about the injection of the Client?
    client = Client(
        transport=MockTransport(handler=handler)
    )
    container.bind(Dependant(lambda: client), Client)
    app = container.execute_sync(
        container.solve(Dependant(App))
    )
    Again, the important thing here is that Client could be nested 10 layers of dependencies deep and the above code would not change at all.
    Advanced functionality
    Aside from the basic usage above, di supports a lot more functionality.
    Here are some snippets demonstrating the functionality, but for full fledged examples and more explanation, see our docs.
    Dependency sharing
    from di import Container, Depends, Dependant
    
    def func() -> object:
        return object()
    
    def dependant(
        one: object = Depends(func),
        two: object = Depends(func),
        three: object = Depends(func, share=False)
    ) -> None:
        assert one is two and two is not three
    
    container = Container()
    container.execute_sync(container.solve(Dependant(dependant)))
    In this example, func is called twice (by default in two separate threads), once for one and two, which share the same value, and once for three.
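Conceptually (this is a simplification, not di's actual code), sharing is per-execution memoization keyed on the provider:

```python
def execute(providers: list, flags: list) -> list:
    """Simplified model of dependency sharing: a shared provider's result
    is cached for the duration of one execution; unshared providers are
    called fresh every time. Not di's actual implementation."""
    cache: dict = {}
    results = []
    for provider, share in zip(providers, flags):
        if share and provider in cache:
            results.append(cache[provider])
            continue
        value = provider()
        if share:
            cache[provider] = value
        results.append(value)
    return results


def func() -> object:
    return object()


# first two requests share one value; the third opts out of sharing
one, two, three = execute([func, func, func], [True, True, False])
assert one is two and two is not three
```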
    Scopes and lifecycles
    from typing import Generator
    
    from di import Container, Depends, Dependant
    
    def func() -> Generator[int, None, None]:
       print("func startup")
       yield 1
       print("func shutdown")
    
    
    def dependant(v: int = Depends(func, scope=1234)) -> int:
        print("computing")
        return v + 1
    
    container = Container()
    with container.enter_local_scope(1234):
        print("enter scope")
        res = container.execute_sync(
           container.solve(Dependant(dependant))
        )
        print("exit scope")
    
    assert res == 2
    This example will print:
    enter scope
    func startup
    computing
    func shutdown
    exit scope
    And is equivalent to:
    from contextlib import contextmanager
    
    
    with contextmanager(func)() as value:
        dependant(v=value)
    Scopes in di are arbitrary: any hashable value will do.
    As long as you are consistent in declaring scopes, di does not care what you use. If "you" are a framework calling into user code, this gives you the freedom to define and manage your own scopes: for a web framework, these might be "app" and "request"; for a CLI, a single "call" scope; for a TUI, an "app" and an "event" scope.
    Solved dependencies
    Often, you need to run the same dependency multiple times without dynamically changing the dependency graph between executions (you would be changing the graph dynamically if, for example, you called container.bind between calls).
    In these scenarios, di lets you save the entire computed dependency graph into a SavedDependency and execute the same graph multiple times.
    In fact, this is what we've been doing already every time we called container.solve.
    # solving locks in binds, but not cached values or scopes
    solved = container.solve(Dependant(dependency))
    res1 = container.execute_sync(solved)
    res2 = container.execute_sync(solved)
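The split mirrors classic graph processing: solving computes an execution order over the dependency graph once, and executing just walks that order. A sketch of the idea (not di's code) using the stdlib's graphlib:

```python
from graphlib import TopologicalSorter

# each node maps to the nodes that must be built before it
graph = {"app": {"weather_client"}, "weather_client": {"client"}, "client": set()}

# "solve" once: compute an execution order for the whole graph
order = list(TopologicalSorter(graph).static_order())

# "execute" many times: every run walks the same precomputed order
assert order.index("client") < order.index("weather_client") < order.index("app")
```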
    Async execution
    Simply swap out execute_sync for execute_async for async support:
    from di import Container, Dependant
    
    
    async def dep() -> int:
        return 1
    
    async def main() -> None:
        container = Container()
        solved = container.solve(Dependant(dep))
        res = await container.execute_async(solved)
        assert res == 1
    Like sync execution, async execution supports arbitrary callables as well as generator functions that get used as context managers.
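By analogy with the sync equivalence shown earlier, an async generator dependency behaves like an asynccontextmanager. Roughly (a sketch of the equivalence, not di's internals):

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator


async def func() -> AsyncGenerator[int, None]:
    print("func startup")
    yield 1
    print("func shutdown")


async def dependant(v: int) -> int:
    return v + 1


async def main() -> int:
    # what executing an async generator dependency boils down to:
    # enter before the dependant runs, exit after it returns
    async with asynccontextmanager(func)() as value:
        return await dependant(v=value)


assert asyncio.run(main()) == 2
```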
    DependantProtocol
    The DependantProtocol interface provides an abstract representation of a dependency specification.
    The Container uses this abstract representation to build dependency graphs.
    This means that you can easily customize a lot of the behavior in di by overriding this API.
    If you want to use some of the built in functionality and just tweak it slightly, you can inherit from di.Dependant, but you don't have to.
    For example, to accept a default value instead of a callable:
    from di import Dependant
    from di.types.providers import DependencyType
    from di.types.scopes import Scope
    
    class DefaultDependant(Dependant[DependencyType]):
        def __init__(
            self,
            default: DependencyType,
            scope: Scope,
            share: bool,
        ) -> None:
            super().__init__(
                call=lambda: default,
                scope=scope,
                share=share,
            )
    Now if there is no bind / override on the dependency, it will execute just like if it were being called directly.
    Executors
    One of the design principles of di is to isolate the work of the container into distinct steps:
  • Wiring, which can be controlled via the DependantProtocol
  • Solving, which is owned by the container
  • Execution, which is managed via the Executor API
    To this end, di allows you to provide your own executor, which only needs to take a list of groups of tasks (callables that take no arguments) and execute them in the specified order.
    The Executor API is as follows:
    import typing
    
    ResultType = typing.TypeVar("ResultType")
    
    Task = typing.Union[
        typing.Callable[[], None],
        typing.Callable[[], typing.Awaitable[None]]
    ]
    
    
    class SyncExecutor(typing.Protocol):
        def execute_sync(
            self,
            tasks: typing.List[typing.List[Task]],
            get_result: typing.Callable[[], ResultType],
        ) -> ResultType:
            ...
    
    
    class AsyncExecutor(typing.Protocol):
        async def execute_async(
            self,
            tasks: typing.List[typing.List[Task]],
            get_result: typing.Callable[[], ResultType],
        ) -> ResultType:
            ...
    The default executor (di.executors.DefaultExecutor) supports both sync and async execution.
    Dependencies that do not depend on each other are executed concurrently (via threads in sync execution and tasks in async execution).
    A simple SyncExecutor implementation might look like:
    import inspect


    class SimpleExecutor(SyncExecutor):
        def execute_sync(
            self,
            tasks: typing.List[typing.List[Task]],
            get_result: typing.Callable[[], ResultType],
        ) -> ResultType:
            for group in tasks:
                for task in group:
                    res = task()
                    if inspect.isawaitable(res):
                        raise TypeError("a sync executor cannot run async tasks")
            return get_result()
    Note that SyncExecutor may be handed async functions as tasks.
    The sync/async portion refers to the API of the executor itself, not the tasks it may be handed.
    It is up to the implementation to raise an exception or execute the async task somehow.
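To see the protocol in action, here is a tiny self-contained run (re-declaring a sequential executor so the snippet stands alone):

```python
import inspect
import typing


class SequentialExecutor:
    """Runs the groups in order, and tasks within a group sequentially.
    Rejects async tasks, as a sync executor is allowed to do."""

    def execute_sync(
        self,
        tasks: typing.List[typing.List[typing.Callable[[], None]]],
        get_result: typing.Callable[[], typing.Any],
    ) -> typing.Any:
        for group in tasks:
            for task in group:
                res = task()
                if inspect.isawaitable(res):
                    raise TypeError("sync executor handed an async task")
        return get_result()


ran: typing.List[str] = []
# groups arrive in dependency order: dependencies first, the target last
groups = [[lambda: ran.append("dependency")], [lambda: ran.append("target")]]
result = SequentialExecutor().execute_sync(groups, lambda: ran[-1])
assert ran == ["dependency", "target"] and result == "target"
```

A concurrent executor would fan out each inner list to threads or tasks instead of iterating it, which is exactly where the default executor's speedup comes from.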
    Integration into other libraries
    While di should be simple enough that it can be used by end users, it shines when integrated into a framework or library that fully embraces inversion of control.
    To this end, we provide sample integrations, currently only for Starlette and Textual, but more ideas are welcome!
    Performance
    Benchmark results depend strongly on the size of the DAG.
    For a relatively large but not completely absurd DAG of 12 vertices with each vertex having 0-3 edges, di performs slightly faster than FastAPI (a couple percent per request).
    For larger DAGs, especially where di is able to execute dependencies concurrently, di can be 10x faster than FastAPI.
    To be fair, this is not a real world scenario, and FastAPI does a lot more than the Starlette di integration is doing (keeping track of stuff for OpenAPI, etc.).
    If these benchmarks are unfair for other reasons, please let me know.
    Conclusion
    Dependency injection and inversion of control are powerful techniques, but they can get complicated.
    Python has shown time and time again that it can take complicated things and make them simple for users.
    FastAPI is one of the best examples of this.
    I hope that di can help other libraries, frameworks or projects use inversion of control and dependency injection without having to go through the pain of writing their own dependency injection system.
