当今时代,数据不再昂贵,但从海量数据中获取价值变得昂贵,而要及时获取价值则更加昂贵,这正是大数据实时计算越来越流行的原因。以百分点公司为例,在高峰期每秒钟会有近万HTTP请求发送到百分点服务器上,这些请求包含了用户行为和个性化推荐请求。如何从这些数据中快速挖掘用户兴趣偏好并作出效果不错的推荐呢?这是百分点推荐引擎面临的首要问题。本文将从系统架构和算法两方面全介绍百分点公司在实时计算方面的经验和心得体会,供读者参考。
a) 实时计算架构
图 1百分点大数据平台原理示意图
工欲善其事,必先利其器。一个稳定可靠且高效的底层架构是实时计算的必要基础。图 1给出了百分点数据大平台的总体框架,如图所示,大数据平台包含数据存储和数据处理两个层次。
存储服务层提供了数据处理层需要的各类分布式存储,包括分布式文件系统(hadoop HDFS)、分布式SQL数据库(MySQL)、分布式NoSQL数据库(Redis、MongoDB、HBase)、分布式消息队列(Apache Kafka)、分布式搜索引擎(Apache Solr)以及必不可少的Apache Zookeeper。
数据处理层由四个部分组成。其中Web应用云包含了所有直接面对用户的Web服务,每个Web应用都会产生Web日志以及其他实时数据,这些数据一方面会及时交由实时计算框架进行处理,另一方面也会定期同步至离线计算框架;实时计算框架会处理接收到的实时数据,并将处理结果输出到数据查询框架或者离线计算框架;离线计算框架则定期对数据进行处理,并将处理结果输出至数据查询框架;数据查询框架提供了一系列应用接口供程序调取需要的各项数据,同时提供了一些Web工具帮助业务人员对海量数据进行统计、汇总和分析。
在百分点大数据平台中,与实时计算密切相关的有实时计算框架和数据查询框架,这部分的组件架构和数据流如图 2所示。
图 2实时计算框架和数据查询框架示意
从图上可以看出,数据采集服务会将收集到的实时数据推送到消息队列Kafka中;Kafka中的数据会被两个处理平台BDM CEP(Big Data Management Complex Event Processing)和Storm消费并处理。Storm是当下比较流行的开源流处理框架,百分点公司在2013年中开始使用Storm进行数据清洗、统计和一部分分析任务。在引入Storm之前,百分点所有的实时计算都是基于BDM CEP进行的,它是我们基于中间件ICE开发的一套流处理平台。BDM CEP包含有四类组件:dispatcher负责从Kafka中读取消息,根据消息内容分发给相应的worker;worker复杂处理接收到的消息,并将处理结果传递给其他worker或者输出到各类存储服务中;config负责维护dispatcher和worker的交互关系和配置信息,并在交互关系或配置更新时及时通知dispatcher和worker;monitor负责监控dispatcher和worker的运行情况,把监控信息提交给Ganglia,monitor还负责系统异常时的报警,以及dispatcher和worker发生故障时进行重启和迁移。数据查询框架由图中最下层的三个组件组成,其中BDM DS(Data Source)封装了一系列的数据查询逻辑并以REST API和ICE服务的形式供各种应用调用;BDM OLAP(Online Analytical Processing)提供了实时查询用户行为和标签明细,以及近实时的用户多维度统计、汇总和分析功能,这些功能是以REST API和Web应用方式提供的;BDM Search是对Solr和HBase的一次封装,以REST API和ICE服务方式对外提供近实时搜索功能。
百分点公司的主要服务都是运行在这套架构上的,它拥有良好的稳定性和扩展性,一般来说只需要增加水平扩展结点即可提高数据处理能力,这为百分点业务的稳定发展奠定了技术基础。
b) 实时计算算法
要真正实现大数据实时计算,光有框架是不行的,还必须针对特定业务开发特定的处理流程和算法。相比较离线计算而言,实时计算在算法方面需要考虑的更多,这是因为实时计算能够用到的存储资源远不如离线,而且处理过程的时间限制要比离线计算严格,这都要求实时计算算法必须做相当多的优化。在这一节中,笔者将以海量计数问题为例介绍百分点公司在实时计算算法方面的经验。
目前,百分点数据平台上包含了近千万的电商单品数据,实时追踪这些单品的浏览和交易数据是必须的,这也是做个性化推荐、商品画像、销量预测和用户画像等业务的必要前提。我们的问题是:如何设计一种算法使得我们可以实时查看任意单品最近24小时的浏览量?这个问题描述起来很简单,但稍加思索就会发现做起来并不容易。下面我们先给出一个简单方案,而后按照一定的原则逐步精化到最佳方案。
c) 简单方案
图 3按秒计数方案
看到这个问题时,大部分读者会很快想到如图 3所示的算法方案。图中红色、蓝色和绿色的方块分别表示不同的单品。在这个方案中,我们为每个单品保存一份浏览信息,它包含两个数据结构:
d) 历史浏览量列表(简称历史),一个列表,列表中每个元素包含一个时间戳和一个整数,分别代表过去24小时中的某一秒及这一秒钟的浏览量,按时间顺序排序。这个列表的最长会包含24*3600=86400个元素,但一般情况下极少有单品时时刻刻都被浏览,我们可以假设这个列表的平均长度不超过10000。
e) 累计浏览量(简称累计量),一个整数,代表截止到最后一次访问时的浏览量。
如图所示,假设蓝色单品对应的数据是 [(t1, a1), (t2, a2), …, (tn, an)]和A。这表示t1时刻的该单品浏览量是a1,t2时刻是a2,tn是最后一次记录到浏览该单品的时刻,浏览量是an。截止到tn,该单品的总浏览量是A。
当单品浏览源源不断进入到消息队列时,处理进程(或线程)P1,P2…会实时读取到这些信息,并修改对应单品的数据信息。例如,P1读取到t时刻对蓝色单品的浏览记录时,会进行下面的操作:
f) 得到当前时刻ct;
g) 对数据库中蓝色单品数据加锁,加锁成功后读取出数据,假设历史是[(t1, a1), (t2, a2), …, (tn, an)],累计量是A;
h) 累计量递增,即从A修改为A+1
i) 如果ct=tn,则更新历史为[(t1, a1), (t2, a2), …, (tn, an+1)],否则更新为[(t1, a1), (t2, a2), …, (tn, an), (ct,1)];最后删除时间戳小于ct-24*3600的列表元素,删除的同时从累计量中减去对应时刻的浏览量,例如只有元素t1> ct-24*3600,则操作完成后的浏览量为A+1-a1;
j) 将新的历史和累计量输出至数据库,释放锁。
不难验证这个方案是可以正确得出每个单品24小时内的浏览量的,并且只要在资源(计算、存储和网络)充足的情况下,数据库中单品的浏览量是实时更新的。这个方案也是分布式实时计算中最简单最常见的一种模式。
k) 避免锁
图 4不包含锁的方案
第一个方案中需要对数据库加锁,无论加锁粒度多细,都会严重影响计算效率。虽然像Redis一类的内存数据库提供了incr这样的原子操作,但这种操作多数情况下只适用于整型数据,并不适合本问题的历史数据。
要想提高实时处理效率,避免锁是非常重要的。一种常见的做法是将并行操作串行化,就像MapReduce中的Reduce阶段一样,将key相同的数据交由同一个reducer处理。基于这个原理,我们可以将方案改造为如图 4所示,我们新增一个数据分发处理过程,它的作用是保证同一个单品的所有数据都会发送给同一个处理程序。例如将蓝色单品交由P1处理,红色交由P2处理,绿色交由P3处理。这样P1在处理过程中不需要对数据库加锁,因为不存在资源竞争。这样可以极大的提高计算效率,于是整个计算过程变为:
l) 得到当前时刻ct;
m) 读取数据库中蓝色单品信息,假设历史是[(t1, a1), (t2, a2), …, (tn, an)],累计量是A;
n) 累计递增,即从A修改为A+1
o) 如果ct=tn,则更新历史为[(t1, a1), (t2, a2), …, (tn, an+1)],否则更新为[(t1, a1), (t2, a2), …, (tn, an), (ct,1)];最后删除时间戳小于ct-24*3600的列表元素,删除的同时从累量中减去对应时刻的浏览量;
p) 将新的历史和累计量输出至数据库。
步骤b)和e)省去了锁操作,整个系统的并发性和吞吐量会得到大大提高。当然,没有免费的午餐,这种方案的缺点在于存在单点隐患,例如一旦P1由于某些原因挂掉了,那么蓝色单品的数据将得不到及时处理,计数结果将无法保证实时。这种计算过程对系统监控和故障转移会有很高的要求。
q) 数据分层
图 5带有本地缓存的方案
方案二已经可以大大提高计算效率,但这还不够,我们可以看到在计算步骤b)和e)中总是要把历史和累计量同时从数据库中读出或写入,实际上这是没有必要的,因为只有累计量才是外部必须使用的数据,而历史只是算法的中间数据。这样,我们可以区别对待历史和累计量,我们将历史和累计量都缓存在计算进程中,定期更新历史至数据库,而累计量则实时更新。新的方案如图 5所示,计算过程变为:
r) 得到当前时刻ct;
s) 如果本地没有蓝色单品的信息,则从数据库中读取蓝色单品信息;否则直接使用本地缓存的信息。假设历史是[(t1, a1), (t2, a2), …, (tn, an)],累计量是A;
t) 累计量递增,即从A修改为A+1
u) 如果ct=tn,则更新历史为[(t1, a1), (t2, a2), …, (tn, an+1)],否则更新为[(t1, a1), (t2, a2), …, (tn, an), (ct,1)];最后删除时间戳小于ct-24*3600的列表元素,删除的同时从累计量中减去对应时刻的浏览量;
v) 将新的累计量输出至数据库;如果满足一定的条件(例如上次输出时间足够久远,或者处理的消息量达到一定数量),则将历史输出至数据库。
这种方案可以大大降低数据库压力、数据IO和序列化反序列化次数,从而提高整个系统的处理效率。数据分层实际上是计算机中一种常用的路数,例如硬件中的高速缓存/内存/磁盘,系统IO中的缓冲区/磁盘文件,数据库的内存索引、系统DNS缓存等等。我们使用的开源搜索引擎Solr就使用了同样的思路达到近实时索引。Solr包含磁盘全量索引和实时增加的内存增量索引,并引入了“soft提交”的方式更新新索引。新数据到达后,Solr会使用“soft”提交的方式更新内存增量索引,在检索的时候通过同时请求全量索引和增量索引并合并的方式获得到最新的数据。之后会在服务器空闲的时候,Solr会把内存增量索引合并到磁盘全量索引中保证数据完整。
当然,这种方案也对系统的稳定性提出了更高的要求,因为一旦P1挂掉那么它缓存的数据将丢失,及时P1及时重启,这些数据也无法恢复,那么在一段时间内我们将无法得到准确的实时浏览量。
w) 模糊化
现在,我们来考虑存储资源问题。假设时间戳和整型都用long类型(8字节)保存,那么按照方案一中的估计,我们对每个单品的需要记录的数据大小约为10000×(8+8)+8=16008字节≈156KB,1000万单品的数据总量将超过1T,如果考虑到数据库和本地缓存因素,那么整个系统需要的存储量至少是2T!这对于计数这个问题而言显然是得不偿失的,我们必须尝试将数据量降低,在这个问题中可行的是降低历史的存储精度。我们将历史定义为小时级别精度,这样每个单品的历史至多有24个,数据量最多392字节,1000万单品的信息总量将变为3.6G,系统总的存储量不超过8G,这是可以接受的。如果考虑用int类型代替long类型存储时间(小时数),则存储量可以进一步降低到不足6G。这样新的计算过程变为:
x) 得到当前时刻精确到小时的部分ct;
y) 如果本地没有蓝色单品的信息,则从数据库中读取蓝色单品信息;否则直接使用本地缓存的信息。假设历史是[(t1, a1), (t2, a2), …, (tn, an)],累计量是A;
z) 累计量递增,即从A修改为A+1
aa) 如果ct=tn,则更新历史为[(t1, a1), (t2, a2), …, (tn, an+1)],否则更新为[(t1, a1), (t2, a2), …, (tn, an), (ct,1)];最后删除小时数小于ct-24的列表元素,删除的同时从累计量中减去对应时刻的浏览量;
ab) 将新的浏览量输出至数据库;如果满足一定的条件,则将历史输出至数据库。
在这种方案下,数据库中存储的并不是过去24小时内的浏览量,而是过去23小时多一点内的。例如在1月2日12:15时数据库中的浏览量实际上是1月1日13:00到1月2日12:15的浏览量!
这种降低数据精度的方法我们可以称之为模糊化,它是用资源换效率的一种方法。在对数据精确性不是特别敏感的领域,这种方法可以大大降低系统资源使用量、提高系统的处理效率。利用模糊化的实时算法快速得到近似结果,而后用离线算法慢慢修正结果的精确度,是百分点在大数据处理中经常使用的招数。
ac) 局部精化
图 6局部精华示意图
有时候,模糊化会掩盖掉一些重要的细节信息,达不到业务需求的要求。例如,电商有很多的秒杀活动,此时必须及时监测单品浏览量,如果我们还按小时维度进行计算,那显然不能满足要求。这种情况下我们就必须对局部数据进行细化,它是模糊化的逆操作,我们称之为局部精化。如图 6所示,第k小时的数据是很敏感的,我们希望它的数据能更实时一些,那我们可以将第k小时的数据切分的更细,对它做10分钟、分钟甚至秒级别的计算,而其他时间段仍旧采用小时精度。
这种方案会增加系统的设计和开发难度,而且必须有灵活的配置才能满足多变的业务需求。
ad) 数据建模
除了局部细化,还有一种方法可以提高数据的精确度,这就是数据建模。在方案四中我们提到在小时精度下,实际上只能得到23小时多一点之前的浏览量,有一部分数据丢失了没有用到。实际上我们可以将丢弃掉的数据利用起来得到更好的结果。最简单思路是假设同一小时内单品的浏览量是线性增加的,那么我们显然可以利用相邻两个小时的浏览历史推算出任意时刻的浏览量。回到方案四中的例子,1月2日12:15的实时浏览量可以通过下面的公式计算得出:
[a0 + (a1-a0)×(60-15)/60] + a1 + … + a24
其中a0代表1月1日12:00到13:00之间的浏览量,依次类推,a24代表1月2日12:00到12:15之间的浏览量。公式中的a0 + (a1-a0)×(60-15)/60 估计了1月1日12:15-13:00之间的浏览量,这样就得出了从1月1日12:15到1月2日12:15之间24小时内的浏览量。
图 7某单品的全天浏览分布
我们还可以利用更复杂的浏览量分布模型得出精度更高的估计,图 7给出了某单品一天的浏览分布曲线,这个分布适用于绝大多数的商品以及绝大多数的时间。因此,我们完全可以利用这个分布来更精确的估计每个单品的浏览量,利用这个模型我们甚至不需要记录浏览历史,只需要知道当天0:00到当前的浏览总量就可以计算出前24小时内的浏览量,甚至预测接下来的浏览量情况!
当然,模型也不是万能的,模型本身的建立和更新也是有代价的,如果建模方法不恰当或者模型更新不及时,很有可能得出的结果会很差。
ae) 小结
本文首先介绍了百分点公司大数据平台的基本原理,并详细说明了其中与实时计算相关部分,实时计算框架和数据查询框架,的系统架构、处理流程和应用。而后,我们以海量数据计数问题为例,深入浅出的介绍了在百分点公司在实时计算算法中常用的方法和技巧,以及它们适用的场景和可能带来的问题。这些方法和技巧具有普遍性和通用性,被广泛应用于百分点个性化推荐引擎的各个模块,包括用户意图预测、用户画像、个性化推荐评分、商品分类等等。如果能在实际业务中灵活运用这些方法和技巧,则能够大大提高实时计算的数据规模和处理效率,帮助业务快速发展。希望本文的介绍能够帮助读者更好的理解大数据实时计算的方方面面。