
    ZiP'                         d dl Z d dlZd dlmZmZmZ d dlmZ d dlm	Z	 d dl
mZmZ d dlmZ 	  G d de j                  Zed	k    r e j                     dS dS )
    N)Mockpatch	MagicMock)ABNF)	WebSocket)WebSocketProtocolExceptionWebSocketPayloadException)SSLErrorc                   >    e Zd Zd Zd Zd Zd Zd Zd Zd Z	d Z
d	S )
LargePayloadTestc                    g d}|D ]\  }}|                      ||          5  d|z  }t          j        |t          j                  }|                                }|                     |t                     |                     t          |          |k               | 	                    t          |j
                  |           ddd           n# 1 swxY w Y   dS )z:Test WebSocket frame length encoding at various boundaries)	)}   zSingle byte length)~   zTwo byte length start)   zTwo byte length)  zTwo byte length max)   zEight byte length start) @  z16KB boundary)@  zJust over 16KB)   32KB)i   128KB)lengthdescription   AN)subTestr   create_frameOPCODE_BINARYformatassertIsInstancebytes
assertTruelenassertEqualdata)self
test_casesr   r   payloadframe	formatteds          ^/root/projects/butler/venv/lib/python3.11/site-packages/websocket/tests/test_large_payloads.py%test_frame_length_encoding_boundariesz6LargePayloadTest.test_frame_length_encoding_boundaries   s#   

 

 


 $. 	: 	:FKVEE : :- )'43EFF "LLNN	 %%i777I& 8999   UZ&999: : : : : : : : : : : : : : :	: 	:s   B"CC	C	c                 p  	 ddz  }g 	d}t          dt          |          |          D ]"}	                    ||||z                       #d	fd}ddlm}  ||d          }|                    t          |                    }|                     ||           |                     d	           d
S )zHTest receiving large payloads in chunks (simulating the 16KB recv issue)   Br   r   r   c                 L    t                    k    rdS          }dz  |S N       r"   bufsizeresult
call_countchunkss     r*   	mock_recvzCLargePayloadTest.test_recv_large_payload_chunked.<locals>.mock_recvO   1    S[[((sJ'F!OJMr0   frame_bufferTskip_utf8_validationr1   N)ranger"   appendwebsocket._abnfr;   recv_strictr#   assertGreater)
r%   large_payload
chunk_sizeir8   r;   fbr5   r6   r7   s
           @@r*   test_recv_large_payload_chunkedz0LargePayloadTest.test_recv_large_payload_chunkedA   s     u 
q#m,,j99 	= 	=AMM-A
N(:;<<<<
	 	 	 	 	 	 	100000\)$??? M 2 233///:q)))))r0   c                 N   d}g fd}ddl m}  ||d          }|                    |          }|                     t	          |          |           |                     t          d D                                  |                     t	                    d           d	S )
z&Simulate SSL BAD_LENGTH error scenarior   c                 ~                         |            | dk    rt          d          dt          | d          z  S )Nr   z[SSL: BAD_LENGTH] unknown error   C)r?   r
   min)r4   
recv_callss    r*   mock_recv_with_ssl_limitzTLargePayloadTest.test_ssl_large_payload_simulation.<locals>.mock_recv_with_ssl_limitk   sD    g&&&@AAA#gu----r0   r   r:   Tr<   c              3   "   K   | ]
}|d k    V  dS )r   N ).0calls     r*   	<genexpr>zELargePayloadTest.test_ssl_large_payload_simulation.<locals>.<genexpr>|   s&      AAdDEMAAAAAAr0   r1   N)r@   r;   rA   r#   r"   r!   allrB   )r%   payload_sizerM   r;   rF   r5   rL   s         @r*   !test_ssl_large_payload_simulationz2LargePayloadTest.test_ssl_large_payload_simulationc   s     
	. 	. 	. 	. 	. 	100000\2NNN -- 	Vl333AAjAAAAABBB3z??A.....r0   c                     g d}|D ]}|                      |          5  d|z  }t          j        |t          j                  }|                                }|                     |t                     |                     t          |j	                  |           d}|t          j
        k     rd|z   }n|t          j        k     rd|z   }nd|z   }|                     t          |          ||z              ddd           n# 1 swxY w Y   dS )z6Test frame formatting with various large payload sizes)i?  r   r   r   r   r   )size   D      
   N)r   r   r   r   r   r   r    r#   r"   r$   LENGTH_7	LENGTH_16)r%   
test_sizesrW   r'   r(   r)   	mask_sizeexpected_header_sizes           r*    test_frame_format_large_payloadsz1LargePayloadTest.test_frame_format_large_payloads   s    @??
 	N 	ND4(( N N+)'43EFF "LLNN	 %%i777  UZ$777 	$-'' I )( DN** I )( Y )   Y1E1LMMM=N N N N N N N N N N N N N N N	N 	Ns   CDD	D	c                 P   t                      }g fd}||_        d|j        _        t	                      }||_        d|_        ddz  }t          d          5 }d |_        |	                    |          }| 
                    |d           d	d	d	           d	S # 1 swxY w Y   d	S )
z?Test that large payloads are sent in chunks to avoid SSL issuesc                 f                         t          |                      t          |           S N)r?   r"   )r$   sent_chunkss    r*   	mock_sendzDLargePayloadTest.test_send_large_payload_chunking.<locals>.mock_send   s)    s4yy)))t99r0   g      >@T   Er   zwebsocket._core.sendc                      t          |          S rd   r2   )sockr$   s     r*   <lambda>zCLargePayloadTest.test_send_large_payload_chunking.<locals>.<lambda>   s    CII r0   r   N)r   send
gettimeoutreturn_valuer   ri   	connectedr   side_effectsend_binaryrB   )r%   	mock_sockrf   wsrC   mock_send_funcr5   re   s          @r*    test_send_large_payload_chunkingz1LargePayloadTest.test_send_large_payload_chunking   s    FF	 	 	 	 	 	 #	,0	) [[ u )** 	*n)E)EN& ^^M22F vq)))	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	*s   4BB"Bc           	         ddz  }t          j        |t           j                  }|                                }|                     |t
                     t          j        dd          dz   }t          ddddt           j        d|          }| 	                    t                    5  |                    d	           d
d
d
           d
S # 1 swxY w Y   d
S )z.Test UTF-8 validation with large text payloadsu   Hello 世界! i  z!Hi  s    invalid utf8r1   r   Fr<   N)r   r   OPCODE_TEXTr   r   r    structpackOPCODE_CLOSEassertRaisesr   validate)r%   
large_textr(   r)   invalid_utf8_close_datas        r*   test_utf8_validation_large_textz0LargePayloadTest.test_utf8_validation_large_text   s    &,
 !*d.>?? LLNN	i/// #)+dD"9"9<T"T Q1a!2A7NOO 9:: 	7 	7NNN666	7 	7 	7 	7 	7 	7 	7 	7 	7 	7 	7 	7 	7 	7 	7 	7 	7 	7s   %C		CCc                 "   ddz  fdt          dt                    d          D             dfd}ddlm}  ||d	          }|                    d          }|                     |           |                     d
           dS )z9Test frame buffer with edge cases that could trigger bugs   Fr   c                 *    g | ]}||d z            S )   rO   )rP   rE   payload_16ks     r*   
<listcomp>zALargePayloadTest.test_frame_buffer_edge_cases.<locals>.<listcomp>   s&    VVV+a!d(l+VVVr0   r   r   c                 L    t                    k    rdS          }dz  |S r/   r2   r3   s     r*   r8   z@LargePayloadTest.test_frame_buffer_edge_cases.<locals>.mock_recv   r9   r0   r:   Tr<   rY   N)r>   r"   r@   r;   rA   r#   )r%   r8   r;   rF   r5   r6   r7   r   s        @@@r*   test_frame_buffer_edge_casesz-LargePayloadTest.test_frame_buffer_edge_cases   s     Ul WVVVU1c+>N>NPT5U5UVVV
	 	 	 	 	 	 	100000\)$???&&---Q'''''r0   c                     d}d|z  }t          j        |t           j                  }|                                }|                     |t
                     |                     t          |j                  |           dS )z4Test behavior at WebSocket maximum frame size limitsi      GN)	r   r   r   r   r   r    r#   r"   r$   )r%   
large_sizer'   r(   r)   s        r*   test_max_frame_size_limitsz+LargePayloadTest.test_max_frame_size_limits   su     !
# !'4+=>> LLNN	i/// 	UZ*55555r0   N)__name__
__module____qualname__r+   rG   rU   ra   rt   r~   r   r   rO   r0   r*   r   r      s         :  :  :D *  *  *D/ / /:%N %N %NN* * *B7 7 7.( ( (86 6 6 6 6r0   r   __main__)unittestrw   unittest.mockr   r   r   r@   r   websocket._corer   websocket._exceptionsr   r	   websocket._ssl_compatr
   TestCaser   r   mainrO   r0   r*   <module>r      s      0 0 0 0 0 0 0 0 0 0             % % % % % % W W W W W W W W * * * * * *&o6 o6 o6 o6 o6x( o6 o6 o6d zHMOOOOO r0   