本篇文章帶大家聊聊laravel 中 schedule 調度的運行機制,希望對大家有所幫助!
??laravel 的 console 命令行極大的方便了 PHP 定時任務的設置以及運行。以往通過 crontab 配置定時任務過程相對比較繁瑣,并且通過 crontab 設置的定時任務很難防止任務的交疊運行。
??所謂任務的交疊運行,是指由于定時任務運行時間較長,在 crontab 設置的運行周期不盡合理的情況下,已經啟動的任務還沒有結束運行,而系統又啟動了新的任務去執行相同的操作。如果程序內部沒有處理好數據一致性的問題,那么兩個任務同時操作同一份數據,很可能會導致嚴重的后果。
⒈ runInBackground 和 withoutOverlapping
??為了防止任務的交疊運行,Laravel 提供了 withoutOverlapping() 方法;為了能讓多任務在后臺并行執行,Laravel 提供了 runInBackground() 方法。
⑴ runInBackground() 方法
??console 命令行中的每一個命令都代表一個 Event ,AppConsoleKernel 中的 schedule() 方法的作用只是將這些命令行代表的 Event 注冊到 IlluminateConsoleSchedulingSchedule 的屬性 $events 中。
// namespace IlluminateConsoleSchedulingSchedule public function command($command, array $parameters = []) { if (class_exists($command)) { $command = Container::getInstance()->make($command)->getName(); } return $this->exec( Application::formatCommandString($command), $parameters ); } public function exec($command, array $parameters = []) { if (count($parameters)) { $command .= ' '.$this->compileParameters($parameters); } $this->events[] = $event = new Event($this->eventMutex, $command, $this->timezone); return $event; }
??Event 的運行方式有兩種:Foreground 和 Background 。二者的區別就在于多個 Event 是否可以并行執行。Event 默認以 Foreground 的方式運行,在這種運行方式下,多個 Event 順序執行,后面的 Event 需要等到前面的 Event 運行完成之后才能開始執行。
??但在實際應用中,我們往往是希望多個 Event 可以并行執行,此時就需要調用 Event 的 runInBackground() 方法將其運行方式設置為 Background 。
??Laravel 框架對這兩種運行方式的處理區別在于命令行的組裝方式和回調方法的調用方式。
//?namespace?IlluminateConsoleSchedulingEvent protected?function?runCommandInForeground(Container?$container) { ????$this->callBeforeCallbacks($container); ????$this->exitCode?=?Process::fromShellCommandline($this->buildCommand(),?base_path(),?null,?null,?null)->run(); ????$this->callAfterCallbacks($container); } protected?function?runCommandInBackground(Container?$container) { ????$this->callBeforeCallbacks($container); ????Process::fromShellCommandline($this->buildCommand(),?base_path(),?null,?null,?null)->run(); } public?function?buildCommand() { ????return?(new?CommandBuilder)->buildCommand($this); } //?namespace?IlluminateConsoleSchedulingCommandBuilder public?function?buildCommand(Event?$event) { ????if?($event->runInBackground)?{ ????????return?$this->buildBackgroundCommand($event); ????} ????return?$this->buildForegroundCommand($event); } protected?function?buildForegroundCommand(Event?$event) { ????$output?=?ProcessUtils::escapeArgument($event->output); ????return?$this->ensureCorrectUser( ????????$event,?$event->command.($event->shouldAppendOutput???'?>>?'?:?'?>?').$output.'?2>&1' ????); } protected?function?buildBackgroundCommand(Event?$event) { ????$output?=?ProcessUtils::escapeArgument($event->output); ????$redirect?=?$event->shouldAppendOutput???'?>>?'?:?'?>?'; ????$finished?=?Application::formatCommandString('schedule:finish').'?"'.$event->mutexName().'"'; ????if?(windows_os())?{ ????????return?'start?/b?cmd?/c?"('.$event->command.'?&?'.$finished.'?"%errorlevel%")'.$redirect.$output.'?2>&1"'; ????} ????return?$this->ensureCorrectUser($event, ????????'('.$event->command.$redirect.$output.'?2>&1?;?'.$finished.'?"$?")?>?' ????????.ProcessUtils::escapeArgument($event->getDefaultOutput()).'?2>&1?&' ????); }
??從代碼中可以看出,采用 Background 方式運行的 Event ,其命令行在組裝的時候結尾會增加一個 & 符號,其作用是使命令行程序進入后臺運行;另外,采用 Foreground 方式運行的 Event ,其回調方法是同步調用的,而采用 Background 方式運行的 Event ,其 after 回調則是通過 schedule:finish 命令行來執行的。
⑵ withoutOverlapping() 方法
??在設置 Event 的運行周期時,由于應用場景的不斷變化,很難避免某個特定的 Event 在某個時間段內需要運行較長的時間才能完成,甚至在下一個運行周期開始時還沒有執行完成。如果不對這種情況進行處理,就會導致多個相同的 Event 同時運行,而如果這些 Event 當中涉及到對數據的操作并且程序中沒有處理好冪等問題,很可能會造成嚴重后果。
??為了避免出現上述的問題,Event 中提供了 withoutOverlapping() 方法,該方法通過將 Event 的 withoutOverlapping 屬性設置為 TRUE ,在每次要執行 Event 時會檢查當前是否存在正在執行的相同的 Event ,如果存在,則不執行新的 Event 任務。
//?namespace?IlluminateConsoleSchedulingEvent public?function?withoutOverlapping($expiresAt?=?1440) { ????$this->withoutOverlapping?=?true; ????$this->expiresAt?=?$expiresAt; ????return?$this->then(function?()?{ ????????$this->mutex->forget($this); ????})->skip(function?()?{ ????????return?$this->mutex->exists($this); ????}); } public?function?run(Container?$container) { ????if?($this->withoutOverlapping?&& ????????!?$this->mutex->create($this))?{ ????????return; ????} ????$this->runInBackground ??????????????????$this->runCommandInBackground($container) ????????????????:?$this->runCommandInForeground($container); }
⒉ mutex 互斥鎖
??在調用 withoutOverlapping() 方法時,該方法還實現了另外兩個功能:一個是設置超時時間,默認為 24 小時;另一個是設置 Event 的回調。
⑴ 超時時間
??首先說超時時間,這個超時時間并不是 Event 的超時時間,而是 Event 的屬性 mutex 的超時時間。在向 IlluminateConsoleSchedulingSchedule 的屬性 $events 中注冊 Event 時,會調用 Schedule 中的 exec() 方法,在該方法中會新建 Event 對象,此時會向 Event 的構造方法中傳入一個 eventMutex ,這就是 Event 對象中的屬性 mutex ,超時時間就是為這個 mutex 設置的。而 Schedule 中的 eventMutex 則是通過實例化 CacheEventMutex 來創建的。
//?namespace?IlluminateConsoleSchedulingSchedule $this->eventMutex?=?$container->bound(EventMutex::class) ??????????????????????????????????$container->make(EventMutex::class) ????????????????????????????????:?$container->make(CacheEventMutex::class);
??設置了 withoutOverlapping 的 Event 在執行之前,首先會嘗試獲取 mutex 互斥鎖,如果無法成功獲取到鎖,那么 Event 就不會執行。獲取互斥鎖的操作通過調用 mutex 的 create() 方法完成。
??CacheEventMutex 在實例化時需要傳入一個 IlluminateContractsCacheFactory 類型的實例,其最終傳入的是一個 IlluminateCacheCacheManager 實例。在調用 create() 方法獲取互斥鎖時,還需要通過調用 store() 方法設置存儲引擎。
//?namespace?IlluminateFoundationConsoleKernel protected?function?defineConsoleSchedule() { ????$this->app->singleton(Schedule::class,?function?($app)?{ ????????return?tap(new?Schedule($this->scheduleTimezone()),?function?($schedule)?{ ????????????$this->schedule($schedule->useCache($this->scheduleCache())); ????????}); ????}); } protected?function?scheduleCache() { ????return?Env::get('SCHEDULE_CACHE_DRIVER'); } //?namespace?IlluminateConsoleSchedulingSchedule public?function?useCache($store) { ????if?($this->eventMutex?instanceof?CacheEventMutex)?{ ????????$this->eventMutex->useStore($store); ????} ????/*?...?...?*/ ????return?$this; } //?namespace?IlluminateConsoleSchedulingCacheEventMutex public?function?create(Event?$event) { ????return?$this->cache->store($this->store)->add( ????????$event->mutexName(),?true,?$event->expiresAt?*?60 ????); } //?namespace?IlluminateCacheCacheManager public?function?store($name?=?null) { ????$name?=?$name??:?$this->getDefaultDriver(); ????return?$this->stores[$name]?=?$this->get($name); } public?function?getDefaultDriver() { ????return?$this->app['config']['cache.default']; } protected?function?get($name) { ????return?$this->stores[$name]????$this->resolve($name); } protected?function?resolve($name) { ????$config?=?$this->getConfig($name); ????if?(is_null($config))?{ ????????throw?new?InvalidArgumentException("Cache?store?[{$name}]?is?not?defined."); ????} ????if?(isset($this->customCreators[$config['driver']]))?{ ????????return?$this->callCustomCreator($config); ????}?else?{ ????????$driverMethod?=?'create'.ucfirst($config['driver']).'Driver'; ????????if?(method_exists($this,?$driverMethod))?{ ????????????return?$this->{$driverMethod}($config); ????????}?else?{ ????????????throw?new?InvalidArgumentException("Driver?[{$config['driver']}]?is?not?supported."); ????????} ????} } protected?function?getConfig($name) { ????return?$this->app['config']["cache.stores.{$name}"]; } protected?function?createFileDriver(array?$config) { ????return?$this->repository(new?FileStore($this->app['files'],?$config['path'],?$config['permission']????null)); }
??在初始化 Schedule 時會指定 eventMutex 的存儲引擎,默認為環境變量中的配置項 SCHEDULE_CACHE_DRIVER 的值。但通常這一項配置在環境變量中并不存在,所以 useCache() 的參數值為空,進而 eventMutex 的 store 屬性值也為空。這樣,在 eventMutex 的 create() 方法中調用 store() 方法為其設置存儲引擎時,store() 方法的參數值也為空。
??當 store() 方法的傳參為空時,會使用應用的默認存儲引擎(如果不做任何修改,默認 cache 的存儲引擎為 file)。之后會取得默認存儲引擎的配置信息(引擎、存儲路徑、連接信息等),然后實例化存儲引擎。最終,file 存儲引擎實例化的是 IlluminateCacheFileStore 。
??在設置完存儲引擎之后,緊接著會調用 add() 方法獲取互斥鎖。由于 store() 方法返回的是 IlluminateContractsCacheRepository 類型的實例,所以最終調用的是 IlluminateCacheRepository 中的 add() 方法。
//?namespace?IlluminateCacheRepository public?function?add($key,?$value,?$ttl?=?null) { ????if?($ttl?!==?null)?{ ????????if?($this->getSeconds($ttl)?store,?'add'))?{ ????????????$seconds?=?$this->getSeconds($ttl); ????????????return?$this->store->add( ????????????????$this->itemKey($key),?$value,?$seconds ????????????); ????????} ????} ????if?(is_null($this->get($key)))?{ ????????return?$this->put($key,?$value,?$ttl); ????} ????return?false; } public?function?get($key,?$default?=?null) { ????if?(is_array($key))?{ ????????return?$this->many($key); ????} ????$value?=?$this->store->get($this->itemKey($key)); ????if?(is_null($value))?{ ????????$this->event(new?CacheMissed($key)); ????????$value?=?value($default); ????}?else?{ ????????$this->event(new?CacheHit($key,?$value)); ????} ????return?$value; } //?namespace?IlluminateCacheFileStore public?function?get($key) { ????return?$this->getPayload($key)['data']????null; } protected?function?getPayload($key) { ????$path?=?$this->path($key); ????try?{ ????????$expire?=?substr( ????????????$contents?=?$this->files->get($path,?true),?0,?10 ????????); ????}?catch?(Exception?$e)?{ ????????return?$this->emptyPayload(); ????} ????if?($this->currentTime()?>=?$expire)?{ ????????$this->forget($key); ????????return?$this->emptyPayload(); ????} ????try?{ ????????$data?=?unserialize(substr($contents,?10)); ????}?catch?(Exception?$e)?{ ????????$this->forget($key); ????????return?$this->emptyPayload(); ????} ????$time?=?$expire?-?$this->currentTime(); ????return?compact('data',?'time'); }
??這里需要說明,所謂互斥鎖,其本質是寫文件。如果文件不存在或文件內容為空或文件中存儲的過期時間小于當前時間,則互斥鎖可以順利獲得;否則無法獲取到互斥鎖。文件內容為固定格式:timestampb:1 。
??所謂超時時間,與此處的 timestamp 的值有密切的聯系。獲取互斥鎖時的時間戳,再加上超時時間的秒數,即是此處的 timestamp 的值。
??由于 FileStore 中不存在 add() 方法,所以程序會直接嘗試調用 get() 方法獲取文件中的內容。如果 get() 返回的結果為 NULL,說明獲取互斥鎖成功,之后會調用 FileStore 的 put() 方法寫文件;否則,說明當前有相同的 Event 在運行,不會再運行新的 Event 。
??在調用 put() 方法寫文件時,首先需要根據傳參計算 eventMutex 的超時時間的秒數,之后再調用 FileStore 中的 put() 方法,將數據寫入文件中。
//?namespace?IlluminateCacheRepository public?function?put($key,?$value,?$ttl?=?null) { ????/*?...?...?*/ ????$seconds?=?$this->getSeconds($ttl); ????if?($seconds?forget($key); ????} ????$result?=?$this->store->put($this->itemKey($key),?$value,?$seconds); ????if?($result)?{ ????????$this->event(new?KeyWritten($key,?$value,?$seconds)); ????} ????return?$result; } //?namespace?IlluminateCacheFileStore public?function?put($key,?$value,?$seconds) { ????$this->ensureCacheDirectoryExists($path?=?$this->path($key)); ????$result?=?$this->files->put( ????????$path,?$this->expiration($seconds).serialize($value),?true ????); ????if?($result?!==?false?&&?$result?>?0)?{ ????????$this->ensureFileHasCorrectPermissions($path); ????????return?true; ????} ????return?false; } protected?function?path($key) { ????$parts?=?array_slice(str_split($hash?=?sha1($key),?2),?0,?2); ????return?$this->directory.'/'.implode('/',?$parts).'/'.$hash; } //?namespace?IlluminateConsoleSchedulingSchedule public?function?mutexName() { ????return?'framework'.DIRECTORY_SEPARATOR.'schedule-'.sha1($this->expression.$this->command); }
??這里需要重點說明的是 $key 的生成方法以及文件路徑的生成方法。$key 通過調用 Event 的 mutexName() 方法生成,其中需要用到 Event 的 $expression 和 $command 屬性。其中 $command 為我們定義的命令行,在調用 $schedule->comand() 方法時傳入,然后進行格式化,$expression 則為 Event 的運行周期。
??以命令行 schedule:test 為例,格式化之后的命令行為 `/usr/local/php/bin/php` `artisan` schedule:test,如果該命令行設置的運行周期為每分鐘一次,即 * * * * * ,則最終計算得到的 $key 的值為 framework/schedule-768a42da74f005b3ac29ca0a88eb72d0ca2b84be 。文件路徑則是將 $key 的值再次進行 sha1 計算之后,以兩個字符為一組切分成數組,然后取數組的前兩項組成一個二級目錄,而配置文件中 file 引擎的默認存儲路徑為 storage/framework/cache/data ,所以最終的文件路徑為 storage/framework/cache/data/eb/60/eb608bf555895f742e5bd57e186cbd97f9a6f432 。而文件中存儲的內容則為 1642122685b:1 。
⑵ 回調方法
??再來說設置的 Event 回調,調用 withoutOverlapping() 方法會為 Event 設置兩個回調:一個是 Event 運行完成之后的回調,用于釋放互斥鎖,即清理緩存文件;另一個是在運行 Event 之前判斷互斥鎖是否被占用,即緩存文件是否已經存在。
??無論 Event 是以 Foreground 的方式運行,還是以 Background 的方式運行,在運行完成之后都會調用 callAfterCallbacks() 方法執行 afterCallbacks 中的回調,其中就有一項回調用于釋放互斥鎖,刪除緩存文件 $this->mutex->forget($this) 。區別就在于,以 Foreground 方式運行的 Event 是在運行完成之后顯式的調用這些回調方法,而以 Background 方式運行的 Event 則需要借助 schedule:finish 來調用這些回調方法。
??所有在 AppConsoleKernel 中注冊 Event,都是通過命令行 schedule:run 來調度的。在調度之前,首先會判斷當前時間點是否滿足各個 Event 所配置的運行周期的要求。如果滿足的話,接下來就是一些過濾條件的判斷,這其中就包括判斷互斥鎖是否被占用。只有在互斥鎖沒有被占用的情況下,Event 才可以運行。
//?namespace?IlluminateConsoleSchedulingScheduleRunCommand public?function?handle(Schedule?$schedule,?Dispatcher?$dispatcher) { ????$this->schedule?=?$schedule; ????$this->dispatcher?=?$dispatcher; ????foreach?($this->schedule->dueEvents($this->laravel)?as?$event)?{ ????????if?(!?$event->filtersPass($this->laravel))?{ ????????????$this->dispatcher->dispatch(new?ScheduledTaskSkipped($event)); ????????????continue; ????????} ????????if?($event->onOneServer)?{ ????????????$this->runSingleServerEvent($event); ????????}?else?{ ????????????$this->runEvent($event); ????????} ????????$this->eventsRan?=?true; ????} ????if?(!?$this->eventsRan)?{ ????????$this->info('No?scheduled?commands?are?ready?to?run.'); ????} } //?namespace?IlluminateConsoleSchedulingSchedule public?function?dueEvents($app) { ????return?collect($this->events)->filter->isDue($app); } //?namespace?IlluminateConsoleSchedulingEvent public?function?isDue($app) { ????/*?...?...?*/ ????return?$this->expressionPasses()?&& ???????????$this->runsInEnvironment($app->environment()); } protected?function?expressionPasses() { ????$date?=?Carbon::now(); ????/*?...?...?*/ ????return?CronExpression::factory($this->expression)->isDue($date->toDateTimeString()); } //?namespace?CronCronExpression public?function?isDue($currentTime?=?'now',?$timeZone?=?null) { ???/*?...?...?*/ ??? ????try?{ ????????return?$this->getNextRunDate($currentTime,?0,?true)->getTimestamp()?===?$currentTime->getTimestamp(); ????}?catch?(Exception?$e)?{ ????????return?false; ????} } public?function?getNextRunDate($currentTime?=?'now',?$nth?=?0,?$allowCurrentDate?=?false,?$timeZone?=?null) { ????return?$this->getRunDate($currentTime,?$nth,?false,?$allowCurrentDate,?$timeZone); }
??有時候,我們可能需要 kill 掉一些在后臺運行的命令行,但緊接著我們會發現這些被 kill 掉的命令行在一段時間內無法按照設置的運行周期自動調度,其原因就在于手動 kill 掉的命令行沒有調用 schedule:finish 清理緩存文件,釋放互斥鎖。這就導致在設置的過期時間到達之前,互斥鎖會一直被占用,新的 Event 不會再次運行。
【相關推薦:laravel視頻教程】