debeir.engines.elasticsearch.executor
from typing import Dict, List, Optional, Union

import tqdm.asyncio
from debeir.core.config import apply_config
from debeir.core.document import document_factory
from debeir.core.query import GenericElasticsearchQuery
from debeir.rankers.transformer_sent_encoder import Encoder
from debeir.utils.utils import unpack_coroutine
from elasticsearch import AsyncElasticsearch as Elasticsearch


class ElasticsearchExecutor:
    """
    Executes an elasticsearch query given the query generated from the config, topics and query class object.

    Computes regular patterns of queries expected from general IR topics and indexes.
    Includes:
        1. Reranking
        2. End-to-End Neural IR
        3. Statistical keyword matching

    This base class only orchestrates the queries: ``generate_query`` and ``execute_query``
    raise NotImplementedError here and must be provided by subclasses.
    """

    def __init__(
        self,
        topics: Dict[Union[str, int], Dict[str, str]],
        client: Elasticsearch,
        index_name: str,
        output_file: str,
        query: GenericElasticsearchQuery,
        encoder: Optional[Encoder],
        return_size: int = 1000,
        test=False,
        return_id_only=True,
        config=None,
    ):
        # In test mode only topic "1" is kept (raises KeyError if that key is missing).
        self.topics = {"1": topics["1"]} if test else topics
        self.client = client
        self.index_name = index_name
        self.output_file = output_file
        self.return_size = return_size
        self.query = query
        self.encoder = encoder
        self.return_id_only = return_id_only
        self.config = config
        self.document_cls = document_factory['elasticsearch']

    def generate_query(self, topic_num):
        """
        Generates a query given a topic number from the list of topics.

        Not implemented in this base class; subclasses must override it.

        :param topic_num: The topic number (a key of ``self.topics``) to generate a query for.
        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def execute_query(self, *args, **kwargs):
        """
        Execute a query with the given parameters.

        Not implemented in this base class; subclasses must override it. ``run_all_queries``
        calls it once per topic and awaits the result, so overrides should return an awaitable.

        :param args: Positional arguments, as defined by the subclass.
        :param kwargs: Keyword arguments; ``run_all_queries`` passes ``topic_num``, ``query_type``,
            ``return_size`` and ``return_id_only`` plus any remaining keyword arguments.
        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    @apply_config
    def _update_kwargs(self, **kwargs):
        # Returns kwargs unchanged; any config merging is done by the apply_config decorator.
        # NOTE(review): exact merge semantics live in debeir.core.config -- confirm there.
        return kwargs

    async def run_all_queries(
        self, query_type=None, return_results=False,
        return_size: Optional[int] = None, return_id_only: Optional[bool] = False, **kwargs
    ) -> List:
        """
        Asynchronously run execute_query() for every topic in self.topics.

        :param query_type: Which query to execute. Query_type determines which method is used to generate the queries
            from self.query.query_funcs: Dict[str, func]. Falls back to self.config.query_type when None.
        :param return_results: Whether to return raw results from the client. Useful for analysing results directly or
            for computing the BM25 scores for log normalization in NIR-style scoring
        :param return_size: Number of documents to return. Falls back to self.return_size when None; any
            'return_size' entry produced by the config is discarded.
        :param return_id_only: Return the ID of the document only, rather than the full source document.
            Defaults to False; pass None explicitly to fall back to self.return_id_only.
        :param kwargs: Keyword arguments (after processing by _update_kwargs) to pass to the execute_query method.
        :raises RuntimeError: If the Elasticsearch instance cannot be reached (the client is closed first).
        :raises ValueError: If query_type is None and there is no config to supply a default.
        :return:
            A list of results (in completion order, not topic order) if return_results is True,
            otherwise an empty list.
        """
        if not await self.client.ping():
            # Release the client's connections before failing.
            await self.client.close()
            raise RuntimeError(
                f"Elasticsearch instance cannot be reached at {self.client}"
            )

        kwargs = self._update_kwargs(**kwargs)

        if return_size is None:
            return_size = self.return_size

        # NOTE(review): only reached when the caller passes None explicitly -- the default is
        # False, so self.return_id_only is otherwise ignored. Kept for backward compatibility.
        if return_id_only is None:
            return_id_only = self.return_id_only

        if query_type is None:
            if self.config is None:
                raise ValueError(
                    "query_type was not given and no config is set to provide a default"
                )
            query_type = self.config.query_type

        # These are passed explicitly below; leaving duplicates in kwargs would raise
        # "got multiple values for keyword argument" when calling execute_query.
        for explicit_key in ('return_size', 'return_id_only', 'query_type'):
            kwargs.pop(explicit_key, None)

        tasks = [
            self.execute_query(
                topic_num=topic_num,
                query_type=query_type,
                return_size=return_size,
                return_id_only=return_id_only,
                **kwargs
            )
            for topic_num in self.topics
        ]

        results = []

        # as_completed yields awaitables as they finish, so results are in completion order.
        for f in tqdm.asyncio.tqdm.as_completed(tasks, desc="Running Queries"):
            res = await unpack_coroutine(f)

            if return_results:
                results.append(res)

        return results
class
ElasticsearchExecutor:
class ElasticsearchExecutor:
    """
    Executes an elasticsearch query given the query generated from the config, topics and query class object.

    Computes regular patterns of queries expected from general IR topics and indexes.
    Includes:
        1. Reranking
        2. End-to-End Neural IR
        3. Statistical keyword matching

    This base class only orchestrates the queries: ``generate_query`` and ``execute_query``
    raise NotImplementedError here and must be provided by subclasses.
    """

    def __init__(
        self,
        topics: Dict[Union[str, int], Dict[str, str]],
        client: Elasticsearch,
        index_name: str,
        output_file: str,
        query: GenericElasticsearchQuery,
        encoder: Optional[Encoder],
        return_size: int = 1000,
        test=False,
        return_id_only=True,
        config=None,
    ):
        # In test mode only topic "1" is kept (raises KeyError if that key is missing).
        self.topics = {"1": topics["1"]} if test else topics
        self.client = client
        self.index_name = index_name
        self.output_file = output_file
        self.return_size = return_size
        self.query = query
        self.encoder = encoder
        self.return_id_only = return_id_only
        self.config = config
        self.document_cls = document_factory['elasticsearch']

    def generate_query(self, topic_num):
        """
        Generates a query given a topic number from the list of topics.

        Not implemented in this base class; subclasses must override it.

        :param topic_num: The topic number (a key of ``self.topics``) to generate a query for.
        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def execute_query(self, *args, **kwargs):
        """
        Execute a query with the given parameters.

        Not implemented in this base class; subclasses must override it. ``run_all_queries``
        calls it once per topic and awaits the result, so overrides should return an awaitable.

        :param args: Positional arguments, as defined by the subclass.
        :param kwargs: Keyword arguments; ``run_all_queries`` passes ``topic_num``, ``query_type``,
            ``return_size`` and ``return_id_only`` plus any remaining keyword arguments.
        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    @apply_config
    def _update_kwargs(self, **kwargs):
        # Returns kwargs unchanged; any config merging is done by the apply_config decorator.
        # NOTE(review): exact merge semantics live in debeir.core.config -- confirm there.
        return kwargs

    async def run_all_queries(
        self, query_type=None, return_results=False,
        return_size: Optional[int] = None, return_id_only: Optional[bool] = False, **kwargs
    ) -> List:
        """
        Asynchronously run execute_query() for every topic in self.topics.

        :param query_type: Which query to execute. Query_type determines which method is used to generate the queries
            from self.query.query_funcs: Dict[str, func]. Falls back to self.config.query_type when None.
        :param return_results: Whether to return raw results from the client. Useful for analysing results directly or
            for computing the BM25 scores for log normalization in NIR-style scoring
        :param return_size: Number of documents to return. Falls back to self.return_size when None; any
            'return_size' entry produced by the config is discarded.
        :param return_id_only: Return the ID of the document only, rather than the full source document.
            Defaults to False; pass None explicitly to fall back to self.return_id_only.
        :param kwargs: Keyword arguments (after processing by _update_kwargs) to pass to the execute_query method.
        :raises RuntimeError: If the Elasticsearch instance cannot be reached (the client is closed first).
        :raises ValueError: If query_type is None and there is no config to supply a default.
        :return:
            A list of results (in completion order, not topic order) if return_results is True,
            otherwise an empty list.
        """
        if not await self.client.ping():
            # Release the client's connections before failing.
            await self.client.close()
            raise RuntimeError(
                f"Elasticsearch instance cannot be reached at {self.client}"
            )

        kwargs = self._update_kwargs(**kwargs)

        if return_size is None:
            return_size = self.return_size

        # NOTE(review): only reached when the caller passes None explicitly -- the default is
        # False, so self.return_id_only is otherwise ignored. Kept for backward compatibility.
        if return_id_only is None:
            return_id_only = self.return_id_only

        if query_type is None:
            if self.config is None:
                raise ValueError(
                    "query_type was not given and no config is set to provide a default"
                )
            query_type = self.config.query_type

        # These are passed explicitly below; leaving duplicates in kwargs would raise
        # "got multiple values for keyword argument" when calling execute_query.
        for explicit_key in ('return_size', 'return_id_only', 'query_type'):
            kwargs.pop(explicit_key, None)

        tasks = [
            self.execute_query(
                topic_num=topic_num,
                query_type=query_type,
                return_size=return_size,
                return_id_only=return_id_only,
                **kwargs
            )
            for topic_num in self.topics
        ]

        results = []

        # as_completed yields awaitables as they finish, so results are in completion order.
        for f in tqdm.asyncio.tqdm.as_completed(tasks, desc="Running Queries"):
            res = await unpack_coroutine(f)

            if return_results:
                results.append(res)

        return results
Executes an elasticsearch query given the query generated from the config, topics and query class object.
Computes the common query patterns expected for general IR topics and indexes, including: (1) reranking, (2) end-to-end neural IR, and (3) statistical keyword matching.
ElasticsearchExecutor( topics: Dict[Union[str, int], Dict[str, str]], client: elasticsearch.AsyncElasticsearch, index_name: str, output_file: str, query: debeir.core.query.GenericElasticsearchQuery, encoder: Optional[debeir.rankers.transformer_sent_encoder.Encoder], return_size: int = 1000, test=False, return_id_only=True, config=None)
def __init__(
    self,
    topics: Dict[Union[str, int], Dict[str, str]],
    client: Elasticsearch,
    index_name: str,
    output_file: str,
    query: GenericElasticsearchQuery,
    encoder: Optional[Encoder],
    return_size: int = 1000,
    test=False,
    return_id_only=True,
    config=None,
):
    """
    Store everything the executor needs to run queries against an index.

    :param topics: Mapping of topic number -> topic fields.
    :param client: Async Elasticsearch client used to run the queries.
    :param index_name: Name of the index to query.
    :param output_file: Output path; stored only (not read by the methods of this base class).
    :param query: Query object used to build the queries.
    :param encoder: Optional encoder; stored for use by subclasses.
    :param return_size: Default number of documents to return per topic.
    :param test: When True, keep only topic "1" (raises KeyError if it is absent).
    :param return_id_only: Instance-level default for returning IDs only.
    :param config: Optional config object (e.g. supplies the default query_type).
    """
    chosen_topics = topics
    if test:
        # Test mode: restrict the run to the single topic keyed "1".
        chosen_topics = {"1": topics["1"]}
    self.topics = chosen_topics

    self.client, self.index_name, self.output_file = client, index_name, output_file
    self.return_size, self.query, self.encoder = return_size, query, encoder
    self.return_id_only, self.config = return_id_only, config

    self.document_cls = document_factory['elasticsearch']
def
generate_query(self, topic_num):
def generate_query(self, topic_num):
    """
    Build the query for a single topic.

    This is a hook: the base executor has no implementation, so calling it
    always fails.

    :param topic_num: The topic number (a key of ``self.topics``) to build a query for.
    :raises NotImplementedError: Always; subclasses must override this method.
    """
    raise NotImplementedError()
Generates a query for the given topic number from the list of topics. Not implemented in this base class; subclasses must override it.
Parameters
- topic_num: The topic number (a key of self.topics) to generate a query for.
def
execute_query(self, *args, **kwargs):
def execute_query(self, *args, **kwargs):
    """
    Run one query; to be supplied by subclasses.

    The base executor has no implementation, so calling it always fails.

    :param args: Positional arguments, as defined by the subclass.
    :param kwargs: Keyword arguments, as defined by the subclass.
    :raises NotImplementedError: Always; subclasses must override this method.
    """
    raise NotImplementedError()
Execute a query with the given parameters. Not implemented in this base class; subclasses must override it.
Parameters
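- args: Positional arguments, as defined by the subclass.
- kwargs: Keyword arguments; run_all_queries passes topic_num, query_type, return_size and return_id_only, plus any remaining keyword arguments.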
async def
run_all_queries( self, query_type=None, return_results=False, return_size: int = None, return_id_only: bool = False, **kwargs) -> List:
async def run_all_queries(
    self, query_type=None, return_results=False,
    return_size: Optional[int] = None, return_id_only: Optional[bool] = False, **kwargs
) -> List:
    """
    Asynchronously run execute_query() for every topic in self.topics.

    :param query_type: Which query to execute. Query_type determines which method is used to generate the queries
        from self.query.query_funcs: Dict[str, func]. Falls back to self.config.query_type when None.
    :param return_results: Whether to return raw results from the client. Useful for analysing results directly or
        for computing the BM25 scores for log normalization in NIR-style scoring
    :param return_size: Number of documents to return. Falls back to self.return_size when None; any
        'return_size' entry produced by the config is discarded.
    :param return_id_only: Return the ID of the document only, rather than the full source document.
        Defaults to False; pass None explicitly to fall back to self.return_id_only.
    :param kwargs: Keyword arguments (after processing by _update_kwargs) to pass to the execute_query method.
    :raises RuntimeError: If the Elasticsearch instance cannot be reached (the client is closed first).
    :raises ValueError: If query_type is None and there is no config to supply a default.
    :return:
        A list of results (in completion order, not topic order) if return_results is True,
        otherwise an empty list.
    """
    if not await self.client.ping():
        # Release the client's connections before failing.
        await self.client.close()
        raise RuntimeError(
            f"Elasticsearch instance cannot be reached at {self.client}"
        )

    kwargs = self._update_kwargs(**kwargs)

    if return_size is None:
        return_size = self.return_size

    # NOTE(review): only reached when the caller passes None explicitly -- the default is
    # False, so self.return_id_only is otherwise ignored. Kept for backward compatibility.
    if return_id_only is None:
        return_id_only = self.return_id_only

    if query_type is None:
        if self.config is None:
            raise ValueError(
                "query_type was not given and no config is set to provide a default"
            )
        query_type = self.config.query_type

    # These are passed explicitly below; leaving duplicates in kwargs would raise
    # "got multiple values for keyword argument" when calling execute_query.
    for explicit_key in ('return_size', 'return_id_only', 'query_type'):
        kwargs.pop(explicit_key, None)

    tasks = [
        self.execute_query(
            topic_num=topic_num,
            query_type=query_type,
            return_size=return_size,
            return_id_only=return_id_only,
            **kwargs
        )
        for topic_num in self.topics
    ]

    results = []

    # as_completed yields awaitables as they finish, so results are in completion order.
    for f in tqdm.asyncio.tqdm.as_completed(tasks, desc="Running Queries"):
        res = await unpack_coroutine(f)

        if return_results:
            results.append(res)

    return results
Asynchronously runs the execute_query() method for every topic in self.topics.
Parameters
- query_type: Which query to execute. Query_type determines which method is used to generate the queries from self.query.query_funcs: Dict[str, func]
- return_results: Whether to return raw results from the client. Useful for analysing results directly or for computing the BM25 scores for log normalization in NIR-style scoring
- return_size: Number of documents to return. When None, falls back to the executor's return_size; any return_size value supplied by the config is discarded.
- return_id_only: Return the ID of the document only, rather than the full source document. Defaults to False; pass None to fall back to the executor's return_id_only setting.
- args: Not accepted — run_all_queries takes no extra positional arguments; pass additional options to the execute_query method as keyword arguments.
- kwargs: Keyword arguments to pass to the execute_query method
Returns
A list of results (in completion order) if return_results is True; otherwise an empty list.