package com.dx168.quote.core;

import android.content.Context;
import android.util.Log;
import com.baidao.quotation.Category;
import com.baidao.quotation.CategoryNotice;
import com.baidao.quotation.Quote;
import com.baidao.quotation.QuoteMessageUtil;
import com.baidao.quotation.Snapshot;
import com.dx168.framework.dxrpc.RetryUntilSuccess;
import com.google.gson.Gson;
import com.networkbench.agent.impl.instrumentation.NBSGsonInstrumentation;
import com.networkbench.agent.impl.instrumentation.NBSInstrumented;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.ConnectException;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ArrayBlockingQueue;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

/* JADX INFO: Access modifiers changed from: package-private */
@NBSInstrumented
/* loaded from: classes.dex */
public class QuoteSocket {
    private static final String TAG = QuoteSocket.class.getSimpleName();
    private final int clientType;
    private final String clientVersion;
    private final Context context;
    private volatile Socket keepAliveSocket;
    private OnQuoteListener listener;
    private Thread readThread;
    private List<Category> subscribedCategoryList;
    private Timer timer;
    private Thread writeThread;
    private Object timerLock = new Object();
    private Object socketLock = new Object();
    private boolean heartBeatReceived = true;
    private Queue<byte[]> writeQueue = new ArrayBlockingQueue(512, true);
    private Runnable write = new Runnable() { // from class: com.dx168.quote.core.QuoteSocket.4
        @Override // java.lang.Runnable
        public void run() {
            byte[] bArr;
            while (QuoteSocket.this.isRunning) {
                if (QuoteSocket.this.keepAliveSocket == null || QuoteSocket.this.keepAliveSocket.isClosed()) {
                    try {
                        synchronized (QuoteSocket.this.writeQueue) {
                            QuoteSocket.this.writeQueue.wait(5000L);
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    synchronized (QuoteSocket.this.writeQueue) {
                        bArr = (byte[]) QuoteSocket.this.writeQueue.poll();
                    }
                    if (QuoteSocket.this.keepAliveSocket == null || bArr == null || bArr.length <= 0) {
                        synchronized (QuoteSocket.this.writeQueue) {
                            try {
                                QuoteSocket.this.writeQueue.wait(5000L);
                            } catch (InterruptedException e2) {
                                e2.printStackTrace();
                            }
                        }
                    } else {
                        try {
                            QuoteSocket.this.keepAliveSocket.getOutputStream().write(bArr);
                        } catch (IOException e3) {
                            e3.printStackTrace();
                        }
                    }
                }
            }
        }
    };
    private boolean isRunning = true;
    final byte[] data = new byte[2048];
    private Runnable read = new Runnable() { // from class: com.dx168.quote.core.QuoteSocket.5
        @Override // java.lang.Runnable
        public void run() {
            while (QuoteSocket.this.isRunning) {
                try {
                } catch (UnknownHostException e) {
                    e.printStackTrace();
                } catch (IOException e2) {
                    e2.printStackTrace();
                }
                if (QuoteSocket.this.keepAliveSocket == null || QuoteSocket.this.keepAliveSocket.isClosed()) {
                    try {
                        synchronized (QuoteSocket.this.writeQueue) {
                            QuoteSocket.this.writeQueue.wait(5000L);
                        }
                    } catch (InterruptedException e3) {
                        e3.printStackTrace();
                    }
                } else {
                    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                    int i = 0;
                    while (QuoteSocket.this.keepAliveSocket != null && !QuoteSocket.this.keepAliveSocket.isClosed()) {
                        int read = QuoteSocket.this.keepAliveSocket.getInputStream().read(QuoteSocket.this.data);
                        if (read == -1) {
                            try {
                                Thread.sleep(1000L);
                            } catch (Exception e4) {
                                e4.printStackTrace();
                            }
                        }
                        byteArrayOutputStream.write(QuoteSocket.this.data, 0, read);
                        if (i == 0) {
                            i = (QuoteSocket.this.data[2] & 255) + ((QuoteSocket.this.data[3] & 255) * 256) + 4;
                        }
                        if (i > read) {
                            i -= read;
                        } else {
                            while (i <= read) {
                                int size = (byteArrayOutputStream.size() - 4) - (read - i);
                                if (size > 0) {
                                    byte[] bArr = new byte[size];
                                    System.arraycopy(byteArrayOutputStream.toByteArray(), 4, bArr, 0, size);
                                    if (QuoteMessageUtil.getInstance().isHeartbeat(bArr, bArr.length)) {
                                        QuoteSocket.this.heartBeatReceived = true;
                                    } else if (QuoteMessageUtil.getInstance().isCategoryNotice(bArr, bArr.length)) {
                                        QuoteSocket.this.handleCategoryNotice(QuoteMessageUtil.getInstance().parseCategoryNotice(bArr, bArr.length));
                                    } else {
                                        Snapshot parseSnapshot = QuoteMessageUtil.getInstance().parseSnapshot(bArr, bArr.length);
                                        if (parseSnapshot != null) {
                                            QuoteSocket.this.handleSnapshot(parseSnapshot, QuoteSocket.this.listener);
                                        }
                                    }
                                    ByteArrayOutputStream byteArrayOutputStream2 = new ByteArrayOutputStream();
                                    byteArrayOutputStream2.write(byteArrayOutputStream.toByteArray(), size + 4, (byteArrayOutputStream.size() - size) - 4);
                                    byteArrayOutputStream = byteArrayOutputStream2;
                                    read -= i;
                                    if (byteArrayOutputStream.size() > 0) {
                                        byte[] byteArray = byteArrayOutputStream.toByteArray();
                                        if (byteArray.length < 4) {
                                            QuoteSocket.this.resubscribe();
                                        } else {
                                            i = (byteArray[2] & 255) + ((byteArray[3] & 255) * 256) + 4;
                                        }
                                    }
                                }
                            }
                            i = read == 0 ? 0 : i - read;
                        }
                    }
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e5) {
                        e5.printStackTrace();
                    }
                }
            }
        }
    };

    public QuoteSocket(Context context, int i, String str) {
        this.clientType = i;
        this.context = context.getApplicationContext();
        this.clientVersion = str;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void addSubscription(Collection<Category> collection) {
        synchronized (this.writeQueue) {
            this.writeQueue.clear();
            Iterator<Category> it = collection.iterator();
            while (it.hasNext()) {
                this.writeQueue.add(QuoteMessageUtil.getInstance().getSubscription(it.next().id));
            }
            this.writeQueue.notifyAll();
        }
    }

    private void closeLongSocket() {
        try {
            if (this.keepAliveSocket != null) {
                this.keepAliveSocket.close();
                this.keepAliveSocket = null;
            }
        } catch (IOException e) {
            Log.e(TAG, "closeLongSocket error", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleCategoryNotice(CategoryNotice categoryNotice) {
        if (categoryNotice == null) {
            return;
        }
        String str = TAG;
        StringBuilder append = new StringBuilder().append("===handleCategoryNotice: ");
        Gson gson = new Gson();
        Log.d(str, append.append(!(gson instanceof Gson) ? gson.toJson(categoryNotice) : NBSGsonInstrumentation.toJson(gson, categoryNotice)).toString());
        CategoryHelper.updateCategory(this.context, categoryNotice);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleSnapshot(Snapshot snapshot, OnQuoteListener onQuoteListener) {
        Category categoryById = CategoryHelper.getCategoryById(this.context, snapshot.getSid());
        if (categoryById == null) {
            return;
        }
        Quote orCreateSnapshotById = CategoryHelper.getOrCreateSnapshotById(this.context, categoryById.id);
        if (orCreateSnapshotById == null) {
            orCreateSnapshotById = Quote.build(categoryById);
        } else {
            orCreateSnapshotById.update(categoryById);
        }
        orCreateSnapshotById.update(snapshot);
        if (orCreateSnapshotById.isValidQuote()) {
            onQuoteListener.onNewQuote(orCreateSnapshotById);
            CategoryHelper.updateSnapshot(orCreateSnapshotById);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void resubscribe() {
        if (this.keepAliveSocket != null) {
            try {
                this.keepAliveSocket.close();
                Log.d("tag", ">>>>>resubscribe, close keep alive socket");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (this.subscribedCategoryList == null || this.subscribedCategoryList.isEmpty()) {
            return;
        }
        subscribe(this.subscribedCategoryList, this.listener);
    }

    private void startHeartbeat() {
        synchronized (this.timerLock) {
            if (this.timer != null) {
                return;
            }
            this.timer = new Timer();
            this.timer.schedule(new TimerTask() { // from class: com.dx168.quote.core.QuoteSocket.1
                @Override // java.util.TimerTask, java.lang.Runnable
                public void run() {
                    if (!QuoteSocket.this.heartBeatReceived) {
                        Log.d("tag", ">>>>>timeout, resubscribe");
                        QuoteSocket.this.resubscribe();
                        QuoteSocket.this.heartBeatReceived = true;
                    } else if (QuoteSocket.this.keepAliveSocket != null) {
                        synchronized (QuoteSocket.this.writeQueue) {
                            QuoteSocket.this.writeQueue.add(QuoteMessageUtil.getInstance().getHeartbeatBuffer());
                            QuoteSocket.this.writeQueue.notifyAll();
                            Log.d("tag", ">>>>>add time buffer to queue");
                        }
                        QuoteSocket.this.heartBeatReceived = false;
                    }
                }
            }, 5000L, 5000L);
        }
    }

    private void startReadThread() {
        if (this.readThread != null && this.readThread.isAlive()) {
            Log.i(TAG, "-----readThread is alive");
            return;
        }
        this.readThread = new Thread(this.read);
        this.readThread.start();
        Log.i(TAG, "-----start a new readThread");
    }

    private void startWriteThread() {
        if (this.writeThread != null && this.writeThread.isAlive()) {
            Log.i(TAG, "-----writeThread is alive");
            return;
        }
        this.writeThread = new Thread(this.write);
        this.writeThread.start();
        Log.i(TAG, "-----start a new writeThread");
    }

    private void stopHeartbeat() {
        synchronized (this.timerLock) {
            if (this.timer != null) {
                this.timer.cancel();
                this.timer = null;
            }
        }
    }

    public Observable<Socket> connect() {
        return getQuoteServer().flatMap(new Func1<QuoteServer, Observable<Socket>>() { // from class: com.dx168.quote.core.QuoteSocket.2
            @Override // rx.functions.Func1
            public Observable<Socket> call(final QuoteServer quoteServer) {
                return Observable.create(new Observable.OnSubscribe<Socket>() { // from class: com.dx168.quote.core.QuoteSocket.2.1
                    @Override // rx.functions.Action1
                    public void call(Subscriber<? super Socket> subscriber) {
                        if (QuoteSocket.this.keepAliveSocket == null || QuoteSocket.this.keepAliveSocket.isClosed()) {
                            synchronized (QuoteSocket.this.socketLock) {
                                if (QuoteSocket.this.keepAliveSocket == null || QuoteSocket.this.keepAliveSocket.isClosed()) {
                                    try {
                                        QuoteSocket.this.keepAliveSocket = new Socket(quoteServer.ip, quoteServer.port);
                                        QuoteSocket.this.keepAliveSocket.setSoTimeout(10000);
                                    } catch (IOException e) {
                                        subscriber.onError(new ConnectException(e.getMessage()));
                                    }
                                }
                            }
                            QuoteSocket.this.addSubscription(QuoteSocket.this.subscribedCategoryList);
                        } else {
                            QuoteSocket.this.addSubscription(QuoteSocket.this.subscribedCategoryList);
                        }
                        subscriber.onNext(QuoteSocket.this.keepAliveSocket);
                    }
                });
            }
        });
    }

    public Observable<QuoteServer> getQuoteServer() {
        return QuoteAPI.getInstance().getQuoteServer(this.context, this.clientType, this.clientVersion).retryWhen(new RetryUntilSuccess());
    }

    public void start() {
        this.isRunning = true;
        startWriteThread();
        startReadThread();
        startHeartbeat();
    }

    public void stop() {
        this.isRunning = false;
        stopHeartbeat();
        synchronized (this.writeQueue) {
            this.writeQueue.clear();
            this.writeQueue.notifyAll();
        }
        closeLongSocket();
    }

    public void subscribe(List<Category> list, OnQuoteListener onQuoteListener) {
        this.subscribedCategoryList = list;
        this.listener = onQuoteListener;
        this.writeQueue.clear();
        startWriteThread();
        startReadThread();
        startHeartbeat();
        connect().observeOn(Schedulers.io()).subscribeOn(Schedulers.io()).retryWhen(new RetryUntilSuccess()).subscribe((Subscriber<? super Socket>) new Subscriber<Socket>() { // from class: com.dx168.quote.core.QuoteSocket.3
            @Override // rx.Observer
            public void onCompleted() {
            }

            @Override // rx.Observer
            public void onError(Throwable th) {
                Logger.d(QuoteSocket.TAG, "socket connect err: " + th.getMessage() + " retry");
            }

            @Override // rx.Observer
            public void onNext(Socket socket) {
                Logger.d(QuoteSocket.TAG, "socket is connected");
            }
        });
    }

    public void unsubscribe() {
        if (this.keepAliveSocket == null || this.keepAliveSocket.isClosed() || this.subscribedCategoryList == null || this.subscribedCategoryList.size() <= 0) {
            return;
        }
        synchronized (this.writeQueue) {
            this.writeQueue.clear();
            Iterator<Category> it = this.subscribedCategoryList.iterator();
            while (it.hasNext()) {
                this.writeQueue.add(QuoteMessageUtil.getInstance().getUnsubscription(it.next().id));
            }
            this.writeQueue.notifyAll();
            Log.d("tag", ">>>>>>>>> unsubscribe, notify writequeue");
        }
    }
}
