debeir.core.indexer

import abc
import threading
from queue import Queue
from typing import List

from debeir.rankers.transformer_sent_encoder import Encoder
from debeir.utils.utils import remove_excess_whitespace
from elasticsearch import Elasticsearch


class Indexer:
    def __init__(self, client):
        super().__init__()
        self.client = client

    @abc.abstractmethod
    def get_field(self, document, field):
        pass


class SemanticElasticsearchIndexer(Indexer, threading.Thread):
    """
    Creates a NIR-style index with dense field representations produced by the provided
    sentence encoder. Assumes the documents have already been indexed.
    """

    def __init__(self, es_client: Elasticsearch, encoder: Encoder, index: str,
                 fields_to_encode: List[str], queue: Queue):
        super().__init__(es_client)
        self.encoder = encoder
        self.index = index
        self.fields = fields_to_encode
        self.q = queue
        self.update_mappings(self.index, self.fields, self.client)

    @classmethod
    def update_mappings(cls, index, fields, client: Elasticsearch):
        # Add a dense_vector mapping and a companion text mapping for every field to encode.
        mapping = {}
        value = {
            "type": "dense_vector",
            "dims": 768
        }

        for field in fields:
            mapping[field + "_Embedding"] = value
            mapping[field + "_Text"] = {"type": "text"}

        client.indices.put_mapping(
            body={
                "properties": mapping
            }, index=index)

    # async def create_index(self, document_itr=None):
    #    await self._update_mappings()

    #    if document_itr is None:
    #        document_itr = helpers.async_scan(self.es_client, index=self.index)

    #    bar = tqdm(desc="Indexing", total=35_000)

    #    async for document in document_itr:
    #        doc = document["_source"]
    #        await self.index_document(doc)

    #        bar.update(1)

    def get_field(self, document, field):
        # Skip documents that do not contain the field.
        if field not in document:
            return False

        # Skip documents that already carry an encoded text copy of this field.
        if f"{field}_Text" in document and document[f"{field}_Text"]:
            return False

        # Some corpora nest the text under a Textblock sub-field.
        if 'Textblock' in document[field]:
            return remove_excess_whitespace(document[field]['Textblock'])

        return remove_excess_whitespace(document[field])

    def index_document(self, document):
        update_doc = {}
        doc = document["_source"]

        for field in self.fields:
            text_field = self.get_field(doc, field)

            if text_field:
                embedding = self.encoder.encode(topic=text_field, disable_cache=True)
                update_doc[f"{field}_Embedding"] = embedding
                update_doc[f"{field}_Text"] = text_field

        if update_doc:
            self.client.update(index=self.index,
                               id=document['_id'],
                               doc=update_doc)

    def run(self):
        # Drain the shared queue, encoding and updating one document at a time.
        while not self.q.empty():
            document = self.q.get()
            self.index_document(document)
class Indexer:

Base indexer class. It stores the provided search client and declares the abstract
get_field method that subclasses implement to pull the text to encode out of a document.

Indexer(client)

@abc.abstractmethod
def get_field(self, document, field):

Given a document and a field name, return the cleaned text to encode for that field, or
False if the field should be skipped.
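A minimal sketch of the Indexer contract. The TitleIndexer class and the title field used
here are hypothetical; only Indexer and Elasticsearch come from the module above.

from elasticsearch import Elasticsearch

from debeir.core.indexer import Indexer


class TitleIndexer(Indexer):
    """Hypothetical subclass that exposes a plain-text title field."""

    def get_field(self, document, field):
        # Mirror the contract used elsewhere in this module:
        # return the text when present, False otherwise.
        if field not in document:
            return False
        return document[field]


indexer = TitleIndexer(Elasticsearch("http://localhost:9200"))
print(indexer.get_field({"title": "Dense retrieval"}, "title"))  # -> "Dense retrieval"
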
class SemanticElasticsearchIndexer(Indexer, threading.Thread):

Creates a NIR-style index with dense field representations produced by the provided
sentence encoder. Assumes the documents have already been indexed.

SemanticElasticsearchIndexer(
    es_client: elasticsearch.Elasticsearch,
    encoder: debeir.rankers.transformer_sent_encoder.Encoder,
    index: str,
    fields_to_encode: List[str],
    queue: queue.Queue)

The constructor takes the following arguments:

es_client is the Elasticsearch client used to update the index mapping and the documents.

encoder is the sentence encoder used to embed field text; the mapping created by
update_mappings() declares 768-dimensional dense vectors.

index is the name of the existing index to update.

fields_to_encode lists the document fields whose text should be encoded.

queue is a queue of Elasticsearch hits that run() will consume.

The constructor calls update_mappings() immediately, so the dense_vector and companion
text fields are added to the index mapping as soon as the indexer is created. Because the
class also subclasses threading.Thread, the inherited start() and join() methods can be
used to run it as a worker thread.
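A minimal usage sketch. It assumes an Elasticsearch cluster at localhost:9200 and an index
named my-index (both placeholders), plus an already constructed Encoder instance, whose
construction is omitted because it is defined in debeir.rankers.transformer_sent_encoder
rather than in this module.

from queue import Queue

from elasticsearch import Elasticsearch, helpers

from debeir.core.indexer import SemanticElasticsearchIndexer

client = Elasticsearch("http://localhost:9200")  # placeholder cluster address
encoder = ...  # an Encoder instance from debeir.rankers.transformer_sent_encoder

# Fill a queue with the hits that should receive dense representations.
q = Queue()
for hit in helpers.scan(client, index="my-index"):  # "my-index" is a placeholder
    q.put(hit)

# Constructing the indexer updates the index mapping; run() then drains the queue.
indexer = SemanticElasticsearchIndexer(
    es_client=client,
    encoder=encoder,
    index="my-index",
    fields_to_encode=["Title", "Abstract"],  # placeholder field names
    queue=q,
)
indexer.start()  # inherited from threading.Thread
indexer.join()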

@classmethod
def update_mappings(cls, index, fields, client: elasticsearch.Elasticsearch):

For every field to encode, adds a {field}_Embedding mapping of type dense_vector with 768
dimensions and a {field}_Text mapping of type text, then applies the new properties to the
index with put_mapping. The constructor calls this automatically.
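For illustration, calling update_mappings with fields ["Title", "Abstract"] (placeholder
names) applies a mapping update equivalent to the sketch below.

# Equivalent put_mapping body for fields = ["Title", "Abstract"] (placeholder names).
properties = {
    "Title_Embedding": {"type": "dense_vector", "dims": 768},
    "Title_Text": {"type": "text"},
    "Abstract_Embedding": {"type": "dense_vector", "dims": 768},
    "Abstract_Text": {"type": "text"},
}
# client.indices.put_mapping(body={"properties": properties}, index="my-index")
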
def get_field(self, document, field):

Returns the whitespace-normalised text for the given field, reading the Textblock
sub-field when the field is a nested object. Returns False when the document does not
contain the field, or when a {field}_Text value is already present, meaning the document
has already been encoded.
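A sketch of the expected behaviour on a few hypothetical document shapes (the Abstract
field name is a placeholder). Because get_field does not touch self, it can be exercised
unbound for illustration.

from debeir.core.indexer import SemanticElasticsearchIndexer

nested = {"Abstract": {"Textblock": "Dense   retrieval \n models"}}
flat = {"Abstract": "Dense retrieval models"}
encoded = {"Abstract": "Dense retrieval models", "Abstract_Text": "Dense retrieval models"}

print(SemanticElasticsearchIndexer.get_field(None, nested, "Abstract"))   # cleaned Textblock text
print(SemanticElasticsearchIndexer.get_field(None, flat, "Abstract"))     # cleaned field text
print(SemanticElasticsearchIndexer.get_field(None, encoded, "Abstract"))  # False: already encoded
print(SemanticElasticsearchIndexer.get_field(None, {}, "Abstract"))       # False: field missing
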
def index_document(self, document):

Processes a single Elasticsearch hit: extracts the text of each configured field from the
hit's _source with get_field, encodes it, and writes the embedding together with the
normalised text back to the index as a partial update keyed by the hit's _id. Fields that
get_field rejects are skipped, and no update is sent if nothing was encoded.
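For a hit whose _source contains a single encodable Title field (a placeholder name), the
resulting partial update is shaped like the sketch below; the embedding values are dummies
standing in for the encoder output.

# Shape of the partial update produced for one hit (illustrative values only).
hit = {"_id": "doc-1", "_source": {"Title": "Dense retrieval for clinical trials"}}

update_doc = {
    "Title_Embedding": [0.01, -0.02, 0.03],  # in practice a 768-dimensional vector
    "Title_Text": "Dense retrieval for clinical trials",
}
# client.update(index="my-index", id=hit["_id"], doc=update_doc)
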
def run(self):

Method representing the thread's activity. In this class it drains the queue, calling
index_document() on each hit until the queue is empty, after which the thread finishes.
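Because run() only reads hits from the shared queue, several indexer threads can drain the
same queue in parallel. A hedged sketch, reusing the client, encoder, queue and index name
from the earlier example (all placeholders); whether the Encoder itself is safe to share
across threads depends on its implementation and is not addressed here.

from debeir.core.indexer import SemanticElasticsearchIndexer

# Start a few workers over the same queue; each pops hits until the queue is empty.
# Note that every constructor call re-applies the mapping update, which is redundant
# but matches the constructor's behaviour.
workers = [
    SemanticElasticsearchIndexer(
        es_client=client,
        encoder=encoder,
        index="my-index",
        fields_to_encode=["Title", "Abstract"],
        queue=q,
    )
    for _ in range(4)
]

for worker in workers:
    worker.start()
for worker in workers:
    worker.join()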

Inherited Members
threading.Thread
start
join
name
ident
is_alive
daemon
isDaemon
setDaemon
getName
setName
native_id