SiriBlog

siriyang的个人博客


  • 首页

  • 排行榜

  • 标签111

  • 分类33

  • 归档285

  • 关于

  • 搜索

个人数据中心:微信机器人接入及后端开发

发表于 2022-11-30 更新于 2023-01-30 分类于 计算机 , 技术 , Python 阅读次数: Valine:
本文字数: 10k 阅读时长 ≈ 9 分钟

  本文承接个人数据中心解决方案,实现系统架构中的数据应用模块,详细讲解企业微信机器人的申请配置以及后端系统框架的开发。

企业微信机器人

  首先需要在企业微信官网注册一个个人的企业微信账号,并将该企业微信账号于自己的微信绑定,跟着引导流程填写相应的信息即可完成:https://work.weixin.qq.com/

  注册完成之后即可通过网页进入企业微信的管理控制台,企业微信的大部分配置只能通过该控制台实现,客户端无法完成。

机器人创建及配置

  进入控制台的应用管理页面,下翻找到“创建应用”选项进行机器人创建。
  这里需要注意的一点是企业微信支持创建两种机器人:

  • 群内机器人:功能相对单一,且仅能挂载到群聊里进行使用,没有单独的会话窗口;
  • 自建应用:功能完善且丰富,既拥有独立的会话窗口可以进行私聊,也可以挂载到群聊里使用。

  这里的话我们当然选择功能更为强大的自建应用进行使用。

  点击创建根据引导完成应用基础信息的填写,之后这些信息也都可以在应用详情界面修改。
  如下图所示进入应用主页可以看到应用的ID和密钥,以及各种配置选项和功能模块。我们主要使用的是它的接收消息功能,应用可以将我们发送给它的消息转发到我们自己的服务器上进行处理。

  进入API接收消息配置界面,可以看到有三个输入框,其中第一个是我们自己搭建的后端服务程序路由,剩下两个是我们在后续服务程序开发过程中在验证该应用消息时需要使用的token和密钥。

微信接入企业微信

  在应用创建好以后,为了后续使用过程中的便利,企业微信提供了微信插件,支持在微信中直接访问企业微信会话。
  在控制台界面进入“我的企业>微信插件”即可找到。

  在该页面可以配置插件的一些基础信息,使用你要绑定的微信扫描下方的邀请码即可在微信中安装插件。

  企业微信接入以后会以一个独立会话的形式存在,并且可以支持置顶,这样我们每次打开微信就能快速找到输入入口。

后端系统开发

  如之前个人数据中心解决方案所说,我将后端系统设计为一下几个模块,但是在实际开发实现中,由于每个第三方服务的消息接收和应答都是不同的,所以我把接收应答封装在一起,消息分发和消息处理也各封装一个类。

项目文件结构

  先来看看项目文件夹下的文件结构:

1
2
3
4
5
6
7
8
9
10
11
12
.
├── MsgAct_Help.py # 帮助命令处理模块
...
├── MsgAct_ReadRecord.py # 阅读记录命令处理模块
├── MsgProcessor2.py # 消息分发处理类
├── NotionAPI.py # Notion API封装函数
├── QiYeWeiXinAPI.py # 自定义企业微信webhook接口
├── WXBizMsgCrypt3.py # 企业微信官方的消息加、解密工具函数
├── ierror.py # 企业微信官方的错误代码变量定义
├── app.py # flask程序启动入口
├── run.sh # flask程序启动脚本
└── utils.py # 自定义工具函数

  项目中的WXBizMsgCrypt3.py和ierror.py是微信官方提供的,直接下载使用即可。

  • WXBizMsgCrypt3.py
  • ierror.py

  另外我也自己封装了一些工具函数,对于各个消息处理模块我也都解藕成单独的类文件,这里以帮助命令处理模块为例为大家介绍整个后端系统的处理流程,其他应用模块会单独撰文介绍。

消息收发模块

  由于我并不擅长后端开发,所以选择使用python的flask框架,可以说是非常简单的一个后端框架了,只用简单的几行代码就可以实现一个后端接口。
  消息收发模块是整个消息处理链路的开始和结束部分,由于各个平台接口的消息收发方式都不同,所以我将其统一封装在app.py文件中,这也是整个后端系统的启动入口。

接口验证
  在收发消息之前,要将你的后端接口绑定到微信应用,需要进行一个验证流程,代码中的VerifyURL()正是完成这个操作。

消息解密
  对于微信应用来说,当你在话框发送一条消息以后,微信应用后台将该消息转发送给你的后端系统。为了数据安全,微信应用发送的消息都是加密过的,所以我们需要使用解密函数进行解密。

消息回复
  在得到解密的明文以后我们就可以做相应的处理了,处理完的结果通过函数返回值直接响应。微信应用设置了单条消息响应时间不超过2秒,超时将会重传消息,多次重传得不到响应则会停止重传,判定消息发送失败。
  在我的消息处理模块中经常会遇到访问Notion数据库或同时处理多条命令导致处理时间超过2秒的情况。所以我用一个主线程来接收消息,每次收到消息以后就立即回复一个空字符串,应用接收到我们回复的空字符串后前端将不做反应。
  与此同时单独开辟一条线程来处理消息,待消息处理完成以后再通过微信应用的webhook来异步回复消息,解决了消息回复的时间限制。采用webhook可以在我们任何想回复的时候进行回复,比如进行定时的数据播报,或者在触发某种条件的时候进行提醒。

配置后端接口路由
  对于之前在应用配置过程中提到的后端URL链接,在这里就是:http://你的主机地址/qywx_recall/siri_bird/
  可以根据你的具体情况进行修改。

app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
from flask import Flask,request,abort
from datetime import datetime
import threading
import time
import sys
import xml.etree.cElementTree as ET
from WXBizMsgCrypt3 import WXBizMsgCrypt
from MsgProcessor2 import MsgProcessor2
from QiYeWeiXinAPI import QYWXAPPSendText

app = Flask(__name__)
msgProcessor = MsgProcessor2()

sToken = "your_sToken" # 你的应用消息收发tocken
sEncodingAESKey = "your_sEncodingAESKey" #你的应用消息收发密钥
sCorpID = "your_sCorpID" #你的企业id

MAX_LENGTH=600

QYWXAPPSendText("服务启动成功!")

# 多线程并发处理消息
class MsgProcessThread (threading.Thread):
def __init__(self,threadID,name,content):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.content=content

def run(self):
print ("开始线程:" + self.name)
replyMsg=msgProcessor.processMsg(self.content)

print(replyMsg)

print("replyMsg length:",len(replyMsg))

if(len(replyMsg)>0):
msg=""
for m in replyMsg.split("\n"):
if len(msg)+len(m)>MAX_LENGTH:
QYWXAPPSendText(msg)
msg=m+"\n"
else:
msg+=m+"\n"
if(msg!=""):
QYWXAPPSendText(msg)

print ("退出线程:" + self.name)


# @app.route("/test/", methods=['GET'])
# def test():

# content="日记"
# threadID=datetime.now().strftime("%Y%m%d%H%M%S")
# threadName="Thread-"+threadID
# thread=MsgProcessThread(threadID,threadName,content)
# thread.start()

# return threadName

@app.route("/qywx_recall/siri_bird/", methods=['GET'])
def VerifyURL():

msg_signature = request.args.get("msg_signature")
timestamp = request.args.get("timestamp")
nonce = request.args.get("nonce")
echostr = request.args.get("echostr")

print("msg_signature",msg_signature)
print("timestamp",timestamp)
print("nonce",nonce)
print("echostr",echostr)
print()

wxcpt=WXBizMsgCrypt(sToken,sEncodingAESKey,sCorpID)

ret,sEchoStr=wxcpt.VerifyURL(msg_signature, timestamp,nonce,echostr)
if(ret!=0):
print("ERR: VerifyURL ret: " + str(ret))
abort(503)
else:
print("done VerifyURL")
#验证URL成功,将sEchoStr返回给企业号

print(sEchoStr)

print("==============================")

return sEchoStr

@app.route("/qywx_recall/siri_bird/", methods=['POST'])
def GetMsg():

msg_signature = request.args.get("msg_signature")
timestamp = request.args.get("timestamp")
nonce = request.args.get("nonce")
req_data= request.get_data()

print("msg_signature",msg_signature)
print("timestamp",timestamp)
print("nonce",nonce)
print("req_data",req_data)
print()

wxcpt=WXBizMsgCrypt(sToken,sEncodingAESKey,sCorpID)

ret,sMsg=wxcpt.DecryptMsg( req_data, msg_signature, timestamp, nonce)
print(ret,sMsg)
if( ret!=0 ):
print("ERR: DecryptMsg ret: " + str(ret))
abort(503)
# 解密成功,sMsg即为xml格式的明文
xml_tree = ET.fromstring(sMsg)
content = xml_tree.find("Content").text

print("===========解密成功===================")

print(content)

print("===========处理===================")

# 异步处理消息
threadID=datetime.now().strftime("%Y%m%d%H%M%S")
threadName="Thread-"+threadID
thread=MsgProcessThread(threadID,threadName,content)
thread.start()

replyMsg=""

print("===========响应===================")
sRespData = "<xml><MsgType>text</MsgType><Content>"+replyMsg+"</Content><AgentID>1000002</AgentID></xml>"
ret,sEncryptMsg=wxcpt.EncryptMsg(sRespData, nonce, timestamp)
if( ret!=0 ):
print("ERR: EncryptMsg ret: " + str(ret))
abort(503)

return sEncryptMsg

  微信应用webhook消息有多种回复形式:纯文本、卡片、markdown等等,这里我对经常使用的纯文本形式进行了函数封装,之后只需要调用函数,传入要发送的信息即可。

QiYeWeiXinAPI.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import requests
import json

"""
发送Text消息到企业微信应用
"""
def QYWXAPPSendText(content):

# 获取tocken
tocken_url="https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid"+"你的企业id"+"&corpsecret="+"你的应用密钥"

response = requests.request("GET", tocken_url)

js=json.loads(response.text)

tocken=""
if(js["errmsg"]=="ok"):
tocken=js["access_token"]
else:
print("企业微信应用tocken获取失败!")
return

# 发送消息

url = "https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token="+tocken

payload = json.dumps({
"touser" : "@all",
"msgtype" : "text",
"agentid" : 1000002, # 你的应用id
"text":{
"content":content
}
})
headers = {
'Content-Type': 'application/json'
}

response = requests.request("POST", url, headers=headers, data=payload)

  app.py与QiYeWeiXinAPI.py中的企业id可以在企业微信控制台的“我的企业>企业信息”页面最下方获取。

  QiYeWeiXinAPI.py中的应用密钥在应用主页的应用id下点解“查看”获取,注意不要与应用“接收消息”的解密密钥搞混了。

消息处理模块

  在实现了微信消息的接收与回复以后,接下来的问题就是如何处理消息。
  首先需要实现的是一个消息分发模块,这个模块要负责各个实际处理消息的应用模块的注册,以及将收到的消息轮询的发送给他们进行处理。
  在我的设计中,一条消息可以包含多条不同的命令由不同的模块进行处理,以便在未来的应用中可以组成一个工作流。分发模块接收到消息以后,按行进行切分,一行就是一条命令。同时还会对命令进行一定的格式处理,如去除首尾的空格,去除空行等。
  我没有将消息类型的判断放在分发模块中实现,而是将其放到了具体的消息处理模块中。这样是为了将各模块与分发模块解藕,各消息处理模块在自己内部进行是否处理这条消息的判断,使其业务逻辑更为内聚。
  因此消息分发模块会将切分后的命令发送给所有模块进行一次处理,一条消息的多个命令可以被多个模块处理并得到多条结果,最后再由分发模块把这些处理结果收集起来返回给回复模块进行回复。

MsgProcessor2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
from urllib.parse import unquote

from MsgAct_Help import *
from MsgAct_ReadRecord import *

class MsgProcessor2():
def __init__(self):

# 交互动作注册列表
self.actionList=[
{"name":"帮助","action":HelpAction(self)},
{"name":"阅读记录","action":ReadRecordAction()}
]

def processMsg(self,content):
reMsg=[]

cmdList=content.split("\n") # 按行切割消息
for i in cmdList:
i=i.strip() # 去除收尾空格

# 跳过空行
if(len(i)==0):
continue

res=self.doAction(i) # 调用消息处理函数

if(res!=""):
reMsg.append(res) # 收集消息处理结果

if(len(reMsg)==0):
return ""
return "\n\n".join(reMsg).strip() # 将结果拼接并返回


def doAction(self,content):
contents=[ unquote(i.strip()) for i in content.split(" ") if len(i.strip())>0] # 将命令转化为参数列表格式

for act in self.actionList:
res=act["action"].doAction(contents) # 调用各模块的消息处理函数
if(res!=""):
return res

return ""

# 用于测试
if __name__ == "__main__":
msgProcessor=MsgProcessor2()

msg="""
帮助
"""

print(msgProcessor.processMsg(msg))

  下面这个是帮助模块,用于提示当前的系统都注册了哪些模块,并提示用户如何查询帮助文档。之后的应用模块实现也都需要按照该模块的格式提供一个doAction函数供消息分发函数统一调用。
  在各模块中也需要提供一个使用文档,以便他人或者自己忘记该模块使用方法时进行查询。

MsgAct_Help.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class HelpAction():

def __init__(self,father):
self.father=father
pass

def doAction(self,args):
res_info=""
if len(args)==1 and (args[0]=="帮助" or args[0]=="help"):
res_info="帮助文档\n\n目前已集成的模块:\n"
for act in self.father.actionList:
res_info+="* "+act["name"]+"\n"
res_info+="\n输入“帮助 模块名”查看详细信息"
return res_info

return ""

系统部署

  在系统代码都开发好以后可以参考下面这篇文章实现项目的自动化部署:推送项目到远程仓库并触发执行sh脚本

  我将后端系统的启动命令写到run.sh脚本中,之后每次项目文件推送部署好以后调用执行该文件即可。该脚本首先会关闭已经启动的系统进程,如果没有启动则不做操作,然后进程重新启动。

run.sh
1
2
3
ps -ef | grep "flask run --host 0.0.0.0 --port 8001" | grep -v grep | awk '{print $2}' | xargs kill -9 # 关闭已在运行的进程

flask run --host 0.0.0.0 --port 8001 >> nohup.out 2>&1 & disown # 使进程从git hook中脱离

注意根据自己的实际情况设置端口号。
这里偷了个懒,对于正式部署的系统是不推荐直接使用flask命令进行启动的,应该将其部署到web容器服务当中。


参考资料:

  • 企业微信官方开发文档
  • 企业微信官方加解密库

  相关代码已托管于GitHub,欢迎Star!

-------- 本文结束 感谢阅读 --------
相关文章
  • 个人数据中心:获取Switch游戏记录
  • 个人数据中心:获取Steam游戏记录
  • 个人数据中心:数据备份模块
  • 个人数据中心:阅读记录模块开发
  • 个人数据中心:数据采集模块设计与实现
觉得文章写的不错的话,请我喝瓶怡宝吧!😀
SiriYang 微信支付

微信支付

SiriYang 支付宝

支付宝

  • 本文标题: 个人数据中心:微信机器人接入及后端开发
  • 本文作者: SiriYang
  • 创建时间: 2022年11月30日 - 21时11分
  • 修改时间: 2023年01月30日 - 16时01分
  • 本文链接: https://blog.siriyang.cn/posts/20221130211917id.html
  • 版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明出处!
效率 知识管理 Python
个人数据中心:阅读记录模块开发
个人数据中心:数据采集模块设计与实现
  • 文章目录
  • 站点概览
SiriYang

SiriYang

大数据研究生
285 日志
32 分类
86 标签
RSS
GitHub E-Mail
Creative Commons
Links
  • 友情链接
  • 打赏记录
  • 作品商铺

  1. 企业微信机器人
    1. 机器人创建及配置
    2. 微信接入企业微信
  2. 后端系统开发
    1. 项目文件结构
    2. 消息收发模块
    3. 消息处理模块
  3. 系统部署
蜀ICP备19008337号 © 2019 – 2023 SiriYang | 1.6m | 23:34
0%