24 May 2012 16:45
Inconsistent value of counter in multi-threaded application
Hi guys.
I'm new to Cassandra and Hector and I need your help.
I've written a small program to test inserting and incrementing
counters. Once it has finished, it reads the inserted data back to check
the counter values.
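A minimal sketch of that check in plain Java, with no Hector dependency: the expected totals are tallied while the increments are generated, and every value read back is compared against them. The class and method names (`CounterCheck`, `findMismatches`) and the `user:product` key format are hypothetical; the sample numbers are the ones from the log in this message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CounterCheck {
    // Compares expected counter totals with the values read back from the store.
    // Returns one line per key whose value differs or is missing (missing reads as null).
    static List<String> findMismatches(Map<String, Long> expected, Map<String, Long> observed) {
        List<String> mismatches = new ArrayList<>();
        // Iterate in sorted key order so the report is stable between runs
        for (Map.Entry<String, Long> e : new TreeMap<>(expected).entrySet()) {
            Long actual = observed.get(e.getKey());
            if (!e.getValue().equals(actual)) {
                mismatches.add(e.getKey() + ": expected " + e.getValue() + " but having " + actual);
            }
        }
        return mismatches;
    }

    public static void main(String[] args) {
        Map<String, Long> expected = new TreeMap<>();
        expected.put("11161:FR0005025004#025", 11L);
        expected.put("11161:FR0010307439#025", 11L);
        Map<String, Long> observed = new TreeMap<>();
        observed.put("11161:FR0005025004#025", 10L);
        observed.put("11161:FR0010307439#025", 12L);
        List<String> bad = findMismatches(expected, observed);
        bad.forEach(System.out::println);
        System.out.println("Nb invalid counter values: " + bad.size());
    }
}
```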
My problem is the following: when I run the insertion/incrementation
in a single thread, the counter value is the expected one. When I run
the same thing with several threads, the counters end up with wrong
values (sometimes lower, sometimes greater than expected).
Each Callable submitted to the thread pool has its own Keyspace and
Mutator objects. I don't use BatchMutation; each increment is executed
as soon as it is called.
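For comparison, the shape of that multi-threaded test can be sketched against an in-memory store: each `Callable` builds its own client object (standing in for the per-thread Keyspace and Mutator) and applies every increment immediately instead of batching. `InMemoryStore` and `CounterClient` are hypothetical stand-ins, not Hector classes. Because the in-memory increment is atomic and never retried, the totals are exact for any thread count, which is the baseline the Cassandra run is expected to reproduce.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.LongAdder;

public class ThreadedIncrements {
    // Shared store: one atomic counter per key (hypothetical stand-in for the cluster).
    static final class InMemoryStore {
        final Map<String, LongAdder> counters = new ConcurrentHashMap<>();

        void increment(String key, long delta) {
            counters.computeIfAbsent(key, k -> new LongAdder()).add(delta);
        }

        long get(String key) {
            LongAdder a = counters.get(key);
            return a == null ? 0L : a.sum();
        }
    }

    // Per-thread client, analogous to a per-Callable keyspace + mutator.
    static final class CounterClient {
        private final InMemoryStore store;

        CounterClient(InMemoryStore store) { this.store = store; }

        // Applied immediately, one call per increment (no batching).
        void incrementCounter(String key, long delta) { store.increment(key, delta); }
    }

    // Every thread increments every key incrementsPerKey times.
    static InMemoryStore run(int threads, List<String> keys, int incrementsPerKey) throws Exception {
        InMemoryStore store = new InMemoryStore();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Void>> futures = new ArrayList<>();
            for (int t = 0; t < threads; t++) {
                futures.add(pool.submit((Callable<Void>) () -> {
                    CounterClient client = new CounterClient(store); // own client per Callable
                    for (String key : keys) {
                        for (int i = 0; i < incrementsPerKey; i++) {
                            client.incrementCounter(key, 1);
                        }
                    }
                    return null;
                }));
            }
            for (Future<Void> f : futures) {
                f.get(); // wait, and propagate any exception thrown inside a task
            }
        } finally {
            pool.shutdown();
        }
        return store;
    }

    public static void main(String[] args) throws Exception {
        InMemoryStore store = run(2, List.of("FR0005025004#025", "FR0010307439#025"), 1000);
        System.out.println(store.get("FR0005025004#025")); // 2 threads x 1000 increments
    }
}
```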
I've tried different ConsistencyLevelPolicy settings for reads and
writes, and likewise different FailoverPolicy settings, but the result
is always the same.
I read somewhere that, to get consistent counter values, I have to set
the write consistency level to ALL and use FAIL_FAST as the
FailoverPolicy. But with FAIL_FAST I get the worst results (almost all
inserted counters end up with a wrong value). With
FailoverPolicy.ON_FAIL_TRY_ALL_AVAILABLE or
FailoverPolicy.ON_FAIL_TRY_ONE_NEXT_AVAILABLE it's better, but still
not perfect.
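One mechanism that can move a counter both above and below the expected value is a client-side retry of a non-idempotent increment. Cassandra counter updates are not idempotent: if an increment is applied but the client only sees a timeout, retrying it on another host counts it twice, while giving up on an increment that was never applied loses it. The toy model below is a hypothetical illustration of that effect, not Hector's failover code; the `Outcome` values and the `retryOnFailure` flag (loosely, `true` for the ON_FAIL_TRY_* policies, `false` for FAIL_FAST) are assumptions made for the sketch.

```java
import java.util.List;

public class RetryModel {
    // What happens to one attempt of an increment, as seen by the client.
    enum Outcome {
        OK,             // applied and acknowledged
        APPLIED_NO_ACK, // applied on the server, but the client sees a timeout
        NOT_APPLIED     // failed before being applied; the client sees an error
    }

    // Replays a scripted sequence of attempt outcomes against one counter.
    // Each logical +1 consumes outcomes until it is acknowledged or, when retries
    // are disabled, after its first failed attempt. Once the script is exhausted,
    // every further attempt succeeds. Returns the final counter value.
    static long replay(int increments, List<Outcome> script, boolean retryOnFailure) {
        long counter = 0;
        int next = 0;
        for (int i = 0; i < increments; i++) {
            while (true) {
                Outcome o = next < script.size() ? script.get(next++) : Outcome.OK;
                if (o != Outcome.NOT_APPLIED) {
                    counter++; // the server applied this attempt
                }
                if (o == Outcome.OK || !retryOnFailure) {
                    break; // acknowledged, or the client gives up after one failure
                }
                // otherwise the client saw a failure and resends the same +1
            }
        }
        return counter;
    }

    public static void main(String[] args) {
        List<Outcome> lostAck = List.of(Outcome.OK, Outcome.APPLIED_NO_ACK, Outcome.OK);
        List<Outcome> realFailure = List.of(Outcome.OK, Outcome.NOT_APPLIED, Outcome.OK);
        System.out.println(replay(3, lostAck, true));      // 4: the retried +1 is counted twice
        System.out.println(replay(3, realFailure, false)); // 2: the failed +1 is never applied
    }
}
```

In this model, retrying makes genuine failures harmless but double-counts lost acknowledgements, and disabling retries does the reverse, so neither setting alone yields an exact total when both kinds of failure occur.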
Please tell me how to manage counters in a multi-threaded application
so that their values are consistent.
Here is my environment. I'm using:
- Cassandra 1.1.0
- hector 1.0.5
I've created two data centers locally:
Address    DC   Rack  Status  State   Load      Owns    Token
                                                        113427455640312814857969558651062452225
127.0.1.1  DC1  RAC1  Up      Normal  56,76 KB  33,33%  0
127.0.2.1  DC2  RAC1  Up      Normal  58,28 KB  0,00%   1
127.0.1.2  DC1  RAC1  Up      Normal  61,34 KB  33,33%  56713727820156407428984779325531226112
127.0.2.2  DC2  RAC1  Up      Normal  39,18 KB  0,00%   56713727820156407428984779325531226113
127.0.1.3  DC1  RAC1  Up      Normal  39,18 KB  33,33%  113427455640312814857969558651062452224
127.0.2.3  DC2  RAC1  Up      Normal  39,18 KB  0,00%   113427455640312814857969558651062452225
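The tokens in this ring follow the usual layout for RandomPartitioner (the Cassandra 1.1 default, assumed here) with two data centers: three tokens spaced evenly over the range 0 to 2^127 for DC1, and each DC2 node offset by +1 so that no two nodes share a token. A small sketch that computes exact values with `BigInteger`; the `balancedTokens` helper is hypothetical.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

public class RingTokens {
    // RandomPartitioner tokens live in the range [0, 2^127].
    static final BigInteger RANGE = BigInteger.ONE.shiftLeft(127);

    // Evenly spaced initial tokens for one data center: floor(i * 2^127 / nodes) + offset.
    static List<BigInteger> balancedTokens(int nodes, int offset) {
        List<BigInteger> tokens = new ArrayList<>();
        for (int i = 0; i < nodes; i++) {
            BigInteger t = RANGE.multiply(BigInteger.valueOf(i))
                                .divide(BigInteger.valueOf(nodes))
                                .add(BigInteger.valueOf(offset));
            tokens.add(t);
        }
        return tokens;
    }

    public static void main(String[] args) {
        System.out.println("DC1: " + balancedTokens(3, 0));
        System.out.println("DC2: " + balancedTokens(3, 1));
    }
}
```

For three nodes this gives 0, 56713727820156410577229101238628035242 and 113427455640312821154458202477256070485 for DC1. The tokens in the ring above differ from these only in the low-order digits, which makes no practical difference to how evenly the ranges are split.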
My keyspace is defined as below:
Keyspace: USERS:
  Replication Strategy: org.apache.cassandra.locator.NetworkTopologyStrategy
  Durable Writes: true
    Options: [DC2:3, DC1:3]
  Column Families:
    ColumnFamily: QABO_COUNT_LONG
      Key Validation Class: org.apache.cassandra.db.marshal.LongType
      Default column value validator: org.apache.cassandra.db.marshal.CounterColumnType
      Columns sorted by: org.apache.cassandra.db.marshal.CompositeType(org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.LongType),org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UTF8Type)
      Row cache size / save period in seconds / keys to save : 0.0/0/0
      Row Cache Provider: null
      Key cache size / save period in seconds: 200000.0/0
      GC grace seconds: 864000
      Compaction min/max thresholds: 4/32
      Read repair chance: 1.0
      Replicate on write: true
      Bloom Filter FP chance: default
      Built indexes: []
      Compaction Strategy: org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy
      Compression Options:
        sstable_compression: org.apache.cassandra.io.compress.SnappyCompressor
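The comparator `CompositeType(ReversedType(LongType), UTF8Type, UTF8Type)` orders the columns of a row by date descending, then by product, then by exchange. An in-memory equivalent as a Java `Comparator`; the `ColumnName` record and the sample values are hypothetical, and `String.compareTo` matches UTF8Type's byte order only for ASCII strings such as these identifiers.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class CompositeOrder {
    // One column name: (date, productId, exchange), mirroring the three composite components.
    record ColumnName(long date, String productId, String exchange) {}

    // ReversedType(LongType): newest date first; then UTF8Type, UTF8Type ascending.
    static final Comparator<ColumnName> ORDER =
        Comparator.comparingLong(ColumnName::date).reversed()
                  .thenComparing(ColumnName::productId)
                  .thenComparing(ColumnName::exchange);

    public static void main(String[] args) {
        List<ColumnName> cols = new ArrayList<>(List.of(
            new ColumnName(1000L, "FR0010307439", "025"),
            new ColumnName(2000L, "FR0005025004", "025"),
            new ColumnName(1000L, "FR0005025004", "025")));
        cols.sort(ORDER);
        cols.forEach(System.out::println);
    }
}
```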
The keyspace for each Callable is created like this:
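    // Hector 1.0.x imports used by this snippet
    import java.util.HashMap;
    import java.util.Map;
    import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel;
    import me.prettyprint.cassandra.service.FailoverPolicy;
    import me.prettyprint.cassandra.service.OperationType;
    import me.prettyprint.cassandra.service.ThriftKsDef;
    import me.prettyprint.hector.api.HConsistencyLevel;
    import me.prettyprint.hector.api.Keyspace;
    import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
    import me.prettyprint.hector.api.factory.HFactory;

    Keyspace k = null;
    // NetworkTopologyStrategy keyspace with 3 replicas in each data center
    KeyspaceDefinition ksDef = HFactory.createKeyspaceDefinition(keyspaceName,
            ThriftKsDef.NETWORK_TOPOLOGY_STRATEGY, 3, null);
    ThriftKsDef tksDef = (ThriftKsDef) ksDef;
    Map<String, String> strategyOptions = new HashMap<String, String>();
    strategyOptions.put("DC1", "3");
    strategyOptions.put("DC2", "3");
    tksDef.setStrategyOptions(strategyOptions);
    cluster.addKeyspace(ksDef, true); // true = block until the schema change completes

    // QABO_COUNT_LONG: write at ALL, read at QUORUM
    ConfigurableConsistencyLevel cp = new ConfigurableConsistencyLevel();
    cp.setConsistencyLevelForCfOperation(HConsistencyLevel.ALL,
            "QABO_COUNT_LONG", OperationType.WRITE);
    cp.setConsistencyLevelForCfOperation(HConsistencyLevel.QUORUM,
            "QABO_COUNT_LONG", OperationType.READ);
    k = HFactory.createKeyspace(keyspaceName, cluster, cp,
            FailoverPolicy.ON_FAIL_TRY_ALL_AVAILABLE);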
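With NetworkTopologyStrategy and `DC1:3, DC2:3` each row has six replicas, and in Cassandra 1.1 `QUORUM` is computed over the total replication factor across all data centers. The arithmetic below (a sketch; the helper names are hypothetical) checks that writing at `ALL` and reading at `QUORUM` gives overlapping replica sets, i.e. R + W > N, so every read contacts at least one replica that acknowledged each successful write.

```java
import java.util.Map;

public class ConsistencyMath {
    // Total replicas for NetworkTopologyStrategy = sum of the per-DC replication factors.
    static int totalReplicas(Map<String, Integer> strategyOptions) {
        return strategyOptions.values().stream().mapToInt(Integer::intValue).sum();
    }

    // QUORUM = floor(N / 2) + 1 over all replicas.
    static int quorum(int replicas) {
        return replicas / 2 + 1;
    }

    // Read and write replica sets are guaranteed to intersect when R + W > N.
    static boolean overlaps(int readReplicas, int writeReplicas, int totalReplicas) {
        return readReplicas + writeReplicas > totalReplicas;
    }

    public static void main(String[] args) {
        int n = totalReplicas(Map.of("DC1", 3, "DC2", 3)); // 6
        int w = n;         // ALL for writes
        int r = quorum(n); // QUORUM for reads -> 4
        System.out.println("N=" + n + " W=" + w + " R=" + r + " overlap=" + overlaps(r, w, n));
    }
}
```

Since the sets overlap for every acknowledged write, the read consistency level on its own seems an unlikely explanation for a counter that is off by one.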
The mutation is done like this:
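    import me.prettyprint.cassandra.serializers.LongSerializer;
    import me.prettyprint.cassandra.serializers.StringSerializer;
    import me.prettyprint.hector.api.beans.Composite;
    import me.prettyprint.hector.api.mutation.Mutator;

    // One Mutator per Callable; row key = user id (LongType)
    mutator = HFactory.createMutator(keyspace, LongSerializer.get());
    // Column name = (date, productId, exchange), matching the CompositeType comparator
    Composite columnName = new Composite();
    columnName.addComponent(quoteAsk.getDate(), LongSerializer.get());
    columnName.addComponent(this.productId, StringSerializer.get());
    columnName.addComponent(this.exchange, StringSerializer.get());
    // incrementCounter sends the +1 immediately (no batching)
    mutator.incrementCounter(user.longValue(), "QABO_COUNT_LONG", columnName, 1);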
Below is the log from one run:
2012-05-24 16:36:55,013 (CEST) Starting insertion in Cassandra, column family: QABO_COUNT_LONG
2012-05-24 16:36:55,013 (CEST) List of users: 11161,
2012-05-24 16:36:56,417 (CEST) Creating column family: QABO_COUNT_LONG
2012-05-24 16:36:58,011 (CEST) Column family: QABO_COUNT_LONG created
2012-05-24 16:36:58,011 (CEST) Executing 2 threads.
2012-05-24 16:36:58,487 (CEST) Threads pool finished
2012-05-24 16:36:58,487 (CEST) Time needed to execute generateNoSQLCompositeUserLongCounterQuoteAsks for 110 executions is 3474 ms.
2012-05-24 16:36:59,489 (CEST) Consistency Level for reading: QUORUM
2012-05-24 16:36:59,489 (CEST) Consistency Level for writting: ALL
2012-05-24 16:36:59,544 (CEST) User 11161, products: FR0005025004#025, expected 11 but having 10
2012-05-24 16:36:59,545 (CEST) User 11161, products: FR0010307439#025, expected 11 but having 12
2012-05-24 16:36:59,545 (CEST) Nb invalid counter values: 2
2012-05-24 16:36:59,545 (CEST) Starting cleaning data in Cassandra, column family: QABO_COUNT_LONG
2012-05-24 16:36:59,549 (CEST) End of cleaning data in Cassandra, column family: QABO_COUNT_LONG
2012-05-24 16:36:59,549 (CEST) Duration is: 0:00:00:004
2012-05-24 16:36:59,549 (CEST) Deleting column family: QABO_COUNT_LONG
2012-05-24 16:37:02,101 (CEST) Column family: QABO_COUNT_LONG deleted
2012-05-24 16:37:02,101 (CEST) Deleting keyspace: USERS
2012-05-24 16:37:06,119 (CEST) Keyspace: USERS deleted