Sunday, June 5, 2016

java collection framework

Once, a company asked me what the data structures in Java are, and I was not prepared at all. As usual, why bother remembering every detail when we can google and start reading? It turned out that the answer they were seeking was the Java Collections Framework, and the next question followed: what are the characteristics of the collections?

Well, to be really honest, who is going to remember every detail when we can read the javadoc? Anyway, I recently found this chart circulating on Facebook, which reminded me of the questions asked. I thought it was helpful. We should not memorize every fine detail; the essential point is that you know where to get the material and are willing to share.

So here goes!
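
To make "characteristics of the collections" a bit more concrete, here is a minimal sketch of my own (the class name CollectionTraits and its helper methods are made up for illustration). It only shows a few well-known behaviours: whether duplicates are kept and in which order elements come back.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper class, for illustration only.
class CollectionTraits {

    // ArrayList keeps insertion order and allows duplicates.
    static List<String> asList(String... items) {
        return new ArrayList<>(Arrays.asList(items));
    }

    // TreeSet drops duplicates and iterates in sorted order.
    static Set<String> asSortedSet(String... items) {
        return new TreeSet<>(Arrays.asList(items));
    }

    // HashSet drops duplicates; its iteration order is unspecified.
    static Set<String> asHashSet(String... items) {
        return new HashSet<>(Arrays.asList(items));
    }

    // LinkedHashMap maps keys to values and iterates in insertion order.
    static Map<String, Integer> lengths(String... items) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (String s : items) {
            result.put(s, s.length());
        }
        return result;
    }

    public static void main(String[] args) {
        String[] words = {"pear", "apple", "pear", "fig"};
        System.out.println(asList(words));           // [pear, apple, pear, fig]
        System.out.println(asSortedSet(words));      // [apple, fig, pear]
        System.out.println(asHashSet(words).size()); // 3
        System.out.println(lengths(words));          // {pear=4, apple=5, fig=3}
    }
}
```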

This is a short article, and I hope you find it useful as a daily coding reference rather than for answering some funny questions. Haha!

Saturday, June 4, 2016

First look into sector/sphere

Wikipedia excerpt

Sector/Sphere is an open source software suite for high-performance distributed data storage and processing. It can be broadly compared to Google's GFS/MapReduce stack. Sector is a distributed file system targeting data storage over a large number of commodity computers. Sphere is the programming framework that supports massive in-storage parallel data processing for data stored in Sector. Additionally, Sector/Sphere is unique in its ability to operate in a wide area network (WAN) setting.
Today, we will take a look at another big data technology, Sector/Sphere. Let's download the source here.

 user@localhost:~/Desktop$ tar -xf sector.2.8.tar   
 user@localhost:~/Desktop$ cd sector/  
 user@localhost:~/Desktop/sector$ ls  
 total 116K  
 -rw-r--r-- 1 user user 5.5K Feb 3 2012 rpm.spec  
 -rw-r--r-- 1 user user 2.2K Feb 3 2012 release_note.txt  
 -rw-r--r-- 1 user user 509 Feb 3 2012 NOTICE.txt  
 -rw-r--r-- 1 user user 506 Feb 3 2012 Makefile.common  
 -rw-r--r-- 1 user user 279 Feb 3 2012 Makefile  
 -rw-r--r-- 1 user user 286 Feb 3 2012 README.txt  
 -rw-r--r-- 1 user user 12K Feb 3 2012 LICENSE.txt  
 -rw-r--r-- 1 user user  0 Feb 3 2012  
 -rw-r--r-- 1 user user 6.4K Feb 3 2012 README.stable_branch.txt  
 -rw-r--r-- 1 user user 7.3K Feb 3 2012 LOG  
 drwxr-xr-x 2 user user 4.0K Feb 3 2012 client  
 drwxr-xr-x 2 user user 4.0K Feb 3 2012 common  
 drwxr-xr-x 4 user user 4.0K Feb 3 2012 doc  
 drwxr-xr-x 3 user user 4.0K Feb 3 2012 examples  
 drwxr-xr-x 4 user user 4.0K Feb 3 2012 fuse  
 drwxr-xr-x 2 user user 4.0K Feb 3 2012 gmp  
 drwxr-xr-x 2 user user 4.0K Feb 3 2012 include  
 drwxr-xr-x 2 user user 4.0K Feb 3 2012 lib  
 drwxr-xr-x 2 user user 4.0K Feb 3 2012 master  
 drwxr-xr-x 2 user user 4.0K Feb 3 2012 security  
 drwxr-xr-x 3 user user 4.0K Feb 3 2012 slave  
 drwxr-xr-x 2 user user 4.0K Feb 3 2012 test  
 drwxr-xr-x 2 user user 4.0K Feb 3 2012 tools  
 drwxr-xr-x 2 user user 4.0K Feb 3 2012 udt  
 drwxr-xr-x 3 user user 4.0K Feb 3 2012 conf  
 user@localhost:~/Desktop/sector$ find .  

As you can see above, we downloaded Sector 2.8 and extracted it. The content consists of the client, tools, slave, conf, fuse, common, master, udt, security, doc, test, gmp and examples components.

For the compile instructions, the documentation I referred to can be found here.

 $ make  
 make[1]: Entering directory '/home/user/Desktop/sector/udt'  
 g++ -fPIC -DMULTITHREADED_LOGGER -I. -I../include -I../udt -g -DDEBUG -Wall -Wextra -D IA32 -D LINUX md5.cpp -c  
 g++ -fPIC -DMULTITHREADED_LOGGER -I. -I../include -I../udt -g -DDEBUG -Wall -Wextra -D IA32 -D LINUX common.cpp -c  
 g++ -fPIC -DMULTITHREADED_LOGGER -I. -I../include -I../udt -g -DDEBUG -Wall -Wextra -D IA32 -D LINUX window.cpp -c  
 g++ -fPIC -DMULTITHREADED_LOGGER -I. -I../include -I../udt -g -DDEBUG -Wall -Wextra -D IA32 -D LINUX list.cpp -c  
 g++ -fPIC -DMULTITHREADED_LOGGER -I. -I../include -I../udt -g -DDEBUG -Wall -Wextra -D IA32 -D LINUX buffer.cpp -c  
 g++ -fPIC -DMULTITHREADED_LOGGER -I. -I../include -I../udt -g -DDEBUG -Wall -Wextra -D IA32 -D LINUX packet.cpp -c  
 g++ -fPIC -DMULTITHREADED_LOGGER -I. -I../include -I../udt -g -DDEBUG -Wall -Wextra -D IA32 -D LINUX channel.cpp -c  
 g++ -fPIC -DMULTITHREADED_LOGGER -I. -I../include -I../udt -g -DDEBUG -Wall -Wextra -D IA32 -D LINUX queue.cpp -c  
 g++ -fPIC -DMULTITHREADED_LOGGER -I. -I../include -I../udt -g -DDEBUG -Wall -Wextra -D IA32 -D LINUX core.cpp -c  
 g++ -fPIC -DMULTITHREADED_LOGGER -I. -I../include -I../udt -g -DDEBUG -Wall -Wextra -D IA32 -D LINUX cache.cpp -c  
 g++ -fPIC -DMULTITHREADED_LOGGER -I. -I../include -I../udt -g -DDEBUG -Wall -Wextra -D IA32 -D LINUX epoll.cpp -c  
 g++ -fPIC -DMULTITHREADED_LOGGER -I. -I../include -I../udt -g -DDEBUG -Wall -Wextra -D IA32 -D LINUX api.cpp -c  
 g++ -fPIC -DMULTITHREADED_LOGGER -I. -I../include -I../udt -g -DDEBUG -Wall -Wextra -D IA32 -D LINUX ccc.cpp -c  
 g++ -shared -o md5.o common.o window.o list.o buffer.o packet.o channel.o queue.o core.o cache.o epoll.o api.o ccc.o -lstdc++ -lpthread -lssl -lcrypto -L. -L../lib   
 /usr/bin/ld: cannot find -lssl  
 /usr/bin/ld: cannot find -lcrypto  
 collect2: error: ld returned 1 exit status  
 Makefile:25: recipe for target '' failed  
 make[1]: *** [] Error 1  
 make[1]: Leaving directory '/home/user/Desktop/sector/udt'  
 Makefile:5: recipe for target 'subdirs' failed  
 make: *** [subdirs] Error 2  

Oops... it looks like the SSL development library was never installed on my system.

 user@localhost:~/Desktop/sector$ sudo apt-get install libssl-dev   
 Reading package lists... Done  
 Building dependency tree      
 Reading state information... Done  
 The following packages were automatically installed and are no longer required:  
  gstreamer0.10-nice gstreamer1.0-clutter libappstream-glib7 libasm3-java libcamel-1.2-52 libdbusmenu-qt5-2 libebook-contacts-1.2-1 libecal-1.2-18 libedata-cal-1.2-27 libedataserver-1.2-20  
  libept1.4.16 libfarstream-0.1-0 libgegl-0.2-0 libgsf-1-114 libgsf-1-common libgsoap7 libisl13 libkf5bookmarks-data libkf5bookmarks5 libkf5kiofilewidgets5 libkf5notifications-data  
  libkf5notifications5 libkf5solid5 libkf5solid5-data libphonon4qt5-4 librarian0 libraw10 libruby2.1 libvncserver1 libvte-2.90-9 libvte-2.90-common libx264-146:i386 libx265-59  
  libx265-59:i386 linux-image-4.1.0-2-amd64 phonon4qt5 phonon4qt5-backend-vlc python-dbus-dev python-zeitgeist rarian-compat vlc-plugin-samba  
 Use 'sudo apt autoremove' to remove them.  
 The following additional packages will be installed:  
 The following NEW packages will be installed:  
  libssl-dev libssl-doc  
 0 upgraded, 2 newly installed, 0 to remove and 287 not upgraded.  
 Need to get 2,788 kB of archives.  
 After this operation, 10.3 MB of additional disk space will be used.  
 Do you want to continue? [Y/n] Y  
 Get:1 testing/main amd64 libssl-dev amd64 1.0.2f-2 [1,538 kB]  
 Get:2 testing/main amd64 libssl-doc all 1.0.2f-2 [1,250 kB]                                                   
 Fetched 2,788 kB in 8s (312 kB/s)                                                                                
 Selecting previously unselected package libssl-dev:amd64.  
 (Reading database ... 224709 files and directories currently installed.)  
 Preparing to unpack .../libssl-dev_1.0.2f-2_amd64.deb ...  
 Unpacking libssl-dev:amd64 (1.0.2f-2) ...  
 Selecting previously unselected package libssl-doc.  
 Preparing to unpack .../libssl-doc_1.0.2f-2_all.deb ...  
 Unpacking libssl-doc (1.0.2f-2) ...  
 Processing triggers for man-db (2.7.5-1) ...  
 Setting up libssl-dev:amd64 (1.0.2f-2) ...  
 Setting up libssl-doc (1.0.2f-2) ...  

Let's compile it again.

 user@localhost:~/Desktop/sector$ make  
 make[1]: Entering directory '/home/user/Desktop/sector/udt'  
 g++ -shared -o md5.o common.o window.o list.o buffer.o packet.o channel.o queue.o core.o cache.o epoll.o api.o ccc.o -lstdc++ -lpthread -lssl -lcrypto -L. -L../lib   
 ar -rcs libudt.a md5.o common.o window.o list.o buffer.o packet.o channel.o queue.o core.o cache.o epoll.o api.o ccc.o  
 make[1]: Leaving directory '/home/user/Desktop/sector/udt'  
 make[1]: Entering directory '/home/user/Desktop/sector/udt'  
 mv libudt.a ../lib  
 make[1]: Leaving directory '/home/user/Desktop/sector/udt'  
 make[1]: Entering directory '/home/user/Desktop/sector/common'  
 g++ -fPIC -DMULTITHREADED_LOGGER -I. -I../include -I../udt -g -DDEBUG -Wall -Wextra osportable.cpp -c  
 g++ -fPIC -DMULTITHREADED_LOGGER -I. -I../include -I../udt -g -DDEBUG -Wall -Wextra conf.cpp -c  
 g++ -fPIC -DMULTITHREADED_LOGGER -I. -I../include -I../udt -g -DDEBUG -Wall -Wextra sfopt.cpp -c  
 g++ -fPIC -DMULTITHREADED_LOGGER -I. -I../include -I../udt -g -DDEBUG -Wall -Wextra snode.cpp -c  
 g++ -fPIC -DMULTITHREADED_LOGGER -I. -I../include -I../udt -g -DDEBUG -Wall -Wextra meta.cpp -c  
 g++ -fPIC -DMULTITHREADED_LOGGER -I. -I../include -I../udt -g -DDEBUG -Wall -Wextra index.cpp -c  
 g++ -fPIC -DMULTITHREADED_LOGGER -I. -I../include -I../udt -g -DDEBUG -Wall -Wextra memobj.cpp -c  
 g++ -fPIC -DMULTITHREADED_LOGGER -I. -I../include -I../udt -g -DDEBUG -Wall -Wextra transaction.cpp -c  
 g++ -fPIC -DMULTITHREADED_LOGGER -I. -I../include -I../udt -g -DDEBUG -Wall -Wextra topology.cpp -c  
 g++ -fPIC -DMULTITHREADED_LOGGER -I. -I../include -I../udt -g -DDEBUG -Wall -Wextra log.cpp -c  
 log.cpp: In member function ‘void logger::LogAggregate::setLogLevel(logger::LogLevel)’:  
 log.cpp:404:24: warning: variable ‘newLogger’ set but not used [-Wunused-but-set-variable]  
    log_mapped_value_t newLogger;  
 g++ -fPIC -DMULTITHREADED_LOGGER -I. -I../include -I../udt -g -DDEBUG -Wall -Wextra sphere.cpp -c  
 g++ -fPIC -DMULTITHREADED_LOGGER -I. -I../include -I../udt -g -DDEBUG -Wall -Wextra constant.cpp -c  
 g++ -fPIC -DMULTITHREADED_LOGGER -I. -I../include -I../udt -g -DDEBUG -Wall -Wextra crypto.cpp -c  
 g++ -fPIC -DMULTITHREADED_LOGGER -I. -I../include -I../udt -g -DDEBUG -Wall -Wextra dhash.cpp -c  
 g++ -fPIC -DMULTITHREADED_LOGGER -I. -I../include -I../udt -g -DDEBUG -Wall -Wextra routing.cpp -c  
 g++ -fPIC -DMULTITHREADED_LOGGER -I. -I../include -I../udt -g -DDEBUG -Wall -Wextra udttransport.cpp -c  
 g++ -fPIC -DMULTITHREADED_LOGGER -I. -I../include -I../udt -g -DDEBUG -Wall -Wextra ssltransport.cpp -c  
 g++ -fPIC -DMULTITHREADED_LOGGER -I. -I../include -I../udt -g -DDEBUG -Wall -Wextra tcptransport.cpp -c  
 tcptransport.cpp: In member function ‘virtual int sector::TCPTransport::close()’:  
 tcptransport.cpp:157:11: error: ‘::close’ has not been declared  
   return ::close(m_iSocket);  
 tcptransport.cpp:157:11: note: suggested alternative:  
 In file included from ../include/sector.h:31:0,  
          from tcptransport.cpp:33:  
 ../udt/udt.h:310:13: note:  ‘UDT::close’  
  UDT_API int close(UDTSOCKET u);  
 Makefile:30: recipe for target 'tcptransport.o' failed  
 make[1]: *** [tcptransport.o] Error 1  
 make[1]: Leaving directory '/home/user/Desktop/sector/common'  
 Makefile:5: recipe for target 'subdirs' failed  
 make: *** [subdirs] Error 2  

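It's a bit of a pity that the compilation failed, and that is definitely a blocker for new people picking up this great software. The error says that ::close is not declared in tcptransport.cpp; on Linux, close() is normally declared in <unistd.h>, so a missing include of that header may be the cause. If you develop in or know C++, please leave a comment on how to make this compilation work. Otherwise, if you want to know more about this software, this is another useful link.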

Friday, June 3, 2016

First look into ECL (enterprise control language)

It's been a quiet while since I actively blogged, due to family issues. But I
hope things will go smoother so that I can continue what I like best:
learning information technology and contributing back to the open source
community.

Today, we will take a look at another big data technology: ECL, or Enterprise
Control Language.

ECL is a declarative, data-centric programming language designed in 2000 to allow a team of programmers to process big data across a high-performance computing cluster without the programmer being involved in many of the lower-level, imperative decisions.[1][2]

As this article is only meant as an introduction, we will just go through whatever documentation is officially available from HPCC Systems, LexisNexis Risk Solutions. To speed up getting acquainted with ECL, download a virtual image from this link. The virtual machine comes preconfigured with an HPCC system and ECL, ready to play with.

For me, I chose the image of the current version with the gold release, running on a 64-bit CPU. Next, you need to install VirtualBox on your PC in order to run this virtual image. In the past, I have described many times how to install VirtualBox via apt-get.

One particular to pay attention to when getting the downloaded virtual image running on VirtualBox: HPCC Systems requires two network adapters, so make sure you have them configured correctly. You don't have to create a new virtual machine; just select 'Import Appliance' from the File dropdown, navigate to the downloaded image and import it. Next, power on the virtual machine. You should see something similar to the following.

Open your browser and point it to http://<your hpcc system ip address>:8010/#/stub/ECL . Click on
ECL at the top of the menu bar and then, in the submenu below, click on Playground. There are some
examples of how to model the data store, insert the data and then query the data. If you want more
explanation, you can read on at this link.

If ECL interests you, you should really read the programmer's guide here. HPCC Systems offers a lot of documentation, which can be found here.

That's it for this learning experience. I must say it is very easy to set up and quickly learn what ECL is, compared to the previous Hadoop system. If you are looking into big data analytics, ECL might be a good option to begin with.

Friday, January 1, 2016

Happy new year 2016

Happy new year 2016 to everyone!

In the past year, 2015, many things happened. A lot of blog posts were published, mainly big data and Java articles. I enjoy writing these technology articles after spending a considerable amount of my spare time on different topics.

While many things happened in the past year, one that struck me dearly was that my dear friend noflexdan passed away due to health complications. The domain that he left behind, where we started actively blogging on information technology, hit a big stumbling block. As a result of his passing, I decided to start a new domain which inherits the very fundamentals of sharing information technology through blogging but no longer uses his domain name. What rightfully belongs to him will forever belong to him. I miss you.

Another thing that struck me recently was my mom's diabetes complication, which this time left both of her legs swollen and unable to move. Almost a month has passed; the swelling and the pain have reduced a lot, but she is still unable to walk. Her health, well-being and daily routine are currently handled mainly by myself and the people close to me.

Due to these changes, I basically do not have much additional time to spend writing information technology blogs anytime soon. If you have been an avid reader in the past, my sincere apologies, given all that has come my way. I hope everything will be better soon so that my mom can take care of herself and I can resume my technology writing.

Till then, if you want to share what you have learned about information technology, you can drop me an email and I shall arrange it!

Let's hope 2016 will be better than the past year, and I wish everyone good health.

Jason Wee

Sunday, December 20, 2015

what happened to the old sstables after apache cassandra compaction is done

Last time we studied Apache Cassandra 1.0.8 compaction, and in this article we will focus on what happens after the sstables are compacted. We start with the execute(...) method of the class CompactionTask; the snippet below is where the compacted sstables are replaced.

    ...
    ...
    cfs.replaceCompactedSSTables(toCompact, sstables, compactionType);
    // TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up
    for (Entry<SSTableReader, Map<DecoratedKey, Long>> ssTableReaderMapEntry : cachedKeyMap.entrySet())
    {
      SSTableReader key = ssTableReaderMapEntry.getKey();
      for (Entry<DecoratedKey, Long> entry : ssTableReaderMapEntry.getValue().entrySet())
        key.cacheKey(entry.getKey(), entry.getValue());
    }

After the sstable compaction process is done, we see that the new sstable is persisted and the old sstables are replaced. After that, the key cache is also updated. The sstable replacement is what we are interested in for this article. Let's trace down the execution calls made.

    // called on cfs above; simply delegates to the data tracker
    public void replaceCompactedSSTables(Collection<SSTableReader> sstables, Iterable<SSTableReader> replacements, OperationType compactionType)
    {
      data.replaceCompactedSSTables(sstables, replacements, compactionType);
    }

    // swap the sstables in the view, then tell the subscribers about it
    public void replaceCompactedSSTables(Collection<SSTableReader> sstables, Iterable<SSTableReader> replacements, OperationType compactionType)
    {
      replace(sstables, replacements);
      notifySSTablesChanged(sstables, replacements, compactionType);
    }

    private void replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
    {
      View currentView, newView;
      do
      {
        currentView = view.get();
        newView = currentView.replace(oldSSTables, replacements);
      }
      while (!view.compareAndSet(currentView, newView));

      addNewSSTablesSize(replacements);
      removeOldSSTablesSize(oldSSTables);

      cfstore.updateCacheSizes();
    }
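
The do/while loop in replace() is a compare-and-set retry loop on an atomic reference. Here is a minimal, self-contained sketch of the same pattern, not Cassandra's code: ViewSwap is a made-up class, and a plain list of names stands in for the View and its sstables.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a tracker holding an immutable "view" of live table names.
class ViewSwap {
    private final AtomicReference<List<String>> view;

    ViewSwap(List<String> initial) {
        view = new AtomicReference<>(Collections.unmodifiableList(new ArrayList<>(initial)));
    }

    // Build a new immutable list and publish it atomically; retry if another
    // thread published a different view in the meantime.
    void replace(Set<String> old, List<String> replacements) {
        List<String> current;
        List<String> next;
        do {
            current = view.get();
            List<String> copy = new ArrayList<>();
            for (String name : current) {
                if (!old.contains(name)) {
                    copy.add(name);
                }
            }
            copy.addAll(replacements);
            next = Collections.unmodifiableList(copy);
        } while (!view.compareAndSet(current, next));
    }

    List<String> current() {
        return view.get();
    }

    public static void main(String[] args) {
        ViewSwap tracker = new ViewSwap(Arrays.asList("a", "b", "c"));
        tracker.replace(new HashSet<>(Arrays.asList("a", "b")), Arrays.asList("d"));
        System.out.println(tracker.current()); // [c, d]
    }
}
```

Readers always see either the old list or the new list, never a half-updated one, which is the point of swapping a whole immutable snapshot.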

    public void notifySSTablesChanged(Iterable<SSTableReader> removed, Iterable<SSTableReader> added, OperationType compactionType)
    {
      for (INotificationConsumer subscriber : subscribers)
      {
        INotification notification = new SSTableListChangedNotification(added, removed, compactionType);
        subscriber.handleNotification(notification, this);
      }
    }

At this point, we can see that replacing compacted sstables consists of the actual replacement and a notification that the sstables changed. First, we will take a look at the replacement process.

    public View replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
    {
      List<SSTableReader> newSSTables = newSSTables(oldSSTables, replacements);
      IntervalTree intervalTree = buildIntervalTree(newSSTables);
      return new View(memtable, memtablesPendingFlush, Collections.unmodifiableList(newSSTables), compacting, intervalTree);
    }

    private List<SSTableReader> newSSTables(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
    {
      ImmutableSet<SSTableReader> oldSet = ImmutableSet.copyOf(oldSSTables);
      int newSSTablesSize = sstables.size() - oldSSTables.size() + Iterables.size(replacements);
      assert newSSTablesSize >= Iterables.size(replacements) : String.format("Incoherent new size %d replacing %s by %s in %s", newSSTablesSize, oldSSTables, replacements, this);
      List<SSTableReader> newSSTables = new ArrayList<SSTableReader>(newSSTablesSize);
      // keep every current sstable that is not being replaced
      for (SSTableReader sstable : sstables)
      {
        if (!oldSet.contains(sstable))
          newSSTables.add(sstable);
      }
      Iterables.addAll(newSSTables, replacements);
      assert newSSTables.size() == newSSTablesSize : String.format("Expecting new size of %d, got %d while replacing %s by %s in %s", newSSTablesSize, newSSTables.size(), oldSSTables, replacements, this);
      return newSSTables;
    }

    private IntervalTree buildIntervalTree(List<SSTableReader> sstables)
    {
      List<Interval> intervals = new ArrayList<Interval>(sstables.size());
      for (SSTableReader sstable : sstables)
        intervals.add(new Interval<SSTableReader>(sstable.first, sstable.last, sstable));
      return new IntervalTree<SSTableReader>(intervals);
    }

    private void addNewSSTablesSize(Iterable<SSTableReader> newSSTables)
    {
      for (SSTableReader sstable : newSSTables)
      {
        assert sstable.getKeySamples() != null;
        if (logger.isDebugEnabled())
          logger.debug(String.format("adding %s to list of files tracked for %s.%s",
                sstable.descriptor, cfstore.table.name, cfstore.getColumnFamilyName()));
        long size = sstable.bytesOnDisk();
        liveSize.addAndGet(size);
        totalSize.addAndGet(size);
        sstable.setTrackedBy(this);
      }
    }

    private void removeOldSSTablesSize(Iterable<SSTableReader> oldSSTables)
    {
      for (SSTableReader sstable : oldSSTables)
      {
        if (logger.isDebugEnabled())
          logger.debug(String.format("removing %s from list of files tracked for %s.%s",
                sstable.descriptor, cfstore.table.name, cfstore.getColumnFamilyName()));
        liveSize.addAndGet(-sstable.bytesOnDisk());
        sstable.markCompacted();
        sstable.releaseReference();
      }
    }

    /**
     * Mark the sstable as compacted.
     * When calling this function, the caller must ensure that the SSTableReader is not referenced anywhere
     * except for threads holding a reference.
     */
    public void markCompacted()
    {
      if (logger.isDebugEnabled())
        logger.debug("Marking " + getFilename() + " compacted");
      try
      {
        if (!new File(descriptor.filenameFor(Component.COMPACTED_MARKER)).createNewFile())
          throw new IOException("Unable to create compaction marker");
      }
      catch (IOException e)
      {
        throw new IOError(e);
      }

      boolean alreadyCompacted = isCompacted.getAndSet(true);
      assert !alreadyCompacted : this + " was already marked compacted";
    }

    public void releaseReference()
    {
      if (references.decrementAndGet() == 0 && isCompacted.get())
      {
        // Force finalizing mmapping if necessary
        ifile.cleanup();
        dfile.cleanup();

        deletingTask.schedule();
      }
      assert references.get() >= 0 : "Reference counter " + references.get() + " for " + dfile.path;
    }

    public void schedule()
    {
      StorageService.tasks.submit(this);
    }

    /**
     * Resizes the key and row caches based on the current key estimate.
     */
    public synchronized void updateCacheSizes()
    {
      long keys = estimateKeys();
      keyCache.updateCacheSize(keys);
      rowCache.updateCacheSize(keys);
    }

As shown above, there is a lot going on even in the replacement process! Based on the code trace above, we can summarize:

  • a new sstable list is built from the current sstables minus the old ones plus the replacements, an interval tree is built over that list, and a new view is returned.
  • the process above is repeated until the compare-and-set of the view succeeds, that is, until no other thread has changed the view in between.
  • addNewSSTablesSize adds the size of the replacement sstables to the live and total sizes and marks them as tracked.
  • finally, it is time to remove the old sstables.
  • the old sstables are marked as compacted and then deleted once they are no longer referenced by any thread.
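
The last point, mark first and delete only when the last reference is released, can be sketched roughly as follows. This is an illustration under my own assumptions, not Cassandra's code: TrackedFile and its methods are made-up names, and a boolean flag stands in for scheduling the real file deletion.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an sstable reader with a reference count.
class TrackedFile {
    // Starts at 1: the tracker itself holds one reference.
    private final AtomicInteger references = new AtomicInteger(1);
    private final AtomicBoolean compacted = new AtomicBoolean(false);
    private final AtomicBoolean deleted = new AtomicBoolean(false);

    // A reader thread takes a reference before using the file.
    boolean acquire() {
        while (true) {
            int n = references.get();
            if (n <= 0) {
                return false; // already fully released, too late to use it
            }
            if (references.compareAndSet(n, n + 1)) {
                return true;
            }
        }
    }

    void markCompacted() {
        compacted.set(true);
    }

    // Whoever drops the last reference on a compacted file triggers the deletion.
    void release() {
        if (references.decrementAndGet() == 0 && compacted.get()) {
            deleted.set(true); // real code would schedule the file deletion here
        }
    }

    boolean isDeleted() {
        return deleted.get();
    }

    public static void main(String[] args) {
        TrackedFile file = new TrackedFile();
        file.acquire();                       // a reader is still using the file
        file.markCompacted();
        file.release();                       // the tracker lets go after compaction
        System.out.println(file.isDeleted()); // false, the reader still holds it
        file.release();                       // the reader is done
        System.out.println(file.isDeleted()); // true
    }
}
```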

Onto the method notifySSTablesChanged(),

    public void notifySSTablesChanged(Iterable<SSTableReader> removed, Iterable<SSTableReader> added, OperationType compactionType)
    {
      for (INotificationConsumer subscriber : subscribers)
      {
        INotification notification = new SSTableListChangedNotification(added, removed, compactionType);
        subscriber.handleNotification(notification, this);
      }
    }

Each subscriber is notified of the sstable list change, and a class that implements the interface should handle the change.
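
This is the classic subscriber (observer) pattern. A minimal sketch of the idea, with made-up names (ChangeNotifier, Listener) rather than Cassandra's INotificationConsumer, could look like this:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical notifier: every subscriber is told which tables were added and removed.
class ChangeNotifier {

    interface Listener {
        void onChange(List<String> added, List<String> removed);
    }

    // CopyOnWriteArrayList lets us iterate safely while other threads subscribe.
    private final List<Listener> subscribers = new CopyOnWriteArrayList<>();

    void subscribe(Listener listener) {
        subscribers.add(listener);
    }

    void notifyChanged(List<String> added, List<String> removed) {
        for (Listener listener : subscribers) {
            listener.onChange(added, removed);
        }
    }

    public static void main(String[] args) {
        ChangeNotifier notifier = new ChangeNotifier();
        List<String> log = new ArrayList<>();
        notifier.subscribe((added, removed) -> log.add("added " + added + ", removed " + removed));
        notifier.notifyChanged(Arrays.asList("d"), Arrays.asList("a", "b"));
        System.out.println(log); // [added [d], removed [a, b]]
    }
}
```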

Saturday, December 19, 2015

Learning into samsung ssd 850 PRO

In the last article, I talked about an SSD vs HDD comparison, which can be found here. In this article, I will spend some time learning about the Samsung SSD 850 PRO. The Samsung SSD 850 PRO is a solid state drive. At this moment, the drive delivers a staggering 100,000 read IOPS and 90,000 write IOPS[19]. It supports a SATA interface at 6 gigabits per second.

Additional characteristics of this model:

  • the IOPS figures above are for 4 KB aligned random I/O at QD32
  • 10,000 read IOPS, 36,000 write IOPS at QD1
  • 550 MB/s sequential read, 520 MB/s sequential write on 256 GB and larger models
  • 550 MB/s sequential read, 470 MB/s sequential write on 128 GB model[19]

The capacity of this model ranges over 128 GB, 256 GB, 512 GB, 1024 GB and 2048 GB. It has a 2.5-inch form factor with dimensions of 100 (W) x 69.85 (H) x 6.8 (D) millimeters. The drive uses a Samsung 3-core MEX controller with DRAM as the cache memory. Cache sizes with respect to SSD capacity are 256 MB (128 GB), 512 MB (256 GB and 512 GB) or 1 GB (1 TB) of LPDDR2, or 2 GB of LPDDR3. From the official site, the following performance figures are outlined.

 Sequential Read         Max. 550 MB/s  
 Sequential Write        Max. 520 MB/s (256 GB/512 GB/1 TB/2 TB)  
                         Max. 470 MB/s (128 GB)  
 4KB Random Read (QD1)   Max. 10,000 IOPS  
 4KB Random Write (QD1)  Max. 36,000 IOPS  
 4KB Random Read (QD32)  Max. 100,000 IOPS  
 4KB Random Write (QD32) Max. 90,000 IOPS  
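
To get a feel for these numbers, the IOPS figures can be converted into approximate throughput. This is just arithmetic of my own, assuming a 4 KB block is 4096 bytes and a megabyte is 1,000,000 bytes; IopsMath is a made-up class name.

```java
// Back-of-the-envelope helper: IOPS times block size gives throughput.
class IopsMath {

    // Throughput in MB/s (decimal megabytes) for a given IOPS and block size in bytes.
    static double throughputMBps(long iops, int blockBytes) {
        return iops * (double) blockBytes / 1_000_000.0;
    }

    public static void main(String[] args) {
        int block = 4096; // assumption: 4 KB means 4096 bytes
        System.out.println("random read  QD32: " + throughputMBps(100_000, block) + " MB/s");
        System.out.println("random write QD32: " + throughputMBps(90_000, block) + " MB/s");
        System.out.println("random read  QD1 : " + throughputMBps(10_000, block) + " MB/s");
        System.out.println("random write QD1 : " + throughputMBps(36_000, block) + " MB/s");
    }
}
```

So 100,000 random read IOPS at 4 KB is roughly 410 MB/s, still below the 550 MB/s sequential read figure.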

With this blazing speed, as a software engineer, I really think it helps with application launching, like Eclipse, and especially with code grepping or file finding. I also think this speed is definitely an ideal match for software like Cassandra and Elasticsearch, where disk I/O speed is much sought after. With the numbers shown above and my previous hdparm comparison, I think switching from a normal spinning HDD to this SSD will make a big difference.

Full support for TRIM, garbage collection and SMART is guaranteed. With a staggering reliability of 2 million hours mean time between failures, 150 terabytes written and a TEN YEAR warranty period, I must say I am impressed and convinced. The power consumption is also very low: during active read and write, a maximum of 3.5 W and 3.0 W respectively. If you are using this in a laptop, where power is always a matter of concern, an SSD is a wise decision.

With 24 awards under its belt for this model, I think even for the skeptical, it is safe to really start using SSDs. If you have a budget, I think rather than just upgrading the CPU and RAM, a switch from HDD to SSD should make a big difference too.

I leave this article with Samsung 850 PRO SSD videos.
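
As a rough feel for the endurance figure, 150 terabytes written spread evenly over the ten year warranty works out to about 41 GB of writes per day. The small sketch below just does that division; it assumes decimal units (1 TB = 1000 GB) and 365-day years, and EnduranceMath is a made-up name.

```java
// Back-of-the-envelope helper for the endurance rating.
class EnduranceMath {

    // Average GB per day that can be written while staying within the rated terabytes written.
    static double gbPerDay(double terabytesWritten, int years) {
        return terabytesWritten * 1000.0 / (years * 365.0);
    }

    public static void main(String[] args) {
        System.out.println(gbPerDay(150, 10) + " GB per day");
    }
}
```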

Friday, December 18, 2015

Learn java util concurrent part5

This is the last part of the series on learning the java.util.concurrent package. If you have not read the series before, you can find part1, part2, part3 and part4 at the respective links. In this part, we will study the remaining 19 classes in the java.util.concurrent package.


ForkJoinTask

  • Abstract base class for tasks that run within a ForkJoinPool.

    // Summer is a Callable<Integer> (not shown here)
    ForkJoinTask<Integer> fjt = ForkJoinTask.adapt(new Summer(44,55));
    fjt.invoke();
    Integer sum = fjt.get();
    System.out.println(sum);
    System.out.println(fjt.isDone());
    fjt.join();

Notice that a new Callable was adapted into a ForkJoinTask. Execution commences by invoking the task. You can check whether the task is complete using the isDone method.
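
For a version you can paste and run, here is a minimal sketch. The Summer class below is my own stand-in that simply adds two numbers; it is only assumed to behave like the Summer used above.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinTask;

class AdaptDemo {

    // Hypothetical stand-in for the Summer class: a Callable that adds two ints.
    static class Summer implements Callable<Integer> {
        private final int a;
        private final int b;

        Summer(int a, int b) {
            this.a = a;
            this.b = b;
        }

        @Override
        public Integer call() {
            return a + b;
        }
    }

    static int sum(int a, int b) {
        // adapt() wraps the Callable; invoke() performs the task and returns its result.
        ForkJoinTask<Integer> task = ForkJoinTask.adapt(new Summer(a, b));
        return task.invoke();
    }

    public static void main(String[] args) {
        System.out.println(sum(44, 55)); // 99
    }
}
```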


ForkJoinWorkerThread

  • A thread managed by a ForkJoinPool, which executes ForkJoinTasks.

    ForkJoinWorkerThreadFactory customFactory = new ForkJoinWorkerThreadFactory() {
       @Override
       public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
          return null;
       }
    };

As explained by the ForkJoinWorkerThread javadoc, a ForkJoinWorkerThreadFactory creates new worker threads for the given pool. The anonymous factory above is only a bare skeleton that returns null instead of a real thread.
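
A factory that actually produces threads can delegate to the pool's default factory and just customise the thread, for example its name. This is a sketch under my own assumptions: NamedWorkers, newPool and workerName are made-up names, and the naming scheme is arbitrary.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.AtomicInteger;

class NamedWorkers {

    // Builds a pool whose worker threads are named "<prefix>-1", "<prefix>-2", ...
    static ForkJoinPool newPool(final String prefix) {
        final AtomicInteger counter = new AtomicInteger();
        ForkJoinWorkerThreadFactory factory = new ForkJoinWorkerThreadFactory() {
            @Override
            public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
                // Let the default factory create the thread, then rename it.
                ForkJoinWorkerThread thread =
                        ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
                thread.setName(prefix + "-" + counter.incrementAndGet());
                return thread;
            }
        };
        return new ForkJoinPool(2, factory, null, false);
    }

    // Runs a tiny task in the pool and reports which thread executed it.
    static String workerName(ForkJoinPool pool) throws Exception {
        Callable<String> whoAmI = () -> Thread.currentThread().getName();
        return pool.submit(whoAmI).get();
    }

    public static void main(String[] args) throws Exception {
        ForkJoinPool pool = newPool("sorter");
        System.out.println(workerName(pool));
        pool.shutdown();
    }
}
```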


FutureTask

  • A cancellable asynchronous computation.
  • A FutureTask can be used to wrap a Callable or Runnable object. Because FutureTask implements Runnable, a FutureTask can be submitted to an Executor for execution.

    FutureTask<Integer> ft = new FutureTask<Integer>(new Summer(66,77));
    ft.run(); // run the task on the current thread; without this, get() would block forever
    System.out.println(ft.get());

If you have a long-running task, you can use FutureTask so that the task can be cancelled. Next, we will go into another three queues.
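
Cancelling could look roughly like this. It is a sketch of my own (CancelDemo is a made-up name): the task pretends to be slow by sleeping, runs on its own thread, and is cancelled right away.

```java
import java.util.concurrent.FutureTask;

class CancelDemo {

    // Starts a slow task on its own thread, cancels it, and reports whether it ended up cancelled.
    static boolean cancelLongTask() throws InterruptedException {
        FutureTask<Integer> ft = new FutureTask<>(() -> {
            Thread.sleep(60_000); // pretend to be a long-running computation
            return 1;
        });
        Thread worker = new Thread(ft);
        worker.start();

        // true asks for the running thread to be interrupted if the task already started.
        boolean cancelled = ft.cancel(true);
        worker.join();
        return cancelled && ft.isCancelled();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(cancelLongTask()); // true
    }
}
```

After a successful cancel, calling get() on the task throws a CancellationException instead of returning a value.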


LinkedBlockingDeque

  • An optionally-bounded blocking deque based on linked nodes.
  • The capacity, if unspecified, is equal to Integer.MAX_VALUE.
  • Most operations run in constant time (ignoring time spent blocking). Exceptions include remove, removeFirstOccurrence, removeLastOccurrence, contains, iterator.remove(), and the bulk operations, all of which run in linear time.


LinkedBlockingQueue

  • An optionally-bounded blocking queue based on linked nodes.
  • This queue orders elements FIFO (first-in-first-out).
  • The head of the queue is that element that has been on the queue the longest time.
  • The tail of the queue is that element that has been on the queue the shortest time.
  • New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue.
  • The capacity, if unspecified, is equal to Integer.MAX_VALUE.


LinkedTransferQueue

  • An unbounded TransferQueue based on linked nodes.
  • This queue orders elements FIFO (first-in-first-out) with respect to any given producer.
  • The head of the queue is that element that has been on the queue the longest time for some producer.
  • The size method is NOT a constant-time operation.

    LinkedBlockingDeque<Integer> lbd = new LinkedBlockingDeque<Integer>();
    lbd.add(1);
    lbd.add(2);
    lbd.add(3);

    LinkedBlockingQueue<Integer> lbq = new LinkedBlockingQueue<Integer>();
    lbq.add(4);
    lbq.add(5);
    lbq.add(6);

    LinkedTransferQueue<Integer> ltq = new LinkedTransferQueue<Integer>();
    ltq.add(7);
    ltq.add(8);
    ltq.add(9);

Like the queues we talked about in part4, these queues show their benefits in multithreaded code.
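
To see that benefit, here is a minimal producer/consumer sketch of my own (HandOff is a made-up name). The queue is deliberately bounded to two elements so that put() blocks the producer whenever the consumer falls behind, and take() blocks the consumer whenever the queue is empty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class HandOff {

    // One producer thread puts 1..count on the queue; the calling thread takes them off.
    static List<Integer> produceAndConsume(int count) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2); // small bound on purpose

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        List<Integer> received = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            received.add(queue.take()); // blocks while the queue is empty
        }
        producer.join();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(produceAndConsume(5)); // [1, 2, 3, 4, 5]
    }
}
```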


Phaser

  • A reusable synchronization barrier, similar in functionality to CyclicBarrier and CountDownLatch but supporting more flexible usage.
  • This implementation restricts the maximum number of parties to 65535.

    Phaser phaser = new Phaser();
    phaser.register();
    System.out.println("current phase number : " + phaser.getPhase());
    // testPhaser(...) is a helper method that is not shown in this article
    testPhaser(phaser, 2000);
    testPhaser(phaser, 4000);
    testPhaser(phaser, 6000);

    phaser.arriveAndDeregister();
    Thread.sleep(10000);
    System.out.println("current phase number : " + phaser.getPhase());
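
Since the helper is not shown, here is a separate, self-contained sketch of a Phaser advancing one phase. It is my own example (PhaserDemo and runWorkers are made-up names) and not the code behind testPhaser: each worker registers, arrives once and deregisters, while the main thread waits for all of them.

```java
import java.util.concurrent.Phaser;

class PhaserDemo {

    // Starts the given number of worker threads and returns the phase number after all have arrived.
    static int runWorkers(int workers) throws InterruptedException {
        final Phaser phaser = new Phaser(1); // 1 = the calling thread is registered as a party
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            phaser.register(); // one more party for each worker
            threads[i] = new Thread(() -> {
                // ... the worker would do its job here ...
                phaser.arriveAndDeregister();
            });
            threads[i].start();
        }

        // Arrive ourselves and wait for every worker; the phase goes from 0 to 1.
        phaser.arriveAndAwaitAdvance();
        for (Thread t : threads) {
            t.join();
        }
        return phaser.getPhase();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("phase after workers arrived : " + runWorkers(3)); // 1
    }
}
```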


PriorityBlockingQueue

  • An unbounded blocking queue that uses the same ordering rules as class PriorityQueue and supplies blocking retrieval operations.
  • This class does not permit null elements.
  • The Iterator provided in method iterator() is not guaranteed to traverse the elements of the PriorityBlockingQueue in any particular order.

    PriorityBlockingQueue<Integer> pbq = new PriorityBlockingQueue<Integer>();
    pbq.add(10);
    pbq.add(11);
    pbq.add(12);
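
The ordering only shows up when elements are taken out. A small sketch of my own (PriorityDemo is a made-up name): elements are added in arbitrary order and polled back in their natural, ascending order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

class PriorityDemo {

    // Adds the values in the given order and polls them back until the queue is empty.
    static List<Integer> drainInOrder(int... values) {
        PriorityBlockingQueue<Integer> pbq = new PriorityBlockingQueue<>();
        for (int v : values) {
            pbq.add(v);
        }
        List<Integer> out = new ArrayList<>();
        Integer next;
        while ((next = pbq.poll()) != null) { // poll() returns null when the queue is empty
            out.add(next);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drainInOrder(12, 10, 11)); // [10, 11, 12]
    }
}
```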


  • A recursive resultless ForkJoinTask.

        // needs java.util.Arrays and java.util.concurrent.RecursiveAction
        long[] array = {1,3,2,5,4,9,5,7,8};
        RecursiveAction ar = new SortTask(array);
        ar.invoke();
        for (long value : array) {
            System.out.println("array " + value);
        }

    static class SortTask extends RecursiveAction {

        final long[] array;
        final int lo, hi;

        SortTask(long[] array, int lo, int hi) {
            this.array = array;
            this.lo = lo;
            this.hi = hi;
        }

        SortTask(long[] array) {
            this(array, 0, array.length);
        }

        @Override
        protected void compute() {
            if (hi - lo < THRESHOLD)
                sortSequentially(lo, hi);
            else {
                // split the range in two, sort both halves in parallel, then merge
                int mid = (lo + hi) >>> 1;
                invokeAll(new SortTask(array, lo, mid), new SortTask(array, mid, hi));
                merge(lo, mid, hi);
            }
        }

        // implementation details follow:
        static final int THRESHOLD = 1000;

        void sortSequentially(int lo, int hi) {
            Arrays.sort(array, lo, hi);
        }

        void merge(int lo, int mid, int hi) {
            long[] buf = Arrays.copyOfRange(array, lo, mid);
            for (int i = 0, j = lo, k = mid; i < buf.length; j++)
                array[j] = (k == hi || buf[i] < array[k]) ? buf[i++] : array[k++];
        }

    }

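Calling invoke() on the object ar starts the recursive sort of the array. With only nine elements the array is below THRESHOLD, so it is sorted sequentially in a single task.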


  • A recursive result-bearing ForkJoinTask.

        RecursiveTask<Integer> fibo = new Fibonacci(10);
        fibo.invoke();
        // invoke() has already computed the result, so get() returns immediately
        System.out.println(fibo.get());

    static class Fibonacci extends RecursiveTask<Integer> {

        final int n;

        Fibonacci(int n) {
            this.n = n;
        }

        @Override
        protected Integer compute() {
            if (n <= 1)
                return n;
            Fibonacci f1 = new Fibonacci(n - 1);
            f1.fork(); // run f1 asynchronously
            Fibonacci f2 = new Fibonacci(n - 2);
            return f2.compute() + f1.join(); // compute f2 here, then wait for f1
        }
    }


  • A ThreadPoolExecutor that can additionally schedule commands to run after a given delay, or to execute periodically.

        ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(10);
        // Summer is a task class that is not shown in this listing
        Future<Integer> total = stpe.submit(new Summer(88,99));
        System.out.println(total.get());
        stpe.shutdown();
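The listing above only uses submit(), which runs the task right away. To show the "after a given delay" part, here is a rough sketch using schedule(). The class DelayedSumSketch and the helper sumAfterDelay are hypothetical, and a plain lambda stands in for the Summer task.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DelayedSumSketch {

    // Hypothetical helper: adds two numbers on a pool thread after the given delay.
    static int sumAfterDelay(int a, int b, long delayMillis) {
        ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(1);
        try {
            Callable<Integer> task = () -> a + b;
            ScheduledFuture<Integer> future = stpe.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
            return future.get(); // blocks until the delayed task has run
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            stpe.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumAfterDelay(88, 99, 200)); // prints 187 after roughly 200 ms
    }
}
```

For repeating work, scheduleAtFixedRate() and scheduleWithFixedDelay() take a Runnable instead of a Callable.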


  • A counting semaphore.
  • Semaphores are often used to restrict the number of threads that can access some (physical or logical) resource.

        ConnectionLimiter cl = new ConnectionLimiter(3);
        URLConnection conn = cl.acquire(new URL(""));
        conn = cl.acquire(new URL(""));
        conn = cl.acquire(new URL(""));
        cl.release(conn);

    static class ConnectionLimiter {
        private final Semaphore semaphore;

        private ConnectionLimiter(int max) {
            semaphore = new Semaphore(max);
        }

        public URLConnection acquire(URL url) throws IOException, InterruptedException {
            semaphore.acquire(); // blocks once all permits are taken
            return url.openConnection();
        }

        public void release(URLConnection conn) {
            try {
                // placeholder: clean up anything held by conn
            } finally {
                semaphore.release(); // always hand the permit back
            }
        }
    }
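To watch the limit kick in without opening any connections, here is a tiny sketch using tryAcquire(), which returns false instead of blocking when no permit is left. The class PermitSketch and its helper are made up for this example.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;

class PermitSketch {

    // Hypothetical helper: tries to take `attempts` permits from a semaphore holding `permits`.
    static boolean[] tryAcquireMany(int permits, int attempts) {
        Semaphore semaphore = new Semaphore(permits);
        boolean[] results = new boolean[attempts];
        for (int i = 0; i < attempts; i++) {
            results[i] = semaphore.tryAcquire(); // false once the permits run out
        }
        return results;
    }

    public static void main(String[] args) {
        // three permits, four attempts: the fourth one is refused
        System.out.println(Arrays.toString(tryAcquireMany(3, 4))); // prints [true, true, true, false]
    }
}
```

With the blocking acquire() used in ConnectionLimiter, a fourth caller would wait until someone calls release().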


  • A blocking queue in which each insert operation must wait for a corresponding remove operation by another thread, and vice versa.
  • This queue does not permit null elements.

        final SynchronousQueue<String> queue = new SynchronousQueue<String>();
        Thread a = new Thread(new QueueProducer(queue));
        a.start();
        Thread b = new Thread(new QueueConsumer(queue));
        b.start();

        Thread.sleep(1000);

        // the consumer takes only one event, so the producer is still blocked on its second put
        a.interrupt();
        b.interrupt();

    static class QueueProducer implements Runnable {

        private final SynchronousQueue<String> queue;

        public QueueProducer(SynchronousQueue<String> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            String event = "SYNCHRONOUS_EVENT";
            String another_event = "ANOTHER_EVENT";

            try {
                // each put blocks until a consumer takes the element
                queue.put(event);
                System.out.printf("[%s] published event : %s %n", Thread.currentThread().getName(), event);

                queue.put(another_event);
                System.out.printf("[%s] published event : %s %n", Thread.currentThread().getName(), another_event);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
            }
        }

    }

    static class QueueConsumer implements Runnable {

        private final SynchronousQueue<String> queue;

        public QueueConsumer(SynchronousQueue<String> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                // thread will block here until a producer puts an element
                String event = queue.take();
                System.out.printf("[%s] consumed event : %s %n", Thread.currentThread().getName(), event);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
            }
        }

    }


  • A random number generator isolated to the current thread.

        ThreadLocalRandom tlr = ThreadLocalRandom.current();
        System.out.println(tlr.nextInt());
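In daily coding a bounded value is usually more useful than a raw int. A short sketch, where the class DiceSketch and the method rollDie are invented for illustration:

```java
import java.util.concurrent.ThreadLocalRandom;

class DiceSketch {

    // Hypothetical helper: returns a value from 1 to 6 inclusive.
    static int rollDie() {
        // the origin is inclusive, the bound is exclusive
        return ThreadLocalRandom.current().nextInt(1, 7);
    }

    public static void main(String[] args) {
        System.out.println(rollDie());
    }
}
```

Calling ThreadLocalRandom.current() each time, rather than sharing one instance across threads, is what keeps the generator isolated to the current thread.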


  • An ExecutorService that executes each submitted task using one of possibly several pooled threads, normally configured using Executors factory methods.

        BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(4);
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS, blockingQueue);

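The last four classes are the rejection policies of ThreadPoolExecutor. Each one decides what happens to a task that the executor cannot accept, for example because it has been shut down or because its queue and thread pool are both full.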


  • A handler for rejected tasks that throws a RejectedExecutionException.


  • A handler for rejected tasks that runs the rejected task directly in the calling thread of the execute method, unless the executor has been shut down, in which case the task is discarded.


  • A handler for rejected tasks that discards the oldest unhandled request and then retries execute, unless the executor is shut down, in which case the task is discarded.


  • A handler for rejected tasks that silently discards the rejected task.

        ThreadPoolExecutor.AbortPolicy ap = new ThreadPoolExecutor.AbortPolicy();
        try {
            ap.rejectedExecution(() -> System.out.println("abort"), tpe);
        } catch (Exception e) {
            // AbortPolicy always throws RejectedExecutionException
        }

        ThreadPoolExecutor.CallerRunsPolicy crp = new ThreadPoolExecutor.CallerRunsPolicy();
        try {
            crp.rejectedExecution(() -> System.out.println("run"), tpe);
        } catch (Exception e) {
        }

        ThreadPoolExecutor.DiscardOldestPolicy dop = new ThreadPoolExecutor.DiscardOldestPolicy();
        try {
            dop.rejectedExecution(() -> System.out.println("discard oldest"), tpe);
        } catch (Exception e) {
        }

        ThreadPoolExecutor.DiscardPolicy dp = new ThreadPoolExecutor.DiscardPolicy();
        try {
            dp.rejectedExecution(() -> System.out.println("discard"), tpe);
        } catch (Exception e) {
        }
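Calling rejectedExecution() by hand works, but normally the executor calls the policy itself once it is saturated. Below is a minimal sketch of that situation, assuming a pool of one thread and a queue of one slot. The class RejectionSketch and the helper overflowOutcome are hypothetical names for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class RejectionSketch {

    // Hypothetical helper: saturates a 1-thread, 1-slot executor and reports
    // what the given policy does with the third task.
    static String overflowOutcome(RejectedExecutionHandler handler) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(
                1, 1, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), handler);
        AtomicBoolean ran = new AtomicBoolean(false);
        try {
            tpe.execute(() -> { // occupies the only worker thread
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            tpe.execute(() -> { });           // fills the only queue slot
            tpe.execute(() -> ran.set(true)); // no room left, so the policy is consulted
            return ran.get() ? "ran in caller" : "not run at submission";
        } catch (RejectedExecutionException e) {
            return "rejected with exception";
        } finally {
            release.countDown(); // let the blocked worker finish
            tpe.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(overflowOutcome(new ThreadPoolExecutor.AbortPolicy()));      // rejected with exception
        System.out.println(overflowOutcome(new ThreadPoolExecutor.CallerRunsPolicy())); // ran in caller
        System.out.println(overflowOutcome(new ThreadPoolExecutor.DiscardPolicy()));    // not run at submission
    }
}
```

DiscardOldestPolicy would also report "not run at submission" here, but for a different reason: it drops the queued no-op task and puts the new task in its place, so the new task still runs later.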

That's it for this long learning series on java.util.concurrent.