debeir.engines.elasticsearch.executor

  1from typing import Dict, List, Optional, Union
  2
  3import tqdm.asyncio
  4from debeir.core.config import apply_config
  5from debeir.core.document import document_factory
  6from debeir.core.query import GenericElasticsearchQuery
  7from debeir.rankers.transformer_sent_encoder import Encoder
  8from debeir.utils.utils import unpack_coroutine
  9from elasticsearch import AsyncElasticsearch as Elasticsearch
 10
 11
 12class ElasticsearchExecutor:
 13    """
 14    Executes an elasticsearch query given the query generated from the config, topics and query class object.
 15
 16    Computes regular patterns of queries expected from general IR topics and indexes.
 17    Includes:
 18        1. Reranking
 19        2. End-to-End Neural IR
 20        3. Statistical keyword matching
 21    """
 22
 23    def __init__(
 24            self,
 25            topics: Dict[Union[str, int], Dict[str, str]],
 26            client: Elasticsearch,
 27            index_name: str,
 28            output_file: str,
 29            query: GenericElasticsearchQuery,
 30            encoder: Optional[Encoder],
 31            return_size: int = 1000,
 32            test=False,
 33            return_id_only=True,
 34            config=None,
 35    ):
 36        self.topics = {"1": topics["1"]} if test else topics
 37        self.client = client
 38        self.index_name = index_name
 39        self.output_file = output_file
 40        self.return_size = return_size
 41        self.query = query
 42        self.encoder = encoder
 43        self.return_id_only = return_id_only
 44        self.config = config
 45        self.document_cls = document_factory['elasticsearch']
 46
 47    def generate_query(self, topic_num):
 48        """
 49        Generates a query given a topic number from the list of topics
 50
 51        :param topic_num:
 52        """
 53        raise NotImplementedError
 54
 55    def execute_query(self, *args, **kwargs):
 56        """
 57        Execute a query given parameters
 58
 59        :param args:
 60        :param kwargs:
 61        """
 62        raise NotImplementedError
 63
 64    @apply_config
 65    def _update_kwargs(self, **kwargs):
 66        return kwargs
 67
 68    async def run_all_queries(
 69            self, query_type=None, return_results=False,
 70            return_size: int = None, return_id_only: bool = False, **kwargs
 71    ) -> List:
 72        """
 73        A generic function that will asynchronously run all topics using the execute_query() method
 74
 75        :param query_type: Which query to execute. Query_type determines which method is used to generate the queries
 76               from self.query.query_funcs: Dict[str, func]
 77        :param return_results: Whether to return raw results from the client. Useful for analysing results directly or
 78               for computing the BM25 scores for log normalization in NIR-style scoring
 79        :param return_size: Number of documents to return. Overrides the config value if exists.
 80        :param return_id_only: Return the ID of the document only, rather than the full source document.
 81        :param args: Arguments to pass to the execute_query method
 82        :param kwargs: Keyword arguments to pass to the execute_query method
 83        :return:
 84            A list of results if return_results = True else an empty list is returned.
 85        """
 86        if not await self.client.ping():
 87            await self.client.close()
 88            raise RuntimeError(
 89                f"Elasticsearch instance cannot be reached at {self.client}"
 90            )
 91
 92        kwargs = self._update_kwargs(**kwargs)
 93
 94        if return_size is None:
 95            return_size = self.return_size
 96
 97        if return_id_only is None:
 98            return_id_only = self.return_id_only
 99
100        if query_type is None:
101            query_type = self.config.query_type
102
103        kwargs.pop('return_size', None)
104        kwargs.pop('return_id_only', None)
105        kwargs.pop('query_type', None)
106
107        tasks = [
108            self.execute_query(
109                topic_num=topic_num,
110                query_type=query_type,
111                return_size=return_size,
112                return_id_only=return_id_only,
113                **kwargs
114            )
115            for topic_num in self.topics
116        ]
117
118        results = []
119
120        for f in tqdm.asyncio.tqdm.as_completed(tasks, desc="Running Queries"):
121            res = await unpack_coroutine(f)
122
123            if return_results:
124                results.append(res)
125
126        return results
class ElasticsearchExecutor:
 13class ElasticsearchExecutor:
 14    """
 15    Executes an elasticsearch query given the query generated from the config, topics and query class object.
 16
 17    Computes regular patterns of queries expected from general IR topics and indexes.
 18    Includes:
 19        1. Reranking
 20        2. End-to-End Neural IR
 21        3. Statistical keyword matching
 22    """
 23
 24    def __init__(
 25            self,
 26            topics: Dict[Union[str, int], Dict[str, str]],
 27            client: Elasticsearch,
 28            index_name: str,
 29            output_file: str,
 30            query: GenericElasticsearchQuery,
 31            encoder: Optional[Encoder],
 32            return_size: int = 1000,
 33            test=False,
 34            return_id_only=True,
 35            config=None,
 36    ):
 37        self.topics = {"1": topics["1"]} if test else topics
 38        self.client = client
 39        self.index_name = index_name
 40        self.output_file = output_file
 41        self.return_size = return_size
 42        self.query = query
 43        self.encoder = encoder
 44        self.return_id_only = return_id_only
 45        self.config = config
 46        self.document_cls = document_factory['elasticsearch']
 47
 48    def generate_query(self, topic_num):
 49        """
 50        Generates a query given a topic number from the list of topics
 51
 52        :param topic_num:
 53        """
 54        raise NotImplementedError
 55
 56    def execute_query(self, *args, **kwargs):
 57        """
 58        Execute a query given parameters
 59
 60        :param args:
 61        :param kwargs:
 62        """
 63        raise NotImplementedError
 64
 65    @apply_config
 66    def _update_kwargs(self, **kwargs):
 67        return kwargs
 68
 69    async def run_all_queries(
 70            self, query_type=None, return_results=False,
 71            return_size: int = None, return_id_only: bool = False, **kwargs
 72    ) -> List:
 73        """
 74        A generic function that will asynchronously run all topics using the execute_query() method
 75
 76        :param query_type: Which query to execute. Query_type determines which method is used to generate the queries
 77               from self.query.query_funcs: Dict[str, func]
 78        :param return_results: Whether to return raw results from the client. Useful for analysing results directly or
 79               for computing the BM25 scores for log normalization in NIR-style scoring
 80        :param return_size: Number of documents to return. Overrides the config value if exists.
 81        :param return_id_only: Return the ID of the document only, rather than the full source document.
 82        :param args: Arguments to pass to the execute_query method
 83        :param kwargs: Keyword arguments to pass to the execute_query method
 84        :return:
 85            A list of results if return_results = True else an empty list is returned.
 86        """
 87        if not await self.client.ping():
 88            await self.client.close()
 89            raise RuntimeError(
 90                f"Elasticsearch instance cannot be reached at {self.client}"
 91            )
 92
 93        kwargs = self._update_kwargs(**kwargs)
 94
 95        if return_size is None:
 96            return_size = self.return_size
 97
 98        if return_id_only is None:
 99            return_id_only = self.return_id_only
100
101        if query_type is None:
102            query_type = self.config.query_type
103
104        kwargs.pop('return_size', None)
105        kwargs.pop('return_id_only', None)
106        kwargs.pop('query_type', None)
107
108        tasks = [
109            self.execute_query(
110                topic_num=topic_num,
111                query_type=query_type,
112                return_size=return_size,
113                return_id_only=return_id_only,
114                **kwargs
115            )
116            for topic_num in self.topics
117        ]
118
119        results = []
120
121        for f in tqdm.asyncio.tqdm.as_completed(tasks, desc="Running Queries"):
122            res = await unpack_coroutine(f)
123
124            if return_results:
125                results.append(res)
126
127        return results

Executes an elasticsearch query given the query generated from the config, topics and query class object.

Computes regular patterns of queries expected from general IR topics and indexes. These include: (1) reranking, (2) end-to-end neural IR, and (3) statistical keyword matching.

ElasticsearchExecutor( topics: Dict[Union[str, int], Dict[str, str]], client: elasticsearch.AsyncElasticsearch, index_name: str, output_file: str, query: debeir.core.query.GenericElasticsearchQuery, encoder: Optional[debeir.rankers.transformer_sent_encoder.Encoder], return_size: int = 1000, test=False, return_id_only=True, config=None)
24    def __init__(
25            self,
26            topics: Dict[Union[str, int], Dict[str, str]],
27            client: Elasticsearch,
28            index_name: str,
29            output_file: str,
30            query: GenericElasticsearchQuery,
31            encoder: Optional[Encoder],
32            return_size: int = 1000,
33            test=False,
34            return_id_only=True,
35            config=None,
36    ):
37        self.topics = {"1": topics["1"]} if test else topics
38        self.client = client
39        self.index_name = index_name
40        self.output_file = output_file
41        self.return_size = return_size
42        self.query = query
43        self.encoder = encoder
44        self.return_id_only = return_id_only
45        self.config = config
46        self.document_cls = document_factory['elasticsearch']
def generate_query(self, topic_num):
48    def generate_query(self, topic_num):
49        """
50        Generates a query given a topic number from the list of topics
51
52        :param topic_num:
53        """
54        raise NotImplementedError

Generates a query given a topic number from the list of topics

Parameters
  • topic_num:
def execute_query(self, *args, **kwargs):
56    def execute_query(self, *args, **kwargs):
57        """
58        Execute a query given parameters
59
60        :param args:
61        :param kwargs:
62        """
63        raise NotImplementedError

Execute a query given parameters

Parameters
  • args:
  • kwargs:
async def run_all_queries( self, query_type=None, return_results=False, return_size: int = None, return_id_only: bool = False, **kwargs) -> List:
 69    async def run_all_queries(
 70            self, query_type=None, return_results=False,
 71            return_size: int = None, return_id_only: bool = False, **kwargs
 72    ) -> List:
 73        """
 74        A generic function that will asynchronously run all topics using the execute_query() method
 75
 76        :param query_type: Which query to execute. Query_type determines which method is used to generate the queries
 77               from self.query.query_funcs: Dict[str, func]
 78        :param return_results: Whether to return raw results from the client. Useful for analysing results directly or
 79               for computing the BM25 scores for log normalization in NIR-style scoring
 80        :param return_size: Number of documents to return. Overrides the config value if exists.
 81        :param return_id_only: Return the ID of the document only, rather than the full source document.
 82        :param args: Arguments to pass to the execute_query method
 83        :param kwargs: Keyword arguments to pass to the execute_query method
 84        :return:
 85            A list of results if return_results = True else an empty list is returned.
 86        """
 87        if not await self.client.ping():
 88            await self.client.close()
 89            raise RuntimeError(
 90                f"Elasticsearch instance cannot be reached at {self.client}"
 91            )
 92
 93        kwargs = self._update_kwargs(**kwargs)
 94
 95        if return_size is None:
 96            return_size = self.return_size
 97
 98        if return_id_only is None:
 99            return_id_only = self.return_id_only
100
101        if query_type is None:
102            query_type = self.config.query_type
103
104        kwargs.pop('return_size', None)
105        kwargs.pop('return_id_only', None)
106        kwargs.pop('query_type', None)
107
108        tasks = [
109            self.execute_query(
110                topic_num=topic_num,
111                query_type=query_type,
112                return_size=return_size,
113                return_id_only=return_id_only,
114                **kwargs
115            )
116            for topic_num in self.topics
117        ]
118
119        results = []
120
121        for f in tqdm.asyncio.tqdm.as_completed(tasks, desc="Running Queries"):
122            res = await unpack_coroutine(f)
123
124            if return_results:
125                results.append(res)
126
127        return results

A generic function that will asynchronously run all topics using the execute_query() method

Parameters
  • query_type: Which query to execute. It determines which method from self.query.query_funcs (Dict[str, func]) is used to generate the queries. When None, self.config.query_type is used.
  • return_results: Whether to return raw results from the client. Useful for analysing results directly or for computing the BM25 scores for log normalization in NIR-style scoring
  • return_size: Number of documents to return. When None, the executor's own return_size is used.
  • return_id_only: Return the ID of the document only, rather than the full source document.
  • args: Not a parameter of this method (its signature takes no positional varargs); pass extra options to execute_query through kwargs.
  • kwargs: Keyword arguments to pass to the execute_query method
Returns
A list of results if return_results is True; otherwise an empty list.