使用 PHP 和 MySQL 实现一个简单的队列?

Implementing a simple queue with PHP and MySQL?

提问人:Nate 提问时间:12/13/2014 最后编辑:Nate 更新时间:12/22/2014 访问量:9506

问:

我有一个PHP脚本,它从数据库中检索行,然后根据内容执行工作。这项工作可能很耗时(但不一定计算成本高昂),因此我需要允许多个脚本并行运行。

数据库中的行如下所示:

+---------------------+---------------+------+-----+---------------------+----------------+
| Field               | Type          | Null | Key | Default             | Extra          |
+---------------------+---------------+------+-----+---------------------+----------------+
| id                  | bigint(11)    | NO   | PRI | NULL                | auto_increment |
.....
| date_update_started | datetime      | NO   |     | 0000-00-00 00:00:00 |                |
| date_last_updated   | datetime      | NO   |     | 0000-00-00 00:00:00 |                |
+---------------------+---------------+------+-----+---------------------+----------------+

我的脚本当前选择日期最早的行(工作完成后会更新),并且不使用 .date_last_updateddate_update_started

如果我现在要并行运行脚本的多个实例,它们会选择相同的行(至少在某些时候),并且会完成重复的工作。

我想做的是使用事务来选择行,更新列,然后向 SQL 语句添加一个条件,选择行以仅选择大于某个值的行(以确保另一个脚本不起作用)。F.D.公司date_update_startedWHEREdate_update_started

$sth = $dbh->prepare('
    START TRANSACTION;
    SELECT * FROM table WHERE date_update_started > 1 DAY ORDER BY date_last_updated LIMIT 1000;
    UPDATE table DAY SET date_update_started = UTC_TIMESTAMP() WHERE id IN (SELECT id FROM table WHERE date_update_started > 1 DAY ORDER BY date_last_updated LIMIT 1000;);
    COMMIT;
');
$sth->execute(); // in real code some values will be bound
$rows = $sth->fetchAll(PDO::FETCH_ASSOC);

从我所读到的内容来看,这本质上是一个队列实现,在MySQL中似乎不受欢迎。尽管如此,我需要找到一种方法来允许多个脚本并行运行,在我完成研究之后,这就是我想出的。

这种方法会奏效吗?有更好的方法吗?

php mysql sql pdo 队列

评论

0赞 Lupin 12/16/2014
如何运行并行脚本?
0赞 Nate 12/16/2014
@Lupin 目前,该脚本每 15 分钟通过 cron 作业执行一次。该脚本将检查另一个实例是否正在运行,如果正在运行,则终止。我不确定我将如何管理正在运行的多个脚本 - 我可能在数据库中有一个计数器来查看有多少正在运行并以这种方式限制实例的数量,但一次一个问题:-)
0赞 Lupin 12/16/2014
好的,还有一些其他问题让我完全理解:1.你有一个脚本,它选择行并处理它们,然后将其更新回数据库,对吗?2. 您希望能够让并行脚本运行并执行相同的操作,但在不同的行上,对吗?3. 每次脚本运行时,选择的行是连续的,这意味着它们是 1-100、101-200 等,还是在 id 方面是随机的,并且仅由大于 1 date_update_started的行选择?
0赞 Nate 12/17/2014
@Lupin 1.是的,2。是的,3。根据日期字段和示例中未显示的另一个字段选择行。因此,它们不是严格意义上的“连续”,而是按两个字段排序的。
0赞 Patrick Echterbruch 12/19/2014
另一种方法是让某种主脚本获取一些行(例如,),然后为这些行中的每一行启动一个单独的处理脚本实例。您甚至可以使用第二个表来跟踪当前正在运行的处理实例数,因此每当 cron 启动您的主脚本时,它都会知道要获取多少行。但由于这甚至与您要求的相差甚远,因此我决定将其添加为评论而不是答案。SELECT ... LIMIT 5

答:

1赞 n00dl3 12/16/2014 #1

编辑:对不起,我完全误解了你的问题

您应该在表上放置一个“锁定”列,将脚本正在使用的条目的值设置为 true,完成后将其设置为 false。

就我而言,我放置了其他 3 个时间戳(整数)列:target_ts、start_ts、done_ts。 你

UPDATE table SET locked = TRUE WHERE target_ts<=UNIX_TIMESTAMP() AND ISNULL(done_ts) AND ISNULL(start_ts);

然后

SELECT * FROM table WHERE target_ts<=UNIX_TIMESTAMP() AND ISNULL(start_ts) AND locked=TRUE;

完成您的工作并逐个更新每个条目(以避免数据不一致),将 done_ts 属性设置为当前时间戳(您也可以立即解锁它们)。您可以将target_ts更新到您希望的下一个更新,也可以忽略此列,只使用done_ts进行选择

评论

0赞 Nate 12/17/2014
我不认为PHP实际上支持多线程,但无论如何,运行脚本的多个实例都不是问题。问题主要是如何处理从数据库中检索行。
0赞 n00dl3 12/17/2014
我更新了,对不起,也许我:)喝醉了。对于线程,我不知道,这就是 PECL 扩展所声称的,但我没有测试它,所以......
5赞 Alessandro Lai 12/16/2014 #2

我们在生产中实现了这样的东西。

为了避免重复,我们做了一个MySQL UPDATE,如下所示(我修改了查询以类似于您的表):

UPDATE queue SET id = LAST_INSERT_ID(id), date_update_started = ... 
WHERE date_update_started IS NULL AND ...
LIMIT 1;

我们在单个事务中执行此 UPDATE,并利用该功能。当这样使用时,它与参数一起使用时,它会在事务会话中写入参数,在本例中,它是已更新的单个 () 队列的 ID(如果有)。LAST_INSERT_IDLIMIT 1

在那之后,我们做:

SELECT LAST_INSERT_ID();

在不带参数的情况下使用时,它会检索以前存储的值,从而获取必须执行的队列项的 ID。

评论

0赞 Nate 12/17/2014
您能详细说明一下“写锁”是什么意思吗?也许有一个代码示例?
0赞 Alessandro Lai 12/17/2014
@Nate、编辑和扩展;)顺便说一句,我还建议使用 RabbitMQ。我们梦想着:D使用它
8赞 Lupin 12/17/2014 #3

我认为您的方法可以奏效,只要您还为您选择的当前正在处理的行添加某种标识符,就可以按照@JuniusRendel建议,我什至会考虑使用另一个字符串键(随机或实例 ID)用于脚本导致错误且未正常完成的情况, 因为一旦您在工作后更新了行,您就必须清理这些字段。

正如我所看到的,这种方法的问题在于,将有 2 个脚本在同一点运行,并在它们被签名为锁定之前选择相同的行。在这里,正如我所看到的,这实际上取决于您在行上做了什么样的工作,如果这两个脚本的最终结果相同,我认为您唯一的问题是浪费时间和服务器内存(这不是小问题,但我现在将它们放在一边......如果您的工作将导致两个脚本上的更新不同,则您的问题将是 TB 中最后可能会有错误的更新。

@Jean提到了您可以采取的第二种方法,即使用 MySql 锁。我不是该主题的专家,但这似乎是一个好方法,并使用“选择......FOR UPDATE的语句可以为您提供所需的内容,就像您可以在同一调用中执行的选择和更新一样 - 这将比2个单独的查询更快,并且可以降低其他实例选择这些行的风险,因为它们将被锁定。

'SELECT ....FOR UPDATE' 允许您运行 select 语句并锁定这些特定行以更新它们,因此您的语句可能如下所示:

START TRANSACTION;
   SELECT * FROM tb where field='value' LIMIT 1000 FOR UPDATE;
   UPDATE tb SET lock_field='1' WHERE field='value' LIMIT 1000;
COMMIT;

锁功能强大,但要注意它不会影响不同部分的应用程序。检查当前是否为更新锁定的选定行,是否在应用程序的其他位置(可能针对最终用户)请求了这些行,以及在这种情况下会发生什么情况。

此外,表必须是 InnoDB,建议您检查 where 子句的字段具有 Mysql 索引,否则您可能会锁定整个表或遇到“间隙锁定”。

锁定过程也有可能,尤其是在运行并行脚本时,会给您的CPU和内存带来沉重的负担。

这是关于该主题的另一篇读物:http://www.percona.com/blog/2006/08/06/select-lock-in-share-mode-and-for-update/

希望这对您有所帮助,并想听听您的进展情况。

1赞 Craig Jacobs 12/20/2014 #4

每次脚本运行时,我都会让脚本生成一个 uniqid。

$sctiptInstance = uniqid();

我会添加一个脚本实例列来将此值保存为 varchar 并在其上放置索引。当脚本运行时,我会在事务中使用 select for update 来根据任何逻辑选择您的行,不包括带有脚本实例的行,然后使用脚本实例更新这些行。像这样:

START TRANSACTION;
SELECT * FROM table WHERE script_instance = '' AND date_update_started > 1 DAY ORDER BY date_last_updated LIMIT 1000 FOR UPDATE;
UPDATE table SET date_update_started = UTC_TIMESTAMP(), script_instance = '{$scriptInstance}' WHERE script_instance = '' AND date_update_started > 1 DAY ORDER BY date_last_updated LIMIT 1000;
COMMIT;

现在,这些行将从脚本的其他实例中排除。您是否工作,然后更新行以将脚本实例设置回 null 或空白,并更新上次更新日期列。

您还可以使用脚本实例写入另一个名为“当前实例”或类似内容的表,并让脚本检查该表以获取正在运行的脚本计数,以控制并发脚本的数量。我也会将脚本的 PID 添加到表中。然后,您可以使用该信息创建一个内务处理脚本,以定期从 cron 运行,以检查长时间运行或恶意进程并杀死它们等。

1赞 Miguel Mesquita Alfaiate 12/22/2014 #5

我有一个系统在生产中完全像这样工作。我们每分钟运行一个脚本来执行一些处理,有时运行可能需要一分钟以上的时间。

我们有一个 status 表列,其中 0 表示 NOT RUN YET,1 表示 FINISHED,其他值表示正在进行中。

脚本做的第一件事是更新表格,设置一行或多行,其值意味着我们正在处理该行。我们使用 getmypid() 来更新我们想要处理的行,但这些行仍未处理。

完成处理后,脚本会更新具有相同进程 ID 的行,并将它们标记为已完成(状态 1)。

这样,我们就可以避免每个脚本尝试处理已经在处理中的行,它就像一个魅力。这并不意味着没有更好的方法,但这确实可以完成工作。

1赞 Mike Miller 12/22/2014 #6

我过去曾出于非常相似的原因使用过存储过程。我们使用 FOR UPDATE 读取锁来锁定表,同时更新了 selected 标志以从任何将来的 select 中删除该条目。它看起来像这样:

CREATE PROCEDURE `select_and_lock`()
 BEGIN
  START TRANSACTION;
  SELECT your_fields FROM a_table WHERE some_stuff=something 
   AND selected = 0 FOR UPDATE;
  UPDATE a_table SET selected = 1;
  COMMIT;
 END$$

没有理由必须在存储过程中完成它,尽管现在我想到了。