1 回答

TA貢獻(xiàn)1804條經(jīng)驗 獲得超7個贊
您可以按照以下方式進(jìn)行操作 - 下面是 100% 有效的示例。請注意,在生產(chǎn)中使用 Celery 來執(zhí)行此類任務(wù),或者自己編寫另一個守護(hù)程序應(yīng)用程序(另一個進(jìn)程),并借助消息隊列(例如 RabbitMQ)或公共數(shù)據(jù)庫的幫助從 http 服務(wù)器向其提供任務(wù)。
如果對下面的代碼有任何疑問,請隨時提問,這對我來說是很好的練習(xí):
from flask import Flask, current_app
import threading
from threading import Thread, Event
import time
from random import randint
app = Flask(__name__)
# use the dict to store events to stop other treads
# one event per thread !
app.config["ThreadWorkerActive"] = dict()
def do_work(e: Event):
"""function just for another one thread to do some work"""
while True:
if e.is_set():
break # can be stopped from another trhead
print(f"{threading.current_thread().getName()} working now ...")
time.sleep(2)
print(f"{threading.current_thread().getName()} was stoped ...")
@app.route("/long_thread", methods=["GET"])
def long_thread_task():
"""Allows to start a new thread"""
th_name = f"Th-{randint(100000, 999999)}" # not really unique actually
stop_event = Event() # is used to stop another thread
th = Thread(target=do_work, args=(stop_event, ), name=th_name, daemon=True)
th.start()
current_app.config["ThreadWorkerActive"][th_name] = stop_event
return f"{th_name} was created!"
@app.route("/stop_thread/<th_id>", methods=["GET"])
def stop_thread_task(th_id):
th_name = f"Th-{th_id}"
if th_name in current_app.config["ThreadWorkerActive"].keys():
e = current_app.config["ThreadWorkerActive"].get(th_name)
if e:
e.set()
current_app.config["ThreadWorkerActive"].pop(th_name)
return f"Th-{th_id} was asked to stop"
else:
return "Sorry something went wrong..."
else:
return f"Th-{th_id} not found"
@app.route("/", methods=["GET"])
def index_route():
text = ("/long_thread - create another thread. "
"/stop_thread/th_id - stop thread with a certain id. "
f"Available Threads: {'; '.join(current_app.config['ThreadWorkerActive'].keys())}")
return text
if __name__ == '__main__':
app.run(host="0.0.0.0", port=9999)
添加回答
舉報