Index: src/main/java/org/activiti/engine/impl/jobexecutor/TimerDeclarationImpl.java
===================================================================
--- src/main/java/org/activiti/engine/impl/jobexecutor/TimerDeclarationImpl.java	(revision 3103)
+++ src/main/java/org/activiti/engine/impl/jobexecutor/TimerDeclarationImpl.java	(working copy)
@@ -99,12 +99,18 @@
         .getProcessEngineConfiguration()
         .getBusinessCalendarManager()
         .getBusinessCalendar(type.caledarName);
+    
+    
 
     String dueDateString = executionEntity == null ? description.getExpressionText() : (String) description.getValue(executionEntity);
     Date duedate = businessCalendar.resolveDuedate(dueDateString);
 
+    int retries = Context.getProcessEngineConfiguration().getJobExecutor().getAmountOfRetries();
+
     TimerEntity timer = new TimerEntity(this);
     timer.setDuedate(duedate);
+    timer.setRetries(retries);
+
     if (executionEntity != null) {
       timer.setExecution(executionEntity);
     }
Index: src/main/java/org/activiti/engine/impl/jobexecutor/JobExecutor.java
===================================================================
--- src/main/java/org/activiti/engine/impl/jobexecutor/JobExecutor.java	(revision 3103)
+++ src/main/java/org/activiti/engine/impl/jobexecutor/JobExecutor.java	(working copy)
@@ -22,6 +22,7 @@
 
 import org.activiti.engine.ActivitiException;
 import org.activiti.engine.impl.interceptor.CommandExecutor;
+import org.activiti.engine.impl.persistence.entity.JobEntity;
 
 /**
  * Manager class in charge of all background / asynchronous
@@ -46,6 +47,8 @@
   protected int queueSize = 5;
   protected int corePoolSize = 3;
   private int maxPoolSize = 10;
+  private int amountOfRetries = JobEntity.DEFAULT_RETRIES;
+  private int retriesDelayInMills = 0;   
 
   protected JobAcquisitionThread jobAcquisitionThread;
   protected BlockingQueue<Runnable> threadPoolQueue;
@@ -227,4 +230,24 @@
   public void setAutoActivate(boolean isAutoActivate) {
     this.isAutoActivate = isAutoActivate;
   }
+
+  
+  public int getAmountOfRetries() {
+    return amountOfRetries;
+  }
+
+  
+  public void setAmountOfRetries(int amountOfRetries) {
+    this.amountOfRetries = amountOfRetries;
+  }
+
+  
+  public int getRetriesDelayInMills() {
+    return retriesDelayInMills;
+  }
+
+  
+  public void setRetriesDelayInMills(int retriesDelayInMills) {
+    this.retriesDelayInMills = retriesDelayInMills;
+  }
 }
Index: src/main/java/org/activiti/engine/impl/cmd/DecrementJobRetriesCmd.java
===================================================================
--- src/main/java/org/activiti/engine/impl/cmd/DecrementJobRetriesCmd.java	(revision 3103)
+++ src/main/java/org/activiti/engine/impl/cmd/DecrementJobRetriesCmd.java	(working copy)
@@ -15,6 +15,8 @@
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.util.Calendar;
+import java.util.GregorianCalendar;
 
 import org.activiti.engine.impl.cfg.TransactionContext;
 import org.activiti.engine.impl.cfg.TransactionState;
@@ -24,6 +26,7 @@
 import org.activiti.engine.impl.jobexecutor.JobExecutor;
 import org.activiti.engine.impl.jobexecutor.MessageAddedNotification;
 import org.activiti.engine.impl.persistence.entity.JobEntity;
+import org.activiti.engine.impl.util.ClockUtil;
 
 /**
  * @author Tom Baeyens
@@ -40,23 +43,30 @@
   }
 
   public Object execute(CommandContext commandContext) {
+    
+    JobExecutor jobExecutor = Context.getProcessEngineConfiguration().getJobExecutor();
+    
     JobEntity job = Context
       .getCommandContext()
       .getJobManager()
       .findJobById(jobId);
+    
     job.setRetries(job.getRetries() - 1);
     job.setLockOwner(null);
     job.setLockExpirationTime(null);
     
+    calculateNextAttemptTime(job, jobExecutor.getRetriesDelayInMills());
+    
     if(exception != null) {
       job.setExceptionMessage(exception.getMessage());
       job.setExceptionStacktrace(getExceptionStacktrace());
     }
     
-    JobExecutor jobExecutor = Context.getProcessEngineConfiguration().getJobExecutor();
-    MessageAddedNotification messageAddedNotification = new MessageAddedNotification(jobExecutor);
-    TransactionContext transactionContext = commandContext.getTransactionContext();
-    transactionContext.addTransactionListener(TransactionState.COMMITTED, messageAddedNotification);
+    if (jobExecutor.getRetriesDelayInMills() == 0) {
+      MessageAddedNotification messageAddedNotification = new MessageAddedNotification(jobExecutor);
+      TransactionContext transactionContext = commandContext.getTransactionContext();
+      transactionContext.addTransactionListener(TransactionState.COMMITTED, messageAddedNotification);
+    }
     
     return null;
   }
@@ -66,4 +76,14 @@
     exception.printStackTrace(new PrintWriter(stringWriter));
     return stringWriter.toString();
   }
+
+  protected void calculateNextAttemptTime(JobEntity job, int delayRetriesInMillis) {
+
+    GregorianCalendar gregorianCalendar = new GregorianCalendar();
+    gregorianCalendar.setTime(ClockUtil.getCurrentTime());
+    gregorianCalendar.add(Calendar.MILLISECOND, delayRetriesInMillis);
+    job.setNextAttemptTime(gregorianCalendar.getTime());
+
+  }
+
 }
Index: src/main/java/org/activiti/engine/impl/test/AbstractActivitiTestCase.java
===================================================================
--- src/main/java/org/activiti/engine/impl/test/AbstractActivitiTestCase.java	(revision 3103)
+++ src/main/java/org/activiti/engine/impl/test/AbstractActivitiTestCase.java	(working copy)
@@ -175,6 +175,10 @@
   }
 
   public void waitForJobExecutorToProcessAllJobs(long maxMillisToWait, long intervalMillis) {
+    waitForJobExecutor(maxMillisToWait, intervalMillis, true);
+  }
+
+  public void waitForJobExecutor(long maxMillisToWait, long intervalMillis, boolean processAllJobs) {
     JobExecutor jobExecutor = processEngineConfiguration.getJobExecutor();
     jobExecutor.start();
 
@@ -192,7 +196,7 @@
       } finally {
         timer.cancel();
       }
-      if (areJobsAvailable) {
+      if (areJobsAvailable && processAllJobs) {
         throw new ActivitiException("time limit of " + maxMillisToWait + " was exceeded");
       }
 
Index: src/main/java/org/activiti/engine/impl/persistence/entity/JobEntity.java
===================================================================
--- src/main/java/org/activiti/engine/impl/persistence/entity/JobEntity.java	(revision 3103)
+++ src/main/java/org/activiti/engine/impl/persistence/entity/JobEntity.java	(working copy)
@@ -57,6 +57,7 @@
   protected boolean isExclusive = DEFAULT_EXCLUSIVE;
 
   protected int retries = DEFAULT_RETRIES;
+  protected Date nextAttemptTime = null;
 
   protected String jobHandlerType = null;
   protected String jobHandlerConfiguration = null;
@@ -248,4 +249,14 @@
     }
     return exceptionByteArray;
   }
+
+  
+  public Date getNextAttemptTime() {
+    return nextAttemptTime;
+  }
+
+  
+  public void setNextAttemptTime(Date nextAttemptTime) {
+    this.nextAttemptTime = nextAttemptTime;
+  }
 }
Index: src/main/java/org/activiti/engine/impl/persistence/entity/ExecutionEntity.java
===================================================================
--- src/main/java/org/activiti/engine/impl/persistence/entity/ExecutionEntity.java	(revision 3103)
+++ src/main/java/org/activiti/engine/impl/persistence/entity/ExecutionEntity.java	(working copy)
@@ -502,6 +502,7 @@
     message.setExecution(this);
     message.setExclusive(getActivity().isExclusive());
     message.setJobHandlerType(AsyncContinuationJobHandler.TYPE);
+    message.setRetries(Context.getProcessEngineConfiguration().getJobExecutor().getAmountOfRetries());
     // At the moment, only AtomicOperationTransitionCreateScope can be performed asynchronously,
     // so there is no need to pass it to the handler
 
Index: src/main/resources/org/activiti/db/mapping/entity/Job.xml
===================================================================
--- src/main/resources/org/activiti/db/mapping/entity/Job.xml	(revision 3103)
+++ src/main/resources/org/activiti/db/mapping/entity/Job.xml	(working copy)
@@ -21,6 +21,7 @@
     <result property="executionId" column="EXECUTION_ID_" jdbcType="VARCHAR" />
     <result property="processInstanceId" column="PROCESS_INSTANCE_ID_" jdbcType="VARCHAR" />
     <result property="retries" column="RETRIES_" jdbcType="INTEGER" />
+    <result property="nextAttemptTime" column="NEXT_ATTEMPT_TIME_" jdbcType="TIMESTAMP" />
     <result property="exceptionByteArrayId" column="EXCEPTION_STACK_ID_" jdbcType="VARCHAR" />
     <result property="exceptionMessage" column="EXCEPTION_MSG_" jdbcType="VARCHAR" />
     <result property="jobHandlerType" column="HANDLER_TYPE_" jdbcType="VARCHAR" />
@@ -53,6 +54,7 @@
     where (RETRIES_ &gt; 0)
       and (DUEDATE_ is null or DUEDATE_ &lt; #{now, jdbcType=TIMESTAMP})
       and (LOCK_OWNER_ is null or LOCK_EXP_TIME_ &lt; #{now, jdbcType=TIMESTAMP})
+      and (NEXT_ATTEMPT_TIME_ is null or NEXT_ATTEMPT_TIME_ &lt; #{now, jdbcType=TIMESTAMP})
       and (RETRIES_ &gt; 0)
   </select>
   
@@ -62,6 +64,7 @@
     where (RETRIES_ &gt; 0)
       and (DUEDATE_ is null or DUEDATE_ &lt; #{now, jdbcType=TIMESTAMP})
       and (LOCK_OWNER_ is null or LOCK_EXP_TIME_ &lt; #{now, jdbcType=TIMESTAMP})
+      and (NEXT_ATTEMPT_TIME_ is null or NEXT_ATTEMPT_TIME_ &lt; #{now, jdbcType=TIMESTAMP}) 
       and (RETRIES_ &gt; 0)
       and (EXCLUSIVE_ = TRUE)
       and (PROCESS_INSTANCE_ID_ = #{pid})
@@ -290,7 +293,8 @@
        LOCK_OWNER_ = #{lockOwner, jdbcType=VARCHAR},
        RETRIES_ = #{retries, jdbcType=INTEGER},
        EXCEPTION_STACK_ID_ = #{exceptionByteArrayId, jdbcType=VARCHAR},
-       EXCEPTION_MSG_ = #{exceptionMessage, jdbcType=VARCHAR}
+       EXCEPTION_MSG_ = #{exceptionMessage, jdbcType=VARCHAR},
+       NEXT_ATTEMPT_TIME_ = #{nextAttemptTime, jdbcType=TIMESTAMP}
     </set>
     where ID_= #{id, jdbcType=VARCHAR}
       and REV_ = #{revision, jdbcType=INTEGER}
Index: src/main/resources/org/activiti/db/upgrade/activiti.h2.upgradestep.58.to.59.engine.sql
===================================================================
--- src/main/resources/org/activiti/db/upgrade/activiti.h2.upgradestep.58.to.59.engine.sql	(revision 0)
+++ src/main/resources/org/activiti/db/upgrade/activiti.h2.upgradestep.58.to.59.engine.sql	(revision 0)
@@ -0,0 +1,2 @@
+alter table ACT_RU_JOB 
+add NEXT_ATTEMPT_TIME_ timestamp;
Index: src/main/resources/org/activiti/db/upgrade/activiti.postgres.upgradestep.58.to.59.engine.sql
===================================================================
--- src/main/resources/org/activiti/db/upgrade/activiti.postgres.upgradestep.58.to.59.engine.sql	(revision 0)
+++ src/main/resources/org/activiti/db/upgrade/activiti.postgres.upgradestep.58.to.59.engine.sql	(revision 0)
@@ -0,0 +1,2 @@
+alter table ACT_RU_JOB 
+add NEXT_ATTEMPT_TIME_ timestamp;
Index: src/main/resources/org/activiti/db/upgrade/activiti.db2.upgradestep.58.to.59.engine.sql
===================================================================
--- src/main/resources/org/activiti/db/upgrade/activiti.db2.upgradestep.58.to.59.engine.sql	(revision 0)
+++ src/main/resources/org/activiti/db/upgrade/activiti.db2.upgradestep.58.to.59.engine.sql	(revision 0)
@@ -0,0 +1,2 @@
+alter table ACT_RU_JOB 
+add NEXT_ATTEMPT_TIME_ timestamp;
Index: src/main/resources/org/activiti/db/upgrade/activiti.mysql.upgradestep.58.to.59.engine.sql
===================================================================
--- src/main/resources/org/activiti/db/upgrade/activiti.mysql.upgradestep.58.to.59.engine.sql	(revision 0)
+++ src/main/resources/org/activiti/db/upgrade/activiti.mysql.upgradestep.58.to.59.engine.sql	(revision 0)
@@ -0,0 +1,2 @@
+alter table ACT_RU_JOB 
+add NEXT_ATTEMPT_TIME_ timestamp;
Index: src/main/resources/org/activiti/db/upgrade/activiti.oracle.upgradestep.58.to.59.engine.sql
===================================================================
--- src/main/resources/org/activiti/db/upgrade/activiti.oracle.upgradestep.58.to.59.engine.sql	(revision 0)
+++ src/main/resources/org/activiti/db/upgrade/activiti.oracle.upgradestep.58.to.59.engine.sql	(revision 0)
@@ -0,0 +1,2 @@
+alter table ACT_RU_JOB 
+add NEXT_ATTEMPT_TIME_ TIMESTAMP(6);
Index: src/main/resources/org/activiti/db/upgrade/activiti.mssql.upgradestep.58.to.59.engine.sql
===================================================================
--- src/main/resources/org/activiti/db/upgrade/activiti.mssql.upgradestep.58.to.59.engine.sql	(revision 0)
+++ src/main/resources/org/activiti/db/upgrade/activiti.mssql.upgradestep.58.to.59.engine.sql	(revision 0)
@@ -0,0 +1,2 @@
+alter table ACT_RU_JOB 
+add NEXT_ATTEMPT_TIME_ datetime;
Index: src/main/resources/org/activiti/db/create/activiti.postgres.create.engine.sql
===================================================================
--- src/main/resources/org/activiti/db/create/activiti.postgres.create.engine.sql	(revision 3103)
+++ src/main/resources/org/activiti/db/create/activiti.postgres.create.engine.sql	(working copy)
@@ -56,6 +56,7 @@
     EXECUTION_ID_ varchar(64),
     PROCESS_INSTANCE_ID_ varchar(64),
     RETRIES_ integer,
+    NEXT_ATTEMPT_TIME_ timestamp,
     EXCEPTION_STACK_ID_ varchar(64),
     EXCEPTION_MSG_ varchar(4000),
     DUEDATE_ timestamp,
Index: src/main/resources/org/activiti/db/create/activiti.oracle.create.engine.sql
===================================================================
--- src/main/resources/org/activiti/db/create/activiti.oracle.create.engine.sql	(revision 3103)
+++ src/main/resources/org/activiti/db/create/activiti.oracle.create.engine.sql	(working copy)
@@ -55,6 +55,7 @@
     EXECUTION_ID_ NVARCHAR2(64),
     PROCESS_INSTANCE_ID_ NVARCHAR2(64),
     RETRIES_ INTEGER,
+    NEXT_ATTEMPT_TIME_ TIMESTAMP(6),
     EXCEPTION_STACK_ID_ NVARCHAR2(64),
     EXCEPTION_MSG_ NVARCHAR2(2000),
     DUEDATE_ TIMESTAMP(6),
Index: src/main/resources/org/activiti/db/create/activiti.mssql.create.engine.sql
===================================================================
--- src/main/resources/org/activiti/db/create/activiti.mssql.create.engine.sql	(revision 3103)
+++ src/main/resources/org/activiti/db/create/activiti.mssql.create.engine.sql	(working copy)
@@ -55,6 +55,7 @@
     EXECUTION_ID_ nvarchar(64),
     PROCESS_INSTANCE_ID_ nvarchar(64),
     RETRIES_ int,
+    NEXT_ATTEMPT_TIME_ datetime,
     EXCEPTION_STACK_ID_ nvarchar(64),
     EXCEPTION_MSG_ nvarchar(4000),
     DUEDATE_ datetime NULL,
Index: src/main/resources/org/activiti/db/create/activiti.db2.create.engine.sql
===================================================================
--- src/main/resources/org/activiti/db/create/activiti.db2.create.engine.sql	(revision 3103)
+++ src/main/resources/org/activiti/db/create/activiti.db2.create.engine.sql	(working copy)
@@ -57,6 +57,7 @@
     EXECUTION_ID_ varchar(64),
     PROCESS_INSTANCE_ID_ varchar(64),
     RETRIES_ integer,
+    NEXT_ATTEMPT_TIME_ timestamp,
     EXCEPTION_STACK_ID_ varchar(64),
     EXCEPTION_MSG_ varchar(4000),
     DUEDATE_ timestamp null,
Index: src/main/resources/org/activiti/db/create/activiti.h2.create.engine.sql
===================================================================
--- src/main/resources/org/activiti/db/create/activiti.h2.create.engine.sql	(revision 3103)
+++ src/main/resources/org/activiti/db/create/activiti.h2.create.engine.sql	(working copy)
@@ -55,6 +55,7 @@
     EXECUTION_ID_ varchar(64),
     PROCESS_INSTANCE_ID_ varchar(64),
     RETRIES_ integer,
+    NEXT_ATTEMPT_TIME_ timestamp,
     EXCEPTION_STACK_ID_ varchar(64),
     EXCEPTION_MSG_ varchar(4000),
     DUEDATE_ timestamp,
Index: src/main/resources/org/activiti/db/create/activiti.mysql.create.engine.sql
===================================================================
--- src/main/resources/org/activiti/db/create/activiti.mysql.create.engine.sql	(revision 3103)
+++ src/main/resources/org/activiti/db/create/activiti.mysql.create.engine.sql	(working copy)
@@ -56,6 +56,7 @@
     EXECUTION_ID_ varchar(64),
     PROCESS_INSTANCE_ID_ varchar(64),
     RETRIES_ integer,
+    NEXT_ATTEMPT_TIME_ timestamp,
     EXCEPTION_STACK_ID_ varchar(64),
     EXCEPTION_MSG_ varchar(4000),
     DUEDATE_ timestamp NULL,
Index: src/test/java/org/activiti/engine/test/jobexecutor/JobExecutorRetryTest.java
===================================================================
--- src/test/java/org/activiti/engine/test/jobexecutor/JobExecutorRetryTest.java	(revision 0)
+++ src/test/java/org/activiti/engine/test/jobexecutor/JobExecutorRetryTest.java	(revision 0)
@@ -0,0 +1,135 @@
+package org.activiti.engine.test.jobexecutor;
+
+import org.activiti.engine.impl.interceptor.Command;
+import org.activiti.engine.impl.interceptor.CommandContext;
+import org.activiti.engine.impl.interceptor.CommandExecutor;
+import org.activiti.engine.impl.persistence.entity.JobEntity;
+import org.activiti.engine.impl.persistence.entity.MessageEntity;
+import org.activiti.engine.impl.test.PluggableActivitiTestCase;
+import org.activiti.engine.impl.util.ClockUtil;
+
+/**
+ * Test case for jira ACT-1046
+ * 
+ * @author Dawid Wrzosek
+ */
+public class JobExecutorRetryTest extends PluggableActivitiTestCase {
+
+  protected TweetExceptionHandler tweetExceptionHandler = new TweetExceptionHandler();
+
+  protected CommandExecutor commandExecutor;
+
+  public void setUp() throws Exception {
+    processEngineConfiguration.getJobHandlers().put(tweetExceptionHandler.getType(), tweetExceptionHandler);
+    processEngineConfiguration.getJobExecutor().setRetriesDelayInMills(2000);
+    processEngineConfiguration.getJobExecutor().setWaitTimeInMillis(1000);
+    this.commandExecutor = processEngineConfiguration.getCommandExecutorTxRequired();
+  }
+
+  public void tearDown() throws Exception {
+    processEngineConfiguration.getJobHandlers().remove(tweetExceptionHandler.getType());
+    processEngineConfiguration.getJobExecutor().setAmountOfRetries(JobEntity.DEFAULT_RETRIES);
+    processEngineConfiguration.getJobExecutor().setRetriesDelayInMills(0);
+  }
+
+  public void testDelayWithoutRetry() throws Exception {
+
+    // when
+
+    tweetExceptionHandler.setExceptionsRemaining(1);
+
+    String jobId = commandExecutor.execute(new Command<String>() {
+
+      public String execute(CommandContext commandContext) {
+        MessageEntity message = createTweetExceptionMessage();
+        commandContext.getJobManager().send(message);
+
+        return message.getId();
+      }
+    });
+
+    waitForJobExecutor(1500, 1000, false);
+
+    // then
+    JobEntity job = (JobEntity) managementService.createJobQuery().jobId(jobId).singleResult();
+    assertNotNull(job);
+    assertEquals(2, job.getRetries());
+    assertNotNull(job.getNextAttemptTime());
+
+    // clean up
+    processEngineConfiguration.getJobExecutor().setRetriesDelayInMills(0);
+    waitForJobExecutor(100000, 1000, true);
+
+  }
+
+  public void testDelayWith1Retry() throws Exception {
+
+    // when
+    tweetExceptionHandler.setExceptionsRemaining(2);
+
+    String jobId = commandExecutor.execute(new Command<String>() {
+
+      public String execute(CommandContext commandContext) {
+        MessageEntity message = createTweetExceptionMessage();
+        commandContext.getJobManager().send(message);
+
+        return message.getId();
+      }
+    });
+
+    waitForJobExecutor(4000, 200, false);
+
+    // then
+    JobEntity job = (JobEntity) managementService.createJobQuery().jobId(jobId).singleResult();
+
+    assertNotNull(job);
+    assertEquals(1, job.getRetries());
+    assertNotNull(job.getNextAttemptTime());
+
+    assertNotNull(ClockUtil.getCurrentTime());
+
+    // clean up
+    processEngineConfiguration.getJobExecutor().setRetriesDelayInMills(0);
+    waitForJobExecutor(100000, 1000, true);
+
+  }
+
+  public void testAmountOfRetries() throws Exception {
+
+    // when
+    tweetExceptionHandler.setExceptionsRemaining(10);
+    processEngineConfiguration.getJobExecutor().setAmountOfRetries(5);
+
+    String jobId = commandExecutor.execute(new Command<String>() {
+
+      public String execute(CommandContext commandContext) {
+        MessageEntity message = createTweetExceptionMessage();
+        message.setRetries(processEngineConfiguration.getJobExecutor().getAmountOfRetries());
+
+        commandContext.getJobManager().send(message);
+
+        return message.getId();
+      }
+    });
+
+    waitForJobExecutor(15000, 100, true);
+
+    // then
+    JobEntity job = (JobEntity) managementService.createJobQuery().jobId(jobId).singleResult();
+    assertNotNull(jobId);
+    assertEquals(0, job.getRetries());
+    assertEquals(5, tweetExceptionHandler.getExceptionsRemaining());
+
+    // clean up
+    tweetExceptionHandler.setExceptionsRemaining(0);
+    managementService.executeJob(jobId);
+
+  }
+
+  protected MessageEntity createTweetExceptionMessage() {
+    MessageEntity message = new MessageEntity();
+    message.setJobHandlerType("tweet-exception");
+    return message;
+  }
+
+}
