最近团长在工作中接到了后台回归测试的任务,火急火燎地写了几个主要流程的接口自动化回归脚本。刚开始想着就几个接口,那就直接调用吧,写出来是这样的:

class ops_ris():
    """Regression driver for the investigation-case flow.

    The methods query the 'ris' database for the case/task to work on, post a
    request to the interface at ``envhost[env]`` and return a report dict with
    the keys "测试用例名称", "测试结果", "原因", "url", "res" and "param".

    ``review()`` reads ``self.res``, ``self.case_no`` and ``self.url``, which are
    only set by ``transfercase()`` (called directly or through
    ``investigation()``), so one of those must run first.
    """

    # risk_desc code -> label used in the reported case name; any other code
    # is reported as "其他".
    _RISK_LABELS = {"AGENCY": "中介", "THEFT": "盗用", "FAKE": "伪冒"}

    def __init__(self, env, phase_no='IG_TELCHECK', risk_desc='AGENCY'):
        """Store the target environment, workflow phase and risk type."""
        self.env = env
        self.phase_no = phase_no
        self.risk_desc = risk_desc

    def _case_title(self, suffix):
        """Return the report name for the configured risk type plus *suffix*."""
        label = self._RISK_LABELS.get(self.risk_desc, "其他")
        return f"{label}类型{suffix}"

    def _query_one(self, sql):
        """Run *sql* against the 'ris' database and return its first row.

        Raises IndexError when the query returns no rows.
        """
        return DBmananger(self.env, 'ris').callMysql(sql)[0]

    def _post(self, method, bizContent):
        """Post one request using the current ``self.sid`` and return the response."""
        self.header = headers(self.sid)
        self.url = envhost[self.env]
        return apiRequests.callInterface(
            url=self.url,
            param=GetCommonContent(method, bizContent),
            parammode='json',
            method='post',
            headers=self.header,
        )

    def _report(self, name, res, method, bizContent):
        """Build the report dict for a response; flag "S" counts as pass."""
        return {
            "测试用例名称": name,
            "测试结果": "pass" if res['flag'] == "S" else "fail",
            "原因": res["msg"],
            "url": self.url,
            "res": res,
            "param": GetCommonContent(method, bizContent),
        }

    def investigation(self):
        """Transfer a case, then close it as handled, and report the result."""
        # Transfer first; this also selects self.case_no.
        self.transfercase()
        # Latest task of the selected case in the configured phase.
        sql = f"select task_no from wf_task where biz_no='{self.case_no}' and phase_no='{self.phase_no}' order by id desc limit 1"
        task_no = self._query_one(sql)["task_no"]
        # Log in to obtain the session id.
        self.sid = get_sid(self.env)
        # Close the case as handled.
        method, bizContent = invest_first(self.case_no, task_no)
        res = self._post(method, bizContent)
        if res['flag'] == "S":
            logger.debug(f"案件{self.case_no}已经办处理成功")
        else:
            logger.error(f"案件{self.case_no}经办处理失败,失败原因:{res['msg']}")
        return self._report(self._case_title("调查经办处理"), res, method, bizContent)

    def transfercase(self):
        """Pick the latest pending case of the configured risk type and transfer it.

        Stores the response in ``self.res`` and the case number in
        ``self.case_no`` for the later steps.
        """
        sql = f"select case_no from ig_case where risk_desc='{self.risk_desc}' and state in ('TCED','TCW')  ORDER BY id desc limit 1"
        self.case_no = self._query_one(sql)["case_no"]
        sql = f"select task_no,biz_no from wf_task where phase_no='{self.phase_no}' and biz_no='{self.case_no}' ORDER BY id desc limit 1"
        # One query is enough: both columns come from the same row.
        task = self._query_one(sql)
        task_no = task["task_no"]
        self.case_no = task["biz_no"]
        method, bizContent = transfer(self.case_no, task_no)
        # Log in to obtain the session id.
        self.sid = get_sid(self.env)
        self.res = self._post(method, bizContent)
        if self.res['flag'] == "S":
            logger.debug(f"案件{self.case_no}已经转派成功")
        else:
            # The response lives in self.res; a bare `res` is undefined here.
            logger.error(f"案件{self.case_no}经办转派失败,失败原因:{self.res['msg']}")
        return self._report("调查案子转派", self.res, method, bizContent)

    def review(self):
        """Review the case selected by ``transfercase()`` and report the result.

        Returns a "skip" report when ``self.res['flag']`` is "F", and a "pass"
        report without sending anything when the case state is already "PS".
        """
        test_Name = self._case_title("调查复核处理")
        # NOTE(review): self.res is the transfer response, while the skip reason
        # below talks about the handling step — confirm which one was intended.
        if self.res['flag'] == "F":
            return {
                "测试用例名称": test_Name,
                "测试结果": "skip",
                "原因": "经办失败跳过复核",
                "url": self.url,
                "res": '',
                "param": '',
            }

        # A case that is already closed passes directly; otherwise run the review.
        sql = f"select state from ig_case where case_no='{self.case_no}' "
        state = self._query_one(sql)["state"]
        if state == "PS":
            logger.debug(f"案件{self.case_no}已经结案")
            # Nothing was sent, so there is no response or param to report.
            return {
                "测试用例名称": test_Name,
                "测试结果": "pass",
                "原因": "案件已结案,无需复核",
                "url": self.url,
                "res": '',
                "param": '',
            }

        sql = f"select task_no,state from wf_task where phase_no='IG_LASTINSTANCE' and biz_no='{self.case_no}' ORDER BY id desc limit 1"
        task_no = self._query_one(sql)["task_no"]
        sql = f"select remark from m_opinion where case_no='{self.case_no}' and phase_no='IG_TELCHECK'"
        remark = self._query_one(sql)["remark"]

        # Log in, then transfer the review task; the transfer response is not checked.
        method, bizContent = transfer(self.case_no, task_no)
        self.sid = get_sid(self.env)
        self._post(method, bizContent)

        # Close the case through review.
        method, bizContent = inreview(self.case_no, task_no, remark)
        res = self._post(method, bizContent)
        if res['flag'] != "S":
            # Retry once; the original author attributes first-try failures to caching.
            res = self._post(method, bizContent)
        if res['flag'] == "S":
            logger.debug(f"案件{self.case_no}结案成功")
        else:
            logger.debug(f"案件{self.case_no}结案失败,原因:{res['msg']}")
        return self._report(test_Name, res, method, bizContent)
class tel():
    """Regression driver for the phone-check flow: transfer, phone check, final review.

    All lookups go to the 'apv_wf' database. Each public method returns a
    report dict with the keys "测试用例名称", "测试结果", "原因", "url", "res"
    and "param".
    """

    def __init__(self, env, investigate="Y", investigateType1="N", investigateType2="N", investigateType3="Y", investigateType4="N", biz_no=''):
        """Store the environment, the investigation flags and an optional case number.

        investigateType1..4 stand for agency / theft / fake / other; the first
        flag set to "Y" decides the reported case name.
        """
        self.env = env
        self.investigate = investigate
        self.investigateType1 = investigateType1
        self.investigateType2 = investigateType2
        self.investigateType3 = investigateType3
        self.investigateType4 = investigateType4
        self.biz_no = biz_no

    def _case_title(self, prefix):
        """Return "<prefix>转<type>类型调查" for the first investigation flag set to "Y"."""
        if self.investigateType1 == "Y":
            label = "中介"
        elif self.investigateType2 == "Y":
            label = "盗用"
        elif self.investigateType3 == "Y":
            label = "伪冒"
        else:
            label = "其他"
        return f"{prefix}转{label}类型调查"

    def _load_task(self, sql):
        """Run *sql* once and store user_no, task_no and biz_no from its first row.

        Raises IndexError when the query returns no rows.
        """
        row = DBmananger(self.env, 'apv_wf').callMysql(sql)[0]
        self.user_no = row["user_no"]
        self.task_no = row["task_no"]
        self.biz_no = row["biz_no"]

    def _post(self, method, bizContent):
        """Post one request using the current ``self.sid`` and return the response."""
        self.header = headers(self.sid)
        self.url = envhost[self.env]
        return apiRequests.callInterface(
            url=self.url,
            param=GetCommonContent(method, bizContent),
            parammode='json',
            method='post',
            headers=self.header,
        )

    def _report(self, name, res, method, bizContent):
        """Build the report dict for a response; flag "S" counts as pass."""
        return {
            "测试用例名称": name,
            "测试结果": "pass" if res['flag'] == "S" else "fail",
            "原因": res["msg"],
            "url": self.url,
            "res": res,
            "param": GetCommonContent(method, bizContent),
        }

    def _transfer_loaded_task(self):
        """Transfer the task loaded by ``_load_task`` to the other test account.

        A task owned by 'genggeng' is transferred to 'xiejinggeng' while logged
        in as 'genggeng'; any other owner is handled with the default login and
        transferred to 'genggeng'. Returns (response, method, bizContent).
        """
        if self.user_no == "genggeng":
            self.sid = get_sid(self.env, login_name='genggeng')
            target = 'xiejinggeng'
        else:
            self.sid = get_sid(self.env)
            target = 'genggeng'
        method, bizContent = transfer_tel(self.biz_no, self.task_no, target)
        res = self._post(method, bizContent)
        if res['flag'] == "S":
            logger.debug(f"案件{self.biz_no}转派成功")
        else:
            logger.debug(f"案件{self.biz_no}转派失败,原因:{res['msg']}")
        return res, method, bizContent

    def transfercase(self):
        """Transfer a phone-check task and report the result.

        Without a biz_no the latest dispatched task with switch flag
        'SW_AP_360_JIE' is used; otherwise the latest AP_TELCHECK task of
        that case.
        """
        if self.biz_no == '':
            sql = "select user_no,task_no,biz_no from wf_task where  phase_no='AP_TELCHECK' and switch_flag='SW_AP_360_JIE' and state='DISPATHED'  ORDER BY id DESC  limit 1"
        else:
            sql = f"select user_no,task_no,biz_no from wf_task where biz_no ='{self.biz_no}' and phase_no='AP_TELCHECK' ORDER BY id DESC limit 1"
        self._load_task(sql)
        res, method, bizContent = self._transfer_loaded_task()
        return self._report("电核案件转派", res, method, bizContent)

    def tel_check(self):
        """Run the phone check as the current owner of the task and report the result."""
        # Find out who owns the task. When a case number is known, stay on that
        # case instead of whatever AP_TELCHECK task happens to be the newest.
        if self.biz_no:
            sql = f"select user_no,task_no,biz_no from wf_task where biz_no ='{self.biz_no}' and phase_no='AP_TELCHECK' ORDER BY id DESC limit 1"
        else:
            sql = f"select user_no,task_no,biz_no from wf_task where   phase_no='AP_TELCHECK' ORDER BY id DESC limit 1"
        self._load_task(sql)
        # Log in as the task owner.
        self.sid = get_sid(self.env, login_name=self.user_no)
        method, bizContent = tel_check(self.biz_no, self.task_no, self.investigateType1, self.investigateType2, self.investigateType3, self.investigateType4, self.investigate)
        res = self._post(method, bizContent)
        if res['flag'] == "S":
            logger.debug(f"案件{self.biz_no}电核成功")
        else:
            logger.debug(f"案件{self.biz_no}电核失败,原因:{res['msg']}")
        return self._report(self._case_title("电核"), res, method, bizContent)

    def final_jud(self):
        """Transfer the final-review task of ``self.biz_no``, then submit the final review.

        The lookup filters on ``self.biz_no``, so a case number must already be
        set (by the constructor or an earlier step).
        """
        sql = f"select user_no,task_no,biz_no from wf_task where biz_no ='{self.biz_no}' and phase_no='AP_LASTINSTANCE' ORDER BY id DESC limit 1"
        # Transfer first; the transfer response is only logged.
        self._load_task(sql)
        self._transfer_loaded_task()
        # Re-read the task to see who owns it after the transfer.
        self._load_task(sql)
        # Log in as the current owner and submit the final review.
        self.sid = get_sid(self.env, login_name=self.user_no)
        method, bizContent = fina_check(self.biz_no, self.task_no)
        res = self._post(method, bizContent)
        if res['flag'] == "S":
            logger.debug(f"案件{self.biz_no}终审成功")
        else:
            logger.debug(f"案件{self.biz_no}终审失败,原因:{res['msg']}")
        return self._report(self._case_title("终审"), res, method, bizContent)
class Inquisition():
    """Regression driver for the quality-check flow: transfer, check, dispute, finalize.

    ``Inquisitiontransfer()`` selects the case and stores it in
    ``self.appl_no``; the other methods read that attribute, so it must run
    first. Each method returns a report dict with the keys "测试用例名称",
    "测试结果", "原因", "url", "res" and "param".
    """

    def __init__(self, env):
        """Store the target environment."""
        self.env = env

    @staticmethod
    def _qc_case_sql(column, appl_no):
        """Return the SELECT for one qc_case column of a case.

        The case number is quoted, like the other string filters in this file,
        so the query stays valid when it is not purely numeric.
        """
        return f"select {column} from qc_case where appl_no ='{appl_no}'"

    def _query_one(self, sql):
        """Run *sql* against the 'ris' database and return its first row.

        Raises IndexError when the query returns no rows.
        """
        return DBmananger(self.env, 'ris').callMysql(sql)[0]

    def _post(self, method, bizContent):
        """Post one request using the current ``self.sid`` and return the response."""
        self.header = headers(self.sid)
        self.url = envhost[self.env]
        return apiRequests.callInterface(
            url=self.url,
            param=GetCommonContent(method, bizContent),
            parammode='json',
            method='post',
            headers=self.header,
        )

    def _report(self, name, res, method, bizContent, passed=None):
        """Build the report dict; *passed* overrides the default flag == "S" rule."""
        if passed is None:
            passed = res['flag'] == "S"
        return {
            "测试用例名称": name,
            "测试结果": "pass" if passed else "fail",
            "原因": res["msg"],
            "url": self.url,
            "res": res,
            "param": GetCommonContent(method, bizContent),
        }

    def Inquisitiontransfer(self):
        """Pick the latest case waiting for quality check and transfer it to xiejinggeng."""
        sql = "select appl_no from qc_case where source_channel='APV' and state='W_QC' and case_type='A24' order by id desc limit 1"
        self.appl_no = self._query_one(sql)["appl_no"]
        # Log in to obtain the session id, then transfer.
        self.sid = get_sid(self.env)
        method, bizContent = transfer_opinion(self.appl_no, "xiejinggeng")
        res = self._post(method, bizContent)
        if res['flag'] == "S":
            logger.debug(f"案件{self.appl_no}征审质检转派给xiejinggeng成功")
        else:
            logger.debug(f"案件{self.appl_no}征审质检转派给xiejinggeng失败,原因:{res['msg']}")
        return self._report("征审质检案件转派", res, method, bizContent)

    def Inquisition_qc_case(self):
        """Submit the quality check for the selected case."""
        self.sid = get_sid(self.env)
        method, bizContent = opinion(self.appl_no)
        res = self._post(method, bizContent)
        if res['flag'] == "S":
            logger.debug(f"案件{self.appl_no}征审质检成功")
        else:
            logger.debug(f"案件{self.appl_no}征审质检失败,原因:{res['msg']}")
        return self._report("征审质检", res, method, bizContent)

    def Inquisition_feedback(self):
        """Handle the dispute step, logged in as the case's tel_check_user."""
        self.tel_check_user = self._query_one(self._qc_case_sql("tel_check_user", self.appl_no))["tel_check_user"]
        self.sid = get_sid(self.env, login_name=self.tel_check_user)
        method, bizContent = feedback(self.appl_no)
        res = self._post(method, bizContent)
        if res['flag'] == "S":
            logger.debug(f"案件{self.appl_no}征审质检争议处理成功")
        else:
            logger.debug(f"案件{self.appl_no}征审质检争议处理失败,原因:{res['msg']}")
        return self._report("征审质检争议处理", res, method, bizContent)

    def Inquisition_opinionend(self):
        """Finalize the quality check; passes only if the flag is "S" and the state is 'VDT'."""
        # Look the user up here when the dispute step was not run on this instance.
        if not hasattr(self, "tel_check_user"):
            self.tel_check_user = self._query_one(self._qc_case_sql("tel_check_user", self.appl_no))["tel_check_user"]
        self.sid = get_sid(self.env, login_name=self.tel_check_user)
        method, bizContent = opinionend(self.appl_no)
        res = self._post(method, bizContent)
        # Verify the resulting case state in the database as well.
        self.state = self._query_one(self._qc_case_sql("state", self.appl_no))["state"]
        passed = res['flag'] == "S" and self.state == 'VDT'
        if passed:
            logger.debug(f"案件{self.appl_no}征审质检定案成功")
        else:
            logger.debug(f"案件{self.appl_no}征审质检定案失败,原因:{res['msg']},案件状态:{self.state}")
        return self._report("征审质检定案", res, method, bizContent, passed=passed)
def nowtime():
    """Return the current local time twice: as text and as whole epoch seconds.

    :return: tuple ``(text, seconds)`` where ``text`` is formatted as
        ``YYYY-mm-dd HH:MM:SS`` and ``seconds`` is the integer timestamp
        obtained by parsing that same text back.
    """
    stamp_format = '%Y-%m-%d %H:%M:%S'
    text = datetime.datetime.now().strftime(stamp_format)
    # Parse the formatted text back so both values describe the same second.
    parsed = time.strptime(text, stamp_format)
    seconds = int(time.mktime(parsed))
    return text, seconds

把接口数据直接写到程序里面了,运行起来很爽。不过写过的都知道,这样维护起来会很麻烦:接口要是改动了就得改代码,也不好扩展。还是做数据分离吧,借鉴 httprunner 的方式,将用例放到 yaml 里维护(放到 Excel 也可以,之前有写过这种方式,感兴趣的可以翻阅之前的那篇文档,原文此处的链接缺失)。
格式如下:

- CaseNo: 1
  Extract:
    biz_no:
      sql-apv_wf: select ***
    taskNo:
      sql-apv_wf: select ***
    user: 'genggeng'
    headers:
      fun: get_sid(login_name='xiejinggeng')
  CaseNmae: 电核转派
  Url: ''
  Method: post
  param:
    method: qihoo.apv.ma.caseturn.update
    bizContent:
      taskNoList:
        - key: $biz_no$
          value: $taskNo$
      productCode: 360借条
      toUser: $user$
  headers:
    Cookie: $headers$

介绍:
Extract 这个 key 里面放的是变量相关的内容,支持 sql 操作和自定义函数,可通过正则区分是 sql 还是自定义函数。例如:以 sql- 开头的判断为数据库操作,后面接要操作的库名 apv_wf;fun 则是自定义函数,到指定目录去查找并执行 get_sid(login_name='xiejinggeng')。变量用 $变量$ 的形式包括起来,然后在用例中替换。如下:

from quality_management_logic.ops_new.PublicCenter.Case_Get_Data import *
from main import BASE_DIR
from quality_management_logic.commonCenter.logUtil import  getlog
from quality_management_logic.ops.publicCenter import apiRequests
from quality_management_logic.ops.dataCenter.env import envhost
# Module-level logger for the case runner.
logger = getlog(targetName='RunCase')
# Directory of the YAML case files and the file list built from it
# (presumably one path per case file — confirm against File_Name).
casepath=f"{BASE_DIR}/TestCaseCenter/"
casefilelist=File_Name(casepath)
def Run(env):
    """Run every case of every YAML file in ``casefilelist`` against *env*.

    For each file the cases are loaded, every variable declared under a case's
    ``Extract`` key is resolved (SQL first, then custom function, otherwise the
    literal value) and substituted into the whole case list, and then each case
    is posted to ``envhost[env]``.

    :param env: environment name, used for SQL execution and as the key
        into ``envhost``.
    :return: list of interface responses, one per executed case, in
        execution order.
    """
    responses = []
    for yamlfile in casefilelist:
        # Load the YAML case file.
        caselist = yaml_r(yamlfile)
        logger.debug(f"获取用例list:{caselist}")

        # Resolve the variables first, so the cases are complete before any request is sent.
        for case in caselist:
            if 'Extract' not in case:
                continue
            logger.debug(f"用例:{case.get('CaseNo')}存在变量:{case.get('Extract').keys()}")
            for key in case['Extract']:
                placeholder = to_extract(key)
                # Read the value on every pass: an earlier substitution may
                # already have rewritten it.
                raw_value = case['Extract'][key]
                # Evaluate each resolver once, so the SQL or function does not
                # run a second time just to fetch the value.
                extract_value = GetSql(raw_value, env=env)
                if not extract_value:
                    extract_value = GetFun(raw_value, pattern='fun')
                if not extract_value:
                    # Neither SQL nor function: treat it as a constant.
                    extract_value = raw_value
                update_allvalue(caselist, placeholder, extract_value)
        logger.debug(f"获取用例list:{caselist}")

        # Execute the substituted cases.
        for case in caselist:
            res = apiRequests.callInterface(
                url=envhost[env],
                param=case["param"],
                parammode='json',
                method='post',
                headers=case["headers"],
            )
            responses.append(res)
    return responses
if __name__ == "__main__":
    # Script entry point: run the suite against stg1 and print what Run returns.
    print(Run(env="stg1"))

大功告成

本文地址:https://blog.csdn.net/kairui_guxiaobai/article/details/110472505