Hopac how to implement a more nuanced MailboxProcessor?

I'm not sure if I should ask about this here or the Hopac Wiki , or if this sounds more like a design question, but here it goes. I recently researched F # agents to go with my Rx encoding and came across the following code How to create a job queue using MailboxProcessor? and thought hey, isn't this what is perfect for Hopac ?

Now I have a few questions about how to actually implement a "job queue" that can be started and paused in Hopac (as in a post queue post ).

  • Question: How to maintain the Hopac.Jobs queue and be able to pause the processing of jobs from it if needed, or perhaps also empty it? I could implement an explicit queue like in the job queue message ), but aside, a possible answer to this question interests me on its own.
  • Then a question for more general advice: I plan on making thousands of these agents so that they bring in input from, say, an Rx thread, or perhaps one thread for each agent, do some processing, and then send the results to an asynchronous interface (most likely it returns Task

    or Task<T>

    ), which may take a while to execute or fail (let's say there is a DB behind the interface), in which case I can time out and try again. It might be possible to improve this with Rx by queuing the results Job

    to output the Rx stream, but the highlights are interface based communication Task

    , timeout and retry on failure. Question:Will it be OK? I'm aiming for a slightly more realistic use case in a mixed code base of .NET code (i.e. C #, some VB.NET and F #), so views and implementation points would be appreciated.

I took a cue from a sample mailbox and created the following sample (the real entity of the code is the JobProcessor)

//Install-Package Hopac 
open Hopac
open Hopac.Extensions
open Hopac.Infixes
open Hopac.Job.Infixes


[<CustomEquality; CustomComparison>]
type Message<'a> = 
    | Start
    | Stop
    | Pause
    | Job of ('a -> unit)

    override x.Equals(obj) = 
        match obj with
        | :? Message<'a> as fu -> 
            match x, fu with
            | Start, Start | Stop, Stop | Pause, Pause -> true
            | Job f1, Job f2 -> true
            | _, _ -> false
        | _ -> false

    override x.GetHashCode() = 
        match x with
        | Start -> 1
        | Stop -> 2
        | Pause -> 3
        | _ -> 4

    interface System.IComparable with
        member x.CompareTo yobj = 
            match yobj with
            | :? Message<'a> as y -> compare x y
            | _ -> invalidArg "yobj" "cannot compare value of different types"

    interface System.IComparable<Message<'a>> with
        member x.CompareTo(y) = compare x y

type JobProcessor() = 
    let mMb = mb()
    do 
        run <| job { 
                   do! Job.foreverServer (mMb |>> fun msg -> 
                                              //Should an explicit job queue be introduce here?
                                              match msg with
                                              | Start -> printfn "Start"
                                              | Stop ->  printfn "Stop"
                                              | Pause -> printfn "Pause"
                                              | Job(f) ->
                                                printfn "Job"
                                                f())
               }

    member x.QueueJob(f) =
        Mailbox.Global.send mMb f
        //mMb <<-+ Message.Start

[<EntryPoint>]
let main argv = 
    let jp = new JobProcessor()
    let jobFunc i () = (printfn "%i" i)
    jp.QueueJob(Job(jobFunc 100)) 

      

+3


source to share





All Articles