Block or re-queue other Sidekiq jobs from processing a specific resource while an existing Sidekiq job is processing it

I have Sidekiq jobs handling many types of resources. However, for one specific type of resource, for example Resource X, I need to make sure that only one Sidekiq job can handle that specific resource at any given time.

For example, if 3 Sidekiq jobs are queued simultaneously and all want to interact with resource X, then only one of them may handle resource X, while the 2 remaining jobs have to wait (or be re-queued) until the job currently processing resource X finishes.

I am currently trying to add a record to a database table when a Sidekiq job starts processing a resource, and to use that record to stop other Sidekiq jobs from processing the resource. The record is removed either by the Sidekiq job that created it (once it has finished processing resource X) or after some elapsed time (for example: if the record was created more than 5 minutes ago, it is considered to no longer hold exclusive access to resource X, and the next Sidekiq job that wants to process resource X can overwrite that record and claim exclusive access).

The pseudocode of my current implementation:

def perform(res_id, res_type)

  # Only applies to "RESOURCE_X"
  if res_type == RESOURCE_X
    existing = ResourceProcessor.where(res_id: res_id).first
    if existing.nil? || (Time.now - existing.created_at) > 5.minutes
      ResourceProcessor.create(res_id: res_id)
      process_resource_x(res_id)
    else
      SidekiqWorker.perform_in(5.minutes, res_id, res_type) # Try again later
      return
    end

    # Letting other sidekiq jobs know they can now fight over who gets to process resource X
    ResourceProcessor.where(res_id: res_id).destroy_all

  else
    process_other_resource(res_id)
  end

end


Unfortunately, my solution doesn't work reliably. It works fine when there is a delay between the Sidekiq jobs that want to process resource X. However, when the jobs that want to process resource X arrive at the same time, my solution falls apart: several jobs can read the table before any of them has created its record, so they all claim resource X.

Is there any way to force some synchronization to happen only when processing resource X?

By the way, my Sidekiq jobs can be distributed across multiple machines (but they all access the same Redis server on a dedicated machine).
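
To make it concrete, this is roughly the kind of atomic check-and-claim I am after, sketched directly against the shared Redis server (illustrative only, not my current code; the key name, token and 5-minute TTL are made up):

require "redis"
require "securerandom"

# Illustrative sketch: an atomic claim on resource X using Redis SET NX EX.
def try_claim(redis, res_id)
  token = SecureRandom.uuid
  # nx: true makes the SET succeed only if the key does not already exist, so two
  # jobs arriving at the same time cannot both claim resource X; ex: 300 makes the
  # claim expire after 5 minutes in case the claiming worker dies.
  redis.set("lock:resource_x:#{res_id}", token, nx: true, ex: 300) ? token : nil
end

def release_claim(redis, res_id)
  # Naive release; a careful version would compare the token in a Lua script
  # so a job never deletes a claim it no longer owns.
  redis.del("lock:resource_x:#{res_id}")
end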



1 answer


I did more research based on the comments provided by Thomas.

The link he provided was extremely helpful. They implemented their own Lock class to achieve the desired result. However, I did not use their locking code because I needed a different behavior.

The specific behavior I was looking to implement is "re-queue if locked", rather than "wait if locked".
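
To make the distinction concrete, here is a rough sketch of "re-queue if locked" at the worker level. acquire_lock and release_lock stand in for whatever locking mechanism is actually used and are hypothetical helpers; RESOURCE_X, process_resource_x and process_other_resource come from the question, and the 5-minute retry delay mirrors its staleness window:

require "sidekiq"

class ResourceXWorker
  include Sidekiq::Worker

  def perform(res_id, res_type)
    return process_other_resource(res_id) unless res_type == RESOURCE_X

    unless acquire_lock(res_id)          # hypothetical helper
      # "Re-queue if locked": schedule a retry instead of blocking this
      # Sidekiq thread until the lock frees up ("wait if locked").
      self.class.perform_in(5.minutes, res_id, res_type)
      return
    end

    begin
      process_resource_x(res_id)
    ensure
      release_lock(res_id)               # hypothetical helper
    end
  end
end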



There are alternative tools I could have used, such as redis-semaphore and the with_advisory_lock gem. I tested redis-semaphore and found it was broken: it did not report lock status and resource counts correctly. Also, after checking the issues on GitHub, it seems that in some situations redis-semaphore can get stuck, so I decided not to use it. I also decided against with_advisory_lock because it has a lower star count than redis-semaphore.

I ended up finding a way to implement the locking pattern described in my question, which is to block Sidekiq jobs based on a value in my database. I solved the concurrency problem of multiple Sidekiq jobs reading stale values by locking the entire database row with Rails' own pessimistic locking (ActiveRecord::Locking::Pessimistic). This ensures that only one Sidekiq worker at a time can access the database row that holds the lock value. The locking period is kept to a minimum because only a read and, when applicable, a write are performed while the row is locked; subsequent operations such as the actual processing and the cleanup are performed afterwards.
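
A minimal sketch of what that claim step can look like, assuming a ResourceLock model with res_id and locked_at columns (the model, column names and 5-minute staleness window are illustrative, not my exact schema):

def try_claim(res_id)
  # Assumes a unique index on res_id so concurrent creates cannot duplicate rows.
  lock_row = ResourceLock.find_or_create_by!(res_id: res_id)
  claimed = false
  # with_lock (ActiveRecord::Locking::Pessimistic) opens a transaction and reloads
  # the row with SELECT ... FOR UPDATE, so only one worker at a time can run this
  # block for a given resource.
  lock_row.with_lock do
    if lock_row.locked_at.nil? || lock_row.locked_at < 5.minutes.ago
      lock_row.update!(locked_at: Time.current) # claim while the row is locked
      claimed = true
    end
  end
  claimed
end

The actual processing and the cleanup (clearing locked_at) happen outside the locked block, which keeps the row lock as short as possible. If try_claim returns false, the worker re-queues itself with perform_in, exactly as in the "re-queue if locked" sketch above.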
