import type { QueryFunction } from "@tanstack/react-query";
import {
  castArray,
  findLastIndex,
  isEqual,
  pullAllWith,
  range,
  sortBy,
} from "lodash";
import type { ArrayOrSingle } from "ts-essentials";
import { getClients } from "../../../domain/datastores";
import type { RecordListResponse, Topic } from "../../../services/datastore";
import type { DataFilter, TimeRange } from "../types";

export function intervalToTimeRange(interval: [number, number]): TimeRange {
  const [startTimeMs, endTimeMs] = interval;

  return { startTimeMs, endTimeMs };
}

export function timeRangeToInterval(timeRange: TimeRange): [number, number] {
  return [timeRange.startTimeMs, timeRange.endTimeMs];
}

export function prepareDataFilter(
  dataFilter: ArrayOrSingle<DataFilter> | undefined
): string | undefined {
  if (dataFilter === undefined) {
    return undefined;
  }

  const dataFilterArray = castArray(dataFilter);

  if (dataFilterArray.length === 0) {
    return undefined;
  }

  return JSON.stringify(dataFilterArray);
}

export type TimestampPageResponse = Pick<
  RecordListResponse,
  "data" | "offset" | "limit"
>;

export function createTimestampPageQueryFn(
  timestamp: number,
  // Number of records prior to timestamp
  priorRecordsCount: number,
  topicId: Topic["id"],
  pageLimit: number,
  // Should already be a multiple of page limit
  pageOffset: number
): QueryFunction<TimestampPageResponse> {
  if (pageOffset + pageLimit <= priorRecordsCount) {
    return async function fetchDescendingRecords(context) {
      const { topicApi } = getClients();

      const response = await topicApi.listRecords(
        {
          topicId,
          timestampLte: timestamp,
          limit: pageLimit,
          offset: priorRecordsCount - (pageOffset + pageLimit),
          sort: "desc",
          order: "timestamp",
        },
        context
      );

      return {
        limit: pageLimit,
        offset: pageOffset,
        // Records must be sorted by ascending timestamp
        data: response.data.reverse(),
      };
    };
  }

  if (pageOffset >= priorRecordsCount) {
    return async function fetchAscendingRecords(context) {
      const { topicApi } = getClients();

      const response = await topicApi.listRecords(
        {
          topicId,
          timestampGt: timestamp,
          limit: pageLimit,
          offset: pageOffset - priorRecordsCount,
          sort: "asc",
          order: "timestamp",
        },
        context
      );

      return {
        limit: pageLimit,
        offset: pageOffset,
        data: response.data,
      };
    };
  }

  return async function fetchMixedDirectionRecords(context) {
    const { topicApi } = getClients();

    const [descendingResponse, ascendingResponse] = await Promise.all([
      // All records in this page less than or equal to the timestamp
      topicApi.listRecords(
        {
          topicId,
          timestampLte: timestamp,
          limit: priorRecordsCount - pageOffset,
          sort: "desc",
          order: "timestamp",
        },
        context
      ),
      // All records in this page strictly greater than the timestamp
      topicApi.listRecords(
        {
          topicId,
          timestampGt: timestamp,
          limit: pageOffset + pageLimit - priorRecordsCount,
          sort: "asc",
          order: "timestamp",
        },
        context
      ),
    ]);

    return {
      limit: pageLimit,
      offset: pageOffset,
      data: descendingResponse.data.reverse().concat(ascendingResponse.data),
    };
  };
}

export function findTimestampOffset(
  timestampLte: number,
  responses: ReadonlyArray<TimestampPageResponse | undefined>
): number | undefined {
  const definedResponses = responses.filter(
    (response): response is TimestampPageResponse => response !== undefined
  );
  if (definedResponses.length === 0) {
    return undefined;
  }

  const sortedResponses = sortBy(definedResponses, "offset");

  // Tracks the last loop iteration's response if all records in the response
  // occurred prior to the timestamp
  let previousPriorResponse: TimestampPageResponse | null = null;
  for (const response of sortedResponses) {
    const isFirstChunk = response.offset === 0;
    // The offset will always be 0 when the timestamp is less than or equal to
    // the earliest record in the topic
    if (isFirstChunk && timestampLte <= response.data[0].timestamp) {
      return 0;
    }

    // Find the last record <= timestamp. This is performing a similar operation
    // to what the back-end would be doing, but client-side with a limited chunk
    // of data. Any candidate offset !== -1 is actually 1 less than what the
    // back-end would return.
    const candidateOffset = findLastIndex(
      response.data,
      (record) => record.timestamp <= timestampLte
    );

    // Chunk didn't have any records <= timestamp.
    if (candidateOffset === -1) {
      // Edge case when offset falls on chunk boundary.
      // The timestamp was greater than all records in the previous chunk and
      // the chunk wasn't the last in the topic.
      if (
        previousPriorResponse !== null &&
        response.offset ===
          previousPriorResponse.offset + previousPriorResponse.limit
      ) {
        return response.offset;
      }

      previousPriorResponse = null;

      continue;
    }

    const isLastRecordInChunk = candidateOffset === response.data.length - 1;
    // This check misses the case where the number of records is a multiple of
    // the limit, as the number of records in the chunk would equal the limit
    // for the last topic, but that should be rare. Not all responses may
    // have useful counts though so it'll have to do.
    const isLastChunk = response.data.length < response.limit;

    // While a record was found, it was the last record in its chunk.
    // Unless this is the last chunk in the topic, it's possible a later
    // record exists in a different chunk which would be the actual result
    // returned by the back-end.
    if (isLastRecordInChunk && !isLastChunk) {
      previousPriorResponse = response;

      continue;
    }

    // A record existed whose offset in the topic is the same as what the
    // back-end would've returned, so no request is necessary.
    return response.offset + candidateOffset + 1;
  }

  return undefined;
}

/**
 * Calculate a panel's viewing window within the player bounds using the
 * current timestamp and a desired window size.
 *
 * A window is best understood in terms of the chart visualizations. Rather
 * than displaying a chart spanning the entire topic, the user can view a
 * "window" which shows a narrower time range based around the current
 * timestamp. The following explains the relationship between a window and the
 * topic it's derived from:
 *
 *                     +---------+
 *                     |--       |
 *          Window --> |  \      |
 *                     |   ------|
 *                     +---------+
 *      10 seconds --> [---------]
 * +--------------------------------------+
 * |  /\    --------------                |
 * | /  \  /              \               |
 * |/    \/                ---------------|
 * +--------------------------------------+
 * 0:00                     0:45          1:00
 *                         Current
 *                        timestamp
 * [--------------------------------------]
 *        Player bounds (60 seconds)
 *
 * The window spans 10 seconds and is centered around the current timestamp
 * (0:45). The window is showing a slice of the chart in the time range
 * [0:35, 0:55]. If the current timestamp were incremented to 0:46, the window
 * would then show only the slice of the chart in the range [0:36, 0:56].
 *
 * When the player is playing, the constantly-changing timestamp will give the
 * appearance of the window sliding along. However, the window will always be
 * contained entirely within the player bounds. The window essentially stops
 * "sliding" once its end time reaches the player's end time. Though the
 * timestamp can continue increasing until it too reaches the player's end time,
 * the window will remain static and, importantly, still honor the provided
 * window size. To use the example above, if the current timestamp were 0:59,
 * the calculated window would be [0:50, 1:00]; the window would no longer be
 * centered around the current timestamp as doing so would overflow the player
 * bounds.
 *
 * The only situation in which the calculated window's size is less than the
 * desired window size is if the player's duration is less than the window size.
 */
export function calculateRecordWindow(
  windowSizeMs: number,
  timestampMs: number,
  bounds: TimeRange
): TimeRange {
  if (windowSizeMs >= bounds.endTimeMs - bounds.startTimeMs) {
    // Window can only equal the player bounds
    return bounds;
  }

  const halfWindowSizeMs = windowSizeMs / 2;

  if (timestampMs <= bounds.startTimeMs + halfWindowSizeMs) {
    // Centering the window around the timestamp would cause the lower half
    // of the window to overflow the player's starting bound
    return {
      startTimeMs: bounds.startTimeMs,
      endTimeMs: bounds.startTimeMs + windowSizeMs,
    };
  }

  if (timestampMs >= bounds.endTimeMs - halfWindowSizeMs) {
    // Centering the window around the timestamp would cause the upper half
    // of the window to overflow the player's ending bound
    return {
      startTimeMs: bounds.endTimeMs - windowSizeMs,
      endTimeMs: bounds.endTimeMs,
    };
  }

  // The window is centered around the current timestamp. There's no danger here
  // of either window bound overflowing the player's bounds since the conditions
  // above guarantee the difference between the timestamp and both player
  // bounds is > `halfWindowSizeMs`
  return {
    startTimeMs: timestampMs - halfWindowSizeMs,
    endTimeMs: timestampMs + halfWindowSizeMs,
  };
}

export interface CalculateWindowChunksParams {
  chunkSizeMs: number;
  bufferAheadMs: number;
  bufferBehindMs: number;
  window: TimeRange | undefined;
  playerBounds: TimeRange | undefined;
}

export interface WindowChunks {
  required: TimeRange[];
  bufferAhead: TimeRange[];
  bufferBehind: TimeRange[];
}

/**
 * Calculate the chunks required to display the record window as well as chunks
 * to be buffered prior to and after the window.
 *
 * For a given window within the player's bounds, three categories of chunks
 * need to be calculated:
 *
 * 1. Required chunks - Chunks needed to satisfy the range of records defined
 *    by the window
 * 2. Buffer-ahead chunks - Chunks after the window that will likely be needed
 *    soon and should therefore be buffered
 * 3. Buffer-behind chunks - Chunks prior to the window that will likely be
 *    needed if the user seeks backwards and should therefore be buffered
 *
 * A given chunk can only belong to a single category.
 *
 * The three categories also have relative priorities based on which should be
 * fetched first:
 *
 * 1. Required chunks - Needed immediately
 * 2. Buffer-ahead chunks - Needed soon but not immediately
 * 3. Buffer-behind chunks - Lowest priority. Since automatic playback is
 *    typical (i.e. the window is moving forward), these won't be needed unless
 *    the user manually seeks backwards.
 *
 * Note the priorities above only describe the fetching priority assuming some
 * or none of the chunks have been fetched. Once fetched, chunks in all three
 * categories should be buffered in memory.
 *
 * Buffering behind will be truncated at the player's start bound. For example,
 * in the following diagram any amount of time to buffer behind the window will
 * be ignored since the window starts at the player's start bound.
 *
 *    Window
 * [----------]
 * |-----|-----|-----|-----|-----|-----|-----|
 * 0     5     10    15    20    25    30    35
 *
 *
 * Buffering ahead, however, will "wrap around" the timeline if the given
 * amount of time to buffer is truncated. Consider the diagram below:
 *
 *                              Window
 * +*----->                  [----------]-----*-+
 * ||-----|-----|-----|-----|-----|-----|-----| |
 * |0     5     10    15    20    25    30    35|
 * +--------------------------------------------+
 *
 * The window ends at 30 seconds and there's a buffer-ahead time of 10 seconds.
 * Because there's only 5 seconds between the end of the window and the
 * player's end bound, 5 seconds of buffer-ahead time would have otherwise been
 * truncated. Those remaining 5 seconds essentially "wrap around" the player
 * bounds and are used to buffer more chunks from the start of the player
 * bounds onwards.
 *
 * This is desirable since a user can replay a log once they reach the end.
 * Though the chunks are chronologically prior to the current window, they are
 * logically the next chunks the user would see. So it may be more accurate to
 * think of buffer-ahead chunks as "the chunks the user will likely need
 * next" and buffer-behind chunks as "the chunks the user likely just saw."
 *
 * There is the possibility of overlaps between chunk categories using this
 * buffering strategy. The buffer-ahead time could wrap around and overlap with
 * the buffer-behind time. The buffer-ahead time could even wrap around and
 * overlap the window's time range if the log is small enough and the
 * buffer-ahead time large enough.
 *
 * Additionally, because chunk boundaries are always multiples of the chunk
 * size {@link calculateRangeChunks (see `calculateRangeChunks`)}, a chunk
 * could be calculated to belong to multiple categories if the window bounds
 * aren't lined up with chunk boundaries.
 *
 * As a chunk can only belong to a single category, in these cases a chunk
 * will belong to the highest-priority category in which it's found.
 * For example:
 *
 *                         Window     BA
 *                      [----------]------>
 * |-----|-----|-----|-----|-----|-----|-----|
 * 0     5     10    15    20    25    30    35
 *
 * The chunk [25, 30) would be calculated to belong to both the required and
 * buffer-ahead chunks. However, since the required chunks have a higher
 * priority than buffer-ahead chunks, [25, 30) will **only** belong to the
 * required chunks.
 */
export function calculateWindowChunks({
  chunkSizeMs,
  bufferAheadMs,
  bufferBehindMs,
  window,
  playerBounds,
}: CalculateWindowChunksParams): WindowChunks {
  if (window === undefined || playerBounds === undefined) {
    return {
      required: [],
      bufferAhead: [],
      bufferBehind: [],
    };
  }

  const requiredChunks = calculateRangeChunks(window, chunkSizeMs);

  const bufferAheadEndTimeMs = window.endTimeMs + bufferAheadMs;
  const bufferAheadRange: TimeRange = {
    startTimeMs: window.endTimeMs,
    endTimeMs: Math.min(playerBounds.endTimeMs, bufferAheadEndTimeMs),
  };
  const bufferAheadChunks = calculateRangeChunks(bufferAheadRange, chunkSizeMs);

  const wasBufferAheadTruncated = bufferAheadEndTimeMs > playerBounds.endTimeMs;
  if (wasBufferAheadTruncated) {
    const truncatedAmountMs = bufferAheadEndTimeMs - playerBounds.endTimeMs;
    const wrappedBufferAheadRange: TimeRange = {
      startTimeMs: playerBounds.startTimeMs,
      endTimeMs: Math.min(
        window.startTimeMs,
        playerBounds.startTimeMs + truncatedAmountMs
      ),
    };
    bufferAheadChunks.push(
      ...calculateRangeChunks(wrappedBufferAheadRange, chunkSizeMs)
    );
  }

  // Remove chunks from the buffer-ahead chunks that are also in the
  // required chunks
  pullAllWith(bufferAheadChunks, requiredChunks, isEqual);

  const bufferBehindStartTimeMs = window.startTimeMs - bufferBehindMs;
  const bufferBehindRange: TimeRange = {
    startTimeMs: Math.max(playerBounds.startTimeMs, bufferBehindStartTimeMs),
    endTimeMs: window.startTimeMs,
  };
  const bufferBehindChunks = calculateRangeChunks(
    bufferBehindRange,
    chunkSizeMs
  );

  // Remove chunks from the buffer-behind chunks that are also in the
  // required or buffer-ahead chunks
  pullAllWith(bufferBehindChunks, requiredChunks, isEqual);
  pullAllWith(bufferBehindChunks, bufferAheadChunks, isEqual);

  return {
    required: requiredChunks,
    bufferAhead: bufferAheadChunks,
    bufferBehind: bufferBehindChunks,
  };
}

/**
 * Calculate the minimum sequence of chunks to fetch for the given time range.
 *
 * A "chunk" is a time range within which a single record list request should
 * be made for a given topic's records. A chunk has the following properties:
 * - Its endpoints are represented as UTC timestamps in milliseconds.
 * - Its start time is inclusive while its end time is exclusive.
 * - Its size is the distance between its start and end points
 *
 * A topic's chunks collectively have the following properties:
 * - They all have the same size
 * - They do not overlap
 * - There are no timestamps in the player's bounds not represented by a chunk
 * - A given record can only belong to a single chunk. A record belongs to a
 *   chunk if and only if its timestamp is within the chunk's bounds.
 *
 * The following timeline visualizes a topic's chunks:
 *
 *       [-----)
 * |-----|-----|-----|-----|-----|-----|-----|
 * 0     5     10    15    20    25    30    35
 *
 * Each chunk has a size of 5 seconds. A single chunk is adjacent to but not
 * overlapping its neighbors. The second chunk in the topic - defined by the
 * interval [5, 10) - would contain any records whose timestamp `t` satisfies
 * the relationship `5 <= t < 10`.
 *
 * The most important property of all chunks is that their bounds are calculated
 * deterministically. A chunk's bounds are always a multiple of the chunk size,
 * so given an arbitrary timestamp, calculating the chunk containing it just
 * means flooring that timestamp to the nearest multiple of the chunk size.
 *
 * To effectively fetch and cache a topic's records, it's critical a set of
 * records can be deterministically identified. The chunking algorithm solves
 * this. Given an arbitrary time range, this utility calculates the sequence
 * of chunks which fully contain it. Consider the diagram above, this time with
 * a time range defined by the interval [13, 23]:
 *
 *                  Time Range
 *                 [----------]
 * |-----|-----|-----|-----|-----|-----|-----|
 * 0     5     10    15    20    25    30    35
 *
 * The range overlaps with 3 chunks, starting at chunk [10, 15) and ending
 * with chunk [20, 25). Consequently, to display all records within the range
 * would mean fetching those chunks. However, once a chunk's records are
 * fetched, they can be cached using the chunk's bounds as an identifier since
 * those bounds are always deterministically calculated.
 */
export function calculateRangeChunks(
  timeRange: TimeRange,
  chunkSizeMs: number
): TimeRange[] {
  // To support stable cache entries, chunks need to be deterministically
  // calculated regardless of the range from which the chunks are being
  // calculated. In practice, this means chunk boundaries must always be a
  // multiple of the chunk size.
  const earliestChunkStartTimeMs =
    Math.floor(timeRange.startTimeMs / chunkSizeMs) * chunkSizeMs;

  // Calculate the minimum number of sequential chunks needed to fully contain
  // the time range
  const numChunks = Math.ceil(
    (timeRange.endTimeMs - earliestChunkStartTimeMs) / chunkSizeMs
  );

  return range(numChunks).map((chunkIndex) => {
    const chunkOffsetMs = chunkSizeMs * chunkIndex;
    const chunkStartBoundMs = earliestChunkStartTimeMs + chunkOffsetMs;

    return {
      startTimeMs: chunkStartBoundMs,
      endTimeMs: chunkStartBoundMs + chunkSizeMs,
    };
  });
}
