VB.net 2010 视频教程 VB.net 2010 视频教程 python基础视频教程
SQL Server 2008 视频教程 c#入门经典教程 Visual Basic从门到精通视频教程
当前位置:
首页 > Python基础教程 >
  • 带你认识 flask 后台作业

 

01

任务类别简介

 

任务进程为后台作业提供了一个便捷的解决方案。Worker过程独立于应用程序运行,甚至可以位于不同的系统上。应用程序和worker之间的通信是通过消息完成的。通过与物理相互作用来监视其进度。下图展示了一个典型的实现:

 

Python中最流行的任务类别是Celery。这是一个相当复杂的重叠,它有很多选项并支持多个消息示例。另一个流行的Python任务位置是Redis Queue(RQ),它牺牲了一些替代,,仅支持Redis消息本身,但作为交换,它的建立要比Celery简单长度

Celery和RQ都非常适合在Flask应用程序中支持后台任务,所以我可以选择更简单的RQ。不过,用Celery实现相同的功能其实也不难。如果您对Celery更有吸引力,可以阅读我的博客中的将Celery与Flask文章一起使用

02

使用RQ

 

RQ是一个标准的Python三方重叠,用pip安装:

  •  
  •  
(venv) $ pip install rq(venv) $ pip freeze > requirements.txt

正如我前面提到的,应用和RQ worker之间的通信将在Redis消息中执行,因此你需要运行Redis服务器。有很多途径来安装和运行Redis服务器,可以下载其内核并执行编译和安装。如果你使用的是Windows中,微软在此处维护了Redis的的安装程序。在Linux的上,你可以通过操作系统的软件包管理器安装Redis的。Mac OS X的用户可以运行brew install redis,使用然后redis-server命令手动启动服务

除了确保服务正在运行并可以识别RQ访问之外,你不需要与Redis进行其他交互

03

创建任务

 

一个任务,不过是一个Python函数而已。以下是一个示例任务,我将其引入一个新的app / tasks.py模块:

app / tasks.py:示例后台任务

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
import time
def example(seconds):    print('Starting task')    for i in range(seconds):        print(i)        time.sleep(1)    print('Task completed')

该任务将秒数作为参数,然后在该时间量内等待,并每秒打印一次计数器

 

04

运行 RQ 工人

 

任务准备就绪,可以通过rq worker来启动一个worker进程了:

  •  
  •  
  •  
  •  
  •  
(venv) $ rq worker microblog-tasks18:55:06 RQ worker 'rq:worker:miguelsmac.90369' started, version 0.9.118:55:06 Cleaning registries for queue: microblog-tasks18:55:0618:55:06 *** Listening on microblog-tasks...


microblog-tasks如果您想启动多个worker来扩展量子,您只需要运行rq worker来生成更多连接到同一个模型的进程,就可以使用Worker进程现在连接到了Redis,并在称为的上面上查看可能的分配给它的任何作业。在生产环境中,您可能希望至少运行可用的CPU数量的工人。。然后,,当作业出现在特定位置时,任何可用的worker进程都可以获取它

 

05

执行任务

 

现在打开第二个终端窗口并激活虚拟环境。我将使用shell会话来启动worker中的example()任务:

  •  
  •  
  •  
  •  
  •  
  •  
>>> from redis import Redis>>> import rq>>> queue = rq.Queue('microblog-tasks', connection=Redis.from_url('redis://'))>>> job = queue.enqueue('app.tasks.example', 23)>>> job.get_id()'c651de7f-21a8-4068-afd5-8b982a6f6d32'

如果采用的是Redis服务器运行在不同的主机或端口号上,则使用RQ的Queue类表示从应用程序端看到的任务类型。Redis则需要使用其他URL。

队列的enqueue()方法用于将作业添加到队列中。第一个参数是要执行的任务的名称,可直接传入函数对象或导入字符串。我发现传入字符串更加方便,因为不需要在应用程序对enqueue()预期的任何剩余参数将被传递给worker中运行的函数。

enqueue()只要进行了调用,运行着RQ worker的终端窗口上就会出现一些活动。你会看到example()函数正在运行,并且连续打印一次计数器。同时,你的其他终端不会被分开,你可以继续在shell在上面的示例中,我调用job.get_id()方法来获取分配给任务的唯一标识符。你可以尝试使用另一个有趣表达式来检查worker上的函数是否已完成:

  •  
  •  
>>> job.is_finishedFalse

 

如果你像我在上面的示例中那样传递了23,那么函数将运行约23秒。在那之后,job.is_finished表达式将True转化为。就是这么简单,炫酷否?

一旦函数完成,worker又回到等待作业的状态,所以如果你想进行更多的实验,你可以用不同的参数重复执行enqueue()调用。),但最终会被删除。这很重要,任务类别不保留已执行作业的历史记录

06

报告任务进度

 

通常,对于长期运行的任务,您需要将一些进度信息提供给应用程序,从而可以将其显示给用户。RQ通过使用作业对象的meta属性来支持这一点。让我重新编写example()任务来编写进度报告:

app / tasks.py::带进度的示例后台任务

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
import timefrom rq import get_current_job
def example(seconds):    job = get_current_job()    print('Starting task')    for i in range(seconds):        job.meta['progress'] = 100.0 * i / seconds        job.save_meta()        print(i)        time.sleep(1)    job.meta['progress'] = 100    job.save_meta()    print('Task completed')

 

这个新版本的example()使用RQ的get_current_job()函数来获取一个作业实例,该实例与提交任务时返回给应用程序的实例类似。作业对象的meta属性是一个字典,任务可以编写任何想要的与应用程序通信的自定义数据。在此示例中,我编写了progress,表示完成任务的百分比。每次进程更新时,我都调用job.save_meta()指示RQ将数据写入Redis,应用程序可以在其中找到它。

在应用程序方面(目前只是一个Python shell),我可以运行此任务,然后监视进度,如下所示:

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
>>> job = queue.enqueue('app.tasks.example', 23)>>> job.meta{}>>> job.refresh()>>> job.meta{'progress': 13.043478260869565}>>> job.refresh()>>> job.meta{'progress': 69.56521739130434}>>> job.refresh()>>> job.meta{'progress': 100}>>> job.is_finishedTrue


如您所见,在另一侧,meta属性可以被重新读。需要调用refresh()