信使:同步和排队消息处理

编辑本页

信使:同步和排队消息处理

Messenger提供了一个消息总线,它能够发送消息,然后立即在应用程序中处理它们,或者通过稍后处理的传输(例如队列)发送它们。要更深入地了解它,请阅读Messenger组件文档

安装

在使用ob娱乐下载Symfony Flex,执行此命令安装messenger:

1
作曲家需要交响乐/信使ob娱乐下载

创建消息和处理程序

Messenger围绕您将创建的两个不同的类展开:(1)保存数据的消息类和(2)消息发送时将调用的处理程序类。处理程序类将读取消息类并执行一个或多个任务。

对于消息类没有特定的要求,除了它可以被序列化:

12 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/ / src /信息/ SmsNotification.php名称空间应用程序消息SmsNotification私人内容公共函数__construct(字符串内容->内容=内容;}公共函数getContent()字符串返回->内容;}}

消息处理程序是PHP可调用的,创建它的推荐方法是创建一个具有AsMessageHandler属性,并且具有__invoke ()使用消息类(或消息接口)类型提示的方法:

12 3 4 5 6 7 8 9 10 11 12 13 14
/ / src / MessageHandler / SmsNotificationHandler.php名称空间应用程序MessageHandler使用应用程序消息SmsNotification使用ob娱乐下载组件信使属性AsMessageHandler# (AsMessageHandler)SmsNotificationHandler公共函数__invoke(SmsNotification消息/ /……做一些工作——比如发短信!}}

提示

你也可以使用# (AsMessageHandler)属性。您可以对单个类中的任意多个方法使用该属性,从而允许您对多个相关类型的消息的处理进行分组。

6.1

支持# (AsMessageHandler)在Symfony 6.1中引入了on方法。ob娱乐下载

多亏了自动配置SmsNotification类型提示,Symfonob娱乐下载y知道这个处理程序应该被调用当SmsNotification消息被发送。大多数情况下,这就是你所需要做的。但是你也可以手动配置消息处理程序.要查看所有已配置的处理程序,运行:

1
PHP bin/控制台调试:messenger

发送消息

你准备好了!要分派消息(并调用处理程序),请注入messenger.default_bus服务(透过MessageBusInterface),就像控制器一样:

12 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/ / src /控制器/ DefaultController.php名称空间应用程序控制器使用应用程序消息SmsNotification使用ob娱乐下载FrameworkBundle控制器AbstractController使用ob娱乐下载组件信使MessageBusInterfaceDefaultController扩展AbstractController公共函数指数(MessageBusInterface公共汽车//将导致SmsNotificationHandler被调用公共汽车->调度(SmsNotification (“看!我创造了一条信息!”));/ /……}}

传输:异步/队列消息

默认情况下,消息一经分派就立即进行处理。如果希望异步处理消息,可以配置传输。传输能够发送消息(例如到队列系统),然后通过工作人员接收它们.信使支持多种传输

请注意

如果要使用不受支持的传输,请查看排队的交通,它支持Kafka和谷歌Pub/Sub。

传输使用“DSN”进行注册。多亏了Messenger的Flex配方,你的.env文件中已经有了一些例子。

1 2 3
# MESSENGER_TRANSPORT_DSN = amqp: / /客人:guest@localhost: 5672 / % 2 f /消息# MESSENGER_TRANSPORT_DSN =学说:/ /违约# MESSENGER_TRANSPORT_DSN =复述:/ / localhost: 6379 /消息

取消注释您想要的任何传输(或将其设置为.env.local).看到信使:同步和排队消息处理欲知详情。

接下来,在配置/包/ messenger.yaml,让我们定义一个名为异步使用这样的配置:

  • YAML
  • XML
  • PHP
1 2 3 4 5 6 7 8 9 10
#配置/包/ messenger.yaml框架:信使:传输:异步:“% env (MESSENGER_TRANSPORT_DSN) %”#或展开以配置更多选项#异步:# dsn: "%env(MESSENGER_TRANSPORT_DSN)%"# options: []

将消息路由到传输

现在您已经配置了传输,而不是立即处理消息,您可以将它们配置为发送到传输:

  • YAML
  • XML
  • PHP
1 2 3 4 5 6 7 8 9
#配置/包/ messenger.yaml框架:信使:传输:异步:“% env (MESSENGER_TRANSPORT_DSN) %”路由:# async是上面你给传输的任何名称“消息应用\ \ SmsNotification”异步

多亏了这一点应用\ \ SmsNotification消息会被送到哪里异步传输和它的处理程序将马上叫我来。下未匹配的任何消息路由仍将立即处理,即同步处理。

请注意

你可以使用‘*’作为消息类。这将作为未匹配的任何消息的默认路由规则路由.默认情况下,这有助于确保不同步处理任何消息。

唯一的缺点就是‘*’也适用于使用Symfony Mailer发送的电子邮件(使用ob娱乐下载SendEmailMessage当Messenger可用时)。这可能会导致问题,如果你的电子邮件是不可序列化的(例如,如果他们包括文件附件作为PHP资源/流)。

您还可以通过类的父类或接口来路由类。或者向多个传输器发送消息:

  • YAML
  • XML
  • PHP
1 2 3 4 5 6 7 8 9
#配置/包/ messenger.yaml框架:信使:路由:#路由扩展此示例基类或接口的所有消息“消息应用\ \ AbstractAsyncMessage”异步“消息应用\ \ AsyncMessageInterface”异步“我的消息\ \ ToBeSentToTwoSenders”(异步审计)

请注意

如果同时为子类和父类配置路由,则两条规则都将被使用。例如,如果你有一个SmsNotification对象,从通知,两者的路由为通知而且SmsNotification将被使用。

讯息中的教义实体

如果需要在消息中传递Doctrine实体,最好传递实体的主键(或处理程序实际需要的任何相关信息,例如电子邮件,等等)而不是对象(否则你可能会看到与实体管理器相关的错误):

12 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/ / src /信息/ NewUserWelcomeEmail.php名称空间应用程序消息NewUserWelcomeEmail私人用户标识公共函数__construct(int用户标识->用户id =用户标识;}公共函数getUserId()int返回->用户标识;}}

然后,在你的处理程序中,你可以查询一个新对象:

12 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
/ / src / MessageHandler / NewUserWelcomeEmailHandler.php名称空间应用程序MessageHandler使用应用程序消息NewUserWelcomeEmail使用应用程序存储库UserRepository使用ob娱乐下载组件信使属性AsMessageHandler# (AsMessageHandler)NewUserWelcomeEmailHandler私人userRepository公共函数__construct(UserRepositoryuserRepository->userRepository =userRepository;}公共函数__invoke(NewUserWelcomeEmailwelcomeEmail用户->userRepository->找到(welcomeEmail->getUserId ());/ /……发送电子邮件!}}

这保证了实体包含新的数据。

同步处理消息

如果一个消息没有匹配任何路由规则,它不会被送往任何运输工具,会立即处理。在某些情况下(比如当将处理程序绑定到不同的传输)时,显式地处理它会更容易或更灵活:通过创建一个同步传输和“发送”要立即处理的消息:

  • YAML
  • XML
  • PHP
1 2 3 4 5 6 7 8 9 10
#配置/包/ messenger.yaml框架:信使:传输:#……其他传输同步:“同步:/ /”路由:应用程序消息\ \ SmsNotification:同步

创建自己的交通工具

如果需要从不受支持的设备发送或接收消息,还可以创建自己的传输。看到如何创建自己的信使运输

消费消息(运行Worker)

在大多数情况下,一旦您的消息被路由,您就需要“使用”它们。你可以用信使:消费命令:

1 2 3 4
PHP bin/console messenger:消耗异步#使用-vv查看正在发生的事情的细节PHP bin/console messenger:使用async -vv

第一个参数是接收者的名称(如果路由到自定义服务,则为服务id)。默认情况下,该命令将永远运行:在传输上查找新消息并处理它们。这个命令叫做“worker”。

提示

的实例来正确地停止工作StopWorkerException

部署到生产环境

在制作过程中,有一些重要的事情需要考虑:

使用过程管理器,如Supervisor或systemd来保持工人的运行
您将希望一个或多个“工作人员”始终运行。要做到这一点,使用过程控制系统,如主管systemd
不要让员工永远跑掉
有些服务(比如Doctrine的EntityManager)会消耗更多的内存。所以,与其让你的工作线程一直运行,不如使用一个标志信使:消费——限制= 10告诉worker在退出前只处理10条消息(然后进程管理器将创建一个新进程)。还有其他的选择,比如——内存限制= 128而且——期限= 3600
停止遇到错误的工作人员
如果像数据库服务器这样的工作者依赖项停止工作,或者超时,您可以尝试添加重新连接逻辑,或者如果worker接收到太多错误,则退出该worker——破坏极限选项信使:消费命令。
在部署时重新启动worker
每次部署时,您都需要重新启动所有工作进程,以便它们能够看到新部署的代码。要做到这一点,运行信使:stop-workers在部署。这将向每个worker发出信号,它应该完成当前正在处理的消息,并应该优雅地关闭。然后,流程管理器将创建新的工作流程。该命令使用应用程序内部缓存—因此请确保将其配置为使用您喜欢的适配器。
在部署之间使用相同的缓存
如果部署策略涉及创建新的目标目录,则应该为cache.prefix.seed配置选项,以便在部署之间使用相同的缓存名称空间。否则,cache.app的值kernel.project_dir参数作为命名空间的基础,这将在每次进行新的部署时导致不同的命名空间。

优先传输

有时某些类型的消息应该具有更高的优先级,并在其他消息之前处理。为了实现这一点,您可以创建多个传输,并将不同的消息路由到它们。例如:

  • YAML
  • XML
  • PHP
12 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
#配置/包/ messenger.yaml框架:信使:传输:async_priority_high:dsn:' % env (MESSENGER_TRANSPORT_DSN) %选项:# queue_name是特定于传输原则的queue_name:#为AMQP发送到单独的交换机,然后排队#交换:#名称:高#队列:# messages_high: ~#或redis尝试“组”async_priority_low:dsn:' % env (MESSENGER_TRANSPORT_DSN) %选项:queue_name:路由:“消息应用\ \ SmsNotification”async_priority_low“消息应用\ \ NewUserWelcomeEmail”async_priority_high

然后,您可以为每个传输运行单独的worker,或者指示一个worker按优先级顺序处理消息:

1
PHP bin/console messenger:consume async_priority_high async_priority_low

工作人员总是首先寻找等待的消息async_priority_high.如果没有,然后它将使用来自的消息async_priority_low

限制消费到特定的队列

一些传输(特别是AMQP)具有交换器和队列的概念。Syob娱乐下载mfony传输总是绑定到交换。默认情况下,工作线程从附加到指定传输交换机的所有队列中进行消费。然而,有些用例希望工作人员只从特定的队列中消费。

你可以限制worker只处理来自特定队列的消息:

1 2 3 4
PHP bin/console messenger:consume my_transport——queues=fasttrack#你可以多次传递——queues选项来处理多个队列PHP bin/console messenger:consume my_transport——queues=fasttrack1——queues=fasttrack2

请注意

允许使用队列选项,接收方必须实现QueueReceiverInterface

检查每次传输的排队消息数

运行信使:统计数据命令来了解某些或所有传输的“队列”中有多少消息:

1 2 3 4 5
显示所有传输中排队消息的数量PHP bin/console messenger:stats#只显示某些传输的统计数据PHP bin/console messenger:stats my_transport_name other_transport_name

请注意

为了使该命令生效,配置的传输接收方必须实现MessageCountAwareInterface

6.2

信使:统计数据命令在Symfony 6.2中引入。ob娱乐下载

主管配置

监控器是一个很好的工具,可以保证您的工作进程正常运行总是运行(即使由于失败、达到消息限制或由于信使:stop-workers).你可以在Ubuntu上安装它,例如,通过:

1
Sudo apt-get安装管理器

管理器配置文件通常位于/etc/supervisor/conf.d目录中。例如,您可以创建一个新的messenger-worker.conf文件来确保有两个实例信使:消费一直在运行:

1 2 3 4 5 6 7 8 9 10
、/ etc /主管/ conf.d / messenger-worker.conf(项目:messenger-consume)命令=php /path/to/your/app/bin/console messenger:consume async——time-limit=3600用户= ubuntunumprocs2startsecs0自动启动真正的autorestart真正的startretries10process_name= % s_ (program_name) % (process_num)02d

改变异步参数,以使用传输器的名称和用户到您的服务器上的Unix用户。

谨慎

在部署过程中,某些东西可能不可用(例如数据库),导致使用者无法启动。在这种情况下,主管会尝试startretries重启命令的次数。请确保更改此设置,以避免命令处于FATAL状态,该状态将永远不会重新启动。

每重启一次,“监控器”将延迟增加1秒。例如,如果值为10,它将等待1秒,2秒,3秒,等等。这样,该服务总共有55秒的时间恢复可用。增加了startretries设置为覆盖最大预期停机时间。

如果你使用Redis Transport,请注意每个worker都需要一个唯一的消费者名,以避免相同的消息被多个worker处理。实现这一点的一种方法是在Supervisor配置文件中设置一个环境变量,然后可以在messenger.yaml(参见上面的Redis部分):

1
环境= MESSENGER_CONSUMER_NAME = % s_ (program_name) % (process_num)02d

接下来,告诉Supervisor读取你的配置并启动你的workers:

1 2 3 4 5
Sudo supervisor orctl重读Sudo supervisor orctl updateSudo monitorctl start message -consume:*

看到主管医生欲知详情。

优雅的关闭

如果你安装了PCNTLPHP扩展在您的项目中,工作人员将处理SIGTERMPOSIX信号在终止前完成对当前消息的处理。

在某些情况下SIGTERM信号由Supervisor本身发送(例如,停止一个以Supervisor为入口点的Docker容器)。在这些情况下,您需要添加stopwaitsecs程序配置键(具有所需宽限期的值,以秒为单位),以执行安全关机:

1 2
(项目:x)stopwaitsecs20.

Systemd配置

虽然Supervisor是一个很好的工具,但它有一个缺点,那就是需要系统访问才能运行它。Systemd已经成为大多数Linux发行版的标准,并且有一个很好的替代方案叫做Systemd用户服务

Systemd用户服务配置文件通常位于~ / config / systemd /用户目录中。例如,您可以创建一个新的messenger-worker.service文件。或者一个messenger-worker@.service如果你想让更多的实例同时运行:

1 2 3 4 5 6 7 8 9 10
(单位)描述=ob娱乐下载Symfony信使消费%i(服务)ExecStart=php /path/to/your/app/bin/console messenger:consume async——time-limit=3600重新启动=总RestartSec30.(安装)WantedBy= default.target

现在,告诉systemd启用并启动一个worker:

1 2 3 4 5 6
systemctl——用户启用messenger-worker@1.serviceSystemctl——user start messenger-worker@1.service#启用并启动20个workersystemctl——用户启用messenger-worker@ {1 . . 20} .serviceSystemctl——user start message -worker@{1..20}.service

如果你改变了你的服务配置文件,你需要重新加载守护进程:

1
Systemctl—user daemon-reload

重新启动所有的消费者:

1
Systemctl——user restart messenger- consumer @*.service

systemd用户实例仅在特定用户第一次登录后启动。消费者通常需要在系统引导时启动。启用徘徊用户激活该行为:

1
loginctl启用玲儿<用户名>

日志由日志管理,可以使用journalctl命令进行操作:

1 2 3 4 5 6 7 8
#跟踪用户的日志Journalctl -f——user-unit messenger-consume@11.service#跟踪所有消费者的日志Journalctl -f——user-unit message - consumer @*#跟踪用户服务的所有日志journalctl -f _UID= .日志含义UID

看到systemd文档欲知详情。

请注意

的高级权限journalctl命令,或者将您的用户添加到system -journal组:

1
sudo usermod -a -G system -journal 

无状态的工人

PHP被设计成无状态的,在不同的请求之间没有共享资源。在HTTP上下文中,PHP在发送响应后清理所有内容,因此您可以决定不处理可能泄漏内存的服务。

另一方面,工作人员通常在长时间运行的CLI进程中按顺序处理消息,这些进程在处理单个消息后不会结束。注意服务状态,以防止信息和/或内存泄漏,因为Symfony将在所有消息中注入相同的服务实例,从而保留服务的内部状态。ob娱乐下载

然而,某些Symfony服务,如Mob娱乐下载onolog手指交叉处理程序,泄漏的设计。ob娱乐下载Symfony提供了一个服务重启特性来解决这个问题。当在两条消息之间自动重置容器时,Symfony将查找所有已实现的服务ob娱乐下载ResetInterface(包括您自己的服务)并调用他们的服务重置()方法,以便它们可以清理其内部状态。

如果服务不是无状态的,并且希望在每条消息之后重置其属性,则必须实现该服务ResetInterface属性中的属性重置()方法。

如果不想重置容器,请添加——无重置选项。信使:消费命令。

6.1

在6.ob娱乐下载1之前的Symfony版本中,服务容器不会在消息之间自动重置,您必须设置framework.messenger.reset_on_message选项真正的

重试和失败

如果在使用来自传输的消息时抛出异常,则该异常将自动重新发送到传输以再次尝试。缺省情况下,一条消息将被重试3次才被丢弃或丢弃发送到故障传输.每次重试也将被延迟,如果失败是由于临时问题。所有这些对于每个传输都是可配置的:

  • YAML
  • XML
  • PHP
12 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
#配置/包/ messenger.yaml框架:信使:传输:async_priority_high:dsn:' % env (MESSENGER_TRANSPORT_DSN) %#默认配置retry_strategy:max_retries:3.#毫秒延迟延迟:1000#导致每次重试前的延迟更高#例:1秒延迟,2秒,4秒乘数:2max_delay:0#覆盖所有这些服务#实现Symfony\Coob娱乐下载mponent\Messenger\Retry\RetryStrategyInterface# service: null

提示

ob娱乐下载Symfony触发WorkerMessageRetriedEvent当消息被重试时,您可以运行自己的逻辑。

请注意

多亏了SerializedMessageStamp,消息的序列化形式将被保存,这将防止在稍后重试消息时再次序列化它。

6.1

SerializedMessageStamp类在Symfony 6.1中引入。ob娱乐下载

避免错误

有时,处理消息可能会失败知道是永久性的,不应该再尝试。如果你扔UnrecoverableMessageHandlingException,该消息将不会被重试。

强制重试

有时,处理消息一定会以某种方式失败知道是暂时的,必须重新尝试。如果你扔RecoverableMessageHandlingException,该消息将永远被重试无限和max_retries设置将被忽略。

保存和重试失败的消息

如果消息失败,则会重试多次(max_retries),然后会被丢弃。为了避免这种情况发生,您可以配置failure_transport

  • YAML
  • XML
  • PHP
1 2 3 4 5 6 7 8 9 10
#配置/包/ messenger.yaml框架:信使:#重试后,消息将被发送到“失败”的传输failure_transport:失败的传输:#……其他传输失败:“教义:/ /违约?queue_name =失败'

在本例中,如果处理消息失败3次(默认情况下)max_retries),然后发送至失败的交通工具。当你可以使用信使:消费失败要像普通传输一样使用它,您通常需要手动查看失败传输中的消息,并选择重试它们:

12 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
#查看失败传输中的所有消息,默认限制为50条PHP bin/console messenger:failed:show#请看前10条消息PHP bin/console message:failed:show——max=10#只看到MyClass消息PHP bin/console message:failed:show——class-filter=“MyClass”#按消息类查看消息的数量PHP bin/console messenger:failed:show——stats . txt#查看有关特定故障的详细信息PHP bin/console message:failed:show 20 -vv#逐个查看和重试消息PHP bin/console message:failed:retry -vv . txt#重试特定消息PHP bin/console message:failed:retry 20 30——force#删除消息而不重新尝试它PHP bin/console messenger:failed:remove 20#删除消息而不重新尝试它们,并在删除之前显示每条消息PHP bin/console message:failed:remove 20 30——show-messages

6.2

——类过滤器而且——统计数据在Symfony 6.2中引入了选项。ob娱乐下载

如果消息再次失败,它将被重新发送回由于正常的失败传输重试规则.一旦达到最大重试,消息将被永久丢弃。

多次失败的传输

有时候,只有一个单一的、全球性的方案是不够的没有运输配置是因为某些消息比其他消息更重要。在这些情况下,你可以只覆盖特定传输的失败传输:

  • YAML
  • XML
  • PHP
12 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
#配置/包/ messenger.yaml框架:信使:#重试后,消息将被发送到“失败”的传输如果在传输中没有配置“failed_transport”,则默认为#failure_transport:failed_default传输:async_priority_high:dsn:' % env (MESSENGER_TRANSPORT_DSN) %failure_transport:failed_high_priority#因为没有配置失败的传输,所以使用的传输将是#全局的“failure_transport”集合async_priority_low:dsn:“教义:/ /违约?queue_name=async_priority_low'failed_default:“教义:/ /违约?queue_name=failed_default'failed_high_priority:“教义:/ /违约?queue_name=failed_high_priority'

如果没有failure_transport全局定义或在传输级别上定义,消息将在重试次数之后被丢弃。

失败的命令有一个可选选项——运输要指定failure_transport在传输级别配置。

1 2 3 4 5 6 7 8
#查看"failure_transport"传输中的所有消息PHP bin/console messenger:failed:show——transport=failure_transport#重试来自failure_transport的特定消息PHP bin/console message:failed:retry 20 30——transport=failure_transport——force#从"failure_transport"中删除消息而不重新尝试PHP bin/console messenger:failed:remove 20——transport=failure_transport

传输配置

Messenger支持许多不同的传输类型,每种类型都有自己的选项。选项可以通过DSN字符串或配置传递到传输。

1 2
# .envMESSENGER_TRANSPORT_DSN = amqp: / / localhost / % 2 f /消息吗?auto_setup =
  • YAML
  • XML
  • PHP
1 2 3 4 5 6 7 8
#配置/包/ messenger.yaml框架:信使:传输:my_transport:dsn:“% env (MESSENGER_TRANSPORT_DSN) %”选项:auto_setup:

下面定义的选项选项优先于DSN中定义的。

AMQP运输

AMQP传输使用AMQP PHP扩展向RabbitMQ等队列发送消息。通过运行:

1
作曲家需要symfony/amqpob娱乐下载-messenger

AMQP传输DSN可能是这样的:

1 2 3 4 5
# .envMESSENGER_TRANSPORT_DSN = amqp: / /客人:guest@localhost: 5672 / % 2 f /消息#或使用AMQPS协议MESSENGER_TRANSPORT_DSN = amqp: / /客人:guest@localhost / % 2 f /消息

如果要使用TLS/SSL加密的AMQP,还必须提供CA证书。中定义证书路径amqp.cacertPHP.ini设置(例如:amqp.cacert=/etc/ssl/certs)或在cacert深空网络参数(例如amqp: / / localhost ?cacert=/etc/ssl/certs/).

TLS/SSL加密的AMQP使用的默认端口是5671,但您可以在港口深空网络参数(例如:amqp: / / localhost ?cacert=/etc/ssl/certs/&port=12345).

请注意

默认情况下,传输将自动创建所需的任何交换机、队列和绑定密钥。这可以被禁用,但某些功能可能无法正常工作(如延迟队列)。若要不自动创建任何队列,可以配置传输队列:[]

请注意

可以将AMQP传输的使用者限制为仅处理来自交换机某些队列的消息。看到信使:同步和排队消息处理

传输有许多其他选项,包括配置交换、队列绑定键等。请参阅有关欧宝官网下载app连接

传输有许多选项:

选项 描述 默认的
auto_setup 在发送/获取期间是否应该自动创建交换机和队列。 真正的
cacert PEM格式CA证书文件的路径。
cert PEM格式的客户端证书路径。
channel_max 指定服务器允许的最高通道号。0表示标准扩展限制
confirm_timeout 确认超时时间(以秒为单位);如果未指定,传输将不等待消息确认。注意:0或更大秒。可能是小数。
connect_timeout 连接超时。注意:0或更大秒。可能是小数。
frame_max 服务器为连接建议的最大帧大小,包括帧头和结束字节。0表示标准扩展限制(取决于librabbimq默认的帧大小限制)
心跳 服务器需要的连接心跳的延迟(以秒为单位)。0表示服务器不需要心跳。注意,librabbitmq的心跳支持有限,这意味着仅在阻塞调用期间检查心跳。
宿主 AMQP服务的主机名
关键 PEM格式的客户端密钥路径。
登录 用于连接AMQP服务的用户名
密码 用于连接AMQP服务的密码
持续的 “假”
港口 AMQP服务的端口
read_timeout 超时进行收入活动。注意:0或更大秒。可能是小数。
重试
sasl_method
connection_name 对于自定义连接名(至少需要1.10版本的PHP AMQP扩展)
验证 开启/关闭对等体验证。如果启用了对等验证,则服务器证书中的公共名称必须与服务器名称匹配。缺省情况下,开启对等体验证。
vhost 与AMQP服务一起使用的虚拟主机
write_timeout 输出活动超时。注意:0或更大秒。可能是小数。
延迟(queue_name_pattern) 用于创建队列的模式 delay_ % exchange_name % _ % routing_key % _ %延迟%
延迟(exchange_name) 用于延迟/重试消息的交换机名称 延迟
队列[名称](参数) 额外的参数
队列[名字][binding_arguments] 在绑定队列时使用的参数。
队列[名字][binding_keys] 绑定到此队列的绑定键(如果有)
队列[名字][标记] 队列的旗帜 AMQP_DURABLE
交换(参数) 交换的额外参数(例如:alternate-exchange
交换(default_publish_routing_key) 如果消息上没有指定,则在发布时使用的路由键
交换(旗帜) 交换的旗帜 AMQP_DURABLE
交换(名字) 交易所名称
交换(类型) 交换类型 扇出

6.1

connection_name选项在Symfony 6.1中引入。ob娱乐下载

您还可以通过添加在邮件上配置特定于amqp的设置AmqpStamp寄往你的信封:

1 2 3 4 5 6 7
使用ob娱乐下载组件信使Amqp运输AmqpStamp/ /……属性= [];公共汽车->调度(SmsNotification (), (AmqpStamp (“custom-routing-key”AMQP_NOPARAM,属性)));

谨慎

使用者不会显示在管理面板中,因为此传输不依赖于\ AmqpQueue:消费()这就是阻塞。有一个拦截接球手使——时限/内存限制的选项信使:消费命令以及信使:stop-workers命令效率很低,因为它们都依赖于接收者无论是否找到消息都立即返回的事实。消费工作者负责迭代,直到它接收到要处理的消息和/或直到达到其中一个停止条件。因此,如果worker卡在阻塞调用中,则无法到达它的停止逻辑。

教义运输

Doctrine传输可用于在数据库表中存储消息。通过运行:

1
作曲家需要交响乐/教义传播者ob娱乐下载

教条传输DSN可能看起来像这样:

1 2
# .envMESSENGER_TRANSPORT_DSN =学说:/ /违约

格式为原则:/ / < connection_name >,以防你有多个连接,并希望使用“默认”以外的一个。传输将自动创建一个名为messenger_messages

或者,要自己创建表,请设置auto_setup选项而且生成迁移

提示

为了避免像Doctrine Migrations这样的工具试图删除这个表,因为它不是常规模式的一部分,您可以设置schema_filter选择:

  • YAML
  • XML
  • PHP
1 2 3 4
#配置/包/ doctrine.yaml原则:dbal:schema_filter:~ ^ (? ! messenger_messages) ~”

谨慎

存储在数据库中的消息的datetime属性使用当前系统的时区。如果具有不同时区配置的多台计算机使用相同的存储,则可能会导致问题。

传输有许多选项:

选项 描述 默认的
table_name 表的名称 messenger_messages
queue_name 队列的名称(表中的列,用于使用一个表进行多个传输) 默认的
redeliver_timeout 重试队列中处于“处理”状态的消息前的超时(如果工作人员由于某种原因停止,将会发生这种情况,最终您应该重试该消息)-以秒为单位。 3600
auto_setup 是否在发送/获取期间自动创建表。 真正的

请注意

redeliver_timeout到大于您的最慢消息持续时间的值。否则,有些消息将在第一个消息仍在处理时第二次启动。

在使用PostgreSQL时,您可以访问以下选项来利用听/通知特性。这允许一种比Doctrine传输的默认轮询行为更有效的方法,因为当新消息插入到表中时,PostgreSQL将直接通知工作者。

选项 描述 默认的
use_notify 是否使用LISTEN/NOTIFY。 真正的
check_delayed_interval 检查延迟消息的时间间隔,以毫秒为单位。设置为0禁用检查。 60000
get_notify_timeout 呼叫时等待应答的时间长度PDO: pgsqlGetNotify单位为毫秒。 0

Beanstalkd运输

Beanstalkd传输将消息直接发送到Beanstalkd工作队列。通过运行:

1
作曲家需要symfony/beanob娱乐下载stalk -messenger

Beanstalkd传输深空网络可能看起来像这样:

1 2 3 4 5
# .envMESSENGER_TRANSPORT_DSN = beanstalkd: / / localhost: 11300 ?tube_name = foo&timeout = 4竞技场队伍= 120#如果没有端口,默认为11300MESSENGER_TRANSPORT_DSN = beanstalkd: / / localhost

传输有许多选项:

选项 描述 默认的
tube_name 队列名称 默认的
超时 消息保留超时-以秒为单位。 0(将导致服务器立即返回响应或将抛出TransportException)
竞技场队伍 消息在放入就绪队列之前运行的时间(以秒为单位)。 90

复述,运输

Redis传输使用使消息排队。这个传输需要Redis PHP扩展(>=4.3)和一个运行的Redis服务器(^5.0)。通过运行:

1
作曲家需要symfony/red ob娱乐下载-messenger

Redis传输DSN可能是这样的:

1 2 3 4 5 6 7 8
# .envMESSENGER_TRANSPORT_DSN =复述:/ / localhost: 6379 /消息#全DSN示例MESSENGER_TRANSPORT_DSN =复述:/ / password@localhost: 6379 /信息/ symob娱乐下载fony /消费者?auto_setup =真正的序列化器&dbindex = 0 = 1 &stream_max_entries = 0# Redis集群示例MESSENGER_TRANSPORT_DSN =复述:/ / host-01:6379复述:/ / host-02:6379,复述:/ / host-03:6379,复述:/ / host-04:6379# Unix Socket示例MESSENGER_TRANSPORT_DSN =复述:/ / / var /运行/ redis.sock

可以通过DSN或选项钥匙下运输进去messenger.yaml

选项 描述 默认的
Redis流的名称 消息
集团 Redis消费组名称 ob娱乐下载
消费者 Redis中使用的消费者名 消费者
auto_setup 自动创建Redis组? 真正的
身份验证 Redis密码
delete_after_ack 如果真正的,消息处理后会自动删除 真正的
delete_after_reject 如果真正的,如果邮件被拒绝,则会自动删除 真正的
懒惰的 只有在真正需要连接时才进行连接
序列化器 如何在Redis中序列化最终的有效载荷复述:OPT_SERIALIZER选项) 复述:SERIALIZER_PHP
stream_max_entries 流将被修剪到的最大条目数。将其设置为足够大的数字以避免丢失挂起的消息 0(意思是“不修剪”)
tls 为连接启用TLS支持
redeliver_timeout 重试被放弃的消费者拥有的挂起消息之前的超时(如果工作人员由于某种原因死亡,将会发生这种情况,最终您应该重试消息)-以秒为单位。 3600
claim_interval 应检查挂起/放弃消息是否声明的时间间隔——以毫秒为单位 60000(1分钟)
persistent_id 字符串,如果空连接是非持久的。
retry_interval Int,单位为毫秒的值 0
read_timeout 浮动,以秒为单位的值默认为无限 0
超时 浮动,以秒为单位的值默认为无限 0
sentinel_master 如果禁用了哨兵支持,则返回null或空

6.1

persistent_idretry_intervalread_timeout超时,sentinel_master在Symfony 6.1中引入了选项。ob娱乐下载

谨慎

不应该超过一个信使:消费命令以相同的组合运行集团而且消费者,或者消息最终可能被处理多次。如果运行多个队列工作器,消费者可以设置为环境变量,比如% env (MESSENGER_CONSUMER_NAME) %,由Supervisor设置(下面的例子)或任何其他用于管理工作进程的服务。在容器环境中,主机名可以用作使用者名,因为每个容器/主机只有一个worker。如果使用Kubernetes来编排容器,请考虑使用StatefulSet要有稳定的名称。

提示

delete_after_ack真正的(如果您使用单个组)或定义stream_max_entries(如果您可以估计在您的情况下有多少Max条目是可接受的)以避免内存泄漏。否则,所有消息将永远保存在Redis中。

内存传输

内存中传输实际上并不传递消息。相反,它在请求期间将它们保存在内存中,这对测试很有用。例如,如果你有一个async_priority_normal传送器,你可以在测验使用此传输的环境:

  • YAML
  • XML
  • PHP
1 2 3 4 5
#配置/包/测试/ messenger.yaml框架:信使:传输:async_priority_normal:的内存:/ /

然后,在测试时,消息将会被送到真正的运输。更好的是,在测试中,您可以检查在请求期间发送的消息:

12 3 4 5 6 7 8 9 10 11 12 13 14 16 17 18 19 20
/ /测试/控制器/ DefaultControllerTest.php名称空间应用程序测试控制器使用ob娱乐下载FrameworkBundle测试WebTestCase使用ob娱乐下载组件信使运输InMemoryTransportDefaultControllerTest扩展WebTestCase公共函数testSomething()客户端静态::createClient ();/ /……->assertSame (200客户端->getResponse ()->getStatusCode ());/*@varInMemoryTransport $transport */运输->getContainer ()->get (“messenger.transport.async_priority_normal”);->assertCount (1运输->getSent ());}}

传输有许多选项:

序列化(布尔,默认值:
是否序列化消息。这对于测试附加层非常有用,特别是当您使用自己的消息序列化器时。

请注意

所有内存中每次测试后,传输将自动重置扩展测试类KernelTestCaseWebTestCase

Amazon SQS

Amazon SQS传输非常适合托管在AWS上的应用程序。通过运行:

1
需要symfony/amazon-ob娱乐下载sqs-messenger

SQS传输DSN可能是这样的:

1 2 3
# .envMESSENGER_TRANSPORT_DSN = https://sqs.eu -西方- 3. - amazonaws.com/123456789012/messages?access_key=akiaiosfodnn7example&secret_key=j17m97ffsvoki0brifoo9a MESSENGER_TRANSPORT_DSN = sqs: / / localhost: 9494 /消息吗?sslmode =禁用

请注意

传输将自动创建所需的队列。属性可以禁用此功能auto_setup选项

提示

在发送或接收消息之前,Symfony需要将队列名称转换为AWS队列URLob娱乐下载GetQueueUrlAWS中的API。这个额外的API调用可以通过提供一个DSN(即队列URL)来避免。

传输有许多选项:

选项 描述 默认的
access_key AWS访问密钥 必须进行url编码
账户 AWS帐户标识符 凭证的所有者
auto_setup 是否应该在发送/获取期间自动创建队列。 真正的
buffer_size 需要预取的消息数 9
调试 如果真正的它会记录所有HTTP请求和响应(会影响性能)
端点 SQS服务的绝对URL https://sqs.eu-west-1.amazonaws.com
poll_timeout 等待新消息持续时间(以秒为单位) 0.1
queue_name 队列名称 消息
地区 AWS区域名称 一来就
secret_key AWS密钥 必须进行url编码
session_token AWS会话令牌
visibility_timeout 消息不可见的秒数(可见性超时值 队列的配置
wait_time 长轮询持续时间(秒) 20.

6.1

session_token选项在Symfony 6.1中引入。ob娱乐下载

请注意

wait_time参数定义了Amazon SQS在发送响应之前等待消息在队列中可用的最长时间。通过消除空响应的数量,它有助于降低使用Amazon SQS的成本。

poll_timeout参数定义了接收者在返回null之前应该等待的时间。它避免了阻塞其他接收器被调用。

请注意

如果队列名的后缀为.fifo, AWS将创建一个先进先出队列.使用邮票AmazonSqsFifoStamp定义消息组ID消息重复数据删除ID

FIFO队列不支持设置每条消息的延迟,值为延迟:0在重试策略设置中是必需的。

序列化消息

当消息被发送到(和从)传输时,它们使用PHP的本机序列化serialize ()unserialize ()功能。您可以将此全局(或为每个传输)更改为实现的服务SerializerInterface

  • YAML
  • XML
  • PHP
12 3 4 5 6 7 8 9 10 11 12 13
#配置/包/ messenger.yaml框架:信使:序列化器:default_serializer:messenger.transport.ob娱乐下载symfony_serializerob娱乐下载symfony_serializer:格式:json背景:传输:async_priority_normal:dsn:#……序列化器:messenger.transport.ob娱乐下载symfony_serializer

messenger.transport.ob娱乐下载symfony_serializer是使用序列化器组件可以通过几种方式进行配置。如果你选择使用Symfony序列化器时,您ob娱乐下载可以通过SerializerStamp(见信封及邮票).

提示

当向另一个应用程序发送/接收消息/从另一个应用程序接收消息时,您可能需要更多地控制序列化过程。使用自定义序列化器提供该控件。看到ob娱乐下载symfonycast的消息序列化器教程获取详细信息。

自定义处理程序

使用属性配置处理程序

你可以通过将选项传递给属性来配置你的处理程序:

12 3 4 5 6 7 8 9 10 11 12 13 14 15
/ / src / MessageHandler / SmsNotificationHandler.php名称空间应用程序MessageHandler使用应用程序消息OtherSmsNotification使用应用程序消息SmsNotification使用ob娱乐下载组件信使属性AsMessageHandler#[AsMessageHandler(fromTransport: 'async', priority: 10)]SmsNotificationHandler公共函数__invoke(SmsNotification消息/ /……}}

可以配置该属性的选项有:

  • 公共汽车
  • fromTransport
  • 处理
  • 方法
  • 优先级

手动配置处理程序

ob娱乐下载Symfony通常会自动查找并注册处理程序.但是,您也可以手动配置处理程序,并通过标记处理程序服务来传递一些额外的配置messenger.message_handler

  • YAML
  • XML
  • PHP
1 2 3 4 5 6 7 8 9 10 11
#配置/ services.yaml服务:应用MessageHandler \ \ SmsNotificationHandler:标签:(messenger.message_handler)#或配置选项标签:-名称:messenger.message_handler#仅当无法通过type-hint猜出时需要处理:应用\ \ SmsNotification消息

使用标记配置的可能选项有:

  • 公共汽车
  • from_transport
  • 处理
  • 方法
  • 优先级

处理多条消息

一个处理程序类可以处理多个消息。为此,添加# AsMessageHandler属性为所有处理方法:

12 3 4 5 6 7 8 9 10 11 12 13 14 16 17 18 19 20
/ / src / MessageHandler / SmsNotificationHandler.php名称空间应用程序MessageHandler使用应用程序消息OtherSmsNotification使用应用程序消息SmsNotificationSmsNotificationHandler# (AsMessageHandler)公共函数handleSmsNotification(SmsNotification消息/ /……# (AsMessageHandler)公共函数handleOtherSmsNotification(OtherSmsNotification消息/ /……}}

6.2

实现MessageSubscriberInterface是用一个处理程序类处理多个消息的另一种方法。此接口在Symfony 6.2中已弃用。ob娱乐下载

将处理程序绑定到不同的传输

每个消息可以有多个处理程序,以及当使用消息时所有的处理程序调用。但也可以配置处理程序,使其仅在从具体的交通工具。这允许您拥有一个单独的消息,其中每个处理程序由使用不同传输的不同“工作者”调用。

假设你有一个UploadedImage带有两个处理程序的消息:

  • ThumbnailUploadedImageHandler:您希望由名为image_transport
  • NotifyAboutNewUploadedImageHandler:您希望由名为async_priority_normal

要做到这一点,请添加from_transport每个处理程序的选项。例如:

12 3 4 5 6 7 8 9 10 11 12 13
/ / src / MessageHandler / ThumbnailUploadedImageHandler.php名称空间应用程序MessageHandler使用应用程序消息UploadedImage# (AsMessageHandler (from_transport:“image_transport”)]ThumbnailUploadedImageHandler公共函数__invoke(UploadedImageuploadedImage//做一些缩略图}}

类似的:

1 2 3 4 5 6 7 8
/ / src / MessageHandler / NotifyAboutNewUploadedImageHandler.php/ /……# (AsMessageHandler (from_transport:“async_priority_normal”)]NotifyAboutNewUploadedImageHandler/ /……

然后,确保将你的信息“路由”到这两个传输:

  • YAML
  • XML
  • PHP
1 2 3 4 5 6 7 8 9 10
#配置/包/ messenger.yaml框架:信使:传输:async_priority_normal:#……image_transport:#……路由:#……“消息应用\ \ UploadedImage”[image_transport,async_priority_normal]

就是这样!您现在可以消费每个传输:

1 2 3 4
#在处理消息时只调用ThumbnailUploadedImageHandlerPHP bin/console messenger:consume image_transport -vvPHP bin/console messenger:consume async_priority_normal -vv

谨慎

如果处理程序from_transportConfig,它将被执行每一个接收消息的传输。

延长信使

信封及邮票

消息可以是任何PHP对象。有时,您可能需要配置关于消息的一些额外的东西——比如在AMQP中处理消息的方式,或者在处理消息之前添加一个延迟。你可以通过在你的邮件中添加一个“stamp”来做到这一点:

12 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
使用ob娱乐下载组件信使信封使用ob娱乐下载组件信使MessageBusInterface使用ob娱乐下载组件信使邮票DelayStamp公共函数指数(MessageBusInterface公共汽车公共汽车->调度(SmsNotification (“……”), (//处理前等待5秒DelayStamp (5000)));//或显式地创建一个Envelope公共汽车->调度(信封(SmsNotification (“……”), (DelayStamp (5000)));/ /……

在内部,每条消息都包装在信封,用于保存消息和邮票。您可以手动创建,也可以让消息总线来创建。针对不同的目的,有各种不同的戳记,它们在内部用于跟踪关于消息的信息——比如处理消息的消息总线,或者在失败后重试消息。

中间件

将消息分派到消息总线时会发生什么,这取决于中间件的集合及其顺序。默认情况下,为每个总线配置的中间件是这样的:

  1. add_bus_name_stamp_middleware-添加一个戳来记录该消息被发送到哪个总线;
  2. dispatch_after_current_bus——看事务性消息:在处理完成后处理新消息
  3. failed_message_processing_middleware参数处理正在重试的消息传输失败使它们正常工作,就像从原始运输中接收它们一样;
  4. 你自己的收藏中间件
  5. 的send_message如果为传输配置了路由,这将发送消息到该传输并停止中间件链;
  6. handle_message-调用给定消息的消息处理程序。

请注意

这些中间件名称实际上是快捷方式名称。真实的服务id前缀为messenger.middleware。(如。messenger.middleware.handle_message).

中间件在消息被分派时执行,但是同样,当通过worker接收消息时(对于发送到要异步处理的传输的消息)。如果您创建自己的中间件,请记住这一点。

您可以将自己的中间件添加到此列表中,或者完全禁用默认中间件和只有包括你自己的:

  • YAML
  • XML
  • PHP
12 3 4 5 6 7 8 9 10 11 12 13
#配置/包/ messenger.yaml框架:信使:公共汽车:messenger.bus.default:#禁用默认中间件default_middleware:#和/或添加您自己的中间件:#实现Symfony\Component\Messengeob娱乐下载r\Middleware\MiddlewareInterface的服务id-“应用程序、中间件、MyMiddleware”-“应用程序、中间件、AnotherMiddleware”

请注意

如果中间件服务是抽象的,则每个总线将创建该服务的不同实例。

教义中间件

1.11

以下Doctrine中间件是在DoctrineBundle 1.11中引入的。

如果你在你的应用中使用Doctrine,你可能会想要使用一些可选的中间件:

  • YAML
  • XML
  • PHP
12 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
#配置/包/ messenger.yaml框架:信使:公共汽车:command_bus:中间件:#每次处理消息时,Doctrine连接#被“ping”并重新连接,如果它关闭。有用的#如果你的工作运行了很长时间,数据库#连接有时会丢失-doctrine_ping_connection#处理后,Doctrine连接关闭,#可以释放一个worker中的数据库连接,#而不是让它们永远打开-doctrine_close_connection#将所有处理程序包装在单个Doctrine事务中#处理程序不需要调用flush()和错误#将导致回滚-doctrine_transaction#或将不同的实体管理器传递给任何#- doctrine_transaction: ['custom']

其他中间件)

添加router_context中间件,如果你需要在消费者中生成绝对的url(例如,呈现一个带有链接的模板)。这个中间件存储在构建绝对url时所需的原始请求上下文(即主机、HTTP端口等)。

添加验证中间件,如果您需要使用验证器组件在处理它之前。如果验证失败,aValidationFailedException会被扔。的ValidationStamp可用于配置验证组。

  • YAML
  • XML
  • PHP
1 2 3 4 5 6 7 8
#配置/包/ messenger.yaml框架:信使:公共汽车:command_bus:中间件:-router_context-验证

“信使号”事件

除了中间件之外,Messenger还分派几个事件。你可以创建事件监听器以钩进过程的各个部分。对于每个事件,事件类是事件名称:

多总线,命令和事件总线

默认情况下,Messenger为您提供单个消息总线服务。但是,你可以任意配置,创建“命令”、“查询”或“事件”总线并控制它们的中间件。看到多个公共汽车

此工作,包括代码示例,是根据创作共用BY-SA 3.0许可证。
ob娱乐下载Symfony 6.2支持通过苏禄人
ob娱乐下载Symfony 6.2支持通过Les-Tilleuls.coop