Stephen | 24 Jul 18:10

[PATCH] Support for DB Clusters/Replication in ActiveRecord (RFC)

Hello,

I am still dressing this patch up for prime time  but I would like to get some feedback from the community on the approach.  This currently does pass a full "rake"

I have done some extensive work on active record to allow for databases to be defined in terms of "connection pools" rather than simply one database.  Most of the work was completed in connection_specification.rb and is done at this level  to be compatible with any backend you choose.   Also this patch maintains full compatibiltity with the current paradigm so no changes are necessary to current applications/documentation.

Defining a database pool is very easy and follows a common sense convention.  I will assume you are running with RAILS_ENV=development for this example.  You first define your connections as you always have in database.yml .  Your are then able to able to define
  development_read_pool: db1, db2, db3
  development_write_pool: db4, db5, db6  (In general, the name is RAILS_ENV_write_pool so you can test your clusters in development and production with no config changes)

where each is a set of connection names that you have already defined.   ActiveRecord::Base.connection will then return connections from the appropriate pool using round-robin when more than one connection is available to it.  This is handy if you hvae a high traffic website for example and you want to load balance over several slave servers for reading while writing to the one master server consistently.  The syntax for .connection is as follow

ExampleModel.connection            # default "compatibility behavior" always returns a write connection.
ExampleModel.connection(:read)  #  Return a connection from the read pool
ExampleModel.connection (:write)  #  Return a connection from the write pool

I have also changed  a few of the functions in base.rb to utilize the correction pool (for example, find_by_sql calls connection(:read))  This again makes the patch seamless to end applications while allowing them to use the new functionality.

A patch aganist the lastest CVS is attached.  Some rough notes on implementation are also include below - there not 100% complete but give a good idea of what was changed.

Thanks,
Stephen Blackstone



--------------------------

Implementation:

1.  Added two arrays to act as pools for connections related to this connection.

   <at> <at> read_connection_pool = {}  
   <at> <at> write_connection_pool = {}

   Each is later defined as an array such that

    <at> <at> read_connection_pool[name] represenets all of the read connections available to the current
        class. This allows us to stay fully backward compatiable with the old methodology where name
    is <at> active_connection_name


2.  Define 2 variables to track index of last used connection.  This is used when doing round-robin

    <at> <at> last_read_connection = 0
    <at> <at> last_write_connection = 0

3.  Define a function for appending connections to pools (append_spec_to_connection

_pools spec).
    If the config has an attribute read_only == true then it is only entered into the read_connection_pool
    and vice versa for the write_connection_pool.  <at> <at> defined_connections also is defined at the same time.

    <at> <at> write_connection_pool[spec.object_id] always equals <at> <at> defined_connections[spec.object_id]


5.  Define a function establish_connection_pools which looks for two variables to be
    set in the configuration:

    RAILS_ENV_read_connection_pool
    RAILS_ENV_write_conenction_pool

    Normally in Rails, this would setup in the YAML file.  Each variable is a comma delimited
    list of connections to use, one for read the other for write. 


6.  Define a function clear_connection_pool which clears the connection pools.

7.  Modify establish_connection
    a.  When passed nil, call establish_connection_pools(RAILS_ENV)
    b.  When passed a ConnectionSpecification, clear all connection pools as well as the
        active_connection_name

8.  Move code from establish connection to ConnectionSpecification constructor so an object can be made
    out of any spec thats passed to it.   (Avoiding breakage of DRY principle)


9.  Remove settings <at> <at> defined_connections[name] in establish_connection.  This is now done by
    calling


10.  Define two functions, round robin read and round robin write which
     returns AbstractAdapters from the various pools.

11.  Modify retreive connection so that it takes an id of an object that should be in <at> <at> defined_connections
     Remove it's dependency on connection= as this will break things.

12.  Modify connection= to call establish_connection on ConnectionSpecification.

13.  Moidfy self.remove_connection to call clear_connection_pool

14.  Modify active_connection_name to check if a pool exists instead of defined_connections
Index: connection_adapters/abstract/connection_specification.rb
===================================================================
--- connection_adapters/abstract/connection_specification.rb	(revision 4617)
+++ connection_adapters/abstract/connection_specification.rb	(working copy)
@@ -1,10 +1,21 @@
 require 'set'
+require 'pp'

 module ActiveRecord
   class Base
     class ConnectionSpecification #:nodoc:
       attr_reader :config, :adapter_method
-      def initialize (config, adapter_method)
+      def initialize (config, adapter_method = nil)
+    # If no adapter is found, try to devine it from the config.
+    # This was moved here from establish_connection since it's now
+    # useful in other places.  
+    if adapter_method.nil?
+        config = config.symbolize_keys
+          unless config.key?(:adapter) then raise AdapterNotSpecified, "database configuration does not
specify adapter" end
+          adapter_method = "#{config[:adapter]}_connection"
+          unless ActiveRecord::Base.respond_to?(adapter_method) then 
+          raise AdapterNotFound, "database configuration specifies nonexistent #{config[:adapter]}
adapter" end
+     end
         @config, @adapter_method = config, adapter_method
       end
     end
@@ -20,6 +31,16 @@
     # The class -> thread id -> adapter cache. (class -> adapter if not allow_concurrency)
     @@active_connections = {}

+    # Define a connection pool for doing database reads.
+    @@read_connection_pool = {}
+
+    # Define a connection pool for doing database reads.
+    @@write_connection_pool = {}
+    
+    # Used in looping the connection pools.
+    @@last_read_connection =  {}
+    @@last_write_connection = {}
+
     class << self
       # Retrieve the connection cache.
       def thread_safe_active_connections #:nodoc:
@@ -29,16 +50,32 @@
       def single_threaded_active_connections #:nodoc:
         @@active_connections
       end
-     
+      def thread_safe_read_pool
+        @@read_connection_pool[Thread.current.object_id] ||= {}
+      end
+      def thread_safe_write_pool
+        @@write_connection_pool[Thread.current.object_id] ||= {}
+      end
+      def single_threaded_read_pool
+        @@read_connection_pool
+      end
+      def single_threaded_write_pool
+        @@write_connection_pool
+      end
+        
       # pick up the right active_connection method from @@allow_concurrency
       if @@allow_concurrency
-        alias_method :active_connections, :thread_safe_active_connections
+        alias_method :active_connections,    :thread_safe_active_connections
+        alias_method :read_connection_pool,  :thread_safe_read_pool
+        alias_method :write_connection_pool, :thread_safe_write_pool
       else
-        alias_method :active_connections, :single_threaded_active_connections
+        alias_method :active_connections,    :single_threaded_active_connections
+        alias_method :read_connection_pool,  :single_threaded_read_pool
+        alias_method :write_connection_pool, :single_threaded_write_pool
       end

       # set concurrency support flag (not thread safe, like most of the methods in this file)
-      def allow_concurrency=(threaded) #:nodoc:
+      def allow_concurrency=(threaded) #:nodoc
         logger.debug "allow_concurrency=#{threaded}" if logger
         return if @@allow_concurrency == threaded
         clear_all_cached_connections!
@@ -52,8 +89,9 @@
       end

       def active_connection_name #:nodoc:
+        # This must use name, not active_connection_name
         @active_connection_name ||=
-           if active_connections[name] || @@defined_connections[name]
+           if write_connection_pool[name] || read_connection_pool[name]
              name
            elsif self == ActiveRecord::Base
              nil
@@ -70,13 +108,24 @@
       # Returns the connection currently associated with the class. This can
       # also be used to "borrow" the connection to do database work unrelated
       # to any of the specific Active Records.
-      def connection
-        if @active_connection_name && (conn = active_connections[@active_connection_name])
+      def connection(type = nil)
+      case type
+      when nil, :write
+        # Provides write connection on nil for backwards compatibility.
+        id = round_robin_write_connection.object_id
+      when :read
+        id = round_robin_read_connection.object_id
+      else
+      # If someone passes garbage, raise error.
+      # they could have misspelt "read" and that could bork their database.
+      raise ActiveRecordError, "Connection type can only be read or write."
+      end
+        if conn = active_connections[id]
           conn
         else
           # retrieve_connection sets the cache key.
-          conn = retrieve_connection
-          active_connections[@active_connection_name] = conn
+          conn = retrieve_connection(id)
+          active_connections[id] = conn
         end
       end

@@ -100,6 +149,28 @@
         end
       end

+      protected      
+      def round_robin_read_connection
+        # pp "ActiveConnectionName in round_robin_read is #{active_connection_name}"
+        # It's very important to use active_connection_name instead of @active_connection_name 
+        if read_connection_pool[active_connection_name].nil? ||
read_connection_pool[active_connection_name].size == 0
+          raise ConnectionNotEstablished, "No Connection is available"
+        else
+          @@last_read_connection[active_connection_name] =
(@@last_read_connection[active_connection_name] + 1) % read_connection_pool[active_connection_name].size
+          read_connection_pool[active_connection_name].to_a[@@last_read_connection[active_connection_name]][1]
+        end
+      end
+
+      def round_robin_write_connection
+        # See comments for round_robin_read_connection.
+        if write_connection_pool[active_connection_name].nil? ||
write_connection_pool[active_connection_name].size == 0
+          raise ConnectionNotEstablished, "No Connection is available"
+        else
+          @@last_write_connection[active_connection_name] =
(@@last_write_connection[active_connection_name] + 1) % write_connection_pool[active_connection_name].size
+          write_connection_pool[active_connection_name].to_a[@@last_write_connection[active_connection_name]][1]
+        end
+      end
+
       private
         def clear_cache!(cache, thread_id = nil, &block)
           if cache
@@ -108,7 +179,6 @@
               thread_cache, cache = cache, cache[thread_id]
               return unless cache
             end
-
             cache.each(&block) if block_given?
             cache.clear
           end
@@ -121,11 +191,9 @@
         # Remove stale threads from the cache.
         def remove_stale_cached_threads!(cache, &block)
           stale = Set.new(cache.keys)
-
           Thread.list.each do |thread|
             stale.delete(thread.object_id) if thread.alive?
           end
-
           stale.each do |thread_id|
             clear_cache!(cache, thread_id, &block)
           end
@@ -147,10 +215,70 @@
     # Returns the connection currently associated with the class. This can
     # also be used to "borrow" the connection to do database work that isn't
     # easily done without going straight to SQL.
-    def connection
-      self.class.connection
+    def connection(type = nil)
+      self.class.connection(type)
     end
+  
+  # Add specs to various connection pools.
+    def self.append_spec_to_connection_pools spec
+      # The default behavior allows a connection to be used for both reading and writing.
+      # Otherwise, we just load both pools.
+      
+  write_connection_pool[active_connection_name] ||= {}
+  read_connection_pool[active_connection_name] ||= {}
+  @@last_read_connection[active_connection_name] = 0
+  @@last_write_connection[active_connection_name] = 0

+  # Check out what were were passed if its not a ConnectionSpec, try to make one.
+      if !spec.kind_of?(ActiveRecord::Base::ConnectionSpecification)
+      spec = ConnectionSpecification.new(spec)
+      end
+      
+      if !spec.config.key?(:read_only) && !spec.config[:read_only] != 'true'
+        write_connection_pool[active_connection_name][spec.object_id] = spec if !write_connection_pool[active_connection_name][spec.object_id]
+      end
+      if !spec.config.key?(:write_only) && !spec.config[:write_only] != 'true'
+        read_connection_pool[active_connection_name][spec.object_id] = spec if !read_connection_pool[active_connection_name][spec.object_id]
+      end
+        @@defined_connections[spec.object_id] = spec
+    end
+    
+  def self.clear_connection_pools(klass=self)   
+    if write_connection_pool[klass.name].kind_of?(Hash)
+      write_connection_pool[klass.name].each_value {|conn| 
+      active_connections[conn.object_id].disconnect! if active_connections[conn.object_id] }
+      write_connection_pool[klass.name].clear
+    end
+    if read_connection_pool[klass.name].kind_of?(Hash)
+      read_connection_pool[klass.name].each_value {|conn|
+      active_connections[conn.object_id].disconnect! if active_connections[conn.object_id] }
+      read_connection_pool[klass.name].clear
+    end
+    read_connection_pool.delete(klass.name)
+    write_connection_pool.delete(klass.name)
+    clear_all_cached_connections!
+  end
+
+  def self.establish_connection_pool(environment)
+    # Connections that are specifically in a pool override the read/write settings.
+    @active_connection_name = name
+    if @read = configurations["#{environment}_read_pool"]
+      @read.split(",").each do |connection| 
+        spec = configurations[connection].clone
+        spec[:read_only] = true      
+        append_spec_to_connection_pools(spec)
+      end
+    end
+    if @write = configurations["#{environment}_write_pool"]
+      @write.split(",").each do  |connection|                         
+        spec = configurations[connection].clone
+        spec[:write_only] = true 
+        append_spec_to_connection_pools(spec)
+      end
+    end
+    (write_connection_pool[name].size > 0 && read_connection_pool[name].size > 0) ? true : false
+  end
+
     # Establishes the connection to the database. Accepts a hash as input where
     # the :adapter key must be specified with the name of a database adapter (in lower-case)
     # example for regular databases (MySQL, Postgresql, etc):
@@ -182,11 +310,13 @@
       case spec
         when nil
           raise AdapterNotSpecified unless defined? RAILS_ENV
-          establish_connection(RAILS_ENV)
+          if !establish_connection_pool(RAILS_ENV) 
+            establish_connection(RAILS_ENV)
+          end
         when ConnectionSpecification
-          clear_active_connection_name
+          clear_active_connection_name 
           @active_connection_name = name
-          @@defined_connections[name] = spec
+          append_spec_to_connection_pools(spec)
         when Symbol, String
           if configuration = configurations[spec.to_s]
             establish_connection(configuration)
@@ -194,12 +324,7 @@
             raise AdapterNotSpecified, "#{spec} database is not configured"
           end
         else
-          spec = spec.symbolize_keys
-          unless spec.key?(:adapter) then raise AdapterNotSpecified, "database configuration does not
specify adapter" end
-          adapter_method = "#{spec[:adapter]}_connection"
-          unless respond_to?(adapter_method) then raise AdapterNotFound, "database configuration specifies
nonexistent #{spec[:adapter]} adapter" end
-          remove_connection
-          establish_connection(ConnectionSpecification.new(spec, adapter_method))
+          establish_connection(ConnectionSpecification.new(spec))
       end
     end

@@ -207,27 +332,29 @@
     # active or defined connections: if it is the latter, it will be
     # opened and set as the active connection for the class it was defined
     # for (not necessarily the current class).
-    def self.retrieve_connection #:nodoc:
+    def self.retrieve_connection(id = nil) #:nodoc:
       # Name is nil if establish_connection hasn't been called for
       # some class along the inheritance chain up to AR::Base yet.
-      if name = active_connection_name
-        if conn = active_connections[name]
-          # Verify the connection.
-          conn.verify!(@@verification_timeout)
-        elsif spec = @@defined_connections[name]
-          # Activate this connection specification.
-          klass = name.constantize
-          klass.connection = spec
-          conn = active_connections[name]
-        end
+      if id.nil?
+      id = round_robin_write_connection.object_id
       end
-
+      if conn = active_connections[id]
+        # Verify the connection.
+       conn.verify!(@@verification_timeout)
+      elsif spec = @@defined_connections[id]
+        # Activate this connection specification.
+        # Note this no longer calls connection= which causes problems
+        # with read/write pools being clobbered.
+        config = spec.config.reverse_merge(:allow_concurrency => @@allow_concurrency)
+        active_connections[id] = self.send(spec.adapter_method, config)
+        conn = active_connections[id]
+      end  
       conn or raise ConnectionNotEstablished
     end

     # Returns true if a connection that's accessible to this class have already been opened.
     def self.connected?
-      active_connections[active_connection_name] ? true : false
+      active_connections[active_connection_name][round_robin_write_connection.object_id] ? true : false
     end

     # Remove the connection for this class. This will close the active
@@ -235,21 +362,20 @@
     # can be used as argument for establish_connection, for easy
     # re-establishing of the connection.
     def self.remove_connection(klass=self)
-      spec = @@defined_connections[klass.name]
-      konn = active_connections[klass.name]
-      @@defined_connections.delete_if { |key, value| value == spec }
-      active_connections.delete_if { |key, value| value == konn }
-      konn.disconnect! if konn
-      spec.config if spec
+      # Arbitarily choosing the first spec off the write pool to send back.
+      # This is here to stay backwards compatible
+      spec = @@defined_connections[round_robin_write_connection.object_id] if !@@defined_connections[round_robin_write_connection.object_id].nil?
+      clear_connection_pools(klass)
+      spec if spec
     end

     # Set the connection for the class.
     def self.connection=(spec) #:nodoc:
       if spec.kind_of?(ActiveRecord::ConnectionAdapters::AbstractAdapter)
-        active_connections[name] = spec
+        active_connections[active_connection_name] = spec
       elsif spec.kind_of?(ConnectionSpecification)
         config = spec.config.reverse_merge(:allow_concurrency => @@allow_concurrency)
-        self.connection = self.send(spec.adapter_method, config)
+        establish_connection(spec.adapter_method, config)
       elsif spec.nil?
         raise ConnectionNotEstablished
       else
@@ -265,5 +391,5 @@
         logger.info "Active connection name: #{@active_connection_name}"
       end
     end
-  end
+  end_pool
 end
Index: base.rb
===================================================================
--- base.rb	(revision 4617)
+++ base.rb	(working copy)
@@ -419,7 +419,7 @@
       #   Post.find_by_sql "SELECT p.*, c.author FROM posts p, comments c WHERE p.id = c.post_id"
       #   Post.find_by_sql ["SELECT * FROM posts WHERE author = ? AND created > ?", author_id, start_date]
       def find_by_sql(sql)
-        connection.select_all(sanitize_sql(sql), "#{name} Load").collect! { |record|
instantiate(record) }
+        connection(:read).select_all(sanitize_sql(sql), "#{name} Load").collect! { |record|
instantiate(record) }
       end

       # Returns true if the given +id+ represents the primary key of a record in the database, false otherwise.
@@ -506,7 +506,7 @@
       #   Product.count_by_sql "SELECT COUNT(*) FROM sales s, customers c WHERE s.customer_id = c.id"
       def count_by_sql(sql)
         sql = sanitize_conditions(sql)
-        connection.select_value(sql, "#{name} Count").to_i
+        connection(:read).select_value(sql, "#{name} Count").to_i
       end

       # Increments the specified counter by one. So <tt>DiscussionBoard.increment_counter("post_count",
@@ -725,7 +725,7 @@
       # Returns an array of column objects for the table associated with this class.
       def columns
         unless @columns
-          @columns = connection.columns(table_name, "#{name} Columns")
+          @columns = connection(:read).columns(table_name, "#{name} Columns")
           @columns.each {|column| column.primary = column.name == primary_key}
         end
         @columns
_______________________________________________
Rails mailing list
Rails@...
http://lists.rubyonrails.org/mailman/listinfo/rails

Gmane