
    Zi                          d dl Z d dlZd dlZd dlmZ d dlZd dlmZmZm	Z	 	 d dl
mZ n# e$ r	 d dlmZ Y nw xY wdZdZ G d de          ZdS )	    N)Thread)APIErrorDatetimeSerializer
batch_post)Emptyi  i  P c                   h    e Zd ZdZ ej        d          Z	 	 	 	 	 	 	 	 dd	Zd
 Zd Z	d Z
d Zd ZdS )Consumerz.Consumes the messages from the client's queue.posthogd   N      ?F
      c                     t          j        |            d| _        || _        || _        || _        || _        || _        || _        || _	        d| _
        || _        |	| _        |
| _        dS )zCreate a consumer thread.TN)r   __init__daemonflush_atflush_intervalapi_keyhoston_errorqueuegziprunningretriestimeouthistorical_migration)selfr   r   r   r   r   r   r   r   r   r   s              K/root/projects/butler/venv/lib/python3.11/site-packages/posthog/consumer.pyr   zConsumer.__init__   sr     	 ,	 
	
 $8!!!    c                     | j                             d           | j        r|                                  | j        | j                             d           dS )zRuns the consumer.zconsumer is running...zconsumer exited.N)logdebugr   uploadr   s    r   runzConsumer.run=   sW    /000l 	KKMMM l 	 	)*****r   c                     d| _         dS )zPause the consumer.FN)r   r$   s    r   pausezConsumer.pauseE   s    r   c                    d}|                                  }t          |          dk    rdS 	 |                     |           d}nQ# t          $ rD}| j                            d|           d}| j        r|                     ||           Y d}~nd}~ww xY w|D ]}| j                                         |S # |D ]}| j                                         |c cY S xY w)z:Upload the next batch of items, return whether successful.Fr   Tzerror uploading: %sN)	nextlenrequest	Exceptionr!   errorr   r   	task_done)r   successbatcheitems        r   r#   zConsumer.uploadI   s   		u::??5	LLGG 	( 	( 	(HNN0!444G} (a'''		(  ' '
$$&&&&N  ' '
$$&&&&NNNNNNNNs/   A B6 
B:B	B6 BB6 6#Cc                    | j         }g }t          j                    }d}t          |          | j        k     rt          j                    |z
  }|| j        k    rn	 |                    d| j        |z
            }t          t          j        |t                    
                                          }|t          k    r)| j                            dt          |                     |                    |           ||z  }|t           k    r| j                            d|           n*n# t$          $ r Y nw xY wt          |          | j        k     |S )z)Return the next batch of items to upload.r   T)blockr   )clsz)Item exceeds 900kib limit, dropping. (%s)zhit batch size limit (size: %d))r   time	monotonicr*   r   r   getjsondumpsr   encodeMAX_MSG_SIZEr!   r-   strappendBATCH_SIZE_LIMITr"   r   )r   r   items
start_time
total_sizeelapsedr2   	item_sizes           r   r)   zConsumer.next^   sY   
^%%

%jj4=((n&&3G$---yytT5H75RySS
45G H H H O O Q QRR	|++HNNCSYY   T"""i'
!111HNN#DjQQQ 2    # %jj4=((( s   BD* (A D* *
D76D7c                      d }t          j        t           j        t           j        dz   |           fd            } |             dS )z=Attempt to upload the batch and retry before raising an errorc                     t          | t                    r-| j        dk    rdS d| j        cxk    odk     nc o
| j        dk    S dS )NzN/AFi  i  i  )
isinstancer   status)excs    r   fatal_exceptionz)Consumer.request.<locals>.fatal_exception   s_    #x(( 	 :&& 5sz////C////FSZ35FF ur      )	max_triesgiveupc                  d    t          j        j        j        j         j                   d S )N)r   r   r0   r   )r   r   r   r   r   r   )r0   r   s   r   send_requestz&Consumer.request.<locals>.send_request   sC     	Y%)%>     r   N)backoffon_exceptionexpor,   r   )r   r0   rJ   rO   s   ``  r   r+   zConsumer.request|   su    
	 
	 
	 
	L)t|a/?

 

 

	 	 	 	 	

 

	 	r   )r   NNr   Fr   r   F)__name__
__module____qualname____doc__logging	getLoggerr!   r   r%   r'   r#   r)   r+    r   r   r	   r	      s        88
'
I
&
&C "9 9 9 9B+ + +    *  <    r   r	   )r9   rW   r6   	threadingr   rP   posthog.requestr   r   r   r   r   ImportErrorQueuer<   r?   r	   rY   r   r   <module>r^      s              D D D D D D D D D D     # A A A A Av A A A A As   ) 77