1 /**
2 	Copyright: © 2013-2014 rejectedsoftware e.K.
3 	License: Subject to the terms of the GNU GPLv3 license, as written in the included LICENSE.txt file.
4 	Authors: Sönke Ludwig
5 */
6 module dubregistry.cache;
7 
8 import vibe.core.log;
9 import vibe.core.stream;
10 import vibe.db.mongo.mongo;
11 import vibe.http.client;
12 import vibe.stream.memory;
13 
14 import core.time;
15 import std.algorithm : startsWith;
16 import std.exception;
17 import std.typecons : tuple;
18 
19 
/// Selects when `URLCache.get` is allowed to answer a request from cached data.
enum CacheMatchMode {
	/// Return cached data if available, without contacting the server.
	always,
	/// Return cached data only if the server answers the stored ETag with "not modified".
	etag,
	/// Always request fresh data; no `If-None-Match` header is sent.
	never
}
25 
26 
/**
	Persistent cache for HTTP downloads, stored in the MongoDB collection
	"urlcache.entries".

	Each entry keeps the downloaded body, the server's ETag, a selection of
	response headers and, for redirects, the redirect target. Entries whose
	object ID timestamp is older than `m_maxCacheTime` are discarded on lookup.
*/
class URLCache {
@safe:
	private {
		MongoClient m_db;
		MongoCollection m_entries;
		/// Maximum age of a cache entry before it is discarded.
		Duration m_maxCacheTime = 365.days;
	}

	/// Opens the cache collection and makes sure it is indexed by URL.
	this()
	{
		import dubregistry.mongodb : databaseName, getMongoClient;
		m_db = getMongoClient();
		m_entries = m_db.getDatabase(databaseName)["urlcache.entries"];
		m_entries.ensureIndex([tuple("url", 1)]);
	}

	/**
		Removes the cached entry for `url`, if any.

		Username and password contained in `url` are ignored, because `get`
		stores entries under the credential-free URL.
	*/
	void clearEntry(URL url)
	{
		// Must compute the same key as get(), which strips the credentials
		// before looking up or storing entries.
		url.username = null;
		url.password = null;
		m_entries.remove(["url": url.toString()]);
	}

	/**
		Downloads `url`, answering from the cache where possible.

		`cache_priority == true` maps to `CacheMatchMode.always`, `false` to
		`CacheMatchMode.etag`.
	*/
	void get(URL url, scope void delegate(scope InputStream str) @safe callback,
		bool cache_priority = false, scope RequestModifier request_modifier = null)
	{
		get(url, callback, cache_priority ? CacheMatchMode.always : CacheMatchMode.etag,
			request_modifier);
	}

	/// ditto
	void get(URL url, scope void delegate(scope InputStream str, scope string[string] headers) @safe callback,
		scope string[] record_headers, bool cache_priority = false,
		scope RequestModifier request_modifier = null)
	{
		get(url, callback, record_headers,
			cache_priority ? CacheMatchMode.always : CacheMatchMode.etag,
			request_modifier);
	}

	/// Downloads `url` using the given `mode`, without recording any response headers.
	void get(URL url, scope void delegate(scope InputStream str) @safe callback,
		CacheMatchMode mode = CacheMatchMode.etag, scope RequestModifier request_modifier = null)
	{
		get(url, delegate(scope InputStream str, scope string[string] headers) @safe => callback(str),
			null, mode, request_modifier);
	}

	/**
		Downloads `url`, consulting and updating the cache according to `mode`.

		Params:
			url = The URL to download. Embedded credentials are sent using HTTP
				basic authentication and are not part of the cache key.
			callback = Invoked once on success with the response body and the
				recorded response headers.
			record_headers = Names of response headers to pass to `callback`
				and to store alongside the cached data.
			mode = Determines when cached data may be used.
			request_modifier = Optional hook to customize each outgoing request.

		Throws:
			`FileNotFoundException` if the server responds with 404.
			`Exception` for other unexpected status codes, for a redirect
			without a location, for a "not modified" reply without cached data,
			or after more than 10 redirects.
	*/
	void get(URL url, scope void delegate(scope InputStream str, scope string[string] headers) @safe callback,
		scope string[] record_headers, CacheMatchMode mode = CacheMatchMode.etag,
		scope RequestModifier request_modifier = null)
	{
		import std.datetime : Clock, UTC;
		import vibe.http.auth.basic_auth;
		import dubregistry.internal.utils : black;
		import vibe.internal.interfaceproxy : asInterface;

		// Credentials are sent via basic auth, but must not become part of the
		// cache key.
		auto user = url.username;
		auto password = url.password;
		url.username = null;
		url.password = null;

		InputStream result;
		string[string] result_headers;
		bool handled_uncached = false;

		auto now = Clock.currTime(UTC());

		foreach (_; 0 .. 10) { // follow max 10 redirects
			auto be = m_entries.findOne(["url": url.toString()]);
			bool cached = !be.isNull();

			// Invalidate out of date cache entries. They are treated as a cache
			// miss, so that their stale data is never returned and their old _id
			// (which determines the entry's age) is not re-used on upsert.
			if (cached && be["_id"].get!BsonObjectID.timeStamp < now - m_maxCacheTime) {
				m_entries.remove(["_id": be["_id"]]);
				cached = false;
			}

			CacheEntry entry;
			if (cached) {
				deserializeBson(entry, be);
				if (mode == CacheMatchMode.always) {
					// directly return the cached result without contacting the server
					logDiagnostic("Cache HIT (early): %s", url.toString());
					if (entry.redirectURL.length) {
						url = URL(entry.redirectURL);
						continue;
					} else {
						auto data = be["data"].get!BsonBinData().rawData();
						auto mdata = () @trusted { return cast(ubyte[])data; } ();
						scope tmpresult = createMemoryStream(mdata, false);
						callback(tmpresult, entry.headers);
						return;
					}
				}
			} else {
				entry._id = BsonObjectID.generate();
				entry.url = url.toString();
			}

			requestHTTP(url,
				(scope req){
					// revalidate the stored body only if the mode allows using the cache
					if (entry.etag.length && mode != CacheMatchMode.never) req.headers["If-None-Match"] = entry.etag;
					if (user.length) addBasicAuth(req, user, password);
					if (request_modifier) request_modifier(req);
				},
				(scope res){
					foreach (header; record_headers) {
						auto v = res.headers.get(header, null);
						if (v !is null)
							result_headers[header] = v;
					}

					switch (res.statusCode) {
						default:
							throw new Exception("Unexpected reply for '"~url.toString().black~"': "~httpStatusText(res.statusCode));
						case HTTPStatus.notModified:
							logDiagnostic("Cache HIT: %s", url.toString());
							res.dropBody();
							// A 304 can only be served if there is (non-expired) cached
							// data, e.g. a request_modifier might add its own
							// conditional headers for an uncached URL.
							enforce(cached, "Server responded with \"not modified\" for uncached URL "~url.toString());
							auto data = be["data"].get!BsonBinData().rawData();
							result_headers = entry.headers;
							result = createMemoryStream(() @trusted { return cast(ubyte[])data; } (), false);
							break;
						case HTTPStatus.notFound:
							res.dropBody();
							throw new FileNotFoundException("File '"~url.toString().black~"' does not exist.");
						case HTTPStatus.movedPermanently, HTTPStatus.found, HTTPStatus.temporaryRedirect:
							auto pv = "Location" in res.headers;
							enforce(pv !is null, "Server responded with redirect but did not specify the redirect location for "~url.toString());
							logDebug("Redirect to '%s'", *pv);
							if (startsWith((*pv), "http:") || startsWith((*pv), "https:")) {
								url = URL(*pv);
							} else url.localURI = *pv;
							res.dropBody();

							// remember the redirect, so that CacheMatchMode.always can
							// follow it without contacting the server
							entry.redirectURL = url.toString();
							entry.headers = result_headers;
							m_entries.update(["_id": entry._id], entry, UpdateFlags.Upsert);
							break;
						case HTTPStatus.ok:
							auto pet = "ETag" in res.headers;
							if (pet || mode == CacheMatchMode.always) {
								logDiagnostic("Cache MISS: %s", url.toString());
								auto dst = createMemoryOutputStream();
								res.bodyReader.pipe(dst);
								auto rawdata = dst.data;
								if (pet) entry.etag = *pet;
								// rawdata is not written to after this point; it is only
								// exposed through the memory stream below, which is created
								// with `false` (non-writable)
								entry.data = BsonBinData(BsonBinData.Type.Generic, () @trusted { return cast(immutable)rawdata; } ());
								entry.headers = result_headers;
								m_entries.update(["_id": entry._id], entry, UpdateFlags.Upsert);
								result = createMemoryStream(rawdata, false);
								break;
							}

							// the URL may contain '%' escapes, so it must not be used as
							// the format string
							logDebug("Response without etag.. not caching: %s", url.toString());

							logDiagnostic("Cache MISS (no etag): %s", url.toString());
							handled_uncached = true;
							callback(res.bodyReader.asInterface!InputStream, result_headers);
							break;
					}
				}
			);

			if (handled_uncached) return;

			if (result) {
				callback(result, result_headers);
				return;
			}

			// otherwise a redirect was recorded and `url` now points to its target
		}

		throw new Exception("Too many redirects for "~url.toString().black);
	}
}
194 
/**
	Thrown by `URLCache.get` when the server answers with HTTP 404 (Not Found).

	Uses the standard Phobos exception constructors. These accept
	`(msg, file = __FILE__, line = __LINE__, next = null)` as before, plus a
	chained `next` exception. They are `@safe pure nothrow @nogc`, so the
	exception can be constructed from attributed code as well.
*/
class FileNotFoundException : Exception {
	mixin basicExceptionCtors;
}
201 
/// Layout of a single document in the "urlcache.entries" collection.
private struct CacheEntry {
	/// Document ID; `URLCache.get` uses its timestamp to detect expired entries.
	BsonObjectID _id;
	/// The requested URL (stored without credentials); used as the lookup key.
	string url;
	/// ETag reported by the server for the cached body; may be empty.
	string etag;
	/// The cached response body.
	BsonBinData data;

	// Fields that may be missing in stored documents.
	@optional {
		/// Recorded response headers.
		string[string] headers;
		/// Redirect target if the URL responded with a redirect, otherwise empty.
		string redirectURL;
	}
}
210 
// Cache instance used by the module-level helper functions; created lazily on
// first use.
private URLCache s_cache;

/**
	Downloads `url` through the module's `URLCache`, creating it on first use.

	Params:
		url = URL to download
		callback = receives the response body
		cache_priority = if `true`, cached data is returned (if available)
			without contacting the server; otherwise it is revalidated using
			its ETag
		request_modifier = optional hook to customize outgoing requests
*/
void downloadCached(URL url, scope void delegate(scope InputStream str) @safe callback,
	bool cache_priority = false, scope RequestModifier request_modifier = null)
@safe {
	auto cache = s_cache;
	if (cache is null) {
		cache = new URLCache;
		s_cache = cache;
	}
	cache.get(url, callback, cache_priority, request_modifier);
}
219 
/**
	Parses `url` and downloads it through the module's URL cache.

	`cache_priority` and `request_modifier` have the same meaning as for the
	`URL` based overload.
*/
void downloadCached(string url, scope void delegate(scope InputStream str) @safe callback,
	bool cache_priority = false, scope RequestModifier request_modifier = null)
@safe {
	auto parsed = URL.parse(url);
	downloadCached(parsed, callback, cache_priority, request_modifier);
}
225 
/**
	Downloads `url` through the module's `URLCache` (created on first use).
	The body and the response headers named in `record_headers` are passed
	to `callback`.

	With `cache_priority == true`, cached data is returned (if available)
	without contacting the server; otherwise it is revalidated using its ETag.
*/
void downloadCached(URL url, scope void delegate(scope InputStream str, scope string[string] headers) @safe callback,
	scope string[] record_headers, bool cache_priority = false, scope RequestModifier request_modifier = null)
@safe {
	if (s_cache is null)
		s_cache = new URLCache;
	s_cache.get(url, callback, record_headers, cache_priority, request_modifier);
}
232 
/**
	Parses `url` and downloads it through the module's URL cache. The body and
	the response headers named in `record_headers` are passed to `callback`.
*/
void downloadCached(string url, scope void delegate(scope InputStream str, scope string[string] headers) @safe callback,
	scope string[] record_headers, bool cache_priority = false, scope RequestModifier request_modifier = null)
@safe {
	auto parsed = URL.parse(url);
	downloadCached(parsed, callback, record_headers, cache_priority, request_modifier);
}
238 
/// Removes the cached entry for `url` from the module's URL cache, creating the cache on first use.
void clearCacheEntry(URL url)
@safe {
	auto cache = s_cache;
	if (cache is null)
		s_cache = cache = new URLCache;
	cache.clearEntry(url);
}
244 
/// Parses `url` and removes its entry from the module's URL cache.
void clearCacheEntry(string url)
@safe {
	auto target = URL(url);
	clearCacheEntry(target);
}
249 
/// Optional callback that receives each outgoing `HTTPClientRequest` before it is sent, e.g. to add headers.
alias RequestModifier = void delegate(scope HTTPClientRequest req);
251 
// Tests that recorded response headers are persisted in the DB. They must be
// returned for a fresh download, for an early cache hit (cache_priority = true)
// and for an ETag revalidation (cache_priority = false). Requires a MongoDB
// server and network access to api.github.com.
unittest
{
	import dubregistry.mongodb : getMongoClient;

	bool haveDB = true;
	try {
		getMongoClient();
	} catch (Exception) {
		haveDB = false;
	}
	if (!haveDB) {
		logWarn("Skipping URLCache test because no MongoDB server is available");
		return;
	}

	enum tagsURL = "https://api.github.com/repos/libmir/mir-algorithm/tags?per_page=10";

	downloadCached(tagsURL, (scope input, scope headers) {
		assert(headers.length);

		// request the same URL again, first directly from the cache, then
		// with ETag revalidation
		foreach (prio; [true, false]) {
			downloadCached(tagsURL, (scope inner_input, scope inner_headers) {
				assert(inner_headers.length);
			}, ["Link"], prio);
		}
	}, ["Link"], false);
}