o
    hi                     @  s   d Z ddlmZ ddlmZ ddlmZmZmZm	Z	m
Z
mZmZ ddlmZ edZeG dd de	e Zd	dddd
d'ddZ		d(d)ddZdd fd*d%d&ZdS )+zl
A set of primitive functions for performing collective ops.

Each should also handle single rank scenario.
    )annotations)	dataclass)AnyCallablecastGenericOptionalTypeVarUnionNTc                   @  s2   e Zd ZU ded< ded< ded< dZded	< dS )
SyncPayloadOptional[str]
stage_nameboolsuccessr   payloadNzOptional[Exception]	exception)__name__
__module____qualname____annotations__r    r   r   v/var/www/html/construction_image-detection-poc/venv/lib/python3.10/site-packages/torch/distributed/collective_utils.pyr      s
   
 r   T)r   r   rankpg
data_or_fnUnion[T, Callable[[], T]]r   r   r   r   r   intr   Optional[dist.ProcessGroup]returnc             
   C  s&  |s
| dur
t dd}d}|du r|dks |durB| |krBt| r@z|  }W n ty? } z
d}|}W Y d}~nd}~ww | }t||||d}|dure|g}	tj|	||d t|	dksaJ |	d }|jsd| d	}
|durz|
d
|j	 7 }
|j
dur|
d|j
 7 }
t|
|j
tt|jS )aK  
    Broadcasts the data payload from rank 0 to all other ranks.
    Or if a function is passed, execute it in rank 0 and broadcast result to all other ranks.

    Can be used to broadcast a failure signal to stop all ranks.

    If the function raises an exception, all ranks will raise.

    Args:
        data_or_fn: the data to broadcast or function to execute and broadcast result.
        success: False to stop all ranks.
        stage_name: the name of the logical stage for synchronization and debugging
        rank: rank to broadcast data or execute function and broadcast resutls.
        pg: the process group for sync
    Throws:
        RuntimeError from original exception trace
    Returns:
        the value after synchronization

    Example usage:
    >> id = broadcast(data_or_fn=allocate_id, rank=0, pg=ext_pg.my_pg)
    Nz9Data or Function is expected to be None if not successfulr   Fr   r   r   r   )srcgroup   zRank z failedz: stage z: exception )AssertionErrorr   callable	Exceptionr   distbroadcast_object_listlenr   r   r   RuntimeErrorr   r   r   )r   r   r   r   r   r   r   esync_objbroadcast_list	error_msgr   r   r   	broadcast   sF   $

r/   list[T]c              
   C  sX  d}d}d}t | r&z|  }W n ty% } z
d}|}W Y d}~nd}~ww | }t||||d}|durdgt| }t||| ttt |d j}g }	g }
d}t	tt
tt  |D ]-\}}|j|krs|d| d|j d	7 }q]|js|jdur|	||jf q]|
|j q]t|	dkrt||	|	d |
S |jstd
|j |j|jgS )a.  
    A simple all_gather primitive with basic synchronization guard logic,
    by checking payload from all ranks has the same stage name.

    Args:
        data_or_fn: the data to be all gathered across ranks or function to be executed
        stage_name: the sync stage name for out-of-sync protection
        pg: the process group for sync
    Throws:
        RuntimeError from original exception trace
    Returns:
        a list of synced data from all ranks

    Example usage:
    >> all_ids = all_gather(data_or_fn=allocate_id, pg=ext_pg.my_pg)
    NTFr    r    z)Unexpected stage name received from rank z:  z!all_gather failed with exception )r%   r&   r   r'   get_world_sizeall_gather_object_enforce_typer   r   r   	enumeratelistr   r   appendr   r)   r*   )r   r   r   r   r   r   r+   r,   
total_listexception_listret_listr.   ispr   r   r   
all_gatheri   sb   


r=   c                 C  s   t | t |kS )N)type)xyr   r   r   <lambda>   s    rA   dist.ProcessGroupobject_list	list[Any]objr   type_checkerCallable[[Any, Any], bool]Nonec              	   C  st   t j||| d t|}|dkrdS |d }td|D ]}|||| s7td| dt||  dt| qdS )aN  
    Similar to plain all_gather_object but with additional type checking
    AFTER gather is done to ensure basic consistency.
    If check does not pass, all ranks will fail with exception.

    This is generally to prevent conditional logic leading to
    unexpected messages being received. This is considered fatal code error,
    but due to logic stacks this might happen implicitly in practice.

    The default check does not check sub type (considered different)
    or covariance (considered same) but users can pass in custom checker
    if more complicated check is needed.
    )r"   r   Nr#   zObject type at index z is z, while first object type is )r'   all_gather_objectr)   range	TypeErrorr>   )r   rC   rE   rF   list_len	first_objr;   r   r   r   r4      s   r4   )r   r   r   r   r   r   r   r   r   r   r   r   )NN)r   r   r   r   r   r   r   r0   )
r   rB   rC   rD   rE   r   rF   rG   r   rH   )__doc__
__future__r   dataclassesr   typingr   r   r   r   r   r	   r
   torch.distributeddistributedr'   r   r   r/   r=   r4   r   r   r   r   <module>   s$   $
NU