System

小說:哪里是紅花草種子產地?作者:卓平純秉更新時間:2019-05-27字數:69213

System



一、Disruptor的簡介

  Disruptor是由LAMX(歐洲頂級金融公司)設計和開源的大規模、高并發、低延遲的異步處理框架,也可以說他
是最快的消息框架(JMS)。整個業務邏輯處理器完全運行在內存中,其LMAX架構可以達到一個線程里每秒處理6百萬
流水,用1微秒的延遲可以獲得100K+吞吐量的爆炸性能。非常適合那種實時性高、延遲率低、業務流水量大的應用場
景,比如銀行的實時交易處理、讀寫操作分離、數據緩存等。
  Disruptor是基于生產者-消費者模型,實現了"隊列“功能的無鎖高并發框架。他可以做到一個生產者對應多個消
費者且消費者之間可以并行的處理,也可以形成先后順序的處理。Disruptor本質上解決的就是在兩個獨立的處理過
程之間交換數據。Disruptor框架的一些核心類有:
 1.Disruptor:用于控制整個消費者-生產者模型的處理器
   2.RingBuffer:用于存放數據
   3.EventHandler:一個用于處理事件的接口(可以當做生產者,也可以當做消費者)。
   4.EventFactory:事件工廠類。
   5.WaitStrategy:用于實現事件處理等待RingBuffer游標策略的接口。
   6.SequeueBarrier:隊列屏障,用于處理訪問RingBuffer的序列。
   7.用于運行disruptor的線程或者線程池。

二、Disruptor的入門

  Disruptor的編寫一般可以分為以下幾步:
   ?。?)定義事件;
   ?。?)定義事件工廠;
   ?。?)消費者–定義事件處理的具體實現;
   ?。?)定義用于事件處理(消費者)的線程池;
   ?。?)指定等待策略:
      Disruptor 提供了多個WaitStrategy的實現,例如:BlockingWaitStrategy、SleepingWaitStrategy、
YieldingWaitStrategy等:
      BlockingWaitStrategy是最低效的策略,但其對CPU的消耗最小并且在各種不同部署環境中能提供
更加一致的性能表現;
      SleepingWaitStrategy 的性能表現跟BlockingWaitStrategy差不多,對CPU的消耗也類似,但其
對生產者線程的影響最小,適合用于異步日志類似的場景;
      YieldingWaitStrategy 的性能是最好的,適合用于低延遲的系統。在要求極高性能且事件處理線
數小于 CPU 邏輯核心數的場景中,推薦使用此策略;例如,CPU開啟超線程的特性。
      WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
      WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
      WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
   ?。?)生產(發布)消息;
   ?。?)關閉disruptor業務邏輯處理器;
  Disruptor的一些核心概念有:
  ? - Ring Buffer(環形緩沖區) :
    曾經RingBuffer是Disruptor中的最主要的對象,但從3.0版本開始,其職責被簡化為僅僅負責對通過
Disruptor進行交換的數據(事件)進行存儲和更新。在一些更高級的應用場景中,Ring Buffer 可以由用戶的自定
義實現來完全替代。
  ? - Sequence Disruptor :
    通過順序遞增的序號來編號管理。通過其進行交換的數據(事件),對數據(事件)的處理過程總是沿著序
號逐個遞增處理。一個Sequence用于跟蹤標識某個特定的事件處理者( RingBuffer/Consumer )的處理進度。雖然一
個AtomicLong也可以用于標識進度,但定義Sequence來負責該問題還有另一個目的,那就是防止不同的 Sequence之間
的CPU緩存偽共享(Flase Sharing)問題。
  ? - Sequencer :
    Sequencer是Disruptor的真正核心。此接口有兩個實現類SingleProducerSequencer、MultiProducerSequencer
,它們定義在生產者和消費者之間快速、正確地傳遞數據的并發算法。
  ? - Sequence Barrier
    用于保持對RingBuffer的 main published Sequence 和Consumer依賴的其它Consumer的 Sequence 的引用。
Sequence Barrier 還定義了決定Consumer是否還有可處理的事件的邏輯。
  ? - Wait Strategy
    定義 Consumer 如何進行等待下一個事件的策略。(注:Disruptor定義了多種不同的策略,針對不同的場
景,提供了不一樣的性能表現)
  ? - Event
  在Disruptor的語義中,生產者和消費者之間進行交換的數據被稱為事件(Event)。它不是一個被Disruptor
定義的特定類型,而是由 Disruptor 的使用者定義并指定。
  ? - EventProcessor
    EventProcessor持有特定消費者的Sequence,并提供用于調用事件處理實現的事件循環(Event Loop)。
  ? - EventHandler
    Disruptor 定義的事件處理接口,由用戶實現,用于處理事件,是 Consumer 的真正實現。
  ? - Producer
    即生產者,只是泛指調用 Disruptor 發布事件的用戶代碼,Disruptor 沒有定義特定接口或類型。
? 栗子:

  Event:

/**
 * 事件(Event)就是通過 Disruptor 進行交換的數據類型。
 * @author lcy
 *
 */
public class TransactionEvent {
    private long seq;
    private double amount;
    private long callNumber;
    
    public long getCallNumber() {
        return callNumber;
    }
    
    @Override
    public String toString() {
        return "TransactionEvent [seq=" + seq + ", amount=" + amount + ", callNumber=" + callNumber + "]";
    }
    
    public void setCallNumber(long callNumber) {
        this.callNumber = callNumber;
    }
    public long getSeq() {
        return seq;
    }
    public void setSeq(long seq) {
        this.seq = seq;
    }
    public double getAmount() {
        return amount;
    }
    public void setAmount(double amount) {
        this.amount = amount;
    }
}

  Factory:

/**
 * Event Factory 定義了如何實例化前面第1步中定義的事件(Event)
 * Disruptor 通過 EventFactory 在 RingBuffer 中預創建 Event 的實例。 
        一個 Event 實例實際上被用作一個“數據槽”,發布者發布前,先從 RingBuffer 獲得一個 Event 的實例,
        然后往 Event 實例中填充數據,之后再發布到 RingBuffer中,之后由 Consumer 獲得該 Event 實例并從中讀取數據。
 * @author lcy
 *
 */
public class TransactionEventFactory implements EventFactory<TransactionEvent>{

    @Override
    public TransactionEvent newInstance() {
        // TODO Auto-generated method stub
        return new TransactionEvent();
    }
}

  Customer:

/**
 * 事件處理類-交易流水初始化
 * @author lcy
 *
 */
public class AmountTrasfer implements EventTranslator<TransactionEvent>{

    @Override
    public void translateTo(TransactionEvent arg0, long arg1) {
        arg0.setAmount(Math.random()*99);
        arg0.setCallNumber(17088888888L);
        arg0.setSeq(System.currentTimeMillis());
         System.out.println("設置交易流水:"+arg0.getSeq());
    }
}
/**
 * 消費者–定義事件處理的具體實現
 * 攔截交易流水
 * @author lcy
 *
 */
public class TransHandler implements EventHandler<TransactionEvent>,WorkHandler<TransactionEvent>{

    @Override
    public void onEvent(TransactionEvent transactionEvent) throws Exception {
         System.out.println("交易流水號為:"+transactionEvent.getSeq()+"||交易金額為:"+transactionEvent.getAmount());
    }

    @Override
    public void onEvent(TransactionEvent arg0, long arg1, boolean arg2) throws Exception {
        // TODO Auto-generated method stub
        this.onEvent(arg0);
    }
}
/**
 * 發送驗證短信
 * @author lcy
 *
 */
public class SendMsgHandler implements EventHandler<TransactionEvent>{

    @Override
    public void onEvent(TransactionEvent arg0, long arg1, boolean arg2) throws Exception {
        // TODO Auto-generated method stub
         System.out.println("向手機號:"+arg0.getCallNumber()+"發送驗證短信......");
        
    }
}
/**
 * 交易流水入庫操作
 * @author lcy
 *
 */
public class InnerDBHandler implements EventHandler<TransactionEvent>,WorkHandler<TransactionEvent>{

    @Override
    public void onEvent(TransactionEvent arg0, long arg1, boolean arg2) throws Exception {
        // TODO Auto-generated method stub
        this.onEvent(arg0);
    }

    @Override
    public void onEvent(TransactionEvent arg0) throws Exception {
        arg0.setSeq(arg0.getSeq()*10000);
        System.out.println("攔截入庫流水號------------  "+arg0.getSeq());
    }
}

  Producer:

/**
 * 生產者、發布事件
 * @author lcy
 *
 */
public class TransactionEventProducer implements Runnable {
    // 線程同步輔助類 - 允許一個或多個線程一直等待
    CountDownLatch cdl;
    Disruptor disruptor;

    public TransactionEventProducer(CountDownLatch cdl, Disruptor disruptor) {
        super();
        this.cdl = cdl;
        this.disruptor = disruptor;
    }

    public TransactionEventProducer() {
        super();
        // TODO Auto-generated constructor stub
    }


    @Override
    public void run() {
        AmountTrasfer th;
        try {
            //Event對象初始化類
            th = new AmountTrasfer();
            //發布事件
            disruptor.publishEvent(th);
        } finally {
            // 遞減鎖存器的計數 -如果計數到達零,則釋放所有等待的線程。
            cdl.countDown();
        }
    }

    // 定義環大小,2的倍數
    private static final int BUFFER_SIZE = 1024;
    // 定義處理事件的線程或線程池
    ExecutorService pool = Executors.newFixedThreadPool(7);

    /**
     * 批處理模式
     * @throws Exception
     */
    public void BatchDeal() throws Exception {
        //創建一個單生產者的ringBuffer
        final RingBuffer<TransactionEvent> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<TransactionEvent>() {

            @Override
            public TransactionEvent newInstance() {
                return new TransactionEvent();
            }
            //設置等待策略,YieldingWaitStrategy 的性能是最好的,適合用于低延遲的系統。
        }, BUFFER_SIZE,new YieldingWaitStrategy());
        //創建SequenceBarrier
        SequenceBarrier barrier = ringBuffer.newBarrier();
        //創建消息處理器
        BatchEventProcessor<TransactionEvent> eventProcessor = new BatchEventProcessor<TransactionEvent>(ringBuffer,barrier,new InnerDBHandler());
        //構造反向依賴,eventProcessor之間沒有依賴關系則可以將Sequence直接加入
        ringBuffer.addGatingSequences(eventProcessor.getSequence());
        //提交消息處理器
        pool.submit(eventProcessor);
        //提交一個有返回值的任務用于執行,返回一個表示任務的未決結果的 Future。
        Future<Void> submit = pool.submit(new Callable<Void>() {
            //計算結果,如果無法計算結果則拋出異常
            @Override
            public Void call() throws Exception {
                long seq;
                for (int i=0;i<7000;i++) {
                    System.out.println("生產者:"+i);
                    //環里一個可用的區塊
                    seq=ringBuffer.next();
                    //為環里的對象賦值
                    ringBuffer.get(seq).setAmount(Math.random()*10);
                    System.out.println("TransactionEvent:   "+ringBuffer.get(seq).toString());
                    //發布這個區塊的數據,
                    ringBuffer.publish(seq);
                }
                return null;
            }
        });
        //等待計算完成,然后獲取其結果。
        submit.get();
        Thread.sleep(1000);
        //關閉消息處理器
        eventProcessor.halt();
        //關閉線程池
        pool.shutdown();
    }
    
     /**
      * 工作池模式
      * @throws Exception
      */
    @SuppressWarnings("unchecked")
    public void poolDeal() throws Exception {
        RingBuffer<TransactionEvent> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<TransactionEvent>() {

            @Override
            public TransactionEvent newInstance() {
                return new TransactionEvent();
            }
        }, BUFFER_SIZE, new YieldingWaitStrategy());
        SequenceBarrier barrier = ringBuffer.newBarrier();
        //創建一個定長的線程池
        ExecutorService pool2 = Executors.newFixedThreadPool(5);
        //交易流水入庫操作
        WorkHandler<TransactionEvent> innerDBHandler = new InnerDBHandler();
        ExceptionHandler arg2;
        WorkerPool<TransactionEvent> workerPool = new WorkerPool<TransactionEvent>(ringBuffer, barrier, new IgnoreExceptionHandler(), innerDBHandler);
        workerPool.start(pool2);
        long seq;
        for(int i =0;i<7;i++){
            seq = ringBuffer.next();
            ringBuffer.get(seq).setAmount(Math.random()*99);
            ringBuffer.publish(seq);
        }
        Thread.sleep(1000);
        workerPool.halt();
        pool2.shutdown();
    }
    
    /**
     * disruptor處理器用來組裝生產者和消費者
     * @throws Exception 
     */
    @SuppressWarnings("unchecked")
    public void disruptorManage() throws Exception{
        //創建用于處理事件的線程池
        ExecutorService pool2 = Executors.newFixedThreadPool(7);
        //創建disruptor對象
        /**
         * 用來指定數據生成者有一個還是多個,有兩個可選值ProducerType.SINGLE和ProducerType.MULTI
         * BusySpinWaitStrategy是一種延遲最低,最耗CPU的策略。通常用于消費線程數小于CPU數的場景
         */
        Disruptor<TransactionEvent> disruptor2 = new Disruptor<TransactionEvent>(new EventFactory<TransactionEvent>() {

            @Override
            public TransactionEvent newInstance() {
                return new TransactionEvent();
            }                                  
        },BUFFER_SIZE,pool2,ProducerType.SINGLE,new BusySpinWaitStrategy());
        //創建消費者組,先執行攔截交易流水和入庫操作
        EventHandlerGroup<TransactionEvent> eventsWith = disruptor2.handleEventsWith(new InnerDBHandler(),new TransHandler());
        //在進行風險交易的2次驗證操作
        eventsWith.then(new SendMsgHandler());
        //啟動disruptor
        disruptor2.start();
        //在線程能通過 await()之前,必須調用 countDown() 的次數
        CountDownLatch latch = new CountDownLatch(1);
        //將封裝好的TransactionEventProducer類提交
        pool2.submit(new TransactionEventProducer(latch,disruptor2));
        //使當前線程在鎖存器倒計數至零之前一直等待,以保證生產者任務完全消費掉
        latch.await();
        //關閉disruptor業務邏輯處理器
        disruptor2.shutdown();
        //銷毀線程池
        pool2.shutdown();
    }
}

  Test:

/**
 * 測試類
 * @author lcy
 *
 */
public class Test {
    public static void main(String[] args) throws Exception {
        TransactionEventProducer producer = new TransactionEventProducer();
        for (int i = 0; i < 100; i++)
            producer.disruptorManage();
        System.out.println("--------------------------------------------------");
    }
}

三、記一次生產上的BUG

  前段時間升級的時候出現了這樣一個BUG,導致了近萬用戶的交易失敗。首先確認了我們在生產上并沒有部署攔
截交易的規則,所有的交易流水都是放行的不會加入我們的風險名單庫。那么內存庫里的幾萬灰名單是怎么來的呢?
  我們在上線成功后需使用真實的用戶進行一波生產上的測試,而在測試的過程中為了配合測試那邊的需求,
需將特定的幾個測試賬號配置成加入灰名單并進行2次認證的攔截規則。測試通過后便將那幾條測試規則給撤回了。但
是我們忽略了一個問題,因為Disruptor框架在初始化環的時候,只會new一次這個對象。這就導致了插入環里“槽”的對
象始終都是第一次進入“灰名單”對象,等到環被塞滿后下條流水進來的時候就會使用“槽”里的“灰名單”對象。即使這筆
交易不是風險交易也會加入到灰名單中,導致了大量的用戶交易失敗。
  上線后的第二天,我們頭兒就意識到了這個問題,想通過重啟服務、清空環來暫時解決這個問題(服務器有負載均
衡),因為環被清空后,之前在環里的“灰名單”對象也就不存在了,而且生產上沒有部署將用戶加入“灰名單”的規則,環
里的對象就一定是“干凈的”,這個問題也就得到了解決。但是、可是、可但是、萬萬沒想到啊,當晚生產上還是出現了問題。
灰名單里的用戶數量已經逼近2萬了,大量用戶不能進行電子交易。
  為什么項目已經被重啟了,環也被清空了,也沒有規則會產生新的灰名單,那2萬的灰名單用戶是從哪來的?事后
通過查看代碼發現,雖然環被清空了,但是在清空之前已經有部分用戶被存到了灰名單里。這些用戶在某一時間再次
進行交易時,會重新將這條交易的狀態設置為“灰名單”(其他業務需要),這就導致了接待這條交易流水的“槽”會被重
新賦值為“灰名單”的狀態,然后環里的“灰名單”槽就會越滾越多。
  Event在使用的時候一定不要忘記將關鍵的屬性進行初始化,這樣才能保證從環里取出的對象是初始狀態的,不會被上次處理的數據所影響。

當前文章:http://www.hfcxdn.com/array/zw4864bwl1.html

發布時間:2019-05-27 15:01:22

火棘最新供應信息哪里靠譜? 哪里有木瓜樹供應商? 【園藝師詳解】月季和薔薇的區別在哪里 4.5公分早園竹哪里最多? 一棵紫藤能覆蓋多大面積? 發現最好的劍麻小苗產地,有圖有真相 江蘇最新美人梅小苗價格_商情先知! 原包裝進口百喜草種籽,大量現貨批發中 怎么在草坪磚內種草? 黑麥草多年生是真的嗎?

狼尾草種子品種多嗎? 皇竹草種子對種植技術有什么要求? 機場花卉品種推薦有哪些? 草花的種植方案有哪些? 別墅常用草花地被有哪些?  陜西適合種植披堿草嗎? 夏天可以種植五彩石竹嗎? 12月可以播種海棠嗎? 赤霉素處理紫薇樹種子可以提高發芽率嗎? 塔柏苗的最新市場價格是多少? 哪里有供應黃金柳的種植基地?

編輯:建馬

我要說兩句: (0人參與)

發布
捕鱼达人之深海狩猎