001package io.jstach.rainbowgum.disruptor;
002
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import org.eclipse.jdt.annotation.Nullable;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;

import io.jstach.rainbowgum.LogAppender;
import io.jstach.rainbowgum.LogAppender.Appenders;
import io.jstach.rainbowgum.LogConfig;
import io.jstach.rainbowgum.LogEvent;
import io.jstach.rainbowgum.LogPublisher;
import io.jstach.rainbowgum.LogPublisher.AsyncLogPublisher;
import io.jstach.rainbowgum.MetaLog;
025
026/**
027 * Disruptor async publisher.
028 */
029public final class DisruptorLogPublisher implements AsyncLogPublisher {
030
031        private final Disruptor<LogEventCell> disruptor;
032
033        private final RingBuffer<LogEventCell> ringBuffer;
034
035        private final Iterable<? extends LogAppender> appenders;
036
037        /**
038         * Creates a factory of disruptor log publishers.
039         * @param bufferSize ring buffer size.
040         * @return factory to generate this class.
041         */
042        public static PublisherFactory of(int bufferSize) {
043                return new PublisherFactory() {
044                        @Override
045                        public LogPublisher create(String name, LogConfig config, Appenders appenders) {
046                                return of(appenders.flags(EnumSet.of(LogAppender.AppenderFlag.REUSE_BUFFER)).asList(),
047                                                DaemonThreadFactory.INSTANCE, bufferSize);
048                        }
049                };
050        }
051
052        /**
053         * Creates.
054         * @param appenders appenders.
055         * @param threadFactory thread factory to create writer thread.
056         * @param bufferSize maximum queue elements.
057         * @return publisher.
058         */
059        public static DisruptorLogPublisher of(Collection<? extends LogAppender> appenders, ThreadFactory threadFactory,
060                        int bufferSize) {
061
062                Disruptor<LogEventCell> disruptor = new Disruptor<>(LogEventCell::new, bufferSize, threadFactory,
063                                ProducerType.MULTI, new BlockingWaitStrategy());
064                disruptor.setDefaultExceptionHandler(new LogExceptionHandler(disruptor::shutdown));
065
066                boolean found = false;
067                for (var appender : appenders) {
068                        disruptor.handleEventsWith(new LogEventHandler(appender));
069                        found = true;
070                }
071                if (!found) {
072                        throw new IllegalStateException();
073                }
074                var ringBuffer = disruptor.getRingBuffer();
075
076                var router = new DisruptorLogPublisher(disruptor, ringBuffer, List.copyOf(appenders));
077                return router;
078        }
079
080        @Override
081        public void start(LogConfig config) {
082                disruptor.start();
083
084        }
085
086        DisruptorLogPublisher(Disruptor<LogEventCell> disruptor, RingBuffer<LogEventCell> ringBuffer,
087                        Iterable<? extends LogAppender> appenders) {
088                super();
089                this.disruptor = disruptor;
090                this.ringBuffer = ringBuffer;
091                this.appenders = appenders;
092        }
093
094        @Override
095        public void log(LogEvent event) {
096                long sequence = ringBuffer.next();
097                try {
098                        LogEventCell cell = ringBuffer.get(sequence);
099                        cell.event = event;
100                }
101                finally {
102                        ringBuffer.publish(sequence);
103                }
104
105        }
106
107        @Override
108        public void close() {
109                this.disruptor.halt();
110        }
111
112        @Override
113        public String toString() {
114                return super.toString() + "[appenders=" + this.appenders + "]";
115        }
116
117        private static class LogEventCell {
118
119                @Nullable
120                LogEvent event;
121
122        }
123
124        private static record LogEventHandler(LogAppender appender) implements EventHandler<LogEventCell> {
125
126                @Override
127                public void onEvent(LogEventCell event, long sequence, boolean endOfBatch) throws Exception {
128                        var logEvent = event.event;
129                        if (logEvent == null) {
130                                return;
131                        }
132                        appender.append(logEvent);
133                }
134
135        }
136
137        private record LogExceptionHandler(Runnable shutdownHook) implements ExceptionHandler<Object> {
138
139                @Override
140                public void handleEventException(Throwable ex, long sequence, Object event) {
141                        if (ex instanceof InterruptedException) {
142                                shutdownHook.run();
143                        }
144                        else {
145                                MetaLog.error(DisruptorLogPublisher.class, ex);
146                                throw new RuntimeException(ex);
147                        }
148                }
149
150                @Override
151                public void handleOnStartException(Throwable ex) {
152                        MetaLog.error(DisruptorLogPublisher.class, ex);
153                }
154
155                @Override
156                public void handleOnShutdownException(Throwable ex) {
157                        MetaLog.error(DisruptorLogPublisher.class, ex);
158                }
159
160        }
161
162}