[vhffs-dev] [1258] added handling of stalled jobs (scheduler killed, node crashed, or such) |
[ Thread Index | Date Index | More vhffs.org/vhffs-dev Archives ]
Revision: 1258
Author: gradator
Date: 2008-09-25 19:28:52 +0200 (Thu, 25 Sep 2008)
Log Message:
-----------
added handling of stalled jobs (scheduler killed, node crashed, or such)
Modified Paths:
--------------
trunk/vhffs-api/src/Vhffs/Robots/Cron.pm
trunk/vhffs-api/src/Vhffs/Services/Cron.pm
trunk/vhffs-robots/src/cron_scheduler.pl
Modified: trunk/vhffs-api/src/Vhffs/Robots/Cron.pm
===================================================================
--- trunk/vhffs-api/src/Vhffs/Robots/Cron.pm 2008-09-25 16:39:55 UTC (rev 1257)
+++ trunk/vhffs-api/src/Vhffs/Robots/Cron.pm 2008-09-25 17:28:52 UTC (rev 1258)
@@ -44,7 +44,7 @@
{
my $main = shift;
- my $sql = 'SELECT c.cron_id, c.cronpath, c.interval, c.reportmail, c.lastrundate, c.lastrunreturncode, c.nextrundate, c.running, o.owner_uid, o.owner_gid, o.object_id, o.date_creation, o.description, o.state FROM vhffs_cron c INNER JOIN vhffs_object o ON c.object_id=o.object_id WHERE o.state=? AND running=0 AND ( c.nextrundate IS NULL OR c.nextrundate<? )';
+ my $sql = 'SELECT c.cron_id, c.cronpath, c.interval, c.reportmail, c.lastrundate, c.lastrunreturncode, c.nextrundate, c.running, o.owner_uid, o.owner_gid, o.object_id, o.date_creation, o.description, o.state FROM vhffs_cron c INNER JOIN vhffs_object o ON c.object_id=o.object_id WHERE o.state=? AND c.running=0 AND ( c.nextrundate IS NULL OR c.nextrundate<? )';
my $dbh = $main->get_db_object();
my $sth = $dbh->prepare($sql);
$sth->execute( Vhffs::Constants::ACTIVATED , time() ) or return undef;
@@ -57,4 +57,22 @@
}
+# Fetch cron jobs that are flagged as running but whose scheduled start
+# lies further in the past than the allowed execution time.
+#
+# Parameters:
+#   $main        - object providing get_db_object(); also handed to the
+#                  Vhffs::Services::Cron constructor for each row
+#   $maxexectime - number of seconds subtracted from time() to build the
+#                  cutoff compared against c.nextrundate
+#
+# Returns an array reference of Vhffs::Services::Cron objects (empty when
+# no row matches), or undef when executing the query fails.
+sub get_stalled_jobs
+{
+ my $main = shift;
+ my $maxexectime = shift;
+
+ # Same column list as the runnable-jobs query, but selecting rows with
+ # running != 0 and a non-NULL nextrundate older than the cutoff.
+ my $sql = 'SELECT c.cron_id, c.cronpath, c.interval, c.reportmail, c.lastrundate, c.lastrunreturncode, c.nextrundate, c.running, o.owner_uid, o.owner_gid, o.object_id, o.date_creation, o.description, o.state FROM vhffs_cron c INNER JOIN vhffs_object o ON c.object_id=o.object_id WHERE o.state=? AND c.running!=0 AND c.nextrundate IS NOT NULL AND c.nextrundate<?';
+ my $dbh = $main->get_db_object();
+ # NOTE(review): the result of prepare() is not checked before execute().
+ my $sth = $dbh->prepare($sql);
+ # Placeholders: object state must be ACTIVATED; cutoff is now minus $maxexectime.
+ $sth->execute( Vhffs::Constants::ACTIVATED , time() - $maxexectime ) or return undef;
+
+ my $repos = [];
+ while(my $r = $sth->fetchrow_arrayref()) {
+ # Each row's columns are passed positionally, in SELECT order, to the
+ # constructor (indirect-object call of Vhffs::Services::Cron::_new).
+ push(@$repos, _new Vhffs::Services::Cron($main, @$r));
+ }
+ return $repos;
+}
+
+
1;
Modified: trunk/vhffs-api/src/Vhffs/Services/Cron.pm
===================================================================
--- trunk/vhffs-api/src/Vhffs/Services/Cron.pm 2008-09-25 16:39:55 UTC (rev 1257)
+++ trunk/vhffs-api/src/Vhffs/Services/Cron.pm 2008-09-25 17:28:52 UTC (rev 1258)
@@ -403,6 +403,14 @@
return $request->execute( $date , $returncode , $self->{'cron_id'} );
}
+# Reset this cron job's run state directly in the database: sets running
+# to 0 and clears lastrundate and lastrunreturncode (NULL) for the row
+# matching this object's cron_id. nextrundate is left untouched, and no
+# in-memory attribute of $self is modified here.
+#
+# Returns the result of the statement's execute() call.
+sub quick_reset
+{
+ my ($self) = @_;
+ my $query = 'UPDATE vhffs_cron SET running=0, lastrundate=NULL, lastrunreturncode=NULL WHERE cron_id=?';
+ # NOTE(review): the result of prepare() is not checked before execute().
+ my $request = $self->{'db'}->prepare($query);
+ return $request->execute( $self->{'cron_id'} );
+}
+
sub getall_per_group
{
my ($main, $group) = @_;
Modified: trunk/vhffs-robots/src/cron_scheduler.pl
===================================================================
--- trunk/vhffs-robots/src/cron_scheduler.pl 2008-09-25 16:39:55 UTC (rev 1257)
+++ trunk/vhffs-robots/src/cron_scheduler.pl 2008-09-25 17:28:52 UTC (rev 1258)
@@ -72,8 +72,9 @@
while(1) {
- if( time - $prevrun > int(rand 10)+5 ) {
+ if( time() - $prevrun > int(rand 10)+5 ) {
+ # new jobs ?
my $crons = Vhffs::Robots::Cron::get_runnable_jobs( $vhffs );
foreach my $c ( @{$crons} ) {
if( exists ${%jobs}{ $c->get_cron_id } ) {
@@ -84,16 +85,22 @@
}
}
- $prevrun = time;
+ # stalled jobs ?
+ $crons = Vhffs::Robots::Cron::get_stalled_jobs( $vhffs , $maxexectime + 60 );
+ foreach my $c ( @{$crons} ) {
+ $c->quick_reset();
+ }
+
+ $prevrun = time();
}
foreach ( keys %jobs ) {
my $job = ${%jobs}{$_};
if( $job->{'status'} == STATUS_CREATED ) {
- run_job( $job ) if( time > $job->{'runat'} );
+ run_job( $job ) if( time() > $job->{'runat'} );
}
elsif ( $job->{'status'} == STATUS_RUNNING ) {
- if( defined $maxexectime && $maxexectime > 0 && time - $job->{'startedat'} > $maxexectime ) {
+ if( defined $maxexectime && $maxexectime > 0 && time() - $job->{'startedat'} > $maxexectime ) {
kill 9, $job->{'pid'};
$job->{'status'} = STATUS_KILLED;
}
@@ -191,11 +198,12 @@
${%jobs}{$cron_id}{'output'} = undef;
${%jobs}{$cron_id}{'env'} = '';
${%jobs}{$cron_id}{'inheaders'} = 1;
- ${%jobs}{$cron_id}{'createdat'} = time;
+ ${%jobs}{$cron_id}{'createdat'} = time();
${%jobs}{$cron_id}{'runat'} = ${%jobs}{$cron_id}{'createdat'} + int(rand 4) +2;
${%jobs}{$cron_id}{'startedat'} = undef;
${%jobs}{$cron_id}{'status'} = STATUS_CREATED;
$cron->quick_inc_running();
+ $cron->quick_set_nextrundate( ${%jobs}{$cron_id}{'runat'} ); # so that we know when the process will/has be/been started in nextrundate
print scalar localtime().' + '.$cron_id.' '.$cron->get_cronpath."\n";
return ${%jobs}{$cron_id};
@@ -247,7 +255,7 @@
${%jobs}{$cron_id}{'pid'} = $pid;
${%jobs}{$cron_id}{'pipe'} = $par;
- ${%jobs}{$cron_id}{'startedat'} = time;
+ ${%jobs}{$cron_id}{'startedat'} = time();
${%jobs}{$cron_id}{'status'} = STATUS_RUNNING;
${%fd2jobs}{$par->fileno} = ${%jobs}{$cron_id};
$read_set->add($par);