o
    lhL0                     @   s   d dl Z d dlZ d dlZd dlZd dlZd dlZ	 e jdddkZ	e jddZ
e
dkZdZG dd	 d	ejZed
krBe  dS dS )    NTEST_WITH_INTERNET01LOCAL_WS_SERVER_PORTz-1Tc                   @   s6  e Zd ZG dd dZdd Zdd Zdd Zee	d	d
d Z
edddd Zee	d	dd Zeeddd Zeeddd Zeeddd Zeeddd Zeeddd Zeeddd Zeeddd  Zee	d	d!d" Zee	d	d#d$ Zee	d	d%d& Zd'S )(WebSocketAppTestc                   @   s   e Zd ZdZdS )zWebSocketAppTest.NotSetYetz?A marker class for signalling that a value hasn't been set yet.N)__name__
__module____qualname____doc__ r   r   l/var/www/html/construction_image-detection-poc/venv/lib/python3.10/site-packages/websocket/tests/test_app.py	NotSetYet'   s    r   c                 C   s6   t t t t_t t_t t_t t_d S N)	wsenableTrace	TRACEABLEr   r   keep_running_openkeep_running_closeget_mask_key_idon_error_dataselfr   r   r   setUp*   s
   



zWebSocketAppTest.setUpc                 C   s,   t  t _t  t _t  t _t  t _d S r   )r   r   r   r   r   r   r   r   r   r   tearDown2   s   


zWebSocketAppTest.tearDownc                 C   s   d S r   r   r   r   r   r   close8      zWebSocketAppTest.closez/Tests using local websocket server are disabledc                    s@   dd } fdd}dd }t jdt |||d}|  d	S )
{A WebSocketApp should keep running as long as its self.keep_running
        is not False (in the boolean context).
        c                 _   s   |  d | jt_d| _dS )zmSet the keep_running flag for later inspection and immediately
            close the connection.
            hello!FN)sendkeep_runningr   r   r   argskwargsr   r   r   on_openC   s   

z3WebSocketAppTest.test_keep_running.<locals>.on_openc                       t |    d S r   printr   _messager   r   r   
on_messageK      z6WebSocketAppTest.test_keep_running.<locals>.on_messagec                 _   s   | j t_dS )z.Set the keep_running flag for the test to use.N)r   r   r   r    r   r   r   on_closeO   s   z4WebSocketAppTest.test_keep_running.<locals>.on_closews://127.0.0.1:)r#   r,   r*   Nr   WebSocketAppr   run_forever)r   r#   r*   r,   appr   r   r   test_keep_running;   s   z"WebSocketAppTest.test_keep_runningFz$Test disabled for now (requires rel)c                    s:   dd } fdd}t jdt ||d}|jdd d	S )
r   c                 _   s    |  d |   |  d dS )z*Send a message, receive, and send one morer   zgoodbye!N)r   recvr    r   r   r   r#   b   s   
z=WebSocketAppTest.test_run_forever_dispatcher.<locals>.on_openc                    r$   r   r%   r'   r   r   r   r*   h   r+   z@WebSocketAppTest.test_run_forever_dispatcher.<locals>.on_messager-   )r#   r*   
Dispatcher)
dispatcherNr.   )r   r#   r*   r1   r   r   r   test_run_forever_dispatcher\   s   z,WebSocketAppTest.test_run_forever_dispatcherc                 C   s<   t dt }tjd|jd  | }| |d dS )zaThe WebSocketApp.run_forever() method should return `False` when the application ends gracefully.r-   g?)intervalfunctionFN)	r   r/   r   	threadingTimerr   startr0   assertEqual)r   r1   teardownr   r   r   $test_run_forever_teardown_clean_exitv   s   z5WebSocketAppTest.test_run_forever_teardown_clean_exitz%Internet-requiring tests are disabledc                 C   s0   dd }t jd|d}| t|jt| dS )zhA WebSocketApp should forward the received mask_key function down
        to the actual socket.
        c                   S   s   dS )Nz    r   r   r   r   r   my_mask_key_func   r   z=WebSocketAppTest.test_sock_mask_key.<locals>.my_mask_key_funcwss://api-pub.bitfinex.com/ws/1)get_mask_keyN)r   r/   r<   idrA   )r   r?   r1   r   r   r   test_sock_mask_key   s
   z#WebSocketAppTest.test_sock_mask_keyc                 C   sB   dd }dd }t jd||d}| jt j|jddd	tjid
 dS )z7Test exception handling if ping_interval < ping_timeoutc                 S      t d |   d S NzGot a ping!r%   r1   r(   r   r   r   on_ping   r+   zIWebSocketAppTest.test_invalid_ping_interval_ping_timeout.<locals>.on_pingc                 S   rD   NzGot a pong! No need to respondr%   rF   r   r   r   on_pong   r+   zIWebSocketAppTest.test_invalid_ping_interval_ping_timeout.<locals>.on_pongr@   rG   rI         	cert_reqsping_intervalping_timeoutssloptNr   r/   assertRaisesWebSocketExceptionr0   ssl	CERT_NONEr   rG   rI   r1   r   r   r   'test_invalid_ping_interval_ping_timeout   s   
z8WebSocketAppTest.test_invalid_ping_interval_ping_timeoutc                 C   s:   dd }dd }t jd||d}|jddd	tjid
 dS )z+Test WebSocketApp proper ping functionalityc                 S   rD   rE   r%   rF   r   r   r   rG      r+   z4WebSocketAppTest.test_ping_interval.<locals>.on_pingc                 S   rD   rH   r%   rF   r   r   r   rI      r+   z4WebSocketAppTest.test_ping_interval.<locals>.on_pongr@   rJ   rL   rK   rM   rN   N)r   r/   r0   rU   rV   rW   r   r   r   test_ping_interval   s   
z#WebSocketAppTest.test_ping_intervalc                 C   s   t d}|jdddd dS )zTest WebSocketApp close opcode'wss://tsock.us1.twilio.com/v3/wsconnectrL   rK   zPing payload)rO   rP   ping_payloadN)r   r/   r0   r   r1   r   r   r   test_opcode_close   s   
z"WebSocketAppTest.test_opcode_closec                 C   *   t d}| jt j|jddtjid dS )z1A WebSocketApp handling of negative ping_intervalr@   rM   )rO   rQ   NrR   r\   r   r   r   test_bad_ping_interval      

z'WebSocketAppTest.test_bad_ping_intervalc                 C   r^   )z0A WebSocketApp handling of negative ping_timeoutr@   rM   )rP   rQ   NrR   r\   r   r   r   test_bad_ping_timeout   ra   z&WebSocketAppTest.test_bad_ping_timeoutc                 C   s   dd }t jd|d}t jt jjdd}| ddg|| t jt jjd	d}| d
d
g|| t d}t jt jjd	d}| d
d
g|| | jt j|jdd d
S )zKTest extraction of close frame status code and close reason in WebSocketAppc                 S   s   t d d S )Nzon_close reached)r&   )wsappclose_status_code	close_msgr   r   r   r,         z9WebSocketAppTest.test_close_status_code.<locals>.on_closerZ   )r,   s   no-init-from-client)opcodedatai  zno-init-from-client    Nztest if connection is closed)ri   )	r   r/   ABNFOPCODE_CLOSEr<   _get_close_argsrS   "WebSocketConnectionClosedExceptionr   )r   r,   r1   
closeframeapp2r   r   r   test_close_status_code   s$   

z'WebSocketAppTest.test_close_status_codec                    sx   d ddd } fdd}dd }t jdt |||d	}|jd
dd | | |  t | t d dS )z)Test callback function exception handlingNc                 S      t dNCallback failedRuntimeError)r1   r   r   r   r#        zBWebSocketAppTest.test_callback_function_exception.<locals>.on_openc                    s   | | d S r   r   )r1   errexc
passed_appr   r   on_error  s   zCWebSocketAppTest.test_callback_function_exception.<locals>.on_errorc                 S   s   |    d S r   r   rF   r   r   r   rI     rg   zBWebSocketAppTest.test_callback_function_exception.<locals>.on_pongr-   r#   r|   rI   rL   rK   rO   rP   rt   )r   r/   r   r0   r<   assertIsInstancerv   str)r   r#   r|   rI   r1   r   ry   r    test_callback_function_exception   s   z1WebSocketAppTest.test_callback_function_exceptionc                 C   sH   G dd d}| }|  |j|j | |jt |  t|jd dS )z'Test callback method exception handlingc                   @   s,   e Zd Zdd Zdd Zdd Zdd Zd	S )
zBWebSocketAppTest.test_callback_method_exception.<locals>.Callbacksc                 S   s@   d | _ d | _tjdt | j| j| jd| _| jj	ddd d S )Nr-   r~   rL   rK   r   )
rz   r{   r   r/   r   r#   r|   rI   r1   r0   r   r   r   r   __init__$  s   zKWebSocketAppTest.test_callback_method_exception.<locals>.Callbacks.__init__c                 S   rr   rs   ru   )r   r(   r   r   r   r#   /  rw   zJWebSocketAppTest.test_callback_method_exception.<locals>.Callbacks.on_openc                 S   s   || _ || _d S r   )r{   rz   )r   r1   rx   r   r   r   r|   2  s   
zKWebSocketAppTest.test_callback_method_exception.<locals>.Callbacks.on_errorc                 S   s   |   d S r   r}   )r   r1   r(   r   r   r   rI   6  rg   zJWebSocketAppTest.test_callback_method_exception.<locals>.Callbacks.on_pongN)r   r   r	   r   r#   r|   rI   r   r   r   r   	Callbacks#  s
    r   rt   N)r<   r{   r1   r   rz   rv   r   )r   r   	callbacksr   r   r   test_callback_method_exception  s
   z/WebSocketAppTest.test_callback_method_exceptionc                    st   dd  fdd}fdd}t jdt ||d}|jd	d
dd | d	 |  t j | t d dS )zTest reconnectr   Nc                    s   | d S r   r   )r(   rx   )rz   r   r   r|   G  s   z1WebSocketAppTest.test_reconnect.<locals>.on_errorc                    s2    d7   dkr| j    dkr|   d S d S )NrK   rL   )sockshutdownr   rF   )
pong_countr   r   rI   K  s   
z0WebSocketAppTest.test_reconnect.<locals>.on_pongr-   )rI   r|   rL   rK      )rO   rP   	reconnectzping/pong timed out)r   r/   r   r0   r<   r   WebSocketTimeoutExceptionr   )r   r|   rI   r1   r   )rz   r   r   test_reconnect?  s   
zWebSocketAppTest.test_reconnectN)r   r   r	   r   r   r   r   unittest
skipUnlessTEST_WITH_LOCAL_SERVERr2   r6   r>   r   rC   rX   rY   r]   r`   rc   rq   r   r   r   r   r   r   r   r   &   sR    





















r   __main__)osos.pathrU   r9   r   	websocketr   environgetr   r   r   r   TestCaser   r   mainr   r   r   r   <module>   s"     ;