Details

    • Type: Bug Bug
    • Status: Open Open
    • Priority: Major Major
    • Resolution: Unresolved
    • Affects Version/s: JRuby 1.5.1
    • Fix Version/s: None
    • Component/s: Core Classes/Modules
    • Labels:
      None
    • Environment:
      osx / jvm 1.6
    • Patch Submitted:
      Yes
    • Number of attachments :
      2

      Description

      The stomp consumer times out after 5 seconds throwing the following error.

      consumer.rb:18:in `_receive': execution expired (Stomp::Error::PacketParsingTimeout)
      	from consumer.rb:12:in `_receive'
      	from consumer.rb:7:in `_receive'
      	from /Users/victori/Documents/Java/jruby-1.4/lib/ruby/gems/1.8/gems/stomp-1.1.6/lib/stomp/connection.rb:325:in `__old_receive'
      	from /Users/victori/Documents/Java/jruby-1.4/lib/ruby/gems/1.8/gems/stomp-1.1.6/lib/stomp/connection.rb:335:in `receive'
      	from /Users/victori/Documents/Java/jruby-1.4/lib/ruby/gems/1.8/gems/stomp-1.1.6/lib/stomp/client.rb:289:in `start_listeners'
      	from /Users/victori/Documents/Java/jruby-1.4/lib/ruby/gems/1.8/gems/stomp-1.1.6/lib/stomp/client.rb:287:in `start'
      	from /Users/victori/Documents/Java/jruby-1.4/lib/ruby/gems/1.8/gems/stomp-1.1.6/lib/stomp/client.rb:287:in `start_listeners'
      	from /Users/victori/Documents/Java/jruby-1.4/lib/ruby/gems/1.8/gems/stomp-1.1.6/lib/stomp/client.rb:96:in `initialize'
      	from consumer.rb:53:in `new'
      	from consumer.rb:53
      

      Here is the code to this example

      consumer.rb
      require 'rubygems'
      require 'stomp'
      
      client = Stomp::Client.new "failover:(stomp://localhost:61613)?initialReconnectDelay=100"
      client.subscribe('/queue/test_queue',  {:ack => "client", "activemq.prefetchSize" => 1, "activemq.exclusive" => true }) do |msg|
        p msg.body
        client.acknowledge(msg)
      end
      
      client.join
      
      producer.rb
      require 'rubygems'
      require 'stomp'
      
      client = Stomp::Client.new "failover:(stomp://localhost:61613)?initialReconnectDelay=100"
      client.publish("/queue/test_queue", "hello world!",{:persistent=>true})
      

      And here is the monkey patch that fixes the issue; commenting out the timeout callback

      # for stomp subscriber
      if defined?(JRUBY_VERSION)
        module Stomp
          class Connection
            def _receive( read_socket )
              @read_semaphore.synchronize do
                 line = read_socket.gets
                 return nil if line.nil?
      
                 # If the reading hangs for more than 5 seconds, abort the parsing process
                 #Timeout::timeout(5, Stomp::Error::PacketParsingTimeout) do
                   # Reads the beginning of the message until it runs into a empty line
                   message_header = ''
                   begin
                     message_header += line
                     begin
                       line = read_socket.gets
                     rescue
                        p read_socket
                     end
                   end until line =~ /^\s?\n$/
      
                   # Checks if it includes content_length header
                   content_length = message_header.match /content-length\s?:\s?(\d+)\s?\n/
                   message_body = ''
      
                   # If it does, reads the specified amount of bytes
                   char = ''
                   if content_length
                     message_body = read_socket.read content_length[1].to_i
                     raise Stomp::Error::InvalidMessageLength unless parse_char(read_socket.getc) == "\0"
                   # Else reads, the rest of the message until the first \0
                   else
                     message_body += char while read_socket.ready? && (char = parse_char(read_socket.getc)) != "\0"
                   end
      
                   # If the buffer isn't empty, reads the next char and returns it to the buffer
                   # unless it's a \n
                   if read_socket.ready?
                     last_char = read_socket.getc
                     read_socket.ungetc(last_char) if parse_char(last_char) != "\n"
                   end
      
                   # Adds the excluded \n and \0 and tries to create a new message with it
                   Message.new(message_header + "\n" + message_body + "\0")
                 end
               #end
            end
          end
        end
      end
      
      1. consumer.rb
        2 kB
        victor igumnov
      2. producer.rb
        0.2 kB
        victor igumnov

        Activity

        Hide
        Hiro Asari added a comment -

        Formatting change for readability.

        Show
        Hiro Asari added a comment - Formatting change for readability.
        Hide
        Hiro Asari added a comment -

        So, if I understand this correctly, as long as there is something for consumer.rb to consume on the message queue every 5 seconds, consumer can stay up. (I verified this by hand.) As I mentioned in IRC, it appears to me by design; maybe the consumer needs to anticipate this exception and work around it. (http://gitorious.org/stomp/mainline/commit/60dee5d080e3f1dc68f11d85224afed6109e4497)

        All in all, I do not think it is a JRuby issue.

        Show
        Hiro Asari added a comment - So, if I understand this correctly, as long as there is something for consumer.rb to consume on the message queue every 5 seconds, consumer can stay up. (I verified this by hand.) As I mentioned in IRC, it appears to me by design; maybe the consumer needs to anticipate this exception and work around it. ( http://gitorious.org/stomp/mainline/commit/60dee5d080e3f1dc68f11d85224afed6109e4497 ) All in all, I do not think it is a JRuby issue.
        Hide
        victor igumnov added a comment -

        well, running it on MRI does not throw an exception while on JRuby it does. JRuby is behaving differently than MRI; isn't JRuby's goal to be compatible with MRI?

        Also, it will error even if you do have a continuous stream of messages.

        Show
        victor igumnov added a comment - well, running it on MRI does not throw an exception while on JRuby it does. JRuby is behaving differently than MRI; isn't JRuby's goal to be compatible with MRI? Also, it will error even if you do have a continuous stream of messages.
        Hide
        Hiro Asari added a comment -

        Ah. Right. Now that you mention it, it probably is Timeout issue. I'm going to try reproducing the problem without Stomp.

        Show
        Hiro Asari added a comment - Ah. Right. Now that you mention it, it probably is Timeout issue. I'm going to try reproducing the problem without Stomp.
        Hide
        Kit Plummer added a comment -

        Tested on JRuby-1.6.3 - the monkey patch doesn't work either.

        Show
        Kit Plummer added a comment - Tested on JRuby-1.6.3 - the monkey patch doesn't work either.
        Hide
        victor igumnov added a comment -

        The only version that works with JRuby and the patch is 1.1.6. There has been changes in 1.1.8 that have broke compatibility completely.

        Try it out, I know it works since I use it on our JRuby setup to run http://shrinkbooth.com/ that uses a queue to update URL statistics.

        Show
        victor igumnov added a comment - The only version that works with JRuby and the patch is 1.1.6. There has been changes in 1.1.8 that have broke compatibility completely. Try it out, I know it works since I use it on our JRuby setup to run http://shrinkbooth.com/ that uses a queue to update URL statistics.
        Hide
        Charles Oliver Nutter added a comment -

        Ok, I have no idea how to set up a stomp instance. Can someone post instructions to reproduce the problem? It's hard to investigate without that key piece of information

        I want to help, but I'm not an expert on everything

        Show
        Charles Oliver Nutter added a comment - Ok, I have no idea how to set up a stomp instance. Can someone post instructions to reproduce the problem? It's hard to investigate without that key piece of information I want to help, but I'm not an expert on everything
        Hide
        victor igumnov added a comment -

        Charles,

        Thank you for looking into this issue, I have your easy button right here:

        http://uploadbooth.com:9161/admin/queues.jsp (web console)

        uploadbooth.com:51612 (stomp)

        I will keep the activemq instance running for as long as you need it to test JRuby compatibility.

        -Victor

        Show
        victor igumnov added a comment - Charles, Thank you for looking into this issue, I have your easy button right here: http://uploadbooth.com:9161/admin/queues.jsp (web console) uploadbooth.com:51612 (stomp) I will keep the activemq instance running for as long as you need it to test JRuby compatibility. -Victor
        Hide
        Rob Hunter added a comment - - edited

        We ran into the same issue, and it seems to be a combination bug of JRuby and the 'Stomp' gem.

        interpreter 'stomp' gem result
        Ruby 1.8.7 (ree) 1.1.9 fine
        JRuby 1.6.4 1.1.9 timeout immediately (usually)
        JRuby 1.6.4 unreleased fine

        The 'stomp' gem changed its implementation to avoid hitting this problem in JRuby (something to do with trailing newlines – see commit 245e734a0f4a3c3097fdffb7c8dc9b2380c98958).

        I'm not sure whether the old behaviour was wrong and worked on MRI by accident, but it was certainly different between the interpreters.

        Specifically, the following code shows the issue. It's a little more involved than a minimal case, but it does work reliably (or rather, unreliably).

        subscriber.rb
        require 'stomp'
        
        # writing to $stderr doesn't show up from the subscription thread, so this logger sees the errors
        class MyErrorLogger
          attr_reader :errs
          def on_miscerr *args
            (@errs ||= []) << args
          end
        end
        logger = MyErrorLogger.new
        
        class MySubscriber
          attr_reader :received_messages
        
          def initialize
            @received_messages = []
            @logger = MyErrorLogger.new
            client = Stomp::Client.new :hosts => [:login => "ruby", :passcode => "ruby", :host => "localhost"], :logger => @logger
            client.subscribe "/topic/Internal.Notification.Event.Test" do |msg|
              @received_messages << msg
            end
          end
        
          def errs
            @logger.errs
          end
        end
        
        subscriber = MySubscriber.new
        until subscriber.errs.any? { Thread.pass }
        # should only ever get here if there are genuine timeouts (over 5 seconds of message-parsing time)
        p subscriber.errs
        

        The problem manifests publish an event from Ruby (the subscriber bug is triggered no matter which language publishes the event)

        require 'stomp'
        
        publisher = Stomp::Client.new "ruby", "ruby", "localhost"
        publisher.publish "/topic/Internal.Notification.Event.Test", "test message"
        
        Show
        Rob Hunter added a comment - - edited We ran into the same issue, and it seems to be a combination bug of JRuby and the 'Stomp' gem. interpreter 'stomp' gem result Ruby 1.8.7 (ree) 1.1.9 fine JRuby 1.6.4 1.1.9 timeout immediately (usually) JRuby 1.6.4 unreleased fine The 'stomp' gem changed its implementation to avoid hitting this problem in JRuby (something to do with trailing newlines – see commit 245e734a0f4a3c3097fdffb7c8dc9b2380c98958 ). I'm not sure whether the old behaviour was wrong and worked on MRI by accident, but it was certainly different between the interpreters. Specifically, the following code shows the issue. It's a little more involved than a minimal case, but it does work reliably (or rather, unreliably). subscriber.rb require 'stomp' # writing to $stderr doesn't show up from the subscription thread, so this logger sees the errors class MyErrorLogger attr_reader :errs def on_miscerr *args (@errs ||= []) << args end end logger = MyErrorLogger. new class MySubscriber attr_reader :received_messages def initialize @received_messages = [] @logger = MyErrorLogger. new client = Stomp::Client. new :hosts => [:login => "ruby" , :passcode => "ruby" , :host => "localhost" ], :logger => @logger client.subscribe "/topic/Internal.Notification.Event.Test" do |msg| @received_messages << msg end end def errs @logger.errs end end subscriber = MySubscriber. new until subscriber.errs.any? { Thread .pass } # should only ever get here if there are genuine timeouts (over 5 seconds of message-parsing time) p subscriber.errs The problem manifests publish an event from Ruby (the subscriber bug is triggered no matter which language publishes the event) require 'stomp' publisher = Stomp::Client. new "ruby" , "ruby" , "localhost" publisher.publish "/topic/Internal.Notification.Event.Test" , "test message"

          People

          • Assignee:
            Unassigned
            Reporter:
            victor igumnov
          • Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

            • Created:
              Updated: