[vhffs-dev] [1258] added handling of stalled jobs (scheduler killed, node crashed, or such)

[ Thread Index | Date Index | More vhffs.org/vhffs-dev Archives ]


Revision: 1258
Author:   gradator
Date:     2008-09-25 19:28:52 +0200 (Thu, 25 Sep 2008)

Log Message:
-----------
Added handling of stalled jobs (e.g. the scheduler was killed or the node crashed).

Modified Paths:
--------------
    trunk/vhffs-api/src/Vhffs/Robots/Cron.pm
    trunk/vhffs-api/src/Vhffs/Services/Cron.pm
    trunk/vhffs-robots/src/cron_scheduler.pl


Modified: trunk/vhffs-api/src/Vhffs/Robots/Cron.pm
===================================================================
--- trunk/vhffs-api/src/Vhffs/Robots/Cron.pm	2008-09-25 16:39:55 UTC (rev 1257)
+++ trunk/vhffs-api/src/Vhffs/Robots/Cron.pm	2008-09-25 17:28:52 UTC (rev 1258)
@@ -44,7 +44,7 @@
 {
 	my $main = shift;
 
-	my $sql = 'SELECT c.cron_id, c.cronpath, c.interval, c.reportmail, c.lastrundate, c.lastrunreturncode, c.nextrundate, c.running, o.owner_uid, o.owner_gid, o.object_id, o.date_creation, o.description, o.state FROM vhffs_cron c INNER JOIN vhffs_object o ON c.object_id=o.object_id WHERE o.state=? AND running=0 AND ( c.nextrundate IS NULL OR c.nextrundate<? )';
+	my $sql = 'SELECT c.cron_id, c.cronpath, c.interval, c.reportmail, c.lastrundate, c.lastrunreturncode, c.nextrundate, c.running, o.owner_uid, o.owner_gid, o.object_id, o.date_creation, o.description, o.state FROM vhffs_cron c INNER JOIN vhffs_object o ON c.object_id=o.object_id WHERE o.state=? AND c.running=0 AND ( c.nextrundate IS NULL OR c.nextrundate<? )';
 	my $dbh = $main->get_db_object();
 	my $sth = $dbh->prepare($sql);
 	$sth->execute( Vhffs::Constants::ACTIVATED , time() ) or return undef;
@@ -57,4 +57,31 @@
 }
 
 
+# Returns an arrayref of Vhffs::Services::Cron objects for activated cron
+# entries that are still flagged as running although their nextrundate (set by
+# the scheduler to the planned start time) is more than $maxexectime seconds
+# in the past, e.g. because the scheduler was killed or the node crashed.
+# Returns an empty arrayref when $maxexectime is undefined or not positive
+# (without a limit every running job would match), and undef if the query
+# fails.
+sub get_stalled_jobs
+{
+	my $main = shift;
+	my $maxexectime = shift;
+
+	return [] unless( defined $maxexectime && $maxexectime > 0 );
+
+	my $sql = 'SELECT c.cron_id, c.cronpath, c.interval, c.reportmail, c.lastrundate, c.lastrunreturncode, c.nextrundate, c.running, o.owner_uid, o.owner_gid, o.object_id, o.date_creation, o.description, o.state FROM vhffs_cron c INNER JOIN vhffs_object o ON c.object_id=o.object_id WHERE o.state=? AND c.running!=0 AND c.nextrundate IS NOT NULL AND c.nextrundate<?';
+	my $dbh = $main->get_db_object();
+	my $sth = $dbh->prepare($sql);
+	$sth->execute( Vhffs::Constants::ACTIVATED , time() - $maxexectime ) or return undef;
+
+	my $jobs = [];
+	while(my $r = $sth->fetchrow_arrayref()) {
+		push(@$jobs, Vhffs::Services::Cron->_new($main, @$r));
+	}
+	return $jobs;
+}
+
+
 1;

Modified: trunk/vhffs-api/src/Vhffs/Services/Cron.pm
===================================================================
--- trunk/vhffs-api/src/Vhffs/Services/Cron.pm	2008-09-25 16:39:55 UTC (rev 1257)
+++ trunk/vhffs-api/src/Vhffs/Services/Cron.pm	2008-09-25 17:28:52 UTC (rev 1258)
@@ -403,6 +403,15 @@
 	return $request->execute( $date , $returncode , $self->{'cron_id'} );
 }
 
+# Clears the running counter and the last-run date/return code of this cron
+# entry; the scheduler calls it on jobs it considers stalled.
+sub quick_reset
+{
+	my $self = shift;
+	my $sth = $self->{'db'}->prepare( 'UPDATE vhffs_cron SET running=0, lastrundate=NULL, lastrunreturncode=NULL WHERE cron_id=?' );
+	return $sth->execute( $self->{'cron_id'} );
+}
+
 sub getall_per_group
 {
 	my ($main, $group) = @_;

Modified: trunk/vhffs-robots/src/cron_scheduler.pl
===================================================================
--- trunk/vhffs-robots/src/cron_scheduler.pl	2008-09-25 16:39:55 UTC (rev 1257)
+++ trunk/vhffs-robots/src/cron_scheduler.pl	2008-09-25 17:28:52 UTC (rev 1258)
@@ -72,8 +72,9 @@
 
 while(1)  {
 
-	if( time - $prevrun > int(rand 10)+5 )  {
+	if( time() - $prevrun > int(rand 10)+5 )  {
 
+		# new jobs ?
 		my $crons = Vhffs::Robots::Cron::get_runnable_jobs( $vhffs );
 		foreach my $c ( @{$crons} )  {
 			if( exists ${%jobs}{ $c->get_cron_id } )  {
@@ -84,16 +85,22 @@
 			}
 		}
 
-		$prevrun = time;
+		# stalled jobs ? (only detectable when a maximum execution time is configured)
+		$crons = ( defined $maxexectime && $maxexectime > 0 ) ? Vhffs::Robots::Cron::get_stalled_jobs( $vhffs , $maxexectime + 60 ) : [];
+		foreach my $c ( @{ $crons || [] } )  {
+			$c->quick_reset();
+		}
+
+		$prevrun = time();
 	}
 
 	foreach ( keys %jobs )  {
 		my $job = ${%jobs}{$_};
 		if( $job->{'status'} == STATUS_CREATED )  {
-			run_job( $job ) if( time > $job->{'runat'} );
+			run_job( $job ) if( time() > $job->{'runat'} );
 		}
 		elsif ( $job->{'status'} == STATUS_RUNNING )  {
-			if( defined $maxexectime  &&  $maxexectime > 0  &&  time - $job->{'startedat'} > $maxexectime ) {
+			if( defined $maxexectime  &&  $maxexectime > 0  &&  time() - $job->{'startedat'} > $maxexectime ) {
 				kill 9, $job->{'pid'};
 				$job->{'status'} = STATUS_KILLED;
 			}
@@ -191,11 +198,12 @@
 	${%jobs}{$cron_id}{'output'} = undef;
 	${%jobs}{$cron_id}{'env'} = '';
 	${%jobs}{$cron_id}{'inheaders'} = 1;
-	${%jobs}{$cron_id}{'createdat'} = time;
+	${%jobs}{$cron_id}{'createdat'} = time();
 	${%jobs}{$cron_id}{'runat'} = ${%jobs}{$cron_id}{'createdat'} + int(rand 4) +2;
 	${%jobs}{$cron_id}{'startedat'} = undef;
 	${%jobs}{$cron_id}{'status'} = STATUS_CREATED;
 	$cron->quick_inc_running();
+	$cron->quick_set_nextrundate( ${%jobs}{$cron_id}{'runat'} );  # so that we know when the process will/has be/been started in nextrundate
 
 	print scalar localtime().'    + '.$cron_id.' '.$cron->get_cronpath."\n";
 	return ${%jobs}{$cron_id};
@@ -247,7 +255,7 @@
 
 		${%jobs}{$cron_id}{'pid'} = $pid;
 		${%jobs}{$cron_id}{'pipe'} = $par;
-		${%jobs}{$cron_id}{'startedat'} = time;
+		${%jobs}{$cron_id}{'startedat'} = time();
 		${%jobs}{$cron_id}{'status'} = STATUS_RUNNING;
 		${%fd2jobs}{$par->fileno} = ${%jobs}{$cron_id};
 		$read_set->add($par);


Mail converted by MHonArc 2.6.19+ http://listengine.tuxfamily.org/