لنا ثلاثة أسابيع لإرسال دفق بيانات واحد. اليوم، يمكن للمحلل الذي ليس لديه خبرة في بايثون أن يفعل ذلك في يوم واحد. وإليك كيف وصلنا إلى هناك.
اسمي كيريل كازلو وأنا مهندس بيانات في Mindbox. يقوم فريقنا بانتظام بحساب مقاييس الأعمال للعملاء، مما يعني أننا نعمل باستمرار على إنشاء مجموعات بيانات للفوترة والتحليلات، مستمدين من عشرات المصادر المختلفة.
لقد اعتمدنا على PySpark لفترة طويلة لمعالجة جميع بياناتنا. مشكلة؟ لا يمكنك حقًا العمل مع PySpark بدون خبرة Python. يتطلب كل خط أنابيب جديد مبرمجًا. وهذا يعني الانتظار – أحيانًا لأسابيع.
في هذا المنشور، سأخبرك كيف قمنا ببناء نظام أساسي للبيانات الداخلية حيث يمكن للمحلل أو مدير المنتج تشغيل مسار يتم تحديثه بانتظام عن طريق حفظ أربعة ملفات YAML فقط.
لماذا يبطئنا PySpark؟
اسمحوا لي أن أوضح هذا الألم بمثال كتابي – حساب MAU (المستخدمون النشطون شهريًا).
للوهلة الأولى، يبدو هذا بمثابة مهمة SQL بسيطة: COUNT(DISTINCT customerId) على عدة طاولات في نافذة زمنية. ومع ذلك، نظرًا لجميع تكاليف البنية التحتية – PySpark، وإعداد Airflow DAG، وتخصيص موارد Spark، والاختبار – كان علينا تسليمها إلى المطورين. نتيجة؟ أسبوع كامل لشحن عداد MAU.
استغرق تسليم كل مؤشر جديد ما بين أسبوع إلى ثلاثة أسابيع. وكانت العملية هي نفسها في كل مرة:
- قام المحلل بتحديد متطلبات العمل، وعثر على مطور متاح وقدم السياق.
- قام المطور بشرح التفاصيل، وكتب كود PySpark، وقام بمراجعة الكود، وقام بتكوين DAG ونشره.
ما أردناه حقًا هو أن يتمكن المحللون ومديرو المنتجات – الأشخاص الذين يفهمون منطق الأعمال بشكل أفضل ويجيدون لغة SQL وYAML – من التعامل مع الأمر بأنفسهم. لا بايثون. لا باي سبارك.
ما استبدلناه بـ PySpark: كل ما تحتاجه هو YAML وSQL
لتبني نهج تصريحي، قمنا بتقسيم طبقة البيانات لدينا إلى ثلاثة أجزاء واخترنا أداة مناسبة لكل منها:
- دي إل تي (محمل البيانات) – يسحب البيانات من واجهات برمجة التطبيقات الخارجية وقواعد البيانات إلى تخزين الكائنات. تم تكوينه بالكامل عبر ملف YAML. لا يوجد رمز مطلوب.
- dbt (أداة إنشاء البيانات) على Trino – تحويل البيانات باستخدام SQL خالص. يربط النماذج من خلال
ref()يقوم تلقائيًا بإنشاء رسم بياني للتبعية ويدعم التحديثات المتزايدة. - تدفق الهواء + الفضاء – تنسيق خطوط الأنابيب. يتم إنشاء Airflow DAG تلقائيًا من
dag.yamlومشروع دي بي تي.
كنا نستخدم Trino بالفعل كمحرك استعلام للاستعلامات المخصصة وقمنا بتوصيله بـ Superset for BI. لقد أثبتت بالفعل قيمتها: بالنسبة للاستعلامات ذات المنطق القياسي، قامت بمعالجة مجموعات البيانات الضخمة بشكل أسرع وبموارد أقل من Spark. علاوة على ذلك، يدعم Trino أصلاً الوصول الموحد إلى مخازن بيانات متعددة من استعلام SQL واحد. بالنسبة لـ 90% من خطوط الأنابيب لدينا، كانت Trino مناسبة تمامًا.
كيف نقوم بتحميل البيانات: dlt.yaml
يصف ملف YAML الأول مكان وكيفية تحميل البيانات لمزيد من المعالجة. فيما يلي مثال واقعي – تحميل بيانات الفوترة من واجهة برمجة التطبيقات الداخلية:
product: sg-team
feature: billing
schema: billing_tarification
dag:
dag_id: dlt_billing_tarification
schedule: "0 4 * * *"
description: "Daily refresh of tarification data"
tags:
- billing
alerts:
enabled: true
severity: warning
source:
type: rest_api
client:
base_url: "https://internal-api.example.com"
auth:
type: bearer
token: dlt-billing.token
resources:
- name: tarification_data
endpoint:
path: /tarificationData
method: POST
json:
firstPeriod: "{{ previous_month_date }}"
lastPeriod: "{{ previous_month_date }}"
pricingPlanLine: CurrentPlan
write_disposition: replace
processing_steps:
- map: dlt_custom.billing_tarification_data.map
- name: charges_raw
columns:
staffUserName:
data_type: text
nullable: true
endpoint:
path: /data-feed/charges
method: POST
json:
firstPeriod: "{{ previous_month_date }}"
lastPeriod: "{{ previous_month_date }}"
write_disposition: replace
- name: discounts_raw
endpoint:
path: /data-feed/discounts
method: POST
json:
firstPeriod: "{{ previous_month_date }}"
lastPeriod: "{{ previous_month_date }}"
write_disposition: replace
يحدد هذا التكوين أربعة موارد من واجهة برمجة تطبيقات واحدة. لكل واحد منهم، نحدد نقطة النهاية ونطلب المعلمات ونكتب الإستراتيجية – في حالتنا replace يعني “الكتابة في كل مرة”. يمكنك أيضًا إضافة خطوات المعالجة وتحديد أنواع الأعمدة وتكوين التنبيهات.
الإعداد كله هناك 40 سطرًا من YAML. بدون dlt، سيكون كل موصل عبارة عن طلبات معالجة نصية python، وترقيم الصفحات، وإعادة المحاولة، والتسلسل إلى تنسيق جدول Delta، والنقل إلى الذاكرة.
كيف نقوم بتحويل البيانات باستخدام SQL: dbt_project.yaml وsource.yaml
الخطوة التالية هي تكوين نموذج dbt. في حالة Trino، هذا يعني استعلامات SQL.
فيما يلي مثال لإعداد حساب MAU. هذا ما يبدو عليه التحضير للحدث من مصدر واحد:
-- int_mau_events_visits.sql (simplified)
{{ config(materialized='table') }}
WITH period AS (
-- Rolling window: last 5 months to current
SELECT
YEAR(CURRENT_DATE - INTERVAL '5' MONTH) AS start_year,
MONTH(CURRENT_DATE - INTERVAL '5' MONTH) AS start_month,
YEAR(CURRENT_DATE) AS end_year,
MONTH(CURRENT_DATE) AS end_month
),
events AS (
-- Pull visit events within the period window
SELECT src._tenant, src.unmergedCustomerId,
'visits' AS src_type, src.endpoint
FROM {{ source('final', 'customerstracking_visits') }} src
CROSS JOIN period p
WHERE src.unmergedCustomerId IS NOT NULL
AND /* ...timestamp filtering by year/month bounds... */
),
events_with_customer AS (
-- Resolve merged customer IDs
SELECT e._tenant,
COALESCE(mc.mergedCustomerId, e.unmergedCustomerId) AS customerId,
e.src_type, e.endpoint
FROM events e
LEFT JOIN {{ ref('int_merged_customers') }} mc
ON e._tenant = mc._tenant
AND e.unmergedCustomerId = mc.unmergedCustomerId
)
-- Keep only actual (non-deleted) customers
SELECT ewc._tenant, ewc.customerId, ewc.src_type, ewc.endpoint
FROM events_with_customer ewc
WHERE EXISTS (
SELECT 1 FROM {{ ref('int_actual_customers') }} ac
WHERE ewc._tenant = ac._tenant
AND ewc.customerId = ac.customerId
)
تتبع جميع مصادر الأحداث العشرة نفس النمط تمامًا. الاختلافات الوحيدة هي الجدول المصدر وعوامل التصفية. ثم يتم دمج النماذج في تيار واحد:
-- int_mau_events.sql (union of all sources)
SELECT * FROM {{ ref('int_mau_events_inapps_targetings') }}
UNION ALL
SELECT * FROM {{ ref('int_mau_events_inapps_clicks') }}
UNION ALL
SELECT * FROM {{ ref('int_mau_events_visits') }}
UNION ALL
SELECT * FROM {{ ref('int_mau_events_orders') }}
-- ...plus 6 more sources
وأخيرًا، قاعدة البيانات حيث يتم تجميع كل شيء:
-- mau_period_datamart.sql
{{ config(
materialized='incremental',
incremental_strategy='merge',
unique_key=('_tenant', 'start_year', 'start_month', 'end_year', 'end_month')
) }}
{%- set months_back = var('months_back', 5) | int -%}
WITH period AS (
SELECT
YEAR(CURRENT_DATE - INTERVAL '{{ months_back }}' MONTH) AS start_year,
MONTH(CURRENT_DATE - INTERVAL '{{ months_back }}' MONTH) AS start_month,
YEAR(CURRENT_DATE) AS end_year,
MONTH(CURRENT_DATE) AS end_month
),
events_resolved AS (
SELECT * FROM {{ ref('int_mau_events') }}
),
metrics_by_tenant AS (
SELECT
er._tenant,
COUNT(DISTINCT CASE WHEN src_type="visits"
THEN customerId END) AS CustomersTracking_Visits,
COUNT(DISTINCT CASE WHEN src_type="orders"
THEN customerId END) AS ProcessingOrders_Orders,
COUNT(DISTINCT CASE WHEN src_type="mailings"
THEN customerId END) AS Mailings_MessageStatuses,
-- ...other metrics
COUNT(DISTINCT customerId) AS MAU
FROM events_resolved er
GROUP BY er._tenant
)
SELECT m.*, p.start_year, p.start_month, p.end_year, p.end_month
FROM metrics_by_tenant m
CROSS JOIN period p
نستخدم لتكوين مستودع البيانات incremental_strategy='merge'. يقوم dbt تلقائيًا بإنشاء استعلام دمج عن طريق الاستبدال unique_key لزيادة. ليست هناك حاجة لتنفيذ التحميل المتزايد يدويًا.
لدمج النماذج في مشروع واحد، قمنا بتكوينها dbt_project.yaml: :
name: mau_period
version: '1.0.0'
models:
mau_period:
+on_table_exists: replace
+on_schema_change: append_new_columns
و sources.yamlالذي يصف جداول الإدخال:
sources:
- name: final
database: data_platform
schema: final
tables:
- name: inapps_targetings_v2
- name: inapps_clicks_v2
- name: customerstracking_visits
- name: processingorders_orders
- name: cdp_mergedcustomers_v2
# ...
والنتيجة هي نفس منطق الأعمال الذي استخدمناه في PySpark، ولكن بلغة SQL خالصة: sources.yaml يستبدل مخططات typedspark، {{ ref() }} و {{ source() }} يستبدل .get_table()ويحل أمر التنفيذ التلقائي عبر الرسم البياني للتبعية محل الضبط اليدوي لموارد Spark.
كيف نقوم بتكوين تدفق الهواء: dag.yaml
يحدد ملف التكوين الرابع متى وكيف يقوم Airflow بتشغيل خط الأنابيب:
product: sg-team
feature: billing
schema: mau
schedule: "15 21 * * *" # every day at 00:15 MSK
params:
- name: start_date
description: "Start date (YYYY-MM-DD). Leave empty for auto"
default: ""
- name: end_date
description: "End date (YYYY-MM-DD). Leave empty for auto"
default: ""
- name: months_back
description: "Months to look back (default: 5)"
default: 5
alerts:
enabled: true
severity: warning
ثم نقوم بتحليل نص بايثون الخاص بنا dag.yaml و dbt_project.yaml ويستخدم مكتبة Cosmos لإنشاء Airflow DAG كامل الوظائف. هذا هو مجرد قطعة من كود بايثون في التكوين بأكمله. تمت كتابته مرة واحدة ويعمل مع أي مشروع dbt. وهنا الجزء الرئيسي:
def _build_dbt_project_dags(project_path: Path, environ: dict) -> list(DbtDag):
config_dict = yaml.safe_load(dag_config_path.read_text())
config = DagConfig.model_validate(config_dict)
# YAML params → Airflow Params
params = {}
operator_vars = {}
for param in config.params:
params(param.name) = Param(
default=param.default if param.default is not None else "",
description=param.description,
)
operator_vars(param.name) = f"{{{{ params.{param.name} }}}}"
# Cosmos creates the DAG from the dbt project
with DbtDag(
dag_id=f"dbt_{project_path.name}",
schedule=config.schedule,
params=params,
project_config=ProjectConfig(dbt_project_path=project_path),
profile_config=ProfileConfig(
profile_name="default",
target_name=project_name,
profile_mapping=TrinoLDAPProfileMapping(
conn_id="trino_default",
profile_args={
"database": profile_database,
"schema": profile_schema,
},
),
),
operator_args={"vars": operator_vars},
) as dag:
# Create schema before running models
create_schema = SQLExecuteQueryOperator(
task_id="create_schema",
conn_id="trino_default",
sql=f"CREATE SCHEMA IF NOT EXISTS {profile_database}.{profile_schema} ...",
)
# Attach to root tasks
for unique_id, _ in dag.dbt_graph.filtered_nodes.items():
task = dag.tasks_map(unique_id)
if not task.upstream_task_ids:
create_schema >> task
الكون يقرأ manifest.json من مشروع dbt، يقوم بتحليل الرسم البياني لتبعية النموذج وإنشاء وظيفة Airflow منفصلة لكل نموذج. يتم إنشاء تبعيات المهام تلقائيًا على الأساس ref() المكالمات في SQL.
كيف يقوم المحللون ببناء خطوط الأنابيب بدون مطورين
الآن، عندما يحتاج المحلل إلى مسار جديد وقابل للتكرار، يمكنه تجميعه معًا في بضع خطوات:
الخطوة 1 قم بإنشاء مجلد في المستودع: dbt-projects/my_new_pipeline/.
الخطوة 2 إذا كانت هناك حاجة إلى استيعاب بيانات خارجية، فاكتب تكوين YAML لـ dlt.
الخطوة 3 حفظ نماذج SQL بالتنسيق models/ المجلد ووصف المصادر الموجودة فيه sources.yaml.
الخطوة 4 يخلق dbt_project.yaml و dag.yaml.
الخطوة 5 اضغط على Git، قم بالمراجعة، ثم قم بالاتصال.
يقوم CI/CD ببناء مشروع dbt ويرسل العناصر إلى S3. يقرأ Airflow ملفات DAG من هناك، ويقوم Cosmos بتحليل مشروع dbt وإنشاء رسم بياني للمهام. كما هو مقرر، يقوم dbt بتشغيل النماذج على منصة Trino بالترتيب الصحيح. والنتيجة النهائية هي قاعدة بيانات محدثة في المستودع، متاحة عبر Superset.
ماذا تغير بعد الهجرة
لكي يتمكن المحللون من بناء خطوط الأنابيب بأنفسهم، عليهم أن يفهموا ذلك ref() و source() المفاهيم، الفرق بين table و incremental التجسيد وأساسيات Git. لقد أجرينا العديد من ورش العمل الداخلية وقمنا بإعداد أدلة خطوة بخطوة لكل نوع من المهام.
لماذا لا يحل المكدس الجديد محل PySpark بالكامل
بالنسبة لحوالي 10% من خطوط الأنابيب لدينا، لا يزال PySpark هو الخيار الوحيد – عندما لا يتناسب التحويل مع SQL. يدعم dbt وحدات ماكرو Jinja، لكن هذا لن يحل محل لغة Python الكاملة. وسيكون من الظلم تجاهل القيود المفروضة على الأدوات الجديدة.
dlt + Delta: دعم تجريبي لإدراج عفوًا. نحن نستخدم تنسيق دلتا لطبقة التخزين لدينا. تم وضع علامة على موصل Delta dlt على أنه تجريبي، لذا لم تنجح استراتيجية الربط خارج الصندوق. وكان علينا أن نجد حلولاً بديلة، وفي بعض الحالات استخدمناها replace بدلاً من merge (التضحية بالتزايد)، وفي غيرها كتبنا العرف processing_steps.
ترينو محدود التسامح مع الخطأ. لدى Trino آلية تحمل الخطأ، ولكنها تعمل عن طريق حفظ النتائج المتوسطة في S3. ومع أحجام البيانات التي تصل إلى تيرابايت، يعد هذا الأمر غير عملي – فالحجم الهائل لعمليات S3 يجعلها باهظة التكلفة. بدون تمكين التسامح مع الخطأ، إذا تعطلت عملية Trino المنفذة، فسيفشل الاستعلام بأكمله. ومن ناحية أخرى، يقوم Spark بإعادة تشغيل المهمة الفاشلة فقط. لقد قمنا بحل هذه المشكلة عن طريق إعادة المحاولة على مستوى DAG وتقسيم النماذج الثقيلة إلى سلاسل من النماذج المتوسطة.
UDF والمنطق المخصص. في Spark، يمكنك كتابة منطق Python مخصص مباشرةً في المسار – وهذا أمر مريح للغاية. وبفضل البنية الجديدة، أصبح هذا الأمر أكثر صعوبة. لا يساعد dbt على Trino: يقوم Jinja بإنشاء SQL فقط، بينما تعمل نماذج Python dbt فقط مع Snowflake وDatabricks وBigQuery. يمكنك كتابة UDFs في Trino، ولكن فقط في Java – مع كل النفقات العامة التي تستلزم: مستودع منفصل، وبناء خط أنابيب، ونشر ملفات JAR بين جميع العاملين. لذا، إذا لم يتطابق التحويل مع SQL، فسينتهي بك الأمر إما بوحش SQL غير قابل للصيانة أو ببرنامج نصي مستقل يكسر الأصل.
ما هو التالي: الاختبار، وقوالب النماذج، والتدريب
اختبار أفضل. لقد أجرينا بعض الاختبارات القوية لخطوط الأنابيب في PySpark، لكن البنية الجديدة ما زالت تلحق بالركب. قدمت أحدث إصدارات dbt اختبار الوحدة – يمكنك الآن التحقق من منطق نموذج SQL الخاص بك مقابل البيانات المزيفة دون الحاجة إلى تشغيل خط الأنابيب بالكامل. نريد إضافة اختبارات dbt على مستوى النموذج وكطبقة مراقبة منفصلة.
قوالب قابلة لإعادة الاستخدام للأنماط الشعبية. العديد من نماذج dbt لدينا تبدو متشابهة. يمكن لتكوين واحد أن يصف عدة نماذج بنفس النمط – فهي تختلف فقط في الجدول المصدر والمرشحات. نحن نخطط لاستخراج المنطق المشترك إلى وحدات ماكرو dbt.
توسيع قاعدة مستخدمي المنصة. نريد أن يتمكن المزيد من المهندسين والمحللين من العمل مع البيانات بشكل مستقل. نحن نخطط لجلسات تدريب داخلية منتظمة ووثائق وأدلة للبدء حتى يتمكن المستخدمون الجدد من الوصول بسرعة إلى السرعة والبدء في إنشاء نماذجهم الخاصة.
إذا كان فريقك عالقًا في نفس حلقة “المحللون في انتظار المطورين”، فأنا أرغب في معرفة كيفية تعاملك معها. تواصل معي على LinkedIn ودعنا نقارن الملاحظات.
جميع الصور في هذه المقالة هي من قبل المؤلف ما لم يذكر خلاف ذلك.












