/**
	Copyright: © 2013-2014 rejectedsoftware e.K.
	License: Subject to the terms of the GNU GPLv3 license, as written in the included LICENSE.txt file.
	Authors: Sönke Ludwig
*/
module dubregistry.cache;

import vibe.core.log;
import vibe.core.stream;
import vibe.db.mongo.mongo;
import vibe.http.client;
import vibe.stream.memory;

import core.time;
import std.algorithm : startsWith;
import std.exception;
import std.typecons : tuple;


/// Controls when `URLCache.get` may answer from the cache instead of the network.
enum CacheMatchMode {
	always, // return cached data if available
	etag, // return cached data if the server responds with "not modified"
	never // always request fresh data
}


/**
	HTTP download cache backed by the MongoDB collection "urlcache.entries".

	Entries are keyed by URL (without credentials) and store the response
	body, its ETag, recorded response headers and, for redirects, the
	redirect target. Entries whose ObjectID creation time is older than
	`m_maxCacheTime` are discarded on lookup.
*/
class URLCache {
@safe:
	private {
		MongoClient m_db;
		MongoCollection m_entries;
		Duration m_maxCacheTime = 365.days;
	}

	/// Connects to the registry database and ensures the "url" index exists.
	this()
	{
		import dubregistry.mongodb : databaseName, getMongoClient;
		m_db = getMongoClient();
		m_entries = m_db.getDatabase(databaseName)["urlcache.entries"];
		m_entries.ensureIndex([tuple("url", 1)]);
	}

	/// Removes the cache entry stored for `url`, if any.
	void clearEntry(URL url)
	{
		m_entries.remove(["url": url.toString()]);
	}

	/**
		Convenience overloads: `cache_priority == true` selects
		`CacheMatchMode.always`, otherwise `CacheMatchMode.etag` is used.
	*/
	void get(URL url, scope void delegate(scope InputStream str) @safe callback,
		bool cache_priority = false, scope RequestModifier request_modifier = null)
	{
		get(url, callback, cache_priority ? CacheMatchMode.always : CacheMatchMode.etag,
			request_modifier);
	}

	/// ditto
	void get(URL url, scope void delegate(scope InputStream str, scope string[string] headers) @safe callback,
		scope string[] record_headers, bool cache_priority = false,
		scope RequestModifier request_modifier = null)
	{
		get(url, callback, record_headers,
			cache_priority ? CacheMatchMode.always : CacheMatchMode.etag,
			request_modifier);
	}

	/// Like the header-recording overload, but records no response headers.
	void get(URL url, scope void delegate(scope InputStream str) @safe callback,
		CacheMatchMode mode = CacheMatchMode.etag, scope RequestModifier request_modifier = null)
	{
		get(url, delegate(scope InputStream str, scope string[string] headers) @safe => callback(str),
			null, mode, request_modifier);
	}

	/**
		Downloads `url` and passes the response body to `callback`, using the
		cache as permitted by `mode`.

		Credentials embedded in `url` are removed from it (so they are not part
		of the cache key) and sent using HTTP basic authentication instead.
		Redirects are followed (at most 10) and are cached as well.

		Params:
			url = The URL to download
			callback = Receives the response body and the recorded headers
			record_headers = Names of response headers to pass to `callback`
				and to store with the cache entry
			mode = Determines when cached data may be used
			request_modifier = Optional hook invoked on every outgoing request

		Throws:
			`FileNotFoundException` if the server responds with 404;
			`Exception` for any other unexpected reply or when the redirect
			limit is exceeded.
	*/
	void get(URL url, scope void delegate(scope InputStream str, scope string[string] headers) @safe callback,
		scope string[] record_headers, CacheMatchMode mode = CacheMatchMode.etag,
		scope RequestModifier request_modifier = null)
	{
		import std.datetime : Clock, UTC;
		import vibe.http.auth.basic_auth;
		import dubregistry.internal.utils : black;
		import vibe.internal.interfaceproxy : asInterface;

		auto user = url.username;
		auto password = url.password;
		url.username = null;
		url.password = null;

		InputStream result;
		string[string] result_headers;
		bool handled_uncached = false;

		auto now = Clock.currTime(UTC());

		foreach (i; 0 .. 10) { // follow max 10 redirects
			auto be = m_entries.findOne(["url": url.toString()]);
			CacheEntry entry;
			bool have_entry = false;

			if (!be.isNull()) {
				if (be["_id"].get!BsonObjectID.timeStamp < now - m_maxCacheTime) {
					// Invalidate out of date cache entries and treat them as a miss.
					// Reusing the stale entry would serve just-deleted data and
					// re-insert it under its old _id, keeping it permanently stale.
					m_entries.remove(["_id": be["_id"]]);
				} else {
					deserializeBson(entry, be);
					have_entry = true;
				}
			}

			if (have_entry) {
				if (mode == CacheMatchMode.always) {
					// directly return cache result for cache_priority == true
					logDiagnostic("Cache HIT (early): %s", url.toString());
					if (entry.redirectURL.length) {
						url = URL(entry.redirectURL);
						continue;
					}
					auto data = entry.data.rawData();
					auto mdata = () @trusted { return cast(ubyte[])data; } ();
					scope tmpresult = createMemoryStream(mdata, false);
					callback(tmpresult, entry.headers);
					return;
				}
			} else {
				entry._id = BsonObjectID.generate();
				entry.url = url.toString();
			}

			requestHTTP(url,
				(scope req) {
					if (entry.etag.length && mode != CacheMatchMode.never) req.headers["If-None-Match"] = entry.etag;
					if (user.length) addBasicAuth(req, user, password);
					if (request_modifier) request_modifier(req);
				},
				(scope res) {
					foreach (header; record_headers) {
						auto v = res.headers.get(header, null);
						if (v !is null)
							result_headers[header] = v;
					}

					switch (res.statusCode) {
						default:
							throw new Exception("Unexpected reply for '"~url.toString().black~"': "~httpStatusText(res.statusCode));
						case HTTPStatus.notModified:
							res.dropBody();
							// A 304 is only meaningful if there is cached data to fall back to
							// (e.g. a request_modifier may have set its own If-None-Match).
							enforce(have_entry, "Server responded with \"not modified\" for '"~url.toString().black~"', but no cached data is available.");
							logDiagnostic("Cache HIT: %s", url.toString());
							auto data = entry.data.rawData();
							auto mdata = () @trusted { return cast(ubyte[])data; } ();
							result_headers = entry.headers;
							result = createMemoryStream(mdata, false);
							break;
						case HTTPStatus.notFound:
							res.dropBody();
							throw new FileNotFoundException("File '"~url.toString().black~"' does not exist.");
						case HTTPStatus.movedPermanently, HTTPStatus.found, HTTPStatus.seeOther, HTTPStatus.temporaryRedirect:
							auto pv = "Location" in res.headers;
							enforce(pv !is null, "Server responded with redirect but did not specify the redirect location for "~url.toString());
							logDebug("Redirect to '%s'", *pv);
							if (startsWith((*pv), "http:") || startsWith((*pv), "https:")) {
								url = URL(*pv);
							} else url.localURI = *pv;
							res.dropBody();

							// entry still refers to the original URL; remember where it points to
							entry.redirectURL = url.toString();
							entry.headers = result_headers;
							m_entries.update(["_id": entry._id], entry, UpdateFlags.Upsert);
							break;
						case HTTPStatus.ok:
							auto pet = "ETag" in res.headers;
							if (pet || mode == CacheMatchMode.always) {
								logDiagnostic("Cache MISS: %s", url.toString());
								auto dst = createMemoryOutputStream();
								res.bodyReader.pipe(dst);
								auto rawdata = dst.data;
								// the stored ETag must always describe the stored data
								entry.etag = pet ? *pet : null;
								entry.data = BsonBinData(BsonBinData.Type.Generic, cast(immutable)rawdata);
								entry.headers = result_headers;
								// the URL now serves content directly, forget any earlier redirect
								entry.redirectURL = null;
								m_entries.update(["_id": entry._id], entry, UpdateFlags.Upsert);
								result = createMemoryStream(rawdata, false);
								break;
							}

							// URL is passed as an argument: it may contain '%' escapes
							logDebug("Response without etag.. not caching: %s", url.toString());

							logDiagnostic("Cache MISS (no etag): %s", url.toString());
							handled_uncached = true;
							callback(res.bodyReader.asInterface!InputStream, result_headers);
							break;
					}
				}
			);

			if (handled_uncached) return;

			if (result) {
				callback(result, result_headers);
				return;
			}
			// otherwise a redirect was recorded and `url` now holds its target
		}

		throw new Exception("Too many redirects for "~url.toString().black);
	}
}

/// Thrown by `URLCache.get` when the server responds with 404 Not Found.
class FileNotFoundException : Exception {
	this(string msg, string file = __FILE__, size_t line = __LINE__)
	{
		super(msg, file, line);
	}
}

/// Database representation of a single cached URL.
private struct CacheEntry {
	BsonObjectID _id; // creation time is used for expiry
	string url; // lookup key (credentials stripped)
	string etag; // ETag matching `data`, empty if none was sent
	BsonBinData data; // cached response body
	@optional string[string] headers; // recorded response headers
	@optional string redirectURL; // non-empty if `url` redirects here
}

private URLCache s_cache;

/// Returns the lazily created cache instance used by the free functions below.
private URLCache getURLCache()
@safe {
	if (!s_cache) s_cache = new URLCache;
	return s_cache;
}

/**
	Downloads `url` through the shared `URLCache`.

	See `URLCache.get` for the meaning of the parameters.
*/
void downloadCached(URL url, scope void delegate(scope InputStream str) @safe callback,
	bool cache_priority = false, scope RequestModifier request_modifier = null)
@safe {
	getURLCache().get(url, callback, cache_priority, request_modifier);
}

/// ditto
void downloadCached(string url, scope void delegate(scope InputStream str) @safe callback,
	bool cache_priority = false, scope RequestModifier request_modifier = null)
@safe {
	return downloadCached(URL.parse(url), callback, cache_priority, request_modifier);
}

/// ditto
void downloadCached(URL url, scope void delegate(scope InputStream str, scope string[string] headers) @safe callback,
	scope string[] record_headers, bool cache_priority = false, scope RequestModifier request_modifier = null)
@safe {
	getURLCache().get(url, callback, record_headers, cache_priority, request_modifier);
}

/// ditto
void downloadCached(string url, scope void delegate(scope InputStream str, scope string[string] headers) @safe callback,
	scope string[] record_headers, bool cache_priority = false, scope RequestModifier request_modifier = null)
@safe {
	return downloadCached(URL.parse(url), callback, record_headers, cache_priority, request_modifier);
}

/// Removes the cached data for `url` from the shared cache.
void clearCacheEntry(URL url)
@safe {
	getURLCache().clearEntry(url);
}

/// ditto
void clearCacheEntry(string url)
@safe {
	clearCacheEntry(URL.parse(url));
}

/// Hook that can customize an outgoing HTTP request (e.g. add headers).
alias RequestModifier = void delegate(scope HTTPClientRequest req);

// test header persistence in DB
unittest
{
	try {
		import dubregistry.mongodb : getMongoClient;
		getMongoClient();
	} catch (Exception) {
		logWarn("Skipping URLCache test because no MongoDB server is available");
		return;
	}

	downloadCached("https://api.github.com/repos/libmir/mir-algorithm/tags?per_page=10", (scope input, scope headers) {
		assert(headers.length);

		downloadCached("https://api.github.com/repos/libmir/mir-algorithm/tags?per_page=10", (scope input, scope headers) {
			assert(headers.length);
		}, ["Link"], true);

		downloadCached("https://api.github.com/repos/libmir/mir-algorithm/tags?per_page=10", (scope input, scope headers) {
			assert(headers.length);
		}, ["Link"], false);
	}, ["Link"], false);
}