import collections
import contextlib
import functools
import inspect
import logging
import threading
from typing import Any, Generic, TYPE_CHECKING, TypeVar

import torch
from torch._C._distributed_rpc import (
    _cleanup_python_rpc_handler,
    _delete_all_user_and_unforked_owner_rrefs,
    _destroy_rref_context,
    _get_current_rpc_agent,
    _invoke_remote_builtin,
    _invoke_remote_python_udf,
    _invoke_remote_torchscript,
    _invoke_rpc_builtin,
    _invoke_rpc_python_udf,
    _invoke_rpc_torchscript,
    _is_current_rpc_agent_set,
    _reset_current_rpc_agent,
    _set_and_start_rpc_agent,
    get_rpc_timeout,
    PyRRef,
    RemoteProfilerManager,
    TensorPipeAgent,
    WorkerInfo,
)
from torch.futures import Future

from ._utils import _group_membership_management, _update_group_membership
from .constants import DEFAULT_SHUTDOWN_TIMEOUT, UNSET_RPC_TIMEOUT
from .internal import (
    _build_rpc_profiling_key,
    _internal_rpc_pickler,
    PythonUDF,
    RPCExecMode,
)


__all__ = [
    "shutdown",
    "get_worker_info",
    "remote",
    "rpc_sync",
    "rpc_async",
    "RRef",
    "AllGatherStates",
    "method_factory",
    "new_method",
]


logger = logging.getLogger(__name__)

# Ignore RRef leaks during shutdown. Without this, applications would have to
# guarantee that no references to any RRef remain before exiting, which makes
# debugging shutdown failures unnecessarily painful.
_ignore_rref_leak = True

# The RPC pickler used to serialize Python UDFs; can be swapped out temporarily
# with `_use_rpc_pickler`.
_default_pickler = _internal_rpc_pickler


@contextlib.contextmanager
def _use_rpc_pickler(rpc_pickler):
    r"""
    rpc_pickler: (.internal._InternalRPCPickler) Overrides the default RPC pickler
    N)_default_picklerr   )rpc_pickler r-   m/var/www/html/construction_image-detection-poc/venv/lib/python3.10/site-packages/torch/distributed/rpc/api.py_use_rpc_picklerI   s
   r/   c                    s   t   fdd}|S )Nc                     s   t  std | i |S )NzHRPC has not been initialized. Call torch.distributed.rpc.init_rpc first.)r   RuntimeError)argskwargsfuncr-   r.   wrapperW   s
   z%_require_initialized.<locals>.wrapper)	functoolswraps)r4   r5   r-   r3   r.   _require_initializedV   s   r8   c                   @   s   e Zd Zdd ZdS )r(   c                 C   s   i | _ t | _d S N)gathered_objects	threadingEventproceed_signal)selfr-   r-   r.   __init__d   s   
zAllGatherStates.__init__N)__name__
__module____qualname__r?   r-   r-   r-   r.   r(   c   s    r(   _ALL_WORKER_NAMES_all_gather_sequence_id!_all_gather_sequence_id_to_statesc                 C   s,   |   }dd |D at st|  d S d S )Nc                 S   s   h | ]}|j qS r-   )name).0worker_infor-   r-   r.   	<setcomp>   s    z#_init_rpc_states.<locals>.<setcomp>)get_worker_infosrC   r   r   )agentworker_infosr-   r-   r.   _init_rpc_states~   s
   rM   c                 C   s   t F |st}||v sJ | dt|  }||jvs%J | d|  d||j|< |t|j kr@|j  W d    d S W d    d S 1 sKw   Y  d S )Nz is not expected by leader.z reported intent sequence id z twice. )_all_gather_dict_lockrC   rE   r:   setkeysr=   )sequence_idworker_nameobjworker_namesstatesr-   r-   r.   _gather_to_leader   s    

"rV   c                 C   sZ   t  t|  }W d    n1 sw   Y  |j r#J d|  d||_|j  d S )NzTermination signal sequence id z got set twice.)rN   rE   r=   is_setr:   rO   )rQ   objects_maprU   r-   r-   r.   _broadcast_to_followers   s   

rY   c                   c   sP    g t _zdV  W ztjt j W t `dS t `w ztjt j W t `w t `w )a|  
    A context manager that collects all futures returned by ``rpc_async`` and
    waits on them at the context manager's exit, relieving the user of needing
    to explicitly call wait.


    Example::
        >>> # xdoctest: +SKIP("distributed")
        >>> # On worker 0:
        >>> import torch
        >>> import torch.distributed.rpc as rpc
        >>> rpc.init_rpc("worker0", rank=0, world_size=2)
        >>> with rpc._wait_all():
        >>>    fut_1 = rpc.rpc_async(dst, torch.add, (torch.ones(2, 2), 1))
        >>>    fut_2 = rpc.rpc_async(dst, torch.add, (torch.ones(2, 2), 1))
        >>> #fut_1 and fut_2 are waited on
    """
    _thread_local_var.future_list = []
    try:
        yield
    finally:
        try:
            torch.futures.wait_all(_thread_local_var.future_list)
        finally:
            del _thread_local_var.future_list


@_require_initialized
def _all_gather(obj, worker_names=None, timeout: float = UNSET_RPC_TIMEOUT):
    r"""
    This is similar to torch.distributed.all_gather(), but is using RPC. It
    picks the worker with the smallest name (alphabetic order) as the leader.
    Then all followers send their data ``obj`` to the leader. After the leader
    has received all, it will broadcast the results back to all followers. This
    function blocks until all workers have received the gathered results.
    """
    if not worker_names:
        assert (
            _ALL_WORKER_NAMES is not None
        ), "`_ALL_WORKER_NAMES` is not initialized for `def _all_gather`."
        worker_names = _ALL_WORKER_NAMES
    leader_name = min(worker_names)

    self_name = _get_current_rpc_agent().get_worker_info().name

    with _all_gather_dict_lock:
        concat_names = "".join(sorted(worker_names))
        sequence_num = _all_gather_sequence_id.get(concat_names, 0)
        _all_gather_sequence_id[concat_names] = sequence_num + 1
        sequence_id = concat_names + str(sequence_num)

    is_leader = leader_name == self_name

    if timeout == UNSET_RPC_TIMEOUT:
        # RPC calls use the agent's default timeout; no timeout on the signal.
        rpc_timeout = get_rpc_timeout()
        signal_timeout = None
    elif timeout == DEFAULT_SHUTDOWN_TIMEOUT:
        # No timeout for the RPC calls during shutdown.
        rpc_timeout = timeout
        signal_timeout = None
    else:
        # Signal and RPC timeouts use the same user-provided value.
        signal_timeout = rpc_timeout = timeout

    # Phase 1: followers send their object to the leader.
    if is_leader:
        _gather_to_leader(sequence_id, self_name, obj, worker_names)
    else:
        rpc_sync(
            leader_name,
            _gather_to_leader,
            args=(sequence_id, self_name, obj, worker_names),
            timeout=rpc_timeout,
        )

    with _all_gather_dict_lock:
        states = _all_gather_sequence_id_to_states[sequence_id]

    # Timeout is either the caller-provided value or None (wait indefinitely).
    states.proceed_signal.wait(timeout=signal_timeout)

    # Phase 2: the leader broadcasts the gathered results to all followers.
    if is_leader:
        worker_name_to_response_future_dict = {}
        for follower_name in worker_names - {leader_name}:
            fut = rpc_async(
                follower_name,
                _broadcast_to_followers,
                args=(sequence_id, states.gathered_objects),
                timeout=rpc_timeout,
            )
            worker_name_to_response_future_dict[follower_name] = fut

        errors = []
        for follower_name, fut in worker_name_to_response_future_dict.items():
            try:
                fut.wait()
            except RuntimeError as ex:
                errors.append((follower_name, ex))

        if errors:
            raise RuntimeError(
                f"Followers {[e[0] for e in errors]} timed out in _all_gather "
                f"after {rpc_timeout:.2f} seconds. The first exception is {errors[0][1]}"
            )

    # Clean up the per-sequence-id state.
    with _all_gather_dict_lock:
        states = _all_gather_sequence_id_to_states.pop(sequence_id)
    return states.gathered_objects


@_require_initialized
def _barrier(worker_names):
    r"""
    Synchronizes local and remote RPC processes.

    This will block until all local and remote RPC processes specified under worker_names
    reach this method to wait for all outstanding work to complete.

    Args:
        worker_names (List[str]): The set of workers to synchronize.

    """
    try:
        _all_gather(None, set(worker_names))
    except RuntimeError as ex:
        logger.error("Failed to complete barrier, got error %s", ex)


@_require_initialized
def _wait_all_workers(timeout=DEFAULT_SHUTDOWN_TIMEOUT):
    r"""
    Block until all local and remote RPC processes reach this method and wait
    for all outstanding work to complete. Every RPC process must call this
    method before exit to perform a graceful shutdown. This should be used to
    terminate the RPC framework, and there is no guarantee that the RPC
    framework will work after this method returns.
    """
    try:
        _all_gather(None, timeout=timeout)
    except RuntimeError as ex:
        logger.error(
            "Failed to respond to 'Shutdown Proceed' in time, got error %s", ex
        )
        raise ex


@_require_initialized
def shutdown(graceful=True, timeout=DEFAULT_SHUTDOWN_TIMEOUT):
    r"""
    Perform a shutdown of the RPC agent, and then destroy the RPC agent. This
    stops the local agent from accepting outstanding requests, and shuts
    down the RPC framework by terminating all RPC threads. If ``graceful=True``,
    this will block until all local and remote RPC processes reach this method
    and wait for all outstanding work to complete. Otherwise, if
    ``graceful=False``, this is a local shutdown, and it does not wait for other
    RPC processes to reach this method.

    .. warning::
        For :class:`~torch.futures.Future` objects returned by
        :meth:`~torch.distributed.rpc.rpc_async`, ``future.wait()`` should not
        be called after ``shutdown()``.

    Args:
        graceful (bool): Whether to do a graceful shutdown or not. If True,
                         this will 1) wait until there is no pending system
                         messages for ``UserRRefs`` and delete them; 2) block
                         until all local and remote RPC processes have reached
                         this method and wait for all outstanding work to
                         complete.

    Example::
        Make sure that ``MASTER_ADDR`` and ``MASTER_PORT`` are set properly
        on both workers. Refer to :meth:`~torch.distributed.init_process_group`
        API for more details. For example,

        export MASTER_ADDR=localhost
        export MASTER_PORT=5678

        Then run the following code in two different processes:

        >>> # xdoctest: +SKIP
        >>> # On worker 0:
        >>> import torch
        >>> import torch.distributed.rpc as rpc
        >>> rpc.init_rpc("worker0", rank=0, world_size=2)
        >>> # do some work
        >>> result = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(1), 1))
        >>> # ready to shutdown
        >>> rpc.shutdown()

        >>> # On worker 1:
        >>> import torch.distributed.rpc as rpc
        >>> rpc.init_rpc("worker1", rank=1, world_size=2)
        >>> # wait for worker 0 to finish work, and then shutdown.
        >>> rpc.shutdown()
    """
    if graceful:
        try:
            agent = _get_current_rpc_agent()
            if not isinstance(agent, TensorPipeAgent) or agent.is_static_group:
                _wait_all_workers(timeout)
                _delete_all_user_and_unforked_owner_rrefs()
                agent.join(shutdown=True, timeout=timeout)
            else:
                # This is a dynamic group, so grab the membership token before
                # telling the peers to drop this worker from the group.
                my_worker_info = agent.get_worker_info()
                my_name = my_worker_info.name
                with _group_membership_management(agent.store, my_name, False):
                    all_worker_infos = agent.get_worker_infos()
                    for worker in all_worker_infos:
                        if worker.name != my_name:
                            rpc_sync(
                                worker.name,
                                _update_group_membership,
                                args=(my_worker_info, [], {}, False),
                            )
                    agent.join(shutdown=True, timeout=timeout)
        finally:
            # In case of errors, continue to complete the local shutdown.
            _finalize_shutdown()
    else:
        _finalize_shutdown()


def _finalize_shutdown():
    try:
        # This raises a `TORCH_CHECK()` exception on RRef leak detected.
        _destroy_rref_context(_ignore_rref_leak)
    finally:
        _get_current_rpc_agent().shutdown()
        # Clean up the Python RPC handler here because it has Python
        # dependencies and must run while the interpreter still exists. After
        # this point, Python objects returned from RPC calls can no longer be
        # resolved, and `future.wait()` must not be called.
        _cleanup_python_rpc_handler()
        _reset_current_rpc_agent()


@_require_initialized
def get_worker_info(worker_name=None):
    r"""
    Get :class:`~torch.distributed.rpc.WorkerInfo` of a given worker name.
    Use this :class:`~torch.distributed.rpc.WorkerInfo` to avoid passing an
    expensive string on every invocation.

    Args:
        worker_name (str): the string name of a worker. If ``None``, return the
                           ``WorkerInfo`` of the current worker. (default ``None``)

    Returns:
        :class:`~torch.distributed.rpc.WorkerInfo` instance for the given
        ``worker_name`` or :class:`~torch.distributed.rpc.WorkerInfo` of the
        current worker if ``worker_name`` is ``None``.
    """
    if worker_name is not None:
        return _get_current_rpc_agent().get_worker_info(worker_name)
    else:
        return _get_current_rpc_agent().get_worker_info()


def _to_worker_info(to):
    if isinstance(to, WorkerInfo):
        return to
    elif isinstance(to, (str, int)):
        return get_worker_info(to)
    else:
        raise ValueError(f"Cannot get WorkerInfo from name {to}")


def _rref_typeof_on_owner(rref, blocking: bool = True):
    rref_type = type(rref.local_value())
    if blocking:
        return rref_type
    else:
        # Wrap the result in a completed Future so that the caller gets a
        # future back regardless of whether this runs on the user or the owner.
        future = Future[type]()
        future.set_result(rref_type)
        return future


def _rref_typeof_on_user(
    rref, timeout: float = UNSET_RPC_TIMEOUT, blocking: bool = True
):
    fut = rpc_async(rref.owner(), _rref_typeof_on_owner, args=(rref,), timeout=timeout)
    if blocking:
        return fut.wait()
    else:
        return fut


T = TypeVar("T")
GenericWithOneTypeVar = Generic[T]


if TYPE_CHECKING:

    class RRef(PyRRef[T], Generic[T]):
        pass

else:
    try:
        # Combine the implementation class and the type class.
        class RRef(PyRRef, Generic[T]):
            pass

    except TypeError:
        # TypeError: metaclass conflict: the metaclass of a derived class
        # must be a (non-strict) subclass of the metaclasses of all its bases.
        class RRefMeta(PyRRef.__class__, GenericWithOneTypeVar.__class__):
            pass

        # Combine the implementation class and the type class.
        class RRef(PyRRef, GenericWithOneTypeVar, metaclass=RRefMeta):
            pass


# Install docstrings from `PyRRef` onto `RRef`. pybind11 generates signatures
# that mention `torch.distributed.rpc.PyRRef`, so rewrite them to refer to the
# user-facing `torch.distributed.rpc.RRef` class instead.
def method_factory(method_name, docstring):
    def method(self, *args, **kwargs):
        return getattr(super(RRef, self), method_name)(*args, **kwargs)

    if method.__doc__:
        method.__doc__ = docstring
    return method


for method_name, method in inspect.getmembers(PyRRef):
    # Ignore magic methods, except "__str__".
    if method_name.startswith("_") and method_name != "__str__":
        continue

    # Get the pybind11-generated docstring and do surgery on the class name.
    docstring = getattr(method, "__doc__", None)
    assert docstring is not None, "RRef user-facing methods should all have docstrings."
    docstring = docstring.replace(
        "torch.distributed.rpc.PyRRef", "torch.distributed.rpc.RRef"
    )

    # Attach the user-facing RRef method with the modified docstring.
    new_method = method_factory(method_name, docstring)
    setattr(RRef, method_name, new_method)


@_require_initialized
def remote(to, func, args=None, kwargs=None, timeout=UNSET_RPC_TIMEOUT):
    r"""
    Make a remote call to run ``func`` on worker ``to`` and return an
    :class:`~torch.distributed.rpc.RRef` to the result value immediately.
    Worker ``to`` will be the owner of the returned
    :class:`~torch.distributed.rpc.RRef`, and the worker calling ``remote`` is
    a user. The owner manages the global reference count of its
    :class:`~torch.distributed.rpc.RRef`, and the owner
    :class:`~torch.distributed.rpc.RRef` is only destructed when globally there
    are no living references to it.

    Args:
        to (str or WorkerInfo or int): name/rank/``WorkerInfo`` of the destination worker.
        func (Callable): a callable function, such as Python callables, builtin
                         operators (e.g. :meth:`~torch.add`) and annotated
                         TorchScript functions.
        args (tuple): the argument tuple for the ``func`` invocation.
        kwargs (dict): is a dictionary of keyword arguments for the ``func``
                       invocation.

        timeout (float, optional): timeout in seconds for this remote call. If the
                                   creation of this
                                   :class:`~torch.distributed.rpc.RRef` on worker
                                   ``to`` is not successfully processed on this
                                   worker within this timeout, then the next time
                                   there is an attempt to use the RRef (such as
                                   ``to_here()``), a timeout will be raised
                                   indicating this failure. A value of 0 indicates
                                   an infinite timeout, i.e. a timeout error will
                                   never be raised. If not provided, the default
                                   value set during initialization or with
                                   ``_set_rpc_timeout`` is used.

    Returns:
        A user :class:`~torch.distributed.rpc.RRef` instance to the result
        value. Use the blocking API :meth:`torch.distributed.rpc.RRef.to_here`
        to retrieve the result value locally.

    .. warning ::
        The ``remote`` API does not copy storages of argument tensors until
        sending them over the wire, which could be done by a different thread
        depending on the RPC backend type. The caller should make sure that the
        contents of those tensors stay intact until the returned RRef is
        confirmed by the owner, which can be checked using the
        :meth:`torch.distributed.rpc.RRef.confirmed_by_owner` API.

    .. warning ::
        Errors such as timeouts for the ``remote`` API are handled on a
        best-effort basis. This means that when remote calls initiated by
        ``remote`` fail, such as with a timeout error, we take a best-effort
        approach to error handling. This means that errors are handled and set
        on the resulting RRef on an asynchronous basis. If the RRef has not been
        used by the application before this handling (such as ``to_here`` or
        fork call), then future uses of the ``RRef`` will appropriately raise
        errors. However, it is possible that the user application will use the
        ``RRef`` before the errors are handled. In this case, errors may not be
        raised as they have not yet been handled.

    Example::

        Make sure that ``MASTER_ADDR`` and ``MASTER_PORT`` are set properly
        on both workers. Refer to :meth:`~torch.distributed.init_process_group`
        API for more details. For example,

        export MASTER_ADDR=localhost
        export MASTER_PORT=5678

        Then run the following code in two different processes:

        >>> # xdoctest: +SKIP
        >>> # On worker 0:
        >>> import torch
        >>> import torch.distributed.rpc as rpc
        >>> rpc.init_rpc("worker0", rank=0, world_size=2)
        >>> rref1 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3))
        >>> rref2 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1))
        >>> x = rref1.to_here() + rref2.to_here()
        >>> rpc.shutdown()

        >>> # On worker 1:
        >>> import torch.distributed.rpc as rpc
        >>> rpc.init_rpc("worker1", rank=1, world_size=2)
        >>> rpc.shutdown()

        Below is an example of running a TorchScript function using RPC.

        >>> # On both workers:
        >>> @torch.jit.script
        >>> def my_script_add(tensor: torch.Tensor, scalar: int):
        >>>    return torch.add(tensor, scalar)

        >>> # On worker 0:
        >>> import torch.distributed.rpc as rpc
        >>> rpc.init_rpc("worker0", rank=0, world_size=2)
        >>> rref = rpc.remote("worker1", my_script_add, args=(torch.ones(2), 3))
        >>> rref.to_here()
        >>> rpc.shutdown()

        >>> # On worker 1:
        >>> import torch.distributed.rpc as rpc
        >>> rpc.init_rpc("worker1", rank=1, world_size=2)
        >>> rpc.shutdown()
    ztorch.distributed.rpc_remoter-   _wrapped_async_rpc_functionN)r\   _C_log_api_usage_oncejit	_builtins_find_builtinr   _get_should_profile_enable_rpc_profilerr!   REMOTEhasattrr   r   ScriptFunctionr
   r   rF   _jit_internal_qualified_namer+   	serializer    r   autograd_profiler_enabled_call_end_callbacks_on_future_get_future_set_profiling_future)r   r4   r1   r2   r`   qualified_namedst_worker_infoshould_profilectx_managerrfis_async_execwrappedr   pickled_python_udftensorsrx   r-   r-   r.   r$   !  sh   h

	


&
&&r$   rt   c                 C   sR  t |stdtjj|}t| }t }t|||||}	|	}
|r%|nd}|r+|ni }t	|d}|r@|j
}t|tjjr@|}|d urRt|||g|R i |}n)t|tjjrht|jtj|||||}ntt|||\}}t|||||}|rtj sJ |
d usJ |
|}W d    |S W d    |S 1 sw   Y  |S )Nzfunction should be callable.r-   r   )callable	TypeErrorr\   r   r   r   r   r   r   r   r   r   r   r   r   rF   r   r   r+   r   r    r   r   r   r   )r   r4   rpc_typer1   r2   rt   r   r   r   r   r   r   r   rx   r   r   r-   r-   r.   _invoke_rpc  sd   


	


(
((r   c                 C   s(   t jd t| |tj|||}| S )a  
    Make a blocking RPC call to run function ``func`` on worker ``to``. RPC
    messages are sent and received in parallel to execution of Python code. This
    method is thread-safe.

    Args:
        to (str or WorkerInfo or int): name/rank/``WorkerInfo`` of the destination worker.
        func (Callable): a callable function, such as Python callables, builtin
                         operators (e.g. :meth:`~torch.add`) and annotated
                         TorchScript functions.
        args (tuple): the argument tuple for the ``func`` invocation.
        kwargs (dict): is a dictionary of keyword arguments for the ``func``
                       invocation.
        timeout (float, optional): timeout in seconds to use for this RPC. If
                                   the RPC does not complete in this amount of
                                   time, an exception indicating it has
                                   timed out will be raised. A value of 0
                                   indicates an infinite timeout, i.e. a timeout
                                   error will never be raised. If not provided,
                                   the default value set during initialization
                                   or with ``_set_rpc_timeout`` is used.

    Returns:
        Returns the result of running ``func`` with ``args`` and ``kwargs``.

    Example::
        Make sure that ``MASTER_ADDR`` and ``MASTER_PORT`` are set properly
        on both workers. Refer to :meth:`~torch.distributed.init_process_group`
        API for more details. For example,

        export MASTER_ADDR=localhost
        export MASTER_PORT=5678

        Then run the following code in two different processes:

        >>> # xdoctest: +SKIP
        >>> # On worker 0:
        >>> import torch
        >>> import torch.distributed.rpc as rpc
        >>> rpc.init_rpc("worker0", rank=0, world_size=2)
        >>> ret = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(2), 3))
        >>> rpc.shutdown()

        >>> # On worker 1:
        >>> import torch.distributed.rpc as rpc
        >>> rpc.init_rpc("worker1", rank=1, world_size=2)
        >>> rpc.shutdown()

        Below is an example of running a TorchScript function using RPC.

        >>> # On both workers:
        >>> @torch.jit.script
        >>> def my_script_add(tensor: torch.Tensor, scalar: int):
        >>>    return torch.add(tensor, scalar)

        >>> # On worker 0:
        >>> import torch.distributed.rpc as rpc
        >>> rpc.init_rpc("worker0", rank=0, world_size=2)
        >>> ret = rpc.rpc_sync("worker1", my_script_add, args=(torch.ones(2), 3))
        >>> rpc.shutdown()

        >>> # On worker 1:
        >>> import torch.distributed.rpc as rpc
        >>> rpc.init_rpc("worker1", rank=1, world_size=2)
        >>> rpc.shutdown()

    ztorch.distributed.rpc_sync)r\   r   r   r   r!   SYNCrk   r   r4   r1   r2   r`   rx   r-   r-   r.   r%     s   Er%   c                 C   s:   t jd t| |tj|||}ttdrtj	| |S )aH  
    Make a non-blocking RPC call to run function ``func`` on worker ``to``. RPC
    messages are sent and received in parallel to execution of Python code. This
    method is thread-safe. This method will immediately return a
    :class:`~torch.futures.Future` that can be awaited on.

    Args:
        to (str or WorkerInfo or int): name/rank/``WorkerInfo`` of the destination worker.
        func (Callable): a callable function, such as Python callables, builtin
                         operators (e.g. :meth:`~torch.add`) and annotated
                         TorchScript functions.
        args (tuple): the argument tuple for the ``func`` invocation.
        kwargs (dict): is a dictionary of keyword arguments for the ``func``
                       invocation.
        timeout (float, optional): timeout in seconds to use for this RPC. If
                                   the RPC does not complete in this amount of
                                   time, an exception indicating it has
                                   timed out will be raised. A value of 0
                                   indicates an infinite timeout, i.e. a timeout
                                   error will never be raised. If not provided,
                                   the default value set during initialization
                                   or with ``_set_rpc_timeout`` is used.


    Returns:
        Returns a :class:`~torch.futures.Future` object that can be waited
        on. When completed, the return value of ``func`` on ``args`` and
        ``kwargs`` can be retrieved from the :class:`~torch.futures.Future`
        object.

    .. warning ::
        Using GPU tensors as arguments or return values of ``func`` is not
        supported since we don't support sending GPU tensors over the wire. You
        need to explicitly copy GPU tensors to CPU before using them as
        arguments or return values of ``func``.

    .. warning ::
        The ``rpc_async`` API does not copy storages of argument tensors until
        sending them over the wire, which could be done by a different thread
        depending on the RPC backend type. The caller should make sure that the
        contents of those tensors stay intact until the returned
        :class:`~torch.futures.Future` completes.

    Example::
        Make sure that ``MASTER_ADDR`` and ``MASTER_PORT`` are set properly
        on both workers. Refer to :meth:`~torch.distributed.init_process_group`
        API for more details. For example,

        export MASTER_ADDR=localhost
        export MASTER_PORT=5678

        Then run the following code in two different processes:

        >>> # xdoctest: +SKIP
        >>> # On worker 0:
        >>> import torch
        >>> import torch.distributed.rpc as rpc
        >>> rpc.init_rpc("worker0", rank=0, world_size=2)
        >>> fut1 = rpc.rpc_async("worker1", torch.add, args=(torch.ones(2), 3))
        >>> fut2 = rpc.rpc_async("worker1", min, args=(1, 2))
        >>> result = fut1.wait() + fut2.wait()
        >>> rpc.shutdown()

        >>> # On worker 1:
        >>> import torch.distributed.rpc as rpc
        >>> rpc.init_rpc("worker1", rank=1, world_size=2)
        >>> rpc.shutdown()

        Below is an example of running a TorchScript function using RPC.

        >>> # On both workers:
        >>> @torch.jit.script
        >>> def my_script_add(tensor: torch.Tensor, scalar: int):
        >>>    return torch.add(tensor, scalar)

        >>> # On worker 0:
        >>> import torch.distributed.rpc as rpc
        >>> rpc.init_rpc("worker0", rank=0, world_size=2)
        >>> fut = rpc.rpc_async("worker1", my_script_add, args=(torch.ones(2), 3))
        >>> ret = fut.wait()
        >>> rpc.shutdown()

        >>> # On worker 1:
        >>> import torch.distributed.rpc as rpc
        >>> rpc.init_rpc("worker1", rank=1, world_size=2)
        >>> rpc.shutdown()
    ztorch.distributed.rpc_asyncr[   )
r\   r   r   r   r!   ASYNCr   rZ   r[   rm   r   r-   r-   r.   r&   ?  s
   Y
r&   c                  C   s&   t jjj} t j ot jj | jkS r9   )	r\   r   	_profilerActiveProfilerTyper   r   	_autograd_profiler_typeLEGACY)r   r-   r-   r.   r     s   

r   c                 C   sj   t  }| r3|d u rt|tjjrtj|n|j}n|}t	||t
 j|j}t| tjj|}|S r9   )
contextlibnullcontextr   r\   r   r   r   r   rB   r   r#   rF   r   set_current_profiling_keyr   profilerrecord_function)r   r   r4   r   r   r   	func_namerpc_profiling_keyr-   r-   r.   r     s"   
r   r9   )T)hcollectionsr   r6   inspectloggingr;   typingr   r   r   r   r\   torch._C._distributed_rpcr   r   r   r	   r
   r   r   r   r   r   r   r   r   r   r   r   r   r   torch.futuresr   _utilsr   r   	constantsr   r   internalr   r   r    r!   __all__	getLoggerr@   r|   r   r+   contextmanagerr/   r8   r(   rO   rC   __annotations__RLockrN   rD   dictrj   r   defaultdictrE   rM   rV   rY   localrZ   r_   floatr{   r~   r   r"   r   r#   r   boolr   r   r   GenericWithOneTypeVarr'   r   	__class__r   r)   
getmembersr   r   
startswithr   r   replacer*   setattrr$   r   r%   r&   r   r   r-   r-   r-   r.   <module>   s   
P




Y
M	

	
 
:I_

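
# The public entry points above (``rpc_sync``, ``rpc_async``, ``remote`` and
# ``shutdown``) are easiest to compare side by side. The function below is a
# minimal, hypothetical sketch assuming a two-process job with workers named
# "worker0" and "worker1" (as in the docstring examples); it is illustrative
# only and is never called by this module.
def _example_round_trip(rank: int, world_size: int = 2):
    """Hypothetical sketch: one blocking, one async, and one remote call."""
    import torch.distributed.rpc as rpc

    rpc.init_rpc(f"worker{rank}", rank=rank, world_size=world_size)
    result = None
    if rank == 0:
        # rpc_sync blocks and returns the value directly.
        ret = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(2), 3))
        # rpc_async returns a Future; wait() retrieves the value.
        fut = rpc.rpc_async("worker1", torch.add, args=(torch.ones(2), 1))
        # remote returns an RRef owned by worker1; to_here() fetches the value.
        rref = rpc.remote("worker1", torch.add, args=(torch.ones(2), 2))
        result = ret + fut.wait() + rref.to_here()
    # Graceful shutdown blocks until all workers finish outstanding work.
    rpc.shutdown()
    return result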
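
# ``to`` may be a worker name, a rank, or a ``WorkerInfo`` (see
# ``_to_worker_info``). As the ``get_worker_info`` docstring notes, resolving
# the name once and reusing the ``WorkerInfo`` avoids passing the string on
# every invocation. The sketch below is hypothetical: it assumes RPC is already
# initialized and that a peer named "worker1" exists, and it is not called
# anywhere in this module.
def _example_reuse_worker_info(num_calls: int = 4):
    """Hypothetical sketch: resolve a destination once, then reuse it."""
    dst = get_worker_info("worker1")  # one name lookup
    futs = [
        rpc_async(dst, torch.add, args=(torch.ones(2), i)) for i in range(num_calls)
    ]
    return [fut.wait() for fut in futs]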