数据是企业获取竞争优势的重要资产。随着技术的进步,数据的收集和存储变得更加容易。然而,数据量的激增却使得数据处理变得更加缓慢和复杂,尤其是在数据规模较大的情况下。
为提升数据处理能力,有多种工具可供选择,其中之一便是Dask。Dask 是一个强大的 Python 库,提供兼容 Pandas 的 API,能够通过并行和外存计算实现数据处理的扩展。它通过将工作流划分为更小的批次,并在多个核心或多台机器上并发执行,有效处理大规模数据集。
鉴于 Dask 的实用价值,学习如何构建一个任何数据专业人士都能使用的端到端数据管道是明智之举。因此,本文将带你学习如何用 Dask 搭建数据管道。
让我们开始吧。
准备工作
为了顺利完成本教程,我们需要做好以下准备。首先,需要建立一个用于存储数据的数据库。本例将使用 MySQL 作为数据库,你只需下载安装并按照标准步骤完成安装即可。
数据集方面,我们将使用 Kaggle 上公开的 Data Scientist Salary 数据集。请将该数据集保存在名为“data”的文件夹下,暂且不用操作。
接下来,使用如下命令创建一个虚拟环境以搭建开发环境:
python -m venv dask_pipeline
你可以自行命名虚拟环境,但我更倾向于选择有意义的名称。激活虚拟环境后,创建一个 requirements.txt 文件,用于列出项目所需的所有依赖库:
dask[complete]
pandas
numpy
sqlalchemy
PyMySQL
luigi
python-dotenv
setuptools
准备好后,使用如下命令安装所有依赖库:
pip install -r requirements.txt
随后,新建一个名为“.env”的文件,用于存储本项目所用的变量,主要是数据库的访问信息。文件内容如下:
DB_USER=your_username
DB_PASS=your_password
DB_HOST=localhost
DB_PORT=3306
DB_NAME=analytics
接着,创建一个 config.py 文件,用于数据库连接:
from dotenv import load_dotenv
import os
load_dotenv()
DB_USER = os.getenv("DB_USER")
DB_PASS = os.getenv("DB_PASS")
DB_HOST = os.getenv("DB_HOST")
DB_PORT = os.getenv("DB_PORT")
DB_NAME = os.getenv("DB_NAME")
CONN_STR = (
f"mysql+pymysql://{DB_USER}:{DB_PASS}@"
f"{DB_HOST}:{DB_PORT}/{DB_NAME}"
)
一切就绪后,我们便可以用 Dask 创建端到端数据处理管道。
数据管道搭建(Dask)
要搭建数据管道,我们将用到 Luigi 这个 Python 库,它通常用于构建批处理作业的复杂管道。在本例中,我们使用 Luigi 创建一个利用 Dask 将 CSV 数据导入数据库、用 Dask 进行转换并再次加载回数据库的管道。
首先,我们在名为 luigi_pipeline.py 的 Python 文件中编写用于创建数据库的相关代码。导入所有必要库,并创建一个任务以建立数据库:
import luigi
from luigi import LocalTarget, Parameter, IntParameter
from sqlalchemy import create_engine, text
import pandas as pd
from dask import delayed
import dask.dataframe as dd
from config import DB_USER, DB_PASS, DB_HOST, DB_PORT, DB_NAME, CONN_STR
class CreateDatabase(luigi.Task):
def output(self):
return LocalTarget("tmp/db_created.txt")
def run(self):
engine = create_engine(
f"mysql+pymysql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/"
)
with engine.connect() as conn:
conn.execute(text(f"CREATE DATABASE IF NOT EXISTS {DB_NAME}"))
self.output().makedirs()
with self.output().open("w") as f:
f.write("ok")
上述代码将在数据库不存在时自动创建一个新数据库。我们将在后续基于 Dask 的 CSV 数据导入管道中用到这个类。
接下来,设置 CSV 导入任务:
class IngestCSV(luigi.Task):
csv_path = Parameter()
table_name = Parameter(default="ds_salaries")
def requires(self):
return CreateDatabase()
def output(self):
return LocalTarget("tmp/ingest_done.txt")
def run(self):
url_no_db = f"mysql+pymysql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/"
engine0 = create_engine(url_no_db)
with engine0.connect() as conn:
conn.execute(text(f"CREATE DATABASE IF NOT EXISTS {DB_NAME}"))
ddf = dd.read_csv(self.csv_path, assume_missing=True)
engine = create_engine(CONN_STR)
empty = ddf.head(0)
empty.to_sql(self.table_name, con=engine, if_exists="replace", index=False)
def append_part(pdf):
pdf.to_sql(self.table_name, con=engine, if_exists="append", index=False)
ddf.map_partitions(append_part, meta=()).compute()
with self.output().open("w") as f:
f.write("ok")
在上述代码中,我们使用 Dask 读取 CSV 文件并将其写入数据库。Dask 的并行化处理极大提升了数据读取和写入效率。
接下来,我们将 CSV 数据导入步骤嵌入到 ETL 转换任务中:
class TransformETL(luigi.Task):
csv_path = Parameter()
table_name = Parameter(default="ds_salaries")
chunk_size = IntParameter(default=100_000)
def requires(self):
return IngestCSV(csv_path=self.csv_path,
table_name=self.table_name)
def output(self):
return LocalTarget("tmp/etl_done.txt")
def run(self):
engine = create_engine(CONN_STR)
# 1. 统计总行数用于分块
with engine.connect() as conn:
total = conn.execute(
text(f"SELECT COUNT(*) FROM {self.table_name}")
).scalar()
# 2. 构建分块延迟任务
@delayed
def load_chunk(offset, limit):
return pd.read_sql(
f"SELECT * FROM {self.table_name} LIMIT {limit} OFFSET {offset}",
engine
)
parts = [
load_chunk(i * self.chunk_size, self.chunk_size)
for i in range((total // self.chunk_size) + 1)
]
# 3. 加载零行元数据并转换数据类型
meta = (
pd.read_sql(f"SELECT * FROM {self.table_name} LIMIT 0", engine)
.astype({
"work_year": "int64",
"salary": "float64",
"salary_in_usd": "float64",
"remote_ratio": "int64",
# 其他字段默认 object 类型
})
)
# 4. 用修正后的元数据创建 Dask DataFrame
ddf = dd.from_delayed(parts, meta=meta)
# 5. 用 Dask 进行数据清洗和过滤
ddf = (
ddf
.dropna(subset=["salary_in_usd"])
.assign(
salary_in_usd=ddf["salary_in_usd"].astype(float)
)
)
# 6. 仅保留全职岗位
ddf = ddf[ddf["employment_type"] == "FT"]
# 7. 按 1 万美元为单位计算薪资区间
bracket = (ddf["salary_in_usd"] // 10_000).astype(int) * 10_000
ddf = ddf.assign(salary_bracket=bracket)
# 8. 聚合:按年份计算平均薪资
result = (
ddf.groupby("work_year")["salary_in_usd"]
.mean()
.compute()
.reset_index()
.rename(columns={"salary_in_usd": "avg_salary_usd"})
)
# 9. 存储结果
result.to_sql("avg_salary_by_year",
con=engine, if_exists="replace", index=False)
with self.output().open("w") as f:
f.write("ok")
上述代码主要利用 Dask 完成了以下任务:
- 分块从数据库读取数据集
- 设置元数据并创建 Dask DataFrame
- 用 Dask 进行数据过滤和清洗
- 用 Dask 进行数据转换
- 将结果写回数据库
至此,数据管道已经搭建完毕。可以用如下命令执行:
python luigi_pipeline.py TransformETL --csv-path data\ds_salaries.csv
运行后会得到如下输出信息:
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 complete ones were encountered:
- 1 TransformETL(csv_path=data\ds_salaries.csv, table_name=ds_salaries, chunk_size=100000)
Did not run any tasks
This progress looks :) because there were no failed tasks or missing dependencies
你还可以通过启动 Luigi UI 查看管道运行情况:
luigid
在仪表盘中可以看到如下输出效果。
构建端到端的数据处理管道(Dask)
如果一切顺利,你可以看到管道运行成功,并能在数据库中查询结果:
SELECT * FROM analytics.avg_salary_by_year;
结果如下所示:
构建端到端数据处理管道(Dask)
现在,你已经成功构建了一个基于 Dask 的端到端数据处理管道。所有代码已保存在以下 GitHub 仓库中。
结论
构建数据处理管道是数据专业人士必须掌握的重要技能,尤其是利用 Dask 这类提升数据处理和操作效率的工具。本文带你学习了如何从数据摄取到结果落库,完整搭建端到端的数据处理管道。
希望本教程对你有所帮助!