o
    hs                    @   s  d dl Z d dlZd dlZd dlZd dlZd dlmZmZ d dlm	Z	m
Z
 d dlmZ d dlmZmZmZmZmZmZ d dlZd dlmZ d dlmZ d dlmZmZ d dlmZ d d	lm Z  d
dl!m"Z" d
dl#m$Z$m%Z%m&Z& d
dl'm(Z( er|d dlm)Z) g dZ*e+e,Z-G dd deZ.e.j/Z/e.j0Z0e.j1Z1e.j2Z2e.j3Z3e.j4Z4e.j5Z5e.j6Z6e.j7Z7e.j8Z8e/Z9e0Z:e1Z;e8Z<e=dZ>G dd deZ?	dPde@eAeBee?  f deeA deCfddZDG dd deZEdPdeBejF deeC fddZG	dPdeBejF deeC de@eAej)f fd d!ZHG d"d# d#eEZIG d$d% d%eIZJG d&d' d'eIZKG d(d) d)eIZL	*dQd+eBee?  d,eAdeBe? fd-d.ZMd+eBee?  deBe? fd/d0ZNd+e@eAeBe? f d1eeAgeAf d2eAde@eAeBe? f fd3d4ZOd5e@eAeBee?  f d6eAd2eAd7eAde@eAeAf f
d8d9ZPG d:d; d;eEZQG d<d= d=eQZRG d>d? d?eQZS	 	@dRdAdBZTG dCdD dDeQZUG dEdF dFeQZVG dGdH dHeQZWdIeCfdJdKZXd1eeAgeAf d2eAfdLdMZYdNdO ZZdS )S    N)ABCabstractmethod)Counterdefaultdict)Enum)AnyCallable
NamedTupleOptionalTYPE_CHECKINGUnion)OptimizedModule)
FSDPModuleUnshardHandle)_Loss)record_function   )generate_stage_to_rank_mapping)merge_chunkssplit_args_kwargs_into_chunksTensorChunkSpec)_PipelineStageBase)Work)	get_schedule_classPipelineScheduleSinglePipelineScheduleMultiSchedule1F1BScheduleGPipeScheduleInterleaved1F1BScheduleLoopedBFSScheduleInterleavedZeroBubbleScheduleZBVZeroBubblec                   @   sH   e Zd ZdZdZdZdZdZdZdZ	dZ
d	Zd
Zdd Zedd ZdS )_ComputationTyper                        	   
   c                 C   sH   t jdt jdt jdt jdt jdt jdt jdt jdt j	d	t j
d
i
}||  S )NFIWUNSHARDRESHARDSEND_FRECV_FSEND_BRECV_BB)r"   FORWARDBACKWARD_INPUTBACKWARD_WEIGHTr/   r0   r1   r2   r3   r4   FULL_BACKWARD)selfstr_map r<   z/var/www/html/construction_image-detection-poc/venv/lib/python3.10/site-packages/torch/distributed/pipelining/schedules.py__str__9   s   z_ComputationType.__str__c                 C   s   | dkrt jS | dkrt jS | dkrt jS | dkrt jS | dkr#t jS | dkr*t jS | dkr1t jS | dkr8t jS | d	kr?t j	S | d
krFt j
S td|  )Nr,   r-   r.   r/   r0   r1   r2   r3   r4   r5   zInvalid computation type )r"   r6   r7   r8   r/   r0   r1   r2   r3   r4   r9   RuntimeErroractionr<   r<   r=   from_strH   s*   z_ComputationType.from_strN)__name__
__module____qualname__r6   r7   r8   r/   r0   r1   r2   r3   r4   r9   r>   staticmethodrB   r<   r<   r<   r=   r"   ,   s    r"   z?(\d+)(F|I|B|W|UNSHARD|RESHARD|SEND_F|RECV_F|SEND_B|RECV_B)(\d*)c                   @   sH   e Zd ZU eed< eed< dZee ed< dd Ze	de
fdd	ZdS )
_Actionstage_indexcomputation_typeNmicrobatch_indexc                 C   s4   t | j}|t | j7 }| jd ur|t | j7 }|S N)strrH   rI   rJ   )r:   reprr<   r<   r=   __repr__~   s
   

z_Action.__repr__action_stringc                 C   sh   |   } t|  }r&| \}}}tt|t|t|r#t|S dS | dkr,dS t	d|  d)z
        Reverse of __repr__

        String should be formatted as [stage][action type][(microbatch)]
            e.g. `2F0`, `1UNSHARD`, `3SEND_F1`
        N zInvalid action string: zD, should be formatted as [stage][action type][(microbatch)] e.g. 2F0)
strip_action_regexmatchgroupsrG   intr"   rB   lenr?   )rO   rS   rH   rI   rJ   r<   r<   r=   rB      s   
z_Action.from_str)rC   rD   rE   rU   __annotations__r"   rJ   r
   rN   rF   rL   rB   r<   r<   r<   r=   rG   y   s   
 rG   pipeline_ordererror_step_numberreturnc                    s6  t D ]}tt| D ]}| | du r!d| |< qqtdd  D fddtD }fddtD }ttj	|ddi}t}d	d t|D }d
d t
|g|R  D dt|d d  dfddt|D  }	 fddt
||D }
|	d d|
 d }|S )z
    Formats the pipeline order in a timestep (row) x rank (column) grid of actions
    and returns the formatted string.

    If `error_step_number` is passed in, an additional label will be added to signify which step
    that it is erroring on.
    NrP   c                 s   s    | ]}t |V  qd S rK   )rV   ).0actionsr<   r<   r=   	<genexpr>   s    z)_format_pipeline_order.<locals>.<genexpr>c              	      s*   g | ]}d t |tt  d  qS )zStep r   )rL   zfillrV   r[   i)	num_stepsr<   r=   
<listcomp>   s    z*_format_pipeline_order.<locals>.<listcomp>c                    s   g | ]} |d g  qS )rP   )get)r[   key)ra   rX   r<   r=   rb      s    	fillvaluec                 S   s   g | ]}d t | qS )zRank rL   r_   r<   r<   r=   rb          c                 S   s   g | ]}t d d |D qS )c                 s   s(    | ]}|d urt t|ndV  qd S )Nr   )rV   rL   )r[   itemr<   r<   r=   r]      s   & 4_format_pipeline_order.<locals>.<listcomp>.<genexpr>)max)r[   colr<   r<   r=   rb      s     r   r#   c                 3   s&    | ]\}}|d  |  V  qdS <Nr<   )r[   r`   labelmax_lengthsr<   r=   r]      s    
c                    sZ   g | ])\}}| d d fddt|D   dur(t| d  kr(dnd qS )z: rl   c                 3   s*    | ]\}}t |d  |  V  qdS rm   rf   )r[   r`   rh   rp   r<   r=   r]      s   ( ri   Nr   z <-- ERROR HERErP   )join	enumeraterU   split)r[   ro   row)rY   rq   r<   r=   rb      s    	
)copydeepcopyrangerV   rj   valuessortedlist	itertoolszip_longestziprr   rs   )rX   rY   rankr`   step_labelsrank_actionstransposed_actions	num_ranksrank_labels
header_rowformatted_rowsformatted_tabler<   )rY   rq   ra   rX   r=   _format_pipeline_order   s8   

 
	r   c                   @   sb  e Zd Z					d'dedeedejf  deee	df  dee
ee	f  deee
eef ee f  d	efd
dZdd Zdd Zdd Ze				d(dee dee dee dee fddZeddddee fddZ				d(dee dee dee dee fddZdd Z	d)deedf d ee
eef  fd!d"Zd#ee d$efd%d&ZdS )*_PipelineScheduleNTn_microbatchesloss_fn.args_chunk_speckwargs_chunk_specoutput_merge_specscale_gradsc                 C   sL   || _ || _|| _|| _|| _|| _	 | jd u| _g | _t	d| j
j d S )NzUsing %s)_n_microbatches_loss_fnr   _args_chunk_spec_kwargs_chunk_spec_output_merge_spec_has_backward_internal_lossesloggerinfo	__class__rC   )r:   r   r   r   r   r   r   r<   r<   r=   __init__   s   
z_PipelineSchedule.__init__c                 C   s4   |j r| jr| ||| }| j| d S d S d S rK   )is_lastr   _compute_lossr   append)r:   stageoutput
target_mbsmb_indexlossr<   r<   r=   _maybe_compute_loss   s   z%_PipelineSchedule._maybe_compute_lossc                 C   sd   d|  kot | jk n  }|jr| jr|r| j| S t | jdkr0|s0td| d| j d S )Nr   zLoss for microbatch z6 is not available. Available losses for microbatches: )rV   r   r   r   r?   )r:   r   r   valid_indexr<   r<   r=   _maybe_get_loss  s   
z!_PipelineSchedule._maybe_get_lossc                 C   s|   t |ts|g}tdd |D }|r7|dur7t| j| jkr-td| j dt| j |  || j | j  dS )zB
        Update the losses to those in the internal state
        c                 s   s    | ]}|j V  qd S rK   r   r[   r   r<   r<   r=   r]     s    z3_PipelineSchedule._update_losses.<locals>.<genexpr>N
Expecting z losses but got )	
isinstancer|   anyrV   r   r   r?   clearextend)r:   stageslossescontains_last_stager<   r<   r=   _update_losses  s   
z _PipelineSchedule._update_lossesarg_mbs	kwarg_mbsr   r   c                 C      t )z
        Run one iteration of the pipeline schedule with list of microbatches.
        Will go through all the microbatches according to the schedule
        implementation.

        Args:
            microbatches: list of microbatch args.
        NotImplementedError)r:   r   r   r   r   r<   r<   r=   _step_microbatches%  s   z$_PipelineSchedule._step_microbatchestargetr   c                O   r   )  
        Run one iteration of the pipeline schedule with *whole-batch* input.
        Will chunk the input into microbatches automatically, and go through the
        microbatches according to the schedule implementation.

        args: positional arguments to the model (as in non-pipeline case).
        kwargs: keyword arguments to the model (as in non-pipeline case).
        target: target for the loss function.
        losses: a list to store the losses for each microbatch.
        r   )r:   r   r   argskwargsr<   r<   r=   step7  s   z_PipelineSchedule.stepc                    s   dt f fdd}|dur||d ndg j }|dur#||d ni g j }|dur2||d |durDt|tsDtd	t| ||fS )
z*
        Pre-process/check inputs
        namec                    sR   t | tst| dt|  t|  jkr'td j d| dt|  d S )Nz must be a list but got a r   rl   z	 but got )r   r|   	TypeErrortyperV   r   
ValueError)mbsr   r:   r<   r=   check_type_and_lenP  s   
z;_PipelineSchedule._check_inputs.<locals>.check_type_and_lenNr   r<   r   r   z losses must be a list but got a )rL   r   r   r|   r   r   )r:   r   r   r   r   r   r<   r   r=   _check_inputsE  s   

z_PipelineSchedule._check_inputsc                 C   s   |  ||S rK   )r   )r:   r   r   r<   r<   r=   r   k  s   z_PipelineSchedule._compute_lossr   r   c                 C   sB   |s|rt ||| j| j| j\}}||fS dg| j i g| j fS )zj
        Splits a full-batch input into chunks (i.e. microbatches) and returns
        the chunks
        r<   )r   r   r   r   )r:   r   r   
args_splitkwargs_splitr<   r<   r=   _split_inputsn  s   	z_PipelineSchedule._split_inputsoutput_chunksrZ   c                 C   s   t || jS )z
        Merge output chunks back to a batch state.
        If output_merge_spec is None, the utility will merge output chunks by dimension 0 (batch dim).
        )r   r   )r:   r   r<   r<   r=   _merge_outputs  s   z _PipelineSchedule._merge_outputsNNNNTNNNNrK   )rC   rD   rE   rU   r
   r   torchTensortupler   dictrL   r   r   boolr   r   r   r   r   r|   r   r   r   r   r   r   r<   r<   r<   r=   r      sv    
"
&

r   p2p_opsdescc                 C   s>   t | dkrdS |r| dnd}td||  t|  S )zt
    Simple wrapper over batch_isend_irecv from torch.distributed, which just adds a descriptive logger on top.
    r   Nz, rP   zbatch_p2p %s%s)rV   r   debugdistbatch_isend_irecvpop)r   r   desc_strr<   r<   r=   
_batch_p2p  s
   r   c                 C   s`   t t}i }t| dkr|S | D ]
}||j | qt| D ]\}}t||d||< q!|S )z
    Sorts the list of P2P ops by the peer rank, and then calls
    batch_isend_irecv. Return a dictionary of works by peer rank. This function
    helps us avoid hangs in case of skip connections.
    r   r   )r   r|   rV   peerr   r{   itemsr   )r   r   ops_by_peerwork_by_peeropr   opsr<   r<   r=   _sorted_batch_p2p  s   r   c                       s   e Zd ZdZ					ddededee deee	df  d	ee
ee	f  d
eee
eef ee f  def fddZdd Zddddee fddZ  ZS )r   a  
    Base class for single-stage schedules.
    Implements the `step` method.
    Derived classes should implement `_step_microbatches`.

    Gradients are scaled by num_microbatches depending on the `scale_grads` argument, defaulting to True.  This setting
    should match the configuration of your loss_fn, which may either average losses (scale_grads=True)
    or sum losses (scale_grads=False).
    NTr   r   r   r   .r   r   r   c                    s\   t  j||||||d || _|j| _| j| j_d| _|| jk r,td| d| j dd S )Nr   r   r   r   r   r   FzNumber of microbatches (z9) must be greater than or equal to the number of stages (z).)	superr   _stage
num_stages_num_stagesr   has_backward_stage_initializedr   )r:   r   r   r   r   r   r   r   r   r<   r=   r     s&   	

zPipelineScheduleSingle.__init__c                 C   s0   | j | j|| | jr| j | j d| _d S NT)r   _prepare_forward_infrar   r   _prepare_backward_infrar   )r:   r   r   r<   r<   r=   _initialize_stage  s   
z(PipelineScheduleSingle._initialize_stager   r   c                O   sd   | j   | ||\}}|durtt|| j}nd}| |||| | j jr0| 	| j j
S dS r   N)r   clear_runtime_statesr   r|   r   tensor_splitr   r   r   r   r   )r:   r   r   r   r   r   r   targets_splitr<   r<   r=   r     s   
zPipelineScheduleSingle.stepr   )rC   rD   rE   __doc__r   rU   r
   r   r   r   r   rL   r   r   r   r   r   r|   r   __classcell__r<   r<   r   r=   r     s0     "r   c                
   @   D   e Zd ZdZ				d	dee dee dee dee fddZdS )
_ScheduleForwardOnlyzo
    The forward-only schedule.
    Will go through all the microbatches and perform only the forward pass
    Nr   r   r   r   c           
   	   C   s  |dus|durt d| ||||\}}| js#| |d |d  g }t| jD ]X}td| > | j|}t	|dd}|
 D ]}	|	  qD| j||| ||  | j|}t	|dd}||
  W d   n1 stw   Y  td| jj| q*|D ]}	|	  qdS )	z<
        Run one iteration of the pipeline schedule
        Nz7Forward-only schedule does not support loss computationr   Forward fwd_recvr   fwd_send[%s] Forwarded microbatch %s)r?   r   r   r   ry   r   r   r   get_fwd_recv_opsr   rz   waitforward_one_chunkget_fwd_send_opsr   r   r   rH   )
r:   r   r   r   r   fwd_sends_to_waitr`   r   worksworkr<   r<   r=   r     s.   


z'_ScheduleForwardOnly._step_microbatchesr   rC   rD   rE   r   r
   r|   r   r<   r<   r<   r=   r   	      r   c                
   @   r   )
r   z^
    The GPipe schedule.
    Will go through all the microbatches in a fill-drain manner.
    Nr   r   r   r   c              	   C   s0  |  ||||\}}| js| |d |d  g }t| jD ]a}td| > | j|}t|dd}|	 D ]}	|	
  q8| j||| || }
| j|}t|dd}||	  W d   n1 shw   Y  td| jj| | | j|
|| q|D ]}	|	
  q| jsdS g }t| jD ]a}td| G | j|}t|d	d}|	 D ]}	|	
  q| | j|}| jj|||| jd
 kd | j|}t|dd}||	  W d   n1 sw   Y  td| jj| q| jj| jr| jnd
d | | j| |D ]}	|	
  qdS )z
        Run one iteration of the pipeline schedule with list of microbatches.
        Will go through all the microbatches according to the GPipe schedule.

        Args:
            microbatches: list of microbatch args.
        r   r   r   r   r   Nr   z	Backward bwd_recvr   r   last_backwardbwd_sendz[%s] Backwarded microbatch %sgrad_scale_factor)r   r   r   ry   r   r   r   r   r   rz   r   r   r   r   r   r   rH   r   r   get_bwd_recv_opsr   backward_one_chunkget_bwd_send_opsr   r   )r:   r   r   r   r   r   r`   r   r   r   r   bwd_sends_to_waitr   r<   r<   r=   r   B  s\   


z ScheduleGPipe._step_microbatchesr   r  r<   r<   r<   r=   r   <  r  r   c                
   @   r   )
r   zo
    The 1F1B schedule.
    Will perform one forward and one backward on the microbatches in steady state.
    Nr   r   r   r   c                 C   s  |  ||||\}}| js| |d |d  t| j| j| jj }d}d}d}g }	t|D ]E}
| j	|}t
|dd }rB|  | j||| || }|rT|  | j|}	||d krft
|	dd}| | j||| |d7 }q.	 | j|}t
|	| dd }r|  | | j|}| jj|||| jd kd	 | j|}|d7 }|| jkrn4| j	|}t
|| d
d }r|  | j||| || }| | j||| | j|}	|d7 }qut
|dd}|| jk r2| j|}t
|dd }r|  | | j|}| jj|||| jd kd	 |r|  | j|}t
|dd}|d7 }|| jk s| jj| jr<| jndd |rG|  | | j| dS )z
        Run one iteration of the pipeline schedule with list of microbatches.
        Will go through all the microbatches according to the 1F1B schedule.

        Args:
            microbatches: list of microbatch args.
        r   Nr   r   r   r   Tfwd_send_bwd_recvr  bwd_send_fwd_recvr  r  r  )r   r   r   minr   r   r   rH   ry   r   r   r   r   r   r   r	  r   r
  r  r   r   )r:   r   r   r   r   warmup_chunksfwd_mb_indexbwd_mb_index	send_work	fwd_sends_	fwd_recvs	recv_workr   	bwd_recvs	fuse_workr   	bwd_sendsr<   r<   r=   r     s   

,
zSchedule1F1B._step_microbatchesr   r  r<   r<   r<   r=   r     r  r   r$   compute_actionsmax_active_stagesc           
         s   dt dttt  dtt  fdd}t  g dt f fdd}dt f fd	d
}t| D ]?\}}|du r7q.||| |d tt fdd}ttfdd }|D ]}	||	 qX|D ]}	||	 qa| q.S )aR  Given a basic schedule involving only compute actions (F,B,W), add UNSHARD/RESHARD actions for FSDP.

    UNSHARD refers to fetching the full contents of an FSDP-sharded layer, requiring an all-gather operation.
    RESHARD does the opposite, releasing memory (but doing no commmunication)

    We abandon the "timestep lock"  during lowering

    max_active_stages controls how many prefetches we allow. It should be measured in mb and tuneable but in practice
    3 stages is probably the thing we want?
    (to account for having one f and one b active, and something else prefetching?)
    countnext_actionsrZ   c                 S   sT   t  }g }|D ] }|dur'|j|vr'||j ||j t|| kr' |S q|S )zdRemove duplicates (same stage, different microbatch), find next 'count' stages that will do compute.N)setrH   addr   rV   )r  r  seenretar<   r<   r=   next_stage_indicesA  s    z0_add_unshard_reshard.<locals>.next_stage_indicesrH   c                          |  t| td  d S rK   )r   r   rG   r/   rH   active_stagesfsdp_aware_actionsr<   r=   _unshardS     
z&_add_unshard_reshard.<locals>._unshardc                    r%  rK   )remover   rG   r0   r&  r'  r<   r=   _reshardW  r+  z&_add_unshard_reshard.<locals>._reshardNc                       |  vS rK   r<   s)r(  r<   r=   <lambda>b      z&_add_unshard_reshard.<locals>.<lambda>c                    r.  rK   r<   r/  )next_nr<   r=   r1  d  r2  )rU   r|   r
   rG   r  rs   filterr   )
r  r  r$  r*  r-  r`   rA   fetchevictr   r<   )r(  r)  r3  r=   _add_unshard_reshard1  s.   




r7  c                 C   s   g }| r]|  d}|du rqt| r+| d  }du r+|  d t| r+| d  }du s|jtkrV|durV|jtkrV|j|jkrV|j|jkrV|t|jt	|j |  d n|| | s|S )a9  Given a basic schedule involving only compute actions (F,I,W), merge adjacent I and W ops into B ops.
    (note: I = BACKWARD_INPUT, W = BACKWARD_WEIGHT, B = FULL_BACKWARD)

    B refers to running the whole backward (not separating grad_input and grad_weight), which can be more efficient
    in some cases.
    r   N)
r   rV   rI   r7   r8   rH   rJ   r   rG   r9   )r  merged_actionsrA   next_actionr<   r<   r=   	_merge_bww  s(   	




r:  stage_to_rankr   c                    s  dd | D }dd | D }dt dtffdd dt dtt t f f fdd	}dtt  d
tt  dtffdd}| rd}t| D ]x}t| | dksZJ d|dt| | | | d }	||	|| shqB|	d ur|| |	 || |	  |	r||	\}
}|| |
 || |
 ||j	 | ||j	 | | | 
d t| | dkr| |= d}qB|sJ d| s<|S )Nc                 S      i | ]}|g qS r<   r<   r[   r   r<   r<   r=   
<dictcomp>  s    z"_add_send_recv.<locals>.<dictcomp>c                 S      i | ]}|t  qS r<   r  r=  r<   r<   r=   r>        rA   rZ   c                    sd   | j tkr| j d ko| jd | jkS | j ttfv r0| jdko/| jd | jkS dS )Nr   r   F)rI   r,   rH   r7   r9   r@   )r   r;  r<   r=   
_has_comms  s   
z"_add_send_recv.<locals>._has_commsc                    sx    | sJ |  d| j }| j}| j}t||tkrtnt|}|tkr(|d n|d }t||tkr4tnt|}||fS )Nz is not a valid comm actionr   )	rH   rI   rJ   rG   r,   r1   r3   r2   r4   )rA   	stage_idxctypemb_idxsendrecv_stage_idxrecv)rB  r<   r=   
_get_comms  s   z"_add_send_recv.<locals>._get_commsprev_actionsc                    s   | du rdS | j tkr,| jdks,t| jt| j|v rdS t| jd t| j|v r*dS dS | j ttfv rd| j d ksdt| jt| j|v rFdS t| jd t| j|v rTdS t| jd t| j|v rbdS dS dS )a  We don't put our own recv ops in the schedule, we let a sender on another rank put our recv ops in place.
        This helps ensure a sane (non-hanging) ordering of sends and recvs.
        But it also means we might not be able to schedule our next compute action yet.
        NTr   r   F)	rI   r,   rH   rG   r2   rJ   r7   r9   r4   )rA   rJ  )r   r<   r=   _ready_to_schedule  s8   z*_add_send_recv.<locals>._ready_to_scheduleFr   rank=z, len(compute_actions[rank])=Tz6Malformed compute schedule, can't schedule sends/recvs)rG   r   r   r
   r  r{   rV   r   r   rH   r   )r  r;  r   comm_actionsrJ  rI  rK  progressr   rA   rF  rH  r<   )rB  r   r;  r=   _add_send_recv  sH   
,rO  r\   pp_group_sizenum_microbatchesc                 C   s|  t | |ksJ d| dt |  t|D ]}|| v s#J d| qdd t|D }i }| D ]}| | D ]}|d u r>q7t|tsKJ d| d|j}|j}	|j}
|	tkrb|| t |
 nb|	t	kr|
|| t v syJ d| d	|
 d
|| t	 |
 nA|	t
kr|
|| t v sJ d| d	|
 d
|| t
 |
 n |	tkr|
|| t
 v sJ d| d	|
 d|| t |
 ||vr|||< q7|| }||ksJ d| d| d| q7q1|D ]U}t || t }t || t	 }t || t
 }t || t }||ksJ d| dt d| d| ||| d  |ks;J d| d| d| d| d| 
q|S )Nz2Schedule has incorrect number of ranks - expected z	, actual z%Schedule is missing actions for rank c                 S   s*   i | ]}|t t tt tt tt iqS r<   )r,   r  r5   r-   r.   )r[   stage_idr<   r<   r=   r>    s    z&_validate_schedule.<locals>.<dictcomp>zGot an invalid action: z, expected instance of _Actionz Running Full Backward for stage z, microbatch z without first running Forwardz!Running Backward Input for stage z"Running Backward Weight for stage z% without first running Backward InputzStage z is assigned to both rank z
 and rank zGot rl   z microbatches for stage z, expected r#   z(Invalid backward microbatches for stage z: expected z( total backwards,             but got B=z, I=z, W=)rV   ry   r   rG   rH   rI   rJ   r,   r   r5   r-   r.   )r\   rP  r   rQ  r   stage_actionsstage_index_to_rank_mappingrA   s_idrD  mb_idexisting_rankf_mbb_mbi_mbw_mbr<   r<   r=   _validate_schedule  sz   	


"r\  c                       s$  e Zd ZdZ						d$dee dedee dee	e
df  d	eeee
f  d
eeeeef e	e f  dee def fddZde	edf fddZdeeeee  f ddfddZdd Zd%ddZddddee fddZ				d&dee d ee d!ee dee fd"d#Z  ZS )'r   aX  
    Base class for multi-stage schedules.
    Implements the `step` method.

    Gradients are scaled by num_microbatches depending on the `scale_grads` argument, defaulting to True.  This setting
    should match the configuration of your loss_fn, which may either average losses (scale_grads=True)
    or sum losses (scale_grads=False).
    NTr   r   r   r   .r   r   use_full_backwardr   c	           
         s   t  j||||||d || _|d j| _|d j| _|d j| _t	| j| j| _
| jD ]}	| j
|	_
q,| jD ]}	| j|	_q6d| _| jd u  fdd| _i | _|d urZtd d S d S )Nr   r   Fc                    s
   | j o S rK   r   )r   has_lossr<   r=   r1       
 z0PipelineScheduleMulti.__init__.<locals>.<lambda>zDeprecation warning: 'use_full_backward' is no longer supported. Simply stop passing it, and everything should still work fine.)r   r   _stagesr   r   
group_sizerP  
group_rankr   r   stage_index_to_group_rankr   r   _stages_initializedr   _should_compute_lossrX   r   warning)
r:   r   r   r   r   r   r   r]  r   r   r   r^  r=   r   Y  s8   	




zPipelineScheduleMulti.__init__r   c                 C   sV   t  }| jD ]}|jr|| j||}n|| j||}| jr%|| j qd| _d S r   )r   ra  is_firstr   r   r   r   re  )r:   r   r   next_stage_argsr   r<   r<   r=   _initialize_stages  s   

z(PipelineScheduleMulti._initialize_stagesr\   rZ   c                 C   s.   t || j| j| j| _| jD ]}| j|_qdS )z]
        Allocates the stage index to rank mapping which is needed for communication
        N)r\  rP  r   r   rd  ra  )r:   r\   r   r<   r<   r=   _validate_and_set_stage_mapping  s   

z5PipelineScheduleMulti._validate_and_set_stage_mappingc                 C   sZ   t |ddd}t|}| jD ]
}|| j|  qW d   dS 1 s&w   Y  dS )zQDump a CSV representation of the schedule into a file with the provided filename.wrP   newlineN)opencsvwriterrX   writerowr:   filenamecsvfilerq  r   r<   r<   r=   	_dump_csv  s   

"zPipelineScheduleMulti._dump_csvcompute_onlyc                 C   sx   |dksJ t |dd }t|}t|D ]\}}dd |D | j|< qW d   n1 s/w   Y  | | j dS )zLoad a CSV representation of the schedule from a file with the provided filename.
        This API will most likely get renamed/refactored so is marked as internal for now.

        format must be "compute_only" for PipelineScheduleMulti.
        rw  rP   rm  c                 S      g | ]}t |qS r<   rG   rB   r[   r0  r<   r<   r=   rb         z3PipelineScheduleMulti._load_csv.<locals>.<listcomp>N)ro  rp  readerrs   rX   rk  )r:   rt  formatru  r|  r   ru   r<   r<   r=   	_load_csv  s   
zPipelineScheduleMulti._load_csvr   r   c          	      O   sz   | j D ]}|  q| ||\}}|dur tt|| j}nd}| |||| | j D ]}|jr:| 	|j
  S q-dS r   )ra  r   r   r|   r   r   r   r   r   r   r   )	r:   r   r   r   r   r   r   r   r   r<   r<   r=   r     s   


zPipelineScheduleMulti.stepr   r   r   c                 C   sN  |  ||||\}}| js| |d |d  dd | jD }t }t }| D ]!}|dkr9|| j|d   || jd k rJ|| j|d   q)t	 }	t
| j| j D ]\}
}zg }|dur3|j}|j}|j}|dusuJ d|tjkr|| }|||| || }| |||| ||| n|tjkr|| }| ||}|	|  d7  < |	| | jk}| jr| jnd}|j||d|d |r|| ||| nZ|tjkr|| }| ||}|j||d	d	d ||| n9|tjkr,|| }|	|  d7  < |	| | jk}| jr| jnd}|j||d
 |r+|| ntd| |D ]Z}| j| }d}|
t |k rI||
 }|dur|j}|j}|j}|dus`J d|tjkr}|d |v r{||d  }||!| q5|tttfv rq5td| q5|D ]Z}| j| }d}|
t |k r||
 }|dur|j}|j}|j}|dusJ d|ttfv rƐq|ttfv r|d |v r||d  }||"| qtd| q|rt#|$  W qV t%y } zt&'d| j| j(j)|
| t&'dt*| j|
d |d}~ww | +| j| dS )
        Operate on the microbatches for looped schedules (multiple stages on each rank).

        TODO: Does not use sorted_batch_isend_irecv(). As a result, this schedule does
        not support models with skip connections.
        r   c                 S      i | ]}|j |qS r<   r&  r   r<   r<   r=   r>        z<PipelineScheduleMulti._step_microbatches.<locals>.<dictcomp>r   NzCAll currently supported action types require valid microbatch_indexTr   full_backwardr  Fr  zUnknown computation type zy[Rank %s] pipeline schedule %s caught the following exception                      at time_step %s when running action %sz%srY   ),r   re  rj  ra  r  keysr   rd  r   r   rs   rX   r   rI   rJ   rH   r"   r6   r   r   r   r   r9   r   r   r   r
  r  r7   r8   backward_weight_one_chunkr   rV   r   r	  r   r   	Exceptionr   errorr   rC   r   r   )r:   r   r   r   r   stage_index_to_stageall_prev_ranksall_next_ranksrH   backward_counter	time_steprA   r   rI   r   r   r   r   r  r  	prev_rankprev_rank_opsprev_rank_action	next_ranknext_rank_opsnext_rank_actioner<   r<   r=   r     s"  










z(PipelineScheduleMulti._step_microbatches)NNNNNTrw  r   )rC   rD   rE   r   r|   r   rU   r
   r   r   r   r   rL   r   r   r   r   rj  rG   rk  rv  r~  r   r   r   r<   r<   r   r=   r   O  s^    	2

$r   c                
       s   e Zd ZdZ	ddeeeee  f de	f fddZ
dde	de	f fdd	Zde	fd
dZdd Z				ddee dee dee dee fddZ  ZS )_PipelineScheduleRuntimea%  
    Provides a simple runtime that requires a 'schedule IR' including specified communication operations.

    Can be instantiated directly by creating _PipelineScheduleRuntime and calling load_csv, or can be
    subclassed and the subclass can be responsible for creating a schedule IR.
    rw  r\   r}  c                    s   t  | i  _|dkr.|D ]}g  j|< || D ]}|dus"J  j| | qqdS |dkrP|D ]}t||  j|< q4t j fdd jd _dS td|d)	z
        Given an in-memory representation for a simple compute-only schedule, lower it to a complex schedule including
        communication actions.  Stores the schedule in self, and must be called before running step_mo()
        compute_commsNrw  c                    
    j |  S rK   rd  r/  r   r<   r=   r1    r`  z8_PipelineScheduleRuntime._load_actions.<locals>.<lambda>)r;  r   format= is not implemented)r   rk  pipeline_order_with_commsr   r7  rO  r   r   )r:   r\   r}  r   rA   r   r   r=   _load_actions  s*   


z&_PipelineScheduleRuntime._load_actionsrt  c                    s   |dkrt  | | | j dS |dkrOi }t|dd'}t|}t|D ]\}}dd |D ||< q(| j||d W d   dS 1 sHw   Y  dS td	|d
)a  Loads a csv in simple format and then lowers it to include comunication actions

        format must be either "compute_only" or "compute_comms".  If compute_only, the lowering passes
        will automatically be run to generate a compute_comms schedule.
        rw  r  rP   rm  c                 S   rx  r<   ry  rz  r<   r<   r=   rb     r{  z6_PipelineScheduleRuntime._load_csv.<locals>.<listcomp>)r}  Nr  r  )	r   r~  r  rX   ro  rp  r|  rs   r   )r:   rt  r}  r\   ru  r|  r   ru   r   r<   r=   r~    s   
"z"_PipelineScheduleRuntime._load_csvc                 C   sl   | j dus	J dt|ddd}t|}| j D ]
}|| j |  qW d   dS 1 s/w   Y  dS )zaDump a CSV representation of the compute + comms schedule into a file with the provided filename.Nz6Must initialize compute_comms schedule before dump_csvrl  rP   rm  )r  ro  rp  rq  rr  rs  r<   r<   r=   rv    s   

"z"_PipelineScheduleRuntime._dump_csvc                    s   t  j fdd jS )Nc                    r  rK   r  r/  r   r<   r=   r1    r`  z4_PipelineScheduleRuntime._simulate.<locals>.<lambda>)_simulate_comms_computer  r   r   r<   r   r=   	_simulate  s
   
z"_PipelineScheduleRuntime._simulateNr   r   r   r   c                    s@  |  ||||\}}| js| |d |d  dd | jD }| jdus(J di }i }g }i  t dtf fdd}	t }
t| j| j	 D ]2\}}z|j
}|jdurZ|jnd	}|dksn|ttfv snJ d
|d|j}|| }t|jt}|d |v }|d |v }td|| |tkr|t|| n|tkr|t|| n|tkr||f|vsJ dt|||||f< n|tkr||f|vsJ dt|||||f< nw|tkr|r|vr| vsJ d|d|jjdd |< nT|tkr0|r.|v sJ d|d| vs)J d|d|j  n+|t kr||r<|	| |j!s[|s[||f|v sRJ d|d|"||f#  |$||| || }| %|||| |r{||d  &|| n|t'kr|r|	| |j(s|s||f|v sJ d|d|"||f#  | )||}|
|  d7  < |
| | j*k}| j+r| j*nd}|j,||d|d |r|+| |r||d  -|.|| nt|t/kr2|r|	| |j(s|s||f|v s	J d|d|"||f#  | )||}|j,||ddd |r1||d  -|.|| n)|t0krS|r>|	| |
|  d7  < |j1||
| | j*kd nt2d
|dW qH t3y{ } zt4d|| t5t6| j|d |d}~ww t7|r|" #  t7|st7 dksJ d | 8| j| dS )!r  r   c                 S   r  r<   r&  r   r<   r<   r=   r>    r  z?_PipelineScheduleRuntime._step_microbatches.<locals>.<dictcomp>Nz=Must call _load_actions() before calling _step_microbatches()rC  c                    s>   |  v r |      | = |  | v sJ d| dS )zQIf an unshard is active for `stage_idx`, wait() it and mark `stage_idx` unshared.z*Attempted to compute on sharded stage_idx=N)r   r   )rC  unshard_opsunsharded_stagesr<   r=   _assert_unsharded  s   

zF_PipelineScheduleRuntime._step_microbatches.<locals>._assert_unshardedzaction=z missing mb_indexr   z8_PipelineScheduleRuntime running time_step %d, action %szARecv twice for {stage_idx=} {mb_index=} without executing forwardzBRecv twice for {stage_idx=} {mb_index=} without executing backwardzUnsharding the same stage_idx=z twiceT)async_opzResharding stage_idx=z without unshardingz before finishing unshardzComputing action=z before receiving inputz Attempted to run compute action=r  Fr  z is unknown or unsupportedz\_PipelineScheduleRuntime caught exception at step %s when running action %s.  Full Schedule:r  zUnused unshard operations)9r   re  rj  ra  r  r  rU   r   rs   r   rI   rJ   r/   r0   rH   r   submodr   r   r   r1   r   r   r   r3   r  r2   r   r4   r	  unshardreshardr6   rh  r   r   r   r   set_local_fwd_inputr9   r   r   r   r   r
  set_local_bwd_inputget_local_bwd_outputr7   r8   r  r   r  r  printr   rV   r   )r:   r   r   r   r   r  bwd_recv_opsfwd_recv_opssend_opsr  r  r  rA   	comp_typer   rC  r   stage_uses_fsdpis_next_stage_on_this_rankis_prev_stage_on_this_rankr   r   r  r  r  r<   r  r=   r     s  





















z+_PipelineScheduleRuntime._step_microbatchesr  r   )rC   rD   rE   r   r   rU   r|   r
   rG   rL   r  r~  rv  r  r   r   r<   r<   r   r=   r    s0    
#	r  c                       sn   e Zd ZdZ			ddee dedeee	e
f  deeeeef ee f  def
 fd	d
Zdd Z  ZS )r   ai  
    Breadth-First Pipeline Parallelism.
    See https://arxiv.org/abs/2211.05953 for details.
    Simliar to Interleaved 1F1B, Looped BFS supports multiple stages per rank.
    What is different is that when microbatches are ready for multiple local
    stages, Loops BFS will prioritizes the earlier stage, running all available
    microbatches at once.
    NTr   r   r   r   r   c                    sD   t  j|||||d i | _t| jD ]}| |}|| j|< qd S )N)r   r   r   r   r   )r   r   rX   ry   rP  !_calculate_single_rank_operations)r:   r   r   r   r   r   r   rank_opsr   r<   r=   r     s   
zScheduleLoopedBFS.__init__c                    s   t | j}t|| j| | j}dd t|D }|D ] | fddt| jD  qd| jd |  }|d g|  t|D ] | fddtt| jD  qA|S )Nc                 S      g | ]}d qS rK   r<   r[   r  r<   r<   r=   rb         zGScheduleLoopedBFS._calculate_single_rank_operations.<locals>.<listcomp>c                 3       | ]
}t  tj|V  qd S rK   )rG   r"   r6   r[   r   r&  r<   r=   r]     
    
zFScheduleLoopedBFS._calculate_single_rank_operations.<locals>.<genexpr>r#   r   c                 3   r  rK   )rG   r"   r9   r  r&  r<   r=   r]   !  r  )rV   ra  ry   rP  r   r   reversed)r:   r   n_local_stagesstage_indicesr  post_warmup_opsr<   r&  r=   r    s    


z3ScheduleLoopedBFS._calculate_single_rank_operations)NNT)rC   rD   rE   r   r|   r   rU   r
   r   r   r   r   rL   r   r   r   r   r  r   r<   r<   r   r=   r     s"    r   Fc
                 C   s  t t}
t t}t t}dd t|D }| | d|d |   ||  }|	r-|| d }|| | }g }d}|	r;tnt}t|D ]}||k rn||}|
|  }d |
|< |t|tj| ||d krm|	d g|  qA||  krz|| k rn n[||}|
|  }d |
|< |t|tj| ||}||  }d ||< |t||| || |	r|| |kr||| }||  }d ||< |t|tj
| |d7 }qA|	s|d  ||}||  }d ||< |t||| || |	r"|| |kr"||| }||  }d ||< |t|tj
| |d7 }qA|	rU|t|k rU||| }||  }d ||< |t|tj
| |d7 }|	rU|t|k s-|S )Nc                 S   r  rK   r<   r  r<   r<   r=   rb   ;  r  z&_get_1f1b_rank_ops.<locals>.<listcomp>r#   r   r   )r   rU   ry   r7   r9   r   rG   r"   r6   r   r8   rV   )r  rP  
warmup_opsfwd_bwd_opscooldown_opsr   forward_stage_indexbackward_stage_indexnum_1f1b_microbatchesenable_zero_bubblefwd_stage_mb_indexbwd_stage_mb_indexweight_stage_mb_indexr  r  	total_opsbackward_op_idsweight_op_countFULL_BACKWARD_OR_BACKWARD_INPUTr   fwd_stage_indexr   r  bwd_stage_indexr  weight_stage_indexweight_mb_indexr<   r<   r=   _get_1f1b_rank_ops(  s   	












r  c                          e Zd ZdZ					ddee dedee dee	e
df  d	eeee
f  d
eeeeef e	e f  def fddZdeee  fddZ  ZS )r   a  
    The Interleaved 1F1B schedule.
    See https://arxiv.org/pdf/2104.04473 for details.
    Will perform one forward and one backward on the microbatches in steady
    state and supports multiple stages per rank. When microbatches are ready for
    multiple local stages, Interleaved 1F1B prioritizes the earlier microbatch
    (also called "depth first").

    This schedule is mostly similar to the original paper.
    It differs by being relaxing the requirement of num_microbatch % pp_size == 0.
    Using the flex_pp schedule, we will have num_rounds = max(1, n_microbatches // pp_group_size) and
    it works as long as n_microbatches % num_rounds is 0. As a few examples, support

    1. pp_group_size = 4, n_microbatches = 10. We will have num_rounds = 2 and n_microbatches % 2 is 0.
    2. pp_group_size = 4, n_microbatches = 3. We will have num_rounds = 1 and n_microbatches % 1 is 0.
    NTr   r   r   r   .r   r   r   c           
   	      s   |d j | _t j|||||||d t|| _|d j| _td|| j | _	|| j	 | _
|| j	 dkr@td| j	 d| di | _t| jD ]}| |}	|	| j|< qHd S )Nr   r   r   r   r   r   r   r   r   z_Interleaved 1F1B requires the number of microbatches to be a multiple of the number of rounds (), but got .)rb  rP  r   r   rV   r  rc  r   rj   number_of_roundsmicrobatches_per_roundr   rX   ry   r  )
r:   r   r   r   r   r   r   r   r   r  r   r<   r=   r     s6   

	
z ScheduleInterleaved1F1B.__init__rZ   c           	   	      s   fdd}| j j }| }|| }| | }td |||  fdd} fdd}tj j|| ||S )Nc                    s<    j d  j }d}|| jd |    }t| j j  S )Nr   r#   r  r  rP  r  r   r   warmups_ops_last_stagemultiply_factorr  r   r<   r=   get_rank_warmup_ops     zVScheduleInterleaved1F1B._calculate_single_rank_operations.<locals>.get_rank_warmup_ops=rank %s, warmup_ops %s, 1f1b %s, cooldown_ops %s total_ops %sc                       | j  j }|j   S rK   r  r  rP  r   local_indexr   r:   r<   r=   r       zVScheduleInterleaved1F1B._calculate_single_rank_operations.<locals>.forward_stage_indexc                    ,   j d |  j j   }|j   S Nr   r  r  rP  r  r   r:   r  r<   r=   r       zWScheduleInterleaved1F1B._calculate_single_rank_operations.<locals>.backward_stage_indexr  r   r   r   r  rP  )	r:   r   r  microbatch_opsr  r  r  r  r  r<   r  r=   r    s4   
	z9ScheduleInterleaved1F1B._calculate_single_rank_operationsr   rC   rD   rE   r   r|   r   rU   r
   r   r   r   r   rL   r   r   r   r   rG   r  r   r<   r<   r   r=   r     s.    &r   c                       s   e Zd ZdZ					ddee dedee dee	e
df  d	eeee
f  d
eeeeef e	e f  def fddZdeee  fddZdd Z  ZS )r    aw  
    The Interleaved Zero Bubble schedule.
    See https://arxiv.org/pdf/2401.10241 for details.
    Will perform one forward and one backward on inputs for the microbatches in steady
    state and supports multiple stages per rank. Uses the backward for weights to fill in
    the pipeline bubble.

    In particular this is implementing the ZB1P schedule in the paper.
    NTr   r   r   r   .r   r   r   c              	      s   |D ]}t |jtrtdq|d j| _t j|||||||d t|| _	|d j
| _td|| j | _|| j | _|| j dkrOtd| j d| di | _t| jD ]}	| |	}
|
| j|	< qW| | j	| j | _d S )NzYThe Zero Bubble schedule is not supported with stage modules that have used torch.compiler   r  r   zZZero bubble requires the number of microbatches to be a multiple of the number of rounds (r  r  )r   r  r   r?   rb  rP  r   r   rV   r  rc  r   rj   r  r  r   rX   ry   r  _add_bubbles_to_actionsr:   r   r   r   r   r   r   r   r   r   r  r   r<   r=   r   0  sF   
	


z&ScheduleInterleavedZeroBubble.__init__rZ   c           
         s   fdd}| j j }| }|| }| | }td |||  fdd} fdd} }	tj j|| |||	dd	
S )
Nc                    s<    j d  j }d}|| jd |    }t| j j  S r  r  r  r   r<   r=   r  g  r  z\ScheduleInterleavedZeroBubble._calculate_single_rank_operations.<locals>.get_rank_warmup_opsr  c                    r  rK   r  r  r  r<   r=   r    r  z\ScheduleInterleavedZeroBubble._calculate_single_rank_operations.<locals>.forward_stage_indexc                    r  r  r  r  r  r<   r=   r    r  z]ScheduleInterleavedZeroBubble._calculate_single_rank_operations.<locals>.backward_stage_indexT)r  r  )
r:   r   r  r  r  r  r  r  r  r  r<   r  r=   r  f  s:   	z?ScheduleInterleavedZeroBubble._calculate_single_rank_operationsc                 C   sr  | j }dd }t }i }i }i }d}t| jD ]}	g ||	< d||	< d||	< q	 d}
t }t| jD ]q}	||	 }|t||	 kr@q1d}
||	 | d ur||	 | }|d usVJ |\}}}||||||s||	 ||	 |  |d urz||||f ||	  d7  < q1||	 d  ||	  d7  < q1||	  d7  < ||	 d  q1|| |
rnq'|dkrt	d|| |S )Nc                 S   sh   |t jkr| dkr| d ||f|vrdS dS |t jkr2| |d kr)| t j|f|vS | d ||f|vS dS )Nr   r   TF)r"   r6   r9   )r   r   
microbatchnum_stages_globalseen_opsr<   r<   r=   need_bubble  s   

zJScheduleInterleavedZeroBubble._add_bubbles_to_actions.<locals>.need_bubbler   TFr   z?Non zero bubbles added: total_bubbles_added=%s bubbles_added=%s)
rX   r  ry   rP  rV   r   r   updater   rg  )r:   r  r\   r  r  resultnext_pointerbubbles_addedtotal_bubbles_addedr   should_stoptemp_seen_ops	timestamptemp_actionrH   r   r  r<   r<   r=   r    sZ   




"z5ScheduleInterleavedZeroBubble._add_bubbles_to_actionsr   )rC   rD   rE   r   r|   r   rU   r
   r   r   r   r   rL   r   r   r   r   rG   r  r  r   r<   r<   r   r=   r    %  s0    6@r    c                       r  )r!   a  
    The Zero Bubble schedule (ZBV variant).
    See https://arxiv.org/pdf/2401.10241 Section 6 for details.

    This schedules requires exactly two stages per rank.

    This schedule will perform one forward and one backward on inputs for the microbatches in steady
    state and supports multiple stages per rank. Uses backward with respect to weights to fill in
    the pipeline bubble.

    This ZB-V schedule would have the "zero bubble" property only if time forward == time backward input == time backward weights.
    In practice, this is not likely true for real models so alternatively
    a greedy scheduler could be implemented for unequal/unbalanced time.
    NTr   r   r   r   .r   r   r   c              	      s   |d j | _t j|||||||d t| j| jdd| _| jD ]}| j|_q t|| _	| j	dkr:t
d| j	 d|d j| _|d j| _i | _t| jD ]}	| |	}
|
| j|	< qNd S )Nr   r  v)styler#   z0ZBV requires exactly 2 stages per rank, but got r  )rb  rP  r   r   r   r   rd  ra  rV   r  r   rc  r   r   rX   ry   r  r  r   r<   r=   r     s:   
	





zScheduleZBVZeroBubble.__init__rZ   c                    s>  t d j d  j}dd t|D }d\}}}}d j|  d }|}	 jd | }
t|D ]}|t|	t|d |d7 }q0|}t|D ]}|t|
t|d |d7 }|t|	t|d |d7 }qG j| }t|D ](}|t|
t|d |d7 }|t|
t|d |t|
t	|d |d7 }qo||k s||k r||k r|t|	t|d |d7 }|t|	t|d |t|	t	|d |d7 }|t|
t|d |d7 }|t|
t|d |t|
t	|d |d7 }||k s||k s||}}|}t|D ]}|t|	t|d |d7 }|t|
t|d |d7 }q j| }t|D ]}|t|	t|d |d7 }|t|	t	|d |d7 }q,||k rd|t|
t	|d |d7 }||k sQ||k r||t|	t	|d |d7 }||k si||kr||ksJ ||kr||ksJ  fdd|D }|S )Nr#   r   c                 S   r  rK   r<   r  r<   r<   r=   rb   )	  r  zKScheduleZBVZeroBubble._calculate_single_rank_operations.<locals>.<listcomp>)r   r   r   r   )rI   rJ   c                    s2   g | ]}|d ur|j d ur|j  jk r|nd qS rK   )rJ   r   )r[   rA   r   r<   r=   rb   	  s    
)
rj   rP  r   ry   r   r   rG   r,   r-   r.   )r:   r   n_micror  f0_cntf1_cntb0_cntb1_cnt	warmup_n1stage_id_chunk0stage_id_chunk1r  	warmup_n2	warmup_n3w0_cntw1_cntcooldown_n1cooldown_n2r<   r   r=   r  %	  s   











z7ScheduleZBVZeroBubble._calculate_single_rank_operationsr   r  r<   r<   r   r=   r!     s.    ,r!   schedule_namec              	   C   s`   t tttttttd}dd | D }| 	 }||vr*t
d|  dt|  |||  S )z
    Maps a schedule name (case insensitive) to its corresponding class object.

    Args:
        schedule_name (str): The name of the schedule.
    )1F1BInterleaved1F1BGPipe	LoopedBFSInterleavedZeroBubbler   r   ZBVZeroBubblec                 S   s   i | ]}|  |qS r<   )lower)r[   kr<   r<   r=   r>  	  r{  z&get_schedule_class.<locals>.<dictcomp>zUnknown schedule name 'z'. The valid options are )r   r   r   r   r    r   r   r!   r  r  r   r|   )r  schedule_maplowercase_keyslowercase_schedule_namer<   r<   r=   r   	  s    
r   c           	         s  fddt D dd t D dd D  dtdtt f fdd}dtt d	tf fd
d}rd}t D ]-}t| dkrKq@| d }||rh|dur^||| | d d}q@||d q@t ddD ]}t| dkr|= qtt D ]6}t| dkrq| d durq| d }||r|dur|| d<  | | | d qt ddD ]}t| dkrЈ|= q|stdt	 D ]}td|d| d   qt
ds:S )a  This function dry-run simulates the actions in the schedule from the perspective of all ranks, and flags
    any deadlocks caused by missing or misordered communications.  It also simulates any bubbles in time where a rank
    can not execute any action due to waiting for unmet dependencies.  The total number of simulator steps can be used
    as a metric for unit tests involving IR optimization passes as reordering and merging of IR can reduce the number
    of simulated steps.

    The simulation is not high-fidelity and does not model overlapping of compute and communication, or cuda streams.
    Future work may be to enhance this and model the compute time, comms overlap, and even memory.
    c                    s    i | ]}|d d  | D qS )c                 S   s   g | ]}|d ur|qS rK   r<   )r[   r#  r<   r<   r=   rb   	  rg   z6_simulate_comms_compute.<locals>.<dictcomp>.<listcomp>r<   r=  )rX   r<   r=   r>  	  s    z+_simulate_comms_compute.<locals>.<dictcomp>c                 S   r<  r<   r<   r=  r<   r<   r=   r>  	  s    c                 S   r?  r<   r@  r=  r<   r<   r=   r>  	  rA  r   rA   c                    s,   |   | |d ur |  | d S d S rK   )r   r   )r   rA   )_prev_ops_rank	_scheduler<   r=   add_to_schedule	  s   z0_simulate_comms_compute.<locals>.add_to_schedulerZ   c                    s  | d u rdS | j } | }| jtkr7| j dkrdS t| j t| j|v r'dS t| j d t| j|v r5dS dS | jttfv rq| j d krGdS t| j t| j|v rSdS t| j d t| j|v radS t| j d t| j|v rodS dS | jt	krxdS | jt
krt| j t| j}||v S | jtkr|d }t|t
| j}| | v S | jtkrt| j t| j}t| j t| j}||v p||v S | jtkr|d }t|t| j}| | v S td|  )NTr   r   FzUnsupported action type )rH   rI   r,   rG   r2   rJ   r7   r9   r4   r8   r1   r3   r   )rA   rC  prev_ops
expected_fpeer_stage_idxexpected_send
expected_bexpected_bw)r  r   r;  r<   r=   rK  	  s`   








z3_simulate_comms_compute.<locals>._ready_to_scheduleFr   NT)reverser  zWIP comms schedule:
rL  z next action= zSchedule is not progressing)r{   rU   r
   rG   r   rV   r   r   r  r   r   )	rX   r;  r   r  rK  rN  r   rA   r`   r<   )r  r  r   rX   r;  r=   r  	  s`   
:
-r  c                 C   s   g }t | D ]+}t| | D ]"\}}|du rq|t||jtttfv r&dndd|||dd qqddl}t	|d}|
d	|i| W d   dS 1 sOw   Y  dS )
a  
    This function dumps a schedule IR into a chrometrace format so it can be visualized.

    It is currently very basic and only serves as a graphical alternative to dumping the schedule IR as text.

    As future work we may extend this to include more accurate heuristics for durations, or let users input durations,
    add 'flow events' to let the UI show the connection between sends and recvs, and model cuda streams for comm/compute
    as separate streams on the chrometrace view.
    NcomputationcommunicationXr   )r   catphpidtidtsdurr   rl  traceEvents)r{   rs   r   rL   rI   r,   r5   r.   jsonro  dump)schedulert  eventsr   timesteprA   r.  fr<   r<   r=   _dump_chrometrace;
  s,   
"r4  rK   )r$   )r   F)[rw   rp  r}   loggingreabcr   r   collectionsr   r   enumr   typingr   r   r	   r
   r   r   r   torch.distributeddistributedr   torch._dynamor   torch.distributed.fsdpr   r   torch.nn.modules.lossr   torch.profilerr   _utilsr   r  r   r   r   r   r   r   __all__	getLoggerrC   r   r"   r6   r7   r8   r/   r0   r1   r2   r3   r4   r9   r,   r-   r.   r5   compilerR   rG   r   rU   r|   rL   r   r   P2POpr   r   r   r   r   r   r7  r:  rO  r\  r   r  r   r  r   r    r!   r   r  r4  r<   r<   r<   r=   <module>   s    
6%
> 7
S3[ 

F

#
i

L  U  HI
 
t E 4
 