Index: src/main/java/org/activiti/engine/impl/jobexecutor/TimerDeclarationImpl.java
===================================================================
--- src/main/java/org/activiti/engine/impl/jobexecutor/TimerDeclarationImpl.java	(revision 3159)
+++ src/main/java/org/activiti/engine/impl/jobexecutor/TimerDeclarationImpl.java	(working copy)
@@ -99,12 +99,18 @@
         .getProcessEngineConfiguration()
         .getBusinessCalendarManager()
         .getBusinessCalendar(type.caledarName);
+    
+    
 
     String dueDateString = executionEntity == null ? description.getExpressionText() : (String) description.getValue(executionEntity);
     Date duedate = businessCalendar.resolveDuedate(dueDateString);
 
+    int retries = Context.getProcessEngineConfiguration().getJobExecutor().getAmountOfRetries();
+
     TimerEntity timer = new TimerEntity(this);
     timer.setDuedate(duedate);
+    timer.setRetries(retries);
+
     if (executionEntity != null) {
       timer.setExecution(executionEntity);
     }
Index: src/main/java/org/activiti/engine/impl/jobexecutor/JobExecutor.java
===================================================================
--- src/main/java/org/activiti/engine/impl/jobexecutor/JobExecutor.java	(revision 3159)
+++ src/main/java/org/activiti/engine/impl/jobexecutor/JobExecutor.java	(working copy)
@@ -20,6 +20,7 @@
 import org.activiti.engine.impl.cmd.AcquireJobsCmd;
 import org.activiti.engine.impl.interceptor.Command;
 import org.activiti.engine.impl.interceptor.CommandExecutor;
+import org.activiti.engine.impl.persistence.entity.JobEntity;
 import org.activiti.engine.runtime.Job;
 
 /**
@@ -52,6 +53,10 @@
   protected int waitTimeInMillis = 5 * 1000;
   protected String lockOwner = UUID.randomUUID().toString();
   protected int lockTimeInMillis = 5 * 60 * 1000;
+  
+  private int amountOfRetries = JobEntity.DEFAULT_RETRIES;
+  private int retriesDelayInMills = 0;   
+
       
   public void start() {
     if (isActive) {
@@ -159,6 +164,26 @@
   public boolean isActive() {
     return isActive;
   }
+
+  
+  public int getAmountOfRetries() {
+    return amountOfRetries;
+  }
+
+  
+  public void setAmountOfRetries(int amountOfRetries) {
+    this.amountOfRetries = amountOfRetries;
+  }
+
+  
+  public int getRetriesDelayInMills() {
+    return retriesDelayInMills;
+  }
+
+  
+  public void setRetriesDelayInMills(int retriesDelayInMills) {
+    this.retriesDelayInMills = retriesDelayInMills;
+  }
   
   public RejectedJobsHandler getRejectedJobsHandler() {
     return rejectedJobsHandler;
Index: src/main/java/org/activiti/engine/impl/cmd/DecrementJobRetriesCmd.java
===================================================================
--- src/main/java/org/activiti/engine/impl/cmd/DecrementJobRetriesCmd.java	(revision 3159)
+++ src/main/java/org/activiti/engine/impl/cmd/DecrementJobRetriesCmd.java	(working copy)
@@ -15,6 +15,8 @@
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.util.Calendar;
+import java.util.GregorianCalendar;
 
 import org.activiti.engine.impl.cfg.TransactionContext;
 import org.activiti.engine.impl.cfg.TransactionState;
@@ -24,6 +26,7 @@
 import org.activiti.engine.impl.jobexecutor.JobExecutor;
 import org.activiti.engine.impl.jobexecutor.MessageAddedNotification;
 import org.activiti.engine.impl.persistence.entity.JobEntity;
+import org.activiti.engine.impl.util.ClockUtil;
 
 /**
  * @author Tom Baeyens
@@ -40,23 +43,32 @@
   }
 
   public Object execute(CommandContext commandContext) {
+    
+    JobExecutor jobExecutor = Context.getProcessEngineConfiguration().getJobExecutor();
+    
     JobEntity job = Context
       .getCommandContext()
       .getJobManager()
       .findJobById(jobId);
+    
     job.setRetries(job.getRetries() - 1);
-    job.setLockOwner(null);
-    job.setLockExpirationTime(null);
     
+    
+
     if(exception != null) {
       job.setExceptionMessage(exception.getMessage());
       job.setExceptionStacktrace(getExceptionStacktrace());
     }
     
-    JobExecutor jobExecutor = Context.getProcessEngineConfiguration().getJobExecutor();
-    MessageAddedNotification messageAddedNotification = new MessageAddedNotification(jobExecutor);
-    TransactionContext transactionContext = commandContext.getTransactionContext();
-    transactionContext.addTransactionListener(TransactionState.COMMITTED, messageAddedNotification);
+    if (jobExecutor.getRetriesDelayInMills() > 0 ) {
+      calculateNextAttemptTime(job, jobExecutor.getRetriesDelayInMills());
+    } else {
+      job.setLockExpirationTime(null);
+      job.setLockOwner(null);
+      MessageAddedNotification messageAddedNotification = new MessageAddedNotification(jobExecutor);
+      TransactionContext transactionContext = commandContext.getTransactionContext();
+      transactionContext.addTransactionListener(TransactionState.COMMITTED, messageAddedNotification);
+    } 
     
     return null;
   }
@@ -66,4 +78,14 @@
     exception.printStackTrace(new PrintWriter(stringWriter));
     return stringWriter.toString();
   }
+
+  protected void calculateNextAttemptTime(JobEntity job, int delayRetriesInMillis) {
+
+    GregorianCalendar gregorianCalendar = new GregorianCalendar();
+    gregorianCalendar.setTime(ClockUtil.getCurrentTime());
+    gregorianCalendar.add(Calendar.MILLISECOND, delayRetriesInMillis);
+    job.setLockExpirationTime(gregorianCalendar.getTime());
+
+  }
+
 }
Index: src/main/java/org/activiti/engine/impl/test/AbstractActivitiTestCase.java
===================================================================
--- src/main/java/org/activiti/engine/impl/test/AbstractActivitiTestCase.java	(revision 3159)
+++ src/main/java/org/activiti/engine/impl/test/AbstractActivitiTestCase.java	(working copy)
@@ -176,6 +176,10 @@
   }
 
   public void waitForJobExecutorToProcessAllJobs(long maxMillisToWait, long intervalMillis) {
+    waitForJobExecutor(maxMillisToWait, intervalMillis, true);
+  }
+
+  public void waitForJobExecutor(long maxMillisToWait, long intervalMillis, boolean processAllJobs) {
     JobExecutor jobExecutor = processEngineConfiguration.getJobExecutor();
     jobExecutor.start();
 
@@ -193,7 +197,7 @@
       } finally {
         timer.cancel();
       }
-      if (areJobsAvailable) {
+      if (areJobsAvailable && processAllJobs) {
         throw new ActivitiException("time limit of " + maxMillisToWait + " was exceeded");
       }
 
Index: src/main/java/org/activiti/engine/impl/persistence/entity/JobEntity.java
===================================================================
--- src/main/java/org/activiti/engine/impl/persistence/entity/JobEntity.java	(revision 3159)
+++ src/main/java/org/activiti/engine/impl/persistence/entity/JobEntity.java	(working copy)
@@ -249,4 +249,5 @@
     }
     return exceptionByteArray;
   }
+
 }
Index: src/main/java/org/activiti/engine/impl/persistence/entity/ExecutionEntity.java
===================================================================
--- src/main/java/org/activiti/engine/impl/persistence/entity/ExecutionEntity.java	(revision 3159)
+++ src/main/java/org/activiti/engine/impl/persistence/entity/ExecutionEntity.java	(working copy)
@@ -503,6 +503,7 @@
     message.setExecution(this);
     message.setExclusive(getActivity().isExclusive());
     message.setJobHandlerType(AsyncContinuationJobHandler.TYPE);
+    message.setRetries(Context.getProcessEngineConfiguration().getJobExecutor().getAmountOfRetries());
     // At the moment, only AtomicOperationTransitionCreateScope can be performed asynchronously,
     // so there is no need to pass it to the handler
 
Index: src/test/java/org/activiti/engine/test/jobexecutor/JobExecutorRetryTest.java
===================================================================
--- src/test/java/org/activiti/engine/test/jobexecutor/JobExecutorRetryTest.java	(revision 0)
+++ src/test/java/org/activiti/engine/test/jobexecutor/JobExecutorRetryTest.java	(revision 0)
@@ -0,0 +1,131 @@
+package org.activiti.engine.test.jobexecutor;
+
+import org.activiti.engine.impl.interceptor.Command;
+import org.activiti.engine.impl.interceptor.CommandContext;
+import org.activiti.engine.impl.interceptor.CommandExecutor;
+import org.activiti.engine.impl.persistence.entity.JobEntity;
+import org.activiti.engine.impl.persistence.entity.MessageEntity;
+import org.activiti.engine.impl.test.PluggableActivitiTestCase;
+
+/**
+ * Test case for jira ACT-1046
+ * 
+ * @author Dawid Wrzosek
+ */
+public class JobExecutorRetryTest extends PluggableActivitiTestCase {
+
+  protected TweetExceptionHandler tweetExceptionHandler = new TweetExceptionHandler();
+
+  protected CommandExecutor commandExecutor;
+
+  public void setUp() throws Exception {
+    processEngineConfiguration.getJobHandlers().put(tweetExceptionHandler.getType(), tweetExceptionHandler);
+    processEngineConfiguration.getJobExecutor().setRetriesDelayInMills(2000);
+    processEngineConfiguration.getJobExecutor().setWaitTimeInMillis(1000);
+    this.commandExecutor = processEngineConfiguration.getCommandExecutorTxRequired();
+  }
+
+  public void tearDown() throws Exception {
+    processEngineConfiguration.getJobHandlers().remove(tweetExceptionHandler.getType());
+    processEngineConfiguration.getJobExecutor().setAmountOfRetries(JobEntity.DEFAULT_RETRIES);
+    processEngineConfiguration.getJobExecutor().setRetriesDelayInMills(0);
+  }
+
+  public void testDelayWithoutRetry() throws Exception {
+
+    // when
+
+    tweetExceptionHandler.setExceptionsRemaining(1);
+
+    String jobId = commandExecutor.execute(new Command<String>() {
+
+      public String execute(CommandContext commandContext) {
+        MessageEntity message = createTweetExceptionMessage();
+        commandContext.getJobManager().send(message);
+
+        return message.getId();
+      }
+    });
+
+    waitForJobExecutor(1500, 1000, false);
+
+    // then
+    JobEntity job = (JobEntity) managementService.createJobQuery().jobId(jobId).singleResult();
+    assertNotNull(job);
+    assertEquals(2, job.getRetries());
+    assertNotNull(job.getLockExpirationTime());
+
+    // clean up
+    processEngineConfiguration.getJobExecutor().setRetriesDelayInMills(0);
+    waitForJobExecutor(100000, 1000, true);
+
+  }
+
+  public void testDelayWith1Retry() throws Exception {
+
+    // when
+    tweetExceptionHandler.setExceptionsRemaining(2);
+
+    String jobId = commandExecutor.execute(new Command<String>() {
+
+      public String execute(CommandContext commandContext) {
+        MessageEntity message = createTweetExceptionMessage();
+        commandContext.getJobManager().send(message);
+
+        return message.getId();
+      }
+    });
+
+    waitForJobExecutor(4000, 200, false);
+
+    // then
+    JobEntity job = (JobEntity) managementService.createJobQuery().jobId(jobId).singleResult();
+
+    assertNotNull(job);
+    assertEquals(1, job.getRetries());
+    assertNotNull(job.getLockExpirationTime());
+    // clean up
+    processEngineConfiguration.getJobExecutor().setRetriesDelayInMills(0);
+    waitForJobExecutor(100000, 1000, true);
+
+  }
+
+  public void testAmountOfRetries() throws Exception {
+
+    // when
+    tweetExceptionHandler.setExceptionsRemaining(10);
+    processEngineConfiguration.getJobExecutor().setAmountOfRetries(5);
+
+    String jobId = commandExecutor.execute(new Command<String>() {
+
+      public String execute(CommandContext commandContext) {
+        MessageEntity message = createTweetExceptionMessage();
+        message.setRetries(processEngineConfiguration.getJobExecutor().getAmountOfRetries());
+
+        commandContext.getJobManager().send(message);
+
+        return message.getId();
+      }
+    });
+
+    waitForJobExecutor(15000, 100, true);
+
+    // then
+    JobEntity job = (JobEntity) managementService.createJobQuery().jobId(jobId).singleResult();
+    assertNotNull(jobId);
+    assertEquals(0, job.getRetries());
+    assertEquals(5, tweetExceptionHandler.getExceptionsRemaining());
+
+    // clean up
+    tweetExceptionHandler.setExceptionsRemaining(0);
+    managementService.executeJob(jobId);
+
+  }
+
+  protected MessageEntity createTweetExceptionMessage() {
+    MessageEntity message = new MessageEntity();
+    message.setJobHandlerType("tweet-exception");
+    return message;
+  }
+
+}
