信使:同步和排队消息处理
编辑本页信使:同步和排队消息处理
Messenger提供了一个消息总线,它能够发送消息,然后立即在应用程序中处理它们,或者通过稍后处理的传输(例如队列)发送它们。要更深入地了解它,请阅读Messenger组件文档.
创建消息和处理程序
Messenger围绕您将创建的两个不同的类展开:(1)保存数据的消息类和(2)消息发送时将调用的处理程序类。处理程序类将读取消息类并执行一个或多个任务。
对于消息类没有特定的要求,除了它可以被序列化:
12 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/ / src /信息/ SmsNotification.php名称空间应用程序\消息;类SmsNotification{私人$内容;公共函数__construct(字符串$内容){$这->内容=$内容;}公共函数getContent():字符串{返回$这->内容;}}
消息处理程序是PHP可调用的,创建它的推荐方法是创建一个具有AsMessageHandler属性,并且具有__invoke ()
使用消息类(或消息接口)类型提示的方法:
12 3 4 5 6 7 8 9 10 11 12 13 14
/ / src / MessageHandler / SmsNotificationHandler.php名称空间应用程序\MessageHandler;使用应用程序\消息\SmsNotification;使用ob娱乐下载\组件\信使\属性\AsMessageHandler;# (AsMessageHandler)类SmsNotificationHandler{公共函数__invoke(SmsNotification$消息){/ /……做一些工作——比如发短信!}}
提示
你也可以使用# (AsMessageHandler)
属性。您可以对单个类中的任意多个方法使用该属性,从而允许您对多个相关类型的消息的处理进行分组。
6.1
支持# (AsMessageHandler)
在Symfony 6.1中引入了on方法。ob娱乐下载
多亏了自动配置和SmsNotification
类型提示,Symfonob娱乐下载y知道这个处理程序应该被调用当SmsNotification
消息被发送。大多数情况下,这就是你所需要做的。但是你也可以手动配置消息处理程序.要查看所有已配置的处理程序,运行:
1
$PHP bin/控制台调试:messenger
发送消息
你准备好了!要分派消息(并调用处理程序),请注入messenger.default_bus
服务(透过MessageBusInterface
),就像控制器一样:
12 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/ / src /控制器/ DefaultController.php名称空间应用程序\控制器;使用应用程序\消息\SmsNotification;使用ob娱乐下载\包\FrameworkBundle\控制器\AbstractController;使用ob娱乐下载\组件\信使\MessageBusInterface;类DefaultController扩展AbstractController{公共函数指数(MessageBusInterface$公共汽车){//将导致SmsNotificationHandler被调用$公共汽车->调度(新SmsNotification (“看!我创造了一条信息!”));/ /……}}
传输:异步/队列消息
默认情况下,消息一经分派就立即进行处理。如果希望异步处理消息,可以配置传输。传输能够发送消息(例如到队列系统),然后通过工作人员接收它们.信使支持多种传输.
请注意
如果要使用不受支持的传输,请查看排队的交通,它支持Kafka和谷歌Pub/Sub。
传输使用“DSN”进行注册。多亏了Messenger的Flex配方,你的.env
文件中已经有了一些例子。
1 2 3
# MESSENGER_TRANSPORT_DSN = amqp: / /客人:guest@localhost: 5672 / % 2 f /消息# MESSENGER_TRANSPORT_DSN =学说:/ /违约# MESSENGER_TRANSPORT_DSN =复述:/ / localhost: 6379 /消息
取消注释您想要的任何传输(或将其设置为.env.local
).看到信使:同步和排队消息处理欲知详情。
接下来,在配置/包/ messenger.yaml
,让我们定义一个名为异步
使用这样的配置:
- YAML
- XML
- PHP
1 2 3 4 5 6 7 8 9 10
#配置/包/ messenger.yaml框架:信使:传输:异步:“% env (MESSENGER_TRANSPORT_DSN) %”#或展开以配置更多选项#异步:# dsn: "%env(MESSENGER_TRANSPORT_DSN)%"# options: []
将消息路由到传输
现在您已经配置了传输,而不是立即处理消息,您可以将它们配置为发送到传输:
- YAML
- XML
- PHP
1 2 3 4 5 6 7 8 9
#配置/包/ messenger.yaml框架:信使:传输:异步:“% env (MESSENGER_TRANSPORT_DSN) %”路由:# async是上面你给传输的任何名称“消息应用\ \ SmsNotification”:异步
多亏了这一点应用\ \ SmsNotification消息
会被送到哪里异步
传输和它的处理程序将不马上叫我来。下未匹配的任何消息路由
仍将立即处理,即同步处理。
请注意
你可以使用‘*’
作为消息类。这将作为未匹配的任何消息的默认路由规则路由
.默认情况下,这有助于确保不同步处理任何消息。
唯一的缺点就是‘*’
也适用于使用Symfony Mailer发送的电子邮件(使用ob娱乐下载SendEmailMessage
当Messenger可用时)。这可能会导致问题,如果你的电子邮件是不可序列化的(例如,如果他们包括文件附件作为PHP资源/流)。
您还可以通过类的父类或接口来路由类。或者向多个传输器发送消息:
- YAML
- XML
- PHP
1 2 3 4 5 6 7 8 9
#配置/包/ messenger.yaml框架:信使:路由:#路由扩展此示例基类或接口的所有消息“消息应用\ \ AbstractAsyncMessage”:异步“消息应用\ \ AsyncMessageInterface”:异步“我的消息\ \ ToBeSentToTwoSenders”:(异步审计)
请注意
如果同时为子类和父类配置路由,则两条规则都将被使用。例如,如果你有一个SmsNotification
对象,从通知
,两者的路由为通知
而且SmsNotification
将被使用。
讯息中的教义实体
如果需要在消息中传递Doctrine实体,最好传递实体的主键(或处理程序实际需要的任何相关信息,例如电子邮件
,等等)而不是对象(否则你可能会看到与实体管理器相关的错误):
12 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/ / src /信息/ NewUserWelcomeEmail.php名称空间应用程序\消息;类NewUserWelcomeEmail{私人$用户标识;公共函数__construct(int$用户标识){$这->用户id =$用户标识;}公共函数getUserId():int{返回$这->用户标识;}}
然后,在你的处理程序中,你可以查询一个新对象:
12 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
/ / src / MessageHandler / NewUserWelcomeEmailHandler.php名称空间应用程序\MessageHandler;使用应用程序\消息\NewUserWelcomeEmail;使用应用程序\存储库\UserRepository;使用ob娱乐下载\组件\信使\属性\AsMessageHandler;# (AsMessageHandler)类NewUserWelcomeEmailHandler{私人$userRepository;公共函数__construct(UserRepository$userRepository){$这->userRepository =$userRepository;}公共函数__invoke(NewUserWelcomeEmail$welcomeEmail){$用户=$这->userRepository->找到($welcomeEmail->getUserId ());/ /……发送电子邮件!}}
这保证了实体包含新的数据。
同步处理消息
如果一个消息没有匹配任何路由规则,它不会被送往任何运输工具,会立即处理。在某些情况下(比如当将处理程序绑定到不同的传输)时,显式地处理它会更容易或更灵活:通过创建一个同步
传输和“发送”要立即处理的消息:
- YAML
- XML
- PHP
1 2 3 4 5 6 7 8 9 10
#配置/包/ messenger.yaml框架:信使:传输:#……其他传输同步:“同步:/ /”路由:应用程序消息\ \ SmsNotification:同步
创建自己的交通工具
如果需要从不受支持的设备发送或接收消息,还可以创建自己的传输。看到如何创建自己的信使运输.
消费消息(运行Worker)
在大多数情况下,一旦您的消息被路由,您就需要“使用”它们。你可以用信使:消费
命令:
1 2 3 4
$PHP bin/console messenger:消耗异步#使用-vv查看正在发生的事情的细节$PHP bin/console messenger:使用async -vv
第一个参数是接收者的名称(如果路由到自定义服务,则为服务id)。默认情况下,该命令将永远运行:在传输上查找新消息并处理它们。这个命令叫做“worker”。
提示
的实例来正确地停止工作StopWorkerException.
部署到生产环境
在制作过程中,有一些重要的事情需要考虑:
- 使用过程管理器,如Supervisor或systemd来保持工人的运行
- 您将希望一个或多个“工作人员”始终运行。要做到这一点,使用过程控制系统,如主管或systemd.
- 不要让员工永远跑掉
-
有些服务(比如Doctrine的
EntityManager
)会消耗更多的内存。所以,与其让你的工作线程一直运行,不如使用一个标志信使:消费——限制= 10
告诉worker在退出前只处理10条消息(然后进程管理器将创建一个新进程)。还有其他的选择,比如——内存限制= 128
而且——期限= 3600
. - 停止遇到错误的工作人员
-
如果像数据库服务器这样的工作者依赖项停止工作,或者超时,您可以尝试添加重新连接逻辑,或者如果worker接收到太多错误,则退出该worker
——破坏极限
选项信使:消费
命令。 - 在部署时重新启动worker
-
每次部署时,您都需要重新启动所有工作进程,以便它们能够看到新部署的代码。要做到这一点,运行
信使:stop-workers
在部署。这将向每个worker发出信号,它应该完成当前正在处理的消息,并应该优雅地关闭。然后,流程管理器将创建新的工作流程。该命令使用应用程序内部缓存—因此请确保将其配置为使用您喜欢的适配器。 - 在部署之间使用相同的缓存
-
如果部署策略涉及创建新的目标目录,则应该为cache.prefix.seed配置选项,以便在部署之间使用相同的缓存名称空间。否则,
cache.app
的值kernel.project_dir
参数作为命名空间的基础,这将在每次进行新的部署时导致不同的命名空间。
优先传输
有时某些类型的消息应该具有更高的优先级,并在其他消息之前处理。为了实现这一点,您可以创建多个传输,并将不同的消息路由到它们。例如:
- YAML
- XML
- PHP
12 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
#配置/包/ messenger.yaml框架:信使:传输:async_priority_high:dsn:' % env (MESSENGER_TRANSPORT_DSN) %选项:# queue_name是特定于传输原则的queue_name:高#为AMQP发送到单独的交换机,然后排队#交换:#名称:高#队列:# messages_high: ~#或redis尝试“组”async_priority_low:dsn:' % env (MESSENGER_TRANSPORT_DSN) %选项:queue_name:低路由:“消息应用\ \ SmsNotification”:async_priority_low“消息应用\ \ NewUserWelcomeEmail”:async_priority_high
然后,您可以为每个传输运行单独的worker,或者指示一个worker按优先级顺序处理消息:
1
$PHP bin/console messenger:consume async_priority_high async_priority_low
工作人员总是首先寻找等待的消息async_priority_high
.如果没有,然后它将使用来自的消息async_priority_low
.
限制消费到特定的队列
一些传输(特别是AMQP)具有交换器和队列的概念。Syob娱乐下载mfony传输总是绑定到交换。默认情况下,工作线程从附加到指定传输交换机的所有队列中进行消费。然而,有些用例希望工作人员只从特定的队列中消费。
你可以限制worker只处理来自特定队列的消息:
1 2 3 4
$PHP bin/console messenger:consume my_transport——queues=fasttrack#你可以多次传递——queues选项来处理多个队列$PHP bin/console messenger:consume my_transport——queues=fasttrack1——queues=fasttrack2
请注意
允许使用队列
选项,接收方必须实现QueueReceiverInterface.
检查每次传输的排队消息数
运行信使:统计数据
命令来了解某些或所有传输的“队列”中有多少消息:
1 2 3 4 5
显示所有传输中排队消息的数量$PHP bin/console messenger:stats#只显示某些传输的统计数据$PHP bin/console messenger:stats my_transport_name other_transport_name
请注意
为了使该命令生效,配置的传输接收方必须实现MessageCountAwareInterface.
6.2
的信使:统计数据
命令在Symfony 6.2中引入。ob娱乐下载
主管配置
监控器是一个很好的工具,可以保证您的工作进程正常运行总是运行(即使由于失败、达到消息限制或由于信使:stop-workers
).你可以在Ubuntu上安装它,例如,通过:
1
$Sudo apt-get安装管理器
管理器配置文件通常位于/etc/supervisor/conf.d
目录中。例如,您可以创建一个新的messenger-worker.conf
文件来确保有两个实例信使:消费
一直在运行:
1 2 3 4 5 6 7 8 9 10
、/ etc /主管/ conf.d / messenger-worker.conf(项目:messenger-consume)命令=php /path/to/your/app/bin/console messenger:consume async——time-limit=3600用户= ubuntunumprocs=2startsecs=0自动启动=真正的autorestart=真正的startretries=10process_name= % s_ (program_name) % (process_num)02d
改变异步
参数,以使用传输器的名称和用户
到您的服务器上的Unix用户。
谨慎
在部署过程中,某些东西可能不可用(例如数据库),导致使用者无法启动。在这种情况下,主管会尝试startretries
重启命令的次数。请确保更改此设置,以避免命令处于FATAL状态,该状态将永远不会重新启动。
每重启一次,“监控器”将延迟增加1秒。例如,如果值为10
,它将等待1秒,2秒,3秒,等等。这样,该服务总共有55秒的时间恢复可用。增加了startretries
设置为覆盖最大预期停机时间。
如果你使用Redis Transport,请注意每个worker都需要一个唯一的消费者名,以避免相同的消息被多个worker处理。实现这一点的一种方法是在Supervisor配置文件中设置一个环境变量,然后可以在messenger.yaml
(参见上面的Redis部分):
1
环境= MESSENGER_CONSUMER_NAME = % s_ (program_name) % (process_num)02d
接下来,告诉Supervisor读取你的配置并启动你的workers:
1 2 3 4 5
$Sudo supervisor orctl重读$Sudo supervisor orctl update$Sudo monitorctl start message -consume:*
看到主管医生欲知详情。
Systemd配置
虽然Supervisor是一个很好的工具,但它有一个缺点,那就是需要系统访问才能运行它。Systemd已经成为大多数Linux发行版的标准,并且有一个很好的替代方案叫做Systemd用户服务.
Systemd用户服务配置文件通常位于~ / config / systemd /用户
目录中。例如,您可以创建一个新的messenger-worker.service
文件。或者一个messenger-worker@.service
如果你想让更多的实例同时运行:
1 2 3 4 5 6 7 8 9 10
(单位)描述=ob娱乐下载Symfony信使消费%i(服务)ExecStart=php /path/to/your/app/bin/console messenger:consume async——time-limit=3600重新启动=总RestartSec=30.(安装)WantedBy= default.target
现在,告诉systemd启用并启动一个worker:
1 2 3 4 5 6
$systemctl——用户启用messenger-worker@1.service$Systemctl——user start messenger-worker@1.service#启用并启动20个worker$systemctl——用户启用messenger-worker@ {1 . . 20} .service$Systemctl——user start message -worker@{1..20}.service
如果你改变了你的服务配置文件,你需要重新加载守护进程:
1
$Systemctl—user daemon-reload
重新启动所有的消费者:
1
$Systemctl——user restart messenger- consumer @*.service
systemd用户实例仅在特定用户第一次登录后启动。消费者通常需要在系统引导时启动。启用徘徊用户激活该行为:
1
$loginctl启用玲儿<用户名>
日志由日志管理,可以使用journalctl命令进行操作:
1 2 3 4 5 6 7 8
#跟踪用户的日志$Journalctl -f——user-unit messenger-consume@11.service#跟踪所有消费者的日志$Journalctl -f——user-unit message - consumer @*#跟踪用户服务的所有日志$journalctl -f _UID= .日志含义$UID
看到systemd文档欲知详情。
请注意
的高级权限journalctl
命令,或者将您的用户添加到system -journal组:
1
$sudo usermod -a -G system -journal
无状态的工人
PHP被设计成无状态的,在不同的请求之间没有共享资源。在HTTP上下文中,PHP在发送响应后清理所有内容,因此您可以决定不处理可能泄漏内存的服务。
另一方面,工作人员通常在长时间运行的CLI进程中按顺序处理消息,这些进程在处理单个消息后不会结束。注意服务状态,以防止信息和/或内存泄漏,因为Symfony将在所有消息中注入相同的服务实例,从而保留服务的内部状态。ob娱乐下载
然而,某些Symfony服务,如Mob娱乐下载onolog手指交叉处理程序,泄漏的设计。ob娱乐下载Symfony提供了一个服务重启特性来解决这个问题。当在两条消息之间自动重置容器时,Symfony将查找所有已实现的服务ob娱乐下载ResetInterface(包括您自己的服务)并调用他们的服务重置()
方法,以便它们可以清理其内部状态。
如果服务不是无状态的,并且希望在每条消息之后重置其属性,则必须实现该服务ResetInterface属性中的属性重置()
方法。
如果不想重置容器,请添加——无重置
选项。信使:消费
命令。
6.1
在6.ob娱乐下载1之前的Symfony版本中,服务容器不会在消息之间自动重置,您必须设置framework.messenger.reset_on_message
选项真正的
.
重试和失败
如果在使用来自传输的消息时抛出异常,则该异常将自动重新发送到传输以再次尝试。缺省情况下,一条消息将被重试3次才被丢弃或丢弃发送到故障传输.每次重试也将被延迟,如果失败是由于临时问题。所有这些对于每个传输都是可配置的:
- YAML
- XML
- PHP
12 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
#配置/包/ messenger.yaml框架:信使:传输:async_priority_high:dsn:' % env (MESSENGER_TRANSPORT_DSN) %#默认配置retry_strategy:max_retries:3.#毫秒延迟延迟:1000#导致每次重试前的延迟更高#例:1秒延迟,2秒,4秒乘数:2max_delay:0#覆盖所有这些服务#实现Symfony\Coob娱乐下载mponent\Messenger\Retry\RetryStrategyInterface# service: null
提示
ob娱乐下载Symfony触发WorkerMessageRetriedEvent当消息被重试时,您可以运行自己的逻辑。
请注意
多亏了SerializedMessageStamp,消息的序列化形式将被保存,这将防止在稍后重试消息时再次序列化它。
6.1
的SerializedMessageStamp
类在Symfony 6.1中引入。ob娱乐下载
避免错误
有时,处理消息可能会失败知道是永久性的,不应该再尝试。如果你扔UnrecoverableMessageHandlingException,该消息将不会被重试。
强制重试
有时,处理消息一定会以某种方式失败知道是暂时的,必须重新尝试。如果你扔RecoverableMessageHandlingException,该消息将永远被重试无限和max_retries
设置将被忽略。
保存和重试失败的消息
如果消息失败,则会重试多次(max_retries
),然后会被丢弃。为了避免这种情况发生,您可以配置failure_transport
:
- YAML
- XML
- PHP
1 2 3 4 5 6 7 8 9 10
#配置/包/ messenger.yaml框架:信使:#重试后,消息将被发送到“失败”的传输failure_transport:失败的传输:#……其他传输失败:“教义:/ /违约?queue_name =失败'
在本例中,如果处理消息失败3次(默认情况下)max_retries
),然后发送至失败的
交通工具。当你可以使用信使:消费失败
要像普通传输一样使用它,您通常需要手动查看失败传输中的消息,并选择重试它们:
12 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
#查看失败传输中的所有消息,默认限制为50条$PHP bin/console messenger:failed:show#请看前10条消息$PHP bin/console message:failed:show——max=10#只看到MyClass消息$PHP bin/console message:failed:show——class-filter=“MyClass”#按消息类查看消息的数量$PHP bin/console messenger:failed:show——stats . txt#查看有关特定故障的详细信息$PHP bin/console message:failed:show 20 -vv#逐个查看和重试消息$PHP bin/console message:failed:retry -vv . txt#重试特定消息$PHP bin/console message:failed:retry 20 30——force#删除消息而不重新尝试它$PHP bin/console messenger:failed:remove 20#删除消息而不重新尝试它们,并在删除之前显示每条消息$PHP bin/console message:failed:remove 20 30——show-messages
6.2
的——类过滤器
而且——统计数据
在Symfony 6.2中引入了选项。ob娱乐下载
如果消息再次失败,它将被重新发送回由于正常的失败传输重试规则.一旦达到最大重试,消息将被永久丢弃。
多次失败的传输
有时候,只有一个单一的、全球性的方案是不够的没有运输
配置是因为某些消息比其他消息更重要。在这些情况下,你可以只覆盖特定传输的失败传输:
- YAML
- XML
- PHP
12 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
#配置/包/ messenger.yaml框架:信使:#重试后,消息将被发送到“失败”的传输如果在传输中没有配置“failed_transport”,则默认为#failure_transport:failed_default传输:async_priority_high:dsn:' % env (MESSENGER_TRANSPORT_DSN) %failure_transport:failed_high_priority#因为没有配置失败的传输,所以使用的传输将是#全局的“failure_transport”集合async_priority_low:dsn:“教义:/ /违约?queue_name=async_priority_low'failed_default:“教义:/ /违约?queue_name=failed_default'failed_high_priority:“教义:/ /违约?queue_name=failed_high_priority'
如果没有failure_transport
全局定义或在传输级别上定义,消息将在重试次数之后被丢弃。
失败的命令有一个可选选项——运输
要指定failure_transport
在传输级别配置。
1 2 3 4 5 6 7 8
#查看"failure_transport"传输中的所有消息$PHP bin/console messenger:failed:show——transport=failure_transport#重试来自failure_transport的特定消息$PHP bin/console message:failed:retry 20 30——transport=failure_transport——force#从"failure_transport"中删除消息而不重新尝试$PHP bin/console messenger:failed:remove 20——transport=failure_transport
传输配置
Messenger支持许多不同的传输类型,每种类型都有自己的选项。选项可以通过DSN字符串或配置传递到传输。
1 2
# .envMESSENGER_TRANSPORT_DSN = amqp: / / localhost / % 2 f /消息吗?auto_setup =假
- YAML
- XML
- PHP
1 2 3 4 5 6 7 8
#配置/包/ messenger.yaml框架:信使:传输:my_transport:dsn:“% env (MESSENGER_TRANSPORT_DSN) %”选项:auto_setup:假
下面定义的选项选项
优先于DSN中定义的。
AMQP运输
AMQP传输使用AMQP PHP扩展向RabbitMQ等队列发送消息。通过运行:
1
$作曲家需要symfony/amqpob娱乐下载-messenger
AMQP传输DSN可能是这样的:
1 2 3 4 5
# .envMESSENGER_TRANSPORT_DSN = amqp: / /客人:guest@localhost: 5672 / % 2 f /消息#或使用AMQPS协议MESSENGER_TRANSPORT_DSN = amqp: / /客人:guest@localhost / % 2 f /消息
如果要使用TLS/SSL加密的AMQP,还必须提供CA证书。中定义证书路径amqp.cacert
PHP.ini设置(例如:amqp.cacert=/etc/ssl/certs
)或在cacert
深空网络参数(例如amqp: / / localhost ?cacert=/etc/ssl/certs/
).
TLS/SSL加密的AMQP使用的默认端口是5671,但您可以在港口
深空网络参数(例如:amqp: / / localhost ?cacert=/etc/ssl/certs/&port=12345
).
请注意
默认情况下,传输将自动创建所需的任何交换机、队列和绑定密钥。这可以被禁用,但某些功能可能无法正常工作(如延迟队列)。若要不自动创建任何队列,可以配置传输队列:[]
.
请注意
可以将AMQP传输的使用者限制为仅处理来自交换机某些队列的消息。看到信使:同步和排队消息处理.
传输有许多其他选项,包括配置交换、队列绑定键等。请参阅有关欧宝官网下载app连接.
传输有许多选项:
选项 | 描述 | 默认的 |
---|---|---|
auto_setup |
在发送/获取期间是否应该自动创建交换机和队列。 | 真正的 |
cacert |
PEM格式CA证书文件的路径。 | |
cert |
PEM格式的客户端证书路径。 | |
channel_max |
指定服务器允许的最高通道号。0表示标准扩展限制 | |
confirm_timeout |
确认超时时间(以秒为单位);如果未指定,传输将不等待消息确认。注意:0或更大秒。可能是小数。 | |
connect_timeout |
连接超时。注意:0或更大秒。可能是小数。 | |
frame_max |
服务器为连接建议的最大帧大小,包括帧头和结束字节。0表示标准扩展限制(取决于librabbimq默认的帧大小限制) | |
心跳 |
服务器需要的连接心跳的延迟(以秒为单位)。0表示服务器不需要心跳。注意,librabbitmq的心跳支持有限,这意味着仅在阻塞调用期间检查心跳。 | |
宿主 |
AMQP服务的主机名 | |
关键 |
PEM格式的客户端密钥路径。 | |
登录 |
用于连接AMQP服务的用户名 | |
密码 |
用于连接AMQP服务的密码 | |
持续的 |
“假” |
|
港口 |
AMQP服务的端口 | |
read_timeout |
超时进行收入活动。注意:0或更大秒。可能是小数。 | |
重试 |
||
sasl_method |
||
connection_name |
对于自定义连接名(至少需要1.10版本的PHP AMQP扩展) | |
验证 |
开启/关闭对等体验证。如果启用了对等验证,则服务器证书中的公共名称必须与服务器名称匹配。缺省情况下,开启对等体验证。 | |
vhost |
与AMQP服务一起使用的虚拟主机 | |
write_timeout |
输出活动超时。注意:0或更大秒。可能是小数。 | |
延迟(queue_name_pattern) |
用于创建队列的模式 | delay_ % exchange_name % _ % routing_key % _ %延迟% |
延迟(exchange_name) |
用于延迟/重试消息的交换机名称 | 延迟 |
队列[名称](参数) |
额外的参数 | |
队列[名字][binding_arguments] |
在绑定队列时使用的参数。 | |
队列[名字][binding_keys] |
绑定到此队列的绑定键(如果有) | |
队列[名字][标记] |
队列的旗帜 | AMQP_DURABLE |
交换(参数) |
交换的额外参数(例如:alternate-exchange ) |
|
交换(default_publish_routing_key) |
如果消息上没有指定,则在发布时使用的路由键 | |
交换(旗帜) |
交换的旗帜 | AMQP_DURABLE |
交换(名字) |
交易所名称 | |
交换(类型) |
交换类型 | 扇出 |
6.1
的connection_name
选项在Symfony 6.1中引入。ob娱乐下载
您还可以通过添加在邮件上配置特定于amqp的设置AmqpStamp寄往你的信封:
1 2 3 4 5 6 7
使用ob娱乐下载\组件\信使\桥\Amqp\运输\AmqpStamp;/ /……$属性= [];$公共汽车->调度(新SmsNotification (), (新AmqpStamp (“custom-routing-key”AMQP_NOPARAM,$属性)));
谨慎
使用者不会显示在管理面板中,因为此传输不依赖于\ AmqpQueue:消费()
这就是阻塞。有一个拦截接球手使——时限/内存限制
的选项信使:消费
命令以及信使:stop-workers
命令效率很低,因为它们都依赖于接收者无论是否找到消息都立即返回的事实。消费工作者负责迭代,直到它接收到要处理的消息和/或直到达到其中一个停止条件。因此,如果worker卡在阻塞调用中,则无法到达它的停止逻辑。
教义运输
Doctrine传输可用于在数据库表中存储消息。通过运行:
1
$作曲家需要交响乐/教义传播者ob娱乐下载
教条传输DSN可能看起来像这样:
1 2
# .envMESSENGER_TRANSPORT_DSN =学说:/ /违约
格式为原则:/ / < connection_name >
,以防你有多个连接,并希望使用“默认”以外的一个。传输将自动创建一个名为messenger_messages
.
或者,要自己创建表,请设置auto_setup
选项假
而且生成迁移.
提示
为了避免像Doctrine Migrations这样的工具试图删除这个表,因为它不是常规模式的一部分,您可以设置schema_filter
选择:
- YAML
- XML
- PHP
1 2 3 4
#配置/包/ doctrine.yaml原则:dbal:schema_filter:~ ^ (? ! messenger_messages) ~”
谨慎
存储在数据库中的消息的datetime属性使用当前系统的时区。如果具有不同时区配置的多台计算机使用相同的存储,则可能会导致问题。
传输有许多选项:
选项 | 描述 | 默认的 |
---|---|---|
table_name | 表的名称 | messenger_messages |
queue_name | 队列的名称(表中的列,用于使用一个表进行多个传输) | 默认的 |
redeliver_timeout | 重试队列中处于“处理”状态的消息前的超时(如果工作人员由于某种原因停止,将会发生这种情况,最终您应该重试该消息)-以秒为单位。 | 3600 |
auto_setup | 是否在发送/获取期间自动创建表。 | 真正的 |
请注意
集redeliver_timeout
到大于您的最慢消息持续时间的值。否则,有些消息将在第一个消息仍在处理时第二次启动。
在使用PostgreSQL时,您可以访问以下选项来利用听/通知特性。这允许一种比Doctrine传输的默认轮询行为更有效的方法,因为当新消息插入到表中时,PostgreSQL将直接通知工作者。
选项 | 描述 | 默认的 |
---|---|---|
use_notify | 是否使用LISTEN/NOTIFY。 | 真正的 |
check_delayed_interval | 检查延迟消息的时间间隔,以毫秒为单位。设置为0禁用检查。 | 60000 |
get_notify_timeout | 呼叫时等待应答的时间长度PDO: pgsqlGetNotify 单位为毫秒。 |
0 |
Beanstalkd运输
Beanstalkd传输将消息直接发送到Beanstalkd工作队列。通过运行:
1
$作曲家需要symfony/beanob娱乐下载stalk -messenger
Beanstalkd传输深空网络可能看起来像这样:
1 2 3 4 5
# .envMESSENGER_TRANSPORT_DSN = beanstalkd: / / localhost: 11300 ?tube_name = foo&timeout = 4竞技场队伍= 120#如果没有端口,默认为11300MESSENGER_TRANSPORT_DSN = beanstalkd: / / localhost
传输有许多选项:
选项 | 描述 | 默认的 |
---|---|---|
tube_name | 队列名称 | 默认的 |
超时 | 消息保留超时-以秒为单位。 | 0(将导致服务器立即返回响应或将抛出TransportException) |
竞技场队伍 | 消息在放入就绪队列之前运行的时间(以秒为单位)。 | 90 |
复述,运输
Redis传输使用流使消息排队。这个传输需要Redis PHP扩展(>=4.3)和一个运行的Redis服务器(^5.0)。通过运行:
1
$作曲家需要symfony/red ob娱乐下载-messenger
Redis传输DSN可能是这样的:
1 2 3 4 5 6 7 8
# .envMESSENGER_TRANSPORT_DSN =复述:/ / localhost: 6379 /消息#全DSN示例MESSENGER_TRANSPORT_DSN =复述:/ / password@localhost: 6379 /信息/ symob娱乐下载fony /消费者?auto_setup =真正的序列化器&dbindex = 0 = 1 &stream_max_entries = 0# Redis集群示例MESSENGER_TRANSPORT_DSN =复述:/ / host-01:6379复述:/ / host-02:6379,复述:/ / host-03:6379,复述:/ / host-04:6379# Unix Socket示例MESSENGER_TRANSPORT_DSN =复述:/ / / var /运行/ redis.sock
可以通过DSN或选项
钥匙下运输进去messenger.yaml
:
选项 | 描述 | 默认的 |
---|---|---|
流 | Redis流的名称 | 消息 |
集团 | Redis消费组名称 | ob娱乐下载 |
消费者 | Redis中使用的消费者名 | 消费者 |
auto_setup | 自动创建Redis组? | 真正的 |
身份验证 | Redis密码 | |
delete_after_ack | 如果真正的 ,消息处理后会自动删除 |
真正的 |
delete_after_reject | 如果真正的 ,如果邮件被拒绝,则会自动删除 |
真正的 |
懒惰的 | 只有在真正需要连接时才进行连接 | 假 |
序列化器 | 如何在Redis中序列化最终的有效载荷复述:OPT_SERIALIZER 选项) |
复述:SERIALIZER_PHP |
stream_max_entries | 流将被修剪到的最大条目数。将其设置为足够大的数字以避免丢失挂起的消息 | 0 (意思是“不修剪”) |
tls | 为连接启用TLS支持 | 假 |
redeliver_timeout | 重试被放弃的消费者拥有的挂起消息之前的超时(如果工作人员由于某种原因死亡,将会发生这种情况,最终您应该重试消息)-以秒为单位。 | 3600 |
claim_interval | 应检查挂起/放弃消息是否声明的时间间隔——以毫秒为单位 | 60000 (1分钟) |
persistent_id | 字符串,如果空连接是非持久的。 | 零 |
retry_interval | Int,单位为毫秒的值 | 0 |
read_timeout | 浮动,以秒为单位的值默认为无限 | 0 |
超时 | 浮动,以秒为单位的值默认为无限 | 0 |
sentinel_master | 如果禁用了哨兵支持,则返回null或空 | 零 |
6.1
的persistent_id
,retry_interval
,read_timeout
,超时
,sentinel_master
在Symfony 6.1中引入了选项。ob娱乐下载
谨慎
不应该超过一个信使:消费
命令以相同的组合运行流
,集团
而且消费者
,或者消息最终可能被处理多次。如果运行多个队列工作器,消费者
可以设置为环境变量,比如% env (MESSENGER_CONSUMER_NAME) %
,由Supervisor设置(下面的例子)或任何其他用于管理工作进程的服务。在容器环境中,主机名
可以用作使用者名,因为每个容器/主机只有一个worker。如果使用Kubernetes来编排容器,请考虑使用StatefulSet
要有稳定的名称。
提示
集delete_after_ack
来真正的
(如果您使用单个组)或定义stream_max_entries
(如果您可以估计在您的情况下有多少Max条目是可接受的)以避免内存泄漏。否则,所有消息将永远保存在Redis中。
内存传输
的内存中
传输实际上并不传递消息。相反,它在请求期间将它们保存在内存中,这对测试很有用。例如,如果你有一个async_priority_normal
传送器,你可以在测验
使用此传输的环境:
- YAML
- XML
- PHP
1 2 3 4 5
#配置/包/测试/ messenger.yaml框架:信使:传输:async_priority_normal:的内存:/ /
然后,在测试时,消息将会不被送到真正的运输。更好的是,在测试中,您可以检查在请求期间发送的消息:
12 3 4 5 6 7 8 9 10 11 12 13 14 16 17 18 19 20
/ /测试/控制器/ DefaultControllerTest.php名称空间应用程序\测试\控制器;使用ob娱乐下载\包\FrameworkBundle\测试\WebTestCase;使用ob娱乐下载\组件\信使\运输\InMemoryTransport;类DefaultControllerTest扩展WebTestCase{公共函数testSomething(){$客户端=静态::createClient ();/ /……$这->assertSame (200,$客户端->getResponse ()->getStatusCode ());/*@varInMemoryTransport $transport */$运输=$这->getContainer ()->get (“messenger.transport.async_priority_normal”);$这->assertCount (1,$运输->getSent ());}}
传输有许多选项:
-
序列化
(布尔,默认值:假
) - 是否序列化消息。这对于测试附加层非常有用,特别是当您使用自己的消息序列化器时。
请注意
所有内存中
每次测试后,传输将自动重置在扩展测试类KernelTestCase或WebTestCase.
Amazon SQS
Amazon SQS传输非常适合托管在AWS上的应用程序。通过运行:
1
$需要symfony/amazon-ob娱乐下载sqs-messenger
SQS传输DSN可能是这样的:
1 2 3
# .envMESSENGER_TRANSPORT_DSN = https://sqs.eu -西方- 3. - amazonaws.com/123456789012/messages?access_key=akiaiosfodnn7example&secret_key=j17m97ffsvoki0brifoo9a MESSENGER_TRANSPORT_DSN = sqs: / / localhost: 9494 /消息吗?sslmode =禁用
请注意
传输将自动创建所需的队列。属性可以禁用此功能auto_setup
选项假
.
提示
在发送或接收消息之前,Symfony需要将队列名称转换为AWS队列URLob娱乐下载GetQueueUrl
AWS中的API。这个额外的API调用可以通过提供一个DSN(即队列URL)来避免。
传输有许多选项:
选项 | 描述 | 默认的 |
---|---|---|
access_key |
AWS访问密钥 | 必须进行url编码 |
账户 |
AWS帐户标识符 | 凭证的所有者 |
auto_setup |
是否应该在发送/获取期间自动创建队列。 | 真正的 |
buffer_size |
需要预取的消息数 | 9 |
调试 |
如果真正的 它会记录所有HTTP请求和响应(会影响性能) |
假 |
端点 |
SQS服务的绝对URL | https://sqs.eu-west-1.amazonaws.com |
poll_timeout |
等待新消息持续时间(以秒为单位) | 0.1 |
queue_name |
队列名称 | 消息 |
地区 |
AWS区域名称 | 一来就 |
secret_key |
AWS密钥 | 必须进行url编码 |
session_token |
AWS会话令牌 | |
visibility_timeout |
消息不可见的秒数(可见性超时值) | 队列的配置 |
wait_time |
长轮询持续时间(秒) | 20. |
6.1
的session_token
选项在Symfony 6.1中引入。ob娱乐下载
请注意
的wait_time
参数定义了Amazon SQS在发送响应之前等待消息在队列中可用的最长时间。通过消除空响应的数量,它有助于降低使用Amazon SQS的成本。
的poll_timeout
参数定义了接收者在返回null之前应该等待的时间。它避免了阻塞其他接收器被调用。
请注意
如果队列名的后缀为.fifo
, AWS将创建一个先进先出队列.使用邮票AmazonSqsFifoStamp定义消息组ID
和消息重复数据删除ID
.
FIFO队列不支持设置每条消息的延迟,值为延迟:0
在重试策略设置中是必需的。
序列化消息
当消息被发送到(和从)传输时,它们使用PHP的本机序列化serialize ()
&unserialize ()
功能。您可以将此全局(或为每个传输)更改为实现的服务SerializerInterface:
- YAML
- XML
- PHP
12 3 4 5 6 7 8 9 10 11 12 13
#配置/包/ messenger.yaml框架:信使:序列化器:default_serializer:messenger.transport.ob娱乐下载symfony_serializerob娱乐下载symfony_serializer:格式:json背景:{}传输:async_priority_normal:dsn:#……序列化器:messenger.transport.ob娱乐下载symfony_serializer
的messenger.transport.ob娱乐下载symfony_serializer
是使用序列化器组件可以通过几种方式进行配置。如果你做选择使用Symfony序列化器时,您ob娱乐下载可以通过SerializerStamp(见信封及邮票).
提示
当向另一个应用程序发送/接收消息/从另一个应用程序接收消息时,您可能需要更多地控制序列化过程。使用自定义序列化器提供该控件。看到ob娱乐下载symfonycast的消息序列化器教程获取详细信息。
自定义处理程序
使用属性配置处理程序
你可以通过将选项传递给属性来配置你的处理程序:
12 3 4 5 6 7 8 9 10 11 12 13 14 15
/ / src / MessageHandler / SmsNotificationHandler.php名称空间应用程序\MessageHandler;使用应用程序\消息\OtherSmsNotification;使用应用程序\消息\SmsNotification;使用ob娱乐下载\组件\信使\属性\AsMessageHandler;#[AsMessageHandler(fromTransport: 'async', priority: 10)]类SmsNotificationHandler{公共函数__invoke(SmsNotification$消息){/ /……}}
可以配置该属性的选项有:
公共汽车
fromTransport
处理
方法
优先级
手动配置处理程序
ob娱乐下载Symfony通常会自动查找并注册处理程序.但是,您也可以手动配置处理程序,并通过标记处理程序服务来传递一些额外的配置messenger.message_handler
- YAML
- XML
- PHP
1 2 3 4 5 6 7 8 9 10 11
#配置/ services.yaml服务:应用MessageHandler \ \ SmsNotificationHandler:标签:(messenger.message_handler)#或配置选项标签:-名称:messenger.message_handler#仅当无法通过type-hint猜出时需要处理:应用\ \ SmsNotification消息
使用标记配置的可能选项有:
公共汽车
from_transport
处理
方法
优先级
处理多条消息
一个处理程序类可以处理多个消息。为此,添加# AsMessageHandler
属性为所有处理方法:
12 3 4 5 6 7 8 9 10 11 12 13 14 16 17 18 19 20
/ / src / MessageHandler / SmsNotificationHandler.php名称空间应用程序\MessageHandler;使用应用程序\消息\OtherSmsNotification;使用应用程序\消息\SmsNotification;类SmsNotificationHandler{# (AsMessageHandler)公共函数handleSmsNotification(SmsNotification$消息){/ /……}# (AsMessageHandler)公共函数handleOtherSmsNotification(OtherSmsNotification$消息){/ /……}}
6.2
实现MessageSubscriberInterface是用一个处理程序类处理多个消息的另一种方法。此接口在Symfony 6.2中已弃用。ob娱乐下载
将处理程序绑定到不同的传输
每个消息可以有多个处理程序,以及当使用消息时所有的处理程序调用。但也可以配置处理程序,使其仅在从具体的交通工具。这允许您拥有一个单独的消息,其中每个处理程序由使用不同传输的不同“工作者”调用。
假设你有一个UploadedImage
带有两个处理程序的消息:
ThumbnailUploadedImageHandler
:您希望由名为image_transport
NotifyAboutNewUploadedImageHandler
:您希望由名为async_priority_normal
要做到这一点,请添加from_transport
每个处理程序的选项。例如:
12 3 4 5 6 7 8 9 10 11 12 13
/ / src / MessageHandler / ThumbnailUploadedImageHandler.php名称空间应用程序\MessageHandler;使用应用程序\消息\UploadedImage;# (AsMessageHandler (from_transport:“image_transport”)]类ThumbnailUploadedImageHandler{公共函数__invoke(UploadedImage$uploadedImage){//做一些缩略图}}
类似的:
1 2 3 4 5 6 7 8
/ / src / MessageHandler / NotifyAboutNewUploadedImageHandler.php/ /……# (AsMessageHandler (from_transport:“async_priority_normal”)]类NotifyAboutNewUploadedImageHandler{/ /……}
然后,确保将你的信息“路由”到这两个传输:
- YAML
- XML
- PHP
1 2 3 4 5 6 7 8 9 10
#配置/包/ messenger.yaml框架:信使:传输:async_priority_normal:#……image_transport:#……路由:#……“消息应用\ \ UploadedImage”:[image_transport,async_priority_normal]
就是这样!您现在可以消费每个传输:
1 2 3 4
#在处理消息时只调用ThumbnailUploadedImageHandler$PHP bin/console messenger:consume image_transport -vv$PHP bin/console messenger:consume async_priority_normal -vv
谨慎
如果处理程序不有from_transport
Config,它将被执行每一个接收消息的传输。
延长信使
信封及邮票
消息可以是任何PHP对象。有时,您可能需要配置关于消息的一些额外的东西——比如在AMQP中处理消息的方式,或者在处理消息之前添加一个延迟。你可以通过在你的邮件中添加一个“stamp”来做到这一点:
12 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
使用ob娱乐下载\组件\信使\信封;使用ob娱乐下载\组件\信使\MessageBusInterface;使用ob娱乐下载\组件\信使\邮票\DelayStamp;公共函数指数(MessageBusInterface$公共汽车){$公共汽车->调度(新SmsNotification (“……”), (//处理前等待5秒新DelayStamp (5000)));//或显式地创建一个Envelope$公共汽车->调度(新信封(新SmsNotification (“……”), (新DelayStamp (5000)));/ /……}
在内部,每条消息都包装在信封
,用于保存消息和邮票。您可以手动创建,也可以让消息总线来创建。针对不同的目的,有各种不同的戳记,它们在内部用于跟踪关于消息的信息——比如处理消息的消息总线,或者在失败后重试消息。
中间件
将消息分派到消息总线时会发生什么,这取决于中间件的集合及其顺序。默认情况下,为每个总线配置的中间件是这样的:
add_bus_name_stamp_middleware
-添加一个戳来记录该消息被发送到哪个总线;dispatch_after_current_bus
——看事务性消息:在处理完成后处理新消息;failed_message_processing_middleware
参数处理正在重试的消息传输失败使它们正常工作,就像从原始运输中接收它们一样;- 你自己的收藏中间件;
的send_message
如果为传输配置了路由,这将发送消息到该传输并停止中间件链;handle_message
-调用给定消息的消息处理程序。
请注意
这些中间件名称实际上是快捷方式名称。真实的服务id前缀为messenger.middleware。
(如。messenger.middleware.handle_message
).
中间件在消息被分派时执行,但是也同样,当通过worker接收消息时(对于发送到要异步处理的传输的消息)。如果您创建自己的中间件,请记住这一点。
您可以将自己的中间件添加到此列表中,或者完全禁用默认中间件和只有包括你自己的:
- YAML
- XML
- PHP
12 3 4 5 6 7 8 9 10 11 12 13
#配置/包/ messenger.yaml框架:信使:公共汽车:messenger.bus.default:#禁用默认中间件default_middleware:假#和/或添加您自己的中间件:#实现Symfony\Component\Messengeob娱乐下载r\Middleware\MiddlewareInterface的服务id-“应用程序、中间件、MyMiddleware”-“应用程序、中间件、AnotherMiddleware”
请注意
如果中间件服务是抽象的,则每个总线将创建该服务的不同实例。
教义中间件
1.11
以下Doctrine中间件是在DoctrineBundle 1.11中引入的。
如果你在你的应用中使用Doctrine,你可能会想要使用一些可选的中间件:
- YAML
- XML
- PHP
12 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
#配置/包/ messenger.yaml框架:信使:公共汽车:command_bus:中间件:#每次处理消息时,Doctrine连接#被“ping”并重新连接,如果它关闭。有用的#如果你的工作运行了很长时间,数据库#连接有时会丢失-doctrine_ping_connection#处理后,Doctrine连接关闭,#可以释放一个worker中的数据库连接,#而不是让它们永远打开-doctrine_close_connection#将所有处理程序包装在单个Doctrine事务中#处理程序不需要调用flush()和错误#将导致回滚-doctrine_transaction#或将不同的实体管理器传递给任何#- doctrine_transaction: ['custom']
其他中间件)
添加router_context
中间件,如果你需要在消费者中生成绝对的url(例如,呈现一个带有链接的模板)。这个中间件存储在构建绝对url时所需的原始请求上下文(即主机、HTTP端口等)。
添加验证
中间件,如果您需要使用验证器组件在处理它之前。如果验证失败,aValidationFailedException
会被扔。的ValidationStamp可用于配置验证组。
- YAML
- XML
- PHP
1 2 3 4 5 6 7 8
#配置/包/ messenger.yaml框架:信使:公共汽车:command_bus:中间件:-router_context-验证
多总线,命令和事件总线
默认情况下,Messenger为您提供单个消息总线服务。但是,你可以任意配置,创建“命令”、“查询”或“事件”总线并控制它们的中间件。看到多个公共汽车.