001package io.jstach.rainbowgum.disruptor; 002 003import java.util.Collection; 004import java.util.EnumSet; 005import java.util.List; 006import java.util.concurrent.ThreadFactory; 007 008import org.eclipse.jdt.annotation.Nullable; 009 010import com.lmax.disruptor.BlockingWaitStrategy; 011import com.lmax.disruptor.EventHandler; 012import com.lmax.disruptor.ExceptionHandler; 013import com.lmax.disruptor.RingBuffer; 014import com.lmax.disruptor.dsl.Disruptor; 015import com.lmax.disruptor.dsl.ProducerType; 016import com.lmax.disruptor.util.DaemonThreadFactory; 017 018import io.jstach.rainbowgum.LogAppender; 019import io.jstach.rainbowgum.LogConfig; 020import io.jstach.rainbowgum.LogEvent; 021import io.jstach.rainbowgum.LogPublisher; 022import io.jstach.rainbowgum.LogPublisher.AsyncLogPublisher; 023import io.jstach.rainbowgum.MetaLog; 024import io.jstach.rainbowgum.LogAppender.Appenders; 025 026/** 027 * Disruptor async publisher. 028 */ 029public final class DisruptorLogPublisher implements AsyncLogPublisher { 030 031 private final Disruptor<LogEventCell> disruptor; 032 033 private final RingBuffer<LogEventCell> ringBuffer; 034 035 private final Iterable<? extends LogAppender> appenders; 036 037 /** 038 * Creates a factory of disruptor log publishers. 039 * @param bufferSize ring buffer size. 040 * @return factory to generate this class. 041 */ 042 public static PublisherFactory of(int bufferSize) { 043 return new PublisherFactory() { 044 @Override 045 public LogPublisher create(String name, LogConfig config, Appenders appenders) { 046 return of(appenders.flags(EnumSet.of(LogAppender.AppenderFlag.REUSE_BUFFER)).asList(), 047 DaemonThreadFactory.INSTANCE, bufferSize); 048 } 049 }; 050 } 051 052 /** 053 * Creates. 054 * @param appenders appenders. 055 * @param threadFactory thread factory to create writer thread. 056 * @param bufferSize maximum queue elements. 057 * @return publisher. 058 */ 059 public static DisruptorLogPublisher of(Collection<? extends LogAppender> appenders, ThreadFactory threadFactory, 060 int bufferSize) { 061 062 Disruptor<LogEventCell> disruptor = new Disruptor<>(LogEventCell::new, bufferSize, threadFactory, 063 ProducerType.MULTI, new BlockingWaitStrategy()); 064 disruptor.setDefaultExceptionHandler(new LogExceptionHandler(disruptor::shutdown)); 065 066 boolean found = false; 067 for (var appender : appenders) { 068 disruptor.handleEventsWith(new LogEventHandler(appender)); 069 found = true; 070 } 071 if (!found) { 072 throw new IllegalStateException(); 073 } 074 var ringBuffer = disruptor.getRingBuffer(); 075 076 var router = new DisruptorLogPublisher(disruptor, ringBuffer, List.copyOf(appenders)); 077 return router; 078 } 079 080 @Override 081 public void start(LogConfig config) { 082 disruptor.start(); 083 084 } 085 086 DisruptorLogPublisher(Disruptor<LogEventCell> disruptor, RingBuffer<LogEventCell> ringBuffer, 087 Iterable<? extends LogAppender> appenders) { 088 super(); 089 this.disruptor = disruptor; 090 this.ringBuffer = ringBuffer; 091 this.appenders = appenders; 092 } 093 094 @Override 095 public void log(LogEvent event) { 096 long sequence = ringBuffer.next(); 097 try { 098 LogEventCell cell = ringBuffer.get(sequence); 099 cell.event = event; 100 } 101 finally { 102 ringBuffer.publish(sequence); 103 } 104 105 } 106 107 @Override 108 public void close() { 109 this.disruptor.halt(); 110 } 111 112 @Override 113 public String toString() { 114 return super.toString() + "[appenders=" + this.appenders + "]"; 115 } 116 117 private static class LogEventCell { 118 119 @Nullable 120 LogEvent event; 121 122 } 123 124 private static record LogEventHandler(LogAppender appender) implements EventHandler<LogEventCell> { 125 126 @Override 127 public void onEvent(LogEventCell event, long sequence, boolean endOfBatch) throws Exception { 128 var logEvent = event.event; 129 if (logEvent == null) { 130 return; 131 } 132 appender.append(logEvent); 133 } 134 135 } 136 137 private record LogExceptionHandler(Runnable shutdownHook) implements ExceptionHandler<Object> { 138 139 @Override 140 public void handleEventException(Throwable ex, long sequence, Object event) { 141 if (ex instanceof InterruptedException) { 142 shutdownHook.run(); 143 } 144 else { 145 MetaLog.error(DisruptorLogPublisher.class, ex); 146 throw new RuntimeException(ex); 147 } 148 } 149 150 @Override 151 public void handleOnStartException(Throwable ex) { 152 MetaLog.error(DisruptorLogPublisher.class, ex); 153 } 154 155 @Override 156 public void handleOnShutdownException(Throwable ex) { 157 MetaLog.error(DisruptorLogPublisher.class, ex); 158 } 159 160 } 161 162}