commit 2369ca508da5293bc1a96725cd5e4b3b58279a50
Author: Rik Veenboer
Date:   Fri Dec 18 17:33:45 2015 +0000

    Initial commit

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..ad0c759
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,8 @@
+/flow-session.dat
+/data
+/json
+/venv
+/.Rhistory
+/.RData
+/__pycache__
+/*.pyc
diff --git a/activate.bat b/activate.bat
new file mode 100644
index 0000000..35092d4
--- /dev/null
+++ b/activate.bat
@@ -0,0 +1,7 @@
+@echo off
+if not exist venv (
+    pip install virtualenv
+    virtualenv venv
+)
+call venv\Scripts\activate
+pip install -U -r requirements.txt
diff --git a/add-tasks.py b/add-tasks.py
new file mode 100644
index 0000000..44db6a0
--- /dev/null
+++ b/add-tasks.py
@@ -0,0 +1,8 @@
+from tasks import *
+# add.delay(4, 4)
+
+from datetime import date
+start = date(2015, 4, 1)
+end = date(2015, 5, 1) # date.today()
+# findIt.delay(start, end)
+findIt.direct(start, end)
\ No newline at end of file
diff --git a/beat.bat b/beat.bat
new file mode 100644
index 0000000..6e1e212
--- /dev/null
+++ b/beat.bat
@@ -0,0 +1 @@
+celery -A beat beat
\ No newline at end of file
diff --git a/beat.py b/beat.py
new file mode 100644
index 0000000..a76e654
--- /dev/null
+++ b/beat.py
@@ -0,0 +1,14 @@
+from tasks import celery
+from datetime import timedelta
+
+class Config:
+    CELERYBEAT_SCHEDULE = {
+        'add-every-x-seconds': {
+            'task': 'tasks.add',
+            'schedule': timedelta(seconds=1),
+            'args': (1, 2)
+        },
+    }
+    CELERY_TIMEZONE = 'Europe/London'
+
+celery.config_from_object(Config)
\ No newline at end of file
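
beat.py drives the schedule through a Config object handed to config_from_object. A minimal standalone sketch of the same pattern; the broker URL mirrors tasks.py, and the task name and five-second interval are illustrative assumptions, not part of this commit:

from datetime import timedelta
from celery import Celery

app = Celery('demo', broker='redis://localhost')  # assumed broker, as in tasks.py

@app.task
def ping():
    print('ping')

# Same shape as beat.py's Config: a named entry maps to a task, an interval
# and optional args; run `celery -A demo beat` plus a worker to execute it.
app.conf.update(CELERYBEAT_SCHEDULE={
    'ping-every-five-seconds': {
        'task': 'demo.ping',
        'schedule': timedelta(seconds=5),
    },
})
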
diff --git a/flow.py b/flow.py
new file mode 100644
index 0000000..d4fb850
--- /dev/null
+++ b/flow.py
@@ -0,0 +1,118 @@
+import logging
+import requests
+import pickle
+import os.path
+import json
+import re
+import subprocess
+
+from datetime import timedelta
+
+class Flow:
+    format = '%d.%m.%Y'
+    zip = False # zipped downloads not yet implemented
+
+    def __init__(self, logger = None, period = 2, file = 'flow-session.dat', jsondir = 'json', datadir = 'data', persist = True, restart = False):
+        self.logger = logger or logging.getLogger(__name__)
+        self.jsondir = jsondir
+        self.datadir = datadir
+        if not os.path.exists(jsondir):
+            os.makedirs(jsondir)
+        if not os.path.exists(datadir):
+            os.makedirs(datadir)
+
+        self.period = period # weeks
+        self.file = file
+        self.persist = persist
+        self.restart = restart
+        if not restart and persist and os.path.exists(file):
+            self.logger.info('Reading session from file')
+            with open(self.file, 'rb') as f:
+                self.session = pickle.loads(f.read())
+        else:
+            self.session = requests.Session()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, type, value, traceback):
+        if self.persist:
+            self.logger.info('Writing session to file')
+            with open(self.file, 'wb') as f:
+                f.write(pickle.dumps(self.session))
+
+    def isLoggedIn(self):
+        self.logger.info('Checking login status')
+        result = self.session.get('https://flow.polar.com/training/')
+        return result.status_code == 200
+
+    def login(self, email, password):
+        if not self.restart and self.isLoggedIn():
+            self.logger.info('Skip login')
+            return True
+        self.logger.info('Logging in')
+        result = self.session.post(
+            'https://flow.polar.com/login',
+            data = {
+                'email' : email,
+                'password' : password
+            },
+            verify = True
+        )
+        return result.status_code == 200
+
+    def generateRanges(self, start, end, period = None, format = None):
+        # Yield consecutive (start, end) date strings covering at most `period`
+        # weeks each; every range begins one day after the previous one ends.
+        period = period or self.period
+        format = format or self.format
+        end_tmp = start
+        while end_tmp < end:
+            start = end_tmp + timedelta(days = 1)
+            end_tmp = start + timedelta(days = 7 * period)
+            start_f = start.strftime(format)
+            end_f = min(end, end_tmp).strftime(format)
+            yield (start_f, end_f)
+
+    def getEventsInRange(self, start, end):
+        ranges = self.generateRanges(start, end)
+        for range in ranges:
+            url = 'https://flow.polar.com/training/getCalendarEvents?start=%s&end=%s' % range
+            result = self.session.get(url)
+            if result.status_code == 200:
+                filename = '{}/{} - {}.json'.format(self.jsondir, range[0], range[1])
+                self.logger.info('Writing event json to %s' % filename)
+                with open(filename, 'wb') as f:
+                    f.write(result.text.encode('latin-1'))
+
+    def downloadTraining(self, id):
+        types = ('tcx', 'csv', 'gpx')
+        for type in types:
+            url = 'https://flow.polar.com/training/analysis/%d/export/%s/%s' % (id, type, str(self.zip).lower())
+            result = self.session.get(url)
+            if result.status_code == 200:
+                content_disposition = result.headers['Content-Disposition']
+                match = re.search(r'filename="([^"]+)";', content_disposition)
+                if match is not None:
+                    filename = '{}/{}'.format(self.datadir, match.group(1))
+                    self.logger.info('Writing training to {}'.format(filename))
+                    with open(filename, 'wb') as f:
+                        f.write(result.text.encode('latin-1'))
+
+    def parseCalenderEvents(self):
+        for root, dirs, filenames in os.walk(self.jsondir):
+            for filename in filenames:
+                self.logger.info('Parsing file %s' % filename)
+                with open('{}/{}'.format(root, filename), 'r') as f:
+                    contents = json.load(f)
+                for item in contents:
+                    self.downloadTraining(item['listItemId'])
+
+    def processTraining(self):
+        proc = subprocess.Popen(['Rscript', 'test.R', 'Rik_Veenboer_2015-04-25_10-14-43'], stdout = subprocess.PIPE, universal_newlines = True)
+        while True:
+            line = proc.stdout.readline()
+            if line != '':
+                print("test:", line.rstrip())
+            else:
+                break
\ No newline at end of file
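
Flow.generateRanges splits a date span into windows of at most `period` weeks, which getEventsInRange then feeds to the calendar endpoint. A usage sketch (dates are illustrative; persist=False keeps the session file untouched):

from datetime import date
from flow import Flow

with Flow(persist=False) as flow:
    for start_f, end_f in flow.generateRanges(date(2015, 4, 1), date(2015, 7, 1)):
        print(start_f, '-', end_f)  # first range: 02.04.2015 - 16.04.2015

Note that the first range starts one day after the requested start date; that is how the generator avoids overlapping boundaries between consecutive ranges.
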
"REPLACE INTO per_second (id, target, hash, last_reset, calls, expire) VALUES ((SELECT id FROM per_second WHERE target = '{0:s}' AND hash = '{1:s}'), '{0:s}', '{1:s}', {2:f}, {3:d}, {4:f})".format(function.__name__, hash, lastReset, calls, expire) + self.cursor.execute(query); + self.connection.commit() + + def getCallsRate(self, function, hash): + target = function.__name__ + query = "SELECT last_call FROM rate WHERE target = '{:s}' AND hash = '{:s}'".format(function.__name__, hash) + self.cursor.execute(query) + row = self.cursor.fetchone() + if row is None: + return ([0.0]) + else: + return ([row[0]]) + + def setCallsRate(self, function, hash, lastCall, expire): + query = "REPLACE INTO rate (id, target, hash, last_call, expire) VALUES ((SELECT id FROM rate WHERE target = '{0:s}' AND hash = '{1:s}'), '{0:s}', '{1:s}', {2:f}, {3:f})".format(function.__name__, hash, lastCall, expire) + self.cursor.execute(query) + self.connection.commit() + +def limitCallsPerSecond(maxCalls, perSeconds, persistent, sleep=True): + def decorate(function): + lock = threading.RLock() + hash = base64.b64encode('%d-%d-%s' % (maxCalls, perSeconds, sleep)) + (lastReset, calls) = persistent.getCallsPerSecond(function, hash) + def store(expire): + persistent.setCallsPerSecond(function, hash, lastReset[0], calls[0], expire) + def reset(time=time.time()): + lastReset[0] = time + calls[0] = maxCalls + store(time + perSeconds) + def wrapper(*args, **kargs): + lock.acquire() + now = time.time() + sinceLastReset = now - lastReset[0] + if sinceLastReset > perSeconds: + reset(now) + else: + calls[0] = calls[0] - 1 + store(now + perSeconds) + outOfCalls = calls[0] < 1 + if outOfCalls and sleep: + leftToWait = perSeconds - sinceLastReset + time.sleep(leftToWait) + reset() + leftToWait = False + lock.release() + if outOfCalls is False: + return function(*args, **kargs) + return wrapper + return decorate + +def limitCallsRate(maxPerSecond, perSecond, persistent, sleep=True): + def decorate(function): + lock = threading.RLock() + minInterval = perSecond / float(maxPerSecond) + hash = base64.b64encode(('%d-%d-%s' % (maxPerSecond, perSecond, sleep)).encode()).decode() + print(hash) + lastCall = persistent.getCallsRate(function, hash) + def store(expire): + persistent.setCallsRate(function, hash, lastCall[0], expire) + def wrapper(*args, **kargs): + lock.acquire() + elapsed = time.time() - lastCall[0] + leftToWait = minInterval - elapsed + if leftToWait > 0: + if sleep: + time.sleep(leftToWait) + else: + lock.release() + return + try: + toReturn = function(*args, **kargs) + finally: + lastCall[0] = time.time() + store(lastCall[0] + minInterval) + lock.release() + return toReturn + return wrapper + return decorate + +if __name__ == "__main__": + persistent = SQLiteImplementation() + @limitCallsPerSecond(3, 4, persistent) + @limitCallsRate(2, 1, persistent) + + def PrintNumber(num): + print("%s: %d" % (time.time(), num)) + time.sleep(0.01) + return True + + i = 1 + while i < 10000: + if not PrintNumber(i): + time.sleep(0.1) + i = i + 1 \ No newline at end of file diff --git a/other.R b/other.R new file mode 100644 index 0000000..bb0661b --- /dev/null +++ b/other.R @@ -0,0 +1,70 @@ +library(XML) +library(raster) + +shift.vec <- function (vec, shift) { + if(length(vec) <= abs(shift)) { + rep(NA ,length(vec)) + }else{ + if (shift >= 0) { + c(rep(NA, shift), vec[1:(length(vec)-shift)]) } + else { + c(vec[(abs(shift)+1):length(vec)], rep(NA, abs(shift))) } } } + +pfile <- htmlTreeParse("Rik_Veenboer_2015-11-10_17-21-59.gpx", 
diff --git a/other.R b/other.R
new file mode 100644
index 0000000..bb0661b
--- /dev/null
+++ b/other.R
@@ -0,0 +1,70 @@
+library(XML)
+library(raster)
+
+shift.vec <- function (vec, shift) {
+  if(length(vec) <= abs(shift)) {
+    rep(NA, length(vec))
+  }else{
+    if (shift >= 0) {
+      c(rep(NA, shift), vec[1:(length(vec)-shift)]) }
+    else {
+      c(vec[(abs(shift)+1):length(vec)], rep(NA, abs(shift))) } } }
+
+pfile <- htmlTreeParse("Rik_Veenboer_2015-11-10_17-21-59.gpx", useInternalNodes = T)
+
+# Get all elevations, times and coordinates via the respective xpath
+elevations <- as.numeric(xpathSApply(pfile, path = "//trkpt/ele", xmlValue))
+times <- xpathSApply(pfile, path = "//trkpt/time", xmlValue)
+coords <- xpathSApply(pfile, path = "//trkpt", xmlAttrs)
+
+# Extract latitude and longitude from the coordinates
+lats <- as.numeric(coords["lat",])
+lons <- as.numeric(coords["lon",])
+# Put everything in a dataframe and get rid of old variables
+geodf <- data.frame(lat = lats, lon = lons, ele = elevations, time = times)
+rm(list=c("elevations", "lats", "lons", "pfile", "times", "coords"))
+head(geodf)
+
+# Shift vectors for lat and lon so that each row also contains the next position.
+geodf$lat.p1 <- shift.vec(geodf$lat, -1)
+geodf$lon.p1 <- shift.vec(geodf$lon, -1)
+# Calculate the distance (in metres) to the next position using pointDistance from the 'raster' package.
+# pointDistance expects (lon, lat) order and parameter 'lonlat' has to be TRUE!
+geodf$dist.to.prev <- apply(geodf, 1, FUN = function (row) {
+  pointDistance(c(as.numeric(row["lon.p1"]),
+                  as.numeric(row["lat.p1"])),
+                c(as.numeric(row["lon"]), as.numeric(row["lat"])),
+                lonlat = T)
+})
+# Transform the column 'time' so that R knows how to interpret it.
+geodf$time <- strptime(geodf$time, format = "%Y-%m-%dT%H:%M:%OS")
+# Shift the time vector, too.
+geodf$time.p1 <- shift.vec(geodf$time, -1)
+# Calculate the number of seconds between two positions.
+geodf$time.diff.to.prev <- as.numeric(difftime(geodf$time.p1, geodf$time))
+# Calculate metres per second and kilometres per hour, and apply two LOWESS smoothers to get rid of some noise.
+geodf$speed.m.per.sec <- geodf$dist.to.prev / geodf$time.diff.to.prev
+geodf$speed.km.per.h <- geodf$speed.m.per.sec * 3.6
+geodf$speed.km.per.h <- ifelse(is.na(geodf$speed.km.per.h), 0, geodf$speed.km.per.h)
+geodf$speed.km.per.h <- ifelse(geodf$speed.km.per.h > 40, 0, geodf$speed.km.per.h)
+geodf$lowess.speed <- lowess(geodf$speed.km.per.h, f = 0.01)$y
+geodf$lowess.ele <- lowess(geodf$ele, f = 0.02)$y
+
+
+# Plot elevations and smoother
+plot(geodf$ele, type = "l", bty = "n", xaxt = "n", ylab = "Elevation", xlab = "", col = "grey40")
+lines(geodf$lowess.ele, col = "red", lwd = 3)
+legend(x="bottomright", legend = c("GPS elevation", "LOWESS elevation"),
+       col = c("grey40", "red"), lwd = c(1,3), bty = "n")
+
+
+# Plot speeds and smoother
+plot(geodf$speed.km.per.h, type = "l", bty = "n", xaxt = "n", ylab = "Speed (km/h)", xlab = "",
+     col = "grey40")
+lines(geodf$lowess.speed, col = "blue", lwd = 3)
+legend(x="bottom", legend = c("GPS speed", "LOWESS speed"),
+       col = c("grey40", "blue"), lwd = c(1,3), bty = "n")
+abline(h = mean(geodf$speed.km.per.h), lty = 2, col = "blue")
+
+# Plot the track without any map, the shape of the track is already visible.
+plot(rev(geodf$lon), rev(geodf$lat), type = "l", col = "red", lwd = 3, bty = "n", ylab = "Latitude", xlab = "Longitude")
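
other.R leaves the distance calculation to raster::pointDistance. For cross-checking its output, an equivalent haversine sketch in Python; the 6371 km mean Earth radius is the usual assumption:

import math

def haversine_m(lat1, lon1, lat2, lon2):
    # Great-circle distance in metres between two WGS84 points.
    R = 6371000.0  # mean Earth radius in metres (assumed)
    p1, p2 = math.radians(lat1), math.radians(lat2)
    dp = math.radians(lat2 - lat1)
    dl = math.radians(lon2 - lon1)
    a = math.sin(dp / 2) ** 2 + math.cos(p1) * math.cos(p2) * math.sin(dl / 2) ** 2
    return 2 * R * math.asin(math.sqrt(a))

# Speed between consecutive trackpoints, as derived in other.R:
# km/h = haversine_m(lat, lon, lat_next, lon_next) / seconds_between * 3.6
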
{"from":"01.06.2015","to":"12.07.2015"} + +data analyse +http://docs.scipy.org/doc/scipy/reference/generated/scipy.interpolate.interp1d.html#scipy.interpolate.interp1d +http://fellrnr.com/wiki/Training_Monotony +http://fellrnr.com/wiki/TRIMP +article "A New Aproach to Monitoring Exercise Training" +http://www.runningforfitness.org/faq/rp +http://www.runningforfitness.org/faq/vo2-max + +gps format https://en.wikipedia.org/wiki/Geohash +in python http://geopy.readthedocs.org/en/latest/ + +plan + * verschillende aggregators + * hash over code + * toepassen op verschillende datasets + * ook van andere aggregator + * verwerk versie doorvoering + * voorbeelden + * door bearing bepalen, linksom of rechtsom + * ronde herkeninning, auto laps (prefereer dicht bij start) + * clusteren, tot afstand clusters bepaalde waarde bereikt (5 meter) + * resultaat per aggregator and hash opgeslagen + * daemon die kijkt wat er nog moet worden uitgevoerd + * selectieregels + * prioriteitregels + * visulatisaties + * afhankelijk van output van aggregator (specifiek versie) + * mogelijk om oude data te verwisselen \ No newline at end of file diff --git a/tasks.py b/tasks.py new file mode 100644 index 0000000..b003812 --- /dev/null +++ b/tasks.py @@ -0,0 +1,27 @@ +import pickle +from celery import Celery +from datetime import date + +celery = Celery('tasks', backend='redis://localhost', broker='redis://localhost') + +def serializingTask(function): + @celery.task + def wrapper(string): + (args, kargs) = pickle.loads(string) + return function(*args, **kargs) + delay = wrapper.delay + wrapper.delay = lambda *args, **kargs: delay(pickle.dumps((args, kargs))) + wrapper.direct = function + return wrapper + +@celery.task +def add(x, y): + print(x) + return x + y + +@serializingTask +def findIt(start, end): + format = '%d.%m.%Y' + print(start.strftime(format)) + print(end.strftime(format)) + pass \ No newline at end of file diff --git a/test-store.R b/test-store.R new file mode 100644 index 0000000..490074a --- /dev/null +++ b/test-store.R @@ -0,0 +1,39 @@ +#' ## Storing R Objects in a SQLite Database + +#' Two packages we are using. The first is the ```RSQLite``` which will be used to create and manage an in-memory SQLite database. The second is ```igraph``` which I will use to create and visualize a random network. Some of the work I do is on network simulation. I often don't know the metrics I need from a simulated network when it's created, so I want to be able to store the networks that are created so that I can go back later and analyze them. +library(RSQLite) +library(igraph) + +#' Create a database in memory. +con <- dbConnect(SQLite(), "test.db") + +#' The table has two columns, an *id* column and a column called *graph* which is a **blob** type. This type just stores binary data. +dbGetQuery(con, 'create table if not exists graphs + (_id integer primary key autoincrement, + graph blob)') + +#' Create a bunch of random graphs [Watts-Strogatz graphs](http://en.wikipedia.org/wiki/Watts_and_Strogatz_model). +gs <- list() +for(i in 1:10) + gs[[i]] <- watts.strogatz.game(1, 100, 5, 0.05) + +#' Here's the meaty part. The *serialize* function will take an object and convert it to a raw vector of bytes. Then the *I* function forces the data.frame to store the whole vector as an entry in the data.frame. +df <- data.frame(a = 1:10, + g = I(lapply(gs, function(x) { serialize(x, NULL)}))) + +#' Now insert the data into the table. 
diff --git a/test.R b/test.R
new file mode 100644
index 0000000..0cbf371
--- /dev/null
+++ b/test.R
@@ -0,0 +1,2 @@
+args <- commandArgs(trailingOnly = TRUE)
+print(args)
\ No newline at end of file
diff --git a/test.py b/test.py
new file mode 100644
index 0000000..e0b5847
--- /dev/null
+++ b/test.py
@@ -0,0 +1,31 @@
+import logging
+import sys
+
+from flow import Flow
+from datetime import date
+
+logging.basicConfig(level=logging.INFO)
+
+with Flow() as flow:
+    # 'True or' short-circuits the login while testing locally
+    if True or flow.login('email', 'password'):
+        start = date(2015, 4, 1)
+        # end = date(2015, 5, 1)
+        end = date.today()
+
+        test = date(2015, 4, 2)
+
+        # print(start <= test <= end)
+        # flow.getEventsInRange(start, end)
+        # flow.parseCalenderEvents()
+        flow.processTraining()
+
+sys.exit()
+
+# class Jobs:
    # def download(self, id):
        # print(id)
        # print(x)
+
+# for job in jobs[:]:
    # getattr(Jobs, 'download')(None, *job[1])
diff --git a/worker.bat b/worker.bat
new file mode 100644
index 0000000..e971d2b
--- /dev/null
+++ b/worker.bat
@@ -0,0 +1 @@
+celery -A tasks worker
\ No newline at end of file
diff --git a/xtest.R b/xtest.R
new file mode 100644
index 0000000..2e1b9ba
--- /dev/null
+++ b/xtest.R
@@ -0,0 +1,25 @@
+#install.packages("XML")
+#install.packages("plotKML")
+#install.packages("maptools")
+#install.packages("gpsbabel")
+cat("\014")
+library(XML)
+
+file = "Rik_Veenboer_2015-11-10_17-21-59.gpx"
+file = "test.gpx" # overrides the line above while experimenting
+
+data = xmlParse(file)
+xml = xmlToList(data) # list form needed for the $-access below
+
+#y=as.data.frame(sapply(xml$trk$trkseg["trkpt"], rbind))
+bla = head(xml$trk$trkseg) # only the first six trackpoints
+
+result = data.frame(
+  elevation = unlist(lapply(bla, '[', c('ele'))),
+  time = as.numeric(as.POSIXct(unlist(lapply(bla, '[', c('time'))), format="%Y-%m-%dT%H:%M:%S")))
+
+latlon = t(matrix(as.numeric(unlist(lapply(bla, '[', c('.attrs')))), 2))
+xx = as.data.frame(latlon)
+colnames(xx) = c("lat", "lon")
+
+xxx = cbind(result, xx) # column-bind; merge() without a key would cross-join
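
xtest.R's trackpoint extraction has a compact Python counterpart. A sketch with the standard library's ElementTree; the namespace URI assumes the exports are GPX 1.1, which has not been verified against the actual files:

import xml.etree.ElementTree as ET

NS = {'gpx': 'http://www.topografix.com/GPX/1/1'}  # assumed GPX 1.1 namespace

def trackpoints(path):
    # Yield (lat, lon, ele, time) per <trkpt> in the GPX file.
    root = ET.parse(path).getroot()
    for pt in root.iterfind('.//gpx:trkpt', NS):
        yield (float(pt.get('lat')),
               float(pt.get('lon')),
               pt.findtext('gpx:ele', namespaces=NS),
               pt.findtext('gpx:time', namespaces=NS))

for lat, lon, ele, time in trackpoints('test.gpx'):
    print(lat, lon, ele, time)
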