创建Flask项目
最简单的Flask项目
- 创建一个基本的Python项目,带虚拟运行环境
- 在【File】→【Settings】→【Project:】→【Python Interperter】安装Flask包
- 在main文件中创建初始化程序,运行测试。
from flask import Flask
app=Flask(__name__)
@app.route('/index')
def index():
return "你好Flask"
if __name__ == '__main__':
app.run(host="0.0.0.0", debug=True, port=8000)
服务运行说明
-
host
指定运行的主机,127.0.0.1表示本机运行,0.0.0.0 表示可以通过网络访问 -
dubug
指定调试模式,错误是否在网页端显示 -
port
运行的服务器端口号注意:出现”以一种访问权限不允许的方式做了一个访问套接字的尝试“错误,很有可能是端口号被占用
数据库
数据库的链接
-
下载安装pymysql
-
下载安装 Flask-SQLAlchemy
在Flask程序中一般不直接使用pymysql直接写原生SQL语句去操作数据库,更多的是通过SQLAlchemy提供的ORM技术去操作数据库。Flask-SQLAlchemy是在Flask中操作关系型数据库的拓展,是以面向模型对象的形式操作数据库,避免传统通过SQL语句进行数据库操作的复杂度和安全性。
from flask import Flask from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) # 数据库配置信息 HOSTNAME = "127.0.0.1" PORT = 3306 USERNAME = "root" PASSWORD = "123456" DATABASE = "flask" app.config['SQLALCHEMY_DATABASE_URI'] = f"mysql+pymysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DATABASE}?charset=utf8mb4" db = SQLAlchemy(app)
创建类模型,通过ORM创建数据表
class User(db.Model):
__tablename__ = "user"
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
username = db.Column(db.String(100), nullable=False)
password = db.Column(db.String(100), nullable=False)
#自动生成表
with app.app_context():
db.create_all()
- 创建一个User数据模型类
- 调用db.create_all()在数据库中建立对应的数据表
创建一对多的链接表
class Article(db.Model):
__tablename__ = "article"
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
title = db.Column(db.String(200), nullable=False)
content = db.Column(db.String(100), nullable=False)
# 添加作者外键
author_id = db.Column(db.Integer, db.ForeignKey("user.id"))
# 添加关联对象属性,backref 会自动给User模型添加一个articles的属性
author = db.relationship("User", backref="articles")
-
给文章表创建一个外键author_id 对应user表中的id字段
-
添加一个author属性,对应文章的一个User,并反向在在User类中创建一个articles的属性,用于返回当前作者的所有的文章
数据库的增删改查
增加记录
- 创建要插入的对象并赋值
- 通过db.session.add 增加新插入的对象
- 通过 db.session.commit() 把回话中的元素进行提交
@app.route('/useradd/')
def userAdd():
user=User(username = "李子豪", password = "234")
db.session.add(user)
db.session.commit()
result=Result(msg="添加成功!")
resDict=vars(result) #对象转化为字典
print(resDict)
return jsonify(resDict)
@app.route('/useradd2/')
def userAdd2():
user=User()
user.username="郑振华"
user.password="456"
db.session.add(user)
db.session.commit()
result=Result(msg="添加成功!")
resDict=vars(result) #对象转化为字典
print(resDict)
return jsonify(resDict)
查找记录
获取所有数据
user=User.query.all()
根据主键查询
采用user=User.query.get(1) ,get 的参数是主键的值
@app.route('/getuser1/')
def getUser1():
user=User.query.get(31) #根据主键查找
if user is None:
res=Result(code="fail",msg="记录不存在")
return jsonify(vars(res))
else:
res=Result(msg="查找成功")
res.data.append(user.toDict())
return jsonify(vars(res))
获取第一条数据
user=User.query.first()
获取记录的总数
res=User.query.count()
根据条件查询
采用User.query.filter_by(username=“张五”) 通过 filter_by 查询条件的方式进行查询,查询的结果可能有多个值。
@app.route('/getuser2/')
def getUser2():
userLst=User.query.filter_by(username="张五")
res = Result(msg="查找成功")
for user in userLst :
res.data.append(user.toDict())
if len(res.data)<=0 :
res2=Result(code="fail",msg="记录不存在")
return jsonify(vars(res2))
else:
return jsonify(vars(res))
-
模型类
.query.fillter_by(字段名 =值)
等价于select * from user where 字段名=值
-
模型类
.query.fillter_by(字段名 =值).first()
等价于select * from user where 字段名=值 limit 1
-
模型类
.query.fillter(模型类.字段名.like('%z%')).all()
等价于select * from user where 字段名 like '%z%'
-
模型类
.query.fillter(模型类.字段名.in_(['a','b','c'])).all()
等价于select * from user where 字段名 in ('a','b','c')
-
模型类
.query.fillter(模型类.字段名.between(开始,结束)).all()
等价于select * from user where 字段名 between 开始 and 结束
组合查询
- 需要导入
from sqlalchemy import or_, and_, not_
- 具体操作
- 模型类
.query.fillter(or_(模型类.字段名.like('z%'),模型类.字段名.contains('a'))).all()
等价于select * from user where 字段名 like 'z%' or 字段名 like '%a%'
- 模型类
.query.fillter(and_(模型类.字段名.like('z%'),模型类.字段名 < '2021-12-12 00:00:00')).all()
等价于select * from user where 字段名 like 'z%' and 字段名 < '2021-12-12 00:00:00'
- 模型类
排序
- 无条件排序 模型类.
query.order_by (模型类.字段名.desc()).all()
等价于select * from user order by 字段名 desc
- 有条件排序
模型类.query.fillter(模型类.字段名==值).order_by (模型类.字段名.desc()).all()
等价于select * from user where 字段名=值 order by 字段名 desc
返回前几条记录
更新记录
-
查询记录,如果用query.filter_by 查询可以用first()返回查询的第一条记录
user=User.query.filter_by(username="王五").first() #返回查询的第一条记录 user.password="88888" db.session.commit()
-
修改记录的对应值
-
调用session的commit方法,更新缓存,保存到数据库
删除记录
- 查询记录
- 从seesion会话中删除
- 调用session的commit方法,更新缓存,保存到数据库
@app.route('/userdelete/')
def userDelete():
user = User.query.filter_by(username="张三").first() # 返回查询的第一条记录
db.session.delete(user)
db.session.commit()
return "成功"
用户登录获取Token
Token 的作用和原理
- 首次登录时,后端服务器判断用户账号密码正确之后,根据用户id、用户名、定义好的秘钥、过期时间生成 token ,返回给前端
- 前端拿到后端返回的 token ,存储在 localStroage
- 后端接口判断请求头有无 token,没有或者 token 过期,返回401;
设置Token
-
安装
Authlib
包 -
调用以下代码设置token信息
# 生成token header = {'alg': 'HS256'} #签名算法 key = current_app.config['SECRET_KEY'] #用于签名的密钥 data={"id":user.id,"username":user.username} # token中存放的用户数据 token=jwt.encode(header=header, payload=data, key=key).decode('utf-8') #生成token
获取Token信息
@bp.route("/getApp",methods=["POST"])
def getApp():
getToken=request.headers.get("token")
result = Result()
if getToken is None :
result.state=401
result.msg="没有权限"
else:
userId=Dbtools.getUserId(getToken)
user=User.query.get(userId)
result.data=user.toDict()
return jsonify(vars(result))
返回JSON数据接口
返回中文乱码问题
通过app.json.ensure_ascii = False
可以实现中文uncode编码问题。
app = Flask(__name__)
app.json.ensure_ascii = False
注意:在Flask 2.3.2版本下 app.config['JSON_AS_ASCII'] = False
解决中文编码方式无效
jsonify
-
Content-Type: application/json 自动为json格式,不用进行设置
-
基本用法如下,resDict为字典格式
return jsonify(resDict)
-
类对象转化为字典,result为一个类对象,resDict是一个字典
resDict=vars(result) #对象转化为字典
创建返回结果的默认字典类
#定义一个结果类,用于返回结果
class Result :
def __init__(self,state = 200, code = "success" ,msg = ""):
self.state=state
self.code=code
self.msg=msg
self.data=dict() #创建空字典
-
state
状态码,(200:成功,400:失败,401:认证失败,无权限,502:拒绝) -
code
响应代码(success:成功,error:失败) -
msg
响应消息 -
data
响应数据
蓝图Blueprint
应用程序配置
-
创建config.py文件用于存储app的相关配置
# 数据库配置信息 HOSTNAME = "127.0.0.1" PORT = 3306 USERNAME = "root" PASSWORD = "123456" DATABASE = "flask" DB_URI = "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8mb4".format(USERNAME,PASSWORD,HOSTNAME,PORT,DATABASE) SQLALCHEMY_DATABASE_URI = DB_URI
-
在mian.py 中导入config.py ,并绑定配置文件
app.config.from_object(config) # 导入数据库配置信息
创建扩展文件
-
创建exts.py
-
把相关数据库操作的扩展包,放在该文件下防止重复导入,循环引用
-
在main.py中导入
from Exts import db , Result
-
在main.py 中绑定app
db.init_app(app)
创建数据模型
- 创建数据模型文件models.py 用于存放数据模型
创建路由蓝图
-
创建python 包,名称可以随意,用于存放路由的蓝图
-
创建蓝图相关代码,核心代码
bp=Blueprint("user",__name__,url_prefix="/user")
url_prefix 用于指定路由的前缀from flask import Blueprint from Exts import db,Result bp=Blueprint("user",__name__,url_prefix="/user") @bp.route("/add") def userAdd(): pass
-
导入并注册蓝图
from Route.User import bp as user_bp …… app.register_blueprint(user_bp) # 注册蓝图
创建蓝图后main.py 中的代码
from flask import Flask
from Exts import db
import config
from Route.User import bp as user_bp
from models import *
from flask_cors import CORS
app = Flask(__name__)
app.json.ensure_ascii = False
CORS(app, resources=r'/*') #可以跨域访问
app.config.from_object(config) # 导入数据库配置信息
db.init_app(app) #取代 db = SQLAlchemy(app)
with app.app_context():
db.create_all()
app.register_blueprint(user_bp)
if __name__ == '__main__':
app.run(host="0.0.0.0", debug=False, port=8000)
Flask跨域访问问题
什么是跨域问题
- 跨域是因为出于浏览器的同源策略限制,它是浏览器最核心也最基本的安全功能,如果缺少了同源策略,则浏览器的正常功能可能都会受到影响。同源策略会阻止一个域的javascript脚本和另外一个域的内容进行交互。
- 所谓同源(即指在同一个域)就是两个页面具有相同的协议(protocol),主机(host)和端口号(port)。
- 跨域问题就是当一个请求url的协议、域名、端口三者之间任意一个与当前页面url不同时出现的问题。
问题描述
当客户端向服务器端请求ajax服务时,如果客户端和服务器端域名不一致,就会出现跨域问题,ajax报错如下:
No 'Access-Control-Allow-Origin' header is present on the requested
解决方式
-
安装flask_cors文章来源:https://uudwc.com/A/LR5ox
-
app初始化的时候就加载配置文章来源地址https://uudwc.com/A/LR5ox
from flask import Flask from flask_cors import CORS app = Flask(__name__) # r'/*' 是通配符,让本服务器所有的 URL 都允许跨域请求 CORS(app, resources=r'/*') if __name__ == "__main__": app.run()