تحليل بيانات مالية عالية الأداء باستخدام مكتبة Polars: تقييم كسول، تعبيرات متقدمة، ودمج SQL
في هذا البرنامج التعليمي، نتعمق في بناء خط أنابيب متقدم لتحليل البيانات باستخدام مكتبة Polars، وهي مكتبة DataFrame سريعة للغاية مصممة لأداء مثالي وقابلية للتطوير. هدفنا هو إظهار كيفية استخدام تقييم Polars الكسول، والتعبيرات المعقدة، ودوال النوافذ، وواجهة SQL لمعالجة مجموعات البيانات المالية واسعة النطاق بكفاءة. نبدأ بإنشاء مجموعة بيانات زمنية مالية اصطناعية، ثم نتقدم خطوة بخطوة عبر خط أنابيب شامل، من هندسة الميزات والإحصائيات المتداول إلى التحليل متعدد الأبعاد والتصنيف. على مدار هذا البرنامج التعليمي، سنوضح كيف تمكننا Polars من كتابة تحويلات بيانات واضحة وعالية الأداء، مع الحفاظ على استخدام منخفض للذاكرة وضمان التنفيذ السريع.
1. إعداد البيئة وتوليد البيانات الاصطناعية
نبدأ باستيراد المكتبات الأساسية، بما في ذلك Polars لعمليات DataFrame عالية الأداء و NumPy لإنشاء بيانات اصطناعية. لضمان التوافق، نضيف خطوة تثبيت احتياطية لـ Polars في حالة عدم تثبيتها بالفعل. بمجرد تجهيز الإعداد، نعلن عن بداية خط أنابيب التحليلات المتقدمة لدينا.
import polars as pl
import numpy as np
from datetime import datetime, timedelta
import io
try:
import polars as pl
except ImportError:
import subprocess
subprocess.run(["pip", "install", "polars"], check=True)
import polars as pl
print("خط أنابيب تحليلات Polars المتقدمة")
print("=" * 50)
np.random.seed(42)
n_records = 100000
dates = [datetime(2020, 1, 1) + timedelta(days=i//100) for i in range(n_records)]
tickers = np.random.choice(['AAPL', 'GOOGL', 'MSFT', 'TSLA', 'AMZN'], n_records)
data = {
'timestamp': dates,
'ticker': tickers,
'price': np.random.lognormal(4, 0.3, n_records),
'volume': np.random.exponential(1000000, n_records).astype(int),
'bid_ask_spread': np.random.exponential(0.01, n_records),
'market_cap': np.random.lognormal(25, 1, n_records),
'sector': np.random.choice(['Tech', 'Finance', 'Healthcare', 'Energy'], n_records)
}
print(f" تم توليد {n_records:,} سجلًا ماليًا اصطناعيًا")
نقوم بتوليد مجموعة بيانات مالية اصطناعية غنية تحتوي على 100,000 سجل باستخدام NumPy، محاكاة بيانات الأسهم اليومية للرموز الرئيسية مثل AAPL و TSLA. يتضمن كل إدخال ميزات سوقية رئيسية مثل السعر، والحجم، و هامش العرض والطلب، والقيمة السوقية، والقطاع. يوفر هذا أساسًا واقعيًا لعرض تحليلات Polars المتقدمة على مجموعة بيانات زمنية.
2. معالجة البيانات باستخدام Polars
lf = pl.LazyFrame(data)
result = (
lf
.with_columns([
pl.col('timestamp').dt.year().alias('year'),
pl.col('timestamp').dt.month().alias('month'),
pl.col('timestamp').dt.weekday().alias('weekday'),
pl.col('timestamp').dt.quarter().alias('quarter')
])
.with_columns([
pl.col('price').rolling_mean(20).over('ticker').alias('sma_20'),
pl.col('price').rolling_std(20).over('ticker').alias('volatility_20'),
pl.col('price').ewm_mean(span=12).over('ticker').alias('ema_12'),
pl.col('price').diff().alias('price_diff'),
(pl.col('volume') * pl.col('price')).alias('dollar_volume')
])
.with_columns([
pl.col('price_diff').clip(0, None).rolling_mean(14).over('ticker').alias('rsi_up'),
pl.col('price_diff').abs().rolling_mean(14).over('ticker').alias('rsi_down'),
(pl.col('price') - pl.col('sma_20')).alias('bb_position')
])
.with_columns([
(100 - (100 / (1 + pl.col('rsi_up') / pl.col('rsi_down')))).alias('rsi')
])
.filter(
(pl.col('price') > 10) & (pl.col('volume') > 100000) & (pl.col('sma_20').is_not_null())
)
.group_by(['ticker', 'year', 'quarter'])
.agg([
pl.col('price').mean().alias('avg_price'),
pl.col('price').std().alias('price_volatility'),
pl.col('price').min().alias('min_price'),
pl.col('price').max().alias('max_price'),
pl.col('price').quantile(0.5).alias('median_price'),
pl.col('volume').sum().alias('total_volume'),
pl.col('dollar_volume').sum().alias('total_dollar_volume'),
pl.col('rsi').filter(pl.col('rsi').is_not_null()).mean().alias('avg_rsi'),
pl.col('volatility_20').mean().alias('avg_volatility'),
pl.col('bb_position').std().alias('bollinger_deviation'),
pl.len().alias('trading_days'),
pl.col('sector').n_unique().alias('sectors_count'),
(pl.col('price') > pl.col('sma_20')).mean().alias('above_sma_ratio'),
((pl.col('price').max() - pl.col('price').min()) / pl.col('price').min()).alias('price_range_pct')
])
.with_columns([
pl.col('total_dollar_volume').rank(method='ordinal', descending=True).alias('volume_rank'),
pl.col('price_volatility').rank(method='ordinal', descending=True).alias('volatility_rank')
])
.filter(pl.col('trading_days') >= 10)
.sort(['ticker', 'year', 'quarter'])
)
نقوم بتحميل مجموعة البيانات الاصطناعية لدينا في Polars LazyFrame
لتمكين التنفيذ المؤجل، مما يسمح لنا بتسلسل التحويلات المعقدة بكفاءة. من هناك، نقوم بإثراء البيانات بخصائص تعتمد على الوقت، وتطبيق مؤشرات تقنية متقدمة، مثل المتوسطات المتحركة، و RSI، و Bollinger bands، باستخدام دوال النوافذ والمتداول. ثم نقوم بإجراء عمليات تجميع مجمعة حسب الرمز، والسنة، والربع لاستخراج الإحصائيات والمؤشرات المالية الرئيسية. أخيرًا، نقوم بترتيب النتائج بناءً على الحجم والتقلب، وفلترة الأجزاء التي لم يتم تداولها بشكل كافٍ، وفرز البيانات للاستكشاف البديهي، مع الاستفادة الكاملة من محرك التقييم الكسول القوي لـ Polars.
3. جمع النتائج وتحليلها
df = result.collect()
print(f"n نتائج التحليل: {df.height:,} سجلًا مجمعًا")
print("nأعلى 10 أرباع من حيث الحجم:")
print(df.sort('total_dollar_volume', descending=True).head(10).to_pandas())
print("n تحليلات متقدمة:")
pivot_analysis = (
df.group_by('ticker')
.agg([
pl.col('avg_price').mean().alias('overall_avg_price'),
pl.col('price_volatility').mean().alias('overall_volatility'),
pl.col('total_dollar_volume').sum().alias('lifetime_volume'),
pl.col('above_sma_ratio').mean().alias('momentum_score'),
pl.col('price_range_pct').mean().alias('avg_range_pct')
])
.with_columns([
(pl.col('overall_avg_price') / pl.col('overall_volatility')).alias('risk_adj_score'),
(pl.col('momentum_score') * 0.4 + pl.col('avg_range_pct') * 0.3 + (pl.col('lifetime_volume') / pl.col('lifetime_volume').max()) * 0.3).alias('composite_score')
])
.sort('composite_score', descending=True)
)
print("n تصنيف أداء الرموز:")
print(pivot_analysis.to_pandas())
بمجرد اكتمال خط الأنابيب الكسول، نقوم بجمع النتائج في DataFrame و مراجعة أفضل 10 أرباع بناءً على إجمالي حجم الدولار على الفور. يساعدنا هذا في تحديد فترات النشاط التجاري المكثف. ثم نتخذ خطوة أخرى في تحليلنا من خلال تجميع البيانات حسب الرمز لحساب رؤى على مستوى أعلى، مثل حجم التداول مدى الحياة، ومتوسط تقلب السعر، ودرجة مركبة مخصصة. يساعدنا هذا الملخص متعدد الأبعاد في مقارنة الأسهم ليس فقط من حيث الحجم الخام، ولكن أيضًا من حيث الزخم والأداء المعدل حسب المخاطر، مما يفتح رؤى أعمق في سلوك الرمز بشكل عام.
4. واجهة SQL و خيارات التصدير
print("n عرض واجهة SQL:")
pl.Config.set_tbl_rows(5)
sql_result = pl.sql("""
SELECT ticker, AVG(avg_price) as mean_price, STDDEV(price_volatility) as volatility_consistency, SUM(total_dollar_volume) as total_volume, COUNT(*) as quarters_tracked
FROM df
WHERE year >= 2021
GROUP BY ticker
ORDER BY total_volume DESC
""", eager=True)
print(sql_result)
print(f"n مقاييس الأداء:")
print(f" • تم تطبيق تحسينات التقييم الكسول")
print(f" • تم معالجة {n_records:,} سجل بكفاءة")
print(f" • عمليات عمودية فعالة من حيث الذاكرة")
print(f" • عمليات بدون نسخ حيثما أمكن")
print(f"n خيارات التصدير:")
print(" • Parquet (ضغط عالي): df.write_parquet('data.parquet')")
print(" • Delta Lake: df.write_delta('delta_table')")
print(" • JSON streaming: df.write_ndjson('data.jsonl')")
print(" • Apache Arrow: df.to_arrow()")
print("n تم إكمال خط أنابيب Polars المتقدم بنجاح!")
print(" تم عرض: التقييم الكسول، والتعبيرات المعقدة، ودوال النوافذ،")
print(" واجهة SQL، والعمليات التجميعية المتقدمة، وتحليلات عالية الأداء")
نختتم خط الأنابيب من خلال عرض واجهة SQL الأنيقة لـ Polars، وتشغيل استعلام تجميعي لتحليل أداء الرمز بعد عام 2021 باستخدام بناء جملة SQL المألوف. تتيح لنا هذه القدرة الهجينة دمج تحويلات Polars الواضحة مع استعلامات SQL التصريحية بسلاسة. لتسليط الضوء على كفاءتها، نقوم بطباعة مقاييس الأداء الرئيسية، مع التركيز على التقييم الكسول، وكفاءة الذاكرة، والتنفيذ بدون نسخ. أخيرًا، نوضح مدى سهولة تصدير النتائج بتنسيقات متنوعة، مثل Parquet و Arrow و JSONL، مما يجعل هذا الأنبوب قويًا وجاهزًا للإنتاج. بهذا، نُكمل سير عمل تحليلات عالية الأداء شاملًا باستخدام Polars
اترك تعليقاً