1 /**
2 	Copyright: © 2017 rejectedsoftware e.K.
3 	License: Subject to the terms of the GNU GPLv3 license, as written in the included LICENSE.txt file.
4 	Authors: Sönke Ludwig
5 */
6 module dubregistry.internal.workqueue;
7 
8 import std.algorithm.searching : canFind, countUntil;
9 import std.algorithm.mutation : swap;
10 import std.datetime : Clock, SysTime, UTC, msecs, hours;
11 import std.encoding : sanitize;
12 import vibe.core.core;
13 import vibe.core.log;
14 import vibe.core.sync;
15 import vibe.utils.array : FixedRingBuffer;
16 
/**
	Queue of package names that are passed, one at a time, to a handler
	delegate by a background worker task.

	Names are stored in a fixed-capacity ring buffer. The worker task is
	started from the constructor and is restarted by `put`/`putFront` if it
	is found not to be running anymore.
*/
final class PackageWorkQueue {
@safe:

	private {
		FixedRingBuffer!string m_queue;
		// name of the package currently passed to the handler, null otherwise
		string m_current;
		// true while the worker is inside the handler call for `m_current`
		bool m_busy;
		Task m_task;
		TaskMutex m_mutex;
		TaskCondition m_condition;
		void delegate(string) @safe m_handler;
		// time at which the worker last picked a package from the queue
		SysTime m_lastSignOfLifeOfUpdateTask;
	}

	/**
		Creates the queue and starts the worker task.

		Params:
			handler = Delegate invoked by the worker task with each queued
				package name. Exceptions thrown by it are logged and do not
				stop the worker.
	*/
	this(void delegate(string) @safe handler)
	{
		m_handler = handler;
		m_queue.capacity = 10000;
		m_mutex = new TaskMutex;
		m_condition = new TaskCondition(m_mutex);
		m_task = runTask(&processQueue);
	}

	/// Returns `true` if `pack_name` is currently being processed or is queued.
	bool isPending(string pack_name)
	{
		return getPosition(pack_name) >= 0;
	}

	/**
		Determines the position of a package within the work queue.

		Returns:
			`0` if the package is the one currently being processed, its
			one-based index within the queue if it is waiting, or `-1` if it
			is neither being processed nor queued.
	*/
	sizediff_t getPosition(string pack_name)
	{
		if (m_current == pack_name) return 0;
		synchronized (m_mutex) {
			auto idx = m_queue[].countUntil(pack_name);
			return idx >= 0 ? idx + 1 : -1;
		}
	}

	/**
		Inserts a package at the front of the queue and wakes the worker.

		The insertion is skipped if the name is already among the first ten
		queued entries, or if the queue has reached its capacity.
	*/
	void putFront(string pack_name)
	{
		import std.algorithm.comparison : min;
		synchronized (m_mutex) {
			// naive protection against spamming the queue
			if (!m_queue[0 .. min(10, $)].canFind(pack_name)) {
				// only the first entries are checked for duplicates, so
				// repeated requests can fill the fixed-size buffer
				if (m_queue.full)
					logWarn("Work queue is full, dropping update request for package %s.", pack_name);
				else
					m_queue.putFront(pack_name);
			}
		}

		nudgeWorker();
	}

	/**
		Appends a package to the back of the queue and wakes the worker.

		The insertion is skipped if the name is already queued, or if the
		queue has reached its capacity.
	*/
	void put(string pack_name)
	{
		synchronized (m_mutex) {
			if (!m_queue[].canFind(pack_name)) {
				if (m_queue.full)
					logWarn("Work queue is full, dropping update request for package %s.", pack_name);
				else
					m_queue.put(pack_name);
			}
		}

		nudgeWorker();
	}

	/**
		Watchdog and wake-up for the worker task.

		Interrupts the worker if it has been busy with a single package for
		more than two hours, restarts it if it is not running and notifies
		the condition the worker waits on.
	*/
	private void nudgeWorker()
	{
		// watchdog for update task
		// Only a worker that is actually processing a package can be hung;
		// time spent waiting on an empty queue must not count.
		if (m_task.running && m_busy && Clock.currTime(UTC()) - m_lastSignOfLifeOfUpdateTask > 2.hours) {
			logError("Update task has hung. Trying to interrupt.");
			() @trusted { m_task.interrupt(); } ();
		}

		if (!m_task.running)
			m_task = runTask(&processQueue);
		m_condition.notifyAll();
	}

	/**
		Body of the worker task.

		Loops forever: waits until the queue is non-empty, removes the front
		entry and passes it to the handler.
	*/
	private void processQueue()
	{
		scope (exit) logWarn("Update task was killed!");
		while (true) {
			logDiagnostic("Getting new package to be updated...");
			string pack;
			synchronized (m_mutex) {
				while (m_queue.empty) {
					logDiagnostic("Waiting for package to be updated...");
					m_condition.wait();
				}
				pack = m_queue.front;
				m_queue.popFront();
				m_current = pack;
				// start the watchdog interval at the moment work begins, so
				// that the preceding idle wait is not counted as a hang
				m_lastSignOfLifeOfUpdateTask = Clock.currTime(UTC());
				m_busy = true;
			}
			scope (exit) {
				m_current = null;
				m_busy = false;
			}
			logDiagnostic("Processing package %s.", pack);
			try m_handler(pack);
			catch (Exception e) {
				logWarn("Failed to handle package %s: %s", pack, e.msg);
				() @trusted { logDiagnostic("Full error: %s", e.toString().sanitize); } ();
			}
		}
	}
}
114 
// Queues two packages and checks the reported positions before, during and
// after processing, using a handler that takes 100 ms per package.
unittest {
	bool done = false; // set once the test task has run to completion
	string expected;   // package name the handler must receive next
	size_t cnt = 0;    // number of handler invocations

	void handler(string pack)
	{
		assert(pack == expected);
		cnt++;
		sleep(100.msecs);
	}

	void test()
	{
		auto q = new PackageWorkQueue(&handler);
		assert(!q.isPending("foo"));
		assert(!q.isPending("bar"));
		assert(q.getPosition("foo") < 0);
		expected = "foo";
		q.put("foo");
		q.put("bar");
		assert(q.isPending("foo"));
		assert(q.isPending("bar"));
		assert(q.getPosition("bar") == q.getPosition("foo") + 1);
		// let the worker task pick up "foo"
		yield();
		assert(q.getPosition("foo") == 0);
		assert(q.getPosition("bar") == 1);
		expected = "bar";
		// two handler calls of 100 ms each must have finished after this
		sleep(300.msecs);
		assert(!q.isPending("foo"));
		assert(!q.isPending("bar"));
		assert(cnt == 2);
		done = true;
		exitEventLoop();
	}

	runTask(&test);
	runEventLoop();
	assert(done, "Test was skipped!?");
}