import { from, of, defer, Observable, combineLatest } from "rxjs";
import { map, first, concatMap, skipWhile, switchMap } from "rxjs/operators";
import Pusher from "pusher-js";
import { fetchAsObservable } from "./fetcher";
import canopyUrls from "canopy-urls!sofe";
import auth from "cp-client-auth!sofe";
import { onlineListener } from "online-listener";
import { isEmpty } from "lodash";
import { catchError } from "auto-trace";

// Pusher application key handed to the Pusher client constructor. One key is
// used when canopyUrls reports the production environment, the other for
// every other environment.
const PUSHER_KEY = (() => {
  if (canopyUrls.getEnvironment() === canopyUrls.PRODUCTION_ENVIRONMENT) {
    return "98ba77c3dc4178f50c4b";
  }
  return "b41431fba10735c5209f";
})();

/**
 * Builds the authorization options object handed to the Pusher client.
 *
 * The returned `customHandler` asks the API's `/pusher-authenticate` endpoint
 * for auth data and reports the outcome through Pusher's node-style
 * `callback(error, authData)`.
 *
 * @returns {{customHandler: function({socketId: string, channelName: string}, function): void}}
 */
const authorizeMe = () => {
  return {
    customHandler: ({ socketId, channelName }, callback) => {
      // Created before the request starts, exactly as before.
      // NOTE(review): presumably this lets auto-trace capture the calling
      // context for its report — confirm against auto-trace's docs.
      const handleError = catchError();

      // Failures go in the error slot only; the authData slot is null so an
      // error object is never mistaken for auth data. What handleError returns
      // is not visible here, so fall back to the raw failure to guarantee the
      // error argument is always truthy.
      const reportFailure = (failure) => {
        callback(handleError(failure) || failure, null);
      };

      // Encode both values so characters such as "=", "&" or ";" in a channel
      // name cannot corrupt the query string.
      const query = `socketId=${encodeURIComponent(
        socketId
      )}&channel=${encodeURIComponent(channelName)}`;

      fetchAsObservable(
        `${canopyUrls.getAPIUrl()}/pusher-authenticate?${query}`
      ).subscribe((authData) => {
        try {
          callback(null, authData);
        } catch (failure) {
          // The callback itself threw while consuming the auth data.
          reportFailure(failure);
        }
      }, reportFailure);
    },
  };
};

// Module-level state shared by every onPusher() subscription.

// Last error passed to a subscription's error handler; starts undefined.
let error;

// The Pusher client instance, created by the first onPusher() subscription
// that needs it.
let pusher;

// Subscribed Pusher channels keyed by channel name.
const channels = {};

/**
 * Streams Pusher events of a given type from a channel as an Observable.
 *
 * Nothing is subscribed until `onlineListener` first emits a truthy value.
 * Every later `onlineListener` emission tears down the current channel binding
 * and sets up a fresh one (switchMap).
 *
 * The Pusher client and the channel objects are shared module state. Each
 * channel keeps a subscriber count on itself (`channel.count`); when it drops
 * to zero the channel is unsubscribed, and the client is disconnected once no
 * channels remain.
 *
 * @param {string} type - Pusher event name to bind on the channel.
 * @param {string} [channelName] - Channel to listen on. When falsy, the channel
 *   `private-<id>` of the logged-in user is used.
 * @returns {Observable} Emits the payload of each matching event. Errors about
 *   one second after a `pusher:subscription_error` is seen.
 */
export function onPusher(type, channelName) {
  return combineLatest([onlineListener]).pipe(
    // combineLatest emits an array of latest values, so unpack it; negating
    // the array itself is always false and would never skip anything.
    skipWhile(([online]) => !online),
    switchMap(() => {
      return from([channelName]).pipe(
        // Resolve the channel name, defaulting to the user's private channel.
        concatMap((requestedName) =>
          requestedName
            ? of(requestedName)
            : auth.getLoggedInUserAsObservable().pipe(
                first(),
                map((loggedInUser) => `private-${loggedInUser.id}`)
              )
        ),
        concatMap((resolvedName) => {
          // Lazily create the shared client, or reconnect it if a previous
          // teardown disconnected it.
          if (!pusher) {
            pusher = new Pusher(PUSHER_KEY, {
              cluster: "mt1",
              channelAuthorization: authorizeMe(),
              userAuthentication: authorizeMe(),
            });
          } else if (pusher.connection.state === "disconnected") {
            pusher.connect();
          }

          // Reuse the cached channel so concurrent listeners share it.
          let channel = channels[resolvedName];
          if (!channel) {
            channel = pusher.subscribe(resolvedName);
            channels[resolvedName] = channel;
          }

          return defer(() => {
            return new Observable((observer) => {
              // Timers scheduled by processError, cancelled on teardown.
              const errorTimers = [];

              const processPush = (data) => {
                observer.next(data);
              };

              const processError = (e) => {
                error = e;
                // Delivery is delayed by one second and reads the module-level
                // error at fire time, i.e. the most recent one recorded.
                errorTimers.push(
                  setTimeout(() => observer.error(error), 1000)
                );
              };

              // NOTE(review): `error` is never reset in this file, so after the
              // first subscription error every new subscriber on any channel
              // is errored as well — confirm this is intended.
              if (error) {
                processError(error);
              }

              channel.bind(type, processPush);
              channel.bind("pusher:subscription_error", processError);
              channel.count = (channel.count || 0) + 1;

              return () => {
                // Cancel pending error deliveries so no timer outlives the
                // subscription.
                errorTimers.forEach((timerId) => clearTimeout(timerId));

                channel.unbind(type, processPush);
                channel.unbind("pusher:subscription_error", processError);
                channel.count -= 1;

                if (channel.count === 0) {
                  // Last listener on this channel is gone.
                  pusher.unsubscribe(resolvedName);
                  delete channels[resolvedName];
                  if (isEmpty(channels)) {
                    pusher.disconnect();
                  }
                }
              };
            });
          });
        })
      );
    })
  );
}
