package org.mule.test.http.functional;

import com.ning.http.client.AsyncCompletionHandlerBase;
import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.AsyncHttpClientConfig;
import com.ning.http.client.Response;
import com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider;
import io.qameta.allure.Story;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.collection.IsEmptyCollection;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.util.concurrent.Latch;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.http.api.HttpConstants;
import org.mule.tck.junit4.rule.DynamicPort;

@Story("Source overload handling")
/* loaded from: input_file:org/mule/test/http/functional/HttpListenerFlowBackPressureTestCase.class */
public class HttpListenerFlowBackPressureTestCase extends AbstractHttpTestCase {

    @Rule
    public DynamicPort listenPort = new DynamicPort("port");
    private static AtomicInteger numProcessedRequests;
    private static Latch keepProcessorsActive;
    private static CountDownLatch processedLatch;
    private static ConcurrentLinkedQueue<Throwable> accumulatedErrors;
    private AtomicInteger okResponses;
    private AtomicInteger overloadResponses;
    private CountDownLatch sentLatch;
    private CountDownLatch overloadResponseLatch;
    private CountDownLatch allResponseLatch;
    private AsyncHttpClient client;
    private ExecutorService executorService;
    private static int BUFFER_SIZE = 256;
    private static int OVERLOAD_COUNT = 44;

    /* loaded from: input_file:org/mule/test/http/functional/HttpListenerFlowBackPressureTestCase$BlocksMP.class */
    public static class BlocksMP implements Processor {
        public CoreEvent process(CoreEvent coreEvent) throws MuleException {
            try {
                HttpListenerFlowBackPressureTestCase.numProcessedRequests.incrementAndGet();
                HttpListenerFlowBackPressureTestCase.keepProcessorsActive.await();
                HttpListenerFlowBackPressureTestCase.processedLatch.countDown();
            } catch (Throwable th) {
                HttpListenerFlowBackPressureTestCase.accumulatedErrors.add(th);
            }
            return coreEvent;
        }
    }

    protected String getConfigFile() {
        return "http-listener-flow-backpressure-config.xml";
    }

    @Before
    public void setup() {
        keepProcessorsActive = new Latch();
        numProcessedRequests = new AtomicInteger(0);
        this.okResponses = new AtomicInteger(0);
        this.overloadResponses = new AtomicInteger(0);
        accumulatedErrors = new ConcurrentLinkedQueue<>();
        this.sentLatch = new CountDownLatch(BUFFER_SIZE + OVERLOAD_COUNT);
        this.overloadResponseLatch = new CountDownLatch(OVERLOAD_COUNT);
        this.allResponseLatch = new CountDownLatch(BUFFER_SIZE + OVERLOAD_COUNT);
        processedLatch = new CountDownLatch(BUFFER_SIZE);
        this.client = new AsyncHttpClient(new GrizzlyAsyncHttpProvider(new AsyncHttpClientConfig.Builder().build()));
        this.executorService = Executors.newFixedThreadPool(1);
    }

    @After
    public void tearDown() {
        this.executorService.shutdownNow();
        this.client.close();
    }

    @Test
    public void overloadScenario() throws Exception {
        String format = String.format("http://localhost:%s/", Integer.valueOf(this.listenPort.getNumber()));
        for (int i = 0; i < BUFFER_SIZE + OVERLOAD_COUNT; i++) {
            this.executorService.submit(() -> {
                executeRequestInAnotherThread(format);
            });
        }
        this.sentLatch.await();
        this.overloadResponseLatch.await();
        Assert.assertThat(Integer.valueOf(this.overloadResponses.get()), CoreMatchers.equalTo(Integer.valueOf(OVERLOAD_COUNT)));
        Assert.assertThat(Integer.valueOf(this.okResponses.get()), CoreMatchers.equalTo(0));
        keepProcessorsActive.release();
        processedLatch.await();
        this.allResponseLatch.await();
        Assert.assertThat(Integer.valueOf(this.overloadResponses.get()), CoreMatchers.equalTo(Integer.valueOf(OVERLOAD_COUNT)));
        Assert.assertThat(Integer.valueOf(this.okResponses.get()), CoreMatchers.equalTo(Integer.valueOf(BUFFER_SIZE)));
        Assert.assertThat(Integer.valueOf(numProcessedRequests.get()), CoreMatchers.equalTo(Integer.valueOf(BUFFER_SIZE)));
        Assert.assertThat(accumulatedErrors, IsEmptyCollection.empty());
    }

    private void executeRequestInAnotherThread(String str) {
        this.client.prepareGet(str).execute(new AsyncCompletionHandlerBase() { // from class: org.mule.test.http.functional.HttpListenerFlowBackPressureTestCase.1
            /* renamed from: onCompleted, reason: merged with bridge method [inline-methods] */
            public Response m9onCompleted(Response response) throws Exception {
                try {
                    int statusCode = response.getStatusCode();
                    if (statusCode == HttpConstants.HttpStatus.SERVICE_UNAVAILABLE.getStatusCode()) {
                        HttpListenerFlowBackPressureTestCase.this.overloadResponses.incrementAndGet();
                        HttpListenerFlowBackPressureTestCase.this.overloadResponseLatch.countDown();
                        Assert.assertThat(response.getStatusText(), CoreMatchers.is(HttpConstants.HttpStatus.SERVICE_UNAVAILABLE.getReasonPhrase()));
                    } else if (statusCode == HttpConstants.HttpStatus.OK.getStatusCode()) {
                        HttpListenerFlowBackPressureTestCase.this.okResponses.incrementAndGet();
                        Assert.assertThat(response.getResponseBody(), CoreMatchers.is("the result"));
                    } else {
                        HttpListenerFlowBackPressureTestCase.accumulatedErrors.add(new AssertionError("request returned invalid status code: " + statusCode));
                    }
                } catch (Throwable th) {
                    HttpListenerFlowBackPressureTestCase.accumulatedErrors.add(th);
                } finally {
                    HttpListenerFlowBackPressureTestCase.this.allResponseLatch.countDown();
                }
                return response;
            }
        });
        this.sentLatch.countDown();
    }
}
