debeir.core.executor

  1from typing import Dict, Optional, Union
  2
  3import loguru
  4from debeir.engines.elasticsearch.executor import ElasticsearchExecutor
  5from debeir.core.config import GenericConfig, NIRConfig
  6from debeir.core.query import GenericElasticsearchQuery
  7from debeir.rankers.transformer_sent_encoder import Encoder
  8from debeir.utils.scaler import unpack_elasticsearch_scores
  9from elasticsearch import AsyncElasticsearch as Elasticsearch
 10
 11
 12class GenericElasticsearchExecutor(ElasticsearchExecutor):
 13    """
 14    Generic Executor class for Elasticsearch
 15    """
 16    query: GenericElasticsearchQuery
 17
 18    def __init__(
 19            self,
 20            topics: Dict[Union[str, int], Dict[str, str]],
 21            client: Elasticsearch,
 22            index_name: str,
 23            output_file: str,
 24            query: GenericElasticsearchQuery,
 25            encoder: Optional[Encoder] = None,
 26            config=None,
 27            *args,
 28            **kwargs,
 29    ):
 30        super().__init__(
 31            topics,
 32            client,
 33            index_name,
 34            output_file,
 35            query,
 36            encoder,
 37            config=config,
 38            *args,
 39            **kwargs,
 40        )
 41
 42        self.query_fns = {
 43            "query": self.generate_query,
 44            "embedding": self.generate_embedding_query,
 45        }
 46
 47    def generate_query(self, topic_num, best_fields=True, **kwargs):
 48        """
 49        Generates a standard BM25 query given the topic number
 50
 51        :param topic_num: Query topic number to generate
 52        :param best_fields: Whether to use a curated list of fields
 53        :param kwargs:
 54        :return:
 55        """
 56        return self.query.generate_query(topic_num, **kwargs)
 57
 58    # def generate_query_ablation(self, topic_num, **kwargs):
 59    #    return self.query.generate_query_ablation(topic_num)
 60
 61    def generate_embedding_query(
 62            self,
 63            topic_num,
 64            cosine_weights=None,
 65            query_weights=None,
 66            norm_weight=2.15,
 67            automatic_scores=None,
 68            **kwargs,
 69    ):
 70        """
 71        Executes an NIR-style query with combined scoring.
 72
 73        :param topic_num:
 74        :param cosine_weights:
 75        :param query_weights:
 76        :param norm_weight:
 77        :param automatic_scores:
 78        :param kwargs:
 79        :return:
 80        """
 81        assert self.encoder is not None or self.config.encoder is not None
 82
 83        if "encoder" not in kwargs:
 84            kwargs["encoder"] = self.encoder
 85
 86        return self.query.generate_query_embedding(
 87            topic_num,
 88            cosine_weights=cosine_weights,
 89            query_weight=query_weights,
 90            norm_weight=norm_weight,
 91            automatic_scores=automatic_scores,
 92            **kwargs,
 93        )
 94
 95    # @apply_config
 96    async def execute_query(
 97            self, query=None, return_size: int = None, return_id_only: bool = None,
 98            topic_num=None, ablation=False, query_type=None,
 99            **kwargs
100    ):
101        """
102        Executes a query using the underlying elasticsearch client.
103
104        :param query:
105        :param topic_num:
106        :param ablation:
107        :param query_type:
108        :param return_size:
109        :param return_id_only:
110        :param kwargs:
111        :return:
112        """
113
114        if ablation:
115            query_type = "ablation"
116
117        assert query is not None or topic_num is not None
118
119        if query:
120            if return_id_only:
121                # query["fields"] = [self.query.id_mapping]
122                # query["_source"] = False
123                query["_source"] = [self.query.id_mapping]
124            res = await self.client.search(
125                index=self.index_name, body=query, size=return_size
126            )
127
128            return [query, res]
129
130        if topic_num:
131            loguru.logger.debug(query_type)
132            body = self.query_fns[query_type](topic_num=topic_num, **kwargs)
133            if return_id_only:
134                loguru.logger.debug("Skip")
135                body["_source"] = [self.query.id_mapping]
136
137            loguru.logger.debug(body)
138            res = await self.client.search(
139                index=self.index_name, body=body, size=return_size
140            )
141
142            return [topic_num, res]
143
144    async def run_automatic_adjustment(self, return_results=False):
145        """
146        Get the normalization constant to be used in NIR-style queries for all topics given an initial
147        run of BM25 results.
148        """
149        loguru.logger.info("Running automatic BM25 weight adjustment")
150
151        # Backup variables temporarily
152        # size = self.return_size
153        # self.return_size = 1
154        # self.return_id_only = True
155        # prev_qt = self.config.query_type
156        # self.config.query_type = "query"
157
158        results = await self.run_all_queries(query_type="query",
159                                             return_results=True,
160                                             return_size=1,
161                                             return_id_only=True)
162
163        res = unpack_elasticsearch_scores(results)
164        self.query.set_bm25_scores(res)
165
166        if return_results:
167            return results
168
169    @classmethod
170    def build_from_config(cls, topics: Dict, query_obj: GenericElasticsearchQuery, client,
171                          config: GenericConfig, nir_config: NIRConfig):
172        """
173        Build an query executor engine from a config file.
174        """
175
176        return cls(
177            topics=topics,
178            client=client,
179            config=config,
180            index_name=config.index,
181            output_file="",
182            return_size=nir_config.return_size,
183            query=query_obj
184        )
class GenericElasticsearchExecutor(debeir.engines.elasticsearch.executor.ElasticsearchExecutor):
 13class GenericElasticsearchExecutor(ElasticsearchExecutor):
 14    """
 15    Generic Executor class for Elasticsearch
 16    """
 17    query: GenericElasticsearchQuery
 18
 19    def __init__(
 20            self,
 21            topics: Dict[Union[str, int], Dict[str, str]],
 22            client: Elasticsearch,
 23            index_name: str,
 24            output_file: str,
 25            query: GenericElasticsearchQuery,
 26            encoder: Optional[Encoder] = None,
 27            config=None,
 28            *args,
 29            **kwargs,
 30    ):
 31        super().__init__(
 32            topics,
 33            client,
 34            index_name,
 35            output_file,
 36            query,
 37            encoder,
 38            config=config,
 39            *args,
 40            **kwargs,
 41        )
 42
 43        self.query_fns = {
 44            "query": self.generate_query,
 45            "embedding": self.generate_embedding_query,
 46        }
 47
 48    def generate_query(self, topic_num, best_fields=True, **kwargs):
 49        """
 50        Generates a standard BM25 query given the topic number
 51
 52        :param topic_num: Query topic number to generate
 53        :param best_fields: Whether to use a curated list of fields
 54        :param kwargs:
 55        :return:
 56        """
 57        return self.query.generate_query(topic_num, **kwargs)
 58
 59    # def generate_query_ablation(self, topic_num, **kwargs):
 60    #    return self.query.generate_query_ablation(topic_num)
 61
 62    def generate_embedding_query(
 63            self,
 64            topic_num,
 65            cosine_weights=None,
 66            query_weights=None,
 67            norm_weight=2.15,
 68            automatic_scores=None,
 69            **kwargs,
 70    ):
 71        """
 72        Executes an NIR-style query with combined scoring.
 73
 74        :param topic_num:
 75        :param cosine_weights:
 76        :param query_weights:
 77        :param norm_weight:
 78        :param automatic_scores:
 79        :param kwargs:
 80        :return:
 81        """
 82        assert self.encoder is not None or self.config.encoder is not None
 83
 84        if "encoder" not in kwargs:
 85            kwargs["encoder"] = self.encoder
 86
 87        return self.query.generate_query_embedding(
 88            topic_num,
 89            cosine_weights=cosine_weights,
 90            query_weight=query_weights,
 91            norm_weight=norm_weight,
 92            automatic_scores=automatic_scores,
 93            **kwargs,
 94        )
 95
 96    # @apply_config
 97    async def execute_query(
 98            self, query=None, return_size: int = None, return_id_only: bool = None,
 99            topic_num=None, ablation=False, query_type=None,
100            **kwargs
101    ):
102        """
103        Executes a query using the underlying elasticsearch client.
104
105        :param query:
106        :param topic_num:
107        :param ablation:
108        :param query_type:
109        :param return_size:
110        :param return_id_only:
111        :param kwargs:
112        :return:
113        """
114
115        if ablation:
116            query_type = "ablation"
117
118        assert query is not None or topic_num is not None
119
120        if query:
121            if return_id_only:
122                # query["fields"] = [self.query.id_mapping]
123                # query["_source"] = False
124                query["_source"] = [self.query.id_mapping]
125            res = await self.client.search(
126                index=self.index_name, body=query, size=return_size
127            )
128
129            return [query, res]
130
131        if topic_num:
132            loguru.logger.debug(query_type)
133            body = self.query_fns[query_type](topic_num=topic_num, **kwargs)
134            if return_id_only:
135                loguru.logger.debug("Skip")
136                body["_source"] = [self.query.id_mapping]
137
138            loguru.logger.debug(body)
139            res = await self.client.search(
140                index=self.index_name, body=body, size=return_size
141            )
142
143            return [topic_num, res]
144
145    async def run_automatic_adjustment(self, return_results=False):
146        """
147        Get the normalization constant to be used in NIR-style queries for all topics given an initial
148        run of BM25 results.
149        """
150        loguru.logger.info("Running automatic BM25 weight adjustment")
151
152        # Backup variables temporarily
153        # size = self.return_size
154        # self.return_size = 1
155        # self.return_id_only = True
156        # prev_qt = self.config.query_type
157        # self.config.query_type = "query"
158
159        results = await self.run_all_queries(query_type="query",
160                                             return_results=True,
161                                             return_size=1,
162                                             return_id_only=True)
163
164        res = unpack_elasticsearch_scores(results)
165        self.query.set_bm25_scores(res)
166
167        if return_results:
168            return results
169
170    @classmethod
171    def build_from_config(cls, topics: Dict, query_obj: GenericElasticsearchQuery, client,
172                          config: GenericConfig, nir_config: NIRConfig):
173        """
174        Build an query executor engine from a config file.
175        """
176
177        return cls(
178            topics=topics,
179            client=client,
180            config=config,
181            index_name=config.index,
182            output_file="",
183            return_size=nir_config.return_size,
184            query=query_obj
185        )

Generic Executor class for Elasticsearch

GenericElasticsearchExecutor( topics: Dict[Union[str, int], Dict[str, str]], client: elasticsearch.AsyncElasticsearch, index_name: str, output_file: str, query: debeir.core.query.GenericElasticsearchQuery, encoder: Optional[debeir.rankers.transformer_sent_encoder.Encoder] = None, config=None, *args, **kwargs)
19    def __init__(
20            self,
21            topics: Dict[Union[str, int], Dict[str, str]],
22            client: Elasticsearch,
23            index_name: str,
24            output_file: str,
25            query: GenericElasticsearchQuery,
26            encoder: Optional[Encoder] = None,
27            config=None,
28            *args,
29            **kwargs,
30    ):
31        super().__init__(
32            topics,
33            client,
34            index_name,
35            output_file,
36            query,
37            encoder,
38            config=config,
39            *args,
40            **kwargs,
41        )
42
43        self.query_fns = {
44            "query": self.generate_query,
45            "embedding": self.generate_embedding_query,
46        }
def generate_query(self, topic_num, best_fields=True, **kwargs):
48    def generate_query(self, topic_num, best_fields=True, **kwargs):
49        """
50        Generates a standard BM25 query given the topic number
51
52        :param topic_num: Query topic number to generate
53        :param best_fields: Whether to use a curated list of fields
54        :param kwargs:
55        :return:
56        """
57        return self.query.generate_query(topic_num, **kwargs)

Generates a standard BM25 query given the topic number

Parameters
  • topic_num: Query topic number to generate
  • best_fields: Whether to use a curated list of fields
  • kwargs:
Returns
def generate_embedding_query( self, topic_num, cosine_weights=None, query_weights=None, norm_weight=2.15, automatic_scores=None, **kwargs):
62    def generate_embedding_query(
63            self,
64            topic_num,
65            cosine_weights=None,
66            query_weights=None,
67            norm_weight=2.15,
68            automatic_scores=None,
69            **kwargs,
70    ):
71        """
72        Executes an NIR-style query with combined scoring.
73
74        :param topic_num:
75        :param cosine_weights:
76        :param query_weights:
77        :param norm_weight:
78        :param automatic_scores:
79        :param kwargs:
80        :return:
81        """
82        assert self.encoder is not None or self.config.encoder is not None
83
84        if "encoder" not in kwargs:
85            kwargs["encoder"] = self.encoder
86
87        return self.query.generate_query_embedding(
88            topic_num,
89            cosine_weights=cosine_weights,
90            query_weight=query_weights,
91            norm_weight=norm_weight,
92            automatic_scores=automatic_scores,
93            **kwargs,
94        )

Executes an NIR-style query with combined scoring.

Parameters
  • topic_num:
  • cosine_weights:
  • query_weights:
  • norm_weight:
  • automatic_scores:
  • kwargs:
Returns
async def execute_query( self, query=None, return_size: int = None, return_id_only: bool = None, topic_num=None, ablation=False, query_type=None, **kwargs):
 97    async def execute_query(
 98            self, query=None, return_size: int = None, return_id_only: bool = None,
 99            topic_num=None, ablation=False, query_type=None,
100            **kwargs
101    ):
102        """
103        Executes a query using the underlying elasticsearch client.
104
105        :param query:
106        :param topic_num:
107        :param ablation:
108        :param query_type:
109        :param return_size:
110        :param return_id_only:
111        :param kwargs:
112        :return:
113        """
114
115        if ablation:
116            query_type = "ablation"
117
118        assert query is not None or topic_num is not None
119
120        if query:
121            if return_id_only:
122                # query["fields"] = [self.query.id_mapping]
123                # query["_source"] = False
124                query["_source"] = [self.query.id_mapping]
125            res = await self.client.search(
126                index=self.index_name, body=query, size=return_size
127            )
128
129            return [query, res]
130
131        if topic_num:
132            loguru.logger.debug(query_type)
133            body = self.query_fns[query_type](topic_num=topic_num, **kwargs)
134            if return_id_only:
135                loguru.logger.debug("Skip")
136                body["_source"] = [self.query.id_mapping]
137
138            loguru.logger.debug(body)
139            res = await self.client.search(
140                index=self.index_name, body=body, size=return_size
141            )
142
143            return [topic_num, res]

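Executes a query using the underlying elasticsearch client.

Parameters
  • query:
  • topic_num:
  • ablation:
  • query_type:
  • return_size:
  • return_id_only:
  • kwargs: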
async def run_automatic_adjustment(self, return_results=False):
145    async def run_automatic_adjustment(self, return_results=False):
146        """
147        Get the normalization constant to be used in NIR-style queries for all topics given an initial
148        run of BM25 results.
149        """
150        loguru.logger.info("Running automatic BM25 weight adjustment")
151
152        # Backup variables temporarily
153        # size = self.return_size
154        # self.return_size = 1
155        # self.return_id_only = True
156        # prev_qt = self.config.query_type
157        # self.config.query_type = "query"
158
159        results = await self.run_all_queries(query_type="query",
160                                             return_results=True,
161                                             return_size=1,
162                                             return_id_only=True)
163
164        res = unpack_elasticsearch_scores(results)
165        self.query.set_bm25_scores(res)
166
167        if return_results:
168            return results

Get the normalization constant to be used in NIR-style queries for all topics given an initial run of BM25 results.

@classmethod
def build_from_config( cls, topics: Dict, query_obj: debeir.core.query.GenericElasticsearchQuery, client, config: debeir.core.config.GenericConfig, nir_config: debeir.core.config.NIRConfig):
170    @classmethod
171    def build_from_config(cls, topics: Dict, query_obj: GenericElasticsearchQuery, client,
172                          config: GenericConfig, nir_config: NIRConfig):
173        """
174        Build an query executor engine from a config file.
175        """
176
177        return cls(
178            topics=topics,
179            client=client,
180            config=config,
181            index_name=config.index,
182            output_file="",
183            return_size=nir_config.return_size,
184            query=query_obj
185        )

Build a query executor engine from a config file.