Having completed the four basic learning steps for Apache Cassandra, it's time to try some hands-on coding.
If needed, it's worth briefly reviewing them first:
Apache Cassandra Learning Step by Step (1)
Apache Cassandra Learning Step by Step (2): Core Concepts
Apache Cassandra Learning Step by Step (3): Samples ABC
Apache Cassandra Learning Step by Step (4): Data Modeling
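Building on the modeling approach from part (4), the next step is to put together a hands-on project called JTwissandra, which is essentially a Java version of Twissandra.
The idea is to use Twitter as an imaginary target and, with the most minimal (frankly, crude) modeling and implementation possible, show the basic process of building on Apache Cassandra as a NoSQL platform.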
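The basic development environment of JTwissandra:
1. Maven for build management
2. JUnit for testing
3. The Hector client as the Java client for Apache Cassandra
You can clone the latest code directly from the GitHub link below: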
JTwissandra: https://github.com/itstarting/jtwissandra
Feel free to fork it or throw criticism at it here. I'm a NoSQL newbie anyway, so a thick skin won't hurt :)
1. First, we need an HFactoryHelper to initialize the Cassandra client connection pool and the other objects we need:
package bright.zheng.jtwissandra;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.HConsistencyLevel;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.factory.HFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Helper for Cassandra initialization
 *
 * @author bright_zheng
 *
 */
public class HFactoryHelper {
    private static final Logger logger = LoggerFactory.getLogger(HFactoryHelper.class);
    private static Properties properties;
    private static Cluster cluster;
    // initialized last: initKeyspace() needs logger to be set already
    private static final Keyspace keyspace = initKeyspace();

    private HFactoryHelper(){}

    public static Keyspace getKeyspace(){
        return keyspace;
    }

    private static Keyspace initKeyspace() {
        properties = new Properties();
        // getResourceAsStream() returns null (it does not throw IOException) when the file is missing
        InputStream in = HFactoryHelper.class.getResourceAsStream("/config.properties");
        if (in == null) {
            logger.warn("config.properties not found on classpath, using defaults");
        } else {
            try {
                properties.load(in);
            } catch (IOException ioe) {
                logger.error("Failed to load config.properties, using defaults", ioe);
            } finally {
                try {
                    in.close();
                } catch (IOException ignore) {
                    // nothing useful to do here
                }
            }
        }

        String cluster_name = properties.getProperty("cluster.name", "Test Cluster");
        logger.debug("cluster.name={}", cluster_name);
        String cluster_hosts = properties.getProperty("cluster.hosts", "127.0.0.1:9160");
        logger.debug("cluster.hosts={}", cluster_hosts);
        String active_keyspace = properties.getProperty("keyspace", "JTWISSANDRA");
        logger.debug("keyspace={}", active_keyspace);

        cluster = HFactory.getOrCreateCluster(cluster_name, cluster_hosts);

        // reads use consistency level ONE; writes keep Hector's default
        ConfigurableConsistencyLevel ccl = new ConfigurableConsistencyLevel();
        ccl.setDefaultReadConsistencyLevel(HConsistencyLevel.ONE);
        return HFactory.createKeyspace(
            active_keyspace,
            cluster,
            ccl);
    }
}
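To make the configuration lookup above concrete, here is a small stand-alone sketch in plain JDK Java (no Hector involved). The class name `ClusterConfigSketch` is hypothetical; it only mirrors the three keys (`cluster.name`, `cluster.hosts`, `keyspace`) and the defaults HFactoryHelper uses, and it assumes a missing `config.properties` should simply mean "use every default".

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

/**
 * Hypothetical sketch: resolve the same keys HFactoryHelper reads,
 * falling back to its defaults when a key or the whole file is missing.
 */
class ClusterConfigSketch {
    final String clusterName;
    final String clusterHosts;
    final String keyspace;

    private ClusterConfigSketch(Properties p) {
        // same keys and defaults as HFactoryHelper.initKeyspace()
        this.clusterName = p.getProperty("cluster.name", "Test Cluster");
        this.clusterHosts = p.getProperty("cluster.hosts", "127.0.0.1:9160");
        this.keyspace = p.getProperty("keyspace", "JTWISSANDRA");
    }

    /** A null stream (resource not found on the classpath) yields all defaults. */
    static ClusterConfigSketch load(InputStream in) {
        Properties p = new Properties();
        if (in != null) {
            try (InputStream stream = in) {
                p.load(stream);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return new ClusterConfigSketch(p);
    }

    public static void main(String[] args) {
        // in the real project this stream would come from getResourceAsStream("/config.properties")
        byte[] file = "cluster.hosts=10.0.0.1:9160\n".getBytes(StandardCharsets.ISO_8859_1);
        ClusterConfigSketch cfg = load(new ByteArrayInputStream(file));
        System.out.println(cfg.clusterName + " | " + cfg.clusterHosts + " | " + cfg.keyspace);
    }
}
```

A `config.properties` on the classpath only needs the keys you want to override, e.g. `cluster.hosts=10.0.0.1:9160`; anything absent falls back to the local single-node defaults.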
2. Create BaseService, the base class for all business services:
package bright.zheng.jtwissandra.service;

import java.util.UUID;

import me.prettyprint.cassandra.serializers.LongSerializer;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.cassandra.service.clock.MicrosecondsClockResolution;
import me.prettyprint.cassandra.utils.TimeUUIDUtils;
import me.prettyprint.hector.api.ClockResolution;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.factory.HFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import bright.zheng.jtwissandra.HFactoryHelper;

/**
 * Base service which all business services should extend
 *
 * @author bright_zheng
 *
 */
public class BaseService {
    protected Logger logger = LoggerFactory.getLogger(getClass());

    protected static final Keyspace KEYSPACE = HFactoryHelper.getKeyspace();

    // column family names, matching the schema in step 5
    protected static final String CF_USER = "USER";
    protected static final String CF_FRIEND = "FRIEND";
    protected static final String CF_FOLLOWER = "FOLLOWER";
    protected static final String CF_TWEET = "TWEET";
    protected static final String CF_TIMELINE = "TIMELINE";

    protected static final StringSerializer SERIALIZER_STRING
        = StringSerializer.get();
    protected static final LongSerializer SERIALIZER_LONG
        = LongSerializer.get();

    protected static final int TWEETS_LIMIT_DEFAULT = 10;
    protected static final int TWEETS_LIMIT_MAX = 50;

    /** String column name, String value */
    protected HColumn<String, String> createColumn(String name, String value) {
        return HFactory.createColumn(name, value, SERIALIZER_STRING, SERIALIZER_STRING);
    }

    /** String column name, Long value (e.g. create_timestamp) */
    protected HColumn<String, Long> createColumn(String name, Long value) {
        return HFactory.createColumn(name, value, SERIALIZER_STRING, SERIALIZER_LONG);
    }

    /** Long column name (a timestamp), String value (a uuid) */
    protected HColumn<Long, String> createColumn(Long name, String value) {
        return HFactory.createColumn(name, value, SERIALIZER_LONG, SERIALIZER_STRING);
    }

    /**
     * REF: http://wiki.apache.org/cassandra/FAQ#working_with_timeuuid_in_java
     *
     * @return a time-based (version 1) UUID
     */
    public UUID getUUID(){
        //TODO: which UUID should we use to make sure it's unique?
        ClockResolution clock = new MicrosecondsClockResolution();
        return TimeUUIDUtils.getTimeUUID(clock);
        //return TimeUUIDUtils.getUniqueTimeUUIDinMillis();
    }

    /** Extracts the timestamp embedded in a time-based UUID */
    protected Long getTimestamp(UUID uuid){
        //return uuid.timestamp();
        return TimeUUIDUtils.getTimeFromUUID(uuid);
    }

    protected Long generateTimestamp(){
        return getTimestamp(getUUID());
    }
}
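The `getUUID()` / `getTimestamp()` pair relies on the timestamp that a version 1 (time-based) UUID carries. The following plain-JDK sketch, which is not Hector's `TimeUUIDUtils`, builds such a UUID from a Unix millisecond value following the RFC 4122 field layout and reads it back through `java.util.UUID.timestamp()`. The names `TimeUuidSketch`, `fromUnixMillis` and `toUnixMillis` are hypothetical. It also shows the concern behind the TODO: with millisecond input and the same clock-sequence/node bits, two ids generated in the same millisecond are identical.

```java
import java.util.UUID;

/**
 * Hypothetical sketch using only java.util.UUID: build a version 1 UUID
 * from a Unix timestamp and read the timestamp back (RFC 4122 layout).
 */
class TimeUuidSketch {
    // 100-ns intervals between the UUID epoch (1582-10-15) and the Unix epoch (1970-01-01)
    static final long UUID_EPOCH_OFFSET = 0x01B21DD213814000L;

    static UUID fromUnixMillis(long millis, long clockSeqAndNode) {
        long time = millis * 10000L + UUID_EPOCH_OFFSET;   // 60-bit count of 100-ns ticks
        long msb = (time << 32)                             // time_low  -> bits 63..32
                 | ((time >>> 16) & 0xFFFF0000L)             // time_mid  -> bits 31..16
                 | 0x1000L                                   // version 1 -> bits 15..12
                 | ((time >>> 48) & 0x0FFFL);                // time_hi   -> bits 11..0
        // keep the low 62 bits and set the IETF variant bits (10)
        long lsb = (clockSeqAndNode & 0x3FFFFFFFFFFFFFFFL) | 0x8000000000000000L;
        return new UUID(msb, lsb);
    }

    static long toUnixMillis(UUID uuid) {
        // UUID.timestamp() throws UnsupportedOperationException unless version() == 1
        return (uuid.timestamp() - UUID_EPOCH_OFFSET) / 10000L;
    }

    public static void main(String[] args) {
        long now = 1329836819890L;
        UUID a = fromUnixMillis(now, 42L);
        UUID b = fromUnixMillis(now, 42L);
        System.out.println(a + " -> " + toUnixMillis(a));
        // same millisecond and same clock sequence/node: the two ids are equal
        System.out.println("collision: " + a.equals(b));
    }
}
```

Because TIMELINE, FRIEND and FOLLOWER use these timestamps as column names, two writes that land on the same timestamp in the same row overwrite each other, so whichever clock the real code uses needs to hand out distinct values per row.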
3. Here is the code for each business service:
3.1 UserService
package bright.zheng.jtwissandra.service;

import java.util.UUID;

import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;

import bright.zheng.jtwissandra.bean.User;

/**
 * User service
 *
 * @author bright_zheng
 *
 */
public class UserService extends BaseService{

    /**
     * Sample CLI cmd:
     * set USER['550e8400-e29b-41d4-a716-446655440000']['user_name'] = 'itstarting';
     * set USER['550e8400-e29b-41d4-a716-446655440000']['password'] = '111222';
     * set USER['550e8400-e29b-41d4-a716-446655440000']['create_timestamp'] = 1329836819890000;
     *
     * @param user
     * @return the generated user uuid
     */
    public String addUser(User user) {
        Mutator<String> mutator = HFactory.createMutator(
            KEYSPACE, SERIALIZER_STRING);

        UUID uuid = this.getUUID();
        String user_uuid = uuid.toString();
        Long create_timestamp = this.getTimestamp(uuid);
        logger.debug("user_uuid={}", user_uuid);
        logger.debug("user_name={}", user.getUser_name());
        logger.debug("password={}", user.getUser_password());
        logger.debug("create_timestamp={}", create_timestamp);

        // all three columns go to the same row and are sent together by execute()
        mutator.addInsertion(user_uuid, CF_USER,
            this.createColumn("user_name", user.getUser_name()));
        mutator.addInsertion(user_uuid, CF_USER,
            this.createColumn("password", user.getUser_password()));
        mutator.addInsertion(user_uuid, CF_USER,
            this.createColumn("create_timestamp", create_timestamp));
        mutator.execute();

        //return the generated uuid
        return user_uuid;
    }
}
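`addUser()` queues three insertions on the same row key and sends them with a single `mutator.execute()`. The in-memory sketch below is a hypothetical stand-in (`InMemoryMutatorSketch` is not a Hector class and gives none of Cassandra's guarantees); it only models the shape of the data, column family → row key → columns sorted by name, and the queue-then-execute flow.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical in-memory stand-in for a batch of column writes.
 * Not Hector's Mutator: no network, no consistency levels, no timestamps.
 */
class InMemoryMutatorSketch {
    /** One queued column write. */
    private static final class Insertion {
        final String rowKey;
        final String columnFamily;
        final String name;
        final Object value;

        Insertion(String rowKey, String columnFamily, String name, Object value) {
            this.rowKey = rowKey;
            this.columnFamily = columnFamily;
            this.name = name;
            this.value = value;
        }
    }

    // column family -> row key -> (column name -> value); TreeMap keeps columns sorted by name
    final Map<String, Map<String, TreeMap<String, Object>>> store = new HashMap<>();
    private final List<Insertion> pending = new ArrayList<>();

    /** Queues a write; nothing reaches the store yet. */
    InMemoryMutatorSketch addInsertion(String rowKey, String columnFamily, String name, Object value) {
        pending.add(new Insertion(rowKey, columnFamily, name, value));
        return this;
    }

    /** Applies every queued write in order and returns how many were applied. */
    int execute() {
        for (Insertion i : pending) {
            store.computeIfAbsent(i.columnFamily, cf -> new HashMap<>())
                 .computeIfAbsent(i.rowKey, key -> new TreeMap<>())
                 .put(i.name, i.value);
        }
        int applied = pending.size();
        pending.clear();
        return applied;
    }

    public static void main(String[] args) {
        String userUuid = "550e8400-e29b-41d4-a716-446655440000";
        InMemoryMutatorSketch mutator = new InMemoryMutatorSketch()
            .addInsertion(userUuid, "USER", "user_name", "itstarting")
            .addInsertion(userUuid, "USER", "password", "111222")
            .addInsertion(userUuid, "USER", "create_timestamp", 1329836819890000L);
        System.out.println("queued, store = " + mutator.store);
        System.out.println("applied " + mutator.execute() + " columns: " + mutator.store.get("USER").get(userUuid));
    }
}
```

Nothing is visible in `store` until `execute()` runs, which matches the way Hector's `addInsertion()` only queues work for the later `execute()`.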
3.2 FriendService
package bright.zheng.jtwissandra.service;

import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.MutationResult;
import me.prettyprint.hector.api.mutation.Mutator;

/**
 * Friend service
 *
 * @author bright_zheng
 *
 */
public class FriendService extends BaseService{

    /**
     * Following a friend involves two writes:
     * 1. Add the friend's uuid to the FRIEND CF under my uuid
     * 2. Add my uuid to the FOLLOWER CF under the friend's uuid
     *
     * set FRIEND['550e8400-e29b-41d4-a716-446655440000']['1329836819859000']
     *     = '550e8400-e29b-41d4-a716-446655440001';
     *
     * set FOLLOWER['550e8400-e29b-41d4-a716-446655440001']['1329836819859000']
     *     = '550e8400-e29b-41d4-a716-446655440000';
     *
     * @param me my uuid
     * @param friend the uuid of the user to follow
     * @return the result of the batch mutation
     */
    public MutationResult followFriend(String me, String friend) {
        Mutator<String> mutator = HFactory.createMutator(
            KEYSPACE, SERIALIZER_STRING);

        // both rows use the same timestamp as the column name
        Long timestamp = this.generateTimestamp();
        logger.debug("timestamp={}", timestamp);

        mutator.addInsertion(me, CF_FRIEND,
            this.createColumn(timestamp, friend));
        mutator.addInsertion(friend, CF_FOLLOWER,
            this.createColumn(timestamp, me));
        return mutator.execute();
    }
}
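The two insertions in `followFriend()` deliberately store the same relationship twice, so that both "whom do I follow?" and "who follows me?" are single-row reads with no join. Here is a hypothetical in-memory sketch of that FRIEND/FOLLOWER pair, using the timestamp as the column name just like the CLI sample in the Javadoc (`FollowGraphSketch` and its methods are illustrative names, not JTwissandra code):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical in-memory sketch of the FRIEND / FOLLOWER column families:
 * row key = user uuid, column name = follow timestamp, column value = the other user's uuid.
 */
class FollowGraphSketch {
    final Map<String, TreeMap<Long, String>> friend = new HashMap<>();
    final Map<String, TreeMap<Long, String>> follower = new HashMap<>();

    /** Same two writes as FriendService.followFriend(), sharing one timestamp. */
    void followFriend(String me, String other, long timestamp) {
        friend.computeIfAbsent(me, k -> new TreeMap<>()).put(timestamp, other);
        follower.computeIfAbsent(other, k -> new TreeMap<>()).put(timestamp, me);
    }

    /** "Whom do I follow?": one row of FRIEND, oldest follow first. */
    List<String> friendsOf(String user) {
        return valuesOf(friend, user);
    }

    /** "Who follows me?": one row of FOLLOWER, the list a new tweet fans out to. */
    List<String> followersOf(String user) {
        return valuesOf(follower, user);
    }

    private static List<String> valuesOf(Map<String, TreeMap<Long, String>> cf, String rowKey) {
        TreeMap<Long, String> row = cf.get(rowKey);
        return row == null ? new ArrayList<String>() : new ArrayList<String>(row.values());
    }

    public static void main(String[] args) {
        FollowGraphSketch g = new FollowGraphSketch();
        g.followFriend("me", "friend", 1329836819859000L);
        g.followFriend("friend", "me", 1329836819860000L);
        System.out.println("me follows " + g.friendsOf("me") + ", followed by " + g.followersOf("me"));
    }
}
```

`followersOf()` is exactly the lookup `TweetService.addTweet()` needs for its fan-out, which is why the FOLLOWER column family exists at all.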
3.3 TimelineService
package bright.zheng.jtwissandra.service;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import me.prettyprint.hector.api.beans.ColumnSlice;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.query.QueryResult;
import me.prettyprint.hector.api.query.SliceQuery;

import bright.zheng.jtwissandra.bean.Timeline;

/**
 * Timeline service
 *
 * @author bright_zheng
 *
 */
public class TimelineService extends BaseService{

    /**
     * Get the first page of the specified user's Timeline
     *
     * @param user_uuid
     * @return
     */
    public TimelineWrapper getTimeline(String user_uuid){
        return getTimeline(user_uuid, 0L, TWEETS_LIMIT_DEFAULT);
    }

    /**
     * Get the specified user's Timeline from a start point
     *
     * @param user_uuid
     * @param start
     * @return
     */
    public TimelineWrapper getTimeline(String user_uuid, long start){
        return getTimeline(user_uuid, start, TWEETS_LIMIT_DEFAULT);
    }

    /**
     * Get the specified user's Timeline from a start point, with a limit
     *
     * @param user_uuid
     * @param start
     * @param limit
     * @return
     */
    public TimelineWrapper getTimeline(String user_uuid, long start, int limit){
        if (start < 0) start = 0;
        if (limit <= 0) limit = TWEETS_LIMIT_DEFAULT;
        if (limit > TWEETS_LIMIT_MAX) limit = TWEETS_LIMIT_MAX;

        SliceQuery<String, Long, String> sliceQuery =
            HFactory.createSliceQuery(KEYSPACE, SERIALIZER_STRING, SERIALIZER_LONG, SERIALIZER_STRING);
        sliceQuery.setColumnFamily(CF_TIMELINE);
        sliceQuery.setKey(user_uuid);
        // fetch one extra column: if it exists, its name is the start of the next page
        sliceQuery.setRange(start, Long.MAX_VALUE, false, limit + 1);
        QueryResult<ColumnSlice<Long, String>> result = sliceQuery.execute();
        List<HColumn<Long, String>> list = result.get().getColumns();

        if (list == null || list.isEmpty()) {
            // an empty list rather than null, so callers can iterate safely
            return new TimelineWrapper(new ArrayList<Timeline>(), 0L);
        } else if (list.size() <= limit) {
            return new TimelineWrapper(convertToTimeline(list), 0L);
        } else {
            HColumn<Long, String> last = list.get(list.size() - 1);
            long next = last.getName(); //the name is the timestamp as the "next" start
            // take a sub-list view instead of removing from the list returned by Hector
            return new TimelineWrapper(convertToTimeline(list.subList(0, limit)), next);
        }
    }

    private List<Timeline> convertToTimeline(List<HColumn<Long,String>> cols){
        Iterator<HColumn<Long,String>> it = cols.iterator();
        List<Timeline> result = new ArrayList<Timeline>();
        while(it.hasNext()){
            HColumn<Long,String> col = it.next();
            result.add(new Timeline(col.getValue(), col.getName()));
        }
        return result;
    }

    /**
     * One page of a Timeline; nextTimeline is 0 when there is no next page
     */
    public static class TimelineWrapper{
        private List<Timeline> timelines;
        private long nextTimeline;

        public TimelineWrapper(List<Timeline> timelines, long nextTimeline){
            this.timelines = timelines;
            this.nextTimeline = nextTimeline;
        }

        public long getNextTimeline() {
            return nextTimeline;
        }

        public List<Timeline> getTimelines() {
            return timelines;
        }
    }
}
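The paging in `getTimeline()` asks for `limit + 1` columns: if the extra column comes back, its name becomes the start of the next page and it is dropped from the current one. Here is the same idea over a `java.util.TreeMap`, as a hypothetical stand-alone sketch (`TimelinePagingSketch` and `getPage` are illustrative names). `tailMap(start, true)` plays the role of the `setRange(start, Long.MAX_VALUE, false, limit + 1)` slice, and 0 again means "no next page".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical sketch of the "fetch limit + 1" paging used by TimelineService,
 * with a TreeMap standing in for one TIMELINE row (timestamp -> tweet uuid).
 */
class TimelinePagingSketch {
    static final class Page {
        final List<String> tweetUuids;
        final long next; // 0 = no next page, as in TimelineWrapper

        Page(List<String> tweetUuids, long next) {
            this.tweetUuids = tweetUuids;
            this.next = next;
        }
    }

    static Page getPage(NavigableMap<Long, String> row, long start, int limit) {
        if (limit <= 0) throw new IllegalArgumentException("limit must be positive");
        List<String> items = new ArrayList<>();
        long next = 0L;
        // tailMap(start, true) ~ setRange(start, Long.MAX_VALUE, false, ...): ascending, start inclusive
        for (Map.Entry<Long, String> col : row.tailMap(start, true).entrySet()) {
            if (items.size() == limit) {
                next = col.getKey();   // the (limit + 1)-th column only supplies the cursor
                break;
            }
            items.add(col.getValue());
        }
        return new Page(items, next);
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> row = new TreeMap<>();
        for (long ts = 1000L; ts < 1025L; ts++) {
            row.put(ts, "tweet-" + ts);
        }
        Page first = getPage(row, 0L, 10);
        System.out.println(first.tweetUuids + " next=" + first.next);
    }
}
```

The start bound is inclusive, so the next page begins exactly at the column that was trimmed off; using 0 as the "no next page" marker is safe only because no real timeline entry has the timestamp 0.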
3.4 TweetService
package bright.zheng.jtwissandra.service;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;

import me.prettyprint.hector.api.beans.ColumnSlice;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.beans.Row;
import me.prettyprint.hector.api.beans.Rows;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;
import me.prettyprint.hector.api.query.MultigetSliceQuery;
import me.prettyprint.hector.api.query.QueryResult;
import me.prettyprint.hector.api.query.SliceQuery;

import bright.zheng.jtwissandra.bean.Tweet;

/**
 * Tweet service
 *
 * @author bright_zheng
 *
 */
public class TweetService extends BaseService{

    /**
     * Adding a tweet involves the following steps:
     * 1. Save the tweet to the TWEET CF
     * 2. Add the new tweet to my TIMELINE
     * 3. Add the new tweet to the TIMELINE of each of my followers
     *
     * @param user_uuid the author's uuid
     * @param tweet_content the tweet text
     * @return the generated tweet uuid
     */
    public String addTweet(String user_uuid, String tweet_content) {
        Mutator<String> mutator = HFactory.createMutator(
            KEYSPACE, SERIALIZER_STRING);

        //the tweet uuid
        UUID uuid = this.getUUID();
        String tweet_uuid = uuid.toString();
        logger.debug("tweet_uuid={}", tweet_uuid);

        //the timestamp to build the timeline
        Long timestamp = this.getTimestamp(uuid);
        logger.debug("timestamp={}", timestamp);

        mutator.addInsertion(tweet_uuid, CF_TWEET,
            this.createColumn("user_uuid", user_uuid));
        mutator.addInsertion(tweet_uuid, CF_TWEET,
            this.createColumn("tweet_content", tweet_content));
        mutator.addInsertion(user_uuid, CF_TIMELINE,
            this.createColumn(timestamp, tweet_uuid));

        // get back all my followers and insert the tweet into each follower's TIMELINE
        SliceQuery<String, Long, String> sliceQuery =
            HFactory.createSliceQuery(KEYSPACE, SERIALIZER_STRING, SERIALIZER_LONG, SERIALIZER_STRING);
        sliceQuery.setColumnFamily(CF_FOLLOWER);
        sliceQuery.setKey(user_uuid);
        sliceQuery.setRange(Long.MIN_VALUE, Long.MAX_VALUE, false, 500); //TODO: 500 followers hard coded here?
        QueryResult<ColumnSlice<Long, String>> result = sliceQuery.execute();
        Iterator<HColumn<Long, String>> followers = result.get().getColumns().iterator();
        while(followers.hasNext()) {
            HColumn<Long, String> follower = followers.next();
            String follower_uuid = follower.getValue();
            logger.debug("follower's uuid={}", follower_uuid);
            logger.debug("timestamp={}", follower.getName());
            //insert the tweet into the follower's TIMELINE
            mutator.addInsertion(follower_uuid, CF_TIMELINE,
                this.createColumn(timestamp, tweet_uuid));
        }

        mutator.execute();

        //return the newly generated tweet's uuid
        return tweet_uuid;
    }

    /**
     * Should we add this service?
     *
     * @param tweet_uuid
     * @return
     */
    public Tweet getTweet(String tweet_uuid){
        return null;
    }

    public List<Tweet> getTweets(List<String> tweet_uuids){
        if (tweet_uuids == null || tweet_uuids.isEmpty()) {
            return new ArrayList<Tweet>();
        }
        MultigetSliceQuery<String, String, String> multigetSlicesQuery =
            HFactory.createMultigetSliceQuery(KEYSPACE, SERIALIZER_STRING, SERIALIZER_STRING, SERIALIZER_STRING);
        multigetSlicesQuery.setColumnFamily(CF_TWEET);
        multigetSlicesQuery.setColumnNames("user_uuid","tweet_content");
        multigetSlicesQuery.setKeys(tweet_uuids);
        QueryResult<Rows<String, String, String>> results = multigetSlicesQuery.execute();
        return convertRowsToTweets(tweet_uuids, results.get());
    }

    /**
     * Multiget does not guarantee row order, so walk the requested keys to keep
     * the tweets in timeline order, and skip rows whose columns are missing.
     */
    private List<Tweet> convertRowsToTweets(List<String> tweet_uuids, Rows<String, String, String> rows){
        List<Tweet> list = new ArrayList<Tweet>();
        for (String tweet_uuid : tweet_uuids) {
            Row<String, String, String> row = rows.getByKey(tweet_uuid);
            if (row == null) continue;
            ColumnSlice<String, String> cs = row.getColumnSlice();
            HColumn<String, String> content = cs.getColumnByName("tweet_content");
            HColumn<String, String> author = cs.getColumnByName("user_uuid");
            if (content == null || author == null) continue;
            list.add(new Tweet(tweet_uuid, content.getValue(), author.getValue()));
        }
        return list;
    }
}
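`addTweet()` is a fan-out-on-write design: the cost of posting grows with the author's follower count, while reading a timeline stays a single-row slice. A hypothetical in-memory sketch of that write path (`FanOutOnWriteSketch` and its members are illustrative names, not JTwissandra code), following the same three steps as the Javadoc:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical in-memory sketch of fan-out on write, mirroring the three
 * steps of TweetService.addTweet(). Not JTwissandra code.
 */
class FanOutOnWriteSketch {
    static final class TweetRow {
        final String userUuid;
        final String content;

        TweetRow(String userUuid, String content) {
            this.userUuid = userUuid;
            this.content = content;
        }
    }

    final Map<String, TweetRow> tweet = new HashMap<>();                  // TWEET
    final Map<String, TreeMap<Long, String>> timeline = new HashMap<>();  // TIMELINE
    final Map<String, List<String>> follower = new HashMap<>();           // FOLLOWER (values only)

    void addFollower(String user, String followerUuid) {
        follower.computeIfAbsent(user, k -> new ArrayList<>()).add(followerUuid);
    }

    /** Returns how many TIMELINE rows received the tweet. */
    int addTweet(String userUuid, String tweetUuid, long timestamp, String content) {
        tweet.put(tweetUuid, new TweetRow(userUuid, content));   // 1. save the tweet
        List<String> targets = new ArrayList<>();
        targets.add(userUuid);                                    // 2. my own timeline
        List<String> mine = follower.get(userUuid);
        if (mine != null) {
            targets.addAll(mine);                                 // 3. every follower's timeline
        }
        for (String target : targets) {
            timeline.computeIfAbsent(target, k -> new TreeMap<>()).put(timestamp, tweetUuid);
        }
        return targets.size();
    }

    /** Reading is a single-row lookup: no joins across followed users. */
    List<String> readTimeline(String user) {
        TreeMap<Long, String> row = timeline.get(user);
        return row == null ? new ArrayList<String>() : new ArrayList<String>(row.values());
    }

    public static void main(String[] args) {
        FanOutOnWriteSketch s = new FanOutOnWriteSketch();
        s.addFollower("me", "friend");
        int writes = s.addTweet("me", "tweet-1", 1000L, "Hello JTWISSANDRA");
        System.out.println(writes + " timeline rows written; friend sees " + s.readTimeline("friend"));
    }
}
```

With the hard-coded slice of 500 followers in the real code, any follower beyond the first 500 would silently miss the tweet; paging through FOLLOWER the same way `getTimeline()` pages through TIMELINE would be one way to lift that limit.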
4. And of course, there are JUnit test cases:
package bright.zheng.jtwissandra;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import bright.zheng.jtwissandra.bean.Timeline;
import bright.zheng.jtwissandra.bean.Tweet;
import bright.zheng.jtwissandra.bean.User;
import bright.zheng.jtwissandra.service.FriendService;
import bright.zheng.jtwissandra.service.TimelineService;
import bright.zheng.jtwissandra.service.TimelineService.TimelineWrapper;
import bright.zheng.jtwissandra.service.TweetService;
import bright.zheng.jtwissandra.service.UserService;

/**
 * Test cases for all services currently provided.
 * Please drop and create the schema first, then run all cases as one round.
 * The 'me' and 'friend' users are created dynamically in each round for easier testing.
 * JUnit does not guarantee the execution order of @Test methods, and every step
 * here depends on the previous ones, so the steps are called in order from one @Test.
 *
 * @author bright_zheng
 *
 */
public class ServiceTest{
    Logger logger = LoggerFactory.getLogger(ServiceTest.class);

    private static UserService SERVICE_USER = new UserService();
    private static FriendService SERVICE_FRIEND = new FriendService();
    private static TweetService SERVICE_TWEET = new TweetService();
    private static TimelineService SERVICE_TIMELINE = new TimelineService();

    private static String me;
    private static String friend;
    private static long nextTimeline = 0L;

    @BeforeClass
    public static void setUp(){
        //
    }

    /**
     * Runs the whole scenario in the order the steps depend on
     */
    @Test
    public void runAllInOrder() {
        addUser();
        followFriend();
        followedByFollower();
        addTweetByMe();
        addTweetByFriend();
        getTweetsByMe();
        getTweetsByMeForNextTimeline();
        getTweetsByMyFriend();
        getTweetsByMyFriendForNextTimeline();
    }

    /**
     * Create 'me' and 'friend'
     */
    private void addUser() {
        logger.debug("=====================addUser{====================");
        //add user 1
        me = SERVICE_USER.addUser(new User("itstarting","1234"));
        logger.debug("This round of testing, ME={}", me);
        Assert.assertNotNull(me);
        //add user 2
        friend = SERVICE_USER.addUser(new User("test1","1234"));
        logger.debug("This round of testing, FRIEND={}", friend);
        Assert.assertNotNull(friend);
        logger.debug("=====================}//addUser====================");
    }

    /**
     * I'm following a friend
     */
    private void followFriend() {
        logger.debug("=====================followFriend{====================");
        SERVICE_FRIEND.followFriend(me, friend);
        logger.debug("=====================}//followFriend====================");
    }

    /**
     * I'm followed by a follower
     */
    private void followedByFollower() {
        logger.debug("=====================followedByFollower{====================");
        SERVICE_FRIEND.followFriend(friend, me);
        logger.debug("=====================}//followedByFollower====================");
    }

    /**
     * I'm tweeting
     */
    private void addTweetByMe() {
        logger.debug("=====================addTweetByMe{====================");
        for(int i=0; i<100; i++){
            String tweet_uuid = SERVICE_TWEET.addTweet(me, "Hello JTWISSANDRA -- by itstarting:" + i);
            Assert.assertNotNull(tweet_uuid);
        }
        logger.debug("=====================}//addTweetByMe====================");
    }

    /**
     * My friend is tweeting
     */
    private void addTweetByFriend() {
        logger.debug("=====================addTweetByFriend{====================");
        for(int i=0; i<100; i++){
            String tweet_uuid = SERVICE_TWEET.addTweet(friend, "Hello JTWISSANDRA -- by test1:" + i);
            Assert.assertNotNull(tweet_uuid);
        }
        logger.debug("=====================}//addTweetByFriend====================");
    }

    /**
     * Get tweets for me
     */
    private void getTweetsByMe(){
        logger.debug("=====================getTweetsByMe{====================");
        getTweets(me, 0);
        logger.debug("=====================}//getTweetsByMe====================");
    }

    /**
     * Get tweets at my next Timeline page (if any)
     */
    private void getTweetsByMeForNextTimeline(){
        logger.debug("=====================getTweetsByMeForNextTimeline{====================");
        if(nextTimeline>0L){
            getTweets(me, nextTimeline);
        }
        logger.debug("=====================}//getTweetsByMeForNextTimeline====================");
    }

    /**
     * Get tweets for my friend
     */
    private void getTweetsByMyFriend(){
        logger.debug("=====================getTweetsByMyFriend{====================");
        getTweets(friend, 0);
        logger.debug("=====================}//getTweetsByMyFriend====================");
    }

    /**
     * Get tweets at my friend's next Timeline page (if any)
     */
    private void getTweetsByMyFriendForNextTimeline(){
        logger.debug("=====================getTweetsByMyFriendForNextTimeline{====================");
        if(nextTimeline>0L){
            getTweets(friend, nextTimeline);
        }
        logger.debug("=====================}//getTweetsByMyFriendForNextTimeline====================");
    }

    private void getTweets(String user_uuid, long start){
        TimelineWrapper wrapper = SERVICE_TIMELINE.getTimeline(user_uuid, start);
        Assert.assertNotNull(wrapper);
        List<Timeline> list = wrapper.getTimelines();
        List<String> tweet_uuids = new ArrayList<String>();
        for(Timeline timeline: list){
            String tweet_uuid = timeline.getTweet_uuid();
            logger.debug("From Timeline: tweet_uuid={}, tweet_timestamp={}",
                tweet_uuid, timeline.getTweet_timestamp());
            tweet_uuids.add(tweet_uuid);
        }
        List<Tweet> tweets = SERVICE_TWEET.getTweets(tweet_uuids);
        Iterator<Tweet> it = tweets.iterator();
        while(it.hasNext()){
            Tweet tweet = it.next();
            logger.debug("From Tweet: tweet_uuid={}, tweet_content={}, user_uuid={}",
                new Object[]{tweet.getTweet_uuid(),
                    tweet.getTweet_content(),
                    tweet.getUser_uuid()
                });
        }
        if(wrapper.getNextTimeline() > 0L){
            logger.debug("The start timeline of next page is: {}", wrapper.getNextTimeline());
            nextTimeline = wrapper.getNextTimeline();
        }else{
            logger.debug("No next page available");
            nextTimeline = 0L;
        }
    }

    @AfterClass
    public static void shutdown(){
        //cluster.getConnectionManager().shutdown();
    }
}
This is an all-in-one test case: a single run covers almost all of the business service logic.
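The two "next timeline" steps only look at the first two pages. To walk a whole timeline, the usual pattern is to keep following the cursor until it comes back as 0. Below is a hypothetical sketch of that loop, with a `maxPages` guard so a buggy cursor cannot spin forever; `CursorDrainSketch`, `PageFetcher` and `readAll` are illustrative names, and the demo fetcher re-implements the `limit + 1` trick over a `TreeMap` rather than calling Cassandra.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch: read every page of a cursor-paged source,
 * stopping when the cursor comes back as 0 (as TimelineWrapper reports it).
 */
class CursorDrainSketch {
    static final class Page {
        final List<String> items;
        final long next; // 0 = no next page

        Page(List<String> items, long next) {
            this.items = items;
            this.next = next;
        }
    }

    /** Anything that can return one page starting at a cursor (0 = from the beginning). */
    interface PageFetcher {
        Page fetch(long start, int limit);
    }

    static List<String> readAll(PageFetcher fetcher, int limit, int maxPages) {
        List<String> all = new ArrayList<>();
        long start = 0L;
        for (int pages = 0; pages < maxPages; pages++) {
            Page page = fetcher.fetch(start, limit);
            all.addAll(page.items);
            if (page.next <= 0L) {
                return all; // same "no next page" test as ServiceTest.getTweets()
            }
            start = page.next;
        }
        throw new IllegalStateException("gave up after " + maxPages + " pages");
    }

    public static void main(String[] args) {
        final TreeMap<Long, String> row = new TreeMap<>();
        for (long ts = 1000L; ts < 1025L; ts++) {
            row.put(ts, "tweet-" + ts);
        }
        // limit + 1 fetch over the TreeMap, like TimelineService.getTimeline()
        PageFetcher fetcher = (start, limit) -> {
            List<String> items = new ArrayList<>();
            long next = 0L;
            for (Map.Entry<Long, String> col : row.tailMap(start, true).entrySet()) {
                if (items.size() == limit) {
                    next = col.getKey();
                    break;
                }
                items.add(col.getValue());
            }
            return new Page(items, next);
        };
        System.out.println(readAll(fetcher, 10, 100).size() + " tweets read");
    }
}
```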
5. Finally, don't forget to create the required schema (in cassandra-cli) before running anything:
drop keyspace JTWISSANDRA;

create keyspace JTWISSANDRA
    with placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy'
    and strategy_options = [{replication_factor:1}];

use JTWISSANDRA;

create column family USER
    with comparator = UTF8Type
    and key_validation_class = UTF8Type
    and default_validation_class = UTF8Type
    and column_metadata = [
        {column_name: user_name, validation_class: UTF8Type,
            index_name: user_name_idx, index_type: KEYS},
        {column_name: password, validation_class: UTF8Type},
        {column_name: create_timestamp, validation_class: LongType,
            index_name: create_timestamp_idx, index_type: KEYS}
    ];

create column family FRIEND
    with comparator = LongType
    and key_validation_class = UTF8Type
    and default_validation_class = UTF8Type;

create column family FOLLOWER
    with comparator = LongType
    and key_validation_class = UTF8Type
    and default_validation_class = UTF8Type;

create column family TWEET
    with comparator = UTF8Type
    and key_validation_class = UTF8Type
    and default_validation_class = UTF8Type
    and column_metadata = [
        {column_name: user_uuid, validation_class: UTF8Type},
        {column_name: tweet_content, validation_class: UTF8Type}
    ];

create column family TIMELINE
    with comparator = LongType
    and key_validation_class = UTF8Type
    and default_validation_class = UTF8Type;
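FRIEND, FOLLOWER and TIMELINE use `comparator = LongType` because their column names are timestamps, and the `setRange(start, Long.MAX_VALUE, ...)` slices rely on numeric order. The plain-Java sketch below (hypothetical, not Cassandra code; `ComparatorOrderSketch` is an illustrative name) contrasts numeric ordering with the text ordering a UTF8Type comparator would give; for ASCII digits, `String.compareTo` agrees with UTF-8 byte order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical illustration: the same timestamp column names sorted
 * numerically (like LongType) versus as text (like UTF8Type).
 */
class ComparatorOrderSketch {
    static List<Long> sortAsLong(List<Long> names) {
        List<Long> sorted = new ArrayList<>(names);
        Collections.sort(sorted);                 // signed numeric order
        return sorted;
    }

    static List<String> sortAsText(List<Long> names) {
        List<String> sorted = new ArrayList<>();
        for (Long n : names) {
            sorted.add(Long.toString(n));
        }
        Collections.sort(sorted);                 // lexicographic order, digit by digit
        return sorted;
    }

    public static void main(String[] args) {
        List<Long> names = Arrays.asList(999L, 10000L, 1000L);
        System.out.println("LongType-like: " + sortAsLong(names));
        System.out.println("UTF8Type-like: " + sortAsText(names));
    }
}
```

For timestamps that all have the same number of digits the two orders happen to agree, but they diverge as soon as the widths differ.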
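6. Anything else?
There are a few miscellaneous bits I won't paste here, so I'll skip them. If you're interested, clone the project from GitHub and give it a run.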