/**
    Copyright: © 2017 rejectedsoftware e.K.
    License: Subject to the terms of the GNU GPLv3 license, as written in the included LICENSE.txt file.
    Authors: Sönke Ludwig
*/
module dubregistry.internal.workqueue;

import std.algorithm.searching : canFind, countUntil;
import std.algorithm.mutation : swap;
import std.datetime : Clock, SysTime, UTC, msecs, hours;
import std.encoding : sanitize;
import vibe.core.core;
import vibe.core.log;
import vibe.core.sync;
import vibe.utils.array : FixedRingBuffer;

/** Queue of package names that are handed, one at a time, to a handler
    delegate by a background task.

    The worker task is started by the constructor and restarted by `put` /
    `putFront` if it is found to be no longer running.
*/
final class PackageWorkQueue {
@safe:

    private {
        FixedRingBuffer!string m_queue;
        string m_current; // package currently passed to the handler, `null` otherwise
        Task m_task;
        TaskMutex m_mutex;
        TaskCondition m_condition;
        void delegate(string) m_handler;
        SysTime m_lastSignOfLifeOfUpdateTask;
    }

    /** Creates the queue and starts the worker task.

        Params:
            handler = Called by the worker task with each dequeued package
                name. Exceptions thrown by it are logged and do not stop the
                worker.
    */
    this(void delegate(string) @safe handler)
    {
        m_handler = handler;
        m_queue.capacity = 10000;
        m_mutex = new TaskMutex;
        m_condition = new TaskCondition(m_mutex);
        m_task = runTask(&processQueue);
    }

    /// Returns `true` if the package is queued or currently being processed.
    bool isPending(string pack_name)
    {
        return getPosition(pack_name) >= 0;
    }

    /** Determines where in the queue a package currently is.

        Returns:
            `0` if the package is the one currently being processed, its
            one-based index within the queue if it is waiting, or `-1` if it
            is neither.
    */
    sizediff_t getPosition(string pack_name)
    {
        if (m_current == pack_name) return 0;
        synchronized (m_mutex) {
            auto idx = m_queue[].countUntil(pack_name);
            return idx >= 0 ? idx + 1 : -1;
        }
    }

    /** Schedules a package to be processed next.

        The request is ignored if the package already occupies one of the
        first ten queue slots, or if the queue has no free slot left.
    */
    void putFront(string pack_name)
    {
        import std.algorithm.comparison : min;
        synchronized (m_mutex) {
            // naive protection against spamming the queue
            if (!m_queue[0 .. min(10, $)].canFind(pack_name)) {
                // FixedRingBuffer only asserts on overflow, so guard explicitly
                if (m_queue.full)
                    logWarn("Package work queue is full, dropping request for %s.", pack_name);
                else
                    m_queue.putFront(pack_name);
            }
        }

        nudgeWorker;
    }

    /** Appends a package to the end of the queue.

        The request is ignored if the package is already queued, or if the
        queue has no free slot left.
    */
    void put(string pack_name)
    {
        synchronized (m_mutex) {
            if (!m_queue[].canFind(pack_name)) {
                // FixedRingBuffer only asserts on overflow, so guard explicitly
                if (m_queue.full)
                    logWarn("Package work queue is full, dropping request for %s.", pack_name);
                else
                    m_queue.put(pack_name);
            }
        }

        nudgeWorker;
    }

    /// Interrupts a hung worker, restarts a dead one and wakes a waiting one.
    private void nudgeWorker()
    {
        // watchdog for update task - a worker that is merely waiting for new
        // queue entries (no current package) is idle, not hung
        if (m_task.running && m_current.length > 0
            && Clock.currTime(UTC()) - m_lastSignOfLifeOfUpdateTask > 2.hours)
        {
            logError("Update task has hung. Trying to interrupt.");
            () @trusted { m_task.interrupt(); } ();
        }

        if (!m_task.running)
            m_task = runTask(&processQueue);
        m_condition.notifyAll();
    }

    /// Worker task body: waits for queue entries and passes them to the handler.
    private void processQueue()
    {
        scope (exit) logWarn("Update task was killed!");
        while (true) {
            m_lastSignOfLifeOfUpdateTask = Clock.currTime(UTC());
            logDiagnostic("Getting new package to be updated...");
            string pack;
            synchronized (m_mutex) {
                while (m_queue.empty) {
                    logDiagnostic("Waiting for package to be updated...");
                    m_condition.wait();
                }
                pack = m_queue.front;
                m_queue.popFront();
                m_current = pack;
                // restart the watchdog clock so that time spent waiting on an
                // empty queue is not counted against the handler
                m_lastSignOfLifeOfUpdateTask = Clock.currTime(UTC());
            }
            scope(exit) m_current = null;
            logDiagnostic("Processing package %s.", pack);
            try m_handler(pack);
            catch (Exception e) {
                logWarn("Failed to handle package %s: %s", pack, e.msg);
                () @trusted { logDiagnostic("Full error: %s", e.toString().sanitize); } ();
            }
        }
    }
}

unittest {
    bool done = false;
    string expected;
    size_t cnt = 0;

    void handler(string pack)
    {
        assert(pack == expected);
        cnt++;
        sleep(100.msecs);
    }

    void test()
    {
        auto q = new PackageWorkQueue(&handler);
        assert(!q.isPending("foo"));
        assert(!q.isPending("bar"));
        assert(q.getPosition("foo") < 0);
        expected = "foo";
        q.put("foo");
        q.put("bar");
        assert(q.isPending("foo"));
        assert(q.isPending("bar"));
        assert(q.getPosition("bar") == q.getPosition("foo") + 1);
        yield();
        assert(q.getPosition("foo") == 0);
        assert(q.getPosition("bar") == 1);
        expected = "bar";
        sleep(300.msecs);
        assert(!q.isPending("foo"));
        assert(!q.isPending("bar"));
        assert(cnt == 2);
        done = true;
        exitEventLoop();
    }

    runTask(&test);
    runEventLoop();
    assert(done, "Test was skipped!?");
}