Skip to main content
  1. References/
  2. Algo Reference/
  3. Canonicals/

Topic 10: Heap / Priority Queue

··4638 words·22 mins

Key-Intuition: Heaps efficiently maintain order-statistics in dynamic datasets, great for priority-based retrieval and merging sorted sequences.

We can expect them to be especially useful when used as aux datastructures, in conjunction with other topic areas.

Canonical Question Types #

Top-K and Median Finding #

  • Objective
    • Efficiently find the k-th largest, smallest, or median elements from streaming data or static arrays.

Problems #

  1. Kth Largest Element (215)

    Oh wait we can do this via heapq, just have to use the api correctly else will be too slow. Since we need kth largest, we can effectively keep a fixed capacity in our heap by checking the size of it and if it’s already k elements large, we can do heapreplace instead of heappush. Other attempts to manually push and pop from the heap manually will have ridiculous performance overheads.

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    
        import heapq
        class Solution:
            def findKthLargest(self, nums, k):
                heap = []
                for num in nums:
                    if len(heap) < k:
                        heapq.heappush(heap, num)
                    else:
                        if num > heap[0]:
                            heapq.heapreplace(heap, num)
    
                return heap[0]
    

    Alternatively, for this question, we can use the quickselect algo, though that is more manual.

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    
        import random
        from typing import List
    
        class Solution:
            def findKthLargest(self, nums: List[int], k: int) -> int:
                def quickselect(left, right, target):
                    if left == right:
                        return nums[left]
                    pivot_index = random.randint(left, right)
                    # Partition around pivot and return final pivot position
                    new_pivot = partition(left, right, pivot_index)
                    if new_pivot == target:
                        return nums[new_pivot]
                    elif new_pivot < target:
                        return quickselect(new_pivot + 1, right, target)
                    else:
                        return quickselect(left, new_pivot - 1, target)
    
    
                def partition(left, right, pivot_index):
                    """
                    Returns the pivot idx after paritioning left and right segments based on the element values.
    
                    The property it maintains thereafter is the all to the right of pivot are STRICTLY MORE THAN pivot.
    
                    All the left of hte pivot are STRICTLY LESS THAN pivot.
                    """
                    pivot_value = nums[pivot_index]
                    nums[pivot_index], nums[right] = nums[right], nums[pivot_index]
                    store_index = left
                    for i in range(left, right):
                        if nums[i] < pivot_value:
                            nums[store_index], nums[i] = nums[i], nums[store_index]
                            store_index += 1
                    # move pivot to its final place
                    nums[store_index], nums[right] = nums[right], nums[store_index]
    
                    return store_index
    
                n = len(nums)
                target = n - k  # index of k-th largest in sorted order
    
                return quickselect(0, n - 1, target)
    
  2. Kth Largest Element in a Stream (703)

    the only thing that matters here is that we keep a fixed capacity heap “manually” and it’s good enough for us. There’s no need to negate this because we let the structure handle the order, don’t short circuit that part.

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    
        import heapq
        class KthLargest:
            def __init__(self, k: int, nums: list[int]):
                self.k = k
                self.heap = nums
                heapq.heapify(self.heap)
                # prune it to only keep top k elements
                while len(self.heap) > self.k:
                    heapq.heappop(self.heap)
    
            def add(self, val: int) -> int:
                heapq.heappush(self.heap, val)
                if len(self.heap) > self.k:
                    heapq.heappop(self.heap)
                return self.heap[0]
    

    Here I use heapreplace to make the fixed capacity handling a little more idiomatic:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    
       import heapq
       class KthLargest:
           def __init__(self, k, nums):
               self.k = k
               self.heap = nums
               heapq.heapify(self.heap)
               while len(self.heap) > k:
                   heapq.heappop(self.heap)
    
           def add(self, val):
               if len(self.heap) < self.k:
                   heapq.heappush(self.heap, val)
               else:
                   # Only pushpop if val is bigger, otherwise kth largest does not change
                   if val > self.heap[0]:
                       heapq.heapreplace(self.heap, val)
               return self.heap[0]
    
  3. ⭐️ Find Median from Data Stream (295)

    Streams usually we can consider using a balancing approach a max and a min heap (to get the median).

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    
        import heapq
    
        class MedianFinder:
            def __init__(self):
                self.small = [] # Max heap (as negative numbers)
                self.large = [] # Min heap
    
            def addNum(self, num: int) -> None:
                # Python heapq is minheap, so push negative for maxheap
                heapq.heappush(self.small, -num)
                # All numbers in small must be <= numbers in large, so we take the largest number and shift it to the large heap
                heapq.heappush(self.large, -heapq.heappop(self.small))
                # Balance: small can be larger by 1, so we rebalance if large is greater than small
                if len(self.large) > len(self.small):
                    heapq.heappush(self.small, -heapq.heappop(self.large))
    
            def findMedian(self) -> float:
                # odd numbers encountered:
                if len(self.small) > len(self.large):
                    return -self.small[0]
    
                # even numbers encountered:
                return (self.large[0] - self.small[0]) / 2.0
    

    Suppose we know that the range is a bounded range, then we have the option to just keep track of the median position based on the bounded range.

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    
        class MedianFinder:
            def __init__(self):
                self.freq = [0] * 101           # freq[i]: count of number i in [0,100]
                self.count = 0                  # total number of inserted values
    
            def addNum(self, num: int) -> None:
                self.freq[num] += 1
                self.count += 1
    
            def findMedian(self) -> float:
                total = self.count
    
                # Find the median index(es) (0-based in sorted stream)
                if total % 2 == 1:
                    median_pos1 = median_pos2 = total // 2
                else:
                    median_pos1 = total // 2 - 1
                    median_pos2 = total // 2
    
                # helps us account for the even cases where we'll have 2 values
                result = []
                running_count = 0
                for value in range(101):  # Loop over all possible values
                    running_count += self.freq[value]
                    # Add value to result if we cross the median positions
                    if len(result) == 0 and running_count > median_pos1:
                        result.append(value)
                    if len(result) == 1 and running_count > median_pos2:
                        result.append(value)
                        break
    
                return sum(result) / len(result)
    
  4. K Closest Points to Origin (973)

    It’s a euclidean distance shortcut where we don’t do the sqrt-ing. We judge using a valid proxy metric that is easier to calculate.

    Also, it’s so much simpler to just use the heapq.nsmallest() api directly for this question.

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    
        import heapq
        class Solution:
            def kClosest(self, points: list[list[int]], k: int) -> list[list[int]]:
                get_euclidean_dist = lambda p: p[0]**2 + p[1]**2
    
                return heapq.nsmallest(k, points, key=get_euclidean_dist)
    
        # just for demo, here's a more handwritten one:
        import heapq
        class Solution:
            def kClosest(self, points: list[list[int]], k: int) -> list[list[int]]:
                # Maintain a max-heap of size k (heap stores -(distance, point))
                heap = []
                for p in points:
                    dist = -(p[0] ** 2 + p[1] ** 2)
                    if len(heap) < k:
                        heapq.heappush(heap, (dist, p))
                    else:
                        heapq.heappushpop(heap, (dist, p))
                return [item[1] for item in heap]
    

Merging Sorted Lists or Arrays #

  • Objective
    • Merge multiple sorted sequences or intervals efficiently using min-heaps.

Problems #

  1. Merge K Sorted Lists (23)

    here, the heapq is used as an aux ds to help make it easier to choose which is the next provider (linked list) to take from. Here’s both the solutions.

    Same concept as 2 sorted lists, just maintain k providers. We have two solutions, one that is space-optimal \(O(N / logk)\) time \(O(1)\) space and one that is not \(O(N / logk)\) time \(O(k)\) space .

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    
        class Solution:
            def mergeKLists(self, lists: List[Optional[ListNode]]) -> Optional[ListNode]:
                if not lists:
                    return None
                n = len(lists)
                interval = 1
                while interval < n:
                    for i in range(0, n - interval, interval * 2):
                        lists[i] = self.merge2Lists(lists[i], lists[i + interval])
                    interval *= 2
                return lists[0] if n > 0 else None
    
            def merge2Lists(self, l1, l2):
                dummy = curr = ListNode()
                while l1 and l2:
                    if l1.val < l2.val:
                        curr.next, l1 = l1, l1.next
                    else:
                        curr.next, l2 = l2, l2.next
                    # advance
                    curr = curr.next
    
                curr.next = l1 or l2
    
                return dummy.next
    

    The easiest way imo to get the best value each time is to just use a priority queue. I prefer the PQ solution.

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    
       import heapq
    
        # Definition for singly-linked list.
        # class ListNode:
        #     def __init__(self, val=0, next=None):
        #         self.val = val
        #         self.next = next
        class Solution:
            def mergeKLists(self, lists: List[Optional[ListNode]]) -> Optional[ListNode]:
                # filters away the empty lists:
                # NOTE: the use of id(ls) is to give a tie-breaker in the case that two lists have the same value.
                #       this id usage is an easy way to avoid compiler issues
                providers = [(ls.val, id(ls), ls)
                             for ls in lists
                             if ls]
                heapq.heapify(providers)
                pre_head = dummy = ListNode()
    
                while providers:
                    (val, _, ls) = heapq.heappop(providers)
                    dummy.next = ls
                    if ls.next:
                        ls = ls.next
                        new_val = ls.val
                        heapq.heappush(providers, (new_val, id(ls), ls))
    
                    dummy = dummy.next
    
                return pre_head.next
    
  2. Merge Intervals (56) with heap variant

    The optimal solution is typically a linear scan but we could rely on a heap version of this solution as well.

    It’s a linear scan alright (after sorting intervals using the start time), we have a working accumulator that we keep making reference to.

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    
        from typing import List
    
        class Solution:
            def merge(self, intervals: List[List[int]]) -> List[List[int]]:
                if not intervals:
                    return []
    
                # Sort intervals based on start time
                intervals.sort(key=lambda x: x[0])
                merged_intervals = [intervals[0]]
    
                for start, end in intervals[1:]:
                    last_merged_start, last_merged_end = merged_intervals[-1]
    
                    if start <= last_merged_end:  # Overlapping intervals
                        merged_intervals[-1][1] = max(last_merged_end, end)  # Merge
                    else:
                        merged_intervals.append([start, end])  # No overlap, add new
    
                return merged_intervals
    

    In the heap-based approach, intervals can be pushed into a min-heap based on their start time, and then repeatedly pop and merge the top intervals.

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    
       import heapq
    
       class Solution:
           def merge(self, intervals: list[list[int]]) -> list[list[int]]:
               if not intervals:
                   return []
               # Build a min-heap by the start time
               heap = [(start, end) for start, end in intervals]
               heapq.heapify(heap)
               result = []
               # Pop the first interval
               prev_start, prev_end = heapq.heappop(heap)
               while heap:
                   curr_start, curr_end = heapq.heappop(heap)
                   if curr_start <= prev_end:
                       # Merge intervals
                       prev_end = max(prev_end, curr_end)
                   else:
                       # No overlap, push previous interval to result
                       result.append([prev_start, prev_end])
                       prev_start, prev_end = curr_start, curr_end
               # Don't forget the last interval
               result.append([prev_start, prev_end])
               return result
    

    This approach is not more optimal than the linear scan (after sorting), but it is instructive as an alternative and can be more useful if the input is streaming or not already sorted.

Frequency and Top-K Elements #

  • Objective
    • Identify the top k most frequent elements in arrays or strings using priority queues for efficient counts management.

Problems #

  1. Top K Frequent Elements (347)

    There’s a couple of ways to do this since we’re asked for frequency counts.

    The most optimal approach that runs in \(O(n)\) time is BucketSorting which looks like this:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    
       from collections import Counter
    
       class Solution:
           def topKFrequent(self, nums: List[int], k: int) -> List[int]:
               count = Counter(nums)
               # Bucket sort: index = frequency, value = list of numbers with that frequency
               max_possible_frequency = len(nums)
               buckets = [[] for _ in range(max_possible_frequency + 1)]
               for num, freq in count.items():
                   buckets[freq].append(num)
    
               # Flatten buckets from high to low frequency
               res = []
               for freq in range(len(buckets) - 1, 0, -1):
                   for num in buckets[freq]:
                       res.append(num)
                       if len(res) == k:
                           return res
    

    The other solutions that are suboptimal will run in \(O(n log n)\) time. Firstly we consider the heap based solution: A heap-based approach is \(O(n log k)\) where \(k\) is the char count

    1
    2
    3
    4
    5
    6
    7
    
       import heapq
       from collections import Counter
    
       class Solution:
           def topKFrequent(self, nums: List[int], k: int) -> List[int]:
               count = Counter(nums)
               return [item for item, _ in heapq.nlargest(k, count.items(), key=lambda x: x[1])]
    

    A counter based solution runs in \(O(nlogn)\) :

    1
    2
    3
    4
    5
    
       from collections import Counter
    
       class Solution:
           def topKFrequent(self, nums: List[int], k: int) -> List[int]:
               return [elem for [ elem, _ ] in Counter(nums).most_common(k)]
    

Scheduling and Task Management #

  • Objective
    • Use priority queues for simulating task scheduling, managing intervals, and handling dynamic priorities.
  • Classic Examples (often conceptual or system design related)

Problems #

  1. Last Stone Weight (1046)

    This is a parable to actual scheduling algos, the idea is just to maintain a running schedule and just retrieve from there.

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    
        import heapq
    
        class Solution:
            def lastStoneWeight(self, stones: list[int]) -> int:
                stones = [-s for s in stones] # API is a minheap, hence the negation
                heapq.heapify(stones)
    
                while len(stones) > 1:
                    first = -heapq.heappop(stones)  # Largest
                    second = -heapq.heappop(stones) # Next largest
                    if first != second:
                        heapq.heappush(stones, -(first - second))
    
                return -stones[0] if stones else 0
    
  2. Task Scheduling variations

  3. CPU scheduling simulation problems

    • Task Scheduler (621)

      This can do a time-simulation approach.

      We use two things: a max heap for schedule-able tasks and a cooldown QUEUE for the tasks that are currently in cooldown. Cooldowns happen in FIFO hence the use of the deque. I prefer this approach to the problem over the purely mathematical approach.

       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      
            import heapq
            from collections import Counter, deque
      
            # TIME SIMULATION APPROACH
      
            class Solution:
                def leastInterval(self, tasks: List[str], n: int) -> int:
                    freqs = Counter(tasks)
                    pq = [(-cnt, task) for task, cnt in freqs.items()]
                    heapq.heapify(pq)
                    cooldown = deque()  # (ready_time, -cnt, task)
                    time = 0
                    while pq or cooldown:
                        time += 1
                        if pq:
                            cnt, task = heapq.heappop(pq)
                            if cnt + 1:  # more left
                                cooldown.append((time + n, cnt + 1, task))
                        if cooldown and cooldown[0][0] == time:
                            _, next_cnt, next_task = cooldown.popleft()
                            heapq.heappush(pq, (next_cnt, next_task))
                    return time
      

      This particular question has a math approach too. Realise that the tasks with max frequency are the bottleneck in the schedule. We need to ensure at least n intervals between repeats of the most common task. We fill all cooldown slots with other tasks if we can else we must idle. So the formula to compute the time needed if we strictly alternate between maxtasks and idleslots, possibly filling them with other tasks, otherwise falling back to the total number of tasks.

       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      
            from collections import Counter
      
            class Solution:
                def leastInterval(self, tasks: List[str], n: int) -> int:
                    freqs = Counter(tasks)
                    f_max = max(freqs.values())
                    n_max = sum(1 for freq in freqs.values() if freq == f_max)
                    part_len = f_max - 1
                    empty_slots = part_len * (n + 1)
                    res = part_len * (n + 1) + n_max
                    return max(res, len(tasks))
      

Online Median / Running Statistics #

  • Objective
    • Maintain running medians or generalized order-statistics efficiently on dynamically inserting and removing elements.

Problems #

  1. TODO Sliding Window Median (480)
  2. Data Stream Median or Quantile problems

Heap Sort and Selection Problems #

  • Objective
    • Use heaps for efficient sorting or selecting the nth smallest/largest element.

Problems #

  1. Heap Sort algorithms

  2. Kth Smallest Element In a Sorted Matrix

    The heap implementation of this involves us keeping a heap and popping from it for (k - 1) times. We keep adding to the heap if possible. The init version of the heap is just all the first elements from each row.

    We do this popping because we keep a minheap that takes in values from cells within the matrix.

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    
       import heapq
    
    
       class Solution:
           def kthSmallest(self, matrix: List[List[int]], k: int) -> int:
               n = len(matrix)
               # we init the heap with the first element from each row:
               heap = [
                   (matrix[row][0], 0, row) for row in range(n)
               ]  # (elem, column idx, row idx)
               heapq.heapify(heap)
    
               # now we extract from the min heap for (k - 1) times so that we can pop out the kth smallest element:
               for _ in range(k - 1):
                   elem, col, row = heapq.heappop(heap)
                   # add next element to heap, if exists:
                   if col < (n - 1):
                       new_elem = matrix[row][col + 1]
                       heapq.heappush(heap, (new_elem, col + 1, row))
    
               return heap[0][0]
    

Interval and Range Merging with Heaps #

  • Objective
    • Maintain earliest ending intervals or ranges for merging or scheduling problems using min-heaps.

Problems #

  1. Meeting Rooms II (253) (NC link)

    We have 2 choices: we can use a min heap and play with the intervals OR we can do the sweep line algo Here’s my version with PQs:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    
        import heapq
    
        class Solution:
            def minMeetingRooms(self, intervals: List[Interval]) -> int:
                if not intervals:
                    return 0
    
                # Sort intervals by start time
                intervals.sort(key=lambda x: x.start)
    
                # Min-heap to track end times of meetings currently occupying rooms
                min_heap = []
    
                # Add the first meeting's end time
                heapq.heappush(min_heap, intervals[0].end)
    
                for interval in intervals[1:]:
                    start, end = interval.start, interval.end
    
                    # If the current meeting starts after or when a room is freed
                    if start >= min_heap[0]:
                        heapq.heappop(min_heap)  # Reuse room
    
                    # Assign new room (or reused room with updated end time)
                    heapq.heappush(min_heap, end)
    
                # Number of rooms = size of min-heap
                return len(min_heap)
    

    And here’s the usage of the sweep line algo

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    
        # SWEEP-LINE ALGO
        def minMeetingRooms(intervals):
            if not intervals:
                return 0
    
            starts = sorted(i[0] for i in intervals)
            ends = sorted(i[1] for i in intervals)
    
            s, e = 0, 0
            rooms = 0
            max_rooms = 0
    
            while s < len(intervals):
                if starts[s] < ends[e]:
                    rooms += 1
                    max_rooms = max(max_rooms, rooms)
                    s += 1
                else:
                    rooms -= 1
                    e += 1
    
            return max_rooms
         # - Explanation:
         #   - Separate all start times and end times into two sorted lists.
    
         #   - Use pointers to iterate through both arrays, increment a counter when a meeting starts, decrement when one ends.
    
         #   - Track the max counter during traversal → max concurrency → min rooms required.
    
         #   - This approach avoids heaps and uses two sorted arrays, can be slightly faster for certain use-cases.
    
  2. Car Fleet (853)

    Merging from one-side problem. Can view the problem as creating contiguous segments of the cars array based on arrival times after merging. Each segment maps to exactly one car fleet. Top of the stack will be the next arriving fleet, nearest to the destination, we have to just check if the car in consideration is going to be part of that fleet or will start a new fleet.

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    
        class Solution:
            def carFleet(self, target: int, position: List[int], speed: List[int]) -> int:
                cars = sorted(list(zip(position, speed)), reverse=True)
                arrival_time_stack = []
                for pos, s in cars:
                    arrival_time = (target - pos) / s
    
                    if not arrival_time_stack: # empty stack
                        arrival_time_stack.append(arrival_time)
                        continue
    
                    # if slower than the prev arrival, then it's a new fleet, else ignore this:
                    if arrival_time_stack and arrival_time > arrival_time_stack[-1]:
                        arrival_time_stack.append(arrival_time)
    
                return len(arrival_time_stack)
    

    for completeness, here’s a heap-based approach that is similar

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    
       import heapq
       from typing import List
    
       class Solution:
           def carFleet(self, target: int, position: List[int], speed: List[int]) -> int:
               # Build max-heap based on position (negate to simulate max-heap with Python's min-heap)
               heap = [(-pos, speed[i]) for i, pos in enumerate(position)]
               heapq.heapify(heap)
    
               fleets = 0
               last_arrival_time = 0.0
    
               # Process cars from closest to target (largest position) to farthest
               while heap:
                   pos, spd = heapq.heappop(heap)
                   pos = -pos  # revert negation to original position
                   arrival_time = (target - pos) / spd
    
                   # If current car's arrival time is greater than last fleet's time, it forms a new fleet
                   if arrival_time > last_arrival_time:
                       fleets += 1
                       last_arrival_time = arrival_time
                   # else, it merges into the fleet represented by last_arrival_time
    
               return fleets
    

Graph Algorithms using Heaps #

  • Objective
    • Use heaps for efficient priority-based processing in graph algorithms like Dijkstra’s or Prim’s.

      Refer directly to the Advanced Graph Sections below.

Problems #

  1. Network Delay Time (743)

    This is just a classic dijkstra, single source we want to find the longest branch form this single source to every other node. Applies for network related problem domains as well. Note the use of the 1-idx shimming and splicing here.

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    
        from collections import defaultdict
        import heapq
    
        class Solution:
            def networkDelayTime(self, times: List[List[int]], n: int, k: int) -> int:
                graph = defaultdict(list)
                for u, v, w in times:
                    graph[u].append((v, w))
    
                # we're going to shim the 0th idx to handle the 1-indexing, then later ignore it
                dist = [float('inf')] * (n + 1)
                dist[k] = 0
                heap = [(0, k)] # current dist, node (min-pq by distance)
    
                while heap:
                    curr_dist, node = heapq.heappop(heap)
                    if curr_dist > dist[node]:
                        continue
                    for nei, w in graph[node]:
                        # if can relax:
                        if curr_dist + w < dist[nei]:
                            dist[nei] = curr_dist + w
                            heapq.heappush(heap, (dist[nei], nei))
    
                max_dist = max(dist[1:])
                return max_dist if max_dist != float('inf') else -1
    
  2. Minimum Spanning Tree algorithms

Custom Comparator / Complex Priority Logic #

  • Objective
    • Implement priority queues with custom ordering or multi-key priorities.

Problems #

  1. Any problem requiring lex order or multi-field ordering for tasks or objects

  2. Design Twitter (355)

    The idea here is to maintain multiple PQs, one for each user then we can merge them in using heapq.merge()

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    
        from collections import defaultdict
        import heapq
    
        class Twitter:
            def __init__(self):
                self.tweet_count = 0
                self.uid_to_tweets = defaultdict(list)
                self.uid_to_following = defaultdict(set)
    
            def postTweet(self, userId: int, tweetId: int) -> None:
                self.tweet_count += 1
                self.uid_to_tweets[userId].append((self.tweet_count, tweetId, userId))
    
            def getNewsFeed(self, userId: int) -> list[int]:
                posts = heapq.merge(*(self.uid_to_tweets[uid]
                                      for uid in self.uid_to_following[userId] | {userId}))
    
                return [post[1] for post in heapq.nlargest(10, posts)]
    
            def follow(self, followerId: int, followeeId: int) -> None:
                self.uid_to_following[followerId].add(followeeId)
    
            def unfollow(self, followerId: int, followeeId: int) -> None:
                self.uid_to_following[followerId].discard(followeeId)
    

    However, the optimal approach is really to implement k-way merging:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    
        from collections import defaultdict
        import heapq
    
        class Twitter:
            def __init__(self):
                self.time = 0
                self.user_tweets = defaultdict(list)
                self.following = defaultdict(set)
    
            def postTweet(self, userId: int, tweetId: int) -> None:
                self.time += 1
                self.user_tweets[userId].append((self.time, tweetId))
    
            def getNewsFeed(self, userId: int) -> list[int]:
                result = []
                # We'll build a heap of (timestamp, tweetId, whose_list, index)
                heap = []
                users = self.following[userId].copy()
                users.add(userId)
                for uid in users:
                    tweets = self.user_tweets[uid]
                    if tweets:
                        # push last tweet (most recent), track index
                        ts, tid = tweets[-1]
                        heapq.heappush(heap, (-ts, tid, uid, len(tweets) - 1))
                while heap and len(result) < 10:
                    ts, tid, uid, idx = heapq.heappop(heap)
                    result.append(tid)
                    if idx > 0:
                        prev_ts, prev_tid = self.user_tweets[uid][idx - 1]
                        heapq.heappush(heap, (-prev_ts, prev_tid, uid, idx - 1))
                return result
    
            def follow(self, followerId: int, followeeId: int):
                if followerId != followeeId:
                    self.following[followerId].add(followeeId)
    
            def unfollow(self, followerId: int, followeeId: int):
                self.following[followerId].discard(followeeId)
    

Approximate / Streaming Data Structures with Heaps #

  • Objective
    • Use approximate counting and priority heuristics for large or streaming datasets.

Problems #

  1. Count-Min Sketch with priority queues (more advanced/outside standard Leetcode scope)

SOE #

  • Reversed or wrong comparator in heap (min vs max). The default is min heap ordedring, so if we want to keep max heaps, then we have to negate the values.

  • Forgetting lazy deletion or invalidation in PQ operations

  • Improper heapify or failure to maintain heap invariant

    Remember that we can technically peek by just checking the first element in the heap, that’s guaranteed to the min element in the heap.

  • Sometimes a heap is overkill if we can get away with doing a quickselect, since that would avoid completely sorting the input

  • forgetting the heap invariants (should only use the heap API for our needs, we shouldn’t accidentally think that it’s a sorted array)

Decision Flow #

  • Need streaming top-k? → Sliding window or heap
  • Merging sorted data? → Min-heap merging
  • Frequency based selections? → Heap top-k strategy

Styles, Tricks and Boilerplate #

Boilerplate #

Quickselect Algo #

The main part is the partition function, then the rest is the intuitive understanding of the invariants / properties that lead us to the reason why partitioning it k times is good enough.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import random
from typing import List

class Solution:
    def findKthLargest(self, nums: List[int], k: int) -> int:
        def quickselect(left, right, target):
            if left == right:
                return nums[left]
            pivot_index = random.randint(left, right)
            # Partition around pivot and return final pivot position
            new_pivot = partition(left, right, pivot_index)
            if new_pivot == target:
                return nums[new_pivot]
            elif new_pivot < target:
                return quickselect(new_pivot + 1, right, target)
            else:
                return quickselect(left, new_pivot - 1, target)


        def partition(left, right, pivot_index):
            """
            Returns the pivot idx after paritioning left and right segments based on the element values.

            The property it maintains thereafter is the all to the right of pivot are STRICTLY MORE THAN pivot.

            All the left of hte pivot are STRICTLY LESS THAN pivot.
            """
            pivot_value = nums[pivot_index]
            nums[pivot_index], nums[right] = nums[right], nums[pivot_index]
            store_index = left
            for i in range(left, right):
                if nums[i] < pivot_value:
                    nums[store_index], nums[i] = nums[i], nums[store_index]
                    store_index += 1
            # move pivot to its final place
            nums[store_index], nums[right] = nums[right], nums[store_index]

            return store_index

        n = len(nums)
        target = n - k  # index of k-th largest in sorted order

        return quickselect(0, n - 1, target)

Style: #

  1. let the heap do its job when you’re maintaining order statistics, just prune AFTER the heap does its job (e.g. after it gets added).

Tricks: #

  1. Double heap approach is optimal for general data-streams. We can keep balancing them and we can rebalance when we add / remove to the overall system.

  2. MATH TRICK: when dealing with Euclidean distances, we can just use the squared distances and avoid having to do the sqrt.

  3. For stream median finding, if it’s a bounded range, representing the domain via a fixed size array that keeps frequency counts, then just adjusting a median pointer is a great idea.

Python Recipes: #

  1. Heap order statistics functions:
    • We can pass in a lambda for the nsmallest comparator.

      E.g. return heapq.nsmallest(k, points, key=lambda p: p[0]**2 + p[1]**2)

    • copying a heap preserves its heap properties because it’s “just a list”

    • heapq.heappushpop() and heapq.heapreplace is very useful:

       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      
           import heapq
      
           heap = [5, 7, 9]
           heapq.heapify(heap)
      
           heapq.heappush(heap, 3)         # Add element
           smallest = heapq.heappop(heap)  # Remove smallest element
      
           result = heapq.heappushpop(heap, 4)  # Push 4 then pop smallest
           result2 = heapq.heapreplace(heap, 6) # Pop smallest then push 6
      
           top3 = heapq.nlargest(3, [1,5,2,9,7])
           merged = heapq.merge([1,3,5], [2,4,6])
      

TODO KIV #

[ ] Existing Canonicals #

[ ] Fresh #