qust
1from pathlib import Path 2import sys as __sys 3from importlib import reload as __rl__ 4 5__sub_modules = ["expr", "dataframe", "udf", "future", "plot", "ta", "stock", "fast_path"] 6 7for __sub in __sub_modules: 8 __full_name = f"qust.{__sub}" 9 if __full_name in __sys.modules: 10 __mod = __import__(__full_name, fromlist=["*"]) 11 __rl__(__mod) 12 13 14from qust.expr import ( 15 col, 16 Expr, 17) 18from qust.dataframe import ( 19 DataFrame 20) 21from qust.context import ( 22 set_lib_path as __set_lib_path, 23 register_start_up_deps as __register_start_up_deps, 24 with_cols, 25 select, 26 clear_cache, 27) 28from qust.udf import ( 29 UdfBatch, 30 UdfRow, 31) 32from qust.future import ( 33 TradePriceType, 34 MatchPriceType, 35) 36from . import plot as __plot__ 37from . import ta as __ta__ 38from . import stock as __stock__ 39from . import fast_path as __fast_path__ 40 41 42 43 44LIB = Path(__file__).parent 45__set_lib_path(str(LIB)) 46 47__register_start_up_deps() 48 49del __set_lib_path 50del Path 51del LIB 52 53__all__ = [ 54 "col", 55 "Expr", 56 "DataFrame", 57 "with_cols", 58 "select", 59 "clear_cache", 60 "UdfBatch", 61 "UdfRow", 62 "TradePriceType", 63 "MatchPriceType", 64]
54def col(*args: ExprInput) -> Expr: 55 """ 56 选择列生成`Expr` 57 58 例子 59 -------- 60 >>> col("a") 61 >>> col("a", "b") 62 >>> col("a", pl.col("b").rank()) 63 >>> col(col("a").ffill(), "b") 64 """ 65 return Expr(_select(*args))
选择列生成Expr
例子
>>> col("a")
>>> col("a", "b")
>>> col("a", pl.col("b").rank())
>>> col(col("a").ffill(), "b")
16class Expr: ...
102 def mean(self) -> Expr: 103 """ 104 **行算子**,计算平均 105 106 例子 107 ------- 108 >>> col("a").mean() 109 >>> col("a").mean().over("code") 110 >>> col("a").mean().expanding() 111 >>> col("a").mean().rolling(10) 112 >>> col("a").mean().expanding().over("code") 113 >>> col("a").mean().std().rolling(20, 10).over("code", "stock") 114 115 示例1 116 ^^^^^^ 117 >>> data = pl.DataFrame({ 118 ... "price": range(5), 119 ... "code": ["a", "a", "a", "b", "b"] 120 ... }) 121 >>> data_next = pl.DataFrame({ 122 ... "price": [2, 3, 10], 123 ... "code": ["c", "c", "a"] 124 ... }) 125 126 >>> df1 = qs.with_cols(col("price1").mean()) 127 >>> df1.calc_data(data) 128 shape: (1, 1) 129 ┌────────────┐ 130 │ price_mean │ 131 │ --- │ 132 │ f64 │ 133 ╞════════════╡ 134 │ 2.692308 │ 135 └────────────┘ 136 >>> df1.calc_data(data_next) 137 shape: (1, 1) 138 ┌────────────┐ 139 │ price_mean │ 140 │ --- │ 141 │ f64 │ 142 ╞════════════╡ 143 │ 3.125 │ 144 └────────────┘ 145 146 示例2 147 ^^^^^^ 148 >>> df2 = qs.with_cols(col("price").mean().expanding().alias("price_mean")) 149 >>> df2.calc_data(data) 150 shape: (5, 3) 151 ┌───────┬──────┬────────────┐ 152 │ price ┆ code ┆ price_mean │ 153 │ --- ┆ --- ┆ --- │ 154 │ i64 ┆ str ┆ f64 │ 155 ╞═══════╪══════╪════════════╡ 156 │ 0 ┆ a ┆ 0.0 │ 157 │ 1 ┆ a ┆ 0.5 │ 158 │ 2 ┆ a ┆ 1.0 │ 159 │ 3 ┆ b ┆ 1.5 │ 160 │ 4 ┆ b ┆ 2.0 │ 161 └───────┴──────┴────────────┘ 162 >>> df2.calc_data(data_next) 163 shape: (3, 3) 164 ┌───────┬──────┬────────────┐ 165 │ price ┆ code ┆ price_mean │ 166 │ --- ┆ --- ┆ --- │ 167 │ i64 ┆ str ┆ f64 │ 168 ╞═══════╪══════╪════════════╡ 169 │ 2 ┆ c ┆ 2.642857 │ 170 │ 3 ┆ c ┆ 2.666667 │ 171 │ 10 ┆ a ┆ 3.125 │ 172 └───────┴──────┴────────────┘ 173 174 示例3 175 ^^^^^ 176 >>> df3 = qs.with_cols(col("price").mean().rolling(2).alias("price_mean")) 177 >>> df3.calc_data(data) 178 shape: (5, 3) 179 ┌───────┬──────┬────────────┐ 180 │ price ┆ code ┆ price_mean │ 181 │ --- ┆ --- ┆ --- │ 182 │ i64 ┆ str ┆ f64 │ 183 ╞═══════╪══════╪════════════╡ 184 │ 0 ┆ a ┆ null │ 185 │ 1 ┆ a ┆ 0.5 │ 186 │ 2 ┆ a ┆ 1.5 │ 187 │ 3 ┆ b ┆ 2.5 │ 188 │ 4 ┆ b ┆ 3.5 │ 189 └───────┴──────┴────────────┘ 190 >>> df3.calc_data(data_next) 191 shape: (3, 3) 192 ┌───────┬──────┬────────────┐ 193 │ price ┆ code ┆ price_mean │ 194 │ --- ┆ --- ┆ --- │ 195 │ i64 ┆ str ┆ f64 │ 196 ╞═══════╪══════╪════════════╡ 197 │ 2 ┆ c ┆ 3.0 │ 198 │ 3 ┆ c ┆ 2.5 │ 199 │ 10 ┆ a ┆ 6.5 │ 200 └───────┴──────┴────────────┘ 201 202 示例4 203 ^^^^ 204 >>> df4 = qs.with_cols(col("price").mean().expanding().over("code").alias("price_mean")) 205 >>> df4.calc_data(data) 206 shape: (5, 3) 207 ┌───────┬──────┬────────────┐ 208 │ price ┆ code ┆ price_mean │ 209 │ --- ┆ --- ┆ --- │ 210 │ i64 ┆ str ┆ f64 │ 211 ╞═══════╪══════╪════════════╡ 212 │ 0 ┆ a ┆ 0.0 │ 213 │ 1 ┆ a ┆ 0.5 │ 214 │ 2 ┆ a ┆ 1.0 │ 215 │ 3 ┆ b ┆ 3.0 │ 216 │ 4 ┆ b ┆ 3.5 │ 217 └───────┴──────┴────────────┘ 218 >>> df4.calc_data(data_next) 219 shape: (3, 3) 220 ┌───────┬──────┬────────────┐ 221 │ price ┆ code ┆ price_mean │ 222 │ --- ┆ --- ┆ --- │ 223 │ i64 ┆ str ┆ f64 │ 224 ╞═══════╪══════╪════════════╡ 225 │ 2 ┆ c ┆ 2.0 │ 226 │ 3 ┆ c ┆ 2.5 │ 227 │ 10 ┆ a ┆ 3.25 │ 228 └───────┴──────┴────────────┘ 229 230 示例5 231 ^^^^ 232 >>> df5 = qs.select(col("price").mean().group_by("code")) 233 >>> df5.calc_data(data) 234 shape: (2, 2) 235 ┌──────┬───────┐ 236 │ code ┆ price │ 237 │ --- ┆ --- │ 238 │ str ┆ f64 │ 239 ╞══════╪═══════╡ 240 │ a ┆ 1.0 │ 241 │ b ┆ 3.5 │ 242 └──────┴───────┘ 243 >>> df5.calc_data(data_next) 244 shape: (3, 2) 245 ┌──────┬───────┐ 246 │ code ┆ price │ 247 │ --- ┆ --- │ 248 │ str ┆ f64 │ 249 ╞══════╪═══════╡ 250 │ a ┆ 3.25 │ 251 │ b ┆ 3.5 │ 252 │ c ┆ 2.5 │ 253 └──────┴───────┘ 254 """ 255 return Expr(self._expr.mean())
行算子,计算平均
例子
>>> col("a").mean()
>>> col("a").mean().over("code")
>>> col("a").mean().expanding()
>>> col("a").mean().rolling(10)
>>> col("a").mean().expanding().over("code")
>>> col("a").mean().std().rolling(20, 10).over("code", "stock")
示例1 ^^^^^^
>>> data = pl.DataFrame({
... "price": range(5),
... "code": ["a", "a", "a", "b", "b"]
... })
>>> data_next = pl.DataFrame({
... "price": [2, 3, 10],
... "code": ["c", "c", "a"]
... })
>>> df1 = qs.with_cols(col("price1").mean())
>>> df1.calc_data(data)
shape: (1, 1)
┌────────────┐
│ price_mean │
│ --- │
│ f64 │
╞════════════╡
│ 2.692308 │
└────────────┘
>>> df1.calc_data(data_next)
shape: (1, 1)
┌────────────┐
│ price_mean │
│ --- │
│ f64 │
╞════════════╡
│ 3.125 │
└────────────┘
示例2 ^^^^^^
>>> df2 = qs.with_cols(col("price").mean().expanding().alias("price_mean"))
>>> df2.calc_data(data)
shape: (5, 3)
┌───────┬──────┬────────────┐
│ price ┆ code ┆ price_mean │
│ --- ┆ --- ┆ --- │
│ i64 ┆ str ┆ f64 │
╞═══════╪══════╪════════════╡
│ 0 ┆ a ┆ 0.0 │
│ 1 ┆ a ┆ 0.5 │
│ 2 ┆ a ┆ 1.0 │
│ 3 ┆ b ┆ 1.5 │
│ 4 ┆ b ┆ 2.0 │
└───────┴──────┴────────────┘
>>> df2.calc_data(data_next)
shape: (3, 3)
┌───────┬──────┬────────────┐
│ price ┆ code ┆ price_mean │
│ --- ┆ --- ┆ --- │
│ i64 ┆ str ┆ f64 │
╞═══════╪══════╪════════════╡
│ 2 ┆ c ┆ 2.642857 │
│ 3 ┆ c ┆ 2.666667 │
│ 10 ┆ a ┆ 3.125 │
└───────┴──────┴────────────┘
示例3 ^^^^^
>>> df3 = qs.with_cols(col("price").mean().rolling(2).alias("price_mean"))
>>> df3.calc_data(data)
shape: (5, 3)
┌───────┬──────┬────────────┐
│ price ┆ code ┆ price_mean │
│ --- ┆ --- ┆ --- │
│ i64 ┆ str ┆ f64 │
╞═══════╪══════╪════════════╡
│ 0 ┆ a ┆ null │
│ 1 ┆ a ┆ 0.5 │
│ 2 ┆ a ┆ 1.5 │
│ 3 ┆ b ┆ 2.5 │
│ 4 ┆ b ┆ 3.5 │
└───────┴──────┴────────────┘
>>> df3.calc_data(data_next)
shape: (3, 3)
┌───────┬──────┬────────────┐
│ price ┆ code ┆ price_mean │
│ --- ┆ --- ┆ --- │
│ i64 ┆ str ┆ f64 │
╞═══════╪══════╪════════════╡
│ 2 ┆ c ┆ 3.0 │
│ 3 ┆ c ┆ 2.5 │
│ 10 ┆ a ┆ 6.5 │
└───────┴──────┴────────────┘
示例4 ^^^^
>>> df4 = qs.with_cols(col("price").mean().expanding().over("code").alias("price_mean"))
>>> df4.calc_data(data)
shape: (5, 3)
┌───────┬──────┬────────────┐
│ price ┆ code ┆ price_mean │
│ --- ┆ --- ┆ --- │
│ i64 ┆ str ┆ f64 │
╞═══════╪══════╪════════════╡
│ 0 ┆ a ┆ 0.0 │
│ 1 ┆ a ┆ 0.5 │
│ 2 ┆ a ┆ 1.0 │
│ 3 ┆ b ┆ 3.0 │
│ 4 ┆ b ┆ 3.5 │
└───────┴──────┴────────────┘
>>> df4.calc_data(data_next)
shape: (3, 3)
┌───────┬──────┬────────────┐
│ price ┆ code ┆ price_mean │
│ --- ┆ --- ┆ --- │
│ i64 ┆ str ┆ f64 │
╞═══════╪══════╪════════════╡
│ 2 ┆ c ┆ 2.0 │
│ 3 ┆ c ┆ 2.5 │
│ 10 ┆ a ┆ 3.25 │
└───────┴──────┴────────────┘
示例5 ^^^^
>>> df5 = qs.select(col("price").mean().group_by("code"))
>>> df5.calc_data(data)
shape: (2, 2)
┌──────┬───────┐
│ code ┆ price │
│ --- ┆ --- │
│ str ┆ f64 │
╞══════╪═══════╡
│ a ┆ 1.0 │
│ b ┆ 3.5 │
└──────┴───────┘
>>> df5.calc_data(data_next)
shape: (3, 2)
┌──────┬───────┐
│ code ┆ price │
│ --- ┆ --- │
│ str ┆ f64 │
╞══════╪═══════╡
│ a ┆ 3.25 │
│ b ┆ 3.5 │
│ c ┆ 2.5 │
└──────┴───────┘
269 def quantile(self, percent: float) -> Expr: 270 """**行算子**, 分位数 271 272 参数 273 ---- 274 `percent`: 0~1,设定的分位数 275 """ 276 return Expr(self._expr.quantile(percent))
行算子, 分位数
参数
percent: 0~1,设定的分位数
283 def rank(self, percent: bool = False, ret_list: bool = False) -> Expr: 284 """ 285 **行算子**,计算当前行的排序位数 286 287 注意这个和`polars`的排序不是同一个逻辑,`polars`的排序是全局排序,不能进行流式计算 288 289 参数 290 ----- 291 - `percent`: 是否返回百分比 292 - `ret_list`: 是否返回整列 293 294 例子 295 ------- 296 ```python 297 col("a").rank().rolling(10) 对应 polars pl.col("a").rolling_rank(10) 298 ``` 299 ```python 300 col("a").rank().expanding() 想象成 polars pl.col("a").cum_rank() 301 ``` 302 ```python 303 col("a").rank() 对应 polars pl.col("a").rank().last() 304 ``` 305 ```python 306 col("a").rank().group_by("code") 对应 polars df.group_by("code").agg(pl.col("a").rank().last()) 307 ``` 308 309 如果需要用polars的排序,可以这样: 310 >>> qs.select(pl.col("a").rank()) 311 >>> qs.select(col("a").select(pl.col("a").rank())) 312 """ 313 return Expr(self._expr.rank(percent, ret_list))
行算子,计算当前行的排序位数
注意这个和polars的排序不是同一个逻辑,polars的排序是全局排序,不能进行流式计算
参数
percent: 是否返回百分比ret_list: 是否返回整列
例子
col("a").rank().rolling(10) 对应 polars pl.col("a").rolling_rank(10)
col("a").rank().expanding() 想象成 polars pl.col("a").cum_rank()
col("a").rank() 对应 polars pl.col("a").rank().last()
col("a").rank().group_by("code") 对应 polars df.group_by("code").agg(pl.col("a").rank().last())
如果需要用polars的排序,可以这样:
>>> qs.select(pl.col("a").rank())
>>> qs.select(col("a").select(pl.col("a").rank()))
316 def rolling(self, window_size: int, min_samples: int | None = None) -> Expr: 317 """ 318 **上下文**,滚动 319 320 **所有支持`retract`的行算子**都可以使用这个上下文, 比如`ewm`就不支持`retract` 321 322 323 参数 324 ----- 325 `window_size`: 滚动窗口长度 326 327 `min_samples`: 窗口内最少有效值(非 `null`)的个数,如果窗口内有效值少于这个,则返回`null` 328 329 例子 330 ----- 331 >>> col("a").rolling(10) 332 >>> col("a", "b").select( 333 ... col("a").mean(), 334 ... col("b").std() 335 ... ).rolling(10) 336 """ 337 if min_samples is None: 338 min_samples = window_size 339 return Expr(self._expr.rolling(window_size, min_samples))
341 def rolling_dynamic( 342 self, 343 window_size: ExprInput, 344 window_max: int = 1000, 345 min_samples: int | None = None, 346 ) -> Expr: 347 """ 348 **上下文**,变化窗口的滚动计算 349 350 参数 351 ----- 352 `window_size`: `str` | `pl.Expr` | `Expr`, 选择窗口列, 这列需要能被cast成整数,非空, 大于0 353 354 `window_max`: 最大可能的滚动窗口,设置过大可能会影响内存 355 356 `min_samples`: 最小有效值个数 357 358 例子 359 -------- 360 >>> data = pl.DataFrame({ 361 ... "price": [1, 2, 3, 4, 5, 6, 7, 8 ,9, 10], 362 ... "window": [1, 1, 2, 2, 1, 4, 3, 2, 5, 20] 363 ... }) 364 365 >>> res = qs.with_cols( 366 ... col("price").mean().rolling_dynamic("window").alias("price_rolling_dyn") 367 ... ).calc_data(data) 368 >>> res 369 shape: (10, 3) 370 ┌───────┬────────┬───────────────────┐ 371 │ price ┆ window ┆ price_rolling_dyn │ 372 │ --- ┆ --- ┆ --- │ 373 │ i64 ┆ i64 ┆ f64 │ 374 ╞═══════╪════════╪═══════════════════╡ 375 │ 1 ┆ 1 ┆ 1.0 │ window = 1, 所以元素为[1], 结果是1.0 376 │ 2 ┆ 1 ┆ 2.0 │ window = 1, 所以元素是[2], 结果是2.0 377 │ 3 ┆ 2 ┆ 2.5 │ window = 2, 所以元素是[2, 3], 结果是2.5 378 │ 4 ┆ 2 ┆ 3.5 │ window = 2, 所以元素是[3, 4], 结果是3.5 379 │ 5 ┆ 1 ┆ 5.0 │ . 380 │ 6 ┆ 4 ┆ 4.5 │ . 381 │ 7 ┆ 3 ┆ 6.0 │ . 382 │ 8 ┆ 2 ┆ 7.5 │ 383 │ 9 ┆ 5 ┆ 7.0 │ 384 │ 10 ┆ 20 ┆ 5.5 │ 385 └───────┴────────┴───────────────────┘ 386 387 >>> qs.with_cols(col( 388 ... col("a").mean(), 389 ... col("b").std(), 390 ... col("c").mean().skew(), 391 ... ).rolling_dynamic("window")) 392 393 使用场景 394 ------- 395 假设我有这个类型的流式数据 396 >>> import datetime as dt 397 >>> data = pl.DataFrame({ 398 ... "datetime": [ 399 ... dt.datetime(2010, 1, 1, 9, 30, 0), 400 ... dt.datetime(2010, 1, 1, 10, 30, 0), 401 ... dt.datetime(2010, 1, 1, 12, 30, 0), 402 ... dt.datetime(2010, 1, 2, 9, 30, 0), 403 ... dt.datetime(2010, 1, 2, 10, 30, 0), 404 ... dt.datetime(2010, 1, 2, 12, 30, 0), 405 ... dt.datetime(2010, 1, 3, 9, 30, 0), 406 ... dt.datetime(2010, 1, 3, 10, 30, 0), 407 ... dt.datetime(2010, 1, 3, 12, 30, 0), 408 ... ], 409 ... "price": range(9) 410 ...}) 411 412 如果我想计算**每天**的price的cum_sum,可以这样: 413 >>> res = qs.with_cols( 414 ... col("price").sum().expanding().over(pl.col("datetime").dt.date()).alias("gg") 415 ... ).calc_data(data) 416 >>> res 417 shape: (9, 3) 418 ┌─────────────────────┬───────┬─────┐ 419 │ datetime ┆ price ┆ gg │ 420 │ --- ┆ --- ┆ --- │ 421 │ datetime[μs] ┆ i64 ┆ i64 │ 422 ╞═════════════════════╪═══════╪═════╡ 423 │ 2010-01-01 09:30:00 ┆ 0 ┆ 0 │ 424 │ 2010-01-01 10:30:00 ┆ 1 ┆ 1 │ 425 │ 2010-01-01 12:30:00 ┆ 2 ┆ 3 │ 426 │ 2010-01-02 09:30:00 ┆ 3 ┆ 3 │ 427 │ 2010-01-02 10:30:00 ┆ 4 ┆ 7 │ 428 │ 2010-01-02 12:30:00 ┆ 5 ┆ 12 │ 429 │ 2010-01-03 09:30:00 ┆ 6 ┆ 6 │ 430 │ 2010-01-03 10:30:00 ┆ 7 ┆ 13 │ 431 │ 2010-01-03 12:30:00 ┆ 8 ┆ 21 │ 432 └─────────────────────┴───────┴─────┘ 433 434 如果我想计算**每两天**的cum_sum,用`over`或者`group_by`没办法,但是可以通过这个算子达到: 435 >>> window = pl.lit(pl.Series([4, 5, 6, 4, 5, 6, 4, 5, 6])) 436 >>> res = qs.with_cols( 437 ... col("price").sum().rolling_dynamic(window).alias("gg") 438 ... ).calc_data(data) 439 >>> res 440 shape: (9, 3) 441 ┌─────────────────────┬───────┬─────┐ 442 │ datetime ┆ price ┆ gg │ 443 │ --- ┆ --- ┆ --- │ 444 │ datetime[μs] ┆ i64 ┆ i64 │ 445 ╞═════════════════════╪═══════╪═════╡ 446 │ 2010-01-01 09:30:00 ┆ 0 ┆ 0 │ 447 │ 2010-01-01 10:30:00 ┆ 1 ┆ 1 │ 448 │ 2010-01-01 12:30:00 ┆ 2 ┆ 3 │ 449 │ 2010-01-02 09:30:00 ┆ 3 ┆ 6 │ window=4, [0, 1, 2, 3] -> 6 450 │ 2010-01-02 10:30:00 ┆ 4 ┆ 10 │ window=5, [0, 1, 2, 3, 4] -> 10 451 │ 2010-01-02 12:30:00 ┆ 5 ┆ 15 │ window=6, [0, 1, 2, 3, 4, 5] -> 15 452 │ 2010-01-03 09:30:00 ┆ 6 ┆ 18 │ window=4, [3, 4, 5, 6] -> 18 453 │ 2010-01-03 10:30:00 ┆ 7 ┆ 25 │ . 454 │ 2010-01-03 12:30:00 ┆ 8 ┆ 33 │ . 455 └─────────────────────┴───────┴─────┘ 456 """ 457 if min_samples is None: 458 min_samples = 1 459 return Expr(self._expr.rolling_dynamic(_col(window_size), window_max, min_samples))
上下文,变化窗口的滚动计算
参数
window_size: str | pl.Expr | Expr, 选择窗口列, 这列需要能被cast成整数,非空, 大于0
window_max: 最大可能的滚动窗口,设置过大可能会影响内存
min_samples: 最小有效值个数
例子
>>> data = pl.DataFrame({
... "price": [1, 2, 3, 4, 5, 6, 7, 8 ,9, 10],
... "window": [1, 1, 2, 2, 1, 4, 3, 2, 5, 20]
... })
>>> res = qs.with_cols(
... col("price").mean().rolling_dynamic("window").alias("price_rolling_dyn")
... ).calc_data(data)
>>> res
shape: (10, 3)
┌───────┬────────┬───────────────────┐
│ price ┆ window ┆ price_rolling_dyn │
│ --- ┆ --- ┆ --- │
│ i64 ┆ i64 ┆ f64 │
╞═══════╪════════╪═══════════════════╡
│ 1 ┆ 1 ┆ 1.0 │ window = 1, 所以元素为[1], 结果是1.0
│ 2 ┆ 1 ┆ 2.0 │ window = 1, 所以元素是[2], 结果是2.0
│ 3 ┆ 2 ┆ 2.5 │ window = 2, 所以元素是[2, 3], 结果是2.5
│ 4 ┆ 2 ┆ 3.5 │ window = 2, 所以元素是[3, 4], 结果是3.5
│ 5 ┆ 1 ┆ 5.0 │ .
│ 6 ┆ 4 ┆ 4.5 │ .
│ 7 ┆ 3 ┆ 6.0 │ .
│ 8 ┆ 2 ┆ 7.5 │
│ 9 ┆ 5 ┆ 7.0 │
│ 10 ┆ 20 ┆ 5.5 │
└───────┴────────┴───────────────────┘
>>> qs.with_cols(col(
... col("a").mean(),
... col("b").std(),
... col("c").mean().skew(),
... ).rolling_dynamic("window"))
使用场景
假设我有这个类型的流式数据
>>> import datetime as dt
>>> data = pl.DataFrame({
... "datetime": [
... dt.datetime(2010, 1, 1, 9, 30, 0),
... dt.datetime(2010, 1, 1, 10, 30, 0),
... dt.datetime(2010, 1, 1, 12, 30, 0),
... dt.datetime(2010, 1, 2, 9, 30, 0),
... dt.datetime(2010, 1, 2, 10, 30, 0),
... dt.datetime(2010, 1, 2, 12, 30, 0),
... dt.datetime(2010, 1, 3, 9, 30, 0),
... dt.datetime(2010, 1, 3, 10, 30, 0),
... dt.datetime(2010, 1, 3, 12, 30, 0),
... ],
... "price": range(9)
...})
如果我想计算每天的price的cum_sum,可以这样:
>>> res = qs.with_cols(
... col("price").sum().expanding().over(pl.col("datetime").dt.date()).alias("gg")
... ).calc_data(data)
>>> res
shape: (9, 3)
┌─────────────────────┬───────┬─────┐
│ datetime ┆ price ┆ gg │
│ --- ┆ --- ┆ --- │
│ datetime[μs] ┆ i64 ┆ i64 │
╞═════════════════════╪═══════╪═════╡
│ 2010-01-01 09:30:00 ┆ 0 ┆ 0 │
│ 2010-01-01 10:30:00 ┆ 1 ┆ 1 │
│ 2010-01-01 12:30:00 ┆ 2 ┆ 3 │
│ 2010-01-02 09:30:00 ┆ 3 ┆ 3 │
│ 2010-01-02 10:30:00 ┆ 4 ┆ 7 │
│ 2010-01-02 12:30:00 ┆ 5 ┆ 12 │
│ 2010-01-03 09:30:00 ┆ 6 ┆ 6 │
│ 2010-01-03 10:30:00 ┆ 7 ┆ 13 │
│ 2010-01-03 12:30:00 ┆ 8 ┆ 21 │
└─────────────────────┴───────┴─────┘
如果我想计算每两天的cum_sum,用over或者group_by没办法,但是可以通过这个算子达到:
>>> window = pl.lit(pl.Series([4, 5, 6, 4, 5, 6, 4, 5, 6]))
>>> res = qs.with_cols(
... col("price").sum().rolling_dynamic(window).alias("gg")
... ).calc_data(data)
>>> res
shape: (9, 3)
┌─────────────────────┬───────┬─────┐
│ datetime ┆ price ┆ gg │
│ --- ┆ --- ┆ --- │
│ datetime[μs] ┆ i64 ┆ i64 │
╞═════════════════════╪═══════╪═════╡
│ 2010-01-01 09:30:00 ┆ 0 ┆ 0 │
│ 2010-01-01 10:30:00 ┆ 1 ┆ 1 │
│ 2010-01-01 12:30:00 ┆ 2 ┆ 3 │
│ 2010-01-02 09:30:00 ┆ 3 ┆ 6 │ window=4, [0, 1, 2, 3] -> 6
│ 2010-01-02 10:30:00 ┆ 4 ┆ 10 │ window=5, [0, 1, 2, 3, 4] -> 10
│ 2010-01-02 12:30:00 ┆ 5 ┆ 15 │ window=6, [0, 1, 2, 3, 4, 5] -> 15
│ 2010-01-03 09:30:00 ┆ 6 ┆ 18 │ window=4, [3, 4, 5, 6] -> 18
│ 2010-01-03 10:30:00 ┆ 7 ┆ 25 │ .
│ 2010-01-03 12:30:00 ┆ 8 ┆ 33 │ .
└─────────────────────┴───────┴─────┘
461 def rolling_intra_day(self, window: ExprInput, window_size: int, min_samples: int | None = None) -> Expr: 462 """ 463 **上下文**,日内滚动计算 464 465 参数 466 ---- 467 `window`: 列,pl.Boollean, 指定行是否是日线 468 469 `window_size`: 窗口大小 470 471 `min_samples`: 最小有效值个数 472 473 例子 474 ---- 475 >>> col("a").mean().rolling_intra_day("window", 2) 476 477 场景 478 ---- 479 假设我有这个类型的流式数据 480 >>> import datetime as dt 481 >>> data = pl.DataFrame({ 482 ... "datetime": [ 483 ... dt.datetime(2010, 1, 1, 9, 30, 0), 484 ... dt.datetime(2010, 1, 1, 10, 30, 0), 485 ... dt.datetime(2010, 1, 1, 12, 30, 0), 486 ... dt.datetime(2010, 1, 2, 9, 30, 0), 487 ... dt.datetime(2010, 1, 2, 10, 30, 0), 488 ... dt.datetime(2010, 1, 2, 12, 30, 0), 489 ... dt.datetime(2010, 1, 3, 9, 30, 0), 490 ... dt.datetime(2010, 1, 3, 10, 30, 0), 491 ... dt.datetime(2010, 1, 3, 12, 30, 0), 492 ... ], 493 ... "price": range(9) 494 ... }) 495 496 在当前k线,我需要计算 *[往前数2天的日线价格,往前数1天的日线价格, 当前分钟线价格]* 的移动平均, 可以这样做: 497 >>> df = qs.with_cols( 498 ... col("price").mean().rolling_intra_day( 499 ... pl.col("datetime").dt.hour() == 12, 500 ... 3, 501 ... ).alias("g") 502 ... ) 503 >>> df.calc_data(data) 504 shape: (9, 3) 505 ┌─────────────────────┬───────┬──────────┐ 506 │ datetime ┆ price ┆ g │ 507 │ --- ┆ --- ┆ --- │ 508 │ datetime[μs] ┆ i64 ┆ f64 │ 509 ╞═════════════════════╪═══════╪══════════╡ 510 │ 2010-01-01 09:30:00 ┆ 0 ┆ null │ 511 │ 2010-01-01 10:30:00 ┆ 1 ┆ null │ 512 │ 2010-01-01 12:30:00 ┆ 2 ┆ null │ 513 │ 2010-01-02 09:30:00 ┆ 3 ┆ null │ 514 │ 2010-01-02 10:30:00 ┆ 4 ┆ null │ 515 │ 2010-01-02 12:30:00 ┆ 5 ┆ null │ 516 │ 2010-01-03 09:30:00 ┆ 6 ┆ 4.333333 │ [2, 5, 6] -> 4.333 517 │ 2010-01-03 10:30:00 ┆ 7 ┆ 4.666667 │ [2, 5, 7] -> 4.666 518 │ 2010-01-03 12:30:00 ┆ 8 ┆ 5.0 │ [2, 5, 8] -> 5 519 └─────────────────────┴───────┴──────────┘ 520 >>> data_next = pl.DataFrame({ 521 ... "datetime": [ 522 ... dt.datetime(2010, 1, 4, 9, 30, 0), 523 ... dt.datetime(2010, 1, 4, 10, 30, 0), 524 ... dt.datetime(2010, 1, 4, 12, 30, 0), 525 ... ], 526 ... "price": range(9, 12) 527 ... }) 528 >>> df.calc_data(data) 529 shape: (3, 3) 530 ┌─────────────────────┬───────┬──────────┐ 531 │ datetime ┆ price ┆ g │ 532 │ --- ┆ --- ┆ --- │ 533 │ datetime[μs] ┆ i64 ┆ f64 │ 534 ╞═════════════════════╪═══════╪══════════╡ 535 │ 2010-01-04 09:30:00 ┆ 9 ┆ 7.333333 │ [5, 8, 9] -> 7.333 536 │ 2010-01-04 10:30:00 ┆ 10 ┆ 7.666667 │ [5, 8, 10] -> 7.666 537 │ 2010-01-04 12:30:00 ┆ 11 ┆ 8.0 │ [5, 8, 11] -> 8.0 538 └─────────────────────┴───────┴──────────┘ 539 540 同样逻辑,就能在k线没有合成完成的时候计算指标,比如接受tick数据,合成k线的时候,每次新的tick数据来了更新指标,并且速度不受影响 541 (所有的行算子,都是行更新). 542 543 """ 544 if min_samples is None: 545 min_samples = window_size 546 return Expr(self._expr.rolling_intra_day(_col(window), window_size, min_samples))
上下文,日内滚动计算
参数
window: 列,pl.Boollean, 指定行是否是日线
window_size: 窗口大小
min_samples: 最小有效值个数
例子
>>> col("a").mean().rolling_intra_day("window", 2)
场景
假设我有这个类型的流式数据
>>> import datetime as dt
>>> data = pl.DataFrame({
... "datetime": [
... dt.datetime(2010, 1, 1, 9, 30, 0),
... dt.datetime(2010, 1, 1, 10, 30, 0),
... dt.datetime(2010, 1, 1, 12, 30, 0),
... dt.datetime(2010, 1, 2, 9, 30, 0),
... dt.datetime(2010, 1, 2, 10, 30, 0),
... dt.datetime(2010, 1, 2, 12, 30, 0),
... dt.datetime(2010, 1, 3, 9, 30, 0),
... dt.datetime(2010, 1, 3, 10, 30, 0),
... dt.datetime(2010, 1, 3, 12, 30, 0),
... ],
... "price": range(9)
... })
在当前k线,我需要计算 [往前数2天的日线价格,往前数1天的日线价格, 当前分钟线价格] 的移动平均, 可以这样做:
>>> df = qs.with_cols(
... col("price").mean().rolling_intra_day(
... pl.col("datetime").dt.hour() == 12,
... 3,
... ).alias("g")
... )
>>> df.calc_data(data)
shape: (9, 3)
┌─────────────────────┬───────┬──────────┐
│ datetime ┆ price ┆ g │
│ --- ┆ --- ┆ --- │
│ datetime[μs] ┆ i64 ┆ f64 │
╞═════════════════════╪═══════╪══════════╡
│ 2010-01-01 09:30:00 ┆ 0 ┆ null │
│ 2010-01-01 10:30:00 ┆ 1 ┆ null │
│ 2010-01-01 12:30:00 ┆ 2 ┆ null │
│ 2010-01-02 09:30:00 ┆ 3 ┆ null │
│ 2010-01-02 10:30:00 ┆ 4 ┆ null │
│ 2010-01-02 12:30:00 ┆ 5 ┆ null │
│ 2010-01-03 09:30:00 ┆ 6 ┆ 4.333333 │ [2, 5, 6] -> 4.333
│ 2010-01-03 10:30:00 ┆ 7 ┆ 4.666667 │ [2, 5, 7] -> 4.666
│ 2010-01-03 12:30:00 ┆ 8 ┆ 5.0 │ [2, 5, 8] -> 5
└─────────────────────┴───────┴──────────┘
>>> data_next = pl.DataFrame({
... "datetime": [
... dt.datetime(2010, 1, 4, 9, 30, 0),
... dt.datetime(2010, 1, 4, 10, 30, 0),
... dt.datetime(2010, 1, 4, 12, 30, 0),
... ],
... "price": range(9, 12)
... })
>>> df.calc_data(data)
shape: (3, 3)
┌─────────────────────┬───────┬──────────┐
│ datetime ┆ price ┆ g │
│ --- ┆ --- ┆ --- │
│ datetime[μs] ┆ i64 ┆ f64 │
╞═════════════════════╪═══════╪══════════╡
│ 2010-01-04 09:30:00 ┆ 9 ┆ 7.333333 │ [5, 8, 9] -> 7.333
│ 2010-01-04 10:30:00 ┆ 10 ┆ 7.666667 │ [5, 8, 10] -> 7.666
│ 2010-01-04 12:30:00 ┆ 11 ┆ 8.0 │ [5, 8, 11] -> 8.0
└─────────────────────┴───────┴──────────┘
同样逻辑,就能在k线没有合成完成的时候计算指标,比如接受tick数据,合成k线的时候,每次新的tick数据来了更新指标,并且速度不受影响 (所有的行算子,都是行更新).
548 def expanding(self) -> Expr: 549 """ 550 **上下文**, 累积计算, 支持所有行算子 551 552 与 `rolling(*_dynamic, *_intra_day)`的区别在于不用`retract` 553 554 对应`polars`的`cum(*_sum, *_mean, ...)` 555 556 写法 557 ---- 558 >>> col("a").mean().expanding() 559 >>> col("a").mean().std().expanding() 560 >>> col( 561 ... col("a").mean().expanding(), 562 ... col("a").std().alias("gg").expanding(), 563 ... ) 564 565 例子 566 ---- 567 >>> data = pl.DataFrame({ 568 ... "price": range(5), 569 ... "code": ["a", "a", "a", "b", "b"] 570 ... }) 571 >>> df = qs.with_cols( 572 ... col("price").sum().expanding().alias("cum_sum_otters"), 573 ... pl.col("price").cum_sum().alias("cum_sum_polars"), 574 ... col("price").sum().expanding().over("code").alias("cum_sum_otters_over"), 575 ... pl.col("price").cum_sum().over("code").alias("cum_sum_polars_over") 576 ... ) 577 >>> df.calc_data(data) 578 shape: (5, 6) 579 ┌───────┬──────┬────────────────┬────────────────┬─────────────────────┬─────────────────────┐ 580 │ price ┆ code ┆ cum_sum_otters ┆ cum_sum_polars ┆ cum_sum_otters_over ┆ cum_sum_polars_over │ 581 │ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │ 582 │ i64 ┆ str ┆ i64 ┆ i64 ┆ i64 ┆ i64 │ 583 ╞═══════╪══════╪════════════════╪════════════════╪═════════════════════╪═════════════════════╡ 584 │ 0 ┆ a ┆ 0 ┆ 0 ┆ 0 ┆ 0 │ 585 │ 1 ┆ a ┆ 1 ┆ 1 ┆ 1 ┆ 1 │ 586 │ 2 ┆ a ┆ 3 ┆ 3 ┆ 3 ┆ 3 │ 587 │ 3 ┆ b ┆ 6 ┆ 6 ┆ 3 ┆ 3 │ 588 │ 4 ┆ b ┆ 10 ┆ 10 ┆ 7 ┆ 7 │ 589 └───────┴──────┴────────────────┴────────────────┴─────────────────────┴─────────────────────┘ 590 """ 591 return Expr(self._expr.expanding())
上下文, 累积计算, 支持所有行算子
与 rolling(*_dynamic, *_intra_day)的区别在于不用retract
对应polars的cum(*_sum, *_mean, ...)
写法
>>> col("a").mean().expanding()
>>> col("a").mean().std().expanding()
>>> col(
... col("a").mean().expanding(),
... col("a").std().alias("gg").expanding(),
... )
例子
>>> data = pl.DataFrame({
... "price": range(5),
... "code": ["a", "a", "a", "b", "b"]
... })
>>> df = qs.with_cols(
... col("price").sum().expanding().alias("cum_sum_otters"),
... pl.col("price").cum_sum().alias("cum_sum_polars"),
... col("price").sum().expanding().over("code").alias("cum_sum_otters_over"),
... pl.col("price").cum_sum().over("code").alias("cum_sum_polars_over")
... )
>>> df.calc_data(data)
shape: (5, 6)
┌───────┬──────┬────────────────┬────────────────┬─────────────────────┬─────────────────────┐
│ price ┆ code ┆ cum_sum_otters ┆ cum_sum_polars ┆ cum_sum_otters_over ┆ cum_sum_polars_over │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ i64 ┆ str ┆ i64 ┆ i64 ┆ i64 ┆ i64 │
╞═══════╪══════╪════════════════╪════════════════╪═════════════════════╪═════════════════════╡
│ 0 ┆ a ┆ 0 ┆ 0 ┆ 0 ┆ 0 │
│ 1 ┆ a ┆ 1 ┆ 1 ┆ 1 ┆ 1 │
│ 2 ┆ a ┆ 3 ┆ 3 ┆ 3 ┆ 3 │
│ 3 ┆ b ┆ 6 ┆ 6 ┆ 3 ┆ 3 │
│ 4 ┆ b ┆ 10 ┆ 10 ┆ 7 ┆ 7 │
└───────┴──────┴────────────────┴────────────────┴─────────────────────┴─────────────────────┘
593 def over(self, *args: ExprInput) -> Expr: 594 """ 595 **上下文**, 分组计算 596 597 写法 598 ----- 599 >>> col("a").mean().expanding().over("code") 600 >>> col(col("a").mean(), col("b").std()).rolling(10).over("code", "date") 601 602 上下文都支持更紧凑的写法,通常内存耗用也更少,比如下面两种写法 603 >>> import numpy as np 604 >>> n = 4000000 605 >>> data = pl.DataFrame({ 606 ... "price": np.random.randn(n), 607 ... "code": np.random.choice(["a", "b", "c"], size=n, replace=True) 608 ... }) 609 610 611 612 写法1 613 ^^^^^^ 614 >>> qs.select(col( 615 ... col("price").mean().expanding().alias("mean").over("code"), 616 ... col("price").skew().expanding().alias("skew").over("code"), 617 ... ).perf("not tight")).calc_data(data) 618 >>> 619 -------------- 620 not tight 621 129.185255ms 622 -------------- 623 624 写法2 625 ^^^^^ 626 >>> qs.select(col( 627 ... col("price").mean().alias("mean"), 628 ... col("price").skew().alias("skew"), 629 ... ).expanding().over("code").perf("do tight")).calc_data(data) 630 >>> 631 -------------- 632 do tight 633 115.143059ms 634 -------------- 635 """ 636 return Expr(self._expr.over(_select(*args)))
上下文, 分组计算
写法
>>> col("a").mean().expanding().over("code")
>>> col(col("a").mean(), col("b").std()).rolling(10).over("code", "date")
上下文都支持更紧凑的写法,通常内存耗用也更少,比如下面两种写法
>>> import numpy as np
>>> n = 4000000
>>> data = pl.DataFrame({
... "price": np.random.randn(n),
... "code": np.random.choice(["a", "b", "c"], size=n, replace=True)
... })
写法1 ^^^^^^
>>> qs.select(col(
... col("price").mean().expanding().alias("mean").over("code"),
... col("price").skew().expanding().alias("skew").over("code"),
... ).perf("not tight")).calc_data(data)
<h2 id="-1">>>></h2>
not tight
129.185255ms
写法2 ^^^^^
>>> qs.select(col(
... col("price").mean().alias("mean"),
... col("price").skew().alias("skew"),
... ).expanding().over("code").perf("do tight")).calc_data(data)
<h2 id="-2">>>></h2>
do tight
115.143059ms
638 def group_by(self, *args: str | pl.Expr | Expr) -> Expr: 639 """ 640 **上下文**, 分组计算, 支持所有行算子, 对于非行算子,在算子后面加上`.last_value()`变成行算子 641 642 写法 643 ---- 644 >>> col("a").mean().group_by("code") 645 >>> col("a").mean().group_by("code1", "code2) 646 >>> col(col("a").mean(), col("b").sum()).group_by("code") 647 648 与`over`的区别 649 ---------- 650 ```python 651 data = pl.DataFrame({ 652 "price": range(5), 653 "code": ["a", "a", "a", "b", "b"] 654 }) 655 656 data_next = pl.DataFrame({ 657 "price": [2, 3, None], 658 "code": ["c", "c", "a"] 659 }) 660 ``` 661 662 over写法 663 ^^^^^ 664 >>> df_over = qs.with_cols( 665 ... col("price").mean().expanding().alias("mean_a").over("code") 666 ... ) 667 >>> df_over.calc_data(data) 668 shape: (5, 3) 669 ┌───────┬──────┬────────┐ 670 │ price ┆ code ┆ mean_a │ 671 │ --- ┆ --- ┆ --- │ 672 │ i64 ┆ str ┆ f64 │ 673 ╞═══════╪══════╪════════╡ 674 │ 0 ┆ a ┆ 0.0 │ 675 │ 1 ┆ a ┆ 0.5 │ 676 │ 2 ┆ a ┆ 1.0 │ 677 │ 3 ┆ b ┆ 3.0 │ 678 │ 4 ┆ b ┆ 3.5 │ 679 └───────┴──────┴────────┘ 680 >>> df_over.calc_data(data_next) 681 shape: (3, 3) 682 ┌───────┬──────┬────────┐ 683 │ price ┆ code ┆ mean_a │ 684 │ --- ┆ --- ┆ --- │ 685 │ i64 ┆ str ┆ f64 │ 686 ╞═══════╪══════╪════════╡ 687 │ 2 ┆ c ┆ 2.0 │ 688 │ 3 ┆ c ┆ 2.5 │ 689 │ null ┆ a ┆ null │ -> 这样的mean_a是null,原因是price是null 690 └───────┴──────┴────────┘ 691 692 group_by写法 693 ^^^^^ 694 >>> df_group_by = qs.select( # 这里要用select,因为groupby后的长度和原先长度不一致 695 ... col("price").mean().alias("mean_a").group_by("code") 696 ... ) 697 >>> df_group_by.calc_data(data) 698 shape: (2, 2) 699 ┌──────┬────────┐ 700 │ code ┆ mean_a │ 701 │ --- ┆ --- │ 702 │ str ┆ f64 │ 703 ╞══════╪════════╡ 704 │ a ┆ 1.0 │ 705 │ b ┆ 3.5 │ 706 └──────┴────────┘ 707 >>> df_group_by.calc_data(data_next) 708 shape: (3, 2) 709 ┌──────┬────────┐ 710 │ code ┆ mean_a │ 711 │ --- ┆ --- │ 712 │ str ┆ f64 │ 713 ╞══════╪════════╡ 714 │ a ┆ 1.0 │ -> data_next的a行是null值,但是这里返回的不是null 715 │ b ┆ 3.5 │ -> data_next不包含b行,但是这里依然返回了b, 也就是说历史分组都会返回 716 │ c ┆ 2.5 │ -> 新增的分组 717 └──────┴────────┘ 718 """ 719 return Expr(self._expr.group_by(_select(*args)))
上下文, 分组计算, 支持所有行算子, 对于非行算子,在算子后面加上.last_value()变成行算子
写法
>>> col("a").mean().group_by("code")
>>> col("a").mean().group_by("code1", "code2)
>>> col(col("a").mean(), col("b").sum()).group_by("code")
与over的区别
data = pl.DataFrame({
"price": range(5),
"code": ["a", "a", "a", "b", "b"]
})
data_next = pl.DataFrame({
"price": [2, 3, None],
"code": ["c", "c", "a"]
})
over写法 ^^^^^
>>> df_over = qs.with_cols(
... col("price").mean().expanding().alias("mean_a").over("code")
... )
>>> df_over.calc_data(data)
shape: (5, 3)
┌───────┬──────┬────────┐
│ price ┆ code ┆ mean_a │
│ --- ┆ --- ┆ --- │
│ i64 ┆ str ┆ f64 │
╞═══════╪══════╪════════╡
│ 0 ┆ a ┆ 0.0 │
│ 1 ┆ a ┆ 0.5 │
│ 2 ┆ a ┆ 1.0 │
│ 3 ┆ b ┆ 3.0 │
│ 4 ┆ b ┆ 3.5 │
└───────┴──────┴────────┘
>>> df_over.calc_data(data_next)
shape: (3, 3)
┌───────┬──────┬────────┐
│ price ┆ code ┆ mean_a │
│ --- ┆ --- ┆ --- │
│ i64 ┆ str ┆ f64 │
╞═══════╪══════╪════════╡
│ 2 ┆ c ┆ 2.0 │
│ 3 ┆ c ┆ 2.5 │
│ null ┆ a ┆ null │ -> 这样的mean_a是null,原因是price是null
└───────┴──────┴────────┘
group_by写法 ^^^^^
>>> df_group_by = qs.select( # 这里要用select,因为groupby后的长度和原先长度不一致
... col("price").mean().alias("mean_a").group_by("code")
... )
>>> df_group_by.calc_data(data)
shape: (2, 2)
┌──────┬────────┐
│ code ┆ mean_a │
│ --- ┆ --- │
│ str ┆ f64 │
╞══════╪════════╡
│ a ┆ 1.0 │
│ b ┆ 3.5 │
└──────┴────────┘
>>> df_group_by.calc_data(data_next)
shape: (3, 2)
┌──────┬────────┐
│ code ┆ mean_a │
│ --- ┆ --- │
│ str ┆ f64 │
╞══════╪════════╡
│ a ┆ 1.0 │ -> data_next的a行是null值,但是这里返回的不是null
│ b ┆ 3.5 │ -> data_next不包含b行,但是这里依然返回了b, 也就是说历史分组都会返回
│ c ┆ 2.5 │ -> 新增的分组
└──────┴────────┘
725 def filter_cb(self, by: str | pl.Expr | Expr) -> Expr: 726 """ 727 **上下文**, 过滤计算, 先过滤掉行,然后计算,最后用null值把原先过滤掉的行填充回来 728 729 写法 730 ------ 731 >>> col("a").mean().rolling(5).filter_cb("to_filter") 732 >>> col( 733 ... col("a").mean().rolling(5), 734 ... col("b").sum().expanding() 735 ... ).filter("to_filter") 736 737 使用场景 738 ------- 739 假设我有下面类型的流式数据 740 >>> data = pl.DataFrame({ 741 ... "price": range(6), 742 ... "is_finished": [False, True, False, True, False, True], 743 ... }) 744 其中 price 列是从tick合成的k线价格, is_finished, 表示这个k线是否还在合成中,True表示那一个tick合成完成 745 746 我现在要计算k线的移动平均,我可以先filter之后再计算,但是这样tick数据就没了,如果我后续需要tick回测就做不到 747 748 所以可以用这个算子这么做 749 >>> qs.with_cols( 750 ... col("price").mean().rolling(2, 1).filter_cb("is_finished").alias("ma") 751 ... ).calc_data(data) 752 >>> 753 shape: (6, 3) 754 ┌───────┬─────────────┬──────┐ 755 │ price ┆ is_finished ┆ ma │ 756 │ --- ┆ --- ┆ --- │ 757 │ i64 ┆ bool ┆ f64 │ 758 ╞═══════╪═════════════╪══════╡ 759 │ 0 ┆ false ┆ null │ 被filter掉,所以每参与移动平均的计算 760 │ 1 ┆ true ┆ 1.0 │ [1] -> 1 761 │ 2 ┆ false ┆ null │ 被filter掉,所以每参与移动平均的计算 762 │ 3 ┆ true ┆ 2.0 │ [1, 3] -> 2 763 │ 4 ┆ false ┆ null │ 被filter掉,所以每参与移动平均的计算 764 │ 5 ┆ true ┆ 4.0 │ [3, 5] -> 4 765 └───────┴─────────────┴──────┘ 766 """ 767 return Expr(self._expr.filter_cb(_col(by)))
上下文, 过滤计算, 先过滤掉行,然后计算,最后用null值把原先过滤掉的行填充回来
写法
>>> col("a").mean().rolling(5).filter_cb("to_filter")
>>> col(
... col("a").mean().rolling(5),
... col("b").sum().expanding()
... ).filter("to_filter")
使用场景
假设我有下面类型的流式数据
>>> data = pl.DataFrame({
... "price": range(6),
... "is_finished": [False, True, False, True, False, True],
... })
其中 price 列是从tick合成的k线价格, is_finished, 表示这个k线是否还在合成中,True表示那一个tick合成完成
我现在要计算k线的移动平均,我可以先filter之后再计算,但是这样tick数据就没了,如果我后续需要tick回测就做不到
所以可以用这个算子这么做
>>> qs.with_cols(
... col("price").mean().rolling(2, 1).filter_cb("is_finished").alias("ma")
... ).calc_data(data)
>>>
shape: (6, 3)
┌───────┬─────────────┬──────┐
│ price ┆ is_finished ┆ ma │
│ --- ┆ --- ┆ --- │
│ i64 ┆ bool ┆ f64 │
╞═══════╪═════════════╪══════╡
│ 0 ┆ false ┆ null │ 被filter掉,所以每参与移动平均的计算
│ 1 ┆ true ┆ 1.0 │ [1] -> 1
│ 2 ┆ false ┆ null │ 被filter掉,所以每参与移动平均的计算
│ 3 ┆ true ┆ 2.0 │ [1, 3] -> 2
│ 4 ┆ false ┆ null │ 被filter掉,所以每参与移动平均的计算
│ 5 ┆ true ┆ 4.0 │ [3, 5] -> 4
└───────┴─────────────┴──────┘
769 def select(self, *args: str | pl.Expr | Expr) -> Expr: 770 """ 771 **无状态算子**,选择列 772 773 写法 774 ----- 775 >>> col("a").select("a", (pl.col("a") + 1).alias("b"), col("a").ffill().alias("c")) 776 >>> col("a", "b").select(pl.col("a") + pl.col("b")).select(col("a").alias("gg")) 777 """ 778 return Expr(self._expr.select([_col(x) for x in args]))
780 def alias(self, names: str | list[str]) -> Expr: 781 """ 782 **无状态算子**,重命名 783 784 写法 785 ----- 786 >>> col("a").alias("b") 787 >>> col("a", "b").alias("gg1", "gg2") 788 """ 789 if isinstance(names, str): 790 names = [names] 791 return Expr(self._expr.alias(names))
无状态算子,重命名
写法
>>> col("a").alias("b")
>>> col("a", "b").alias("gg1", "gg2")
793 def perf(self, id: str | None = None) -> Expr: 794 """ 795 **无状态算子**,打印算子的计算时间 796 797 写法 798 ----- 799 >>> col("a").std().rolling(5).perf() 800 >>> col(col("a").std(), col("b").mean()).rolling(5).perf("min std perf") 801 802 说明 803 ----- 804 `with_cols` 和 `select`里面有多个算子的时候,用的多线程,所以每个算子的计算时间加起来会大于总的计算时间 805 比如 806 ```python 807 import numpy as np 808 n = 1000000 809 data = pl.DataFrame({ 810 "price": np.random.randn(n), 811 }) 812 res = qs.select(col( 813 col("price").mean().alias("mean1").perf("1"), 814 col("price").mean().alias("mean2").perf("2"), 815 col("price").mean().alias("mean3").perf("3"), 816 ).expanding().perf("all")).calc_data(data) 817 ``` 818 >>> 819 -------------- 820 2 821 4.240824ms 822 -------------- 823 -------------- 824 1 825 4.279659ms 826 -------------- 827 -------------- 828 3 829 4.815348ms 830 -------------- 831 -------------- 832 all 833 5.027417ms 834 -------------- 835 836 可以看到 4 + 4 + 4 > 5 837 """ 838 return Expr(self._expr.perf(id))
无状态算子,打印算子的计算时间
写法
>>> col("a").std().rolling(5).perf()
>>> col(col("a").std(), col("b").mean()).rolling(5).perf("min std perf")
说明
with_cols 和 select里面有多个算子的时候,用的多线程,所以每个算子的计算时间加起来会大于总的计算时间
比如
import numpy as np
n = 1000000
data = pl.DataFrame({
"price": np.random.randn(n),
})
res = qs.select(col(
col("price").mean().alias("mean1").perf("1"),
col("price").mean().alias("mean2").perf("2"),
col("price").mean().alias("mean3").perf("3"),
).expanding().perf("all")).calc_data(data)
>>>
2
4.240824ms
1
4.279659ms
3
4.815348ms
all
5.027417ms
可以看到 4 + 4 + 4 > 5
840 def shift(self, n: int) -> Expr: 841 """ 842 **行算子**,无retract,n只能大于0,n < 0 会报错, 因为n小于0无法进行流式计算 843 844 写法 845 ---- 846 >>> col("a").shift(1) 847 >>> col("a").shift(10).over("code") 848 """ 849 850 return Expr(self._expr.shift(n))
行算子,无retract,n只能大于0,n < 0 会报错, 因为n小于0无法进行流式计算
写法
>>> col("a").shift(1)
>>> col("a").shift(10).over("code")
858 def with_cols(self, *args: ExprInput) -> Expr: 859 """ 860 **无状态算子**, 和select类似,只不过保留输入列,如果新生成列和输入列有重复列,会只保留新生成的重复列 861 """ 862 return Expr(self._expr.with_cols([_col(x) for x in args]))
无状态算子, 和select类似,只不过保留输入列,如果新生成列和输入列有重复列,会只保留新生成的重复列
864 def first_value(self) -> Expr: 865 """ 866 **行算子**, 最后第一个值 867 """ 868 return Expr(self._expr.first_value())
行算子, 最后第一个值
870 def last_value(self) -> Expr: 871 """ 872 **行算子**, 最后一个值 873 """ 874 return Expr(self._expr.last_value())
行算子, 最后一个值
876 def add_suffix(self, suffix: str) -> Expr: 877 """ 878 **无状态算子**,添加后缀 879 """ 880 return Expr(self._expr.add_suffix(suffix))
无状态算子,添加后缀
888 def skew(self, bias: bool = True) -> Expr: 889 """ 890 **行算子**, 偏度 891 """ 892 return Expr(self._expr.skew(bias))
行算子, 偏度
894 def kurtosis(self, fisher: bool = True, bias: bool = True) -> Expr: 895 """ 896 **行算子**, 峰度 897 """ 898 return Expr(self._expr.kurtosis(fisher, bias))
行算子, 峰度
900 def cov(self, ddof: int = 1) -> Expr: 901 """ 902 **行算子**, 计算两列的协方差 903 904 写法 905 ----- 906 >>> col("a", "b").cov() 907 """ 908 return Expr(self._expr.cov(ddof))
行算子, 计算两列的协方差
写法
>>> col("a", "b").cov()
910 def corr(self, ddof: int = 1, method: str = "pearson") -> Expr: 911 """ 912 **行算子**, 计算两列的相关性 913 914 参数 915 ----- 916 :ddof: int, 917 :method: str, "pearson" | "spearman" 918 919 920 写法 921 ----- 922 >>> col("a", "b").corr() 923 >>> col("a", "b").corr("spearman") 924 """ 925 return Expr(self._expr.corr(ddof, method))
行算子, 计算两列的相关性
参数
:ddof: int, :method: str, "pearson" | "spearman"
写法
>>> col("a", "b").corr()
>>> col("a", "b").corr("spearman")
939 def ewm(self, alpha: float) -> Expr: 940 """ 941 **行算子**, 无retract,指数加权平均 942 """ 943 return Expr(self._expr.ewm(alpha))
行算子, 无retract,指数加权平均
952 def max_k(self, k: int) -> Expr: 953 """ 954 **行算子**,选出最大的k个值 955 """ 956 raise NotImplementedError("还没添加")
行算子,选出最大的k个值
958 def min_k(self, k: int) -> Expr: 959 """ 960 **行算子**, 选出最小的k个值 961 """ 962 raise NotImplementedError("还没添加")
行算子, 选出最小的k个值
964 def arg_max(self) -> Expr: 965 """ 966 **行算子**,最大值的位置 967 """ 968 raise NotImplementedError("还没添加")
行算子,最大值的位置
12class DataFrame: 13 14 def __init__(self, origin: DataFrameQust | None = None): 15 if origin is None: 16 origin = DataFrameQust() 17 self.__df = origin 18 19 def select(self, *args: ExprInput) -> "DataFrame": 20 return DataFrame(self.__df.select([_col(x) for x in args])) 21 22 def with_cols(self, *args: ExprInput) -> "DataFrame": 23 return DataFrame(self.__df.with_cols([_col(x) for x in args])) 24 25 def filter(self, arg: ExprInput) -> "DataFrame": 26 return DataFrame(self.__df.filter(_col(arg))) 27 28 def calc_data(self, data: pl.DataFrame) -> pl.DataFrame: 29 return self.__df.calc_data(data)
清除全局变量
9class UdfBatch(ABC): 10 """ 11 **批算子**, python自定义批算子 12 13 例子 14 ----- 15 自定义一个批算子,计算出输入的移动平均并且画图,在jupyter里面,每次接受到新数据后,更新图片 16 ```python 17 # 在jupyter里面的一个单元格下面定义算子 18 import matplotlib.pyplot as plt 19 import matplotlib 20 21 class PlotLineInJupyter(qs.UdfBatch): 22 def __init__(self): 23 plt.ion() 24 self.fig, self.ax = plt.subplots(figsize=(10, 6)) 25 self.line1, = self.ax.plot([], [], label="raw", linewidth=2) 26 self.line2, = self.ax.plot([], [], label="rolling_mean", linewidth=2) 27 self.ax.set_title('test udf batch', fontsize=14, fontweight='bold') 28 self.x_data, self.y_data, self.y_data_mean = [], [], [] 29 self.fig.tight_layout() 30 self.display_handle = display(self.fig, display_id=True) 31 self.fig.canvas.draw() 32 plt.close(self.fig) 33 34 35 def calc_batch(self, data): 36 self.x_data.extend(data[:, 0].to_list()) 37 self.y_data.extend(data[:, 1].to_list()) 38 self.y_data_mean.extend(data[:, 2].to_list()) 39 self.line1.set_data(self.x_data, self.y_data) 40 self.line2.set_data(self.x_data, self.y_data_mean) 41 if self.y_data: 42 x_min, x_max = min(self.x_data), max(self.x_data) 43 y_min, y_max = min(self.y_data), max(self.y_data) 44 self.ax.set_xlim(x_min - max(5, x_max*0.05), x_max + max(5, x_max*0.05)) 45 self.ax.set_ylim(y_min - max(0.05, y_max*0.05), y_max + max(0.05, y_max*0.05)) 46 self.fig.canvas.draw() 47 self.fig.canvas.flush_events() 48 self.display_handle.update(self.fig) 49 return data 50 51 df = qs.DataFrame().select( 52 col( 53 "index", 54 "y", 55 col("y").mean().rolling(10).alias("y_mean"), 56 ).udf_batch(PlotLineInJupyter()) 57 58 # 在另一个单元格里面运行 59 import numpy as np 60 n = 1000 61 data = pl.DataFrame({ 62 "index": range(n), 63 "y": np.random.randn(n), 64 }) 65 66 import time 67 for i in range(0, 200, 2): 68 df.calc_data(data[i:i+2]) 69 time.sleep(0.01) 70 ``` 71 """ 72 73 @abstractmethod 74 def calc_batch(self, data: pl.DataFrame) -> pl.DataFrame: 75 pass
批算子, python自定义批算子
例子
自定义一个批算子,计算出输入的移动平均并且画图,在jupyter里面,每次接受到新数据后,更新图片
# 在jupyter里面的一个单元格下面定义算子
import matplotlib.pyplot as plt
import matplotlib
class PlotLineInJupyter(qs.UdfBatch):
def __init__(self):
plt.ion()
self.fig, self.ax = plt.subplots(figsize=(10, 6))
self.line1, = self.ax.plot([], [], label="raw", linewidth=2)
self.line2, = self.ax.plot([], [], label="rolling_mean", linewidth=2)
self.ax.set_title('test udf batch', fontsize=14, fontweight='bold')
self.x_data, self.y_data, self.y_data_mean = [], [], []
self.fig.tight_layout()
self.display_handle = display(self.fig, display_id=True)
self.fig.canvas.draw()
plt.close(self.fig)
def calc_batch(self, data):
self.x_data.extend(data[:, 0].to_list())
self.y_data.extend(data[:, 1].to_list())
self.y_data_mean.extend(data[:, 2].to_list())
self.line1.set_data(self.x_data, self.y_data)
self.line2.set_data(self.x_data, self.y_data_mean)
if self.y_data:
x_min, x_max = min(self.x_data), max(self.x_data)
y_min, y_max = min(self.y_data), max(self.y_data)
self.ax.set_xlim(x_min - max(5, x_max*0.05), x_max + max(5, x_max*0.05))
self.ax.set_ylim(y_min - max(0.05, y_max*0.05), y_max + max(0.05, y_max*0.05))
self.fig.canvas.draw()
self.fig.canvas.flush_events()
self.display_handle.update(self.fig)
return data
df = qs.DataFrame().select(
col(
"index",
"y",
col("y").mean().rolling(10).alias("y_mean"),
).udf_batch(PlotLineInJupyter())
# 在另一个单元格里面运行
import numpy as np
n = 1000
data = pl.DataFrame({
"index": range(n),
"y": np.random.randn(n),
})
import time
for i in range(0, 200, 2):
df.calc_data(data[i:i+2])
time.sleep(0.01)
77class UdfRow(ABC): 78 """ 79 **行算子**, python自定义行算子,和其他行算子有相同行为 80 81 示例 82 ----- 83 1. 实现一个马丁格尔策略 84 ```python 85 class MartinGillStra(qs.UdfRow): 86 # 用python自定义行算子,实现一个马丁格尔策略 87 88 # 策略的输入 89 # ----- 90 # col(price, line_down_std2, line_down_std1, line_middle, line_up_std1, line_up_std2) 91 92 # price: k线价格 93 94 # line_down_std2: k线形成的最低线 95 96 # line_down_std1: k线形成的中下线 97 98 # line_middle: k线的中间线 99 100 # line_up_std1: k线形成的中上线 101 102 # line_up_std2: k线形成的最高线 103 104 # 策略逻辑 105 # ------ 106 107 # price 处于 [line_middle, line_up_std1], 目标仓位1, 108 # price 处于 [line_middle, line_up_std2], 目标仓位2, 109 # price 处于 [line_up_std2, inf], 目标仓位3, 110 # 反过来就是 -1, -2, -3 111 112 # 输出 113 # ------ 114 # col(target) 115 # target: 目标仓位 116 117 def __init__(self): 118 self.last_hold = 0.0 119 120 def output_schema(self, input_schema): 121 return [("target", pl.Float64)] 122 123 def update(self, price, down_std2, down_std1, middle, up_std1, up_std2): 124 # 如果能够保证输入没有null,可以去除这个检查,性能有提升 125 if price is None or down_std2 is None or down_std1 is None or middle is None or up_std1 is None or up_std2 is None: 126 return None 127 if price <= down_std2: 128 self.last_hold = -3.0 129 elif down_std2 < price <= down_std1: 130 self.last_hold = -2.0 131 elif down_std1 < price <= middle: 132 self.last_hold = -1.0 133 elif middle < price < up_std1: 134 self.last_hold = 1.0 135 elif up_std1 <= price < up_std2: 136 self.last_hold = 2.0 137 elif up_std2 <= price: 138 self.last_hold = 3.0 139 140 def calc(self): 141 return [self.last_hold] 142 143 df = qs.select( 144 col( 145 col("close").alias("price"), 146 col("close").mean().alias("middle"), 147 col("close").std().alias("std"), 148 ).rolling(20, 1).select(col( 149 "price", 150 (pl.col("middle") - 2.0 * pl.col("std")).alias("a"), 151 (pl.col("middle") - 1.0 * pl.col("std")).alias("b"), 152 "middle", 153 (pl.col("middle") + 1.0 * pl.col("std")).alias("c"), 154 (pl.col("middle") + 2.0 * pl.col("std")).alias("d"), 155 ).udf.row(MartinGillStra())).expanding() 156 ) 157 158 df.calc_data(data) 159 ``` 160 161 2. 自己实现一个均值计算 162 ```python 163 class MeanUdf(qs.UdfRow): 164 165 def __init__(self): 166 self.sum = 0.0 167 self.count = 0.0 168 169 def output_schema(self, input_schema): 170 return [("mean_res", pl.Float64)] 171 172 def update(self, value): 173 self.sum += value 174 self.count += 1.0 175 176 def calc(self): 177 return [self.sum / self.count] 178 179 def retract(self, value): 180 self.sum -= value 181 self.count -= 1.0 182 183 import numpy as np 184 n = 1000 185 data = pl.DataFrame({ 186 "value": np.random.randn(n), 187 "code": np.random.choice(["a", "b", "c"], size=n, replace=True), 188 "window": np.random.choice([10, 5, 2], size = n, replace=True), 189 "intra_day": np.random.choice([True, False], size = n, replace=True) 190 }) 191 192 qs.with_cols( 193 col("value").udf.row(MeanUdf()).expanding().alias("expanding"), 194 col("value").udf.row(MeanUdf()).rolling(10).alias("rolling"), 195 col("value").udf.row(MeanUdf()).rolling_dynamic("window").alias("rolling_dynamic"), 196 col("value").udf.row(MeanUdf()).rolling_intra_day("intra_day", 3).alias("rolling_intraday"), 197 col("value").udf.row(MeanUdf()).expanding().alias("expanding").over("code").add_suffix("over"), 198 col("value").udf.row(MeanUdf()).rolling(10).alias("rolling").over("code").add_suffix("over"), 199 col("value").udf.row(MeanUdf()).rolling_dynamic("window").alias("rolling_dynamic").add_suffix("over"), 200 col("value").udf.row(MeanUdf()).rolling_intra_day("intra_day", 3).alias("rolling_intraday").add_suffix("over"), 201 ).calc_data(data) 202 203 qs.select( 204 col("value").udf.row(MeanUdf()) 205 ).calc_data(data) 206 207 qs.select( 208 col("value").udf.row(MeanUdf()).group_by("code") 209 ).calc_data(data) 210 ``` 211 """ 212 213 @abstractmethod 214 def output_schema(self, input_schema: pl.Schema) -> List[Tuple[str, pl.DataType]]: 215 """ 216 算子返回的输出,列名 + 类型 217 """ 218 pass 219 220 @abstractmethod 221 def update(self, *args): 222 """ 223 :args: 算子的输入 224 比如输入是 col(a, b, c), args就是a,b,c三列每一行 225 """ 226 pass 227 228 @abstractmethod 229 def calc(self) -> List: 230 """ 231 必须返回一个List, 长度和self.output_schema一致 232 """ 233 pass 234 235 def support_retract(self) -> bool: 236 return True 237 238 def retract(self, *args): 239 """ 240 如果没有添加这个方法,那么这个行算子不能用于`rolling(*_dynamic, *_intra_day)`上下文: 241 """ 242 raise NotImplementedError("如果需要自定义的行算子支持各类rolling,得先为这个行算子添加`retract`方法") 243 244 def retract_front(self, *args): 245 self.retract(*args) 246 247 def retract_back(self, *args): 248 self.retract(*args)
行算子, python自定义行算子,和其他行算子有相同行为
示例
- 实现一个马丁格尔策略
class MartinGillStra(qs.UdfRow):
# 用python自定义行算子,实现一个马丁格尔策略
# 策略的输入
# -----
# col(price, line_down_std2, line_down_std1, line_middle, line_up_std1, line_up_std2)
# price: k线价格
# line_down_std2: k线形成的最低线
# line_down_std1: k线形成的中下线
# line_middle: k线的中间线
# line_up_std1: k线形成的中上线
# line_up_std2: k线形成的最高线
# 策略逻辑
# ------
# price 处于 [line_middle, line_up_std1], 目标仓位1,
# price 处于 [line_middle, line_up_std2], 目标仓位2,
# price 处于 [line_up_std2, inf], 目标仓位3,
# 反过来就是 -1, -2, -3
# 输出
# ------
# col(target)
# target: 目标仓位
def __init__(self):
self.last_hold = 0.0
def output_schema(self, input_schema):
return [("target", pl.Float64)]
def update(self, price, down_std2, down_std1, middle, up_std1, up_std2):
# 如果能够保证输入没有null,可以去除这个检查,性能有提升
if price is None or down_std2 is None or down_std1 is None or middle is None or up_std1 is None or up_std2 is None:
return None
if price <= down_std2:
self.last_hold = -3.0
elif down_std2 < price <= down_std1:
self.last_hold = -2.0
elif down_std1 < price <= middle:
self.last_hold = -1.0
elif middle < price < up_std1:
self.last_hold = 1.0
elif up_std1 <= price < up_std2:
self.last_hold = 2.0
elif up_std2 <= price:
self.last_hold = 3.0
def calc(self):
return [self.last_hold]
df = qs.select(
col(
col("close").alias("price"),
col("close").mean().alias("middle"),
col("close").std().alias("std"),
).rolling(20, 1).select(col(
"price",
(pl.col("middle") - 2.0 * pl.col("std")).alias("a"),
(pl.col("middle") - 1.0 * pl.col("std")).alias("b"),
"middle",
(pl.col("middle") + 1.0 * pl.col("std")).alias("c"),
(pl.col("middle") + 2.0 * pl.col("std")).alias("d"),
).udf.row(MartinGillStra())).expanding()
)
df.calc_data(data)
- 自己实现一个均值计算
class MeanUdf(qs.UdfRow):
def __init__(self):
self.sum = 0.0
self.count = 0.0
def output_schema(self, input_schema):
return [("mean_res", pl.Float64)]
def update(self, value):
self.sum += value
self.count += 1.0
def calc(self):
return [self.sum / self.count]
def retract(self, value):
self.sum -= value
self.count -= 1.0
import numpy as np
n = 1000
data = pl.DataFrame({
"value": np.random.randn(n),
"code": np.random.choice(["a", "b", "c"], size=n, replace=True),
"window": np.random.choice([10, 5, 2], size = n, replace=True),
"intra_day": np.random.choice([True, False], size = n, replace=True)
})
qs.with_cols(
col("value").udf.row(MeanUdf()).expanding().alias("expanding"),
col("value").udf.row(MeanUdf()).rolling(10).alias("rolling"),
col("value").udf.row(MeanUdf()).rolling_dynamic("window").alias("rolling_dynamic"),
col("value").udf.row(MeanUdf()).rolling_intra_day("intra_day", 3).alias("rolling_intraday"),
col("value").udf.row(MeanUdf()).expanding().alias("expanding").over("code").add_suffix("over"),
col("value").udf.row(MeanUdf()).rolling(10).alias("rolling").over("code").add_suffix("over"),
col("value").udf.row(MeanUdf()).rolling_dynamic("window").alias("rolling_dynamic").add_suffix("over"),
col("value").udf.row(MeanUdf()).rolling_intra_day("intra_day", 3).alias("rolling_intraday").add_suffix("over"),
).calc_data(data)
qs.select(
col("value").udf.row(MeanUdf())
).calc_data(data)
qs.select(
col("value").udf.row(MeanUdf()).group_by("code")
).calc_data(data)
213 @abstractmethod 214 def output_schema(self, input_schema: pl.Schema) -> List[Tuple[str, pl.DataType]]: 215 """ 216 算子返回的输出,列名 + 类型 217 """ 218 pass
算子返回的输出,列名 + 类型
220 @abstractmethod 221 def update(self, *args): 222 """ 223 :args: 算子的输入 224 比如输入是 col(a, b, c), args就是a,b,c三列每一行 225 """ 226 pass
:args: 算子的输入 比如输入是 col(a, b, c), args就是a,b,c三列每一行
228 @abstractmethod 229 def calc(self) -> List: 230 """ 231 必须返回一个List, 长度和self.output_schema一致 232 """ 233 pass
必须返回一个List, 长度和self.output_schema一致
455class TradePriceType: 456 """ 457 挂单价格类型 458 """ 459 queue = TradePrice.queue() 460 """ 461 排队成交,卖单挂ask1,买单挂bid1 462 """ 463 last_price = TradePrice.last_price() 464 """ 465 挂最新价 466 """ 467 opsite = TradePrice.opsite() 468 """ 469 挂对手价,卖单挂bid1,买单挂ask1 470 """
挂单价格类型
473class MatchPriceType: 474 """ 475 撮合成交类型 476 """ 477 simnow = MatchPrice.simnow() 478 """ 479 用simnow的方法撮合成交 480 """ 481 void = MatchPrice.void() 482 """ 483 按挂单价格直接成交 484 """
撮合成交类型