摘要:方法首先初始化一個(gè)回調(diào)函數(shù),這是當(dāng)一個(gè)成為之后就會(huì)調(diào)用的一個(gè)用于初始化一系列變量的方法,包括拓?fù)淙绾卧诩荷戏峙?,拓?fù)錉顟B(tài)更新,清除函數(shù),還有監(jiān)控線程等。
寫(xiě)在前面的話,筆者第一次閱讀框架源碼,所以可能有些地方理解錯(cuò)誤或者沒(méi)有詳細(xì)解釋,如果在閱讀過(guò)程發(fā)現(xiàn)錯(cuò)誤很歡迎在文章下面評(píng)論指出。文章后續(xù)會(huì)陸續(xù)更新,可以關(guān)注或者收藏,轉(zhuǎn)發(fā)請(qǐng)先私信我,謝謝。對(duì)了,筆者看的是2.2.1這個(gè)版本
概述??JStorm是一個(gè)分布式的實(shí)時(shí)計(jì)算引擎,是阿里巴巴根據(jù)storm的流處理模型進(jìn)行重寫(xiě)的一個(gè)框架,支持相同的邏輯模型(也就是拓?fù)浣Y(jié)構(gòu)),然后底層的實(shí)現(xiàn)卻大有不同。不過(guò)本文并不是打算對(duì)兩個(gè)框架進(jìn)行比較,接下來(lái)我會(huì)從源碼的角度上來(lái)解析JStorm是如何工作的。
??作為第一個(gè)篇章,筆者先來(lái)介紹下nimbus以及它啟動(dòng)的時(shí)候做了什么。JStorm的主節(jié)點(diǎn)上運(yùn)行著nimbus的守護(hù)進(jìn)程,這個(gè)進(jìn)程主要負(fù)責(zé)與ZK通信,分發(fā)代碼,給集群中的從節(jié)點(diǎn)分配任務(wù),監(jiān)視集群狀態(tài)等等。此外nimbus需要維護(hù)的所有狀態(tài)都會(huì)存儲(chǔ)在ZK中,JStorm為了減少對(duì)ZK的訪問(wèn)次數(shù)做了一些緩存,這個(gè)后續(xù)代碼分析會(huì)說(shuō)到。以上是nimbus功能的簡(jiǎn)介,接下來(lái)我們從源碼的角度看看Nimbus到底做了什么。首先在Nimbus啟動(dòng)的時(shí)候:
//設(shè)置主線程由于未捕獲異常而突然中止時(shí)調(diào)用的默認(rèn)程序 Thread.setDefaultUncaughtExceptionHandler(new DefaultUncaughtExceptionHandler()); //加載集群的配置信息 Map config = Utils.readStormConfig(); //這下面這個(gè)方法內(nèi)部注釋掉了,筆者暫時(shí)沒(méi)有太在意,后續(xù)再補(bǔ)充 JStormServerUtils.startTaobaoJvmMonitor(); //創(chuàng)建一個(gè)NimbusServer實(shí)例 NimbusServer instance = new NimbusServer(); //創(chuàng)建一個(gè)默認(rèn)的nimbus啟動(dòng)類 INimbus iNimbus = new DefaultInimbus(); //開(kāi)始進(jìn)行實(shí)際的初始化 instance.launchServer(config, iNimbus);
??其實(shí)在DefaultUncaughtExceptionHandler中也并沒(méi)有太多的處理操作,簡(jiǎn)單判斷是否是內(nèi)存溢出,然后正常關(guān)閉,否則就是異常直接拋出然后中斷。讀取配置的過(guò)程就不詳細(xì)講解了。NimbusServer這個(gè)類主要封裝了一些用于操作Nimbus的成員變量和方法,Nimbus的啟動(dòng)操作基本都是定義在這個(gè)類內(nèi)的(上述代碼就是這個(gè)類中的main方法所定義的)。
??最重要的方法是launchServer,接下來(lái)就詳細(xì)的解說(shuō)這個(gè)方法的作用,首先來(lái)看下launchServer這個(gè)方法內(nèi)部的代碼:
private void launchServer(final Map conf, INimbus inimbus) { LOG.info("Begin to start nimbus with conf " + conf); try { //判斷配置模式是否正確 StormConfig.validate_distributed_mode(conf); createPid(conf); //設(shè)置退出時(shí)的操作 initShutdownHook(); //這個(gè)方法在默認(rèn)實(shí)現(xiàn)中沒(méi)有任何操作 inimbus.prepare(conf, StormConfig.masterInimbus(conf)); //創(chuàng)建NimbusData對(duì)象 data = createNimbusData(conf, inimbus); //這個(gè)方法主要負(fù)責(zé)處理當(dāng)nimbus線程稱為leader線程之后的操作 initFollowerThread(conf); int port = ConfigExtension.getNimbusDeamonHttpserverPort(conf); hs = new Httpserver(port, conf); hs.start(); //如果集群是運(yùn)行在yarn上,也需要做一些初始化操作。 initContainerHBThread(conf); serviceHandler = new ServiceHandler(data); //thrift是一個(gè)分布式的RPC框架 initThrift(conf); } catch (Throwable e) { if (e instanceof OutOfMemoryError) { LOG.error("Halting due to Out Of Memory Error..."); } LOG.error("Fail to run nimbus ", e); } finally { cleanup(); } LOG.info("Quit nimbus"); }判斷配置中的模式
??只是判斷配置信息中的一個(gè)字段名為“storm.cluster.mode”是否是“distributed”,本地模式下是“l(fā)ocal”。
initShutdownHook??添加退出的時(shí)候一些操作,包括設(shè)置參數(shù)提醒集群要退出,清除nimbus存儲(chǔ)下的一些工作線程(負(fù)責(zé)處理通信,分發(fā)代碼,心跳的一系列守護(hù)線程),關(guān)閉打開(kāi)的各種資源等。
createNimbusData??這個(gè)方法用于創(chuàng)建一個(gè)NimbusData的對(duì)象,這個(gè)對(duì)象封裝了Nimbus與ZK通信的一些成員變量。下面會(huì)在每個(gè)方法內(nèi)部逐漸講到NimbusData的一些成員變量以及他們的作用。首先來(lái)看看NimbusData的構(gòu)造方法。
public NimbusData(final Map conf, INimbus inimbus) throws Exception { this.conf = conf; //兩個(gè)方法分別處理打開(kāi)的文件流和blob傳輸流 createFileHandler(); mkBlobCacheMap(); this.nimbusHostPortInfo = NimbusInfo.fromConf(conf); this.blobStore = BlobStoreUtils.getNimbusBlobStore(conf, nimbusHostPortInfo); this.isLaunchedCleaner = false; this.isLaunchedMonitor = false; this.submittedCount = new AtomicInteger(0); this.stormClusterState = Cluster.mk_storm_cluster_state(conf); createCache(); this.taskHeartbeatsCache = new ConcurrentHashMap>(); //創(chuàng)建一個(gè)調(diào)度線程池,默認(rèn)大小為12 this.scheduExec = Executors.newScheduledThreadPool(SCHEDULE_THREAD_NUM); this.statusTransition = new StatusTransition(this); this.startTime = TimeUtils.current_time_secs(); this.inimubs = inimbus; localMode = StormConfig.local_mode(conf); this.metricCache = new JStormMetricCache(conf, this.stormClusterState); this.clusterName = ConfigExtension.getClusterName(conf); pendingSubmitTopologies = new TimeCacheMap (JStormUtils.MIN_10); topologyTaskTimeout = new ConcurrentHashMap (); tasksHeartbeat = new ConcurrentHashMap (); this.metricsReporter = new JStormMetricsReporter(this); this.metricRunnable = ClusterMetricsRunnable.mkInstance(this); String configUpdateHandlerClass = ConfigExtension.getNimbusConfigUpdateHandlerClass(conf); this.configUpdateHandler = (ConfigUpdateHandler) Utils.newInstance(configUpdateHandlerClass); if (conf.containsKey(Config.NIMBUS_TOPOLOGY_ACTION_NOTIFIER_PLUGIN)) { String string = (String) conf.get(Config.NIMBUS_TOPOLOGY_ACTION_NOTIFIER_PLUGIN); nimbusNotify = (ITopologyActionNotifierPlugin) Utils.newInstance(string); } else { nimbusNotify = null; } }
??3.1 createFileHandler:在這方法內(nèi)部,實(shí)現(xiàn)了一個(gè)匿名的內(nèi)部類ExpiredCallback,在其內(nèi)部實(shí)現(xiàn)了一個(gè)方法叫expire,利用回調(diào)的方式來(lái)關(guān)閉Channel或者BufferFileInputStream實(shí)例對(duì)象。
public void createFileHandler() { ExpiredCallback
??然后初始化NimbusData的兩個(gè)成員變量uploaders和downloaders,這兩個(gè)分別維護(hù)需要上傳的通道和需要下載的通道。TimeCacheMap這個(gè)類的主要實(shí)現(xiàn)邏輯是在其構(gòu)造函數(shù)內(nèi)部啟動(dòng)一個(gè)守護(hù)線程。首先創(chuàng)建一個(gè)緩沖區(qū),只要系統(tǒng)不關(guān)閉,則在守護(hù)線程內(nèi)部不斷的緩沖區(qū)獲取對(duì)象,在對(duì)象不為空的情況下調(diào)用回調(diào)函數(shù)的expire方法,并執(zhí)行相應(yīng)的操作,這里具體傳進(jìn)來(lái)的expire方法是關(guān)閉Channel或者BufferFileInputStream。
??3.2. mkBlobCacheMap:和上述的方法非常類似,也是申明一個(gè)匿名內(nèi)部類,然后初始化幾個(gè)成員變量。代碼幾乎和上個(gè)方法一樣就不浪費(fèi)拌面去貼了。這里expire方法中要關(guān)閉的是兩個(gè)流AtomicOutputStream和BufferInputStream,blobUploaders和blobDownloaders分別存放著上傳和下載所打開(kāi)的流。blobListers存放上傳和下載的數(shù)據(jù)。
??3.3. 初始化幾個(gè)成員變量,包括NimbusInfo(包含了主機(jī)名,端口和標(biāo)志是否是leader),BlobStore(用來(lái)存儲(chǔ)blob數(shù)據(jù)的,使用鍵值存儲(chǔ),阿里提供了兩個(gè)不同的blob存儲(chǔ)方式,一種是本地文件系統(tǒng)存儲(chǔ),一種的hdfs存儲(chǔ),兩種方式的區(qū)別在于,由于本地文件存儲(chǔ)并不能保證一致性,所以需要ZK介入來(lái)保證,這是JStorm的默認(rèn)配置。如果使用hdfs來(lái)存儲(chǔ),則不需要ZK介入,因?yàn)閔dfs能保證一致性和正確性),StormClusterState(存儲(chǔ)整個(gè)集群的狀態(tài),這個(gè)是從ZK上獲取的),為了避免多次向ZK通信,還需要設(shè)置緩存信息,任務(wù)的心跳信息等等。
??3.4. 初始化好metrics相關(guān)的報(bào)告線程和監(jiān)聽(tīng)線程。
??4.1. 方法首先初始化一個(gè)回調(diào)函數(shù),這是當(dāng)一個(gè)nimbus成為leader之后就會(huì)調(diào)用的一個(gè)用于初始化一系列變量的方法,包括拓?fù)淙绾卧诩荷戏峙?,拓?fù)錉顟B(tài)更新,清除函數(shù),還有監(jiān)控線程等。后續(xù)會(huì)有新的篇章來(lái)介紹這個(gè)init方法,這里先放這個(gè)方法的源碼。
private void init(Map conf) throws Exception { data.init(); NimbusUtils.cleanupCorruptTopologies(data); //拓?fù)浞峙? initTopologyAssign(); //狀態(tài)更新 initTopologyStatus(); //清除函數(shù) initCleaner(conf); initMetricRunnable(); if (!data.isLocalMode()) { initMonitor(conf); //mkRefreshConfThread(data); } }
??4.2. 初始化一個(gè)Runnable的子類,在構(gòu)造方法中,首先判斷集群并不是使用本地模式,然后更新ZK上的節(jié)點(diǎn)信息(將nimbus注冊(cè)到ZK上)。然后通過(guò)ZK獲取集群的狀態(tài)信息,畢竟nimbus是需要維護(hù)整個(gè)集群的。緊接著判斷是否存在leader,兩次都無(wú)法選舉出leader之后,則將ZK上的nimbus信息刪除并退出。如果blobstore使用的是本地文件模式(有本文模式還有hdfs模式兩種)還需要添加一個(gè)回調(diào)函數(shù),這個(gè)回調(diào)函數(shù)執(zhí)行的操作是,當(dāng)這個(gè)nimbus不是leader的時(shí)候,對(duì)blob進(jìn)行同步。此外還需要將那些active的blob存到ZK中,而將死掉的進(jìn)行清除(原因前文3.3也說(shuō)到過(guò),本地模式存儲(chǔ)無(wú)法保證一致性,所以需要ZK進(jìn)行維護(hù),而hdfs自帶容錯(cuò)機(jī)制,能保證數(shù)據(jù)的一致性)。
??4.3. 設(shè)置該線程為守護(hù)線程,并啟動(dòng)這個(gè)線程。run方法首先判斷當(dāng)前保存在ZK上的集群中是否有l(wèi)eader,如果沒(méi)有則選舉當(dāng)前nimbus為leader線程。如果有了leader線程,則需要判斷是否跟當(dāng)前的nimbus相同,如果不相同則停止當(dāng)前的nimbus,畢竟已經(jīng)有l(wèi)eader存在了。如果是相同的,則需要判斷本地的狀態(tài)中,如果還沒(méi)有設(shè)置為leader,表明當(dāng)前nimbus還沒(méi)有進(jìn)行初始化,則先設(shè)置nimbus為leader然后回調(diào)函數(shù)進(jìn)行初始化,也就是調(diào)用init(conf)方法。
獲取一個(gè)端口(默認(rèn)的端口是7621)用于構(gòu)建HttpServer實(shí)例對(duì)象??梢杂糜谔幚砗徒邮躷cp連接,啟動(dòng)一個(gè)新的線程進(jìn)行httpserver的監(jiān)聽(tīng)。(主要作用或者說(shuō)在哪里用到尚且不明確)。
??這個(gè)方法的主要作用是得知是否能在資源管理器(yarn)上運(yùn)行jstorm集群,如果可以的話,則需要?jiǎng)?chuàng)建一個(gè)新的線程用于處理。(其實(shí)這里使用容器的目的是可以在一個(gè)物理集群上運(yùn)行多個(gè)不一樣的邏輯集群甚至多個(gè)JStorm集群,能動(dòng)態(tài)調(diào)整邏輯集群分到的資源,此外,資源管理器能提供非常強(qiáng)的可擴(kuò)展性)。容器線程會(huì)被添加到NimbusServer中,后續(xù)使用到的時(shí)候再詳細(xì)講解。這個(gè)容器線程也是守護(hù)線程,且馬上就會(huì)啟動(dòng),這個(gè)線程的run方法里面包含兩個(gè)處理:
??6.1. handleWriteDir:這個(gè)方法的主要作用是清除掉容器上的過(guò)期心跳信息,準(zhǔn)確的說(shuō),如果JStorm集群容器目錄下的心跳信息大于10,則需要清除(從最老的開(kāi)始)。
??6.2. handlReadDir:這里主要是用于維護(hù)本地是否能接受到集群上的hb信息,如果多次超時(shí)則要拋出異常。
??thrift是JStorm使用的一個(gè)分布式RPC框架。筆者后續(xù)再添加相應(yīng)的源碼解析。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://m.hztianpu.com/yun/66980.html
摘要:下面就來(lái)講講第一個(gè)初始化操作拓?fù)浞峙?。如果沒(méi)有舊的分配信息,說(shuō)明拓?fù)浞峙漕愋蜑?。到這里,預(yù)分配,創(chuàng)建拓?fù)浞峙渖舷挛木屯瓿闪?。集群下的分配,?jiàn)下文講解資源準(zhǔn)備首先第一步是判斷拓?fù)浞峙涞念愋褪欠穹弦?,不符合則拋出異常。 ??寫(xiě)在前面的話,筆者第一次閱讀框架源碼,所以可能有些地方理解錯(cuò)誤或者沒(méi)有詳細(xì)解釋,如果在閱讀過(guò)程發(fā)現(xiàn)錯(cuò)誤很歡迎在文章下面評(píng)論指出。文章后續(xù)會(huì)陸續(xù)更新,可以關(guān)注或者收藏...
摘要:即使是容器已經(jīng)退出的也可以看到,所以可以通過(guò)這種方式來(lái)分析非預(yù)期的退出。也可以直接通過(guò)在容器內(nèi)啟動(dòng)一個(gè)更方便地調(diào)試容器,不必一條條執(zhí)行。和獲得容器中進(jìn)程的狀態(tài)和在容器里執(zhí)行的效果類似。通過(guò)查看容器的詳細(xì)信息飯后鏡像和容器的詳細(xì)信息。 『重用』容器名 但我們?cè)诰帉?xiě)/調(diào)試Dockerfile的時(shí)候我們經(jīng)常會(huì)重復(fù)之前的command,比如這種docker run --name jstorm-...
摘要:淘寶定制基于,是國(guó)內(nèi)第一個(gè)優(yōu)化定制且開(kāi)源的服務(wù)器版虛擬機(jī)。數(shù)據(jù)庫(kù)開(kāi)源數(shù)據(jù)庫(kù)是基于官方版本的一個(gè)分支,由阿里云數(shù)據(jù)庫(kù)團(tuán)隊(duì)維護(hù),目前也應(yīng)用于阿里巴巴集團(tuán)業(yè)務(wù)以及阿里云數(shù)據(jù)庫(kù)服務(wù)。淘寶服務(wù)器是由淘寶網(wǎng)發(fā)起的服務(wù)器項(xiàng)目。 Java JAVA 研發(fā)框架 SOFAStack SOFAStack(Scalable Open Financial Architecture Stack)是用于快速構(gòu)建金融...
閱讀 2860·2021-11-24 09:39
閱讀 2842·2021-09-23 11:45
閱讀 3457·2019-08-30 12:49
閱讀 3429·2019-08-30 11:18
閱讀 2106·2019-08-29 16:42
閱讀 3400·2019-08-29 16:35
閱讀 1386·2019-08-29 11:21
閱讀 1985·2019-08-26 13:49