
    Zi}K                        d dl mZ d dlZd dlmZmZ d dlmZmZm	Z	 d dl
mZ d dlmZmZmZmZmZ d dlmZmZmZmZmZ d dlmZ d d	lmZmZmZ d d
lmZ d dl m!Z! d dl"m#Z#m$Z$m%Z%m&Z&m'Z'm(Z( d dl)m*Z* d dl+m,Z,m-Z- d dl)Z)d dl.Z.d dl/m0Z0  e.j1        e2          Z3ej4        d ej5        dej6        dej7        diZ8d e89                                D             Z:dZ; G d deee          Z<dS )    )cached_propertyN)ConfigurationParameter$EmbeddingsQueueConfigurationInternal)SqlDBParameterValueget_sql)BatchSizeExceededError)ProducerConsumerConsumerCallbackFndecode_vectorencode_vector)OperationRecord	LogRecordScalarEncodingSeqId	Operation)System)OpenTelemetryClientOpenTelemetryGranularitytrace_method)override)defaultdict)SequenceOptionalDictSetTuplecast)UUID)Table	functions)create_topic_name         c                     i | ]\  }}||	S  r(   ).0kvs      `/root/projects/butler/venv/lib64/python3.11/site-packages/chromadb/db/mixins/embeddings_queue.py
<dictcomp>r-   /   s    BBBA1BBB    Fc                       e Zd ZU dZ G d d          Zeeee         f         ed<   e	e
         ed<   eed<   eed<   dZd	ef fd
Z edej                  ed: fd                        Z edej                  ededdfd                        Z edej                  ededdfd                        Z edej                  edededefd                        Z edej                  ededee         dee         fd                        Z edej                  e	 	 	 d;dedede	e         de	e         de	e         defd                        Z ed ej                  ed!eddfd"                        Zedefd#            Zedefd$            Ze  ed%ej                  ede
fd&                                    Z! ed'ej                  dede"e	e#         e	e         e	e         f         fd(            Z$ ed)ej                  d*eddfd+            Z% ed,ej                  de	e         de	e         de"e
e
f         fd-            Z& ed.ej                  de
fd/            Z' ed0ej                  d1edee(         ddfd2            Z) ed3ej                  d4edee(         ddfd5            Z*e+de,fd6            Z-d7e,ddfd8Z.de
fd9Z/ xZ0S )<SqlEmbeddingsQueuea  A SQL database that stores embeddings, allowing a traditional RDBMS to be used as
    the primary ingest queue and satisfying the top level Producer/Consumer interfaces.

    Note that this class is only suitable for use cases where the producer and consumer
    are in the same process.

    This is because notification of new embeddings happens solely in-process: this
    implementation does not actively listen to the the database for new records added by
    other processes.
    c            
       ^    e Zd ZU eed<   eed<   eed<   eed<   eed<   dededededef
dZdS )SqlEmbeddingsQueue.Subscriptionid
topic_namestartendcallbackc                 L    || _         || _        || _        || _        || _        d S N)r3   r4   r5   r6   r7   )selfr3   r4   r5   r6   r7   s         r,   __init__z(SqlEmbeddingsQueue.Subscription.__init__I   s+     DG(DODJDH$DMMMr.   N)	__name__
__module____qualname__r    __annotations__strintr   r;   r(   r.   r,   Subscriptionr2   B   s         


$$$$	%	% 	% 		%
 	% )	% 	% 	% 	% 	% 	%r.   rB   _subscriptions_max_batch_size_tenant_topic_namespace   systemc                 D   t          t                    | _        d | _        |                    t
                    | _        |j                            d          | _        |j                            d          | _	        t                                          |           d S )N	tenant_idtopic_namespace)r   setrC   rD   requirer   _opentelemetry_clientsettingsrE   rF   superr;   )r:   rH   	__class__s     r,   r;   zSqlEmbeddingsQueue.__init__^   s~    )#..#%+^^4G%H%H"..{;; & 7 78I J J     r.   zSqlEmbeddingsQueue.reset_statereturnNc                     t                                                       t          t                    | _        	 | `d S # t          $ r Y d S w xY wr9   )rP   reset_stater   rL   rC   configAttributeError)r:   rQ   s    r,   rT   zSqlEmbeddingsQueue.reset_statef   s[     	)#..	 	 	 	DD	s   A   
AAzSqlEmbeddingsQueue.delete_topiccollection_idc                    t          | j        | j        |          }t          d          }|                                                     |                              |j        t          |          k              	                                }| 
                                5 }t          ||                                           \  }}|                    ||           d d d            d S # 1 swxY w Y   d S )Nembeddings_queue)r#   rE   rF   r!   querybuilderfrom_wheretopicr   deletetxr   parameter_formatexecute)r:   rW   r4   tqcursqlparamss           r,   
delete_logzSqlEmbeddingsQueue.delete_logs   s    'L$/
 

 $%%U1XXU17nZ88899VXX	 	
 WWYY 	%#!!T%:%:%<%<==KCKKV$$$	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	%s   !<C**C.1C.zSqlEmbeddingsQueue.purge_logc                 "   t          d          }|                                                     |                              t	          j        t          d          j        d                                        |j        t          | 
                    |                    k                                  t          d                                        |j        t          d          j        k              }t          | j        | j        |          }|                                 5 }t'          ||                                           \  }}|                    ||           |                                }|rt/          d |D                       }	n	 d d d            d S t          d          }
|                                                     |
                              |
j        t          |	          k                                   |
j        t          |          k                                              }t'          ||                                           \  }}|                    ||           d d d            d S # 1 swxY w Y   d S )Nsegments
max_seq_idc              3   &   K   | ]}|d          V  dS )r   Nr(   )r)   rows     r,   	<genexpr>z/SqlEmbeddingsQueue.purge_log.<locals>.<genexpr>   s&       ; ;CQ ; ; ; ; ; ;r.   rY   )r!   rZ   r[   selectr"   Coalesceseq_idr\   
collectionr   
uuid_to_db	left_joinonr3   
segment_idr#   rE   rF   r_   r   r`   ra   fetchallminr]   r^   )r:   rW   
segments_tsegment_ids_qr4   rd   re   rf   results
min_seq_idrb   rc   s               r,   	purge_logzSqlEmbeddingsQueue.purge_log   s   
 :&&
U:
 VI&u\':':'A2FFGGU%8V8V)W)WW  Yu\**++R
|!4!4!??@@ 	 'L$/
 

 WWYY 	%#!-1F1F1H1HIIKCKKV$$$llnnG   ; ;7 ; ; ;;;

	% 	% 	% 	% 	% 	% 	% 	% ())A!!##qqx."<"<<==qw."<"<<==  "!T%:%:%<%<==KCKKV$$$'	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	%s   (A-J"CJJJz#SqlEmbeddingsQueue.submit_embedding	embeddingc                 h    | j         st          d          |                     ||g          d         S )NComponent not runningr   )_runningRuntimeErrorsubmit_embeddings)r:   rW   r~   s      r,   submit_embeddingz#SqlEmbeddingsQueue.submit_embedding   s:    
 } 	86777%%mi[AA!DDr.   z$SqlEmbeddingsQueue.submit_embeddings
embeddingsc                     | j         st          d          t          |          dk    rg S t          |          | j        k    r"t	          d| j        dd| j        dd          | j        }t          | j        | j        |          }t          d          }| 
                                                    |                              |j        |j        |j        |j        |j        |j                  }i }|D ]}|                     |          \  }	}
}|                    t+          t,          |d                            t+          |          t+          |d	                   t+          |	          t+          |
          t+          |                    }t          |          ||d	         <   |                                 5 }t1          ||                                           \  }}| d
}|                    ||                                          }t9          t:          d           gt          |          z  }g }|D ]n\  }}||||         <   |||                  }t=          |t?          ||d         |d         |d         |d                             }|                     |           o| !                    ||           | j        "                    d          j#        r| $                    |           |cd d d            S # 1 swxY w Y   d S )Nr   r   z)
                Cannot submit more than ,zf embeddings at once.
                Please submit your embeddings in batches of size
                z or less.
                rY   	operationr3   z RETURNING seq_id, idr~   encodingmetadata)r3   r~   r   r   r   
log_offsetrecordautomatically_purge)%r   r   lenmax_batch_sizer	   rU   r#   rE   rF   r!   rZ   intocolumnsr   r]   r3   vectorr   r   !_prepare_vector_encoding_metadatainsertr   _operation_codesr_   r   r`   ra   rw   r   r   r   r   append_notify_allget_parametervaluer}   )r:   rW   r   _r4   rb   r   	id_to_idxr~   embedding_bytesr   r   rd   re   rf   r{   seq_idsembedding_recordsrq   r3   submit_embedding_recordembedding_records                         r,   r   z$SqlEmbeddingsQueue.submit_embeddings   s   
 } 	86777z??aIz??T000()-)<   $      K&L$/
 

 $%%T!WWWQ[!'141:qzRR 	
 %'	# 	8 	8I
 66yAA	]]/	+0FGHHz**y////x((x(( F *-YIio&&WWYY  	#!&$*?*?*A*ABBKC ///Ckk#v..7799GE4(()C- - G !#% ; ;
)/	"&*4Yr]*C' $-%*"9+"F!8!D!8!D"9+"F  	$ 	$ 	$  "(()9::::Z):;;;{(()>??E .}---A 	  	  	  	  	  	  	  	  	  	  	  	  	  	  	  	  	  	s   ?D7LL
LzSqlEmbeddingsQueue.subscribe
consume_fnr5   r6   r3   c                 `   | j         st          d          t          | j        | j        |          }|pt          j                    }|                     ||          \  }}|                     |||||          }| 	                    |           | j
        |                             |           |S )Nr   )r   r   r#   rE   rF   uuiduuid4_validate_rangerB   	_backfillrC   add)	r:   rW   r   r5   r6   r3   r4   subscription_idsubscriptions	            r,   	subscribezSqlEmbeddingsQueue.subscribe  s     } 	86777&L$/
 

 ,
))%55
s((ZZ
 

 	|$$$J'++L999r.   zSqlEmbeddingsQueue.unsubscriber   c                     | j                                         D ]I\  }}|D ]A}|j        |k    r4|                    |           t	          |          dk    r| j         |=   d S BJd S )Nr   )rC   itemsr3   remover   )r:   r   r4   subscriptionsr   s        r,   unsubscribezSqlEmbeddingsQueue.unsubscribe.  s     *.)<)B)B)D)D 	 	%J -  ?o55!((666=))Q.. /
;FFF	 6	 	r.   c                     dS )Nrk   r(   r:   s    r,   	min_seqidzSqlEmbeddingsQueue.min_seqid9  s    rr.   c                     dS )Nl    r(   r   s    r,   	max_seqidzSqlEmbeddingsQueue.max_seqid=  s    yr.   z!SqlEmbeddingsQueue.max_batch_sizec                    | j         |                                 5 }|                    d           |                                }|D ]G}d|d         v r;t	          |d                             d          d                   | j        z  | _         H| j         d| j        z  | _         d d d            n# 1 swxY w Y   | j         S )NzPRAGMA compile_options;MAX_VARIABLE_NUMBERr   =r$   i  )rD   r_   ra   rw   rA   splitVARIABLES_PER_RECORD)r:   rd   compile_optionsoptions       r,   r   z!SqlEmbeddingsQueue.max_batch_sizeA  s    ' Lc5666"%,,..-  F,q	99/26!9??33G3G3J/K/K 50, '/ ,/$2K+KD(L L L L L L L L L L L L L L L  ##s   B
B22B69B6z4SqlEmbeddingsQueue._prepare_vector_encoding_metadatac                     |d         9t          t          |d                   }|j        }t          |d         |          }nd }d }|d         rt	          j        |d                   nd }|||fS )Nr~   r   r   )r   r   r   r   jsondumps)r:   r~   encoding_typer   r   r   s         r,   r   z4SqlEmbeddingsQueue._prepare_vector_encoding_metadataX  s|     [!- :1FGGM$*H+Ik,BMRROO"OH8A*8MW4:i
3444SW(22r.   zSqlEmbeddingsQueue._backfillr   c                    t          d          }|                                                     |                              |j        t          |j                  k                                  |j        t          |j                  k                                  |j        t          |j	                  k              
                    |j        |j        |j        |j        |j        |j                                      |j                  }|                                 5 }t%          ||                                           \  }}|                    ||           |                                }|D ]}|d         r,t-          |d                   }	t/          |d         |	          }
nd}	d}
|                     |t3          |d         t5          t6          |d                  |d         |
|	|d         rt9          j        |d                   nd	          
          g           	 ddd           dS # 1 swxY w Y   dS )zUBackfill the given subscription with any currently matching records in the
        DBrY   r&      Nr   r$   r%      )r   r3   r~   r   r   r   )r!   rZ   r[   r\   r]   r   r4   rq   r5   r6   ro   r   r3   r   r   r   orderbyr_   r   r`   ra   rw   r   r   _notify_oner   r   _operation_codes_invr   loads)r:   r   rb   rc   rd   re   rf   rowsrm   r   r   s              r,   r   zSqlEmbeddingsQueue._backfilli  sI    $%%U1XXU17n\-DEEEFFU18n\-?@@@AAU18~l.>???@@VAHak141:qzRRWQX 	
 WWYY 	#!!T%:%:%<%<==KCKKV$$$<<>>D  q6 "-c!f55H*3q68<<FF#H!F   !'*1v#2*>s1v*F#&q6*0)1?B1v)OCF);););4$ $ $	 	 	   		 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	s   #DH55H9<H9z"SqlEmbeddingsQueue._validate_rangec                 
   |p|                                  }|p|                                 }t          |t                    rt          |t                    st	          d          ||k    rt          d| d|           ||fS )z[Validate and normalize the start and end SeqIDs for a subscription using this
        impl.z2SeqIDs must be integers for sql-based EmbeddingsDBzInvalid SeqID range: z to )_next_seq_idr   
isinstancerA   	TypeError
ValueError)r:   r5   r6   s      r,   r   z"SqlEmbeddingsQueue._validate_range  s     ,**,,%T^^%%%%% 	RZS-A-A 	RPQQQC<<EUEEEEFFFczr.   zSqlEmbeddingsQueue._next_seq_idc                    t          d          }|                                                     |                              t	          j        |j                            }|                                 5 }|                    |	                                           t          |                                d                   dz   cddd           S # 1 swxY w Y   dS )z%Get the next SeqID for this database.rY   r   r$   N)r!   rZ   r[   ro   r"   Maxrq   r_   ra   r   rA   fetchoner:   rb   rc   rd   s       r,   r   zSqlEmbeddingsQueue._next_seq_id  s     $%%%%a((//	ah0G0GHHWWYY 	.#KK		$$$s||~~a())A-	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	.s   5ACCCzSqlEmbeddingsQueue._notify_allr]   c                 d    | j         r&| j        |         D ]}|                     ||           dS dS )z:Send a notification to each subscriber of the given topic.N)r   rC   r   )r:   r]   r   subs       r,   r   zSqlEmbeddingsQueue._notify_all  sR     = 	2*51 2 2  j1111	2 	22 2r.   zSqlEmbeddingsQueue._notify_oner   c                    d}g }|D ]>}|d         |j         k    r|d         |j        k    rd} n|                    |           ?	 t          |          dk    r|                    |           |r|                     |j                   dS dS # t          $ rU}t          	                    d|j        j
         d|j         dz   t          |                     t          r|Y d}~dS d}~ww xY w)	z+Send a notification to a single subscriber.Fr   Tr   z6Exception occurred invoking consumer for subscription z	to topic z %sN)r5   r6   r   r   r7   r   r3   BaseExceptionloggererrorhexr4   r@   _called_from_test)r:   r   r   should_unsubscribefiltered_embeddingsr~   es          r,   r   zSqlEmbeddingsQueue._notify_one  sQ    # # 	2 	2I&#)33&00%)"&&y1111	&''!++0111! )  ((((() ) 	 	 	LLUUU1cn1112A  
 !      	s   AB 
C.A
C))C.c                 d   t          d          }|                                                     |                              |j                                      d          }|                                 5 }|                    |                                           |	                                }d d d            n# 1 swxY w Y   |M| 
                                dk    }t          t          d|          g          }|                     |           |S t          j        |d                   S )Nembeddings_queue_configr$   r   r   )r!   rZ   r[   ro   config_json_strlimitr_   ra   r   r   _get_wal_sizer   r   
set_configfrom_json_str)r:   rb   rc   rd   resultis_fresh_systemrU   s          r,   rU   zSqlEmbeddingsQueue.config  s@   +,,%%a((//0ABBHHKKWWYY 	$#KK		$$$\\^^F	$ 	$ 	$ 	$ 	$ 	$ 	$ 	$ 	$ 	$ 	$ 	$ 	$ 	$ 	$ >"0022a7O9'(=OOP F OOF###M3A&)LLLs   6<B>>CCrU   c                     |                                  5 }|                    dd|                                f           d d d            n# 1 swxY w Y   	 | `d S # t          $ r Y d S w xY w)Nz
                INSERT OR REPLACE INTO embeddings_queue_config (id, config_json_str)
                VALUES (?, ?)
            r$   )r_   ra   to_json_strrU   rV   )r:   rU   rd   s      r,   r   zSqlEmbeddingsQueue.set_config  s    WWYY 
	#KK
 &&((	 	 	
	 
	 
	 
	 
	 
	 
	 
	 
	 
	 
	 
	 
	 
	 
		 	 	 	DD	s#   +AAAA 
A*)A*c                    t          d          }|                                                     |                              t	          j        d                    }|                                 5 }|                    |                                           t          |
                                d                   cd d d            S # 1 swxY w Y   d S )NrY   *r   )r!   rZ   r[   ro   r"   Countr_   ra   r   rA   r   r   s       r,   r   z SqlEmbeddingsQueue._get_wal_size  s    $%%%%a((//	0D0DEEWWYY 	*#KK		$$$s||~~a())	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	*s   0ACCC)rR   N)NNN)1r<   r=   r>   __doc__rB   r   r@   r   r?   r   rA   r   r   r;   r   r   ALLr   rT   r    rg   r}   r   r   r   r   r   r   r   r   r   r   propertyr   r   bytesr   r   r   r   r   r   r   r   r   rU   r   r   __classcell__)rQ   s   @r,   r0   r0   6   s        	 	% % % % % % % %* c,//0000c]"""LLL!v ! ! ! ! ! ! \24L4PQQ	 	 	 	 	 X RQ	 \35M5QRR% % % % % X SR% \02J2NOO)%t )% )% )% )% X PO)%V \79Q9UVVE!E.=E	E E E X WVE \8:R:VWWQ!Q/7/HQ	%Q Q Q X XWQf \02J2NOO
 "&#!  ' 	
 e_ TN 
   X PO8 \24L4PQQ4 D    X RQ 5    X 5    X \57O7STT$ $ $ $ X UT X$( \> $ 3(3	xx}<	=3 3 3	 3 \02J2NOO&l &t & & & PO&P \68P8TUUe_+3E?	sCx   VU \35M5QRR.c . . . SR. \24L4PQQ2 2(92E 2$ 2 2 2 RQ2 \24L4PQQ| )9L QU    RQ: M< M M M _M$!E $    (*s * * * * * * * *r.   r0   )=	functoolsr   r   chromadb.api.configurationr   r   chromadb.db.baser   r   r   chromadb.errorsr	   chromadb.ingestr
   r   r   r   r   chromadb.typesr   r   r   r   r   chromadb.configr    chromadb.telemetry.opentelemetryr   r   r   	overridesr   collectionsr   typingr   r   r   r   r   r   r   r    pypikar!   r"   loggingchromadb.ingest.impl.utilsr#   	getLoggerr<   r   ADDUPDATEUPSERTDELETEr   r   r   r   r0   r(   r.   r,   <module>r     s   % % % % % %         < ; ; ; ; ; ; ; ; ; 2 2 2 2 2 2                           # " " " " "         
       # # # # # # = = = = = = = = = = = = = = = =       # # # # # # # #   8 8 8 8 8 8 
	8	$	$ M1aaa	  CB)9)?)?)A)ABBB   E* E* E* E* E*( E* E* E* E* E*r.   