Reactive Extensions Partie 3 : Implémenter un IScheduler
J'ai expliqué dans mon précédant article le concept de IScheduler dans Rx (Reactive Extensions). Je rappelle que dans sa version actuelle, Rx dispose de deux schedulers : Now et Later. Now a l'avantage de programmer des unités de travail sur le thread en cours, ce qui fait que la synchronisation n'est pas un problème. D'un autre coté, Later est asynchrone, donc ne bloque pas le thread en cours, mais peut executer les unités de travail sur différents threads, ce qui nécessite donc une synchronisation des données partagées.
Je me suis amusé a écrire un scheduler qui est un croisement de Now et Later. Il s'agit d'un scheduler asynchrone (comme Scheduler.Later) qui programme toutes ses unités de travail sur un seul thread, il n'y a donc pas à se soucier de la synchronisation (comme Scheduler.Now).
Voici le code :
public class SingleThreadSchduler : IScheduler, IDisposable
{
private readonly BlockingCollection<Tuple<Action, BooleanDisposable>> queue;
private readonly CancellationTokenSource cancellation;
private readonly Thread thread;
public SingleThreadSchduler(Action initialize)
{
queue = new BlockingCollection<Tuple<Action, BooleanDisposable>>();
cancellation = new CancellationTokenSource();
initialize();
thread = new Thread(Run);
thread.IsBackground = true;
thread.Start();
}
public SingleThreadSchduler()
: this(() => { })
{ }
public bool IsDisposed
{
get { return cancellation.IsCancellationRequested; }
}
public IDisposable Schedule(Action action, TimeSpan dueTime)
{
if (cancellation.IsCancellationRequested)
throw new ObjectDisposedException("SingleThreadSchduler");
BooleanDisposable disposable = new BooleanDisposable();
IDisposable schedulerDisposable = Scheduler.Later.Schedule(() => queue.Add(Tuple.Create(action, disposable)), dueTime);
return new GroupDisposable(disposable, schedulerDisposable);
}
public IDisposable Schedule(Action action)
{
if (cancellation.IsCancellationRequested)
throw new ObjectDisposedException("SingleThreadSchduler");
BooleanDisposable disposable = new BooleanDisposable();
queue.Add(Tuple.Create(action, disposable));
return disposable;
}
public void Dispose()
{
cancellation.Cancel();
}
private void Run()
{
Tuple<Action, BooleanDisposable> item;
try
{
while (queue.TryTake(out item, -1, cancellation.Token))
{
if (!item.Item2.IsDisposed)
item.Item1();
}
}
catch (OperationCanceledException ex)
{
if (ex.CancellationToken != cancellation.Token)
throw;
}
}
}
Ce scheduler utilise une queue pour stocker les unités de travail à executer. Si une unité de travail se trouve programmée alors qu'une autre est toujours en cours d'execution, la nouvelle unité de travail sera placée dans la queue en attendant que le thread soit libéré. Si la queue est vide, la BlockingCollection fera en sorte à ce que le thread soit mis en sommeil jusqu'à ce du travail soit programmé.
Il faut cependant faire attention à n'utiliser ce scheduler que lorsque sa durée de vie sera suffisement longue puisque la création de nouveaux threads est une opération relativement lente.
Ce post vous a plu ? Ajoutez le dans vos favoris pour ne pas perdre de temps à le retrouver le jour où vous en aurez besoin :