提问人:Nate 提问时间:12/13/2014 最后编辑:Nate 更新时间:12/22/2014 访问量:9506
使用 PHP 和 MySQL 实现一个简单的队列?
Implementing a simple queue with PHP and MySQL?
问:
我有一个PHP脚本,它从数据库中检索行,然后根据内容执行工作。这项工作可能很耗时(但不一定计算成本高昂),因此我需要允许多个脚本并行运行。
数据库中的行如下所示:
+---------------------+---------------+------+-----+---------------------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------------------+---------------+------+-----+---------------------+----------------+
| id | bigint(11) | NO | PRI | NULL | auto_increment |
.....
| date_update_started | datetime | NO | | 0000-00-00 00:00:00 | |
| date_last_updated | datetime | NO | | 0000-00-00 00:00:00 | |
+---------------------+---------------+------+-----+---------------------+----------------+
我的脚本当前选择日期最早的行(工作完成后会更新),并且不使用 .date_last_updated
date_update_started
如果我现在要并行运行脚本的多个实例,它们会选择相同的行(至少在某些时候),并且会完成重复的工作。
我想做的是使用事务来选择行,更新列,然后向 SQL 语句添加一个条件,选择行以仅选择大于某个值的行(以确保另一个脚本不起作用)。F.D.公司date_update_started
WHERE
date_update_started
$sth = $dbh->prepare('
START TRANSACTION;
SELECT * FROM table WHERE date_update_started > 1 DAY ORDER BY date_last_updated LIMIT 1000;
UPDATE table DAY SET date_update_started = UTC_TIMESTAMP() WHERE id IN (SELECT id FROM table WHERE date_update_started > 1 DAY ORDER BY date_last_updated LIMIT 1000;);
COMMIT;
');
$sth->execute(); // in real code some values will be bound
$rows = $sth->fetchAll(PDO::FETCH_ASSOC);
从我所读到的内容来看,这本质上是一个队列实现,在MySQL中似乎不受欢迎。尽管如此,我需要找到一种方法来允许多个脚本并行运行,在我完成研究之后,这就是我想出的。
这种方法会奏效吗?有更好的方法吗?
答:
编辑:对不起,我完全误解了你的问题
您应该在表上放置一个“锁定”列,将脚本正在使用的条目的值设置为 true,完成后将其设置为 false。
就我而言,我放置了其他 3 个时间戳(整数)列:target_ts、start_ts、done_ts。 你
UPDATE table SET locked = TRUE WHERE target_ts<=UNIX_TIMESTAMP() AND ISNULL(done_ts) AND ISNULL(start_ts);
然后
SELECT * FROM table WHERE target_ts<=UNIX_TIMESTAMP() AND ISNULL(start_ts) AND locked=TRUE;
完成您的工作并逐个更新每个条目(以避免数据不一致),将 done_ts 属性设置为当前时间戳(您也可以立即解锁它们)。您可以将target_ts更新到您希望的下一个更新,也可以忽略此列,只使用done_ts进行选择
评论
我们在生产中实现了这样的东西。
为了避免重复,我们做了一个MySQL UPDATE,如下所示(我修改了查询以类似于您的表):
UPDATE queue SET id = LAST_INSERT_ID(id), date_update_started = ...
WHERE date_update_started IS NULL AND ...
LIMIT 1;
我们在单个事务中执行此 UPDATE,并利用该功能。当这样使用时,它与参数一起使用时,它会在事务会话中写入参数,在本例中,它是已更新的单个 () 队列的 ID(如果有)。LAST_INSERT_ID
LIMIT 1
在那之后,我们做:
SELECT LAST_INSERT_ID();
在不带参数的情况下使用时,它会检索以前存储的值,从而获取必须执行的队列项的 ID。
评论
我认为您的方法可以奏效,只要您还为您选择的当前正在处理的行添加某种标识符,就可以按照@JuniusRendel建议,我什至会考虑使用另一个字符串键(随机或实例 ID)用于脚本导致错误且未正常完成的情况, 因为一旦您在工作后更新了行,您就必须清理这些字段。
正如我所看到的,这种方法的问题在于,将有 2 个脚本在同一点运行,并在它们被签名为锁定之前选择相同的行。在这里,正如我所看到的,这实际上取决于您在行上做了什么样的工作,如果这两个脚本的最终结果相同,我认为您唯一的问题是浪费时间和服务器内存(这不是小问题,但我现在将它们放在一边......如果您的工作将导致两个脚本上的更新不同,则您的问题将是 TB 中最后可能会有错误的更新。
@Jean提到了您可以采取的第二种方法,即使用 MySql 锁。我不是该主题的专家,但这似乎是一个好方法,并使用“选择......FOR UPDATE的语句可以为您提供所需的内容,就像您可以在同一调用中执行的选择和更新一样 - 这将比2个单独的查询更快,并且可以降低其他实例选择这些行的风险,因为它们将被锁定。
'SELECT ....FOR UPDATE' 允许您运行 select 语句并锁定这些特定行以更新它们,因此您的语句可能如下所示:
START TRANSACTION;
SELECT * FROM tb where field='value' LIMIT 1000 FOR UPDATE;
UPDATE tb SET lock_field='1' WHERE field='value' LIMIT 1000;
COMMIT;
锁功能强大,但要注意它不会影响不同部分的应用程序。检查当前是否为更新锁定的选定行,是否在应用程序的其他位置(可能针对最终用户)请求了这些行,以及在这种情况下会发生什么情况。
此外,表必须是 InnoDB,建议您检查 where 子句的字段具有 Mysql 索引,否则您可能会锁定整个表或遇到“间隙锁定”。
锁定过程也有可能,尤其是在运行并行脚本时,会给您的CPU和内存带来沉重的负担。
这是关于该主题的另一篇读物:http://www.percona.com/blog/2006/08/06/select-lock-in-share-mode-and-for-update/
希望这对您有所帮助,并想听听您的进展情况。
每次脚本运行时,我都会让脚本生成一个 uniqid。
$sctiptInstance = uniqid();
我会添加一个脚本实例列来将此值保存为 varchar 并在其上放置索引。当脚本运行时,我会在事务中使用 select for update 来根据任何逻辑选择您的行,不包括带有脚本实例的行,然后使用脚本实例更新这些行。像这样:
START TRANSACTION;
SELECT * FROM table WHERE script_instance = '' AND date_update_started > 1 DAY ORDER BY date_last_updated LIMIT 1000 FOR UPDATE;
UPDATE table SET date_update_started = UTC_TIMESTAMP(), script_instance = '{$scriptInstance}' WHERE script_instance = '' AND date_update_started > 1 DAY ORDER BY date_last_updated LIMIT 1000;
COMMIT;
现在,这些行将从脚本的其他实例中排除。您是否工作,然后更新行以将脚本实例设置回 null 或空白,并更新上次更新日期列。
您还可以使用脚本实例写入另一个名为“当前实例”或类似内容的表,并让脚本检查该表以获取正在运行的脚本计数,以控制并发脚本的数量。我也会将脚本的 PID 添加到表中。然后,您可以使用该信息创建一个内务处理脚本,以定期从 cron 运行,以检查长时间运行或恶意进程并杀死它们等。
我有一个系统在生产中完全像这样工作。我们每分钟运行一个脚本来执行一些处理,有时运行可能需要一分钟以上的时间。
我们有一个 status 表列,其中 0 表示 NOT RUN YET,1 表示 FINISHED,其他值表示正在进行中。
脚本做的第一件事是更新表格,设置一行或多行,其值意味着我们正在处理该行。我们使用 getmypid() 来更新我们想要处理的行,但这些行仍未处理。
完成处理后,脚本会更新具有相同进程 ID 的行,并将它们标记为已完成(状态 1)。
这样,我们就可以避免每个脚本尝试处理已经在处理中的行,它就像一个魅力。这并不意味着没有更好的方法,但这确实可以完成工作。
我过去曾出于非常相似的原因使用过存储过程。我们使用 FOR UPDATE 读取锁来锁定表,同时更新了 selected 标志以从任何将来的 select 中删除该条目。它看起来像这样:
CREATE PROCEDURE `select_and_lock`()
BEGIN
START TRANSACTION;
SELECT your_fields FROM a_table WHERE some_stuff=something
AND selected = 0 FOR UPDATE;
UPDATE a_table SET selected = 1;
COMMIT;
END$$
没有理由必须在存储过程中完成它,尽管现在我想到了。
评论
SELECT ... LIMIT 5