Coverage for bookie.views.api : 64%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
"""Controllers related to viewing lists of bookmarks"""
"""Verify that we should be checking with content""" if 'with_content' in params and params['with_content'] != 'false': return True else: return False
def ping(request): """Verify that you've setup your api correctly and verified
""" 'success': True, 'message': 'Looks good' }
def ping_missing_user(request): """You ping'd but were missing the username in the url for some reason.
""" 'success': False, 'message': 'Missing username in your api url.' }
def ping_missing_api(request): """You ping'd but didn't specify the actual api url.
""" 'success': False, 'message': 'The API url should be /api/v1' }
def bmark_get(request): """Return a bookmark requested via hash_id
We need to return a nested object with parts bmark - readable """
# the hash id will always be there or the route won't match username=username)
last = BmarkMgr.get_recent_bmark(username=username) if last is not None: last_bmark = {'last': dict(last)}
else:
"""Update the bookmark found with settings passed in"""
# if the only new tags are commands, then don't erase the # existing tags # we need to process any commands associated as well
# the all the new tags are command tags, just tack them on # for processing, but don't touch existing tags for command_tag in new_tags.values(): mark.tags[command_tag.name] = command_tag else: # in this case, rewrite the tags wit the new ones
def bmark_add(request): """Add a new bookmark to the system""" else: request.response.status_int = 400 return { 'error': 'Bad Request: missing url', }
request.response.status_int = 400 return { 'error': 'Bad Request: missing url', }
try: mark = BmarkMgr.get_by_hash(rdict['hash_id'], username=user.username) mark = _update_mark(mark, params)
except NoResultFound: request.response.status_code = 404 return { 'error': 'Bookmark with hash id {0} not found.'.format( rdict['hash_id']) }
else: # check if we already have this username=user.username)
# then let's store this thing # if we have a dt param then set the date to be that manual # date # date format by delapi specs: # CCYY-MM-DDThh:mm:ssZ fmt = "%Y-%m-%dT%H:%M:%SZ" stored_time = datetime.strptime(request.params['dt'], fmt) else:
# check to see if we know where this is coming from
params['url'], user.username, params.get('description', ''), params.get('extended', ''), params.get('tags', ''), dt=stored_time, inserted_by=inserted_by, )
# we need to process any commands associated as well
# if we have content, stick it on the object here content_type="text/html", url=mark.hashed.url)
# we need to flush here for new tag ids, etc
'bmark': mark_data, 'location': request.route_url('bmark_readable', hash_id=mark.hash_id, username=user.username), }
def bmark_remove(request): """Remove this bookmark from the system"""
username=user.username) 'message': "done", }
except NoResultFound: request.response.status_code = 404 return { 'error': 'Bookmark with hash id {0} not found.'.format( rdict['hash_id']) }
"""Get a list of the bmarks for the api call"""
# check if we have a page count submitted
# we only want to do the username if the username is in the url
# thou shalt not have more then the HARD MAX # @todo move this to the .ini as a setting count = HARD_MAX
# do we have any tags to filter upon
tags = [tags]
# if we don't have tags, we might have them sent by a non-js browser as a # string in a query string tags = params.get('tag_filter').split()
# @todo fix this! # if we allow showing of content the query hangs and fails on the # postgres side. Need to check the query and figure out what's up. # see bug #142 # We don't allow with_content by default because of this bug. limit=count, order_by=Bmark.stored.desc(), page=page, tags=tags, username=username, with_tags=True, )
# we should have the hashed information, we need the url and clicks as # total clicks to send back
'bmarks': result_set, 'max_count': RESULTS_MAX, 'count': len(recent_list), 'page': page, 'tag_filter': tags, }
def bmark_popular(request): """Get a list of the most popular bmarks for the api call""" rdict = request.matchdict params = request.params
# check if we have a page count submitted page = int(params.get('page', '0')) count = int(params.get('count', RESULTS_MAX)) with_content = _check_with_content(params)
if request.user and request.user.username: username = request.user.username else: username = None
# thou shalt not have more then the HARD MAX # @todo move this to the .ini as a setting if count > HARD_MAX: count = HARD_MAX
# do we have any tags to filter upon tags = rdict.get('tags', None)
if isinstance(tags, str): tags = [tags]
# if we don't have tags, we might have them sent by a non-js browser as a # string in a query string if not tags and 'tag_filter' in params: tags = params.get('tag_filter').split()
popular_list = BmarkMgr.find( limit=count, order_by=Bmark.clicks.desc(), page=page, tags=tags, username=username, with_content=with_content, with_tags=True, )
result_set = []
for res in popular_list: return_obj = dict(res) return_obj['tags'] = [dict(tag[1]) for tag in res.tags.items()]
# the hashed object is there as well, we need to pull the url and # clicks from it as total_clicks return_obj['url'] = res.hashed.url return_obj['total_clicks'] = res.hashed.clicks
if with_content: return_obj['readable'] = dict(res.hashed.readable)
result_set.append(return_obj)
return { 'bmarks': result_set, 'max_count': RESULTS_MAX, 'count': len(popular_list), 'page': page, 'tag_filter': tags, }
def bmark_export(request): """Export via the api call to json dump
"""
# log that the user exported this
'bmarks': [build_bmark(bmark) for bmark in bmark_list], 'count': len(bmark_list), 'date': str(datetime.utcnow()) }
def extension_sync(request): """Return a list of the bookmarks we know of in the system
For right now, send down a list of hash_ids
"""
'hash_list': [hash[0] for hash in hash_list] }
def search_results(request): """Search for the query terms in the matchdict/GET params
The ones in the matchdict win in the case that they both exist but we'll fall back to query string search=XXX
with_content is always GET and specifies if we're searching the fulltext of pages
"""
else: phrase = rdict.get('search', '')
username = request.user.username else:
# with content is always in the get string
# check if we have a page count submitted
else:
content=search_content, username=username if with_user else None, ct=count, page=page)
# the hashed object is there as well, we need to pull the url and # clicks from it as total_clicks
'search_results': constructed_results, 'result_count': len(constructed_results), 'phrase': phrase, 'page': page, 'with_content': search_content, 'username': username, }
def tag_complete(request): """Complete a tag based on the given text
:@param tag: GET string, tag=sqlalchemy :@param current: GET string of tags we already have python+database
"""
else: username = None
else:
current=current_tags, username=username) else: tags = []
# reset this for the payload join operation
'current': ",".join(current_tags), 'tags': [t.name for t in tags] }
# USER ACCOUNT INFORMATION CALLS def account_info(request): """Return the details of the user account specifed
expecting username in matchdict We only return a subset of data. We're not sharing keys such as api_key, password hash, etc.
"""
def account_update(request): """Update the account information for a user
:params name: :params email:
Callable by either a logged in user or the api key for mobile apps/etc
"""
name = params.get('name') user_acct.name = name
email = params.get('email') user_acct.email = email
email = json_body.get('email') user_acct.email = email
def api_key(request): """Return the currently logged in user's api key
This api call is available both on the website via a currently logged in user and via a valid api key passed into the request. In this way we should be able to add this to the mobile view with an ajax call as well as we do into the account information on the main site.
""" 'api_key': user_acct.api_key, 'username': user_acct.username }
def reset_password(request): """Change a user's password from the current string
:params current_password: :params new_password:
Callable by either a logged in user or the api key for mobile apps/etc
"""
# now also load the password info
# if we don't have any password info, try a json_body in case it's a json #POST params = request.json_body current = params.get('current_password', None) new = params.get('new_password', None)
request.response.status_int = 406 return { 'username': user_acct.username, 'error': "Come on, let's try a real password this time" }
# before we change the password, let's verify it # we're good to change it 'username': user_acct.username, 'message': "Password changed", } else: 'username': user_acct.username, 'error': "There was a typo somewhere. Please check your request" }
def suspend_acct(request): """Reset a user account to enable them to change their password"""
# we need to get the user from the email
# try the json body email = request.json_body.get('email', None)
'error': "Please submit an email address", }
'error': "Please submit a valid address", 'email': email }
# check if we've already gotten an activation for this user 'error': """You've already marked your account for reactivation. Please check your email for the reactivation link. Make sure to check your spam folder.""", 'username': user.username, }
# mark them for reactivation
# log it
# and then send an email notification # @todo the email side of things "Activate your Bookie account", settings)
'url': request.route_url( 'reset', username=user.username, reset_key=user.activation.code), 'username': user.username })
'message': """Your account has been marked for reactivation. Please check your email for instructions to reset your password""", }
def account_activate(request): """Reset a user after being suspended
:param username: required to know what user we're resetting :param activation: code needed to activate :param password: new password to use for the user
"""
# then try to get the same fields out of a json body json_body = request.json_body username = json_body.get('username', None) activation = json_body.get('code', None) password = json_body.get('password', None) new_username = json_body.get('new_username', None)
request.response.status_int = 406 return { 'error': "Come on, pick a real password please", }
# success so respond nicely
# if there's a new username and it's not the same as our current # username, update it try: user = UserMgr.get(username=username) user.username = new_username except IntegrityError, exc: request.response.status_int = 500 return { 'error': 'There was an issue setting your new username', 'exc': str(exc) }
'message': "Account activated, please log in.", 'username': username, } else: AuthLog.reactivate(username, success=False, code=activation) request.response.status_int = 500 return { 'error': "There was an issue attempting to activate this account.", }
def invite_user(request): """Invite a new user into the system.
:param username: user that is requested we invite someone :param email: email address of the new user
""" params = request.params
email = params.get('email', None) user = request.user
if not email: # try to get it from the json body email = request.json_body.get('email', None)
if not email: # if still no email, I give up! request.response.status_int = 406 return { 'username': user.username, 'error': "Please submit an email address" }
# first see if the user is already in the system exists = UserMgr.get(email=email) if exists: request.response.status_int = 406 return { 'username': exists.username, 'error': "This user is already a Bookie user!" }
new_user = user.invite(email) if new_user: LOG.error(new_user.username) # then this user is able to invite someone # log it AuthLog.reactivate(new_user.username)
# and then send an email notification # @todo the email side of things settings = request.registry.settings msg = InvitationMsg(new_user.email, "Enable your Bookie account", settings)
msg.send( request.route_url( 'reset', username=new_user.username, reset_key=new_user.activation.code)) return { 'message': 'You have invited: ' + new_user.email } else: # you have no invites request.response.status_int = 406 return { 'username': user.username, 'error': "You have no invites left at this time." }
def to_readable(request): """Get a list of urls, hash_ids we need to readable parse""" url_list = Bmark.query.outerjoin(Readable, Readable.bid == Bmark.bid).\ join(Bmark.hashed).\ options(contains_eager(Bmark.hashed)).\ filter(Readable.imported == None).all()
def data(urls): """Yield out the results with the url in the data streamed.""" for url in urls: d = dict(url) d['url'] = url.hashed.url yield d
return { 'urls': [u for u in data(url_list)]
}
def readable_reindex(request): """Force the fulltext index to rebuild
This loops through ALL bookmarks and might take a while to complete.
""" tasks.reindex_fulltext_allbookmarks.delay() return { 'success': True }
def accounts_inactive(request): """Return a list of the accounts that aren't activated.""" 'count': len(user_list), 'users': [dict(h) for h in user_list], }
def accounts_invites(request): """Return a list of the accounts that aren't activated.""" 'users': [(u.username, u.invite_ct) for u in user_list], }
def accounts_invites_add(request): """Set the number of invites a user has available.
:matchdict username: The user to give these invites to. :matchdict count: The number of invites to give them. """
else: request.response.status_int = 404 ret = {'error': "Invalid user account."} return ret else: request.response.status_int = 400 ret = {'error': "Bad request, missing parameters"} return ret
def import_list(request): """Provide some import related data.""" 'count': len(import_list), 'imports': [dict(h) for h in import_list], }
def import_reset(request): """Reset an import to try again""" rdict = request.matchdict import_id = rdict.get('id', None)
if not id: request.response.status_int = 400 ret = {'error': "Bad request, missing parameters"} return ret
imp = ImportQueueMgr.get(int(import_id)) imp.status = 0 tasks.importer_process.delay(imp.id)
ret = { 'import': dict(imp) } return ret
def user_list(request): """Provide list of users in the system.
Supported Query params: order, limit """ 'count': len(user_list), 'users': [dict(h) for h in user_list], }
def new_user(request): """Add a new user to the system manually.""" rdict = request.params
u = User()
u.username = unicode(rdict.get('username')) u.email = unicode(rdict.get('email')) passwd = get_random_word(8) u.password = passwd u.activated = True u.is_admin = False u.api_key = User.gen_api_key()
try: DBSession.add(u) DBSession.flush() # We need to return the password since the admin added the user # manually. This is only time we should have/give the original # password. ret = dict(u) ret['random_pass'] = passwd return ret
except IntegrityError, exc: # We might try to add a user that already exists. LOG.error(exc) request.response.status_int = 400 return { 'error': 'Bad Request: User exists.', }
def del_user(request): """Remove a bad user from the system via the api.
For admin use only.
Removes all of a user's bookmarks before removing the user.
"""
# Submit a username.
LOG.error('No username to remove.') request.response.status_int = 400 return { 'error': 'Bad Request: No username to remove.', }
LOG.error('Username not found.') request.response.status_int = 404 return { 'error': 'User not found.', }
# Delete all of the bmarks for this year. 'success': True, 'message': 'Removed user: ' + del_username } except Exception, exc: # There might be cascade issues or something that causes us to fail in # removing. LOG.error(exc) request.response.status_int = 500 return { 'error': 'Bad Request: ' + str(exc) }
def admin_bmark_remove(request): """Remove this bookmark from the system""" rdict = request.matchdict username = rdict.get('username') hash_id = rdict.get('hash_id')
try: bmark = BmarkMgr.get_by_hash(hash_id, username=username) print bmark if bmark: DBSession.delete(bmark) return { 'message': "done", } else: return { 'error': 'Bookmark not found.', }
except NoResultFound: request.response.status_code = 404 return { 'error': 'Bookmark with hash id {0} not found.'.format( rdict['hash_id']) }
def admin_applog(request): """Return applog data for admin use.""" rdict = request.GET
# Support optional filter parameters days = int(rdict.get('days', 1)) status = rdict.get('status', None) message = rdict.get('message', None)
log_list = AppLogMgr.find( days=days, message_filter=message, status=status, )
ret = { 'count': len(log_list), 'logs': [dict(l) for l in log_list], } return ret |