from typing import Optional, Union

import torch
from torch._C._distributed_rpc import _TensorPipeRpcBackendOptionsBase

from . import constants as rpc_constants

DeviceType = Union[int, str, torch.device]

__all__ = ["TensorPipeRpcBackendOptions"]


def _to_device(device: DeviceType) -> torch.device:
    device = torch.device(device)
    if device.type != "cuda":
        raise ValueError(
            "`set_devices` expects a list of CUDA devices, but got "
            f"device type {device.type}."
        )
    return device


def _to_device_map(
    device_map: dict[DeviceType, DeviceType],
) -> dict[torch.device, torch.device]:
    full_device_map: dict[torch.device, torch.device] = {}
    reverse_map: dict[torch.device, torch.device] = {}
    for k, v in device_map.items():
        k, v = torch.device(k), torch.device(v)
        if v in reverse_map:
            raise ValueError(
                "`device_map` only supports 1-to-1 mapping, "
                f"trying to map {k} and {reverse_map[v]} to {v}"
            )
        full_device_map[k] = v
        reverse_map[v] = k
    return full_device_map


def _to_device_list(devices: list[DeviceType]) -> list[torch.device]:
    return list(map(_to_device, devices))
deeef f fddZdee fddZ  ZS )r   a'  
    The backend options for
    :class:`~torch.distributed.rpc.TensorPipeAgent`, derived from
    :class:`~torch.distributed.rpc.RpcBackendOptions`.

    Args:
        num_worker_threads (int, optional): The number of threads in the
            thread-pool used by
            :class:`~torch.distributed.rpc.TensorPipeAgent` to execute
            requests (default: 16).
        rpc_timeout (float, optional): The default timeout, in seconds,
            for RPC requests (default: 60 seconds). If the RPC has not
            completed in this timeframe, an exception indicating so will
            be raised. Callers can override this timeout for individual
            RPCs in :meth:`~torch.distributed.rpc.rpc_sync` and
            :meth:`~torch.distributed.rpc.rpc_async` if necessary.
        init_method (str, optional): The URL to initialize the distributed
            store used for rendezvous. It takes any value accepted for the
            same argument of :meth:`~torch.distributed.init_process_group`
            (default: ``env://``).
        device_maps (Dict[str, Dict], optional): Device placement mappings from
            this worker to the callee. The key is the callee worker name and
            the value a dictionary (``Dict`` of ``int``, ``str``, or
            ``torch.device``) that maps this worker's devices to the callee
            worker's devices.
            (default: ``None``)
        devices (List[int, str, or ``torch.device``], optional): all local
            CUDA devices used by the RPC agent. By default, it will be initialized
            to all local devices from its own ``device_maps`` and corresponding
            devices from its peers' ``device_maps``. When processing CUDA RPC
            requests, the agent will properly synchronize CUDA streams for
            all devices in this ``List``.
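
    Example:
        >>> # xdoctest: +SKIP("distributed")
        >>> # A minimal sketch; worker names and device indices are
        >>> # illustrative, assuming a two-worker setup. It maps this worker's
        >>> # cuda:0 to worker1's cuda:1 and restricts the agent to cuda:0.
        >>> import torch.distributed.rpc as rpc
        >>> options = rpc.TensorPipeRpcBackendOptions(
        >>>     num_worker_threads=16,
        >>>     rpc_timeout=60.0,
        >>>     device_maps={"worker1": {0: 1}},
        >>>     devices=["cuda:0"],
        >>> )
        >>> rpc.init_rpc(
        >>>     "worker0",
        >>>     rank=0,
        >>>     world_size=2,
        >>>     rpc_backend_options=options,
        >>> )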
    N)num_worker_threadsrpc_timeoutinit_methoddevice_mapsr   _transports	_channelsr   r    r!   r"   r   r#   r$   c          
   	      sN   |d u ri ndd |  D }|d u rg nt|}	t |||||||	 d S )Nc                 S   s   i | ]	\}}|t |qS r   )r   ).0r   r   r   r   r   
<dictcomp>^   s    z8TensorPipeRpcBackendOptions.__init__.<locals>.<dictcomp>)r   r   super__init__)
selfr   r    r!   r"   r   r#   r$   full_device_mapsfull_device_list	__class__r   r   r(   P   s   z$TensorPipeRpcBackendOptions.__init__tor   c              	      sz   t |}t j}||v r4| D ]#\}}||| v r3||| | kr3td| d| d|| |  qt || dS )a0  
        Set device mapping between each RPC caller and callee pair. This
        function can be called multiple times to incrementally add
        device placement configurations.

        Args:
            to (str): Callee name.
            device_map (Dict of int, str, or torch.device): Device placement
                mappings from this worker to the callee. This map must be
                invertible.

        Example:
            >>> # xdoctest: +SKIP("distributed")
            >>> # both workers
            >>> def add(x, y):
            >>>     print(x)  # tensor([1., 1.], device='cuda:1')
            >>>     return x + y, (x + y).to(2)
            >>>
            >>> # on worker 0
            >>> options = TensorPipeRpcBackendOptions(
            >>>     num_worker_threads=8,
            >>>     device_maps={"worker1": {0: 1}},
            >>>     # maps worker0's cuda:0 to worker1's cuda:1
            >>> )
            >>> options.set_device_map("worker1", {1: 2})
            >>> # maps worker0's cuda:1 to worker1's cuda:2
            >>>
            >>> rpc.init_rpc(
            >>>     "worker0",
            >>>     rank=0,
            >>>     world_size=2,
            >>>     backend=rpc.BackendType.TENSORPIPE,
            >>>     rpc_backend_options=options
            >>> )
            >>>
            >>> x = torch.ones(2)
            >>> rets = rpc.rpc_sync("worker1", add, args=(x.to(0), 1))
            >>> # The first argument will be moved to cuda:1 on worker1. When
            >>> # sending the return value back, it will follow the inverse of
            >>> # the device map, and hence will be moved back to cuda:0 and
            >>> # cuda:1 on worker0.
            >>> print(rets[0])  # tensor([2., 2.], device='cuda:0')
            >>> print(rets[1])  # tensor([2., 2.], device='cuda:1')
        """
        full_device_map = _to_device_map(device_map)
        curr_device_maps = super().device_maps
        if to in curr_device_maps:
            for k, v in full_device_map.items():
                if k in curr_device_maps[to] and v != curr_device_maps[to][k]:
                    raise ValueError(
                        "`set_device_map` only supports 1-to-1 mapping, "
                        f"trying to map {k} to {v} and {curr_device_maps[to][k]}"
                    )
        super()._set_device_map(to, full_device_map)

    def set_devices(self, devices: list[DeviceType]):
        r"""
        Set local devices used by the TensorPipe RPC agent. When processing
        CUDA RPC requests, the TensorPipe RPC agent will properly synchronize
        CUDA streams for all devices in this ``List``.

        Args:
            devices (List of int, str, or torch.device): local devices used by
                the TensorPipe RPC agent.
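
        Example:
            >>> # xdoctest: +SKIP("distributed")
            >>> # A hedged sketch (device values illustrative): ints, strings,
            >>> # and torch.device objects are all accepted, and each entry
            >>> # must refer to a CUDA device.
            >>> options = TensorPipeRpcBackendOptions(num_worker_threads=8)
            >>> options.set_devices([0, "cuda:1", torch.device("cuda", 2)])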
        N)r   r   )r)   r   r   r   r   set_devices   s   
z'TensorPipeRpcBackendOptions.set_devices)__name__
__module____qualname____doc__rpc_contantsDEFAULT_NUM_WORKER_THREADSDEFAULT_RPC_TIMEOUT_SECDEFAULT_INIT_METHODintfloatstrr   dict
DeviceTyper   r(   r1   r2   __classcell__r   r   r,   r   r   .   s4    $
	:)typingr   r   r   torch._C._distributed_rpcr    r   r7   r;   r=   r   r?   __all__r   r>   r   r   r   r   r   r   r   r   <module>   s   


