[PATCH] Support for DB Clusters/Replication in ActiveRecord (RFC)
Subject: [PATCH] Support for DB Clusters/Replication in ActiveRecord (RFC)
Newsgroups: gmane.comp.lang.ruby.rails
Date: 2006-07-24 16:13:04 GMT
Hello,
I am still dressing this patch up for prime time but I
would like to get some feedback from the community on the approach.
This currently does pass a full "rake"
I have done some
extensive work on active record to allow for databases to be defined in
terms of "connection pools" rather than simply one database. Most of
the work was completed in connection_specification.rb and is done at
this level to be compatible with any backend you choose. Also this
patch maintains full compatibiltity with the current paradigm so no
changes are necessary to current applications/documentation.
Defining a database pool is very easy and follows a common
sense convention. I will assume you are running with
RAILS_ENV=development for this example. You first define your
connections as you always have in database.yml
. Your are then able to able to define
development_read_pool: db1, db2, db3
development_write_pool: db4, db5, db6 (In general, the name is
RAILS_ENV_write_pool so you can test your clusters in development and
production with no config changes)
where each is a set of connection names that you have already
defined. ActiveRecord::Base.connection will then return connections
from the appropriate pool using round-robin when more than one
connection is available to it. This is handy if you hvae a high
traffic website for example and you want to load balance over several
slave servers for reading while writing to the one master server
consistently. The syntax for .connection is as follow
ExampleModel.connection # default "compatibility behavior" always returns a write connection.
ExampleModel.connection(:read) # Return a connection from the read pool
ExampleModel.connection
(:write) # Return a connection from the write pool
I have also changed a few of the functions in base.rb to utilize
the correction pool (for example, find_by_sql calls connection(:read))
This again makes the patch seamless to end applications while allowing
them to use the new functionality.
A patch aganist the lastest CVS is attached. Some rough notes on implementation are
also include below - there not 100% complete but give a good idea of
what was changed.
Thanks,
Stephen Blackstone
--------------------------
Implementation:
1. Added two arrays to act as pools for connections related to this connection.
<at> <at> read_connection_pool = {}
<at> <at> write_connection_pool = {}
Each is later defined as an array such that
<at> <at> read_connection_pool[name] represenets all of the read connections available to the current
class. This allows us to stay fully backward compatiable with the old methodology where name
is <at> active_connection_name
2. Define 2 variables to track index of last used connection. This is used when doing round-robin
<at> <at> last_read_connection = 0
<at> <at> last_write_connection = 0
3. Define a function for appending connections to pools (append_spec_to_connection
If the config has an attribute read_only == true then it is only entered into the read_connection_pool
and vice versa for the write_connection_pool. <at> <at> defined_connections also is defined at the same time.
<at> <at> write_connection_pool[spec.object_id] always equals <at> <at> defined_connections[spec.object_id]
5. Define a function establish_connection_pools which looks for two variables to be
set in the configuration:
RAILS_ENV_read_connection_pool
RAILS_ENV_write_conenction_pool
Normally in Rails, this would setup in the YAML file. Each variable is a comma delimited
list of connections to use, one for read the other for write.
6. Define a function clear_connection_pool which clears the connection pools.
7. Modify establish_connection
a. When passed nil, call establish_connection_pools(RAILS_ENV)
b. When passed a ConnectionSpecification, clear all connection pools as well as the
active_connection_name
8. Move code from establish connection to ConnectionSpecification constructor so an object can be made
out of any spec thats passed to it. (Avoiding breakage of DRY principle)
9. Remove settings <at> <at> defined_connections[name] in establish_connection. This is now done by
calling
10. Define two functions, round robin read and round robin write which
returns AbstractAdapters from the various pools.
11. Modify retreive connection so that it takes an id of an object that should be in <at> <at> defined_connections
Remove it's dependency on connection= as this will break things.
12. Modify connection= to call establish_connection on ConnectionSpecification.
13. Moidfy self.remove_connection to call clear_connection_pool
14. Modify active_connection_name to check if a pool exists instead of defined_connections
Index: connection_adapters/abstract/connection_specification.rb
===================================================================
--- connection_adapters/abstract/connection_specification.rb (revision 4617)
+++ connection_adapters/abstract/connection_specification.rb (working copy)
@@ -1,10 +1,21 @@
require 'set'
+require 'pp'
module ActiveRecord
class Base
class ConnectionSpecification #:nodoc:
attr_reader :config, :adapter_method
- def initialize (config, adapter_method)
+ def initialize (config, adapter_method = nil)
+ # If no adapter is found, try to devine it from the config.
+ # This was moved here from establish_connection since it's now
+ # useful in other places.
+ if adapter_method.nil?
+ config = config.symbolize_keys
+ unless config.key?(:adapter) then raise AdapterNotSpecified, "database configuration does not
specify adapter" end
+ adapter_method = "#{config[:adapter]}_connection"
+ unless ActiveRecord::Base.respond_to?(adapter_method) then
+ raise AdapterNotFound, "database configuration specifies nonexistent #{config[:adapter]}
adapter" end
+ end
@config, @adapter_method = config, adapter_method
end
end
@@ -20,6 +31,16 @@
# The class -> thread id -> adapter cache. (class -> adapter if not allow_concurrency)
@@active_connections = {}
+ # Define a connection pool for doing database reads.
+ @@read_connection_pool = {}
+
+ # Define a connection pool for doing database reads.
+ @@write_connection_pool = {}
+
+ # Used in looping the connection pools.
+ @@last_read_connection = {}
+ @@last_write_connection = {}
+
class << self
# Retrieve the connection cache.
def thread_safe_active_connections #:nodoc:
@@ -29,16 +50,32 @@
def single_threaded_active_connections #:nodoc:
@@active_connections
end
-
+ def thread_safe_read_pool
+ @@read_connection_pool[Thread.current.object_id] ||= {}
+ end
+ def thread_safe_write_pool
+ @@write_connection_pool[Thread.current.object_id] ||= {}
+ end
+ def single_threaded_read_pool
+ @@read_connection_pool
+ end
+ def single_threaded_write_pool
+ @@write_connection_pool
+ end
+
# pick up the right active_connection method from @@allow_concurrency
if @@allow_concurrency
- alias_method :active_connections, :thread_safe_active_connections
+ alias_method :active_connections, :thread_safe_active_connections
+ alias_method :read_connection_pool, :thread_safe_read_pool
+ alias_method :write_connection_pool, :thread_safe_write_pool
else
- alias_method :active_connections, :single_threaded_active_connections
+ alias_method :active_connections, :single_threaded_active_connections
+ alias_method :read_connection_pool, :single_threaded_read_pool
+ alias_method :write_connection_pool, :single_threaded_write_pool
end
# set concurrency support flag (not thread safe, like most of the methods in this file)
- def allow_concurrency=(threaded) #:nodoc:
+ def allow_concurrency=(threaded) #:nodoc
logger.debug "allow_concurrency=#{threaded}" if logger
return if @@allow_concurrency == threaded
clear_all_cached_connections!
@@ -52,8 +89,9 @@
end
def active_connection_name #:nodoc:
+ # This must use name, not active_connection_name
@active_connection_name ||=
- if active_connections[name] || @@defined_connections[name]
+ if write_connection_pool[name] || read_connection_pool[name]
name
elsif self == ActiveRecord::Base
nil
@@ -70,13 +108,24 @@
# Returns the connection currently associated with the class. This can
# also be used to "borrow" the connection to do database work unrelated
# to any of the specific Active Records.
- def connection
- if @active_connection_name && (conn = active_connections[@active_connection_name])
+ def connection(type = nil)
+ case type
+ when nil, :write
+ # Provides write connection on nil for backwards compatibility.
+ id = round_robin_write_connection.object_id
+ when :read
+ id = round_robin_read_connection.object_id
+ else
+ # If someone passes garbage, raise error.
+ # they could have misspelt "read" and that could bork their database.
+ raise ActiveRecordError, "Connection type can only be read or write."
+ end
+ if conn = active_connections[id]
conn
else
# retrieve_connection sets the cache key.
- conn = retrieve_connection
- active_connections[@active_connection_name] = conn
+ conn = retrieve_connection(id)
+ active_connections[id] = conn
end
end
@@ -100,6 +149,28 @@
end
end
+ protected
+ def round_robin_read_connection
+ # pp "ActiveConnectionName in round_robin_read is #{active_connection_name}"
+ # It's very important to use active_connection_name instead of @active_connection_name
+ if read_connection_pool[active_connection_name].nil? ||
read_connection_pool[active_connection_name].size == 0
+ raise ConnectionNotEstablished, "No Connection is available"
+ else
+ @@last_read_connection[active_connection_name] =
(@@last_read_connection[active_connection_name] + 1) % read_connection_pool[active_connection_name].size
+ read_connection_pool[active_connection_name].to_a[@@last_read_connection[active_connection_name]][1]
+ end
+ end
+
+ def round_robin_write_connection
+ # See comments for round_robin_read_connection.
+ if write_connection_pool[active_connection_name].nil? ||
write_connection_pool[active_connection_name].size == 0
+ raise ConnectionNotEstablished, "No Connection is available"
+ else
+ @@last_write_connection[active_connection_name] =
(@@last_write_connection[active_connection_name] + 1) % write_connection_pool[active_connection_name].size
+ write_connection_pool[active_connection_name].to_a[@@last_write_connection[active_connection_name]][1]
+ end
+ end
+
private
def clear_cache!(cache, thread_id = nil, &block)
if cache
@@ -108,7 +179,6 @@
thread_cache, cache = cache, cache[thread_id]
return unless cache
end
-
cache.each(&block) if block_given?
cache.clear
end
@@ -121,11 +191,9 @@
# Remove stale threads from the cache.
def remove_stale_cached_threads!(cache, &block)
stale = Set.new(cache.keys)
-
Thread.list.each do |thread|
stale.delete(thread.object_id) if thread.alive?
end
-
stale.each do |thread_id|
clear_cache!(cache, thread_id, &block)
end
@@ -147,10 +215,70 @@
# Returns the connection currently associated with the class. This can
# also be used to "borrow" the connection to do database work that isn't
# easily done without going straight to SQL.
- def connection
- self.class.connection
+ def connection(type = nil)
+ self.class.connection(type)
end
+
+ # Add specs to various connection pools.
+ def self.append_spec_to_connection_pools spec
+ # The default behavior allows a connection to be used for both reading and writing.
+ # Otherwise, we just load both pools.
+
+ write_connection_pool[active_connection_name] ||= {}
+ read_connection_pool[active_connection_name] ||= {}
+ @@last_read_connection[active_connection_name] = 0
+ @@last_write_connection[active_connection_name] = 0
+ # Check out what were were passed if its not a ConnectionSpec, try to make one.
+ if !spec.kind_of?(ActiveRecord::Base::ConnectionSpecification)
+ spec = ConnectionSpecification.new(spec)
+ end
+
+ if !spec.config.key?(:read_only) && !spec.config[:read_only] != 'true'
+ write_connection_pool[active_connection_name][spec.object_id] = spec if !write_connection_pool[active_connection_name][spec.object_id]
+ end
+ if !spec.config.key?(:write_only) && !spec.config[:write_only] != 'true'
+ read_connection_pool[active_connection_name][spec.object_id] = spec if !read_connection_pool[active_connection_name][spec.object_id]
+ end
+ @@defined_connections[spec.object_id] = spec
+ end
+
+ def self.clear_connection_pools(klass=self)
+ if write_connection_pool[klass.name].kind_of?(Hash)
+ write_connection_pool[klass.name].each_value {|conn|
+ active_connections[conn.object_id].disconnect! if active_connections[conn.object_id] }
+ write_connection_pool[klass.name].clear
+ end
+ if read_connection_pool[klass.name].kind_of?(Hash)
+ read_connection_pool[klass.name].each_value {|conn|
+ active_connections[conn.object_id].disconnect! if active_connections[conn.object_id] }
+ read_connection_pool[klass.name].clear
+ end
+ read_connection_pool.delete(klass.name)
+ write_connection_pool.delete(klass.name)
+ clear_all_cached_connections!
+ end
+
+ def self.establish_connection_pool(environment)
+ # Connections that are specifically in a pool override the read/write settings.
+ @active_connection_name = name
+ if @read = configurations["#{environment}_read_pool"]
+ @read.split(",").each do |connection|
+ spec = configurations[connection].clone
+ spec[:read_only] = true
+ append_spec_to_connection_pools(spec)
+ end
+ end
+ if @write = configurations["#{environment}_write_pool"]
+ @write.split(",").each do |connection|
+ spec = configurations[connection].clone
+ spec[:write_only] = true
+ append_spec_to_connection_pools(spec)
+ end
+ end
+ (write_connection_pool[name].size > 0 && read_connection_pool[name].size > 0) ? true : false
+ end
+
# Establishes the connection to the database. Accepts a hash as input where
# the :adapter key must be specified with the name of a database adapter (in lower-case)
# example for regular databases (MySQL, Postgresql, etc):
@@ -182,11 +310,13 @@
case spec
when nil
raise AdapterNotSpecified unless defined? RAILS_ENV
- establish_connection(RAILS_ENV)
+ if !establish_connection_pool(RAILS_ENV)
+ establish_connection(RAILS_ENV)
+ end
when ConnectionSpecification
- clear_active_connection_name
+ clear_active_connection_name
@active_connection_name = name
- @@defined_connections[name] = spec
+ append_spec_to_connection_pools(spec)
when Symbol, String
if configuration = configurations[spec.to_s]
establish_connection(configuration)
@@ -194,12 +324,7 @@
raise AdapterNotSpecified, "#{spec} database is not configured"
end
else
- spec = spec.symbolize_keys
- unless spec.key?(:adapter) then raise AdapterNotSpecified, "database configuration does not
specify adapter" end
- adapter_method = "#{spec[:adapter]}_connection"
- unless respond_to?(adapter_method) then raise AdapterNotFound, "database configuration specifies
nonexistent #{spec[:adapter]} adapter" end
- remove_connection
- establish_connection(ConnectionSpecification.new(spec, adapter_method))
+ establish_connection(ConnectionSpecification.new(spec))
end
end
@@ -207,27 +332,29 @@
# active or defined connections: if it is the latter, it will be
# opened and set as the active connection for the class it was defined
# for (not necessarily the current class).
- def self.retrieve_connection #:nodoc:
+ def self.retrieve_connection(id = nil) #:nodoc:
# Name is nil if establish_connection hasn't been called for
# some class along the inheritance chain up to AR::Base yet.
- if name = active_connection_name
- if conn = active_connections[name]
- # Verify the connection.
- conn.verify!(@@verification_timeout)
- elsif spec = @@defined_connections[name]
- # Activate this connection specification.
- klass = name.constantize
- klass.connection = spec
- conn = active_connections[name]
- end
+ if id.nil?
+ id = round_robin_write_connection.object_id
end
-
+ if conn = active_connections[id]
+ # Verify the connection.
+ conn.verify!(@@verification_timeout)
+ elsif spec = @@defined_connections[id]
+ # Activate this connection specification.
+ # Note this no longer calls connection= which causes problems
+ # with read/write pools being clobbered.
+ config = spec.config.reverse_merge(:allow_concurrency => @@allow_concurrency)
+ active_connections[id] = self.send(spec.adapter_method, config)
+ conn = active_connections[id]
+ end
conn or raise ConnectionNotEstablished
end
# Returns true if a connection that's accessible to this class have already been opened.
def self.connected?
- active_connections[active_connection_name] ? true : false
+ active_connections[active_connection_name][round_robin_write_connection.object_id] ? true : false
end
# Remove the connection for this class. This will close the active
@@ -235,21 +362,20 @@
# can be used as argument for establish_connection, for easy
# re-establishing of the connection.
def self.remove_connection(klass=self)
- spec = @@defined_connections[klass.name]
- konn = active_connections[klass.name]
- @@defined_connections.delete_if { |key, value| value == spec }
- active_connections.delete_if { |key, value| value == konn }
- konn.disconnect! if konn
- spec.config if spec
+ # Arbitarily choosing the first spec off the write pool to send back.
+ # This is here to stay backwards compatible
+ spec = @@defined_connections[round_robin_write_connection.object_id] if !@@defined_connections[round_robin_write_connection.object_id].nil?
+ clear_connection_pools(klass)
+ spec if spec
end
# Set the connection for the class.
def self.connection=(spec) #:nodoc:
if spec.kind_of?(ActiveRecord::ConnectionAdapters::AbstractAdapter)
- active_connections[name] = spec
+ active_connections[active_connection_name] = spec
elsif spec.kind_of?(ConnectionSpecification)
config = spec.config.reverse_merge(:allow_concurrency => @@allow_concurrency)
- self.connection = self.send(spec.adapter_method, config)
+ establish_connection(spec.adapter_method, config)
elsif spec.nil?
raise ConnectionNotEstablished
else
@@ -265,5 +391,5 @@
logger.info "Active connection name: #{@active_connection_name}"
end
end
- end
+ end_pool
end
Index: base.rb
===================================================================
--- base.rb (revision 4617)
+++ base.rb (working copy)
@@ -419,7 +419,7 @@
# Post.find_by_sql "SELECT p.*, c.author FROM posts p, comments c WHERE p.id = c.post_id"
# Post.find_by_sql ["SELECT * FROM posts WHERE author = ? AND created > ?", author_id, start_date]
def find_by_sql(sql)
- connection.select_all(sanitize_sql(sql), "#{name} Load").collect! { |record|
instantiate(record) }
+ connection(:read).select_all(sanitize_sql(sql), "#{name} Load").collect! { |record|
instantiate(record) }
end
# Returns true if the given +id+ represents the primary key of a record in the database, false otherwise.
@@ -506,7 +506,7 @@
# Product.count_by_sql "SELECT COUNT(*) FROM sales s, customers c WHERE s.customer_id = c.id"
def count_by_sql(sql)
sql = sanitize_conditions(sql)
- connection.select_value(sql, "#{name} Count").to_i
+ connection(:read).select_value(sql, "#{name} Count").to_i
end
# Increments the specified counter by one. So <tt>DiscussionBoard.increment_counter("post_count",
@@ -725,7 +725,7 @@
# Returns an array of column objects for the table associated with this class.
def columns
unless @columns
- @columns = connection.columns(table_name, "#{name} Columns")
+ @columns = connection(:read).columns(table_name, "#{name} Columns")
@columns.each {|column| column.primary = column.name == primary_key}
end
@columns
_______________________________________________ Rails mailing list Rails@... http://lists.rubyonrails.org/mailman/listinfo/rails
RSS Feed