qust

 1from pathlib import Path
 2import sys as __sys
 3from importlib import reload as __rl__
 4
 5__sub_modules = ["expr", "dataframe", "udf", "future", "plot", "ta", "stock", "fast_path"]
 6
 7for __sub in __sub_modules:
 8    __full_name = f"qust.{__sub}"
 9    if __full_name in __sys.modules:
10        __mod = __import__(__full_name, fromlist=["*"])
11        __rl__(__mod)
12
13
14from qust.expr import (
15    col,
16    Expr,
17)
18from qust.dataframe import (
19    DataFrame
20)
21from qust.context import (
22    set_lib_path as __set_lib_path,
23    register_start_up_deps as __register_start_up_deps,
24    with_cols,
25    select,
26    clear_cache,
27)
28from qust.udf import (
29    UdfBatch,
30    UdfRow,
31)
32from qust.future import (
33    TradePriceType,
34    MatchPriceType,
35)
36from . import plot as __plot__
37from . import ta  as __ta__       
38from . import stock as __stock__
39from . import fast_path as __fast_path__
40
41
42
43
44LIB = Path(__file__).parent
45__set_lib_path(str(LIB))
46
47__register_start_up_deps()
48
49del __set_lib_path
50del Path
51del LIB
52
53__all__ = [
54    "col",
55    "Expr",
56    "DataFrame",
57    "with_cols",
58    "select",
59    "clear_cache",
60    "UdfBatch",
61    "UdfRow",
62    "TradePriceType",
63    "MatchPriceType",
64]
def col( *args: Union[str, polars.expr.expr.Expr, Expr, int]) -> Expr:
54def col(*args: ExprInput) -> Expr:
55    """
56    选择列生成`Expr`
57
58    例子
59    --------
60    >>> col("a")
61    >>> col("a", "b")
62    >>> col("a", pl.col("b").rank())
63    >>> col(col("a").ffill(), "b")
64    """
65    return Expr(_select(*args))

选择列生成Expr

例子

>>> col("a")
>>> col("a", "b")
>>> col("a", pl.col("b").rank())
>>> col(col("a").ffill(), "b")
class Expr(qust.expr.ExprNamespace):
16class Expr: ...
def mean(self) -> Expr:
102    def mean(self) -> Expr:
103        """
104        **行算子**,计算平均
105
106        例子
107        -------
108        >>> col("a").mean()
109        >>> col("a").mean().over("code")
110        >>> col("a").mean().expanding()
111        >>> col("a").mean().rolling(10)
112        >>> col("a").mean().expanding().over("code")
113        >>> col("a").mean().std().rolling(20, 10).over("code", "stock")
114
115        示例1
116        ^^^^^^
117        >>> data = pl.DataFrame({
118        ...     "price": range(5),
119        ...     "code": ["a", "a", "a", "b", "b"]
120        ... })
121        >>> data_next = pl.DataFrame({
122        ...     "price": [2, 3, 10],
123        ...     "code": ["c", "c", "a"]
124        ... })
125
126        >>> df1 = qs.with_cols(col("price1").mean())
127        >>> df1.calc_data(data)
128        shape: (1, 1)
129        ┌────────────┐
130        │ price_mean │
131        │ ---        │
132        │ f64        │
133        ╞════════════╡
134        │ 2.692308   │
135        └────────────┘
136        >>> df1.calc_data(data_next)
137        shape: (1, 1)
138        ┌────────────┐
139        │ price_mean │
140        │ ---        │
141        │ f64        │
142        ╞════════════╡
143        │ 3.125      │
144        └────────────┘
145
146        示例2
147        ^^^^^^
148        >>> df2 = qs.with_cols(col("price").mean().expanding().alias("price_mean"))
149        >>> df2.calc_data(data)
150        shape: (5, 3)
151        ┌───────┬──────┬────────────┐
152        │ price ┆ code ┆ price_mean │
153        │ ---   ┆ ---  ┆ ---        │
154        │ i64   ┆ str  ┆ f64        │
155        ╞═══════╪══════╪════════════╡
156        │ 0     ┆ a    ┆ 0.0        │
157        │ 1     ┆ a    ┆ 0.5        │
158        │ 2     ┆ a    ┆ 1.0        │
159        │ 3     ┆ b    ┆ 1.5        │
160        │ 4     ┆ b    ┆ 2.0        │
161        └───────┴──────┴────────────┘
162        >>> df2.calc_data(data_next)
163        shape: (3, 3)
164        ┌───────┬──────┬────────────┐
165        │ price ┆ code ┆ price_mean │
166        │ ---   ┆ ---  ┆ ---        │
167        │ i64   ┆ str  ┆ f64        │
168        ╞═══════╪══════╪════════════╡
169        │ 2     ┆ c    ┆ 2.642857   │
170        │ 3     ┆ c    ┆ 2.666667   │
171        │ 10    ┆ a    ┆ 3.125      │
172        └───────┴──────┴────────────┘
173
174        示例3
175        ^^^^^
176        >>> df3 = qs.with_cols(col("price").mean().rolling(2).alias("price_mean"))
177        >>> df3.calc_data(data)
178        shape: (5, 3)
179        ┌───────┬──────┬────────────┐
180        │ price ┆ code ┆ price_mean │
181        │ ---   ┆ ---  ┆ ---        │
182        │ i64   ┆ str  ┆ f64        │
183        ╞═══════╪══════╪════════════╡
184        │ 0     ┆ a    ┆ null       │
185        │ 1     ┆ a    ┆ 0.5        │
186        │ 2     ┆ a    ┆ 1.5        │
187        │ 3     ┆ b    ┆ 2.5        │
188        │ 4     ┆ b    ┆ 3.5        │
189        └───────┴──────┴────────────┘
190        >>> df3.calc_data(data_next)
191        shape: (3, 3)
192        ┌───────┬──────┬────────────┐
193        │ price ┆ code ┆ price_mean │
194        │ ---   ┆ ---  ┆ ---        │
195        │ i64   ┆ str  ┆ f64        │
196        ╞═══════╪══════╪════════════╡
197        │ 2     ┆ c    ┆ 3.0        │
198        │ 3     ┆ c    ┆ 2.5        │
199        │ 10    ┆ a    ┆ 6.5        │
200        └───────┴──────┴────────────┘
201
202        示例4
203        ^^^^
204        >>> df4 = qs.with_cols(col("price").mean().expanding().over("code").alias("price_mean"))
205        >>> df4.calc_data(data)
206        shape: (5, 3)
207        ┌───────┬──────┬────────────┐
208        │ price ┆ code ┆ price_mean │
209        │ ---   ┆ ---  ┆ ---        │
210        │ i64   ┆ str  ┆ f64        │
211        ╞═══════╪══════╪════════════╡
212        │ 0     ┆ a    ┆ 0.0        │
213        │ 1     ┆ a    ┆ 0.5        │
214        │ 2     ┆ a    ┆ 1.0        │
215        │ 3     ┆ b    ┆ 3.0        │
216        │ 4     ┆ b    ┆ 3.5        │
217        └───────┴──────┴────────────┘
218        >>> df4.calc_data(data_next)
219        shape: (3, 3)
220        ┌───────┬──────┬────────────┐
221        │ price ┆ code ┆ price_mean │
222        │ ---   ┆ ---  ┆ ---        │
223        │ i64   ┆ str  ┆ f64        │
224        ╞═══════╪══════╪════════════╡
225        │ 2     ┆ c    ┆ 2.0        │
226        │ 3     ┆ c    ┆ 2.5        │
227        │ 10    ┆ a    ┆ 3.25       │
228        └───────┴──────┴────────────┘
229
230        示例5
231        ^^^^
232        >>> df5 = qs.select(col("price").mean().group_by("code"))
233        >>> df5.calc_data(data)
234        shape: (2, 2)
235        ┌──────┬───────┐
236        │ code ┆ price │
237        │ ---  ┆ ---   │
238        │ str  ┆ f64   │
239        ╞══════╪═══════╡
240        │ a    ┆ 1.0   │
241        │ b    ┆ 3.5   │
242        └──────┴───────┘
243        >>> df5.calc_data(data_next)
244        shape: (3, 2)
245        ┌──────┬───────┐
246        │ code ┆ price │
247        │ ---  ┆ ---   │
248        │ str  ┆ f64   │
249        ╞══════╪═══════╡
250        │ a    ┆ 3.25  │
251        │ b    ┆ 3.5   │
252        │ c    ┆ 2.5   │
253        └──────┴───────┘
254        """
255        return Expr(self._expr.mean())

行算子,计算平均

例子

>>> col("a").mean()
>>> col("a").mean().over("code")
>>> col("a").mean().expanding()
>>> col("a").mean().rolling(10)
>>> col("a").mean().expanding().over("code")
>>> col("a").mean().std().rolling(20, 10).over("code", "stock")

示例1 ^^^^^^

>>> data = pl.DataFrame({
...     "price": range(5),
...     "code": ["a", "a", "a", "b", "b"]
... })
>>> data_next = pl.DataFrame({
...     "price": [2, 3, 10],
...     "code": ["c", "c", "a"]
... })
>>> df1 = qs.with_cols(col("price1").mean())
>>> df1.calc_data(data)
shape: (1, 1)
┌────────────┐
│ price_mean │
│ ---        │
│ f64        │
╞════════════╡
│ 2.692308   │
└────────────┘
>>> df1.calc_data(data_next)
shape: (1, 1)
┌────────────┐
│ price_mean │
│ ---        │
│ f64        │
╞════════════╡
│ 3.125      │
└────────────┘

示例2 ^^^^^^

>>> df2 = qs.with_cols(col("price").mean().expanding().alias("price_mean"))
>>> df2.calc_data(data)
shape: (5, 3)
┌───────┬──────┬────────────┐
│ price ┆ code ┆ price_mean │
│ ---   ┆ ---  ┆ ---        │
│ i64   ┆ str  ┆ f64        │
╞═══════╪══════╪════════════╡
│ 0     ┆ a    ┆ 0.0        │
│ 1     ┆ a    ┆ 0.5        │
│ 2     ┆ a    ┆ 1.0        │
│ 3     ┆ b    ┆ 1.5        │
│ 4     ┆ b    ┆ 2.0        │
└───────┴──────┴────────────┘
>>> df2.calc_data(data_next)
shape: (3, 3)
┌───────┬──────┬────────────┐
│ price ┆ code ┆ price_mean │
│ ---   ┆ ---  ┆ ---        │
│ i64   ┆ str  ┆ f64        │
╞═══════╪══════╪════════════╡
│ 2     ┆ c    ┆ 2.642857   │
│ 3     ┆ c    ┆ 2.666667   │
│ 10    ┆ a    ┆ 3.125      │
└───────┴──────┴────────────┘

示例3 ^^^^^

>>> df3 = qs.with_cols(col("price").mean().rolling(2).alias("price_mean"))
>>> df3.calc_data(data)
shape: (5, 3)
┌───────┬──────┬────────────┐
│ price ┆ code ┆ price_mean │
│ ---   ┆ ---  ┆ ---        │
│ i64   ┆ str  ┆ f64        │
╞═══════╪══════╪════════════╡
│ 0     ┆ a    ┆ null       │
│ 1     ┆ a    ┆ 0.5        │
│ 2     ┆ a    ┆ 1.5        │
│ 3     ┆ b    ┆ 2.5        │
│ 4     ┆ b    ┆ 3.5        │
└───────┴──────┴────────────┘
>>> df3.calc_data(data_next)
shape: (3, 3)
┌───────┬──────┬────────────┐
│ price ┆ code ┆ price_mean │
│ ---   ┆ ---  ┆ ---        │
│ i64   ┆ str  ┆ f64        │
╞═══════╪══════╪════════════╡
│ 2     ┆ c    ┆ 3.0        │
│ 3     ┆ c    ┆ 2.5        │
│ 10    ┆ a    ┆ 6.5        │
└───────┴──────┴────────────┘

示例4 ^^^^

>>> df4 = qs.with_cols(col("price").mean().expanding().over("code").alias("price_mean"))
>>> df4.calc_data(data)
shape: (5, 3)
┌───────┬──────┬────────────┐
│ price ┆ code ┆ price_mean │
│ ---   ┆ ---  ┆ ---        │
│ i64   ┆ str  ┆ f64        │
╞═══════╪══════╪════════════╡
│ 0     ┆ a    ┆ 0.0        │
│ 1     ┆ a    ┆ 0.5        │
│ 2     ┆ a    ┆ 1.0        │
│ 3     ┆ b    ┆ 3.0        │
│ 4     ┆ b    ┆ 3.5        │
└───────┴──────┴────────────┘
>>> df4.calc_data(data_next)
shape: (3, 3)
┌───────┬──────┬────────────┐
│ price ┆ code ┆ price_mean │
│ ---   ┆ ---  ┆ ---        │
│ i64   ┆ str  ┆ f64        │
╞═══════╪══════╪════════════╡
│ 2     ┆ c    ┆ 2.0        │
│ 3     ┆ c    ┆ 2.5        │
│ 10    ┆ a    ┆ 3.25       │
└───────┴──────┴────────────┘

示例5 ^^^^

>>> df5 = qs.select(col("price").mean().group_by("code"))
>>> df5.calc_data(data)
shape: (2, 2)
┌──────┬───────┐
│ code ┆ price │
│ ---  ┆ ---   │
│ str  ┆ f64   │
╞══════╪═══════╡
│ a    ┆ 1.0   │
│ b    ┆ 3.5   │
└──────┴───────┘
>>> df5.calc_data(data_next)
shape: (3, 2)
┌──────┬───────┐
│ code ┆ price │
│ ---  ┆ ---   │
│ str  ┆ f64   │
╞══════╪═══════╡
│ a    ┆ 3.25  │
│ b    ┆ 3.5   │
│ c    ┆ 2.5   │
└──────┴───────┘
def sum(self) -> Expr:
257    def sum(self) -> Expr:
258        """**行算子**, 求和"""
259        return Expr(self._expr.sum())

行算子, 求和

def std(self, ddof=1) -> Expr:
261    def std(self, ddof = 1)  -> Expr:
262        """**行算子**, 标准差"""
263        return Expr(self._expr.std(ddof))

行算子, 标准差

def var(self, ddof=1) -> Expr:
265    def var(self, ddof = 1) -> Expr:
266        """**行算子**, 方差"""
267        return Expr(self._expr.var(ddof))

行算子, 方差

def quantile(self, percent: float) -> Expr:
269    def quantile(self, percent: float) -> Expr:
270        """**行算子**, 分位数
271
272        参数
273        ----
274        `percent`: 0~1,设定的分位数
275        """
276        return Expr(self._expr.quantile(percent))

行算子, 分位数

参数

percent: 0~1,设定的分位数

def median(self) -> Expr:
278    def median(self) -> Expr:
279        """**行算子**, 中位数"""
280        return Expr(self._expr.median())

行算子, 中位数

def rank(self, percent: bool = False, ret_list: bool = False) -> Expr:
283    def rank(self, percent: bool = False, ret_list: bool = False) -> Expr:
284        """
285        **行算子**,计算当前行的排序位数
286
287        注意这个和`polars`的排序不是同一个逻辑,`polars`的排序是全局排序,不能进行流式计算
288
289        参数
290        -----
291        - `percent`: 是否返回百分比
292        - `ret_list`: 是否返回整列
293
294        例子
295        -------
296        ```python
297        col("a").rank().rolling(10) 对应 polars pl.col("a").rolling_rank(10)
298        ```
299        ```python
300        col("a").rank().expanding() 想象成 polars pl.col("a").cum_rank()
301        ```
302        ```python
303        col("a").rank() 对应 polars pl.col("a").rank().last()
304        ```
305        ```python
306        col("a").rank().group_by("code") 对应 polars df.group_by("code").agg(pl.col("a").rank().last())
307        ```
308
309        如果需要用polars的排序,可以这样:
310        >>> qs.select(pl.col("a").rank())
311        >>> qs.select(col("a").select(pl.col("a").rank()))
312        """
313        return Expr(self._expr.rank(percent, ret_list))

行算子,计算当前行的排序位数

注意这个和polars的排序不是同一个逻辑,polars的排序是全局排序,不能进行流式计算

参数

  • percent: 是否返回百分比
  • ret_list: 是否返回整列

例子

col("a").rank().rolling(10) 对应 polars pl.col("a").rolling_rank(10)
col("a").rank().expanding() 想象成 polars pl.col("a").cum_rank()
col("a").rank() 对应 polars pl.col("a").rank().last()
col("a").rank().group_by("code") 对应 polars df.group_by("code").agg(pl.col("a").rank().last())

如果需要用polars的排序,可以这样:

>>> qs.select(pl.col("a").rank())
>>> qs.select(col("a").select(pl.col("a").rank()))
def rolling(self, window_size: int, min_samples: int | None = None) -> Expr:
316    def rolling(self, window_size: int, min_samples: int | None = None) -> Expr:
317        """
318        **上下文**,滚动
319
320        **所有支持`retract`的行算子**都可以使用这个上下文, 比如`ewm`就不支持`retract`
321
322
323        参数
324        -----
325        `window_size`: 滚动窗口长度
326
327        `min_samples`: 窗口内最少有效值(非 `null`)的个数,如果窗口内有效值少于这个,则返回`null`
328
329        例子
330        -----
331        >>> col("a").rolling(10)
332        >>> col("a", "b").select(
333        ...    col("a").mean(),
334        ...    col("b").std()
335        ... ).rolling(10)
336        """
337        if min_samples is None:
338            min_samples = window_size
339        return Expr(self._expr.rolling(window_size, min_samples))

上下文,滚动

所有支持retract的行算子都可以使用这个上下文, 比如ewm就不支持retract

参数

window_size: 滚动窗口长度

min_samples: 窗口内最少有效值(非 null)的个数,如果窗口内有效值少于这个,则返回null

例子

>>> col("a").rolling(10)
>>> col("a", "b").select(
...    col("a").mean(),
...    col("b").std()
... ).rolling(10)
def rolling_dynamic( self, window_size: Union[str, polars.expr.expr.Expr, Expr, int], window_max: int = 1000, min_samples: int | None = None) -> Expr:
341    def rolling_dynamic(
342        self,
343        window_size: ExprInput,
344        window_max: int = 1000,
345        min_samples: int | None = None,
346    ) -> Expr:
347        """
348        **上下文**,变化窗口的滚动计算
349
350        参数
351        -----
352        `window_size`: `str` | `pl.Expr` | `Expr`, 选择窗口列, 这列需要能被cast成整数,非空, 大于0
353
354        `window_max`: 最大可能的滚动窗口,设置过大可能会影响内存
355
356        `min_samples`: 最小有效值个数
357
358        例子
359        --------
360        >>> data = pl.DataFrame({
361        ...    "price": [1, 2, 3, 4, 5, 6, 7, 8 ,9, 10],
362        ...    "window": [1, 1, 2, 2, 1, 4, 3, 2, 5, 20]
363        ... })
364
365        >>> res = qs.with_cols(
366        ...     col("price").mean().rolling_dynamic("window").alias("price_rolling_dyn")
367        ... ).calc_data(data)
368        >>> res
369        shape: (10, 3)
370        ┌───────┬────────┬───────────────────┐
371        │ price ┆ window ┆ price_rolling_dyn │
372        │ ---   ┆ ---    ┆ ---               │
373        │ i64   ┆ i64    ┆ f64               │
374        ╞═══════╪════════╪═══════════════════╡
375        │ 1     ┆ 1      ┆ 1.0               │  window = 1, 所以元素为[1], 结果是1.0
376        │ 2     ┆ 1      ┆ 2.0               │  window = 1, 所以元素是[2], 结果是2.0
377        │ 3     ┆ 2      ┆ 2.5               │  window = 2, 所以元素是[2, 3], 结果是2.5
378        │ 4     ┆ 2      ┆ 3.5               │  window = 2, 所以元素是[3, 4], 结果是3.5
379        │ 5     ┆ 1      ┆ 5.0               │  .
380        │ 6     ┆ 4      ┆ 4.5               │  .
381        │ 7     ┆ 3      ┆ 6.0               │  .
382        │ 8     ┆ 2      ┆ 7.5               │
383        │ 9     ┆ 5      ┆ 7.0               │
384        │ 10    ┆ 20     ┆ 5.5               │
385        └───────┴────────┴───────────────────┘
386
387        >>> qs.with_cols(col(
388        ...     col("a").mean(),
389        ...     col("b").std(),
390        ...     col("c").mean().skew(),
391        ... ).rolling_dynamic("window"))
392
393        使用场景
394        -------
395        假设我有这个类型的流式数据
396        >>> import datetime as dt
397        >>> data = pl.DataFrame({
398        ... "datetime": [
399        ...     dt.datetime(2010, 1, 1, 9, 30, 0),
400        ...     dt.datetime(2010, 1, 1, 10, 30, 0),
401        ...     dt.datetime(2010, 1, 1, 12, 30, 0),
402        ...     dt.datetime(2010, 1, 2, 9, 30, 0),
403        ...     dt.datetime(2010, 1, 2, 10, 30, 0),
404        ...     dt.datetime(2010, 1, 2, 12, 30, 0),
405        ...     dt.datetime(2010, 1, 3, 9, 30, 0),
406        ...     dt.datetime(2010, 1, 3, 10, 30, 0),
407        ...     dt.datetime(2010, 1, 3, 12, 30, 0),
408        ... ],
409        ... "price": range(9)
410        ...})
411
412        如果我想计算**每天**的price的cum_sum,可以这样:
413        >>> res = qs.with_cols(
414        ...     col("price").sum().expanding().over(pl.col("datetime").dt.date()).alias("gg")
415        ... ).calc_data(data)
416        >>> res
417        shape: (9, 3)
418        ┌─────────────────────┬───────┬─────┐
419        │ datetime            ┆ price ┆ gg  │
420        │ ---                 ┆ ---   ┆ --- │
421        │ datetime[μs]        ┆ i64   ┆ i64 │
422        ╞═════════════════════╪═══════╪═════╡
423        │ 2010-01-01 09:30:00 ┆ 0     ┆ 0   │
424        │ 2010-01-01 10:30:00 ┆ 1     ┆ 1   │
425        │ 2010-01-01 12:30:00 ┆ 2     ┆ 3   │
426        │ 2010-01-02 09:30:00 ┆ 3     ┆ 3   │
427        │ 2010-01-02 10:30:00 ┆ 4     ┆ 7   │
428        │ 2010-01-02 12:30:00 ┆ 5     ┆ 12  │
429        │ 2010-01-03 09:30:00 ┆ 6     ┆ 6   │
430        │ 2010-01-03 10:30:00 ┆ 7     ┆ 13  │
431        │ 2010-01-03 12:30:00 ┆ 8     ┆ 21  │
432        └─────────────────────┴───────┴─────┘
433
434        如果我想计算**每两天**的cum_sum,用`over`或者`group_by`没办法,但是可以通过这个算子达到:
435        >>> window = pl.lit(pl.Series([4, 5, 6, 4, 5, 6, 4, 5, 6]))
436        >>> res = qs.with_cols(
437        ...     col("price").sum().rolling_dynamic(window).alias("gg")
438        ... ).calc_data(data)
439        >>> res
440        shape: (9, 3)
441        ┌─────────────────────┬───────┬─────┐
442        │ datetime            ┆ price ┆ gg  │
443        │ ---                 ┆ ---   ┆ --- │
444        │ datetime[μs]        ┆ i64   ┆ i64 │
445        ╞═════════════════════╪═══════╪═════╡
446        │ 2010-01-01 09:30:00 ┆ 0     ┆ 0   │
447        │ 2010-01-01 10:30:00 ┆ 1     ┆ 1   │
448        │ 2010-01-01 12:30:00 ┆ 2     ┆ 3   │
449        │ 2010-01-02 09:30:00 ┆ 3     ┆ 6   │ window=4, [0, 1, 2, 3]        -> 6
450        │ 2010-01-02 10:30:00 ┆ 4     ┆ 10  │ window=5, [0, 1, 2, 3, 4]     -> 10
451        │ 2010-01-02 12:30:00 ┆ 5     ┆ 15  │ window=6, [0, 1, 2, 3, 4, 5]  -> 15
452        │ 2010-01-03 09:30:00 ┆ 6     ┆ 18  │ window=4, [3, 4, 5, 6]        -> 18
453        │ 2010-01-03 10:30:00 ┆ 7     ┆ 25  │ .
454        │ 2010-01-03 12:30:00 ┆ 8     ┆ 33  │ .
455        └─────────────────────┴───────┴─────┘
456        """
457        if min_samples is None:
458            min_samples = 1
459        return Expr(self._expr.rolling_dynamic(_col(window_size), window_max, min_samples))

上下文,变化窗口的滚动计算

参数

window_size: str | pl.Expr | Expr, 选择窗口列, 这列需要能被cast成整数,非空, 大于0

window_max: 最大可能的滚动窗口,设置过大可能会影响内存

min_samples: 最小有效值个数

例子

>>> data = pl.DataFrame({
...    "price": [1, 2, 3, 4, 5, 6, 7, 8 ,9, 10],
...    "window": [1, 1, 2, 2, 1, 4, 3, 2, 5, 20]
... })
>>> res = qs.with_cols(
...     col("price").mean().rolling_dynamic("window").alias("price_rolling_dyn")
... ).calc_data(data)
>>> res
shape: (10, 3)
┌───────┬────────┬───────────────────┐
│ price ┆ window ┆ price_rolling_dyn │
│ ---   ┆ ---    ┆ ---               │
│ i64   ┆ i64    ┆ f64               │
╞═══════╪════════╪═══════════════════╡
│ 1     ┆ 1      ┆ 1.0               │  window = 1, 所以元素为[1], 结果是1.0
│ 2     ┆ 1      ┆ 2.0               │  window = 1, 所以元素是[2], 结果是2.0
│ 3     ┆ 2      ┆ 2.5               │  window = 2, 所以元素是[2, 3], 结果是2.5
│ 4     ┆ 2      ┆ 3.5               │  window = 2, 所以元素是[3, 4], 结果是3.5
│ 5     ┆ 1      ┆ 5.0               │  .
│ 6     ┆ 4      ┆ 4.5               │  .
│ 7     ┆ 3      ┆ 6.0               │  .
│ 8     ┆ 2      ┆ 7.5               │
│ 9     ┆ 5      ┆ 7.0               │
│ 10    ┆ 20     ┆ 5.5               │
└───────┴────────┴───────────────────┘
>>> qs.with_cols(col(
...     col("a").mean(),
...     col("b").std(),
...     col("c").mean().skew(),
... ).rolling_dynamic("window"))

使用场景

假设我有这个类型的流式数据

>>> import datetime as dt
>>> data = pl.DataFrame({
... "datetime": [
...     dt.datetime(2010, 1, 1, 9, 30, 0),
...     dt.datetime(2010, 1, 1, 10, 30, 0),
...     dt.datetime(2010, 1, 1, 12, 30, 0),
...     dt.datetime(2010, 1, 2, 9, 30, 0),
...     dt.datetime(2010, 1, 2, 10, 30, 0),
...     dt.datetime(2010, 1, 2, 12, 30, 0),
...     dt.datetime(2010, 1, 3, 9, 30, 0),
...     dt.datetime(2010, 1, 3, 10, 30, 0),
...     dt.datetime(2010, 1, 3, 12, 30, 0),
... ],
... "price": range(9)
...})

如果我想计算每天的price的cum_sum,可以这样:

>>> res = qs.with_cols(
...     col("price").sum().expanding().over(pl.col("datetime").dt.date()).alias("gg")
... ).calc_data(data)
>>> res
shape: (9, 3)
┌─────────────────────┬───────┬─────┐
│ datetime            ┆ price ┆ gg  │
│ ---                 ┆ ---   ┆ --- │
│ datetime[μs]        ┆ i64   ┆ i64 │
╞═════════════════════╪═══════╪═════╡
│ 2010-01-01 09:30:00 ┆ 0     ┆ 0   │
│ 2010-01-01 10:30:00 ┆ 1     ┆ 1   │
│ 2010-01-01 12:30:00 ┆ 2     ┆ 3   │
│ 2010-01-02 09:30:00 ┆ 3     ┆ 3   │
│ 2010-01-02 10:30:00 ┆ 4     ┆ 7   │
│ 2010-01-02 12:30:00 ┆ 5     ┆ 12  │
│ 2010-01-03 09:30:00 ┆ 6     ┆ 6   │
│ 2010-01-03 10:30:00 ┆ 7     ┆ 13  │
│ 2010-01-03 12:30:00 ┆ 8     ┆ 21  │
└─────────────────────┴───────┴─────┘

如果我想计算每两天的cum_sum,用over或者group_by没办法,但是可以通过这个算子达到:

>>> window = pl.lit(pl.Series([4, 5, 6, 4, 5, 6, 4, 5, 6]))
>>> res = qs.with_cols(
...     col("price").sum().rolling_dynamic(window).alias("gg")
... ).calc_data(data)
>>> res
shape: (9, 3)
┌─────────────────────┬───────┬─────┐
│ datetime            ┆ price ┆ gg  │
│ ---                 ┆ ---   ┆ --- │
│ datetime[μs]        ┆ i64   ┆ i64 │
╞═════════════════════╪═══════╪═════╡
│ 2010-01-01 09:30:00 ┆ 0     ┆ 0   │
│ 2010-01-01 10:30:00 ┆ 1     ┆ 1   │
│ 2010-01-01 12:30:00 ┆ 2     ┆ 3   │
│ 2010-01-02 09:30:00 ┆ 3     ┆ 6   │ window=4, [0, 1, 2, 3]        -> 6
│ 2010-01-02 10:30:00 ┆ 4     ┆ 10  │ window=5, [0, 1, 2, 3, 4]     -> 10
│ 2010-01-02 12:30:00 ┆ 5     ┆ 15  │ window=6, [0, 1, 2, 3, 4, 5]  -> 15
│ 2010-01-03 09:30:00 ┆ 6     ┆ 18  │ window=4, [3, 4, 5, 6]        -> 18
│ 2010-01-03 10:30:00 ┆ 7     ┆ 25  │ .
│ 2010-01-03 12:30:00 ┆ 8     ┆ 33  │ .
└─────────────────────┴───────┴─────┘
def rolling_intra_day( self, window: Union[str, polars.expr.expr.Expr, Expr, int], window_size: int, min_samples: int | None = None) -> Expr:
461    def rolling_intra_day(self, window: ExprInput, window_size: int, min_samples: int | None = None) -> Expr:
462        """
463        **上下文**,日内滚动计算
464
465        参数
466        ----
467        `window`: 列,pl.Boollean, 指定行是否是日线
468
469        `window_size`: 窗口大小
470
471        `min_samples`: 最小有效值个数
472
473        例子
474        ----
475        >>> col("a").mean().rolling_intra_day("window", 2)
476
477        场景
478        ----
479        假设我有这个类型的流式数据
480        >>> import datetime as dt
481        >>> data = pl.DataFrame({
482        ... "datetime": [
483        ...     dt.datetime(2010, 1, 1, 9, 30, 0),
484        ...     dt.datetime(2010, 1, 1, 10, 30, 0),
485        ...     dt.datetime(2010, 1, 1, 12, 30, 0),
486        ...     dt.datetime(2010, 1, 2, 9, 30, 0),
487        ...     dt.datetime(2010, 1, 2, 10, 30, 0),
488        ...     dt.datetime(2010, 1, 2, 12, 30, 0),
489        ...     dt.datetime(2010, 1, 3, 9, 30, 0),
490        ...     dt.datetime(2010, 1, 3, 10, 30, 0),
491        ...     dt.datetime(2010, 1, 3, 12, 30, 0),
492        ... ],
493        ... "price": range(9)
494        ... })
495
496        在当前k线,我需要计算 *[往前数2天的日线价格,往前数1天的日线价格, 当前分钟线价格]* 的移动平均, 可以这样做:
497        >>> df = qs.with_cols(
498        ...     col("price").mean().rolling_intra_day(
499        ...         pl.col("datetime").dt.hour() == 12,
500        ...         3,
501        ...     ).alias("g")
502        ... )
503        >>> df.calc_data(data)
504        shape: (9, 3)
505        ┌─────────────────────┬───────┬──────────┐
506        │ datetime            ┆ price ┆ g        │
507        │ ---                 ┆ ---   ┆ ---      │
508        │ datetime[μs]        ┆ i64   ┆ f64      │
509        ╞═════════════════════╪═══════╪══════════╡
510        │ 2010-01-01 09:30:00 ┆ 0     ┆ null     │
511        │ 2010-01-01 10:30:00 ┆ 1     ┆ null     │
512        │ 2010-01-01 12:30:00 ┆ 2     ┆ null     │
513        │ 2010-01-02 09:30:00 ┆ 3     ┆ null     │
514        │ 2010-01-02 10:30:00 ┆ 4     ┆ null     │
515        │ 2010-01-02 12:30:00 ┆ 5     ┆ null     │
516        │ 2010-01-03 09:30:00 ┆ 6     ┆ 4.333333 │  [2, 5, 6] -> 4.333
517        │ 2010-01-03 10:30:00 ┆ 7     ┆ 4.666667 │  [2, 5, 7] -> 4.666
518        │ 2010-01-03 12:30:00 ┆ 8     ┆ 5.0      │  [2, 5, 8] -> 5
519        └─────────────────────┴───────┴──────────┘
520        >>> data_next = pl.DataFrame({
521        ...     "datetime": [
522        ...         dt.datetime(2010, 1, 4, 9, 30, 0),
523        ...         dt.datetime(2010, 1, 4, 10, 30, 0),
524        ...         dt.datetime(2010, 1, 4, 12, 30, 0),
525        ...     ],
526        ...     "price": range(9, 12)
527        ... })
528        >>> df.calc_data(data)
529        shape: (3, 3)
530        ┌─────────────────────┬───────┬──────────┐
531        │ datetime            ┆ price ┆ g        │
532        │ ---                 ┆ ---   ┆ ---      │
533        │ datetime[μs]        ┆ i64   ┆ f64      │
534        ╞═════════════════════╪═══════╪══════════╡
535        │ 2010-01-04 09:30:00 ┆ 9     ┆ 7.333333 │  [5, 8, 9]  -> 7.333
536        │ 2010-01-04 10:30:00 ┆ 10    ┆ 7.666667 │  [5, 8, 10] -> 7.666
537        │ 2010-01-04 12:30:00 ┆ 11    ┆ 8.0      │  [5, 8, 11] -> 8.0
538        └─────────────────────┴───────┴──────────┘
539
540        同样逻辑,就能在k线没有合成完成的时候计算指标,比如接受tick数据,合成k线的时候,每次新的tick数据来了更新指标,并且速度不受影响
541        (所有的行算子,都是行更新).
542
543        """
544        if min_samples is None:
545            min_samples = window_size
546        return Expr(self._expr.rolling_intra_day(_col(window), window_size, min_samples))

上下文,日内滚动计算

参数

window: 列,pl.Boollean, 指定行是否是日线

window_size: 窗口大小

min_samples: 最小有效值个数

例子

>>> col("a").mean().rolling_intra_day("window", 2)

场景

假设我有这个类型的流式数据

>>> import datetime as dt
>>> data = pl.DataFrame({
... "datetime": [
...     dt.datetime(2010, 1, 1, 9, 30, 0),
...     dt.datetime(2010, 1, 1, 10, 30, 0),
...     dt.datetime(2010, 1, 1, 12, 30, 0),
...     dt.datetime(2010, 1, 2, 9, 30, 0),
...     dt.datetime(2010, 1, 2, 10, 30, 0),
...     dt.datetime(2010, 1, 2, 12, 30, 0),
...     dt.datetime(2010, 1, 3, 9, 30, 0),
...     dt.datetime(2010, 1, 3, 10, 30, 0),
...     dt.datetime(2010, 1, 3, 12, 30, 0),
... ],
... "price": range(9)
... })

在当前k线,我需要计算 [往前数2天的日线价格,往前数1天的日线价格, 当前分钟线价格] 的移动平均, 可以这样做:

>>> df = qs.with_cols(
...     col("price").mean().rolling_intra_day(
...         pl.col("datetime").dt.hour() == 12,
...         3,
...     ).alias("g")
... )
>>> df.calc_data(data)
shape: (9, 3)
┌─────────────────────┬───────┬──────────┐
│ datetime            ┆ price ┆ g        │
│ ---                 ┆ ---   ┆ ---      │
│ datetime[μs]        ┆ i64   ┆ f64      │
╞═════════════════════╪═══════╪══════════╡
│ 2010-01-01 09:30:00 ┆ 0     ┆ null     │
│ 2010-01-01 10:30:00 ┆ 1     ┆ null     │
│ 2010-01-01 12:30:00 ┆ 2     ┆ null     │
│ 2010-01-02 09:30:00 ┆ 3     ┆ null     │
│ 2010-01-02 10:30:00 ┆ 4     ┆ null     │
│ 2010-01-02 12:30:00 ┆ 5     ┆ null     │
│ 2010-01-03 09:30:00 ┆ 6     ┆ 4.333333 │  [2, 5, 6] -> 4.333
│ 2010-01-03 10:30:00 ┆ 7     ┆ 4.666667 │  [2, 5, 7] -> 4.666
│ 2010-01-03 12:30:00 ┆ 8     ┆ 5.0      │  [2, 5, 8] -> 5
└─────────────────────┴───────┴──────────┘
>>> data_next = pl.DataFrame({
...     "datetime": [
...         dt.datetime(2010, 1, 4, 9, 30, 0),
...         dt.datetime(2010, 1, 4, 10, 30, 0),
...         dt.datetime(2010, 1, 4, 12, 30, 0),
...     ],
...     "price": range(9, 12)
... })
>>> df.calc_data(data)
shape: (3, 3)
┌─────────────────────┬───────┬──────────┐
│ datetime            ┆ price ┆ g        │
│ ---                 ┆ ---   ┆ ---      │
│ datetime[μs]        ┆ i64   ┆ f64      │
╞═════════════════════╪═══════╪══════════╡
│ 2010-01-04 09:30:00 ┆ 9     ┆ 7.333333 │  [5, 8, 9]  -> 7.333
│ 2010-01-04 10:30:00 ┆ 10    ┆ 7.666667 │  [5, 8, 10] -> 7.666
│ 2010-01-04 12:30:00 ┆ 11    ┆ 8.0      │  [5, 8, 11] -> 8.0
└─────────────────────┴───────┴──────────┘

同样逻辑,就能在k线没有合成完成的时候计算指标,比如接受tick数据,合成k线的时候,每次新的tick数据来了更新指标,并且速度不受影响 (所有的行算子,都是行更新).

def expanding(self) -> Expr:
548    def expanding(self) -> Expr:
549        """
550        **上下文**, 累积计算, 支持所有行算子
551
552        与 `rolling(*_dynamic, *_intra_day)`的区别在于不用`retract`
553
554        对应`polars`的`cum(*_sum, *_mean, ...)`
555
556        写法
557        ----
558        >>> col("a").mean().expanding()
559        >>> col("a").mean().std().expanding()
560        >>> col(
561        ...     col("a").mean().expanding(),
562        ...     col("a").std().alias("gg").expanding(),
563        ... )
564
565        例子
566        ----
567        >>> data = pl.DataFrame({
568        ...     "price": range(5),
569        ...     "code": ["a", "a", "a", "b", "b"]
570        ... })
571        >>> df = qs.with_cols(
572        ...     col("price").sum().expanding().alias("cum_sum_otters"),
573        ...     pl.col("price").cum_sum().alias("cum_sum_polars"),
574        ...     col("price").sum().expanding().over("code").alias("cum_sum_otters_over"),
575        ...     pl.col("price").cum_sum().over("code").alias("cum_sum_polars_over")
576        ... )
577        >>> df.calc_data(data)
578        shape: (5, 6)
579        ┌───────┬──────┬────────────────┬────────────────┬─────────────────────┬─────────────────────┐
580        │ price ┆ code ┆ cum_sum_otters ┆ cum_sum_polars ┆ cum_sum_otters_over ┆ cum_sum_polars_over │
581        │ ---   ┆ ---  ┆ ---            ┆ ---            ┆ ---                 ┆ ---                 │
582        │ i64   ┆ str  ┆ i64            ┆ i64            ┆ i64                 ┆ i64                 │
583        ╞═══════╪══════╪════════════════╪════════════════╪═════════════════════╪═════════════════════╡
584        │ 0     ┆ a    ┆ 0              ┆ 0              ┆ 0                   ┆ 0                   │
585        │ 1     ┆ a    ┆ 1              ┆ 1              ┆ 1                   ┆ 1                   │
586        │ 2     ┆ a    ┆ 3              ┆ 3              ┆ 3                   ┆ 3                   │
587        │ 3     ┆ b    ┆ 6              ┆ 6              ┆ 3                   ┆ 3                   │
588        │ 4     ┆ b    ┆ 10             ┆ 10             ┆ 7                   ┆ 7                   │
589        └───────┴──────┴────────────────┴────────────────┴─────────────────────┴─────────────────────┘
590        """
591        return Expr(self._expr.expanding())

上下文, 累积计算, 支持所有行算子

rolling(*_dynamic, *_intra_day)的区别在于不用retract

对应polarscum(*_sum, *_mean, ...)

写法

>>> col("a").mean().expanding()
>>> col("a").mean().std().expanding()
>>> col(
...     col("a").mean().expanding(),
...     col("a").std().alias("gg").expanding(),
... )

例子

>>> data = pl.DataFrame({
...     "price": range(5),
...     "code": ["a", "a", "a", "b", "b"]
... })
>>> df = qs.with_cols(
...     col("price").sum().expanding().alias("cum_sum_otters"),
...     pl.col("price").cum_sum().alias("cum_sum_polars"),
...     col("price").sum().expanding().over("code").alias("cum_sum_otters_over"),
...     pl.col("price").cum_sum().over("code").alias("cum_sum_polars_over")
... )
>>> df.calc_data(data)
shape: (5, 6)
┌───────┬──────┬────────────────┬────────────────┬─────────────────────┬─────────────────────┐
│ price ┆ code ┆ cum_sum_otters ┆ cum_sum_polars ┆ cum_sum_otters_over ┆ cum_sum_polars_over │
│ ---   ┆ ---  ┆ ---            ┆ ---            ┆ ---                 ┆ ---                 │
│ i64   ┆ str  ┆ i64            ┆ i64            ┆ i64                 ┆ i64                 │
╞═══════╪══════╪════════════════╪════════════════╪═════════════════════╪═════════════════════╡
│ 0     ┆ a    ┆ 0              ┆ 0              ┆ 0                   ┆ 0                   │
│ 1     ┆ a    ┆ 1              ┆ 1              ┆ 1                   ┆ 1                   │
│ 2     ┆ a    ┆ 3              ┆ 3              ┆ 3                   ┆ 3                   │
│ 3     ┆ b    ┆ 6              ┆ 6              ┆ 3                   ┆ 3                   │
│ 4     ┆ b    ┆ 10             ┆ 10             ┆ 7                   ┆ 7                   │
└───────┴──────┴────────────────┴────────────────┴─────────────────────┴─────────────────────┘
def over( self, *args: Union[str, polars.expr.expr.Expr, Expr, int]) -> Expr:
593    def over(self, *args: ExprInput) -> Expr:
594        """
595        **上下文**, 分组计算
596
597        写法
598        -----
599        >>> col("a").mean().expanding().over("code")
600        >>> col(col("a").mean(), col("b").std()).rolling(10).over("code", "date")
601
602        上下文都支持更紧凑的写法,通常内存耗用也更少,比如下面两种写法
603        >>> import numpy as np
604        >>> n = 4000000
605        >>> data = pl.DataFrame({
606        ...     "price": np.random.randn(n),
607        ...     "code": np.random.choice(["a", "b", "c"], size=n, replace=True)
608        ... })
609
610
611
612        写法1
613        ^^^^^^
614        >>> qs.select(col(
615        ...     col("price").mean().expanding().alias("mean").over("code"),
616        ...     col("price").skew().expanding().alias("skew").over("code"),
617        ... ).perf("not tight")).calc_data(data)
618        >>>
619        --------------
620        not tight
621        129.185255ms
622        --------------
623
624        写法2
625        ^^^^^
626        >>> qs.select(col(
627        ...     col("price").mean().alias("mean"),
628        ...     col("price").skew().alias("skew"),
629        ... ).expanding().over("code").perf("do tight")).calc_data(data)
630        >>>
631        --------------
632        do tight
633        115.143059ms
634        --------------
635        """
636        return Expr(self._expr.over(_select(*args)))

上下文, 分组计算

写法

>>> col("a").mean().expanding().over("code")
>>> col(col("a").mean(), col("b").std()).rolling(10).over("code", "date")

上下文都支持更紧凑的写法,通常内存耗用也更少,比如下面两种写法

>>> import numpy as np
>>> n = 4000000
>>> data = pl.DataFrame({
...     "price": np.random.randn(n),
...     "code": np.random.choice(["a", "b", "c"], size=n, replace=True)
... })

写法1 ^^^^^^

>>> qs.select(col(
...     col("price").mean().expanding().alias("mean").over("code"),
...     col("price").skew().expanding().alias("skew").over("code"),
... ).perf("not tight")).calc_data(data)
<h2 id="-1">>>></h2>

not tight

129.185255ms

写法2 ^^^^^

>>> qs.select(col(
...     col("price").mean().alias("mean"),
...     col("price").skew().alias("skew"),
... ).expanding().over("code").perf("do tight")).calc_data(data)
<h2 id="-2">>>></h2>

do tight

115.143059ms

def group_by( self, *args: str | polars.expr.expr.Expr | Expr) -> Expr:
638    def group_by(self, *args: str | pl.Expr | Expr) -> Expr:
639        """
640        **上下文**, 分组计算, 支持所有行算子, 对于非行算子,在算子后面加上`.last_value()`变成行算子
641
642        写法
643        ----
644        >>> col("a").mean().group_by("code")
645        >>> col("a").mean().group_by("code1", "code2)
646        >>> col(col("a").mean(), col("b").sum()).group_by("code")
647
648        与`over`的区别
649        ----------
650        ```python
651        data = pl.DataFrame({
652            "price": range(5),
653            "code": ["a", "a", "a", "b", "b"]
654        })
655
656        data_next = pl.DataFrame({
657            "price": [2, 3, None],
658            "code": ["c", "c", "a"]
659        })
660        ```
661
662        over写法
663        ^^^^^
664        >>> df_over = qs.with_cols(
665        ...     col("price").mean().expanding().alias("mean_a").over("code")
666        ... )
667        >>> df_over.calc_data(data)
668        shape: (5, 3)
669        ┌───────┬──────┬────────┐
670        │ price ┆ code ┆ mean_a │
671        │ ---   ┆ ---  ┆ ---    │
672        │ i64   ┆ str  ┆ f64    │
673        ╞═══════╪══════╪════════╡
674        │ 0     ┆ a    ┆ 0.0    │
675        │ 1     ┆ a    ┆ 0.5    │
676        │ 2     ┆ a    ┆ 1.0    │
677        │ 3     ┆ b    ┆ 3.0    │
678        │ 4     ┆ b    ┆ 3.5    │
679        └───────┴──────┴────────┘
680        >>> df_over.calc_data(data_next)
681        shape: (3, 3)
682        ┌───────┬──────┬────────┐
683        │ price ┆ code ┆ mean_a │
684        │ ---   ┆ ---  ┆ ---    │
685        │ i64   ┆ str  ┆ f64    │
686        ╞═══════╪══════╪════════╡
687        │ 2     ┆ c    ┆ 2.0    │
688        │ 3     ┆ c    ┆ 2.5    │
689        │ null  ┆ a    ┆ null   │ -> 这样的mean_a是null,原因是price是null
690        └───────┴──────┴────────┘
691
692        group_by写法
693        ^^^^^
694        >>> df_group_by = qs.select(   # 这里要用select,因为groupby后的长度和原先长度不一致
695        ...     col("price").mean().alias("mean_a").group_by("code")
696        ... )
697        >>> df_group_by.calc_data(data)
698        shape: (2, 2)
699        ┌──────┬────────┐
700        │ code ┆ mean_a │
701        │ ---  ┆ ---    │
702        │ str  ┆ f64    │
703        ╞══════╪════════╡
704        │ a    ┆ 1.0    │
705        │ b    ┆ 3.5    │
706        └──────┴────────┘
707        >>> df_group_by.calc_data(data_next)
708        shape: (3, 2)
709        ┌──────┬────────┐
710        │ code ┆ mean_a │
711        │ ---  ┆ ---    │
712        │ str  ┆ f64    │
713        ╞══════╪════════╡
714        │ a    ┆ 1.0    │ -> data_next的a行是null值,但是这里返回的不是null
715        │ b    ┆ 3.5    │ -> data_next不包含b行,但是这里依然返回了b, 也就是说历史分组都会返回
716        │ c    ┆ 2.5    │ -> 新增的分组
717        └──────┴────────┘
718        """
719        return Expr(self._expr.group_by(_select(*args)))

上下文, 分组计算, 支持所有行算子, 对于非行算子,在算子后面加上.last_value()变成行算子

写法

>>> col("a").mean().group_by("code")
>>> col("a").mean().group_by("code1", "code2)
>>> col(col("a").mean(), col("b").sum()).group_by("code")

over的区别

data = pl.DataFrame({
    "price": range(5),
    "code": ["a", "a", "a", "b", "b"]
})

data_next = pl.DataFrame({
    "price": [2, 3, None],
    "code": ["c", "c", "a"]
})

over写法 ^^^^^

>>> df_over = qs.with_cols(
...     col("price").mean().expanding().alias("mean_a").over("code")
... )
>>> df_over.calc_data(data)
shape: (5, 3)
┌───────┬──────┬────────┐
│ price ┆ code ┆ mean_a │
│ ---   ┆ ---  ┆ ---    │
│ i64   ┆ str  ┆ f64    │
╞═══════╪══════╪════════╡
│ 0     ┆ a    ┆ 0.0    │
│ 1     ┆ a    ┆ 0.5    │
│ 2     ┆ a    ┆ 1.0    │
│ 3     ┆ b    ┆ 3.0    │
│ 4     ┆ b    ┆ 3.5    │
└───────┴──────┴────────┘
>>> df_over.calc_data(data_next)
shape: (3, 3)
┌───────┬──────┬────────┐
│ price ┆ code ┆ mean_a │
│ ---   ┆ ---  ┆ ---    │
│ i64   ┆ str  ┆ f64    │
╞═══════╪══════╪════════╡
│ 2     ┆ c    ┆ 2.0    │
│ 3     ┆ c    ┆ 2.5    │
│ null  ┆ a    ┆ null   │ -> 这样的mean_a是null,原因是price是null
└───────┴──────┴────────┘

group_by写法 ^^^^^

>>> df_group_by = qs.select(   # 这里要用select,因为groupby后的长度和原先长度不一致
...     col("price").mean().alias("mean_a").group_by("code")
... )
>>> df_group_by.calc_data(data)
shape: (2, 2)
┌──────┬────────┐
│ code ┆ mean_a │
│ ---  ┆ ---    │
│ str  ┆ f64    │
╞══════╪════════╡
│ a    ┆ 1.0    │
│ b    ┆ 3.5    │
└──────┴────────┘
>>> df_group_by.calc_data(data_next)
shape: (3, 2)
┌──────┬────────┐
│ code ┆ mean_a │
│ ---  ┆ ---    │
│ str  ┆ f64    │
╞══════╪════════╡
│ a    ┆ 1.0    │ -> data_next的a行是null值,但是这里返回的不是null
│ b    ┆ 3.5    │ -> data_next不包含b行,但是这里依然返回了b, 也就是说历史分组都会返回
│ c    ┆ 2.5    │ -> 新增的分组
└──────┴────────┘
def filter( self, *args: Union[str, polars.expr.expr.Expr, Expr, int]) -> Expr:
722    def filter(self, *args: ExprInput) -> Expr:
723        return Expr(self._expr.filter(_select(*args)))
def filter_cb(self, by: str | polars.expr.expr.Expr | Expr) -> Expr:
725    def filter_cb(self, by: str | pl.Expr | Expr) -> Expr:
726        """
727        **上下文**, 过滤计算, 先过滤掉行,然后计算,最后用null值把原先过滤掉的行填充回来
728
729        写法
730        ------
731        >>> col("a").mean().rolling(5).filter_cb("to_filter")
732        >>> col(
733        ...     col("a").mean().rolling(5),
734        ...     col("b").sum().expanding()
735        ... ).filter("to_filter")
736
737        使用场景
738        -------
739        假设我有下面类型的流式数据
740        >>> data = pl.DataFrame({
741        ...     "price": range(6),
742        ...     "is_finished": [False, True, False, True, False, True],
743        ... })
744        其中 price 列是从tick合成的k线价格, is_finished, 表示这个k线是否还在合成中,True表示那一个tick合成完成
745
746        我现在要计算k线的移动平均,我可以先filter之后再计算,但是这样tick数据就没了,如果我后续需要tick回测就做不到
747
748        所以可以用这个算子这么做
749        >>> qs.with_cols(
750        ...     col("price").mean().rolling(2, 1).filter_cb("is_finished").alias("ma")
751        ... ).calc_data(data)
752        >>>
753        shape: (6, 3)
754        ┌───────┬─────────────┬──────┐
755        │ price ┆ is_finished ┆ ma   │
756        │ ---   ┆ ---         ┆ ---  │
757        │ i64   ┆ bool        ┆ f64  │
758        ╞═══════╪═════════════╪══════╡
759        │ 0     ┆ false       ┆ null │ 被filter掉,所以每参与移动平均的计算
760        │ 1     ┆ true        ┆ 1.0  │ [1] -> 1
761        │ 2     ┆ false       ┆ null │ 被filter掉,所以每参与移动平均的计算
762        │ 3     ┆ true        ┆ 2.0  │ [1, 3] -> 2
763        │ 4     ┆ false       ┆ null │ 被filter掉,所以每参与移动平均的计算
764        │ 5     ┆ true        ┆ 4.0  │ [3, 5] -> 4
765        └───────┴─────────────┴──────┘
766        """
767        return Expr(self._expr.filter_cb(_col(by)))

上下文, 过滤计算, 先过滤掉行,然后计算,最后用null值把原先过滤掉的行填充回来

写法

>>> col("a").mean().rolling(5).filter_cb("to_filter")
>>> col(
...     col("a").mean().rolling(5),
...     col("b").sum().expanding()
... ).filter("to_filter")

使用场景

假设我有下面类型的流式数据

>>> data = pl.DataFrame({
...     "price": range(6),
...     "is_finished": [False, True, False, True, False, True],
... })
其中 price 列是从tick合成的k线价格, is_finished, 表示这个k线是否还在合成中,True表示那一个tick合成完成

我现在要计算k线的移动平均,我可以先filter之后再计算,但是这样tick数据就没了,如果我后续需要tick回测就做不到

所以可以用这个算子这么做

>>> qs.with_cols(
...     col("price").mean().rolling(2, 1).filter_cb("is_finished").alias("ma")
... ).calc_data(data)
>>>
shape: (6, 3)
┌───────┬─────────────┬──────┐
│ price ┆ is_finished ┆ ma   │
│ ---   ┆ ---         ┆ ---  │
│ i64   ┆ bool        ┆ f64  │
╞═══════╪═════════════╪══════╡
│ 0     ┆ false       ┆ null │ 被filter掉,所以每参与移动平均的计算
│ 1     ┆ true        ┆ 1.0  │ [1] -> 1
│ 2     ┆ false       ┆ null │ 被filter掉,所以每参与移动平均的计算
│ 3     ┆ true        ┆ 2.0  │ [1, 3] -> 2
│ 4     ┆ false       ┆ null │ 被filter掉,所以每参与移动平均的计算
│ 5     ┆ true        ┆ 4.0  │ [3, 5] -> 4
└───────┴─────────────┴──────┘
def select( self, *args: str | polars.expr.expr.Expr | Expr) -> Expr:
769    def select(self, *args: str | pl.Expr | Expr) -> Expr:
770        """
771        **无状态算子**,选择列
772
773        写法
774        -----
775        >>> col("a").select("a", (pl.col("a") + 1).alias("b"), col("a").ffill().alias("c"))
776        >>> col("a", "b").select(pl.col("a") + pl.col("b")).select(col("a").alias("gg"))
777        """
778        return Expr(self._expr.select([_col(x) for x in args]))

无状态算子,选择列

写法

>>> col("a").select("a", (pl.col("a") + 1).alias("b"), col("a").ffill().alias("c"))
>>> col("a", "b").select(pl.col("a") + pl.col("b")).select(col("a").alias("gg"))
def alias(self, names: str | list[str]) -> Expr:
780    def alias(self, names: str | list[str]) -> Expr:
781        """
782        **无状态算子**,重命名
783
784        写法
785        -----
786        >>> col("a").alias("b")
787        >>> col("a", "b").alias("gg1", "gg2")
788        """
789        if isinstance(names, str):
790            names = [names]
791        return Expr(self._expr.alias(names))

无状态算子,重命名

写法

>>> col("a").alias("b")
>>> col("a", "b").alias("gg1", "gg2")
def perf(self, id: str | None = None) -> Expr:
793    def perf(self, id: str | None = None) -> Expr:
794        """
795        **无状态算子**,打印算子的计算时间
796
797        写法
798        -----
799        >>> col("a").std().rolling(5).perf()
800        >>> col(col("a").std(), col("b").mean()).rolling(5).perf("min std perf")
801
802        说明
803        -----
804        `with_cols` 和 `select`里面有多个算子的时候,用的多线程,所以每个算子的计算时间加起来会大于总的计算时间
805        比如
806        ```python
807        import numpy as np
808        n = 1000000
809        data = pl.DataFrame({
810            "price": np.random.randn(n),
811        })
812        res = qs.select(col(
813            col("price").mean().alias("mean1").perf("1"),
814            col("price").mean().alias("mean2").perf("2"),
815            col("price").mean().alias("mean3").perf("3"),
816        ).expanding().perf("all")).calc_data(data)
817        ```
818        >>>
819        --------------
820        2
821        4.240824ms
822        --------------
823        --------------
824        1
825        4.279659ms
826        --------------
827        --------------
828        3
829        4.815348ms
830        --------------
831        --------------
832        all
833        5.027417ms
834        --------------
835
836        可以看到 4 + 4 + 4 > 5
837        """
838        return Expr(self._expr.perf(id))

无状态算子,打印算子的计算时间

写法

>>> col("a").std().rolling(5).perf()
>>> col(col("a").std(), col("b").mean()).rolling(5).perf("min std perf")

说明

with_colsselect里面有多个算子的时候,用的多线程,所以每个算子的计算时间加起来会大于总的计算时间 比如

import numpy as np
n = 1000000
data = pl.DataFrame({
    "price": np.random.randn(n),
})
res = qs.select(col(
    col("price").mean().alias("mean1").perf("1"),
    col("price").mean().alias("mean2").perf("2"),
    col("price").mean().alias("mean3").perf("3"),
).expanding().perf("all")).calc_data(data)

>>>

2

4.240824ms


1

4.279659ms


3

4.815348ms


all

5.027417ms

可以看到 4 + 4 + 4 > 5

def shift(self, n: int) -> Expr:
840    def shift(self, n: int) -> Expr:
841        """
842        **行算子**,无retract,n只能大于0,n < 0 会报错, 因为n小于0无法进行流式计算
843
844        写法
845        ----
846        >>> col("a").shift(1)
847        >>> col("a").shift(10).over("code")
848        """
849
850        return Expr(self._expr.shift(n))

行算子,无retract,n只能大于0,n < 0 会报错, 因为n小于0无法进行流式计算

写法

>>> col("a").shift(1)
>>> col("a").shift(10).over("code")
def ffill(self) -> Expr:
852    def ffill(self) -> Expr:
853        """
854        **行算子**, 前向填充
855        """
856        return Expr(self._expr.ffill())

行算子, 前向填充

def with_cols( self, *args: Union[str, polars.expr.expr.Expr, Expr, int]) -> Expr:
858    def with_cols(self, *args: ExprInput) -> Expr:
859        """
860        **无状态算子**, 和select类似,只不过保留输入列,如果新生成列和输入列有重复列,会只保留新生成的重复列
861        """
862        return Expr(self._expr.with_cols([_col(x) for x in args]))

无状态算子, 和select类似,只不过保留输入列,如果新生成列和输入列有重复列,会只保留新生成的重复列

def first_value(self) -> Expr:
864    def first_value(self) -> Expr:
865        """
866        **行算子**, 最后第一个值
867        """
868        return Expr(self._expr.first_value())

行算子, 最后第一个值

def last_value(self) -> Expr:
870    def last_value(self) -> Expr:
871        """
872        **行算子**, 最后一个值
873        """
874        return Expr(self._expr.last_value())

行算子, 最后一个值

def add_suffix(self, suffix: str) -> Expr:
876    def add_suffix(self, suffix: str) -> Expr:
877        """
878        **无状态算子**,添加后缀
879        """
880        return Expr(self._expr.add_suffix(suffix))

无状态算子,添加后缀

def count(self) -> Expr:
882    def count(self) -> Expr:
883        """
884        **行算子**,计数
885        """
886        return Expr(self._expr.count())

行算子,计数

def skew(self, bias: bool = True) -> Expr:
888    def skew(self, bias: bool = True) -> Expr:
889        """
890        **行算子**, 偏度
891        """
892        return Expr(self._expr.skew(bias))

行算子, 偏度

def kurtosis(self, fisher: bool = True, bias: bool = True) -> Expr:
894    def kurtosis(self, fisher: bool = True, bias: bool = True) -> Expr:
895        """
896        **行算子**, 峰度
897        """
898        return Expr(self._expr.kurtosis(fisher, bias))

行算子, 峰度

def cov(self, ddof: int = 1) -> Expr:
900    def cov(self, ddof: int = 1) -> Expr:
901        """
902        **行算子**, 计算两列的协方差
903
904        写法
905        -----
906        >>> col("a", "b").cov()
907        """
908        return Expr(self._expr.cov(ddof))

行算子, 计算两列的协方差

写法

>>> col("a", "b").cov()
def corr(self, ddof: int = 1, method: str = 'pearson') -> Expr:
910    def corr(self, ddof: int = 1, method: str = "pearson") -> Expr:
911        """
912        **行算子**, 计算两列的相关性
913
914        参数
915        -----
916        :ddof: int,
917        :method: str, "pearson" | "spearman"
918
919
920        写法
921        -----
922        >>> col("a", "b").corr()
923        >>> col("a", "b").corr("spearman")
924        """
925        return Expr(self._expr.corr(ddof, method))

行算子, 计算两列的相关性

参数

:ddof: int, :method: str, "pearson" | "spearman"

写法

>>> col("a", "b").corr()
>>> col("a", "b").corr("spearman")
def min(self) -> Expr:
927    def min(self) -> Expr:
928        """
929        **行算子**,最小值
930        """
931        return Expr(self._expr.min())

行算子,最小值

def max(self) -> Expr:
933    def max(self) -> Expr:
934        """
935        **行算子**, 最大值
936        """
937        return Expr(self._expr.max())

行算子, 最大值

def ewm(self, alpha: float) -> Expr:
939    def ewm(self, alpha: float) -> Expr:
940        """
941        **行算子**, 无retract,指数加权平均
942        """
943        return Expr(self._expr.ewm(alpha))

行算子, 无retract,指数加权平均

def cache(self, id: str) -> Expr:
945    def cache(self, id: str) -> Expr:
946        return Expr(self._expr.cache(id))
def child_exprs(self) -> List[Expr]:
948    def child_exprs(self) -> List[Expr]:
949        res = [Expr(x) for x in self._expr.child_exprs()]
950        return res
def max_k(self, k: int) -> Expr:
952    def max_k(self, k: int) -> Expr:
953        """
954        **行算子**,选出最大的k个值
955        """
956        raise NotImplementedError("还没添加")

行算子,选出最大的k个值

def min_k(self, k: int) -> Expr:
958    def min_k(self, k: int) -> Expr:
959        """
960        **行算子**, 选出最小的k个值
961        """
962        raise NotImplementedError("还没添加")

行算子, 选出最小的k个值

def arg_max(self) -> Expr:
964    def arg_max(self) -> Expr:
965        """
966        **行算子**,最大值的位置
967        """
968        raise NotImplementedError("还没添加")

行算子,最大值的位置

def arg_min(self) -> Expr:
970    def arg_min(self) -> Expr:
971        """
972        **行算子**, 最小值位置
973        """
974        raise NotImplementedError("还没添加")

行算子, 最小值位置

def udf(unknown):
def kline(unknown):
def stra(unknown):
def bt(unknown):
def plot(unknown):
def ta(unknown):
def stock(unknown):
def fp(unknown):
class DataFrame:
12class DataFrame:
13
14    def __init__(self, origin: DataFrameQust | None = None):
15        if origin is None:
16            origin = DataFrameQust()
17        self.__df = origin
18
19    def select(self, *args: ExprInput) -> "DataFrame":
20        return DataFrame(self.__df.select([_col(x) for x in args]))
21
22    def with_cols(self, *args: ExprInput) -> "DataFrame":
23        return DataFrame(self.__df.with_cols([_col(x) for x in args]))
24
25    def filter(self, arg: ExprInput) -> "DataFrame":
26        return DataFrame(self.__df.filter(_col(arg)))
27
28    def calc_data(self, data: pl.DataFrame) -> pl.DataFrame:
29        return self.__df.calc_data(data)
DataFrame(origin: DataFrame | None = None)
14    def __init__(self, origin: DataFrameQust | None = None):
15        if origin is None:
16            origin = DataFrameQust()
17        self.__df = origin
def select( self, *args: Union[str, polars.expr.expr.Expr, Expr, int]) -> DataFrame:
19    def select(self, *args: ExprInput) -> "DataFrame":
20        return DataFrame(self.__df.select([_col(x) for x in args]))
def with_cols( self, *args: Union[str, polars.expr.expr.Expr, Expr, int]) -> DataFrame:
22    def with_cols(self, *args: ExprInput) -> "DataFrame":
23        return DataFrame(self.__df.with_cols([_col(x) for x in args]))
def filter( self, arg: Union[str, polars.expr.expr.Expr, Expr, int]) -> DataFrame:
25    def filter(self, arg: ExprInput) -> "DataFrame":
26        return DataFrame(self.__df.filter(_col(arg)))
def calc_data( self, data: polars.dataframe.frame.DataFrame) -> polars.dataframe.frame.DataFrame:
28    def calc_data(self, data: pl.DataFrame) -> pl.DataFrame:
29        return self.__df.calc_data(data)
def with_cols( *args: str | polars.expr.expr.Expr | Expr) -> DataFrame:
27def with_cols(*args: str | pl.Expr | Expr) -> DataFrame:
28    return DataFrame().with_cols(*args)
def select( *args: str | polars.expr.expr.Expr | Expr) -> DataFrame:
30def select(*args: str | pl.Expr | Expr) -> DataFrame:
31    return DataFrame().select(*args)
def clear_cache(id: str | None = None) -> None:
21def clear_cache(id: str | None = None) -> None:
22    """
23    清除全局变量
24    """
25    clear_cache_otters(id)

清除全局变量

class UdfBatch(abc.ABC):
 9class UdfBatch(ABC):
10    """
11    **批算子**, python自定义批算子
12
13    例子
14    -----
15    自定义一个批算子,计算出输入的移动平均并且画图,在jupyter里面,每次接受到新数据后,更新图片
16    ```python
17    # 在jupyter里面的一个单元格下面定义算子
18    import matplotlib.pyplot as plt
19    import matplotlib
20
21    class PlotLineInJupyter(qs.UdfBatch):
22        def __init__(self):
23            plt.ion()
24            self.fig, self.ax = plt.subplots(figsize=(10, 6))
25            self.line1, = self.ax.plot([], [],  label="raw", linewidth=2)
26            self.line2, = self.ax.plot([], [],  label="rolling_mean", linewidth=2)
27            self.ax.set_title('test udf batch', fontsize=14, fontweight='bold')
28            self.x_data, self.y_data, self.y_data_mean = [], [], []  
29            self.fig.tight_layout()
30            self.display_handle = display(self.fig, display_id=True)
31            self.fig.canvas.draw()
32            plt.close(self.fig) 
33
34        
35        def calc_batch(self, data):
36            self.x_data.extend(data[:, 0].to_list())
37            self.y_data.extend(data[:, 1].to_list())
38            self.y_data_mean.extend(data[:, 2].to_list())
39            self.line1.set_data(self.x_data, self.y_data)
40            self.line2.set_data(self.x_data, self.y_data_mean)
41            if self.y_data:
42                x_min, x_max = min(self.x_data), max(self.x_data)
43                y_min, y_max = min(self.y_data), max(self.y_data)
44                self.ax.set_xlim(x_min - max(5, x_max*0.05), x_max + max(5, x_max*0.05))
45                self.ax.set_ylim(y_min - max(0.05, y_max*0.05), y_max + max(0.05, y_max*0.05))
46            self.fig.canvas.draw()
47            self.fig.canvas.flush_events()
48            self.display_handle.update(self.fig)
49            return data
50
51    df = qs.DataFrame().select(
52        col(
53            "index",
54            "y",
55            col("y").mean().rolling(10).alias("y_mean"),
56        ).udf_batch(PlotLineInJupyter())
57    
58    # 在另一个单元格里面运行
59    import numpy as np
60    n = 1000
61    data = pl.DataFrame({
62        "index": range(n),
63        "y": np.random.randn(n),  
64    })
65
66    import time 
67    for i in range(0, 200, 2):
68        df.calc_data(data[i:i+2])
69        time.sleep(0.01)
70    ```
71    """
72
73    @abstractmethod
74    def calc_batch(self, data: pl.DataFrame) -> pl.DataFrame:
75        pass

批算子, python自定义批算子

例子

自定义一个批算子,计算出输入的移动平均并且画图,在jupyter里面,每次接受到新数据后,更新图片

# 在jupyter里面的一个单元格下面定义算子
import matplotlib.pyplot as plt
import matplotlib

class PlotLineInJupyter(qs.UdfBatch):
    def __init__(self):
        plt.ion()
        self.fig, self.ax = plt.subplots(figsize=(10, 6))
        self.line1, = self.ax.plot([], [],  label="raw", linewidth=2)
        self.line2, = self.ax.plot([], [],  label="rolling_mean", linewidth=2)
        self.ax.set_title('test udf batch', fontsize=14, fontweight='bold')
        self.x_data, self.y_data, self.y_data_mean = [], [], []  
        self.fig.tight_layout()
        self.display_handle = display(self.fig, display_id=True)
        self.fig.canvas.draw()
        plt.close(self.fig) 


    def calc_batch(self, data):
        self.x_data.extend(data[:, 0].to_list())
        self.y_data.extend(data[:, 1].to_list())
        self.y_data_mean.extend(data[:, 2].to_list())
        self.line1.set_data(self.x_data, self.y_data)
        self.line2.set_data(self.x_data, self.y_data_mean)
        if self.y_data:
            x_min, x_max = min(self.x_data), max(self.x_data)
            y_min, y_max = min(self.y_data), max(self.y_data)
            self.ax.set_xlim(x_min - max(5, x_max*0.05), x_max + max(5, x_max*0.05))
            self.ax.set_ylim(y_min - max(0.05, y_max*0.05), y_max + max(0.05, y_max*0.05))
        self.fig.canvas.draw()
        self.fig.canvas.flush_events()
        self.display_handle.update(self.fig)
        return data

df = qs.DataFrame().select(
    col(
        "index",
        "y",
        col("y").mean().rolling(10).alias("y_mean"),
    ).udf_batch(PlotLineInJupyter())

# 在另一个单元格里面运行
import numpy as np
n = 1000
data = pl.DataFrame({
    "index": range(n),
    "y": np.random.randn(n),  
})

import time 
for i in range(0, 200, 2):
    df.calc_data(data[i:i+2])
    time.sleep(0.01)
@abstractmethod
def calc_batch( self, data: polars.dataframe.frame.DataFrame) -> polars.dataframe.frame.DataFrame:
73    @abstractmethod
74    def calc_batch(self, data: pl.DataFrame) -> pl.DataFrame:
75        pass
class UdfRow(abc.ABC):
 77class UdfRow(ABC):
 78    """
 79    **行算子**, python自定义行算子,和其他行算子有相同行为
 80
 81    示例
 82    -----
 83    1. 实现一个马丁格尔策略
 84    ```python
 85    class MartinGillStra(qs.UdfRow):
 86        # 用python自定义行算子,实现一个马丁格尔策略
 87
 88        # 策略的输入
 89        # -----
 90        # col(price, line_down_std2, line_down_std1, line_middle, line_up_std1, line_up_std2)
 91
 92        # price: k线价格
 93
 94        # line_down_std2: k线形成的最低线
 95
 96        # line_down_std1: k线形成的中下线
 97
 98        # line_middle: k线的中间线
 99
100        # line_up_std1: k线形成的中上线
101
102        # line_up_std2: k线形成的最高线
103
104        # 策略逻辑
105        # ------
106
107        # price 处于 [line_middle, line_up_std1], 目标仓位1,
108        # price 处于 [line_middle, line_up_std2], 目标仓位2,
109        # price 处于 [line_up_std2, inf], 目标仓位3,
110        # 反过来就是 -1, -2, -3
111
112        # 输出
113        # ------
114        # col(target)
115        # target: 目标仓位
116
117        def __init__(self):
118            self.last_hold = 0.0
119
120        def output_schema(self, input_schema):
121            return [("target", pl.Float64)]
122
123        def update(self, price, down_std2, down_std1, middle, up_std1, up_std2):
124            # 如果能够保证输入没有null,可以去除这个检查,性能有提升
125            if price is None or down_std2 is None or down_std1 is None or middle is None or up_std1 is None or up_std2 is None:
126                return None
127            if price <= down_std2:
128                self.last_hold = -3.0
129            elif down_std2 < price <= down_std1:
130                self.last_hold = -2.0
131            elif down_std1 < price <= middle:
132                self.last_hold = -1.0
133            elif middle < price < up_std1:
134                self.last_hold = 1.0
135            elif up_std1 <= price < up_std2:
136                self.last_hold = 2.0
137            elif up_std2 <= price:
138                self.last_hold = 3.0
139
140        def calc(self):
141            return [self.last_hold]
142
143    df = qs.select(
144        col(
145            col("close").alias("price"),
146            col("close").mean().alias("middle"),
147            col("close").std().alias("std"),
148        ).rolling(20, 1).select(col(
149            "price",
150            (pl.col("middle") - 2.0 * pl.col("std")).alias("a"),
151            (pl.col("middle") - 1.0 * pl.col("std")).alias("b"),
152            "middle",
153            (pl.col("middle") + 1.0 * pl.col("std")).alias("c"),
154            (pl.col("middle") + 2.0 * pl.col("std")).alias("d"),
155        ).udf.row(MartinGillStra())).expanding()
156    )
157
158    df.calc_data(data)
159    ```
160
161    2. 自己实现一个均值计算
162    ```python
163    class MeanUdf(qs.UdfRow):
164
165        def __init__(self):
166            self.sum = 0.0
167            self.count = 0.0
168
169        def output_schema(self, input_schema):
170            return [("mean_res", pl.Float64)]
171        
172        def update(self, value):
173            self.sum += value
174            self.count += 1.0
175
176        def calc(self):
177            return [self.sum / self.count]
178
179        def retract(self, value):
180            self.sum -= value
181            self.count -= 1.0
182
183    import numpy as np
184    n = 1000
185    data = pl.DataFrame({
186        "value": np.random.randn(n),
187        "code": np.random.choice(["a", "b", "c"], size=n, replace=True),
188        "window": np.random.choice([10, 5, 2], size = n, replace=True),
189        "intra_day": np.random.choice([True, False], size = n, replace=True)
190    })
191
192    qs.with_cols(
193        col("value").udf.row(MeanUdf()).expanding().alias("expanding"),
194        col("value").udf.row(MeanUdf()).rolling(10).alias("rolling"),
195        col("value").udf.row(MeanUdf()).rolling_dynamic("window").alias("rolling_dynamic"),
196        col("value").udf.row(MeanUdf()).rolling_intra_day("intra_day", 3).alias("rolling_intraday"),
197        col("value").udf.row(MeanUdf()).expanding().alias("expanding").over("code").add_suffix("over"),
198        col("value").udf.row(MeanUdf()).rolling(10).alias("rolling").over("code").add_suffix("over"),
199        col("value").udf.row(MeanUdf()).rolling_dynamic("window").alias("rolling_dynamic").add_suffix("over"),
200        col("value").udf.row(MeanUdf()).rolling_intra_day("intra_day", 3).alias("rolling_intraday").add_suffix("over"),
201    ).calc_data(data)
202
203    qs.select(
204        col("value").udf.row(MeanUdf())
205    ).calc_data(data)
206
207    qs.select(
208        col("value").udf.row(MeanUdf()).group_by("code")
209    ).calc_data(data)
210    ```
211    """
212
213    @abstractmethod
214    def output_schema(self, input_schema: pl.Schema) -> List[Tuple[str, pl.DataType]]:
215        """
216        算子返回的输出,列名 + 类型
217        """
218        pass
219
220    @abstractmethod
221    def update(self, *args):
222        """
223        :args: 算子的输入
224        比如输入是 col(a, b, c), args就是a,b,c三列每一行
225        """
226        pass
227
228    @abstractmethod
229    def calc(self) -> List:
230        """
231        必须返回一个List, 长度和self.output_schema一致
232        """
233        pass
234
235    def support_retract(self) -> bool:
236        return True
237
238    def retract(self, *args):
239        """
240        如果没有添加这个方法,那么这个行算子不能用于`rolling(*_dynamic, *_intra_day)`上下文:
241        """
242        raise NotImplementedError("如果需要自定义的行算子支持各类rolling,得先为这个行算子添加`retract`方法") 
243
244    def retract_front(self, *args):
245        self.retract(*args)
246
247    def retract_back(self, *args):
248        self.retract(*args)

行算子, python自定义行算子,和其他行算子有相同行为

示例

  1. 实现一个马丁格尔策略
class MartinGillStra(qs.UdfRow):
    # 用python自定义行算子,实现一个马丁格尔策略

    # 策略的输入
    # -----
    # col(price, line_down_std2, line_down_std1, line_middle, line_up_std1, line_up_std2)

    # price: k线价格

    # line_down_std2: k线形成的最低线

    # line_down_std1: k线形成的中下线

    # line_middle: k线的中间线

    # line_up_std1: k线形成的中上线

    # line_up_std2: k线形成的最高线

    # 策略逻辑
    # ------

    # price 处于 [line_middle, line_up_std1], 目标仓位1,
    # price 处于 [line_middle, line_up_std2], 目标仓位2,
    # price 处于 [line_up_std2, inf], 目标仓位3,
    # 反过来就是 -1, -2, -3

    # 输出
    # ------
    # col(target)
    # target: 目标仓位

    def __init__(self):
        self.last_hold = 0.0

    def output_schema(self, input_schema):
        return [("target", pl.Float64)]

    def update(self, price, down_std2, down_std1, middle, up_std1, up_std2):
        # 如果能够保证输入没有null,可以去除这个检查,性能有提升
        if price is None or down_std2 is None or down_std1 is None or middle is None or up_std1 is None or up_std2 is None:
            return None
        if price <= down_std2:
            self.last_hold = -3.0
        elif down_std2 < price <= down_std1:
            self.last_hold = -2.0
        elif down_std1 < price <= middle:
            self.last_hold = -1.0
        elif middle < price < up_std1:
            self.last_hold = 1.0
        elif up_std1 <= price < up_std2:
            self.last_hold = 2.0
        elif up_std2 <= price:
            self.last_hold = 3.0

    def calc(self):
        return [self.last_hold]

df = qs.select(
    col(
        col("close").alias("price"),
        col("close").mean().alias("middle"),
        col("close").std().alias("std"),
    ).rolling(20, 1).select(col(
        "price",
        (pl.col("middle") - 2.0 * pl.col("std")).alias("a"),
        (pl.col("middle") - 1.0 * pl.col("std")).alias("b"),
        "middle",
        (pl.col("middle") + 1.0 * pl.col("std")).alias("c"),
        (pl.col("middle") + 2.0 * pl.col("std")).alias("d"),
    ).udf.row(MartinGillStra())).expanding()
)

df.calc_data(data)
  1. 自己实现一个均值计算
class MeanUdf(qs.UdfRow):

    def __init__(self):
        self.sum = 0.0
        self.count = 0.0

    def output_schema(self, input_schema):
        return [("mean_res", pl.Float64)]

    def update(self, value):
        self.sum += value
        self.count += 1.0

    def calc(self):
        return [self.sum / self.count]

    def retract(self, value):
        self.sum -= value
        self.count -= 1.0

import numpy as np
n = 1000
data = pl.DataFrame({
    "value": np.random.randn(n),
    "code": np.random.choice(["a", "b", "c"], size=n, replace=True),
    "window": np.random.choice([10, 5, 2], size = n, replace=True),
    "intra_day": np.random.choice([True, False], size = n, replace=True)
})

qs.with_cols(
    col("value").udf.row(MeanUdf()).expanding().alias("expanding"),
    col("value").udf.row(MeanUdf()).rolling(10).alias("rolling"),
    col("value").udf.row(MeanUdf()).rolling_dynamic("window").alias("rolling_dynamic"),
    col("value").udf.row(MeanUdf()).rolling_intra_day("intra_day", 3).alias("rolling_intraday"),
    col("value").udf.row(MeanUdf()).expanding().alias("expanding").over("code").add_suffix("over"),
    col("value").udf.row(MeanUdf()).rolling(10).alias("rolling").over("code").add_suffix("over"),
    col("value").udf.row(MeanUdf()).rolling_dynamic("window").alias("rolling_dynamic").add_suffix("over"),
    col("value").udf.row(MeanUdf()).rolling_intra_day("intra_day", 3).alias("rolling_intraday").add_suffix("over"),
).calc_data(data)

qs.select(
    col("value").udf.row(MeanUdf())
).calc_data(data)

qs.select(
    col("value").udf.row(MeanUdf()).group_by("code")
).calc_data(data)
@abstractmethod
def output_schema( self, input_schema: polars.schema.Schema) -> List[Tuple[str, polars.datatypes.classes.DataType]]:
213    @abstractmethod
214    def output_schema(self, input_schema: pl.Schema) -> List[Tuple[str, pl.DataType]]:
215        """
216        算子返回的输出,列名 + 类型
217        """
218        pass

算子返回的输出,列名 + 类型

@abstractmethod
def update(self, *args):
220    @abstractmethod
221    def update(self, *args):
222        """
223        :args: 算子的输入
224        比如输入是 col(a, b, c), args就是a,b,c三列每一行
225        """
226        pass

:args: 算子的输入 比如输入是 col(a, b, c), args就是a,b,c三列每一行

@abstractmethod
def calc(self) -> List:
228    @abstractmethod
229    def calc(self) -> List:
230        """
231        必须返回一个List, 长度和self.output_schema一致
232        """
233        pass

必须返回一个List, 长度和self.output_schema一致

def support_retract(self) -> bool:
235    def support_retract(self) -> bool:
236        return True
def retract(self, *args):
238    def retract(self, *args):
239        """
240        如果没有添加这个方法,那么这个行算子不能用于`rolling(*_dynamic, *_intra_day)`上下文:
241        """
242        raise NotImplementedError("如果需要自定义的行算子支持各类rolling,得先为这个行算子添加`retract`方法") 

如果没有添加这个方法,那么这个行算子不能用于rolling(*_dynamic, *_intra_day)上下文:

def retract_front(self, *args):
244    def retract_front(self, *args):
245        self.retract(*args)
def retract_back(self, *args):
247    def retract_back(self, *args):
248        self.retract(*args)
class TradePriceType:
455class TradePriceType:
456    """
457    挂单价格类型
458    """
459    queue = TradePrice.queue()
460    """
461    排队成交,卖单挂ask1,买单挂bid1
462    """
463    last_price = TradePrice.last_price()
464    """
465    挂最新价
466    """
467    opsite = TradePrice.opsite()
468    """
469    挂对手价,卖单挂bid1,买单挂ask1
470    """

挂单价格类型

queue = Queue

排队成交,卖单挂ask1,买单挂bid1

last_price = LastPrice

挂最新价

opsite = Opsite

挂对手价,卖单挂bid1,买单挂ask1

class MatchPriceType:
473class MatchPriceType:
474    """
475    撮合成交类型
476    """
477    simnow = MatchPrice.simnow()
478    """
479    用simnow的方法撮合成交
480    """
481    void = MatchPrice.void()
482    """
483    按挂单价格直接成交
484    """

撮合成交类型

simnow = Simnow

用simnow的方法撮合成交

void = Void

按挂单价格直接成交