o
    h#=                     @   s`  d dl Z d dlZd dlZd dlZd dlZd dlZd dlmZ d dlm	Z	 d dl
mZ d dlmZ d dlmZmZmZmZmZmZ d dlZd dlmZ d dlmZ d dlmZ d	d
lmZmZm Z m!Z! d	dl"m#Z#m$Z$ ddgZ%edZ&edZ'de(ee&e!f  de)e*e!f fddZ+	d:de)e,ef deej- de.e, fddZ/	d:de)e,ef deej- ddfddZ0G dd dZ1dede#defd d!Z2dej3de#dej3fd"dZ4de$de#defd#dZ5d$ee* d%ee* de(e* fd&d'Z6d$ee* d%ee* de(e* fd(d)Z7G d*d+ d+ej8Z9d,ej8d-e*d.e*dej8fd/d0Z:d1e,d2e*de,fd3d4Z;d5Z<e	d6d7 Z=d8d9 Z>dS );    N)Sequence)contextmanager)wraps)Stats)AnyCallablecastOptionalTypeVarUnion)ShardedTensor)Shard   )_is_wrapped_exception_wrap_exceptionCheckpointExceptionWRAPPED_EXCEPTION)MetadataIndexSTATE_DICT_TYPEfind_tensor_shardfind_state_dict_objectTRresultsreturnc                 C   s    t tttf dd t| D S )Nc                 S   s   i | ]\}}t |r||qS  )r   ).0ierrr   r   v/var/www/html/construction_image-detection-poc/venv/lib/python3.10/site-packages/torch/distributed/checkpoint/utils.py
<dictcomp>'   s    z%_get_failure_dict.<locals>.<dictcomp>)r   dictintr   	enumerate)r   r   r   r   _get_failure_dict"   s   
r$   
local_dictgroupc                 C   s<   t |  }dgt| }tj|||d ttj|S )z*Gathers all keys, and returns them sorted.N)r&   )	listkeysdistget_world_sizeall_gather_objectset	itertoolschainfrom_iterable)r%   r&   r(   gathered_keysr   r   r   _all_gather_keys+   s   r1   
state_dictprocess_groupc                 C   sN   t |dkr	dS t| |}t|  }|| }t|dkr%td| dS )z
    Asserts that all ranks have the same keys in their state dict.
    This is a collective call which requires all ranks in ``process_group`` to
    join. It will also induce cross-rank communication and block CPU.
    r   Nr   z<Key(s) present in other ranks but not this one, difference: )r)   r*   r1   r,   r(   lenAssertionError)r2   r3   all_keysmy_keysdiffr   r   r   _assert_same_keys6   s   	
r9   c                	   @   sL  e Zd ZdZdeej dedefddZ	defdd	Z
defd
dZdee defddZdedeee  fddZdedee fddZdeee  defddZdedeg ef deee gee f defddZdedeg ef deee gef defddZdedeg ef dee fddZdedeg ef defdd Zd!S )"_DistWrapperaH  
    This is a wrapper around PG that provides a series of features around object collectives.

    It works without distributed initialized, where most collectives turns into nops.

    All variants that take functions are exception robust, meaning that if one or more
    ranks raise errors, all ranks will observe those.
    r&   use_distcoordinator_rankc                 C   sd   || _ || _|| _| jr'|d urt||n|| _t|| _| j|k| _d S d| _d| _d| _d S )Nr   T)	r&   r;   r<   r)   get_global_rankglobal_coordinator_rankget_rankrankis_coordinator)selfr&   r;   r<   r   r   r   __init__U   s   
z_DistWrapper.__init__r   c                 C   s   | j S N)r@   rB   r   r   r   r?   k   s   z_DistWrapper.get_rankc                 C   s   | j r	t| jS dS )Nr   )r;   r)   r*   r&   rE   r   r   r   r*   n   s   z_DistWrapper.get_world_sizeobjectc                 C   s.   |g}| j rtj|| j| jd tt|d S )z_Implement functionality similar to c10d::broadcast_object_list but without distributed enabled.)object_listr&   srcr   )r;   r)   broadcast_object_listr&   r<   r   r   )rB   rF   rG   r   r   r   broadcast_objects   s   z_DistWrapper.broadcast_objectc                 C   s^   | j r*| jrttt dgt| j nd}tj|| jr|nd| j	| jd |}|S |g}|S )zWImplement functionality similar to c10d::gather_object but without distributed enabled.N)objobject_gather_listdstr&   )
r;   rA   r   r'   r   r)   r*   r&   gather_objectr>   )rB   rF   gather_objsresultr   r   r   rN   ~   s   z_DistWrapper.gather_objectc                 C   sB   | j rttt dgt| j }tj||| jd |S |g}|S )z[Implement functionality similar to c10d::all_gather_object but without distributed enabled.N)rG   rK   r&   )r;   r   r'   r   r)   r*   r&   r+   )rB   rF   rO   r   r   r   r+      s   z_DistWrapper.all_gather_objectrG   c                 C   sZ   | j r!ttt dg}tj|| jr|nd| j| jd |d }|S |dus'J |d }|S )zXImplement functionality similar to c10d::scatter_object but without distributed enabled.N)scatter_object_output_listscatter_object_input_listrH   r&   r   )	r;   r   r'   r   r)   scatter_object_listrA   r>   r&   )rB   rG   gather_resultlocal_replyr   r   r   scatter_object   s   z_DistWrapper.scatter_objectstepmap_fun
reduce_func           
   
   C   s  z| }W n t y } z
t|}W Y d}~nd}~ww | |}d}| jrt|dus,J t|}t|dkrdztttt	t
f  |ttt |}W n t yc } zt||| j< W Y d}~nd}~ww t|dkrtt
||g|   }| |}	t|	t
r|	|	S )a^  
        Compute a value on each rank, then do centralized reduce on a single rank, followed by a scatter.

        This method operates in the following way:
            Run ``map_fun`` on all ranks
            Gather results on rank 0
            Call ``reduce_fun`` on all those values
            Scatter to each rank part of the result.
        Nr   )BaseExceptionr   rN   rA   r$   r4   r   r'   r   r   r   r   r@   r*   rV   
isinstance)
rB   rW   rX   rY   
local_dataeall_dataall_resultsnode_failuresrP   r   r   r   reduce_scatter   s<   



z_DistWrapper.reduce_scatterc           
   
   C   s   z| }W n t y } z
t|}W Y d}~nd}~ww | |}d}| jrf|dus,J t|}t|dkr[z|ttt |}W n t yZ } zt||| j	< W Y d}~nd}~ww t|dkrft
||}| |}	t|	t
rr|	tt|	S )aa  
        Compute a value on each rank, then do centralized reduce on a single rank, followed by a broadcast.

        This method operates in the following way:
            Run ``map_fun`` on all ranks
            Gather results on rank 0
            Call ``reduce_fun`` on all those values
            Broadcast the reduced value to all ranks.
        Nr   )rZ   r   rN   rA   r$   r4   r   r'   r   r@   r   rJ   r[   r   )
rB   rW   rX   rY   r\   r]   r^   rP   r`   final_resultr   r   r   
all_reduce   s0   





z_DistWrapper.all_reducec              
   C   sn   z| }W n t y } z
t|}W Y d}~nd}~ww | |}t|}t|dkr0t||ttt |S )z
        Compute a value on each rank, then all_gather them.

        This method operates in the following way:
            Run ``map_cp`` on all ranks
            all_gather the values to all ranks
        Nr   )	rZ   r   r+   r$   r4   r   r   r'   r   )rB   rW   rX   rP   r]   r_   r`   r   r   r   
all_gather  s   


z_DistWrapper.all_gatherc              
   C   sp   d}| j r'z| }W n ty& } zt|| jt|i}W Y d}~nd}~ww | |}t|tr3|tt|S )z
        Compute a value on rank 0 and broadcast it.

        This method operates in the following way:
            Run ``map_cp`` on rank 0
            broadcast the value
        N)	rA   rZ   r   r@   r   rJ   r[   r   r   )rB   rW   rX   rP   r]   rb   r   r   r   	broadcast  s   
 


z_DistWrapper.broadcastN)__name__
__module____qualname____doc__r	   r)   ProcessGroupboolr"   rC   r?   r*   r   rJ   r'   rN   r+   rV   strr   r   ra   rc   rd   re   r   r   r   r   r:   K   s^    	


/

(


r:   tensorindexc                 C   s   |j d u rtd|j d|  }|jd ur0t||jkr0t||j jj	|j kr0||j S |D ]}t|jj	|j krB|  S q2td|j  d|j d)NzCannot lookup z5 since its a ShardedTensor and no offset was providedzCould not find shard at 'z' for FQN: '')
offset
ValueErrorfqnlocal_shardsrn   r4   torchSizemetadatashard_offsets)rm   rn   shardsshardr   r   r   _find_shard7  s   


rz   c                 C   st   t | dr
| |S t| trt| |jS |jd ur8|jtdgt	| 
  kr+| S td|j d|j d| S )N__get_tensor_shard__r   FQN: '1' is not a ShardedTensor, can't find by offset: 'ro   )hasattrr{   r[   r   rz   rm   rp   rt   ru   r4   sizerq   rr   )rm   rn   r   r   r   r   L  s   



c                 C   sd   |j | vrtd|j  d| |j  }t|tjrt||S |jd ur0td|j  d|j d|S )NzCould not find FQN: 'ro   r|   r}   )rr   rq   r[   rt   Tensorr   rp   )r2   rn   rK   r   r   r   r   \  s   



abc                 C      dd t | |D S )Nc                 S   s   g | ]\}}|| qS r   r   r   i_ai_br   r   r   
<listcomp>k      z%_element_wise_add.<locals>.<listcomp>zipr   r   r   r   r   _element_wise_addj     r   c                 C   r   )Nc                 S   s   g | ]\}}|| qS r   r   r   r   r   r   r   o  r   z%_element_wise_sub.<locals>.<listcomp>r   r   r   r   r   _element_wise_subn  r   r   c                       s   e Zd Zdejdedef fddZejfdededefdd	Z	defd
dZ
defddZdefddZdd ZdddZ  ZS )_ReaderViewbase_streamrp   r4   c                    s*   t    || _|| _|| _| d d S Nr   )superrC   rp   r4   r   seek)rB   r   rp   r4   	__class__r   r   rC   s  s
   
z_ReaderView.__init__whencer   c                C   sD   |t jkr| j| }n|t jkrt j}| j| j | }| j||S rD   )osSEEK_SETrp   SEEK_ENDr4   r   r   )rB   rp   r   r   r   r   r   z  s   

z_ReaderView.seekc                 C   s   | j  | j S rD   )r   tellrp   rE   r   r   r   r     s   z_ReaderView.tellc                 C   
   | j  S rD   )r   readablerE   r   r   r   r        
z_ReaderView.readablec                 C   r   rD   )r   seekablerE   r   r   r   r     r   z_ReaderView.seekablec                 C   sB   | j |   }|dkrdS t ||krt|d | }| j|S r   )r4   r   
memoryviewr   readinto)rB   r   max_sizer   r   r   r     s   z_ReaderView.readintoc                 C   s.   | j |   }|dks||kr|}| j|S )Nr   )r4   r   r   read)rB   r   r   r   r   r   r     s   z_ReaderView.read)r   )rf   rg   rh   ioIOBaser"   rC   r   r   r   r   rk   r   r   r   r   __classcell__r   r   r   r   r   r  s    r   filerp   lengthc                 C   s   t | ||S rD   )r   )r   rp   r   r   r   r   _create_file_view  s   r   device_type	device_idc                 C   s   | dkrdS |  d| S )zDevice info normalization.cpu:r   )r   r   r   r   r   _normalize_device_info  s   r   Fc               	   c   s    t r=t rt dkr=t } |   zd V  W |   t| }|	d
d d S |   t| }|	d
d w d V  d S )Nr   time
   )ENABLE_PROFILEr)   is_availabler?   cProfileProfileenabledisabler   
sort_statsprint_stats)profilerstatsr   r   r   _profile  s   
r   c                    s   t  dtf fdd}|S )Nr   c                     s   t | dkrZtd j d t }dd |j D }d|v r4d|vs-J | |f| d |d< nd|v rId|vsBJ | |f| d |d< ntd	|  | d
 fi |S  | i |S )N   zThe argument order of zG has been changed. Please check the document to avoid future breakages.c                 S   s   g | ]}|j |jkr|jqS r   )kindKEYWORD_ONLYname)r   pr   r   r   r     s    z5_api_bc_check.<locals>.inner_func.<locals>.<listcomp>storage_writerr   storage_readerzUnexpected kwonlyargs = r   )	r4   warningswarnrf   inspect	signature
parametersvaluesRuntimeError)argskwargssig
kwonlyargsfuncr   r   
inner_func  s"   
z!_api_bc_check.<locals>.inner_func)r   r   )r   r   r   r   r   _api_bc_check  s   r   rD   )?r   r   r   r-   r   r   collections.abcr   
contextlibr   	functoolsr   pstatsr   typingr   r   r   r	   r
   r   rt   torch.distributeddistributedr)   'torch.distributed._shard.sharded_tensorr   -torch.distributed._shard.sharded_tensor.shardr   apir   r   r   r   rv   r   r   __all__r   r   r'   r!   r"   r$   rl   rj   r,   r1   r9   r:   rz   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   <module>   sn    






 m""(
