flask,这个在他的官网里也有详细文档,这里简单论述一个登录+后台管理的例子。
#引入一个蓝图,蓝图封装一些接口
from lantu import simple_page
from flask import Flask,render_template,request,session,redirect,url_for
import json
import hashlib
#定义app,secret_key主要在生成session时候用
app=Flask(__name__)
app.secret_key='gjc'
#绑定蓝图
app.register_blueprint(simple_page)
#主页跳转到登录
@app.route('/')
def index():
return redirect(url_for('login'))
#登录判断session,有则跳转到主页
@app.route("/login")
def login():
if "username" not in session:
return render_template('login.html')
else:
return redirect(url_for('simple_page.main'))
#登录验证接口
@app.route("/login/loginYz",methods=['POST'])
def loginyz():
psd=hashlib.sha256()
psd.update('root'.encode('utf-8'))
if request.form['username']!='root':
res={
"state":"1001",
"result":"username error"
}
return json.dumps(res)
elif psd.hexdigest()!=request.form['password']:
res={
"state":"1002",
"result":"password error"
}
return json.dumps(res)
else:
session['username'] = request.form['username']
res={
"state":"1000",
"result":"success",
"linkTo":"/main"
}
return json.dumps(res)
#登出接口,删除session
@app.route("/loginout")
def loginout():
session.pop('username', None)
return redirect(url_for('login'))
#开始出程序
if __name__=='__main__':
app.run("0.0.0.0",debug=True)
上面绑定的蓝图
from flask import Blueprint,render_template,abort,session,request
from jinja2 import TemplateNotFound
from pymongo import MongoClient
import json
#连接mongodb数据库
client=MongoClient()
db=client.test_database
posts=db.posts
#定义一个蓝图
simple_page=Blueprint('simple_page',__name__,template_folder='templates')
#管理主页
@simple_page.route("/main")
def main():
if "username" in session:
data={}
data['username']=session['username']
data['info']=[post for post in posts.find()]
return render_template("main.html",data=data)
else:
return render_template('login.html')
#增加数据
@simple_page.route("/main/add",methods=['POST'])
def add():
post={
"name":request.form['name'],
"email":request.form['email'],
"phone":request.form['tel'],
"more":request.form['more']
}
if posts.insert_one(post).inserted_id!="":
return json.dumps({"state":"1000","result":"success"})
else:
return json.dumps({"state":"1004","result":"insert error"})
#删除数据
@simple_page.route("/main/dele",methods=['POST'])
def dele():
result=posts.delete_one({"name":request.form['name']})
if result.deleted_count>0:
return json.dumps({"state":"1000","result":request.form['name']})
else:
return json.dumps({"state":"1005","result":"delete error"})
#修改数据
@simple_page.route("/main/edit",methods=['POST'])
def edit():
result=posts.update_one({"name":request.form['org_name']},{"$set":{
"name":request.form['ch_name'],
"email":request.form['ch_email'],
"phone":request.form['ch_phone'],
"more":request.form['ch_more']
} })
if result.matched_count!=0:
return json.dumps({"state":"1000","result":request.form['ch_name']})
else:
return json.dumps({"state":"1006","result":"edit error"})
#return json.dumps({"state":"1000","result":result.matched_count})
关于前端页面这里不做展示了,接下来使用gunicorn运行该flask程序
首先可以配置一下gunicorn的config,flask项目touch一个config.py,复制如下内容,当然也可以自行修改配置,内容也可以在这里下载。
bind = '0.0.0.0:8000'
backlog = 2048
workers = 1
worker_class = 'sync'
worker_connections = 1000
timeout = 30
keepalive = 2
spew = False
daemon = False
pidfile = None
umask = 0
user = None
group = None
tmp_upload_dir = None
errorlog = '-'
loglevel = 'info'
accesslog = '-'
access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s"'
proc_name = None
def post_fork(server, worker):
server.log.info("Worker spawned (pid: %s)", worker.pid)
def pre_fork(server, worker):
pass
def pre_exec(server):
server.log.info("Forked child, re-executing.")
def when_ready(server):
server.log.info("Server is ready. Spawning workers")
def worker_int(worker):
worker.log.info("worker received INT or QUIT signal")
## get traceback info
import threading, sys, traceback
id2name = dict([(th.ident, th.name) for th in threading.enumerate()])
code = []
for threadId, stack in sys._current_frames().items():
code.append("\n# Thread: %s(%d)" % (id2name.get(threadId,""),
threadId))
for filename, lineno, name, line in traceback.extract_stack(stack):
code.append('File: "%s", line %d, in %s' % (filename,
lineno, name))
if line:
code.append(" %s" % (line.strip()))
worker.log.debug("\n".join(code))
def worker_abort(worker):
worker.log.info("worker received SIGABRT signal")
然后运行
gunicorn -c config.py main:app
就可以多线程运行flask程序啦!
版权声明:本文为原创文章,转载请注明出处和作者,不得用于商业用途,请遵守
CC BY-NC-SA 4.0协议。
赞赏一下
支付宝打赏
微信打赏