Showing posts with label cassandra1.0.12. Show all posts
Showing posts with label cassandra1.0.12. Show all posts

Sunday, December 20, 2015

what happened to the old sstables after apache cassandra compaction is done

Last we study into apache cassandra 1.0.8 compaction and now in this article , we will focus on what will happen after the sstables were compacted. Reading on the class CompactionTask method execute(...) with snippet on the compaction sstables.

1:      ...  
2:      ...  
3:      cfs.replaceCompactedSSTables(toCompact, sstables, compactionType);  
4:      // TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up  
5:      for (Entry<SSTableReader, Map<DecoratedKey, Long>> ssTableReaderMapEntry : cachedKeyMap.entrySet())  
6:      {  
7:        SSTableReader key = ssTableReaderMapEntry.getKey();  
8:        for (Entry<DecoratedKey, Long> entry : ssTableReaderMapEntry.getValue().entrySet())  
9:          key.cacheKey(entry.getKey(), entry.getValue());  
10:      }  

After the sstables compaction process is done, we see that the new sstable is persist and the old sstables are replaces. After that, the key cache is also updated. Onto the sstables replacements is where we interested in this article. Tracing down execution calls made.

1:    public void replaceCompactedSSTables(Collection<SSTableReader> sstables, Iterable<SSTableReader> replacements, OperationType compactionType)  
2:    {  
3:      data.replaceCompactedSSTables(sstables, replacements, compactionType);  
4:    } 

1:    public void replaceCompactedSSTables(Collection<SSTableReader> sstables, Iterable<SSTableReader> replacements, OperationType compactionType)  
2:    {  
3:      replace(sstables, replacements);  
4:      notifySSTablesChanged(sstables, replacements, compactionType);  
5:    }

1:    private void replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)  
2:    {  
3:      View currentView, newView;  
4:      do  
5:      {  
6:        currentView = view.get();  
7:        newView = currentView.replace(oldSSTables, replacements);  
8:      }  
9:      while (!view.compareAndSet(currentView, newView));  
11:      addNewSSTablesSize(replacements);  
12:      removeOldSSTablesSize(oldSSTables);  
14:      cfstore.updateCacheSizes();  
15:    }

1:    public void notifySSTablesChanged(Iterable<SSTableReader> removed, Iterable<SSTableReader> added, OperationType compactionType)  
2:    {  
3:      for (INotificationConsumer subscriber : subscribers)  
4:      {  
5:        INotification notification = new SSTableListChangedNotification(added, removed, compactionType);  
6:        subscriber.handleNotification(notification, this);  
7:      }  
8:    }  

At this point, replace compacted sstables consists of actual replacements and notify on sstables changed. But first we will take a look at replacement process.

1:      public View replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)  
2:      {  
3:        List<SSTableReader> newSSTables = newSSTables(oldSSTables, replacements);  
4:        IntervalTree intervalTree = buildIntervalTree(newSSTables);  
5:        return new View(memtable, memtablesPendingFlush, Collections.unmodifiableList(newSSTables), compacting, intervalTree);  
6:      }

1:      private List<SSTableReader> newSSTables(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)  
2:      {  
3:        ImmutableSet<SSTableReader> oldSet = ImmutableSet.copyOf(oldSSTables);  
4:        int newSSTablesSize = sstables.size() - oldSSTables.size() + Iterables.size(replacements);  
5:        assert newSSTablesSize >= Iterables.size(replacements) : String.format("Incoherent new size %d replacing %s by %s in %s", newSSTablesSize, oldSSTables, replacements, this);  
6:        List<SSTableReader> newSSTables = new ArrayList<SSTableReader>(newSSTablesSize);  
7:        for (SSTableReader sstable : sstables)  
8:        {  
9:          if (!oldSet.contains(sstable))  
10:            newSSTables.add(sstable);  
11:        }  
12:        Iterables.addAll(newSSTables, replacements);  
13:        assert newSSTables.size() == newSSTablesSize : String.format("Expecting new size of %d, got %d while replacing %s by %s in %s", newSSTablesSize, newSSTables.size(), oldSSTables, replacements, this);  
14:        return newSSTables;  
15:      }

1:      private IntervalTree buildIntervalTree(List<SSTableReader> sstables)  
2:      {  
3:        List<Interval> intervals = new ArrayList<Interval>(sstables.size());  
4:        for (SSTableReader sstable : sstables)  
5:          intervals.add(new Interval<SSTableReader>(sstable.first, sstable.last, sstable));  
6:        return new IntervalTree<SSTableReader>(intervals);  
7:      }

1:    private void addNewSSTablesSize(Iterable<SSTableReader> newSSTables)  
2:    {  
3:      for (SSTableReader sstable : newSSTables)  
4:      {  
5:        assert sstable.getKeySamples() != null;  
6:        if (logger.isDebugEnabled())  
7:          logger.debug(String.format("adding %s to list of files tracked for %s.%s",  
8:                sstable.descriptor,, cfstore.getColumnFamilyName()));  
9:        long size = sstable.bytesOnDisk();  
10:        liveSize.addAndGet(size);  
11:        totalSize.addAndGet(size);  
12:        sstable.setTrackedBy(this);  
13:      }  
14:    }  

1:    private void removeOldSSTablesSize(Iterable<SSTableReader> oldSSTables)  
2:    {  
3:      for (SSTableReader sstable : oldSSTables)  
4:      {  
5:        if (logger.isDebugEnabled())  
6:          logger.debug(String.format("removing %s from list of files tracked for %s.%s",  
7:                sstable.descriptor,, cfstore.getColumnFamilyName()));  
8:        liveSize.addAndGet(-sstable.bytesOnDisk());  
9:        sstable.markCompacted();  
10:        sstable.releaseReference();  
11:      }  
12:    }

1:    /**  
2:     * Mark the sstable as compacted.  
3:     * When calling this function, the caller must ensure that the SSTableReader is not referenced anywhere  
4:     * except for threads holding a reference.  
5:     */  
6:    public void markCompacted()  
7:    {  
8:      if (logger.isDebugEnabled())  
9:        logger.debug("Marking " + getFilename() + " compacted");  
10:      try  
11:      {  
12:        if (!new File(descriptor.filenameFor(Component.COMPACTED_MARKER)).createNewFile())  
13:          throw new IOException("Unable to create compaction marker");  
14:      }  
15:      catch (IOException e)  
16:      {  
17:        throw new IOError(e);  
18:      }  
20:      boolean alreadyCompacted = isCompacted.getAndSet(true);  
21:      assert !alreadyCompacted : this + " was already marked compacted";  
22:    }

1:    public void releaseReference()  
2:    {  
3:      if (references.decrementAndGet() == 0 && isCompacted.get())  
4:      {  
5:        // Force finalizing mmapping if necessary  
6:        ifile.cleanup();  
7:        dfile.cleanup();  
9:        deletingTask.schedule();  
10:      }  
11:      assert references.get() >= 0 : "Reference counter " + references.get() + " for " + dfile.path;  
12:    }

1:    public void schedule()  
2:    {  
3:      StorageService.tasks.submit(this);  
4:    }  

1:    /**  
2:     * Resizes the key and row caches based on the current key estimate.  
3:     */  
4:    public synchronized void updateCacheSizes()  
5:    {  
6:      long keys = estimateKeys();  
7:      keyCache.updateCacheSize(keys);  
8:      rowCache.updateCacheSize(keys);  
9:    }  

As shown above, there are many things even in the replacement process! We can summarized based on the code trace above,

  • an interval tree is built using the replacement sstable. After that, that new view is returned.
  • the process above is repeated until the view become equal.
  • addNewSSTablesSize make the replacement sstable become active.
  • finally it is time to remove the old sstables.
  • the old sstables will be marks as compacted and then remove when it is no longer reference by threads.

Onto the method notifySSTablesChanged(),

1:    public void notifySSTablesChanged(Iterable<SSTableReader> removed, Iterable<SSTableReader> added, OperationType compactionType)  
2:    {  
3:      for (INotificationConsumer subscriber : subscribers)  
4:      {  
5:        INotification notification = new SSTableListChangedNotification(added, removed, compactionType);  
6:        subscriber.handleNotification(notification, this);  
7:      }  
8:    }  

For each of the subscribers, the sstable list change is notified and class that implement the interface should handle the changed.

Sunday, May 10, 2015

My journey and experience on upgrading apache cassandra from version1.0.12 to 1.1.12

If you have read my previous post on apache cassandra upgrade, this is another journey to major upgrade apache cassandra from version 1.0 to 1.1. In this article, I will share on my experience on upgrading cassandra from version 1.0.12 to 1.1.12.

The sstable version used by cassandra 1.0.12 is hd  and you should ensure that all nodes sstables become hd before proceed upgrade to a newer version of cassandra.

First, let read some highlight of cassandra 1.1

  • api version 19.33.0

  • new file,

  • new directory structure for sstable and filename change for sstable.

  • more features/improvement to nodetool such as compactionstats has remaining timestamp, calculate exact size required for cleanup operations, you can now stop compaction, rangekeysample, getsstables, repair print progress, etc.

  • global key and row cache.

  • cql 3.0 beta

  • schema change for cassandra in caching.

  • libthrift version 0.7.0.

  • sstable hf version.

  • default compressor become snappy compressor.

  • a lot of improvement to level compression strategy.

  • sliced_buffer_size_in_kb option has been removed from the cassandra.yaml configuration file (this option was a no-op since 1.0).

  • thread stack size increased to 160k

  • added flag UseTLAB for jvm to improve read speed.

As this is a newer version of cassandra compare the previous, it is always good to setup a test node and so you can play around and get familiar with it before actually doing the upgrade. With this new node, you can also quickly test with your application client which write and/or read to the test cassandra node. It is also recommended to do some load test to see the result is what you have expected.

If you want to be extremely careful on the upgrade, then reading the code changes between the version you chose to upgrade is always recommended. This is the link for this upgrade  and I know and understand as there are huge differences in betweeen them, so you should split as small as possible to read through it. You can learn a lot from the experience coder if you spend a lot of time reading their code and you can learn new technology too. It is a daunting huge tasks but if you willing to spend sometime to read them, the benefits return is just too much to even describe here.

If you upgrade from 1.0.12 to 1.1.12, cassandra 1.1 is smart enough to move the sstable into new directory structure. So, it ease your job that you do not need to move the sstable into the new directory structure. When the new cassandra 1.1.12 starting up, it will move for you.

So you might want to consider prepare the configuration file for your cluster environment before hand. For example, cassandra.yaml, and By doing this, you can decrease the upgrade process time duration and less error when you are not actually doing it but a upgrade script will symlink this for you. So spend sometime to write upgrade and downgrade scripts for the production cluster and tests it.

Because upgrade process will take time (a long one, depend on how many nodes you have in cluster) and it will tired you in the process (remember, there will be post upgrade issues which you need to deal with), so I suggest you create a upgrade script to handle the upgrade process. The cassandra configuration which you prepare before will be automatically symlink within this script. When you do this, you reduce risk such as factor human error and for a production cluster, you will NOT want to risk anything or cut the risk to as minimum as possible.

There is official upgrade documentation here at datastax but because your cluster environment might be different, so you might want to write the upgrade step taking into consideration from the official documentation and let peer review so you cover as much as possible. Best if your peer will tests and raise in some questions which you might not think of.

If you have using monitoring system such as opscenter, spm, jconsole, or your own monitoring system, you wanna check it out if these monitoring can support the newer version of cassandra.

key cache and row cache per column family based has been replace with global key cache and global row cache respectively. These global cache settings can be found in casandra.yaml file. If you leave it to default, 1 millon key cache by default. Below are some new parameter for cassandra 1.1,

  • populate_io_cache_on_flush

  • key_cache_size_in_mb

  • key_cache_save_period

  • row_cache_size_in_mb

  • row_cache_save_period

  • row_cache_provider

  • commitlog_segment_size_in_mb

  • trickle_fsync

  • trickle_fsync_interval_in_kb

  • internode_authenticator

and below are configuration get removed

  • sliced_buffer_size_in_kb

  • thrift_max_message_length_in_mb

For the upgrade steps in production, these steps are taken appropriately:

pre-upgrade apply to all node in cluster.
* stop any repair , cleanup in all cassandra node and no streaming happened. Streaming are the nodes bootstrap or you rebuild a node.

upgrade steps.
1. download cassandra 1.1.12 and verify binary is not corrupted.
2. extract the compressed tarball.
3. nodetool snapshot.
4. nodetool drain.
5. stop cassandra if it not stopped.
6. symlink new configuration files.
7. start cassandra 1.1.12
8. monitor cassandra system.log
9. check monitoring system.

If everything looks okay for first node, best if you do two nodes, and then continue till the rest of the node in rolling upgrade fashion. After you migrate, you might also noticed there are 3 more additional column families in cassandra 1.1

cassandra 1.0 system keyspace has a total of 7 column families

  • HintsColumnFamily

  • IndexInfo

  • LocationInfo

  • Migrations

  • NodeIdInfo

  • Schema

  • Versions

cassandra 1.1 system keyspace has a total 10 column families.

  • HintsColumnFamily

  • IndexInfo

  • LocationInfo

  • Migrations

  • NodeIdInfo

  • Schema

  • schema_columnfamilies

  • schema_columns

  • schema_keyspaces

  • Versions

If you are using level compaction strategy, these sstable need to be scrub accordingly. There are nodetool scrub and offline sstablescrub for this job. If you have defined column family using counter type, you should upgrade the sstable using nodetool upgradesstables.

That's it and if you need professional service for this, please contact me and I will be gladly to provide professional advice and/or service.

Sunday, March 29, 2015

My journey and experience on upgrading apache cassandra 1.0.8 to 1.0.12

Upon request of my blog reader, today I will share with you my experience on upgrading apache cassandra version 1.0.8 to 1.0.12 on a production live cluster. By sharing this information, I hope if you are also running and/or administer cassandra cluster, you can learn from my experience and ease your worry or pain.

First, let's lay out what's the current architecture in this environment.

  • java 6

  • 12 nodes cluster.

  • two spinning disk with raid 0, 32GB total system memory where 14GB allocated to the cassandra heap instance, with 800MB for young gen. quad core cpu.

  • pretty much stock cassandra.yaml configuration with the following different like concurrent_write to 64, flush_largest_memtables_at to 0.8, compaction_throughput_mb_per_sec to 64.

  • node load per node average at 500-550GB.

As you can see, this is pretty ancient cassandra we are using at of this time of writing but because cassandra has been rock solid serving read/write requests for years, it stays like this stable condition forever and we leverage on the benefit of scalling out like adding nodes from six to nine and eventually to twelve now. Realizing that the disk failure do happened in the nodes of the cluster, because of cassandra has a no single point of failure in mind, we can afford to loose a single node out of operation while replacing it. That were a few of the reasons we stayed with cassandra 1.0 for quite sometime.

Because we would like to probably goes to cassandra 2.0 and beyond, and java 6 has been EOL for quite sometime, it would be wise to upgrade java before cassandra. Because system are integrated like an ecosystem, it would be also wise to look at java used in the client system that read/write requests to the cassandra cluster. So make a checklist brainstorming what are clients that integrate into the cluster and then check out what are the current stable java 7 available. Example:

cassandra 1.0 cassandra-1.0.12 java miniumum 6 and above.

hector client using casandra 2.0.4 so java 7 minimum

datastax cql driver use cassandra 2.1.2 so java 7 minimum

java 7 update release note

features and enhancement

java 7 in wiki

before upgrading, check if cassandra using different unicode on the data
Early versions of the Java SE 7 release added support for Unicode 5.1.0. The final version of the Java SE 7 release supports Unicode 6.0.0. Unicode 6.0.0 is a major version of the Unicode Standard and adds support for over 2000 additional characters, as well as support for properties and data files.

As of the time of checking, we picked java 7 update 72. Upgrading java 6 to java 7 update 72 in the cassandra 1.0.8 is a painless process other than just time consuming. As load per node is huge and total number of nodes in cluster. I follow this steps for java upgrade in cassandra node.

upgrade java for all cassandra node
1. write a script to automatically install java7 on node, update java stacked size to 256k in set JAVA_HOME for file to java 7.
2. execute the script in rolling fashion for all the node with one upgrade at a time.
3. stop cassandra
4. execute the script.
5. start the cassandra instance
6.0 start the cassandra instance and monitor after the node is up and then check the monitoring system after node elapsed for 30minutes, 60minutes, 1hours and 2hours.
6.1 check your client can read/write to that one upgraded node.

By now, you can perform the next node in the ring, but you can skip step 6.0 as you are sure that it is going to work. One thing I observed is that, the gc duration for cassandra using java 6 and java 7 is it is down by half! That's could means faster gc means more cpu cycle to process other tasks and less stop of the world for cassandra instance.

Leave this cluster with java 7 upgraded run a day or two and if it is okay, continue to cassandra upgrade. So which cassandra version to upgrade to? There are several guidelines I followed.

1. choose ONLY STABLE release for production cluster. How to choose? You should read this link.
2. read NEWS.txt  and Changes.txt . As time to time, change to the code base may affect example, the sstable. So pay attention especially between cassandra major upgrade.
3. read the code difference between the version you decided to upgrade too, example for this upgrade.
4. read the datastax upgrading node for minor version.

I spent a lot of time doing step 3 and by reading the code diference, realize what has been change and/or added and consider it will impact your cassandra environment. In order for further upgrade to cassandra 1.1, you will need to upgrade to the latest version of the one currently deployed. Example here. Once read the above checkpoints, you may have a lot of questions and TODOs and that will give further works. In the next step, it is best if you find out the questions and TODOs you have and then verify in the test cluster before apply to a production cluster.

For me, I have written a few bash scripts example mentioned above, java upgrade. Also I have written install test cluster for cassandra upgrade. Remember to also write script to snapshot the data directory using nodetool and then also write script to automatically downgrade. When something goes wrong, you can revert using the automatic downgrade script and using the backup from nodetool dump. Then you will need to save the configurations example,,, cassandra.yaml or any other in your environment cluster.

With these scripts written and tested, it is best if you get and acknowledgements from the management if this is to be proceed and also, it would be best if you have someone who is also administer of cassandra cluster with you just for the good and bad moments. ;-) You can also reach me by my follow button in the home page. :)

upgrade cassandra from 1.0.8 to 1.0.12

  1. stop repair and cleanup in all nodes in the cluster.

  2. write a script to automatically upgrade it and so you dont panic, waste time and composed during node upgrade. Trust me, save you a lot of time and human error free. scripts content could be the following:
    - download cassandra 1.0.12 and extract, file permission ,etc
    - backup current cassandra 1.0.8 using nodetool snapshots. make sure you write the snapshot directory name like MyKeyspace-1.0.8-date
    - drain the node.
    - stop cassandra if it is not yet stopped.
    - update cassandra 1.0.12 with your cluster settings.

  3. check the configuration changed and then start cassandra 1.0.12 new instance.

  4. monitor after the node is up and then check the monitoring system after node elapsed for 30minutes, 60minutes, 1hours and 2hours.

  5. check your client can read/write to that one upgraded node.

By now, you can perform the next node in the ring, but you can skip step 4.0 as you are sure that it is going to work. As the version of the cassandra sstable change in 1.0.10, from hc to hd, it is best all sstables in all nodes, using the hd version before perform the next major upgrade.

That's it for this article and whilst this maybe not cover all, may contain mistake, and/or if you want to comment, please leave your comment below.

Saturday, March 28, 2015

Investigate into apache cassandra corrupt sstable exception

Today, we will take a look at another apache cassandra 1.0.8 exception. Example of stack trace below.
ERROR [SSTableBatchOpen:2] 2015-03-07 06:11:58,544 (line 228) Corrupt sstable /var/lib/cassandra/data/MySuperKeyspace/MyColumnFamily-hc-6681=[Index.db, Statistics.db, CompressionInfo.db, Filter.db, Data.db]; skipped Input/output error
at Method)
at java.util.concurrent.Executors$
at java.util.concurrent.ThreadPoolExecutor.runWorker(
at java.util.concurrent.ThreadPoolExecutor$

Before we go into the code base for this stacktrace, I have no idea what is this about and this one shown when the cassandra 1.0.12 instance is booting up. Last I remember I trigger user defined compaction twice in cassandra 1.0.8 using the same sstables and after first compaction is done, then this sstable stay forever... like for two weeks plus. Then we have upgrade for the cassandra.

Enough said, let's go into the code base and understand what is really mean by corrupt sstable. Bottom of the the stack trace pretty obvious, ThreadPoolExecutor execute a future task run method.Then it is now on apache cassandra namespace codebase, as can be read below class SSTableReader, method batchOpen(), code snippet
    public static Collection<SSTableReader> batchOpen(Set<Map.Entry<Descriptor, Set<Component>>> entries,
final Set<DecoratedKey> savedKeys,
final DataTracker tracker,
final CFMetaData metadata,
final IPartitioner partitioner)
final Collection<SSTableReader> sstables = new LinkedBlockingQueue<SSTableReader>();

ExecutorService executor = DebuggableThreadPoolExecutor.createWithPoolSize("SSTableBatchOpen", Runtime.getRuntime().availableProcessors());
for (final Map.Entry<Descriptor, Set<Component>> entry : entries)
Runnable runnable = new Runnable()
public void run()
SSTableReader sstable;
sstable = open(entry.getKey(), entry.getValue(), savedKeys, tracker, metadata, partitioner);
catch (IOException ex)
logger.error("Corrupt sstable " + entry + "; skipped", ex);

executor.awaitTermination(7, TimeUnit.DAYS);
catch (InterruptedException e)
throw new AssertionError(e);

return sstables;


As can be read above, somewhere within the method open() throw the IOException, hence the above exception was thrown. Two stack trace up, we read that, sstable load method execute and, method. With the method read from class ByteBufferUtil as shown below.
    public static ByteBuffer read(DataInput in, int length) throws IOException
if (in instanceof FileDataInput)
return ((FileDataInput) in).readBytes(length);

byte[] buff = new byte[length];
return ByteBuffer.wrap(buff);

We see that, the input in a instance of FileDataInput stream and read the bytes with length. Since FileDataInput is a interface, we read that, the class that implement this interface is RandomAccessReader class and method readBytes as the follow.
public ByteBuffer readBytes(int length) throws IOException
assert length >= 0 : "buffer length should not be negative: " + length;

byte[] buff = new byte[length];
readFully(buff); // reading data buffer

return ByteBuffer.wrap(buff);

to read bytes with length is actually to read fully on the length but started on the current file pointer pointing at. And a little bit way up in the stack trace, method reBuffer()
* Read data from file starting from current currentOffset to populate buffer.
* @throws IOException on any I/O error.
protected void reBuffer() throws IOException

if (bufferOffset >= channel.size())

channel.position(bufferOffset); // setting channel position

int read = 0;

while (read < buffer.length)
int n =, read, buffer.length - read);
if (n < 0)
read += n;

validBufferBytes = read;

bytesSinceCacheFlush += read;

if (skipIOCache && bytesSinceCacheFlush >= MAX_BYTES_IN_PAGE_CACHE)
// with random I/O we can't control what we are skipping so
// it will be more appropriate to just skip a whole file after
// we reach threshold
CLibrary.trySkipCache(this.fd, 0, 0);
bytesSinceCacheFlush = 0;

and this method call superclass to read another chunk into the buffer. The upper class RandomAccessFile , method readBytes()
* Reads a sub array as a sequence of bytes.
* @param b the buffer into which the data is read.
* @param off the start offset of the data.
* @param len the number of bytes to read.
* @exception IOException If an I/O error has occurred.
private int readBytes(byte b[], int off, int len) throws IOException {
Object traceContext = IoTrace.fileReadBegin(path);
int bytesRead = 0;
try {
bytesRead = readBytes0(b, off, len);
} finally {
IoTrace.fileReadEnd(traceContext, bytesRead == -1 ? 0 : bytesRead);
return bytesRead;

private native int readBytes0(byte b[], int off, int len) throws IOException;

.. and we are at the end of this path, it turn out that the call to readBytes0 thrown exception, the lower layer native non java call throwing the IO exception. You can use nodetool scrub to see if this fix the problem but what I do basically wipe the data directory for the cassandra and rebuild it. Then I don't see anymore of this message anymore.

That's it for this article and if you want to improve and/or comment, please leave your input below.