使用mixphp打造多进程异步邮件发送

使用mixphp打造多进程异步邮件发送

内容导读

收集整理的这篇技术教程文章主要介绍了使用mixphp打造多进程异步邮件发送,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含5827字,纯文字阅读大概需要9分钟

内容图文

这篇文章主要介绍了关于使用 mixphp 打造多进程异步邮件发送,有着一定的参考价值,现在分享给大家,有需要的朋友可以参考一下

邮件发送是很常见的需求,由于发送邮件的操作一般是比较耗时的,所以我们一般采用异步处理来提升用户体验,而异步通常我们使用消息队列来实现。

传统 MVC 框架由于缺少多进程开发能力,通常是采用同一个脚本执行多次,产生多个进程的方式,mixphp 封装了 TaskExecutor 专用于多进程开发,用户能非常简单的开发出功能完善的高可用多进程应用。

下面演示一个异步邮件发送系统的开发过程,涉及知识点:

  • 异步

  • 消息队列

  • 多进程

  • 守护进程

如何使用消息队列实现异步

PHP 使用消息队列通常是使用中间件来实现,常用的消息中间件有:

  • redis

  • rabbitmq

  • kafka

本次我们选用 redis 来实现异步邮件发送,redis 的数据类型中有一个 list 类型,可实现消息队列,使用以下命令:

// 入列$redis->lpush($key, $data);// 出列$data = $redis->rpop($key);// 阻塞出列$data = $redis->brpop($key, 10);

架构设计

本实例由传统 MVC 框架投递邮件发送需求,MixPHP 多进程执行发送任务。

邮件发送库选型

以往我们通常使用框架提供的邮件发送库,或者网上下载别的用户分享的库,composer 出现后,https://packagist.org/ 上有大量优质的库,我们只需选择一个最好的即可,本例选择 swiftmailer。

由于发送任务是由 MixPHP 执行,所以 swiftmailer 是安装在 MixPHP 项目中,在项目根目录中执行以下命令安装:

composer require swiftmailer/swiftmailer

生产者开发

在邮件发送这个需求中生产者是指投递发送任务的一方,这一方通常是一个接口或网页,这个部分并不一定需 mixphp 开发,TP、CI、YII 这些都可以,只需在接口或网页中把任务信息投递到消息队列中即可。

在传统 MVC 框架的控制器中增加如下代码:

通常框架中使用 redis 会安装一个类库来使用,本例使用原生代码,便于理解。
// 连接$redis = new Redis();if (!$redis->connect('127.0.0.1', 6379)) {

throw new Exception('Redis Connect Failure');}$redis->auth('');$redis->select(0);// 投递任务$data = [

'to'


=> ['***@qq.com' => 'A name'],

'body'

=> 'Here is the message itself',

'subject' => 'The title content',];$redis->lpush('queue:email', serialize($data));

通常异步开发中,投递完成后就会立即响应一个消息给用户,当然此时该任务并没有执行。

消费者开发

本例我们使用 MixPHP 的多进程开发工具 TaskExecutor 来完成这个需求,通常使用常驻进程来处理队列的消费,所以我们使用 TaskExecutor 的 TYPE_DAEMON 类型,MODE_PUSH 模式。

TaskExecutor 的 MODE_PUSH 模式有二种进程:

  • 左进程:负责从消息队列取出任务数据,投放给中进程。

  • 中进程:负责执行邮件发送任务。

PushCommand.php 代码如下:

<?phpnamespace appsdaemoncommands;use mixconsoleExitCode;use mixfacadesInput;use mixfacadesRedis;use mixtaskCenterProcess;use mixtaskLeftProcess;use mixtaskTaskExecutor;/** * 推送模式范例 * @author 刘健 <coder.liu@qq.com> */class PushCommand extends BaseCommand{

// 配置信息

const HOST = 'smtpdm.aliyun.com';

const PORT = 465;

const SECURITY = 'ssl';

const USERNAME = '****@email.***.com';

const PASSWORD = '****';

// 初始化事件

public function onInitialize()

{



parent::onInitialize(); // TODO: Change the autogenerated stub



// 获取程序名称



$this->programName = Input::getCommandName();



// 设置pidfile



$this->pidFile = "/var/run/{$this->programName}.pid";

}

/**

 * 获取服务

 * @return TaskExecutor

 */

public function getTaskService()

{



return create_object(





[







// 类路径







'class'



 => 'mixtaskTaskExecutor',







// 服务名称







'name'




=> "mix-daemon: {$this->programName}",







// 执行类型







'type'




=> mixtaskTaskExecutor::TYPE_DAEMON,







// 执行模式







'mode'




=> mixtaskTaskExecutor::MODE_PUSH,







// 左进程数







'leftProcess'
 => 1,







// 中进程数







'centerProcess' => 5,







// 任务超时时间 (秒)







'timeout'


 => 5,





]



);

}

// 启动

public function actionStart()

{



// 预处理



if (!parent::actionStart()) {





return ExitCode::UNSPECIFIED_ERROR;



}



// 启动服务



$service = $this->getTaskService();



$service->on('LeftStart', [$this, 'onLeftStart']);



$service->on('CenterStart', [$this, 'onCenterStart']);



$service->start();



// 返回退出码



return ExitCode::OK;

}

// 左进程启动事件回调函数

public function onLeftStart(LeftProcess $worker)

{



try {





// 模型内使用长连接版本的数据库组件,这样组件会自动帮你维护连接不断线





$queueModel = Redis::getInstance();





// 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出





for ($j = 0; $j < 16000; $j++) {







// 从消息队列中间件阻塞获取一条消息







$data = $queueModel->brpop('queue:email', 10);







if (empty($data)) {









continue;







}







list(, $data) = $data;







// 将消息推送给中进程去处理,push有长度限制 (https://wiki.swoole.com/wiki/page/290.html)







$worker->push($data, false);





}



} catch (Exception $e) {





// 休息一会,避免 CPU 出现 100%





sleep(1);





// 抛出错误





throw $e;



}

}

// 中进程启动事件回调函数

public function onCenterStart(CenterProcess $worker)

{



// 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出



for ($j = 0; $j < 16000; $j++) {





// 从进程消息队列中抢占一条消息





$data = $worker->pop();





if (empty($data)) {







continue;





}





// 处理消息





try {







// 处理消息,比如:发送短信、发送邮件、微信推送







var_dump($data);







$ret = self::sendEmail($data);







var_dump($ret);





} catch (Exception $e) {







// 回退数据到消息队列







$worker->rollback($data);







// 休息一会,避免 CPU 出现 100%







sleep(1);







// 抛出错误







throw $e;





}



}

}

// 发送邮件

public static function sendEmail($data)

{



// Create the Transport



$transport = (new Swift_SmtpTransport(self::HOST, self::PORT, self::SECURITY))





->setUsername(self::USERNAME)





->setPassword(self::PASSWORD);



// Create the Mailer using your created Transport



$mailer = new Swift_Mailer($transport);



// Create a message



$message = (new Swift_Message($data['subject']))





->setFrom([self::USERNAME => '**网'])





->setTo($data['to'])





->setBody($data['body']);



// Send the message



$result = $mailer->send($message);



return $result;

}}

测试

  1. 在 shell 中启动 push 常驻程序。

[root@localhost bin]# ./mix-daemon push startmix-daemon 'push' start successed.
  1. 调用接口往消息队列投放任务。

此时 shell 终端将打印:

成功收到测试邮件:

以上就是本文的全部内容,希望对大家的学习有所帮助,更多相关内容请关注PHP中文网!

相关推荐:

给PHP开启shmop扩展实现共享内存

php实现共享内存进程通信函数(_shm)

以上就是使用 mixphp 打造多进程异步邮件发送的详细内容,更多请关注Gxl网其它相关文章!

内容总结

以上是为您收集整理的使用mixphp打造多进程异步邮件发送全部内容,希望文章能够帮你解决使用mixphp打造多进程异步邮件发送所遇到的程序开发问题。 如果觉得技术教程内容还不错,欢迎将网站推荐给程序员好友。

内容备注

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。


本文关键词:

联系我们

在线咨询:点击这里给我发消息

邮件:w420220301@qq.com