QueuedJobService
class QueuedJobService
A service that can be used for starting, stopping and listing queued jobs.
When a job is first added, it is initialised, its job type is determined, and it is then persisted to the database.
When the queues are scanned, a job is reloaded and processed. Ignoring the persistence and reloading, it looks something like:
    job->getJobType();
    job->getJobData();
    data->write();
    job->setup();
    while !job->isComplete
        job->process();
        job->getJobData();
        data->write();
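The flow above can be sketched in plain PHP. This is a minimal illustration only: `CountingJob`, `runLifecycle()` and the `$store` array are hypothetical stand-ins, not the module's QueuedJob interface or its database persistence.

```php
<?php
// Hypothetical job that finishes after a fixed number of steps.
final class CountingJob
{
    private int $totalSteps;
    private int $done = 0;

    public function __construct(int $totalSteps)
    {
        $this->totalSteps = $totalSteps;
    }

    public function getJobType(): string
    {
        return 'queued'; // illustrative label, not a real module constant
    }

    public function setup(): void
    {
        // Nothing to prepare in this sketch.
    }

    public function process(): void
    {
        $this->done++;
    }

    public function isComplete(): bool
    {
        return $this->done >= $this->totalSteps;
    }

    /** Snapshot of the state that would be persisted. */
    public function getJobData(): array
    {
        return ['stepsProcessed' => $this->done, 'isComplete' => $this->isComplete()];
    }
}

/** Runs the lifecycle, appending each snapshot to $store instead of writing to a database. */
function runLifecycle(CountingJob $job, array &$store): void
{
    $job->getJobType();
    $store[] = $job->getJobData();     // initial persist
    $job->setup();
    while (!$job->isComplete()) {
        $job->process();
        $store[] = $job->getJobData(); // persist after every step
    }
}

$store = [];
runLifecycle(new CountingJob(3), $store);
```

Persisting after every `process()` call is what lets a later run resume from the last recorded step.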
Properties
protected | int | $startedAt | Timestamp (in seconds) when the queue was started |
public | DefaultQueueHandler | $queueHandler | |
public | TaskRunnerEngine | $queueRunner | |
public | array | $defaultJobs | Config controlled list of default/required jobs |
Methods
startJob() | Start a job (or however the queue handler determines it should be started) |
copyJobToDescriptor() | Copies data from a job into a descriptor for persisting |
getNextPendingJob() | Check the current job queues and see if any of the jobs currently in there should be started. If so, return the next job that should be executed |
checkJobHealth() | Runs an explicit check on all currently running jobs to make sure their "processed" count is incrementing between each run. If it's not, then we need to flag it as paused due to an error. |
checkdefaultJobs() | Checks through all the scheduled jobs that are expected to exist |
initialiseJob() | Prepares the given jobDescriptor for execution. Returns the job that will actually be run in a state ready for executing. |
grabMutex() | Given a QueuedJobDescriptor mark the job as initialised. Works sort of like a mutex. |
getMemoryLimit() | Determines the memory limit (in bytes) for this application. Limits to the smaller of memory_limit configured via php.ini or silverstripe config |
getJobList() | Gets a list of all the current jobs (or jobs that have recently finished) |
getJobListFilter() | Return the SQL filter used to get the job list - this is used by the UI for displaying the job list. |
Details
__construct()
Register our shutdown handler
int
queueJob(QueuedJob $job, $startAfter = null, int $userId = null, $queueName = null)
Adds a job to the queue to be started
Relevant data about the job will be persisted using a QueuedJobDescriptor
startJob(JobDescriptor $jobDescriptor, date $startAfter = null)
Start a job (or however the queue handler determines it should be started)
protected
copyJobToDescriptor(QueuedJob $job, JobDescriptor $jobDescriptor)
Copies data from a job into a descriptor for persisting
protected
copyDescriptorToJob(QueuedJobDescriptor $jobDescriptor, QueuedJob $job)
No description
QueuedJobDescriptor
getNextPendingJob(string $type = null)
Check the current job queues and see if any of the jobs currently in there should be started. If so, return the next job that should be executed
checkJobHealth(int $queue = null)
Runs an explicit check on all currently running jobs to make sure their "processed" count is incrementing between each run. If it's not, then we need to flag it as paused due to an error.
This typically happens when a PHP fatal error is thrown, which can't be picked up by the error handler or exception checker; in this case, we detect these stalled jobs later and try to fix them.
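The "processed count must keep increasing" check can be illustrated as below. This is a sketch under assumptions: `findStalledJobs()` and the two arrays (job ID => steps processed) are hypothetical, and the real service works on persisted descriptors rather than plain arrays.

```php
<?php
/**
 * Returns the IDs of jobs whose processed count did not increase since the
 * previous check. Both arguments map job ID => steps processed (assumed shape).
 */
function findStalledJobs(array $previous, array $current): array
{
    $stalled = [];
    foreach ($current as $jobId => $processed) {
        // A job seen for the first time has no baseline yet, so it is not judged.
        if (array_key_exists($jobId, $previous) && $processed <= $previous[$jobId]) {
            $stalled[] = $jobId;
        }
    }
    return $stalled;
}

$lastRun = [1 => 10, 2 => 5];
$thisRun = [1 => 14, 2 => 5, 3 => 0];
$stalled = findStalledJobs($lastRun, $thisRun); // job 2 made no progress
```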
checkdefaultJobs($queue = null)
Checks through all the scheduled jobs that are expected to exist
protected bool
restartStalledJob(QueuedJobDescriptor $stalledJob)
Attempt to restart a stalled job
protected QueuedJob|bool
initialiseJob(QueuedJobDescriptor $jobDescriptor)
Prepares the given jobDescriptor for execution. Returns the job that will actually be run in a state ready for executing.
Note that this is called each time a job is picked up to be executed from the cron job - meaning that jobs that are paused and restarted will have 'setup()' called on them again, so your job MUST detect that and act accordingly.
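One way a job might guard against a repeated `setup()` call is sketched here. `ResumableJob` and its properties are hypothetical; only the `setup()` and `process()` method names come from the description above.

```php
<?php
// Hypothetical job whose setup() is safe to call more than once.
final class ResumableJob
{
    /** @var string[] Work items still to be processed. */
    public array $remaining = [];
    public bool $isSetUp = false;
    public int $setupCalls = 0;

    public function setup(): void
    {
        $this->setupCalls++;
        if ($this->isSetUp) {
            return; // resumed after a pause: keep the existing progress
        }
        $this->remaining = ['a', 'b', 'c'];
        $this->isSetUp = true;
    }

    public function process(): void
    {
        array_shift($this->remaining); // handle one item per call
    }
}

$job = new ResumableJob();
$job->setup();
$job->process(); // one item done
$job->setup();   // called again when the job is picked up after a pause
```

Without the guard, the second `setup()` call would rebuild the work list and the job would redo the item it had already processed.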
protected bool
grabMutex(QueuedJobDescriptor $jobDescriptor)
Given a QueuedJobDescriptor mark the job as initialised. Works sort of like a mutex.
A true database lock isn't currently achievable, because the database adapters don't support locks. A race condition is therefore still possible, but this approach should minimise the chance of one. As a side effect, the job status is changed to "Initialised".
The assumption is that the job has a status of "Queued" or "Wait".
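The status-flip idea can be sketched as a check-then-set on the job status. This is an illustration only: `tryClaim()` and the in-memory `$jobs` array (job ID => status) are hypothetical stand-ins for the descriptor table, and a single-process array cannot reproduce the race described above.

```php
<?php
/**
 * Tries to claim a job by flipping its status to "Initialised".
 * Returns false if the job is missing or not in a claimable state.
 */
function tryClaim(array &$jobs, int $jobId): bool
{
    $status = $jobs[$jobId] ?? null;
    if ($status !== 'Queued' && $status !== 'Wait') {
        return false; // already claimed by another runner, or not claimable
    }
    $jobs[$jobId] = 'Initialised';
    return true;
}

$jobs = [7 => 'Queued'];
$first = tryClaim($jobs, 7);  // claims the job
$second = tryClaim($jobs, 7); // a second runner finds it already initialised
```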
bool
runJob(int $jobId)
Start the actual execution of a job.
The assumption is that the job ID refers to a QueuedJobDescriptor whose status is set to "Queued".
This method will continue executing until the job says it's completed.
protected
markStarted()
Start timer
protected bool
hasPassedTimeLimit()
Is execution time too long?
protected bool
isMemoryTooHigh()
Is memory usage too high?
protected float
getMemoryUsage()
Get peak memory usage of this application
protected float
getMemoryLimit()
Determines the memory limit (in bytes) for this application. Limits to the smaller of memory_limit configured via php.ini or silverstripe config
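Choosing the smaller of two limits might look like the sketch below. `effectiveMemoryLimit()` is a hypothetical helper, and treating values of zero or less as "no limit" is an assumption of this example, not documented behaviour.

```php
<?php
/**
 * Picks the effective limit in bytes from the php.ini value and the config value.
 * Values <= 0 are treated as "no limit" (an assumption of this sketch).
 */
function effectiveMemoryLimit(float $phpLimit, float $configLimit): float
{
    $limits = array_filter(
        [$phpLimit, $configLimit],
        fn (float $value): bool => $value > 0
    );
    // With no positive limit on either side, report 0.0 to mean "unlimited".
    return $limits ? min($limits) : 0.0;
}

$limit = effectiveMemoryLimit(536870912.0, 134217728.0); // 512M vs 128M
```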
protected float
getPHPMemoryLimit()
Calculate the current memory limit of the server
protected float
parseMemory(string $memString)
Convert memory limit string to bytes.
Based on implementation in install.php5
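A conversion of this kind could be written as follows. This is not the module's implementation: `memoryStringToBytes()` is a hypothetical function that handles the common php.ini shorthand suffixes K, M and G.

```php
<?php
/** Converts strings such as "512M" or "1G" to bytes; plain numbers pass through. */
function memoryStringToBytes(string $memString): float
{
    $memString = trim($memString);
    if ($memString === '') {
        return 0.0;
    }
    $unit = strtolower(substr($memString, -1));
    $number = (float) $memString; // the cast reads the leading digits and ignores the suffix
    switch ($unit) {
        case 'g':
            return $number * 1024 ** 3;
        case 'm':
            return $number * 1024 ** 2;
        case 'k':
            return $number * 1024;
        default:
            return $number; // no recognised suffix: value is already in bytes
    }
}

$bytes = memoryStringToBytes('512M');
```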
protected
humanReadable($size)
No description
getJobList(string $type = null, int $includeUpUntil = 0)
Gets a list of all the current jobs (or jobs that have recently finished)
string
getJobListFilter(string $type = null, int $includeUpUntil = 0)
Return the SQL filter used to get the job list - this is used by the UI for displaying the job list.
runQueue(string $queue)
Process the job queue with the current queue runner
processJobQueue(string $name)
Process all jobs from a given queue
onShutdown()
When PHP shuts down, we want to process all of the immediate queue items.
We use the 'getNextPendingJob' method, instead of just iterating the queue, to ensure we ignore paused or stalled jobs.
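The difference between asking for the next pending job and iterating the whole queue can be sketched as below. `nextPending()`, `drainImmediateQueue()` and the status strings other than "Queued" are hypothetical; in a real service, a routine like this would be wired up with PHP's `register_shutdown_function()`.

```php
<?php
/** Returns the ID of the next job with status "Queued", or null if none is left. */
function nextPending(array $jobs): ?int
{
    foreach ($jobs as $id => $status) {
        if ($status === 'Queued') {
            return $id;
        }
    }
    return null;
}

/** Runs pending jobs one at a time; paused jobs are never selected. */
function drainImmediateQueue(array &$jobs): array
{
    $ran = [];
    while (($id = nextPending($jobs)) !== null) {
        $jobs[$id] = 'Complete'; // stand-in for actually running the job
        $ran[] = $id;
    }
    return $ran;
}

$jobs = [1 => 'Queued', 2 => 'Paused', 3 => 'Queued'];
$ran = drainImmediateQueue($jobs);
```

Because selection goes through `nextPending()`, job 2 stays paused and is left for a later health check rather than being run at shutdown.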