debeir.core.executor
from typing import Dict, Optional, Union

import loguru
from debeir.engines.elasticsearch.executor import ElasticsearchExecutor
from debeir.core.config import GenericConfig, NIRConfig
from debeir.core.query import GenericElasticsearchQuery
from debeir.rankers.transformer_sent_encoder import Encoder
from debeir.utils.scaler import unpack_elasticsearch_scores
from elasticsearch import AsyncElasticsearch as Elasticsearch


class GenericElasticsearchExecutor(ElasticsearchExecutor):
    """
    Generic Executor class for Elasticsearch.

    Keeps a ``query_fns`` table that maps a query type name to the method
    building the request body, and sends those bodies through
    ``self.client.search``.
    """
    query: GenericElasticsearchQuery

    def __init__(
        self,
        topics: Dict[Union[str, int], Dict[str, str]],
        client: Elasticsearch,
        index_name: str,
        output_file: str,
        query: GenericElasticsearchQuery,
        encoder: Optional[Encoder] = None,
        config=None,
        *args,
        **kwargs,
    ):
        """
        Forward every argument to the parent executor and register the
        query builders.

        :param topics: Mapping of topic id to that topic's string fields.
        :param client: Async Elasticsearch client.
        :param index_name: Name of the index, forwarded to the parent.
        :param output_file: Forwarded unchanged to the parent constructor.
        :param query: Query object the builders delegate to.
        :param encoder: Optional encoder, forwarded to the parent.
        :param config: Optional configuration object, forwarded to the parent.
        :param args: Extra positional arguments for the parent constructor.
        :param kwargs: Extra keyword arguments for the parent constructor.
        """
        super().__init__(
            topics,
            client,
            index_name,
            output_file,
            query,
            encoder,
            config=config,
            *args,
            **kwargs,
        )

        # Dispatch table used by execute_query: query_type -> body builder.
        self.query_fns = {
            "query": self.generate_query,
            "embedding": self.generate_embedding_query,
        }

    def generate_query(self, topic_num, best_fields=True, **kwargs):
        """
        Generates a standard BM25 query given the topic number.

        :param topic_num: Query topic number to generate
        :param best_fields: Accepted for interface compatibility; it is not
            forwarded to the underlying query object.
        :param kwargs: Forwarded to ``self.query.generate_query``.
        :return: Whatever ``self.query.generate_query`` returns.
        """
        return self.query.generate_query(topic_num, **kwargs)

    def generate_embedding_query(
        self,
        topic_num,
        cosine_weights=None,
        query_weights=None,
        norm_weight=2.15,
        automatic_scores=None,
        **kwargs,
    ):
        """
        Generates an NIR-style query with combined scoring.

        The encoder handed to the query object is, in order of preference:
        an explicit ``encoder`` keyword argument, ``self.encoder``, then
        ``self.config.encoder``.

        :param topic_num: Topic to build the query for.
        :param cosine_weights: Forwarded as ``cosine_weights``.
        :param query_weights: Forwarded under the keyword ``query_weight``.
        :param norm_weight: Forwarded as ``norm_weight``.
        :param automatic_scores: Forwarded as ``automatic_scores``.
        :param kwargs: Forwarded to ``self.query.generate_query_embedding``.
        :return: Whatever ``self.query.generate_query_embedding`` returns.
        :raises AssertionError: If no encoder can be resolved.
        """
        if "encoder" not in kwargs:
            # Fall back to the config's encoder instead of forwarding None
            # when only the config carries one.
            encoder = self.encoder
            if encoder is None:
                encoder = self.config.encoder
            assert encoder is not None, "no encoder available for an embedding query"
            kwargs["encoder"] = encoder

        return self.query.generate_query_embedding(
            topic_num,
            cosine_weights=cosine_weights,
            query_weight=query_weights,
            norm_weight=norm_weight,
            automatic_scores=automatic_scores,
            **kwargs,
        )

    async def execute_query(
        self, query=None, return_size: int = None, return_id_only: bool = None,
        topic_num=None, ablation=False, query_type=None,
        **kwargs
    ):
        """
        Executes a query using the underlying elasticsearch client.

        A truthy ``query`` takes precedence and is sent as the request body;
        otherwise the body is built by the builder registered in
        ``self.query_fns`` under ``query_type``.

        :param query: Pre-built request body. Modified in place when
            ``return_id_only`` is truthy.
        :param topic_num: Topic to build a body for. Falsy ids such as ``0``
            are valid.
        :param ablation: When truthy, ``query_type`` is overridden with
            ``"ablation"``. ``__init__`` registers no builder under that key.
        :param query_type: Key into ``self.query_fns``.
        :param return_size: Passed as ``size`` to ``client.search``.
        :param return_id_only: When truthy, ``_source`` is restricted to
            ``[self.query.id_mapping]``.
        :param kwargs: Forwarded to the selected builder.
        :return: ``[query, response]`` or ``[topic_num, response]``; ``None``
            when ``query`` is falsy but not ``None`` and ``topic_num`` is ``None``.
        :raises AssertionError: If both ``query`` and ``topic_num`` are ``None``.
        :raises KeyError: If ``query_type`` has no registered builder.
        """

        if ablation:
            query_type = "ablation"

        assert query is not None or topic_num is not None

        if query:
            if return_id_only:
                query["_source"] = [self.query.id_mapping]
            res = await self.client.search(
                index=self.index_name, body=query, size=return_size
            )

            return [query, res]

        if topic_num is None:
            # Only reachable with a falsy, non-None query (e.g. {}).
            return None

        loguru.logger.debug(query_type)
        try:
            build_body = self.query_fns[query_type]
        except KeyError:
            raise KeyError(
                f"Unknown query_type {query_type!r}; "
                f"registered types: {list(self.query_fns)}"
            ) from None

        body = build_body(topic_num=topic_num, **kwargs)
        if return_id_only:
            loguru.logger.debug("Skip")
            body["_source"] = [self.query.id_mapping]

        loguru.logger.debug(body)
        res = await self.client.search(
            index=self.index_name, body=body, size=return_size
        )

        return [topic_num, res]

    async def run_automatic_adjustment(self, return_results=False):
        """
        Get the normalization constant to be used in NIR-style queries for all topics given an initial
        run of BM25 results.

        Runs every topic with ``query_type="query"``, ``return_size=1`` and
        ``return_id_only=True``, unpacks the scores and stores them on the
        query object via ``set_bm25_scores``.

        :param return_results: When truthy, the raw results are returned.
        :return: The raw results, or ``None`` when ``return_results`` is falsy.
        """
        loguru.logger.info("Running automatic BM25 weight adjustment")

        results = await self.run_all_queries(query_type="query",
                                             return_results=True,
                                             return_size=1,
                                             return_id_only=True)

        res = unpack_elasticsearch_scores(results)
        self.query.set_bm25_scores(res)

        if return_results:
            return results

    @classmethod
    def build_from_config(cls, topics: Dict, query_obj: GenericElasticsearchQuery, client,
                          config: GenericConfig, nir_config: NIRConfig):
        """
        Build a query executor engine from a config file.

        :param topics: Topics handed to the constructor.
        :param query_obj: Query object handed to the constructor as ``query``.
        :param client: Client handed to the constructor.
        :param config: Supplies ``index`` (used as ``index_name``) and is
            itself passed as ``config``.
        :param nir_config: Supplies ``return_size``.
        :return: A new instance of ``cls`` with an empty ``output_file``.
        """

        return cls(
            topics=topics,
            client=client,
            config=config,
            index_name=config.index,
            output_file="",
            return_size=nir_config.return_size,
            query=query_obj
        )
class GenericElasticsearchExecutor(ElasticsearchExecutor):
    """
    Generic Executor class for Elasticsearch.

    Keeps a ``query_fns`` table that maps a query type name to the method
    building the request body, and sends those bodies through
    ``self.client.search``.
    """
    query: GenericElasticsearchQuery

    def __init__(
        self,
        topics: Dict[Union[str, int], Dict[str, str]],
        client: Elasticsearch,
        index_name: str,
        output_file: str,
        query: GenericElasticsearchQuery,
        encoder: Optional[Encoder] = None,
        config=None,
        *args,
        **kwargs,
    ):
        """
        Forward every argument to the parent executor and register the
        query builders.

        :param topics: Mapping of topic id to that topic's string fields.
        :param client: Async Elasticsearch client.
        :param index_name: Name of the index, forwarded to the parent.
        :param output_file: Forwarded unchanged to the parent constructor.
        :param query: Query object the builders delegate to.
        :param encoder: Optional encoder, forwarded to the parent.
        :param config: Optional configuration object, forwarded to the parent.
        :param args: Extra positional arguments for the parent constructor.
        :param kwargs: Extra keyword arguments for the parent constructor.
        """
        super().__init__(
            topics,
            client,
            index_name,
            output_file,
            query,
            encoder,
            config=config,
            *args,
            **kwargs,
        )

        # Dispatch table used by execute_query: query_type -> body builder.
        self.query_fns = {
            "query": self.generate_query,
            "embedding": self.generate_embedding_query,
        }

    def generate_query(self, topic_num, best_fields=True, **kwargs):
        """
        Generates a standard BM25 query given the topic number.

        :param topic_num: Query topic number to generate
        :param best_fields: Accepted for interface compatibility; it is not
            forwarded to the underlying query object.
        :param kwargs: Forwarded to ``self.query.generate_query``.
        :return: Whatever ``self.query.generate_query`` returns.
        """
        return self.query.generate_query(topic_num, **kwargs)

    def generate_embedding_query(
        self,
        topic_num,
        cosine_weights=None,
        query_weights=None,
        norm_weight=2.15,
        automatic_scores=None,
        **kwargs,
    ):
        """
        Generates an NIR-style query with combined scoring.

        The encoder handed to the query object is, in order of preference:
        an explicit ``encoder`` keyword argument, ``self.encoder``, then
        ``self.config.encoder``.

        :param topic_num: Topic to build the query for.
        :param cosine_weights: Forwarded as ``cosine_weights``.
        :param query_weights: Forwarded under the keyword ``query_weight``.
        :param norm_weight: Forwarded as ``norm_weight``.
        :param automatic_scores: Forwarded as ``automatic_scores``.
        :param kwargs: Forwarded to ``self.query.generate_query_embedding``.
        :return: Whatever ``self.query.generate_query_embedding`` returns.
        :raises AssertionError: If no encoder can be resolved.
        """
        if "encoder" not in kwargs:
            # Fall back to the config's encoder instead of forwarding None
            # when only the config carries one.
            encoder = self.encoder
            if encoder is None:
                encoder = self.config.encoder
            assert encoder is not None, "no encoder available for an embedding query"
            kwargs["encoder"] = encoder

        return self.query.generate_query_embedding(
            topic_num,
            cosine_weights=cosine_weights,
            query_weight=query_weights,
            norm_weight=norm_weight,
            automatic_scores=automatic_scores,
            **kwargs,
        )

    async def execute_query(
        self, query=None, return_size: int = None, return_id_only: bool = None,
        topic_num=None, ablation=False, query_type=None,
        **kwargs
    ):
        """
        Executes a query using the underlying elasticsearch client.

        A truthy ``query`` takes precedence and is sent as the request body;
        otherwise the body is built by the builder registered in
        ``self.query_fns`` under ``query_type``.

        :param query: Pre-built request body. Modified in place when
            ``return_id_only`` is truthy.
        :param topic_num: Topic to build a body for. Falsy ids such as ``0``
            are valid.
        :param ablation: When truthy, ``query_type`` is overridden with
            ``"ablation"``. ``__init__`` registers no builder under that key.
        :param query_type: Key into ``self.query_fns``.
        :param return_size: Passed as ``size`` to ``client.search``.
        :param return_id_only: When truthy, ``_source`` is restricted to
            ``[self.query.id_mapping]``.
        :param kwargs: Forwarded to the selected builder.
        :return: ``[query, response]`` or ``[topic_num, response]``; ``None``
            when ``query`` is falsy but not ``None`` and ``topic_num`` is ``None``.
        :raises AssertionError: If both ``query`` and ``topic_num`` are ``None``.
        :raises KeyError: If ``query_type`` has no registered builder.
        """

        if ablation:
            query_type = "ablation"

        assert query is not None or topic_num is not None

        if query:
            if return_id_only:
                query["_source"] = [self.query.id_mapping]
            res = await self.client.search(
                index=self.index_name, body=query, size=return_size
            )

            return [query, res]

        if topic_num is None:
            # Only reachable with a falsy, non-None query (e.g. {}).
            return None

        loguru.logger.debug(query_type)
        try:
            build_body = self.query_fns[query_type]
        except KeyError:
            raise KeyError(
                f"Unknown query_type {query_type!r}; "
                f"registered types: {list(self.query_fns)}"
            ) from None

        body = build_body(topic_num=topic_num, **kwargs)
        if return_id_only:
            loguru.logger.debug("Skip")
            body["_source"] = [self.query.id_mapping]

        loguru.logger.debug(body)
        res = await self.client.search(
            index=self.index_name, body=body, size=return_size
        )

        return [topic_num, res]

    async def run_automatic_adjustment(self, return_results=False):
        """
        Get the normalization constant to be used in NIR-style queries for all topics given an initial
        run of BM25 results.

        Runs every topic with ``query_type="query"``, ``return_size=1`` and
        ``return_id_only=True``, unpacks the scores and stores them on the
        query object via ``set_bm25_scores``.

        :param return_results: When truthy, the raw results are returned.
        :return: The raw results, or ``None`` when ``return_results`` is falsy.
        """
        loguru.logger.info("Running automatic BM25 weight adjustment")

        results = await self.run_all_queries(query_type="query",
                                             return_results=True,
                                             return_size=1,
                                             return_id_only=True)

        res = unpack_elasticsearch_scores(results)
        self.query.set_bm25_scores(res)

        if return_results:
            return results

    @classmethod
    def build_from_config(cls, topics: Dict, query_obj: GenericElasticsearchQuery, client,
                          config: GenericConfig, nir_config: NIRConfig):
        """
        Build a query executor engine from a config file.

        :param topics: Topics handed to the constructor.
        :param query_obj: Query object handed to the constructor as ``query``.
        :param client: Client handed to the constructor.
        :param config: Supplies ``index`` (used as ``index_name``) and is
            itself passed as ``config``.
        :param nir_config: Supplies ``return_size``.
        :return: A new instance of ``cls`` with an empty ``output_file``.
        """

        return cls(
            topics=topics,
            client=client,
            config=config,
            index_name=config.index,
            output_file="",
            return_size=nir_config.return_size,
            query=query_obj
        )
Generic Executor class for Elasticsearch
GenericElasticsearchExecutor( topics: Dict[Union[str, int], Dict[str, str]], client: elasticsearch.AsyncElasticsearch, index_name: str, output_file: str, query: debeir.core.query.GenericElasticsearchQuery, encoder: Optional[debeir.rankers.transformer_sent_encoder.Encoder] = None, config=None, *args, **kwargs)
def __init__(
    self,
    topics: Dict[Union[str, int], Dict[str, str]],
    client: Elasticsearch,
    index_name: str,
    output_file: str,
    query: GenericElasticsearchQuery,
    encoder: Optional[Encoder] = None,
    config=None,
    *args,
    **kwargs,
):
    """
    Hand all constructor arguments to the parent executor, then register
    the query builders this executor can dispatch to.

    :param topics: Mapping of topic id to that topic's string fields.
    :param client: Async Elasticsearch client.
    :param index_name: Name of the index, forwarded to the parent.
    :param output_file: Forwarded unchanged to the parent constructor.
    :param query: Query object the builders delegate to.
    :param encoder: Optional encoder, forwarded to the parent.
    :param config: Optional configuration object, forwarded to the parent.
    :param args: Extra positional arguments for the parent constructor.
    :param kwargs: Extra keyword arguments for the parent constructor.
    """
    super().__init__(
        topics, client, index_name, output_file, query, encoder,
        *args,
        config=config,
        **kwargs,
    )

    # query_type name -> bound method that builds the request body.
    builders = {}
    builders["query"] = self.generate_query
    builders["embedding"] = self.generate_embedding_query
    self.query_fns = builders
def
generate_query(self, topic_num, best_fields=True, **kwargs):
def generate_query(self, topic_num, best_fields=True, **kwargs):
    """
    Build a standard BM25 query for one topic by delegating to the query object.

    :param topic_num: Query topic number to generate
    :param best_fields: Accepted but not forwarded to the query object.
    :param kwargs: Passed through to ``self.query.generate_query``.
    :return: The value returned by ``self.query.generate_query``.
    """
    build = self.query.generate_query
    return build(topic_num, **kwargs)
Generates a standard BM25 query given the topic number
Parameters
- topic_num: Query topic number to generate
- best_fields: Whether to use a curated list of fields
- kwargs:
Returns
def
generate_embedding_query( self, topic_num, cosine_weights=None, query_weights=None, norm_weight=2.15, automatic_scores=None, **kwargs):
def generate_embedding_query(
    self,
    topic_num,
    cosine_weights=None,
    query_weights=None,
    norm_weight=2.15,
    automatic_scores=None,
    **kwargs,
):
    """
    Generates an NIR-style query with combined scoring.

    The encoder handed to the query object is, in order of preference:
    an explicit ``encoder`` keyword argument, ``self.encoder``, then
    ``self.config.encoder``.

    :param topic_num: Topic to build the query for.
    :param cosine_weights: Forwarded as ``cosine_weights``.
    :param query_weights: Forwarded under the keyword ``query_weight``.
    :param norm_weight: Forwarded as ``norm_weight``.
    :param automatic_scores: Forwarded as ``automatic_scores``.
    :param kwargs: Forwarded to ``self.query.generate_query_embedding``.
    :return: Whatever ``self.query.generate_query_embedding`` returns.
    :raises AssertionError: If no encoder can be resolved.
    """
    if "encoder" not in kwargs:
        # Fall back to the config's encoder instead of forwarding None
        # when only the config carries one.
        encoder = self.encoder
        if encoder is None:
            encoder = self.config.encoder
        assert encoder is not None, "no encoder available for an embedding query"
        kwargs["encoder"] = encoder

    return self.query.generate_query_embedding(
        topic_num,
        cosine_weights=cosine_weights,
        query_weight=query_weights,
        norm_weight=norm_weight,
        automatic_scores=automatic_scores,
        **kwargs,
    )
Generates an NIR-style query with combined scoring.
Parameters
- topic_num:
- cosine_weights:
- query_weights:
- norm_weight:
- automatic_scores:
- kwargs:
Returns
async def
execute_query( self, query=None, return_size: int = None, return_id_only: bool = None, topic_num=None, ablation=False, query_type=None, **kwargs):
async def execute_query(
    self, query=None, return_size: int = None, return_id_only: bool = None,
    topic_num=None, ablation=False, query_type=None,
    **kwargs
):
    """
    Executes a query using the underlying elasticsearch client.

    A truthy ``query`` takes precedence and is sent as the request body;
    otherwise the body is built by the builder registered in
    ``self.query_fns`` under ``query_type``.

    :param query: Pre-built request body. Modified in place when
        ``return_id_only`` is truthy.
    :param topic_num: Topic to build a body for. Falsy ids such as ``0``
        are valid.
    :param ablation: When truthy, ``query_type`` is overridden with
        ``"ablation"``. ``__init__`` registers no builder under that key.
    :param query_type: Key into ``self.query_fns``.
    :param return_size: Passed as ``size`` to ``client.search``.
    :param return_id_only: When truthy, ``_source`` is restricted to
        ``[self.query.id_mapping]``.
    :param kwargs: Forwarded to the selected builder.
    :return: ``[query, response]`` or ``[topic_num, response]``; ``None``
        when ``query`` is falsy but not ``None`` and ``topic_num`` is ``None``.
    :raises AssertionError: If both ``query`` and ``topic_num`` are ``None``.
    :raises KeyError: If ``query_type`` has no registered builder.
    """

    if ablation:
        query_type = "ablation"

    assert query is not None or topic_num is not None

    if query:
        if return_id_only:
            query["_source"] = [self.query.id_mapping]
        res = await self.client.search(
            index=self.index_name, body=query, size=return_size
        )

        return [query, res]

    if topic_num is None:
        # Only reachable with a falsy, non-None query (e.g. {}).
        return None

    # Compare against None rather than truthiness so that topic ids such
    # as 0 or "" are executed instead of silently skipped.
    loguru.logger.debug(query_type)
    try:
        build_body = self.query_fns[query_type]
    except KeyError:
        raise KeyError(
            f"Unknown query_type {query_type!r}; "
            f"registered types: {list(self.query_fns)}"
        ) from None

    body = build_body(topic_num=topic_num, **kwargs)
    if return_id_only:
        loguru.logger.debug("Skip")
        body["_source"] = [self.query.id_mapping]

    loguru.logger.debug(body)
    res = await self.client.search(
        index=self.index_name, body=body, size=return_size
    )

    return [topic_num, res]
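Executes a query using the underlying elasticsearch client.
Parameters
- query:
- topic_num:
- ablation:
- query_type:
- return_size:
- return_id_only:
- kwargs: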
async def
run_automatic_adjustment(self, return_results=False):
async def run_automatic_adjustment(self, return_results=False):
    """
    Run every topic once as a plain "query" type search and store the
    unpacked scores on the query object via ``set_bm25_scores``.

    The run requests a single hit per topic (``return_size=1``) with
    ``return_id_only=True``.

    :param return_results: When truthy, the raw results are returned.
    :return: The raw results, or ``None`` when ``return_results`` is falsy.
    """
    loguru.logger.info("Running automatic BM25 weight adjustment")

    run_options = {
        "query_type": "query",
        "return_results": True,
        "return_size": 1,
        "return_id_only": True,
    }
    bm25_run = await self.run_all_queries(**run_options)

    self.query.set_bm25_scores(unpack_elasticsearch_scores(bm25_run))

    return bm25_run if return_results else None
Get the normalization constant to be used in NIR-style queries for all topics given an initial run of BM25 results.
@classmethod
def
build_from_config( cls, topics: Dict, query_obj: debeir.core.query.GenericElasticsearchQuery, client, config: debeir.core.config.GenericConfig, nir_config: debeir.core.config.NIRConfig):
170 @classmethod 171 def build_from_config(cls, topics: Dict, query_obj: GenericElasticsearchQuery, client, 172 config: GenericConfig, nir_config: NIRConfig): 173 """ 174 Build an query executor engine from a config file. 175 """ 176 177 return cls( 178 topics=topics, 179 client=client, 180 config=config, 181 index_name=config.index, 182 output_file="", 183 return_size=nir_config.return_size, 184 query=query_obj 185 )
Build a query executor engine from a config file.