Я искал функцию usleep() в Python 2.7.
Кто-нибудь знает, существует ли он, возможно, с другим именем функции?
Я искал функцию usleep() в Python 2.7.
Кто-нибудь знает, существует ли он, возможно, с другим именем функции?
Так как usleep
обычно означает, что вы хотите отсрочить выполнение для x микросекунд, вы должны разделить значение секунд на 1000000.
import time
time.sleep(seconds/1000000.0)
time.sleep()
занимает секунды как параметр.
import time
usleep = lambda x: time.sleep(x/1000000.0)
usleep(100) #sleep during 100μs
from time import sleep
sleep(0.1) #sleep during 100ms
from time import sleep
sleep(seconds)
Будьте очень осторожны со временем. Я сгорел python3, используя time.sleep, потому что он немонотонен. Если настенные часы изменяются в обратном направлении, вызов time.sleep не будет закончен, пока настенные часы не пойдут туда, где это было бы, если бы сон прошел вперед, как планировалось. Я еще не нашел монотонный заблокированный сон для python.
Вместо этого я рекомендую Event.wait, например:
def call_repeatedly(interval, func, *args, **kwargs):
stopped = Event()
def loop():
while not stopped.wait(interval): # the first call is in `interval` secs
try:
func(*args)
except Exception as e:
logger.error(e);
if kwargs.get('exception'):
kwargs.get('exception')(e) # SEND exception to the specified function if there is one.
else:
raise Exception(e)
Thread(target=loop).start()
return stopped.set
Альтернативная функция сна для python.
Примечание. Не следует использовать для нескольких потоков из-за блокировки GIL, но для нескольких подпроцессов это нормально. То же самое с time.sleep()
Я переношу функцию C на Python. Я использую nanosleep() библиотеки C, которая останавливает этот поток, работающий в течение такого большого количества времени. Это не ожидание типа ожидания, которое использует много CPU для оценки некоторой математики. Коды заключаются в следующем. Поместите все в папку, скажем CWrapper.
C_functions.h
#include <time.h>
int c_sleep_msec(long milliseconds);
int c_sleep_nsec(long nanoseconds);
C_functions.c
#include "C_functions.h"
int c_sleep_msec(long milliseconds) {
struct timespec req;
//struct timespec rem;
if(milliseconds > 999) {
req.tv_sec = (int)(milliseconds / 1000); /* Must be Non-Negative */
req.tv_nsec = (milliseconds - ((long)req.tv_sec * 1000)) * 1000000; /* Must be in range of 0 to 999999999 */
}
else {
req.tv_sec = 0; /* Must be Non-Negative */
req.tv_nsec = milliseconds * 1000000; /* Must be in range of 0 to 999999999 */
}
//rem = NULL;
return nanosleep(&req , NULL);
}
//------------------------------------------------------
int c_sleep_nsec(long nanoseconds) {
struct timespec req;
//struct timespec rem;
if (nanoseconds > 999999999) {
req.tv_sec = (int)(nanoseconds / 1000000000);
req.tv_nsec = (nanoseconds - ((long)req.tv_sec * 1000000000));
}
else {
req.tv_sec = 0;
req.tv_nsec = nanoseconds;
}
//rem = NULL;
return nanosleep(&req , NULL);
}
Вы также можете создать функцию для микросекунды, используя тот же самый nanosleep()
CWrapper.pyx
cdef extern from "C_functions.h":
int c_sleep_msec(long milliseconds)
int c_sleep_nsec(long nanoseconds)
def sleep_msec(milliseconds):
return c_sleep_msec(milliseconds)
def sleep_nsec(nanoseconds):
return c_sleep_nsec(nanoseconds)
setup.py
from distutils.core import setup
from distutils.extension import Extension
from Pyrex.Distutils import build_ext
setup(
name = "CWrapper",
ext_modules=[ Extension("CWrapper", ["CWrapper.pyx", "C_functions.c"]) ],
cmdclass = {'build_ext': build_ext}
)
Установите python-pyrex. Затем запустите в терминале linux
python setup.py build_ext -i
Он создаст файлы CWrapper.c, build и CWrapper.so. Используйте CWrapper.so, где бы вы ни захотели, и просто импортируйте в python.
Примечание. Скомпилируйте отдельно для малины Pi.
Теперь проверьте функцию
Test_sleep.py
import serial
from multiprocessing import Process
import time
import CWrapper
class TestSleep:
def __init__(self):
self.delay_sec = 0.00000100
self.delay_msec = 30
self.delay_nsec = 1000 #200000000
self.start_time = time.time()
self.process_1 = Process(name="process_1", target=self.process_1_task, args=("process_1",))
self.process_1.daemon = True
self.process_1.start()
self.process_2 = Process(name="process_2", target=self.process_1_task, args=("process_2",))
self.process_2.daemon = True
self.process_2.start()
self.process_3 = Process(name="process_3", target=self.process_1_task, args=("process_3",))
self.process_3.daemon = True
self.process_3.start()
def process_1_task(self, process_name):
start = self.start_time
delay_msec = self.delay_msec
delay_sec = self.delay_sec
delay_nsec = self.delay_nsec
t1 = start
for i in range(1, 81):
status = CWrapper.sleep_msec(delay_msec)
# status = CWrapper.sleep_nsec(delay_nsec)
#status = time.sleep(delay_sec)
t2 = time.time()
elapsed_time = t2 - t1
t1 = t2
print process_name, i, "status:", status, "Elapsed-time:", elapsed_time
if __name__ == '__main__':
test = TestSleep()
# for i in range(1,10000):
# print "main thread", i
# time.sleep(0.1)
while True: # Since daemon=True, main thread should check join() or stay in loop
pass
Измените параметры delay_sec для time.sleep(), delay_msec для CWrapper.sleep_msec(), delay_nsec для CWrapper.sleep_nsec(). Раскомментируйте функцию, которую вы хотите протестировать в thread_1_task().
как насчет этого:
import time
def usleep(delay):
mdelay = delay /1000
now = time.time()
while now + mdelay > time.time():
pass