SiriBlog

siriyang的个人博客


  • 首页

  • 排行榜

  • 标签111

  • 分类33

  • 归档285

  • 关于

  • 搜索

个人数据中心:数据采集模块设计与实现

发表于 2022-11-30 更新于 2023-01-30 分类于 计算机 , 技术 , Python 阅读次数: Valine:
本文字数: 8.6k 阅读时长 ≈ 8 分钟

  本文承接个人数据中心解决方案,实现系统架构中的数据采集模块,详细讲解数据的采集以及调度方案。

数据采集项目结构

1
2
3
4
5
6
7
8
9
10
data_acquisition
├── LeanCloudAPI.py
├── LeanCloudDA_SiriBlog.py
├── NotionAPI.py
├── NotionDBDA_BookList.py
├── NotionDBDA_GamePurchaseList.py
├── NotionDBDA_VideoList.py
├── ...
├── crontab_run01.sh
└── crontab_run02.sh

  上面是我数据采集项目data_acquisition文件夹下的目录结构,包含了分别从Notion以及LeadCloud上采集数据的py脚本以及crontab定时调度的sh脚本。

  调度系统的设计方案之前在个人数据中心解决方案一文中已经初步阐述。

  我们需要做的就是在本地开发主机上创建该文件夹进行项目开发,然后参照下面这篇文章的方案将项目推送到远程服务器的指定文件夹进行部署:推送项目到远程仓库并触发执行sh脚本

数据采集脚本

  由于第三方数据服务中Notion数据库的使用占比较高,本文将以Notion数据表中的书单表采集脚本实现为例,其他数据源可按照类似的框架进行实现。

  要是用Notion API首先需要创建一个机器人:https://www.notion.so/my-integrations

  点击创建好的机器人可以看到他的一些详细信息,其中token是我们之后需要用到的。你也可以在该页面配置机器人的各种权限。

  然后将创建好的机器人配置到你想要采集的数据表上,这样它才有该表的读写权限。

  参照Notion API官方文档上的案例即可开始编程:https://developers.notion.com/reference/intro

  由于Notion API返回到都是json字符串,需要进行复杂的数据解析。为了简化从Notion中采集数据的代码,我将一些可复用的代码进行封装,统一放在NotionAPI.py里。如下所示,分别实现了全表的获取以及个类型数据解析的函数:

NotionAPI.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import requests
import json
import time
import operator
from datetime import datetime,timedelta
import pandas as pd

# 解析标题字段
def NotionPropertyParse_Title(json_data):
res=None
if len(json_data["title"])!=0:
res=json_data["title"][0]["plain_text"]
return res

# 解析文本字段
def NotionPropertyParse_Text(json_data):
res=None
if len(json_data["rich_text"])!=0:
res=""
for s in json_data["rich_text"]:
res+=s["plain_text"]
return res

# 解析数字字段
def NotionPropertyParse_Number(json_data):
res=json_data["number"]
return res

# 解析日期字段,获取开始与结束日期
def NotionPropertyParse_Date(json_data):
res=None
if json_data["date"]!=None:
res=[json_data["date"]["start"],json_data["date"]["end"]]
return res

# 解析日期字段,获取开始日期
def NotionPropertyParse_Date_Start(json_data):
res=None
if json_data["date"]!=None:
res=json_data["date"]["start"]
return res

# 解析日期字段,获取结束日期
def NotionPropertyParse_Date_End(json_data):
res=None
if json_data["date"]!=None:
res=json_data["date"]["end"]
return res

# 解析单选字段
def NotionPropertyParse_Select(json_data):
res=None
if json_data["select"]!=None:
res=json_data["select"]["name"]
return res

# 解析多选字段
def NotionPropertyParse_Multi_Select(json_data):
res=[]
if len(json_data["multi_select"])!=0:
for i in json_data["multi_select"]:
res.append(i["name"])
return res

# 解析获取全表数据
def NotionGetAllRecord(dbid,NotionAPI,token,Notion_Version,filter=None,sorts=None):

results=[]

body={}
if filter != None:
body['filter']=filter
if sorts != None:
body['sorts']=sorts

r=requests.request(
"POST",
NotionAPI+"databases/"+dbid+"/query",
headers={
"Authorization":"Bearer "+token,
"Notion-Version":Notion_Version
},
json=body
)

json_data=json.loads(r.text)

results+=json_data["results"]

while json_data["has_more"]:
start_cursor=json_data["next_cursor"]
body={}
if filter != None:
body['filter']=filter
if sorts != None:
body['sorts']=sorts
body['start_cursor']=start_cursor

r=requests.request(
"POST",
NotionAPI+"databases/"+dbid+"/query",
headers={
"Authorization":"Bearer "+token,
"Notion-Version":Notion_Version
},
json=body
)

json_data=json.loads(r.text)

results+=json_data["results"]

return results

  基于上面的工具函数,我们在接下来的开发过程中就能简化很多代码。

  由于Notion中的数据格式与MySQL的数据格式不同,这里需要进行数据格式的转换:

字段名 Notion数据类型 MySQL数据类型 备注
id 文本 String 页面id
book_name 标题 String 书名
auther_name 文本 String 作者名
auther_country 文本 String 作者国籍
book_type 多选 String 图书类型
book_price 数字 Float 图书价格
book_patform 多选 String 阅读平台
read_datetime_start 日期 DateTime 阅读开始时间
read_datetime_end 日期 DateTime 阅读结束时间
book_page_readed 数字 Float 已读页数
book_page_all 数字 Float 总页数
status 单选 String 阅读状态

  由于我们是采用全量同步的方式进行采集,所以我选择使用pandas的接口来进行向MySQL的整表数据覆写,每次调用会将之前的数据表删掉,然后基于新的数据创建一张新表。

NotionDBDA_BookList.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import requests
import json
import time
import operator
from datetime import datetime,timedelta
import pandas as pd
from sqlalchemy import create_engine,types
import pymysql

from NotionAPI import *

token="your_notion_robot_token" # Notion机器人token
NotionAPI="https://api.notion.com/v1/"
Notion_Version="2021-08-16"

book_list_dbid="your_notion_datatable_id" # 待采集的Notion数据表id

db_user="your_user_name" # 用于连接数据库的用户名
db_passwd="your_password" # 用于连接数据库的用户密码
db_dbname="your_database_name" # 数据库中待连接的库名
db_hostname="your_hostname" # 数据库主机地址

def GetBookListAllRecord():

# 从Notion获取书单所有记录
results=NotionGetAllRecord(book_list_dbid,NotionAPI,token,Notion_Version)

# 解析字段
object_id=[]
book_name=[]
auther_name=[]
auther_country=[]
book_type=[]
book_price=[]
book_patform=[]
read_datetime_start=[]
read_datetime_end=[]
book_page_readed=[]
book_page_all=[]
status=[]
for i in results:

object_id.append(i["id"])

book_name.append(NotionPropertyParse_Title(i["properties"]["书名"]))

auther_name.append(NotionPropertyParse_Text(i["properties"]["作者"]))

auther_country.append(NotionPropertyParse_Select(i["properties"]["国家"]))

book_type.append(",".join(NotionPropertyParse_Multi_Select(i["properties"]["类型"])))

book_price.append(NotionPropertyParse_Number(i["properties"]["价格"]))

book_patform.append(",".join(NotionPropertyParse_Multi_Select(i["properties"]["平台"])))

read_datetime_start.append(NotionPropertyParse_Date_Start(i["properties"]["阅读日期"]))

read_datetime_end.append(NotionPropertyParse_Date_End(i["properties"]["阅读日期"]))

book_page_readed.append(NotionPropertyParse_Number(i["properties"]["已阅读量"]))

book_page_all.append(NotionPropertyParse_Number(i["properties"]["总量"]))

status.append(NotionPropertyParse_Select(i["properties"]["状态"]))

# 创建pandas数据表
df=pd.DataFrame({"id":object_id,"book_name":book_name,"auther_name":auther_name,"auther_country":auther_country,"book_type":book_type,"book_price":book_price,"book_patform":book_patform,"read_datetime_start":read_datetime_start,"read_datetime_end":read_datetime_end,"book_page_readed":book_page_readed,"book_page_all":book_page_all,"status":status})

# 连接数据库
con_engine = create_engine('mysql+pymysql://'+db_user+':'+db_passwd+'@'+db_hostname+':3306/'+db_dbname+'?charset=utf8')

# 定义数据表字段类型约束
dtype={"id":types.String(length=255),
"book_name":types.String(length=255),
"auther_name":types.String(length=255),
"auther_country":types.String(length=255),
"book_type":types.String(length=255),
"book_price":types.Float(),
"book_patform":types.String(length=255),
"read_datetime_start":types.DateTime(),
"read_datetime_end":types.DateTime(),
"book_page_readed":types.Float(),
"book_page_all":types.Float(),
"status":types.String(length=255)
}

# 写入数据到MySQL
df.to_sql('dim_notiondb_book_list_info', con_engine, dtype=dtype, if_exists='replace', index = False)


if __name__ == "__main__":
bt=time.time()
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"),"开始采集dim_notiondb_book_list_info")

GetBookListAllRecord()

print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"),"dim_notiondb_book_list_info采集结束",time.time()-bt)

定时调度脚本

  首先我们在本地项目中编写好每日零点执行的sh脚本,在该脚本中制定一个或多个我们要执行的采集任务脚本:

crontab_run01.sh
1
2
3
4
5
6
# 0 0 * * *

DA01_HOME_PATH=/your_path/data_acquisition
DA01_PYTHON_PATH=/your_path/miniconda3/bin/python

$DA01_PYTHON_PATH "$DA01_HOME_PATH/NotionDBDA_BookList.py" >> "$DA01_HOME_PATH/crontab_out.log" 2>&1

  以后所有在零点执行的任务都可以写在上面这个sh脚本中。在一个脚本中执行多个任务的时候注意异常处理,否在中间的任务失败可能会导致后面的任务全部中断执行。
  在这里我还为环境变量加上一个编号使其与其他同样零点执行的脚本中的变量区分开,防止发生干扰,产生意料之外的结果。

  接下来在远程服务器端进行crontab命令配置。crontab是Linux系统通常会默认自带的一个工具,可以直接使用。

  打开crontab命令编辑界面:

1
crontab -e

  这其实就是一个vim编辑界面,使用vim的操作指令进行输入保存和推出。

  具体写入内容如下:

1
2
3
0 0 * * * sh /your_path/data_acquisition/crontab_daily_report.sh # 每日播报脚本
0 0 * * * sh /your_path/data_acquisition/crontab_run01.sh # 每日凌晨调度
58 * * * * sh /your_path/data_acquisition/crontab_run02.sh # 每小时58分调度

  上面我编写了三条定时任务命令,分别是在每天零点执行的播报任务和数据采集任务,以及每小时的58分执行一次的数据采集任务。
  该命令配置好以后,只要我们调度的时间不变,剩下的开发我们就只需在本地项目中进行文件修改和远程推送部署,不需要再连接远程服务器进行任务配置。

  至此,整个数据采集系统就完成了,该系统不仅仅能进行数据采集,还能执行数据播报以及数据备份等更多定时调度任务。

  相关代码已托管于GitHub,欢迎Star!

-------- 本文结束 感谢阅读 --------
相关文章
  • 个人数据中心:获取Switch游戏记录
  • 个人数据中心:获取Steam游戏记录
  • 个人数据中心:数据备份模块
  • 个人数据中心:阅读记录模块开发
  • 个人数据中心:微信机器人接入及后端开发
觉得文章写的不错的话,请我喝瓶怡宝吧!😀
SiriYang 微信支付

微信支付

SiriYang 支付宝

支付宝

  • 本文标题: 个人数据中心:数据采集模块设计与实现
  • 本文作者: SiriYang
  • 创建时间: 2022年11月30日 - 15时11分
  • 修改时间: 2023年01月30日 - 16时01分
  • 本文链接: https://blog.siriyang.cn/posts/20221130154306id.html
  • 版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明出处!
效率 知识管理 Python
个人数据中心:微信机器人接入及后端开发
Docker:基于base_centos安装SuperSet
SiriYang

SiriYang

大数据研究生
285 日志
32 分类
86 标签
RSS
GitHub E-Mail
Creative Commons
Links
  • 友情链接
  • 打赏记录
  • 作品商铺

蜀ICP备19008337号 © 2019 – 2023 SiriYang | 1.6m | 23:34
0%