Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

307

308

309

310

311

312

313

314

315

316

317

318

319

320

321

from __future__ import absolute_import 

from celery.utils.log import get_task_logger 

 

from bookie.bcelery.celery import celery 

 

 

import transaction 

from whoosh.store import LockError 

from whoosh.writing import IndexingError 

 

from bookie.lib.importer import Importer 

from bookie.lib.readable import ReadUrl 

from bookie.models import initialize_sql 

from bookie.models import Bmark 

from bookie.models import Readable 

from bookie.models.auth import UserMgr 

from bookie.models.stats import StatBookmarkMgr 

from bookie.models.queue import ImportQueueMgr 

 

from .celery import load_ini 

# Load the application .ini configuration and bind the database once at
# import time so every task in this module shares the same setup.
INI = load_ini()

initialize_sql(INI)

 

 

@celery.task(ignore_result=True)
def hourly_stats():
    """Kick off the hourly batch of stat-tracking tasks.

    Currently we're monitoring:
    - Total number of bookmarks in the system
    - Unique number of urls in the system
    - Total number of tags in the system

    """
    # Fan each counter out as its own async task.
    for stat_task in (count_total, count_unique, count_tags):
        stat_task.delay()

 

 

@celery.task(ignore_result=True)
def count_total():
    """Record the total number of bookmarks in the system."""
    # The stat write happens inside its own transaction.
    txn = transaction.begin()
    StatBookmarkMgr.count_total_bookmarks()
    txn.commit()

 

 

@celery.task(ignore_result=True)
def count_unique():
    """Record the number of unique bookmark urls in the system."""
    # The stat write happens inside its own transaction.
    txn = transaction.begin()
    StatBookmarkMgr.count_unique_bookmarks()
    txn.commit()

 

 

@celery.task(ignore_result=True)
def count_tags():
    """Record the total number of tags in the system."""
    # The stat write happens inside its own transaction.
    txn = transaction.begin()
    StatBookmarkMgr.count_total_tags()
    txn.commit()

 

 

@celery.task()
def importer_process(import_id):
    """Start the process of running the import.

    Loads the queued import, marks it as running, and schedules a worker
    task to do the actual processing.

    :param import_id: import id we need to pull and work on

    """
    log = get_task_logger('importer_process')

    txn = transaction.begin()
    queued = ImportQueueMgr.get(import_id)
    import_id = queued.id

    # Record that the job has been scheduled.
    log.info("IMPORT: SCHEDULED for {0}.".format(queued.username))

    # Flag the job as running so another scheduling pass won't pick it
    # up a second time.
    queued.mark_running()
    txn.commit()

    # Hand the heavy lifting off to the worker task.
    importer_process_worker.delay(import_id)

 

 

@celery.task()
def importer_process_worker(import_id):
    """Do the real import work.

    Runs the queued file through the importer. On success the queue row
    is marked done and the user is emailed; on any failure both the site
    admin and the user are emailed and the row is marked as errored.

    :param import_id: import id we need to pull and work on

    """
    logger = get_task_logger('importer_process_worker')

    trans = transaction.begin()
    import_job = ImportQueueMgr.get(import_id)
    logger.info("IMPORT: RUNNING for {username}".format(**dict(import_job)))

    try:
        # Process the file using the import script. The context manager
        # guarantees the file handle is closed even if processing fails
        # (the original leaked it).
        with open(import_job.file_path) as import_file:
            importer = Importer(
                import_file,
                import_job.username)
            importer.process()

        # Processing kills off our transaction so we need to start a new one
        # to update that our import is complete.
        trans = transaction.begin()
        import_job = ImportQueueMgr.get(import_id)
        import_job.mark_done()
        user = UserMgr.get(username=import_job.username)
        from bookie.lib.message import UserImportSuccessMessage
        msg = UserImportSuccessMessage(
            user.email,
            'Bookie: Your requested import has completed.',
            INI)
        msg.send({
            'username': import_job.username,
        })

        logger.info(
            "IMPORT: COMPLETE for {username}".format(**dict(import_job)))
        trans.commit()

    except Exception as exc:
        # Notify the admin so the failure can be investigated.
        from bookie.lib.message import ImportFailureMessage
        from bookie.lib.message import UserImportFailureMessage

        trans = transaction.begin()
        import_job = ImportQueueMgr.get(import_id)
        user = UserMgr.get(username=import_job.username)

        msg = ImportFailureMessage(
            INI.get('email.from'),
            'Import failure!',
            INI)
        msg.send({
            'username': import_job.username,
            'file_path': import_job.file_path,
            'exc': str(exc)
        })

        # Also send an email to the user that their import failed.
        msg = UserImportFailureMessage(
            user.email,
            'Bookie: We are sorry, your import failed.',
            INI)
        msg.send({
            'username': import_job.username,
            'exc': str(exc)
        })

        # Log the failure once (the original logged the same exception
        # three times at different levels).
        logger.error(str(exc))
        import_job.mark_error()
        logger.info(
            "IMPORT: ERROR for {username}".format(**dict(import_job)))
        trans.commit()

 

 

@celery.task(ignore_result=True)
def email_signup_user(email, msg, settings, message_data):
    """Send a signup invitation email to a new user.

    :param email: address the invitation is sent to
    :param msg: message text handed to the InvitationMsg
    :param settings: application settings used to build the message
    :param message_data: dict of data passed to the message send()

    """
    from bookie.lib.message import InvitationMsg
    # Use a distinct local name instead of rebinding (shadowing) the
    # ``msg`` parameter.
    invite = InvitationMsg(email, msg, settings)
    status = invite.send(message_data)
    # A status of 4 appears to indicate an smtp send failure — record it
    # so admins can see the signup never got its email.
    if status == 4:
        from bookie.lib.applog import SignupLog
        trans = transaction.begin()
        SignupLog(SignupLog.ERROR,
                  'Could not send smtp email to signup: ' + email)
        trans.commit()

 

 

class BookmarkNotFoundException(Exception):
    """Raised (via task retry) when a bookmark id can't be loaded for
    fulltext indexing."""
    pass

 

 

@celery.task(ignore_result=True, default_retry_delay=30)
def fulltext_index_bookmark(bid, content):
    """Insert bookmark data into the fulltext index.

    :param bid: id of the bookmark to index
    :param content: readable content to index; when falsy we fall back to
        the bookmark's stored readable content, then to an empty string

    The task retries itself when the bookmark can't be loaded or when the
    whoosh index is locked/raises an indexing error.

    """
    logger = get_task_logger('fulltext_index_bookmark')

    b = Bmark.query.get(bid)

    if not b:
        logger.error('Could not load bookmark to fulltext index: ' + str(bid))
        fulltext_index_bookmark.retry(exc=BookmarkNotFoundException())
    else:
        from bookie.models.fulltext import get_writer
        logger.warning('getting writer')
        writer = get_writer()

        # Prefer the content handed to us, then the stored readable copy.
        if content:
            found_content = content
        elif b.readable:
            found_content = b.readable.clean_content
        else:
            found_content = u""

        try:
            writer.update_document(
                bid=unicode(b.bid),
                description=b.description if b.description else u"",
                extended=b.extended if b.extended else u"",
                tags=b.tag_str if b.tag_str else u"",
                readable=found_content,
            )
            writer.commit()
            logger.warning('writer commit')
        # ``as`` syntax replaces the Python-2-only comma form; valid on
        # 2.6+ and forward compatible.
        except (IndexingError, LockError) as exc:
            # There was an issue saving into the index.
            logger.error(exc)
            logger.warning('sending back to the queue')
            # Release the writer before retrying so the next attempt can
            # acquire the index lock.
            writer.cancel()
            fulltext_index_bookmark.retry(exc=exc, countdown=60)

 

 

@celery.task(ignore_result=True)
def reindex_fulltext_allbookmarks(sync=False):
    """Rebuild the fulltext index with all bookmarks."""
    log = get_task_logger('fulltext_index_bookmark')
    log.warning("Starting freshen of fulltext index.")

    # Run inline when sync is requested, otherwise fan out via celery.
    index = fulltext_index_bookmark if sync else fulltext_index_bookmark.delay
    for bmark in Bmark.query.all():
        index(bmark.bid, None)

 

 

@celery.task(ignore_result=True)
def fetch_unfetched_bmark_content(ignore_result=True):
    """Check the db for any unfetched content. Fetch and index.

    NOTE(review): the ``ignore_result`` function parameter looks like a
    copy/paste of the decorator option — it is never used in the body;
    confirm no caller passes it before removing.

    """
    log = get_task_logger('fetch_unfetched_bmark_content')
    log.info("Checking for unfetched bookmarks")

    # Bookmarks whose readable record was never imported.
    unfetched = Bmark.query.outerjoin(
        Readable, Bmark.readable).\
        filter(Readable.imported == None).all()

    for bmark in unfetched:
        fetch_bmark_content.delay(bmark.bid)

 

 

@celery.task(ignore_result=True)
def fetch_bmark_content(bid):
    """Given a bookmark, fetch its content and index it.

    :param bid: id of the bookmark whose url should be fetched
    :raises Exception: if bid is falsy or no bookmark matches it

    """
    trans = transaction.begin()
    logger = get_task_logger('fetch_bmark_content')

    if not bid:
        raise Exception('missing bookmark id')
    bmark = Bmark.query.get(bid)
    if not bmark:
        raise Exception('Bookmark not found: ' + str(bid))
    hashed = bmark.hashed

    try:
        read = ReadUrl.parse(hashed.url)
    except ValueError as exc:
        # We hit this where urllib2 choked trying to get the protocol type of
        # this url to fetch it.
        logger.error('Could not parse url: ' + hashed.url)
        # Log the actual exception (the original logged the literal
        # string 'exc').
        logger.error(exc)
        read = None

    if read:
        logger.debug(read)
        logger.debug(read.content)

        logger.debug("%s: %s %d %s %s" % (
            hashed.hash_id,
            read.url,
            len(read.content) if read.content else -1,
            read.is_error(),
            read.status_message))

        # Both branches below need a Readable record; create it once.
        if not bmark.readable:
            bmark.readable = Readable()

        # Images carry no indexable text, so store no content for them.
        if not read.is_image():
            bmark.readable.content = read.content
        else:
            bmark.readable.content = None

        # set some of the extra metadata
        bmark.readable.content_type = read.content_type
        bmark.readable.status_code = read.status
        bmark.readable.status_message = read.status_message
        trans.commit()
        # ``read`` is known truthy here, so pass its content directly
        # (the original's ``if read else None`` was dead code).
        fulltext_index_bookmark.delay(
            bid,
            read.content)
    else:
        # The original passed two arguments to str(), which raises a
        # TypeError; use lazy logger formatting instead.
        logger.error(
            'No readable record for bookmark: %s %s',
            str(bid), bmark.hashed.url)

        # There was a failure reading the thing.
        bmark.readable = Readable()
        # TODO(review): the success path sets ``status_code``; confirm
        # whether ``status`` here should also be ``status_code``.
        bmark.readable.status = '900'
        bmark.readable.status_message = (
            'No readable record '
            'during existing processing')
        trans.commit()